import asyncio import json import logging from typing import Dict, List, Optional import websockets import plotly.graph_objects as go from plotly.subplots import make_subplots import dash from dash import html, dcc from dash.dependencies import Input, Output import pandas as pd import numpy as np from collections import deque import time from threading import Thread import requests import os from datetime import datetime, timedelta import pytz import tzlocal import threading import random import dash_bootstrap_components as dbc import uuid class BinanceHistoricalData: """ Class for fetching historical price data from Binance. """ def __init__(self): self.base_url = "https://api.binance.com/api/v3" self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache') if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) # Timestamp of last data update self.last_update = None def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000): """ Fetch historical candles from Binance API. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h) limit (int): Number of candles to fetch Returns: pd.DataFrame: DataFrame with OHLCV data """ # Convert interval_seconds to Binance interval format interval_map = { 1: "1s", 60: "1m", 300: "5m", 900: "15m", 1800: "30m", 3600: "1h", 14400: "4h", 86400: "1d" } interval = interval_map.get(interval_seconds, "1h") # Format symbol for Binance API (remove slash) formatted_symbol = symbol.replace("/", "").lower() # Check if we have cached data first cache_file = self._get_cache_filename(formatted_symbol, interval) cached_data = self._load_from_cache(formatted_symbol, interval) # If we have cached data that's recent enough, use it if cached_data is not None and len(cached_data) >= limit: cache_age_minutes = (datetime.now() - self.last_update).total_seconds() / 60 if self.last_update else 60 if cache_age_minutes < 15: # Only use cache if it's less than 15 minutes old logger.info(f"Using cached historical data for {symbol} ({interval})") return cached_data try: # Build URL for klines endpoint url = f"{self.base_url}/klines" params = { "symbol": formatted_symbol, "interval": interval, "limit": limit } # Make the request response = requests.get(url, params=params) response.raise_for_status() # Parse the response data = response.json() # Create dataframe df = pd.DataFrame(data, columns=[ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" ]) # Convert timestamp to datetime df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") # Convert price columns to float for col in ["open", "high", "low", "close", "volume"]: df[col] = df[col].astype(float) # Sort by timestamp df = df.sort_values("timestamp") # Save to cache for future use self._save_to_cache(df, formatted_symbol, interval) self.last_update = datetime.now() logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})") return df except Exception as e: logger.error(f"Error fetching historical data from Binance: {str(e)}") # Return cached data if we have it, even if it's not enough if cached_data is not None: logger.warning(f"Using cached data instead (may be incomplete)") return cached_data # Return empty dataframe on error return pd.DataFrame() def _get_cache_filename(self, symbol, interval): """Get filename for cache file""" return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv") def _load_from_cache(self, symbol, interval): """Load candles from cache file""" try: cache_file = self._get_cache_filename(symbol, interval) if os.path.exists(cache_file): # For 1s interval, check if the cache is recent (less than 10 minutes old) if interval == "1s" or interval == 1: file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) time_diff = (datetime.now() - file_mod_time).total_seconds() / 60 if time_diff > 10: logger.info("1s cache is older than 10 minutes, skipping load") return None logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)") df = pd.read_csv(cache_file) df["timestamp"] = pd.to_datetime(df["timestamp"]) logger.info(f"Loaded {len(df)} candles from cache: {cache_file}") return df except Exception as e: logger.error(f"Error loading cached data: {str(e)}") return None def _save_to_cache(self, df, symbol, interval): """Save candles to cache file""" try: cache_file = self._get_cache_filename(symbol, interval) df.to_csv(cache_file, index=False) logger.info(f"Saved {len(df)} candles to cache: {cache_file}") return True except Exception as e: logger.error(f"Error saving to cache: {str(e)}") return False def get_recent_trades(self, symbol, limit=1000): """Get recent trades for a symbol""" formatted_symbol = symbol.replace("/", "") try: url = f"{self.base_url}/trades" params = { "symbol": formatted_symbol, "limit": limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Create dataframe df = pd.DataFrame(data) df["time"] = pd.to_datetime(df["time"], unit="ms") df["price"] = df["price"].astype(float) df["qty"] = df["qty"].astype(float) return df except Exception as e: logger.error(f"Error fetching recent trades: {str(e)}") return pd.DataFrame() # Configure logging with more detailed format logging.basicConfig( level=logging.INFO, # Changed to DEBUG for more detailed logs format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('realtime_chart.log') ] ) logger = logging.getLogger(__name__) # Neural Network integration (conditional import) NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1' nn_orchestrator = None nn_inference_thread = None if NN_ENABLED: try: import sys # Add project root to sys.path if needed project_root = os.path.dirname(os.path.abspath(__file__)) if project_root not in sys.path: sys.path.append(project_root) from NN.main import NeuralNetworkOrchestrator logger.info("Neural Network module enabled") except ImportError as e: logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}") NN_ENABLED = False # NN utility functions def setup_neural_network(): """Initialize the neural network components if enabled""" global nn_orchestrator, NN_ENABLED if not NN_ENABLED: return False try: # Get configuration from environment variables or use defaults symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT') timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',') output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL # Configure the orchestrator config = { 'symbol': symbol, 'timeframes': timeframes, 'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')), 'n_features': 5, # OHLCV 'output_size': output_size, 'model_dir': 'NN/models/saved', 'data_dir': 'NN/data' } # Initialize the orchestrator logger.info(f"Initializing Neural Network Orchestrator with config: {config}") nn_orchestrator = NeuralNetworkOrchestrator(config) # Start inference thread if enabled inference_interval = int(os.environ.get('NN_INFERENCE_INTERVAL', '60')) if inference_interval > 0: start_nn_inference_thread(inference_interval) return True except Exception as e: logger.error(f"Error setting up neural network: {str(e)}") import traceback logger.error(traceback.format_exc()) NN_ENABLED = False return False def start_nn_inference_thread(interval_seconds): """Start a background thread to periodically run inference with the neural network""" global nn_inference_thread if not NN_ENABLED or nn_orchestrator is None: logger.warning("Cannot start inference thread - Neural Network not enabled or initialized") return False def inference_worker(): """Worker function for the inference thread""" model_type = os.environ.get('NN_MODEL_TYPE', 'cnn') timeframe = os.environ.get('NN_TIMEFRAME', '1h') logger.info(f"Starting neural network inference thread with {interval_seconds}s interval") logger.info(f"Using model type: {model_type}, timeframe: {timeframe}") # Wait a bit for charts to initialize time.sleep(5) # Track active charts active_charts = [] while True: try: # Find active charts if we don't have them yet if not active_charts and 'charts' in globals(): active_charts = globals()['charts'] logger.info(f"Found {len(active_charts)} active charts for NN signals") # Run inference result = nn_orchestrator.run_inference_pipeline( model_type=model_type, timeframe=timeframe ) if result: # Log the result logger.info(f"Neural network inference result: {result}") # Add signal to charts if active_charts: try: if 'action' in result: action = result['action'] timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00')) # Get probability if available probability = None if 'probability' in result: probability = result['probability'] elif 'probabilities' in result: probability = result['probabilities'].get(action, None) # Add signal to each chart for chart in active_charts: if hasattr(chart, 'add_nn_signal'): chart.add_nn_signal(action, timestamp, probability) except Exception as e: logger.error(f"Error adding NN signal to chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Sleep for the interval time.sleep(interval_seconds) except Exception as e: logger.error(f"Error in inference thread: {str(e)}") import traceback logger.error(traceback.format_exc()) time.sleep(5) # Wait a bit before retrying # Create and start the thread nn_inference_thread = threading.Thread(target=inference_worker, daemon=True) nn_inference_thread.start() return True # Try to get local timezone, default to Sofia/EET if not available try: local_timezone = tzlocal.get_localzone() # Get timezone name safely try: tz_name = str(local_timezone) # Handle case where it might be zoneinfo.ZoneInfo object instead of pytz timezone if hasattr(local_timezone, 'zone'): tz_name = local_timezone.zone elif hasattr(local_timezone, 'key'): tz_name = local_timezone.key else: tz_name = str(local_timezone) except: tz_name = "Local" logger.info(f"Detected local timezone: {local_timezone} ({tz_name})") except Exception as e: logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET") local_timezone = pytz.timezone('Europe/Sofia') tz_name = "Europe/Sofia" def convert_to_local_time(timestamp): """Convert timestamp to local timezone""" try: if isinstance(timestamp, pd.Timestamp): dt = timestamp.to_pydatetime() elif isinstance(timestamp, np.datetime64): dt = pd.Timestamp(timestamp).to_pydatetime() elif isinstance(timestamp, str): dt = pd.to_datetime(timestamp).to_pydatetime() else: dt = timestamp # If datetime is naive (no timezone), assume it's UTC if dt.tzinfo is None: dt = dt.replace(tzinfo=pytz.UTC) # Convert to local timezone local_dt = dt.astimezone(local_timezone) return local_dt except Exception as e: logger.error(f"Error converting timestamp to local time: {str(e)}") return timestamp class TickStorage: """Simple storage for ticks and candles""" def __init__(self): self.ticks = [] self.candles = {} # Organized by timeframe key (e.g., '1s', '1m', '1h') self.latest_price = None self.last_update = datetime.now() # Initialize empty candle arrays for different timeframes for timeframe in ['1s', '5s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d']: self.candles[timeframe] = [] def add_tick(self, price, volume=0, timestamp=None): """Add a tick to the storage""" if timestamp is None: timestamp = datetime.now() # Ensure timestamp is datetime if isinstance(timestamp, (int, float)): timestamp = datetime.fromtimestamp(timestamp) tick = { 'price': price, 'volume': volume, 'timestamp': timestamp } self.ticks.append(tick) self.latest_price = price self.last_update = datetime.now() # Keep only last 10000 ticks if len(self.ticks) > 10000: self.ticks = self.ticks[-10000:] # Update all timeframe candles self._update_all_candles(tick) # Verify 1s candles are being updated - periodically log for monitoring if len(self.ticks) % 100 == 0: logger.debug(f"Tick count: {len(self.ticks)}, 1s candles count: {len(self.candles['1s'])}") return tick def get_latest_price(self): """Get the latest price""" return self.latest_price def _update_all_candles(self, tick): """Update all candle timeframes with the new tick""" # Define intervals in seconds intervals = { '1s': 1, '5s': 5, '15s': 15, '30s': 30, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } # Update each timeframe for interval_key, seconds in intervals.items(): self._update_candles_for_timeframe(interval_key, seconds, tick) def _update_candles_for_timeframe(self, interval_key, interval_seconds, tick): """Update candles for a specific timeframe""" # Get or create the current candle current_candle = self._get_current_candle(interval_key, tick['timestamp'], interval_seconds) # If this is a new candle, initialize it with the tick price if current_candle['open'] == 0.0: current_candle['open'] = tick['price'] current_candle['high'] = tick['price'] current_candle['low'] = tick['price'] # Update the candle with the new tick if current_candle['high'] < tick['price']: current_candle['high'] = tick['price'] if current_candle['low'] > tick['price'] or current_candle['low'] == 0: current_candle['low'] = tick['price'] current_candle['close'] = tick['price'] current_candle['volume'] += tick['volume'] # For 1s timeframe specifically, log a debug message to confirm updates are occurring if interval_key == '1s': logger.debug(f"Updated 1s candle at {current_candle['timestamp']} with price {tick['price']}") # Limit the number of candles to keep for each timeframe # Keep more candles for shorter timeframes, fewer for longer ones max_candles = { '1s': 1000, # ~16 minutes of 1s data '5s': 1000, # ~83 minutes of 5s data '15s': 800, # ~3.3 hours of 15s data '30s': 600, # ~5 hours of 30s data '1m': 500, # ~8 hours of 1m data '5m': 300, # ~25 hours of 5m data '15m': 200, # ~50 hours of 15m data '30m': 150, # ~3 days of 30m data '1h': 168, # 7 days of 1h data '4h': 90, # ~15 days of 4h data '1d': 365 # 1 year of daily data } # Trim candles list if needed if len(self.candles[interval_key]) > max_candles.get(interval_key, 500): self.candles[interval_key] = self.candles[interval_key][-max_candles.get(interval_key, 500):] def _get_current_candle(self, interval_key, timestamp, interval_seconds): """Get the current candle for the given interval, or create a new one""" # Calculate the candle start time based on the timeframe candle_start = self._calculate_candle_start(timestamp, interval_seconds) # Check if we already have a candle for this time for candle in self.candles[interval_key]: if candle['timestamp'] == candle_start: return candle # Create a new candle candle = { 'timestamp': candle_start, 'open': 0.0, 'high': 0.0, 'low': float('inf'), 'close': 0.0, 'volume': 0 } # Log when we're creating a new 1s candle for debugging if interval_key == '1s': logger.debug(f"Creating new 1s candle at {candle_start}") self.candles[interval_key].append(candle) return candle def _calculate_candle_start(self, timestamp, interval_seconds): """Calculate the start time of a candle based on interval""" # Seconds timeframes (1s, 5s, 15s, 30s) if interval_seconds < 60: # Round down to the nearest multiple of interval_seconds seconds_since_hour = timestamp.second + timestamp.minute * 60 candle_seconds = (seconds_since_hour // interval_seconds) * interval_seconds candle_minute = candle_seconds // 60 candle_second = candle_seconds % 60 return timestamp.replace( microsecond=0, second=candle_second, minute=candle_minute ) # Minute timeframes (1m, 5m, 15m, 30m) elif interval_seconds < 3600: minutes_in_interval = interval_seconds // 60 return timestamp.replace( microsecond=0, second=0, minute=(timestamp.minute // minutes_in_interval) * minutes_in_interval ) # Hour timeframes (1h, 4h) elif interval_seconds < 86400: hours_in_interval = interval_seconds // 3600 return timestamp.replace( microsecond=0, second=0, minute=0, hour=(timestamp.hour // hours_in_interval) * hours_in_interval ) # Day timeframe (1d) else: return timestamp.replace( microsecond=0, second=0, minute=0, hour=0 ) def get_candles(self, interval='1m'): """Get candles for the specified interval""" # Convert legacy interval format to new format if isinstance(interval, int): # Convert seconds to the appropriate key if interval < 60: interval_key = f"{interval}s" elif interval < 3600: interval_key = f"{interval // 60}m" elif interval < 86400: interval_key = f"{interval // 3600}h" else: interval_key = f"{interval // 86400}d" else: interval_key = interval # Ensure the interval key exists in our candles dict if interval_key not in self.candles: logger.warning(f"Invalid interval key: {interval_key}") return None if not self.candles[interval_key]: logger.warning(f"No candles available for {interval_key}") return None # Convert to DataFrame df = pd.DataFrame(self.candles[interval_key]) if df.empty: return None # Set timestamp as index df.set_index('timestamp', inplace=True) # Sort by timestamp df = df.sort_index() return df def load_from_file(self, file_path): """Load ticks from a file""" try: df = pd.read_csv(file_path) for _, row in df.iterrows(): if 'timestamp' in row: timestamp = pd.to_datetime(row['timestamp']) else: timestamp = None self.add_tick( price=row.get('price', row.get('close', 0)), volume=row.get('volume', 0), timestamp=timestamp ) logger.info(f"Loaded {len(df)} ticks from {file_path}") except Exception as e: logger.error(f"Error loading ticks from file: {str(e)}") def load_historical_data(self, historical_data, symbol): """Load historical data for all timeframes""" try: # Clear any existing 1s candles to prevent using old cached data self.candles['1s'] = [] # Clear tick data to ensure we start with an empty collection self.ticks = [] # Load data for different timeframes (without 1s - we'll handle it separately) timeframes = [ (60, '1m'), # 1 minute (5, '5s'), # 5 seconds (15, '15s'), # 15 seconds (300, '5m'), # 5 minutes (900, '15m'), # 15 minutes (3600, '1h'), # 1 hour (14400, '4h'), # 4 hours (86400, '1d') # 1 day ] # For 1s, we only load from cache if available (handled in _load_from_cache method) # The _load_from_cache method will check if cache is no more than 10 minutes old df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Try to get 1s data from cache if df_1s is not None and not df_1s.empty: logger.info(f"Loaded {len(df_1s)} recent 1s candles from cache") # Convert to our candle format and store candles_1s = [] for _, row in df_1s.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles_1s.append(candle) # Add the 1s candles to our candles storage self.candles['1s'] = candles_1s else: logger.info("No recent 1s cache available, starting with empty 1s data") # Load the remaining timeframes normally for interval_seconds, interval_key in timeframes: # Set appropriate limits based on timeframe limit = 1000 # Default if interval_seconds < 60: limit = 500 # For seconds-level data elif interval_seconds < 300: limit = 1000 # 1m elif interval_seconds < 900: limit = 500 # 5m elif interval_seconds < 3600: limit = 300 # 15m else: limit = 200 # hourly/daily data try: # Load normal historical data df = historical_data.get_historical_candles(symbol, interval_seconds, limit) if df is not None and not df.empty: logger.info(f"Loaded {len(df)} historical candles for {symbol} ({interval_key})") # Convert to our candle format and store candles = [] for _, row in df.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles.append(candle) # Set the candles for this timeframe self.candles[interval_key] = candles # No longer load 1m data into ticks collection, as this persists the problem # Just store the latest price from the most recent candle for reference if interval_key == '1m' and candles: self.latest_price = candles[-1]['close'] logger.info(f"Set latest price to ${self.latest_price:.2f} from historical data without adding to ticks") except Exception as e: logger.error(f"Error loading {interval_key} data: {e}") continue logger.info(f"Completed loading historical data for {symbol}") except Exception as e: logger.error(f"Error loading historical data: {str(e)}") import traceback logger.error(traceback.format_exc()) class Position: """Represents a trading position""" def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.001): self.action = action self.entry_price = entry_price self.amount = amount self.entry_timestamp = timestamp or datetime.now() self.exit_timestamp = None self.exit_price = None self.pnl = None self.is_open = True self.trade_id = trade_id or str(uuid.uuid4())[:8] self.fee_rate = fee_rate self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee def close(self, exit_price, exit_timestamp=None): """Close an open position""" self.exit_price = exit_price self.exit_timestamp = exit_timestamp or datetime.now() self.is_open = False # Calculate P&L if self.action == "BUY": price_diff = self.exit_price - self.entry_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee else: # SELL price_diff = self.entry_price - self.exit_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee return self.pnl class RealTimeChart: """Real-time chart using Dash and Plotly""" def __init__(self, symbol: str): """Initialize the chart with a symbol""" self.symbol = symbol self.tick_storage = TickStorage() self.latest_price = None self.latest_volume = None self.latest_timestamp = None # Initialize with empty positions list to prevent old trade actions from affecting chart resizing # We MUST start with a clean state for each new chart instance self.positions = [] # Empty positions list - CRITICAL for proper chart resizing self.accumulative_pnl = 0.0 # Reset PnL self.current_balance = 100.0 # Start with $100 balance # Store historical data for different timeframes self.timeframe_data = { '1s': [], '5s': [], '15s': [], '1m': [], '5m': [], '15m': [], '1h': [], '4h': [], '1d': [] } # Initialize Dash app self.app = dash.Dash(__name__, external_stylesheets=[dbc.themes.DARKLY]) # Define button styles self.button_style = { 'padding': '5px 10px', 'margin': '0 5px', 'backgroundColor': '#444', 'color': 'white', 'border': 'none', 'borderRadius': '5px', 'cursor': 'pointer' } self.active_button_style = { 'padding': '5px 10px', 'margin': '0 5px', 'backgroundColor': '#007bff', 'color': 'white', 'border': 'none', 'borderRadius': '5px', 'cursor': 'pointer', 'boxShadow': '0 0 5px rgba(0, 123, 255, 0.5)' } # Create the layout self.app.layout = html.Div([ # Header section with title and current price html.Div([ html.H1(f"{symbol} Real-Time Chart", className="display-4"), # Current price ticker html.Div([ html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}), html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}), html.Div([ html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}), ], style={"display": "inline-block", "marginLeft": "40px"}), html.Div([ html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}), ], style={"display": "inline-block", "marginLeft": "40px"}), ], style={"textAlign": "center", "margin": "20px 0"}), ], style={"textAlign": "center", "marginBottom": "20px"}), # Add interval component for periodic updates dcc.Interval( id='interval-component', interval=500, # in milliseconds n_intervals=0 ), # Add timeframe selection buttons html.Div([ html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style), html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style), html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style), html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style), html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style), html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style), html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style), ], style={"textAlign": "center", "marginBottom": "20px"}), # Store for the selected timeframe dcc.Store(id='interval-store', data={'interval': 1}), # Chart containers dcc.Graph(id='live-chart', style={"height": "600px"}), dcc.Graph(id='secondary-charts', style={"height": "500px"}), # Positions list container html.Div(id='positions-list') ]) # Setup callbacks self._setup_callbacks() def _setup_callbacks(self): """Set up all the callbacks for the dashboard""" # Callback for timeframe selection @self.app.callback( [Output('interval-store', 'data'), Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), Output('btn-1m', 'style'), Output('btn-5m', 'style'), Output('btn-15m', 'style'), Output('btn-1h', 'style')], [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), Input('btn-1m', 'n_clicks'), Input('btn-5m', 'n_clicks'), Input('btn-15m', 'n_clicks'), Input('btn-1h', 'n_clicks')], [dash.dependencies.State('interval-store', 'data')] ) def update_interval(n1, n5, n15, n60, n300, n900, n3600, data): ctx = dash.callback_context if not ctx.triggered: # Default state (1s selected) return ({'interval': 1}, self.active_button_style, self.button_style, self.button_style, self.button_style, self.button_style, self.button_style, self.button_style) button_id = ctx.triggered[0]['prop_id'].split('.')[0] # Initialize all buttons to inactive button_styles = [self.button_style] * 7 # Set the active button and interval if button_id == 'btn-1s': button_styles[0] = self.active_button_style return ({'interval': 1}, *button_styles) elif button_id == 'btn-5s': button_styles[1] = self.active_button_style return ({'interval': 5}, *button_styles) elif button_id == 'btn-15s': button_styles[2] = self.active_button_style return ({'interval': 15}, *button_styles) elif button_id == 'btn-1m': button_styles[3] = self.active_button_style return ({'interval': 60}, *button_styles) elif button_id == 'btn-5m': button_styles[4] = self.active_button_style return ({'interval': 300}, *button_styles) elif button_id == 'btn-15m': button_styles[5] = self.active_button_style return ({'interval': 900}, *button_styles) elif button_id == 'btn-1h': button_styles[6] = self.active_button_style return ({'interval': 3600}, *button_styles) # Default - keep current interval current_interval = data.get('interval', 1) # Set the appropriate button as active if current_interval == 1: button_styles[0] = self.active_button_style elif current_interval == 5: button_styles[1] = self.active_button_style elif current_interval == 15: button_styles[2] = self.active_button_style elif current_interval == 60: button_styles[3] = self.active_button_style elif current_interval == 300: button_styles[4] = self.active_button_style elif current_interval == 900: button_styles[5] = self.active_button_style elif current_interval == 3600: button_styles[6] = self.active_button_style return (data, *button_styles) # Main update callback @self.app.callback( [Output('live-chart', 'figure'), Output('secondary-charts', 'figure'), Output('positions-list', 'children'), Output('current-price', 'children'), Output('current-balance', 'children'), Output('accumulated-pnl', 'children')], [Input('interval-component', 'n_intervals'), Input('interval-store', 'data')] ) def update_all(n, interval_data): try: # Get selected interval interval = interval_data.get('interval', 1) # Get updated chart figures main_fig = self._update_main_chart(interval) secondary_fig = self._update_secondary_charts() # Get updated positions list positions = self._get_position_list_rows() # Format the current price current_price = "$ ---.--" if self.latest_price is not None: current_price = f"${self.latest_price:.2f}" # Format balance and PnL balance_text = f"${self.current_balance:.2f}" pnl_text = f"${self.accumulative_pnl:.2f}" return main_fig, secondary_fig, positions, current_price, balance_text, pnl_text except Exception as e: logger.error(f"Error in update callback: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return empty updates on error return {}, {}, [], "Error", "$0.00", "$0.00" def _update_main_chart(self, interval=1): """Update the main chart with OHLC data""" try: # Get candles for the interval interval_key = self._get_interval_key(interval) # Make sure we have data for this interval if interval_key not in self.tick_storage.candles: logger.warning(f"No candle data structure available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( text=f"No data available for {interval_key}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False ) fig.update_layout(title=f"{self.symbol} - {interval_key}") return fig # For 1s specifically, log more debug info if interval_key == '1s': logger.info(f"1s candles count: {len(self.tick_storage.candles[interval_key])}") logger.info(f"Ticks count: {len(self.tick_storage.ticks)}") if not self.tick_storage.candles[interval_key]: logger.warning("No 1s candles available - this may indicate the WebSocket isn't sending data, or candles aren't being created") # Check if we have any candles for this interval if not self.tick_storage.candles[interval_key]: logger.warning(f"No candle data available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( text=f"No data available for {interval_key}. Waiting for real-time data...", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False ) fig.update_layout(title=f"{self.symbol} - {interval_key} (waiting for data)") return fig # For rendering, limit to the last 500 candles for performance candles = self.tick_storage.candles[interval_key][-500:] # Ensure we have at least 1 candle if not candles: logger.warning(f"No historical candles available for {interval_key}") return go.Figure() # Extract OHLC values timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] lows = [candle['low'] for candle in candles] closes = [candle['close'] for candle in candles] volumes = [candle['volume'] for candle in candles] # Create figure fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02, row_heights=[0.8, 0.2], specs=[[{"type": "candlestick"}], [{"type": "bar"}]]) # Add candlestick trace fig.add_trace(go.Candlestick( x=timestamps, open=opens, high=highs, low=lows, close=closes, name='OHLC', increasing_line_color='rgba(0, 180, 0, 0.7)', decreasing_line_color='rgba(255, 0, 0, 0.7)', ), row=1, col=1) # Add volume bars fig.add_trace(go.Bar( x=timestamps, y=volumes, name='Volume', marker_color='rgba(100, 100, 255, 0.5)' ), row=2, col=1) # Add trading markers if available if hasattr(self, 'positions') and self.positions: # Get last 100 positions for display (to avoid too many markers) positions = self.positions[-100:] buy_timestamps = [] buy_prices = [] sell_timestamps = [] sell_prices = [] for pos in positions: if pos.action == 'BUY': buy_timestamps.append(pos.entry_timestamp) buy_prices.append(pos.entry_price) elif pos.action == 'SELL': sell_timestamps.append(pos.entry_timestamp) # Using entry_time for consistency sell_prices.append(pos.entry_price) # Using entry_price for consistency # Add buy markers if buy_timestamps: fig.add_trace(go.Scatter( x=buy_timestamps, y=buy_prices, mode='markers', name='Buy', marker=dict( symbol='triangle-up', size=15, color='rgba(0, 180, 0, 0.8)', line=dict(width=1, color='rgba(0, 180, 0, 1)') ) ), row=1, col=1) # Add sell markers if sell_timestamps: fig.add_trace(go.Scatter( x=sell_timestamps, y=sell_prices, mode='markers', name='Sell', marker=dict( symbol='triangle-down', size=15, color='rgba(255, 0, 0, 0.8)', line=dict(width=1, color='rgba(255, 0, 0, 1)') ) ), row=1, col=1) # Update layout fig.update_layout( title=f"{self.symbol} - {interval_key}", xaxis_title="Time", yaxis_title="Price", height=600, template="plotly_dark", showlegend=True, margin=dict(l=0, r=0, t=50, b=20), legend=dict(orientation="h", y=1.02, x=0.5, xanchor="center"), uirevision='true' # To maintain zoom level on updates ) # Format Y-axis with enough decimal places for cryptocurrency fig.update_yaxes(tickformat=".2f") # Format X-axis with date/time fig.update_xaxes( rangeslider_visible=False, rangebreaks=[ dict(bounds=["sat", "mon"]) # hide weekends ] ) return fig except Exception as e: logger.error(f"Error updating main chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return empty figure on error return go.Figure() def _update_secondary_charts(self): """Update the secondary charts for other timeframes""" try: # For each timeframe, create a small chart secondary_timeframes = ['1m', '5m', '15m', '1h'] if not all(tf in self.tick_storage.candles for tf in secondary_timeframes): logger.warning("Not all secondary timeframes available") # Return empty figure with a message fig = make_subplots(rows=1, cols=4) for i, tf in enumerate(secondary_timeframes, 1): fig.add_annotation( text=f"No data for {tf}", xref=f"x{i}", yref=f"y{i}", x=0.5, y=0.5, showarrow=False ) return fig # Create subplots for each timeframe fig = make_subplots( rows=1, cols=4, subplot_titles=secondary_timeframes, shared_yaxes=True ) # Loop through each timeframe for i, timeframe in enumerate(secondary_timeframes, 1): interval_key = timeframe # Get candles for this timeframe if interval_key in self.tick_storage.candles and self.tick_storage.candles[interval_key]: # For rendering, limit to the last 100 candles for performance candles = self.tick_storage.candles[interval_key][-100:] if candles: # Extract OHLC values timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] lows = [candle['low'] for candle in candles] closes = [candle['close'] for candle in candles] # Add candlestick trace fig.add_trace(go.Candlestick( x=timestamps, open=opens, high=highs, low=lows, close=closes, name=interval_key, increasing_line_color='rgba(0, 180, 0, 0.7)', decreasing_line_color='rgba(255, 0, 0, 0.7)', showlegend=False ), row=1, col=i) else: # Add empty annotation if no data fig.add_annotation( text=f"No data for {interval_key}", xref=f"x{i}", yref=f"y{i}", x=0.5, y=0.5, showarrow=False ) # Update layout fig.update_layout( height=250, template="plotly_dark", showlegend=False, margin=dict(l=0, r=0, t=30, b=0), ) # Format Y-axis with 2 decimal places fig.update_yaxes(tickformat=".2f") # Format X-axis to show only the date (no time) for i in range(1, 5): fig.update_xaxes( row=1, col=i, rangeslider_visible=False, rangebreaks=[dict(bounds=["sat", "mon"])], # hide weekends tickformat="%m-%d" # Show month-day only ) return fig except Exception as e: logger.error(f"Error updating secondary charts: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return empty figure on error return make_subplots(rows=1, cols=4) def _get_position_list_rows(self): """Generate HTML for the positions list (last 10 positions only)""" try: if not hasattr(self, 'positions') or not self.positions: # Return placeholder if no positions return html.Div("No positions to display", style={"textAlign": "center", "padding": "20px"}) # Create table headers table_header = [ html.Thead(html.Tr([ html.Th("ID"), html.Th("Action"), html.Th("Entry Price"), html.Th("Exit Price"), html.Th("Amount"), html.Th("PnL"), html.Th("Time") ])) ] # Create table rows for only the last 10 positions to avoid overcrowding rows = [] last_positions = self.positions[-10:] if len(self.positions) > 10 else self.positions for position in last_positions: # Format times entry_time = position.entry_timestamp.strftime("%H:%M:%S") exit_time = position.exit_timestamp.strftime("%H:%M:%S") if position.exit_timestamp else "-" # Format PnL pnl_value = position.pnl if position.pnl is not None else 0 pnl_text = f"${pnl_value:.2f}" if position.pnl is not None else "-" pnl_style = {"color": "green" if position.pnl and position.pnl > 0 else "red"} # Create row row = html.Tr([ html.Td(position.trade_id), html.Td(position.action), html.Td(f"${position.entry_price:.2f}"), html.Td(f"${position.exit_price:.2f}" if position.exit_price else "-"), html.Td(f"{position.amount:.4f}"), html.Td(pnl_text, style=pnl_style), html.Td(f"{entry_time} → {exit_time}") ]) rows.append(row) table_body = [html.Tbody(rows)] # Add summary row for total PnL and other statistics total_trades = len(self.positions) winning_trades = sum(1 for p in self.positions if p.pnl and p.pnl > 0) win_rate = winning_trades / total_trades * 100 if total_trades > 0 else 0 # Format display colors for PnL pnl_color = "green" if self.accumulative_pnl >= 0 else "red" summary_row = html.Tr([ html.Td("SUMMARY", colSpan=2, style={"fontWeight": "bold"}), html.Td(f"Trades: {total_trades}"), html.Td(f"Win Rate: {win_rate:.1f}%"), html.Td("Total PnL:", style={"fontWeight": "bold"}), html.Td(f"${self.accumulative_pnl:.2f}", style={"color": pnl_color, "fontWeight": "bold"}), html.Td(f"Balance: ${self.current_balance:.2f}") ], style={"backgroundColor": "rgba(80, 80, 80, 0.3)"}) # Create the table with improved styling table = html.Table( table_header + table_body + [html.Tfoot([summary_row])], style={ "width": "100%", "textAlign": "center", "borderCollapse": "collapse", "marginTop": "20px" }, className="table table-striped table-dark" ) return table except Exception as e: logger.error(f"Error generating position list: {str(e)}") import traceback logger.error(traceback.format_exc()) return html.Div("Error displaying positions") def get_candles(self, interval_seconds=60): """Get candles for the specified interval""" try: # Get candles from tick storage interval_key = self._get_interval_key(interval_seconds) df = self.tick_storage.get_candles(interval_key) if df is None or df.empty: logger.warning(f"No candle data available for {interval_key}") return [] # Return empty list if no data # Convert dataframe to list of dictionaries candles = [] for idx, row in df.iterrows(): candle = { 'timestamp': idx, 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles.append(candle) return candles except Exception as e: logger.error(f"Error getting candles: {str(e)}") import traceback logger.error(traceback.format_exc()) return [] # Return empty list on error def _get_interval_key(self, interval_seconds): """Convert interval seconds to a key used in the tick storage""" if interval_seconds < 60: return f"{interval_seconds}s" elif interval_seconds < 3600: return f"{interval_seconds // 60}m" elif interval_seconds < 86400: return f"{interval_seconds // 3600}h" else: return f"{interval_seconds // 86400}d" def _update_chart_and_positions(self): """Update the chart with current data and positions""" try: # Force an update of the charts self._update_main_chart(1) # Update 1s chart by default self._update_secondary_charts() logger.debug("Updated charts and positions") return True except Exception as e: logger.error(f"Error updating chart and positions: {str(e)}") import traceback logger.error(traceback.format_exc()) return False async def start_websocket(self): """Start the websocket connection for real-time data""" try: # Step 1: Clear everything related to positions FIRST, before any other operations logger.info(f"Initializing fresh chart for {self.symbol} - clearing all previous positions") self.positions = [] # Clear positions list self.accumulative_pnl = 0.0 # Reset accumulated PnL self.current_balance = 100.0 # Reset balance # Step 2: Clear any previous tick data to avoid using stale data from previous training sessions self.tick_storage.ticks = [] # Step 3: Clear any previous 1s candles before loading historical data self.tick_storage.candles['1s'] = [] logger.info("Initialized empty 1s candles, tick collection, and positions for fresh data") # Load historical data first to ensure we have candles for all timeframes logger.info(f"Loading historical data for {self.symbol}") # Initialize a BinanceHistoricalData instance historical_data = BinanceHistoricalData() # Load historical data for all timeframes (1s will load from cache if recent, otherwise empty) self.tick_storage.load_historical_data(historical_data, self.symbol) # Double check that we have the 1s timeframe initialized if '1s' not in self.tick_storage.candles: self.tick_storage.candles['1s'] = [] logger.info(f"After loading historical data, 1s candles count: {len(self.tick_storage.candles['1s'])}") # Make sure we update the charts once with historical data before websocket starts # Update all the charts with the initial historical data self._update_chart_and_positions() # Initialize websocket self.websocket = ExchangeWebSocket(self.symbol) await self.websocket.connect() logger.info(f"WebSocket connected for {self.symbol}") # Counter for received ticks tick_count = 0 last_update_time = time.time() # Start receiving data while self.websocket.running: try: data = await self.websocket.receive() if data: # Process the received data if 'price' in data: tick_count += 1 # Update tick storage self.tick_storage.add_tick( price=data['price'], volume=data.get('volume', 0), timestamp=datetime.fromtimestamp(data['timestamp'] / 1000) # Convert ms to datetime ) # Store latest values self.latest_price = data['price'] self.latest_volume = data.get('volume', 0) self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000) # Force chart update every 5 seconds current_time = time.time() if current_time - last_update_time >= 5.0: self._update_chart_and_positions() last_update_time = current_time logger.debug("Forced chart update after new ticks") # Log tick processing for debugging (every 100 ticks) if tick_count % 100 == 0: logger.info(f"Processed {tick_count} ticks, current price: ${self.latest_price:.2f}") logger.info(f"Current 1s candles count: {len(self.tick_storage.candles['1s'])}") except Exception as e: logger.error(f"Error processing websocket data: {str(e)}") await asyncio.sleep(1) # Wait before retrying except Exception as e: logger.error(f"WebSocket error for {self.symbol}: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: if hasattr(self, 'websocket'): await self.websocket.close() def run(self, host='localhost', port=8050): """Run the Dash app on the specified host and port""" try: logger.info(f"Starting Dash app for {self.symbol} on {host}:{port}") self.app.run(debug=False, use_reloader=False, host=host, port=port) except Exception as e: logger.error(f"Error running Dash app: {str(e)}") import traceback logger.error(traceback.format_exc()) class BinanceWebSocket: """Binance WebSocket implementation for real-time tick data""" def __init__(self, symbol: str): self.symbol = symbol.replace('/', '').lower() self.ws = None self.running = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.message_count = 0 # Binance WebSocket configuration self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade" logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}") async def connect(self): while True: try: logger.info(f"Attempting to connect to {self.ws_url}") self.ws = await websockets.connect(self.ws_url) logger.info("WebSocket connection established") self.running = True self.reconnect_delay = 1 logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}") return True except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) continue async def receive(self) -> Optional[Dict]: if not self.ws: return None try: message = await self.ws.recv() self.message_count += 1 if self.message_count % 100 == 0: # Log every 100th message to avoid spam logger.info(f"Received message #{self.message_count}") logger.debug(f"Raw message: {message[:200]}...") data = json.loads(message) # Process trade data if 'e' in data and data['e'] == 'trade': trade_data = { 'timestamp': data['T'], # Trade time 'price': float(data['p']), # Price 'volume': float(data['q']), # Quantity 'type': 'trade' } logger.debug(f"Processed trade data: {trade_data}") return trade_data return None except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.running = False return None except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") return None except Exception as e: logger.error(f"Error receiving message: {str(e)}") return None async def close(self): """Close the WebSocket connection""" if self.ws: await self.ws.close() class ExchangeWebSocket: """Generic WebSocket interface for cryptocurrency exchanges""" def __init__(self, symbol: str, exchange: str = "binance"): self.symbol = symbol self.exchange = exchange.lower() self.ws = None # Initialize the appropriate WebSocket implementation if self.exchange == "binance": self.ws = BinanceWebSocket(symbol) else: raise ValueError(f"Unsupported exchange: {exchange}") async def connect(self): """Connect to the exchange WebSocket""" return await self.ws.connect() async def receive(self) -> Optional[Dict]: """Receive data from the WebSocket""" return await self.ws.receive() async def close(self): """Close the WebSocket connection""" await self.ws.close() @property def running(self): """Check if the WebSocket is running""" return self.ws.running if self.ws else False async def main(): global charts # Make charts globally accessible for NN integration symbols = ["ETH/USDT", "ETH/USDT"] logger.info(f"Starting application for symbols: {symbols}") # Initialize neural network if enabled if NN_ENABLED: logger.info("Initializing Neural Network integration...") if setup_neural_network(): logger.info("Neural Network integration initialized successfully") else: logger.warning("Neural Network integration failed to initialize") charts = [] websocket_tasks = [] # Create a chart and websocket task for each symbol for symbol in symbols: chart = RealTimeChart(symbol) charts.append(chart) websocket_tasks.append(asyncio.create_task(chart.start_websocket())) # Run Dash in a separate thread to not block the event loop server_threads = [] for i, chart in enumerate(charts): port = 8050 + i # Use different ports for each chart logger.info(f"Starting chart for {chart.symbol} on port {port}") thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) # Ensure correct port is passed thread.daemon = True thread.start() server_threads.append(thread) logger.info(f"Thread started for {chart.symbol} on port {port}") try: # Keep the main task running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: for task in websocket_tasks: task.cancel() try: await task except asyncio.CancelledError: pass if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Application terminated by user")