import asyncio import json import logging import datetime from typing import Dict, List, Optional import websockets import plotly.graph_objects as go from plotly.subplots import make_subplots import dash from dash import html, dcc from dash.dependencies import Input, Output import pandas as pd import numpy as np from collections import deque import time from threading import Thread import requests import os from datetime import datetime, timedelta # Configure logging with more detailed format logging.basicConfig( level=logging.INFO, # Changed to DEBUG for more detailed logs format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('realtime_chart.log') ] ) logger = logging.getLogger(__name__) class TradeTickStorage: """Store and manage raw trade ticks for display and candle formation""" def __init__(self, max_age_seconds: int = 1800): # 30 minutes by default self.ticks = [] self.max_age_seconds = max_age_seconds self.last_cleanup_time = time.time() # Adjust cleanup interval based on max_age_seconds (more time = less frequent cleanup) self.cleanup_interval = min(max(10, max_age_seconds // 60), 60) # Between 10-60 seconds self.last_tick = None logger.info(f"Initialized TradeTickStorage with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds") def add_tick(self, tick: Dict): """Add a new trade tick to storage""" self.ticks.append(tick) self.last_tick = tick logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}") # Only clean up periodically rather than on every tick current_time = time.time() if current_time - self.last_cleanup_time > self.cleanup_interval: self._cleanup() self.last_cleanup_time = current_time def _cleanup(self): """Remove ticks older than max_age_seconds""" now = int(time.time() * 1000) # Current time in ms cutoff = now - (self.max_age_seconds * 1000) old_count = len(self.ticks) self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff] removed = old_count - len(self.ticks) if removed > 0: logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}") def get_latest_price(self) -> Optional[float]: """Get the latest price from the most recent tick""" if self.last_tick: return self.last_tick.get('price') elif self.ticks: # If last_tick not available but ticks exist, use the last tick self.last_tick = self.ticks[-1] return self.last_tick.get('price') return None def get_price_stats(self) -> Dict: """Get stats about the prices in storage""" if not self.ticks: return { 'min': None, 'max': None, 'latest': None, 'count': 0, 'age_seconds': 0 } prices = [tick['price'] for tick in self.ticks] latest_timestamp = self.ticks[-1]['timestamp'] oldest_timestamp = self.ticks[0]['timestamp'] return { 'min': min(prices), 'max': max(prices), 'latest': prices[-1], 'count': len(prices), 'age_seconds': (latest_timestamp - oldest_timestamp) / 1000 } def get_ticks_as_df(self) -> pd.DataFrame: """Return ticks as a DataFrame""" if not self.ticks: logger.warning("No ticks available for DataFrame conversion") return pd.DataFrame() # Ensure we have fresh data self._cleanup() df = pd.DataFrame(self.ticks) if not df.empty: logger.debug(f"Converting timestamps for {len(df)} ticks") # Ensure timestamp column exists if 'timestamp' not in df.columns: logger.error("Tick data missing timestamp column") return pd.DataFrame() # Check timestamp datatype before conversion sample_ts = df['timestamp'].iloc[0] if len(df) > 0 else None logger.debug(f"Sample timestamp before conversion: {sample_ts}, type: {type(sample_ts)}") # Convert timestamps to datetime try: df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') logger.debug(f"Timestamps converted to datetime successfully") if len(df) > 0: logger.debug(f"Sample converted timestamp: {df['timestamp'].iloc[0]}") except Exception as e: logger.error(f"Error converting timestamps: {str(e)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() return df def get_candles(self, interval_seconds: int = 1, start_time_ms: int = None, end_time_ms: int = None) -> pd.DataFrame: """Convert ticks to OHLCV candles at specified interval with optional time range filtering Args: interval_seconds: The interval in seconds for each candle start_time_ms: Optional start time in milliseconds for filtering end_time_ms: Optional end time in milliseconds for filtering Returns: DataFrame with OHLCV candles """ if not self.ticks: logger.warning("No ticks available for candle formation") return pd.DataFrame() # Ensure ticks are up to date self._cleanup() # Get ticks from specified time range if provided if start_time_ms is not None or end_time_ms is not None: logger.debug(f"Filtering ticks for time range from {start_time_ms} to {end_time_ms}") filtered_ticks = self.get_ticks_from_time(start_time_ms, end_time_ms) if not filtered_ticks: logger.warning("No ticks in the specified time range") return pd.DataFrame() df = pd.DataFrame(filtered_ticks) else: # Use all available ticks df = self.get_ticks_as_df() if df.empty: logger.warning("Tick DataFrame is empty after filtering/conversion") return pd.DataFrame() logger.info(f"Preparing to create candles from {len(df)} ticks") try: # Use timestamp column for resampling df = df.set_index('timestamp') # Create interval string for resampling - use 's' instead of deprecated 'S' interval_str = f'{interval_seconds}s' # Resample to create OHLCV candles logger.debug(f"Resampling with interval: {interval_str}") candles = df.resample(interval_str).agg({ 'price': ['first', 'max', 'min', 'last'], 'volume': 'sum' }) # Check if resampling produced any data if candles.empty: logger.warning("Resampling produced empty dataframe - check timestamp distribution") # Show timestamp ranges to diagnose potential resampling issues if not df.empty: min_time = df.index.min() max_time = df.index.max() logger.info(f"Tick timestamp range: {min_time} to {max_time}") return pd.DataFrame() # Flatten MultiIndex columns candles.columns = ['open', 'high', 'low', 'close', 'volume'] # Reset index to get timestamp as column candles = candles.reset_index() # Ensure no NaN values candles = candles.dropna() logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks") return candles except Exception as e: logger.error(f"Error in candle formation: {str(e)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() def get_ticks_from_time(self, start_time_ms: int = None, end_time_ms: int = None) -> List[Dict]: """Get ticks within a specific time range Args: start_time_ms: Start time in milliseconds (None for no lower bound) end_time_ms: End time in milliseconds (None for no upper bound) Returns: List of ticks within the time range """ if not self.ticks: return [] # Ensure ticks are updated self._cleanup() # Apply time filters if specified filtered_ticks = self.ticks if start_time_ms is not None: filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] >= start_time_ms] if end_time_ms is not None: filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] <= end_time_ms] logger.debug(f"Retrieved {len(filtered_ticks)} ticks from time range {start_time_ms} to {end_time_ms}") return filtered_ticks def get_time_based_stats(self) -> Dict: """Get statistics about the ticks organized by time periods Returns: Dictionary with statistics for different time periods """ if not self.ticks: return { 'total_ticks': 0, 'periods': {} } # Ensure ticks are updated self._cleanup() now = int(time.time() * 1000) # Current time in ms # Define time periods to analyze periods = { '1min': now - (60 * 1000), '5min': now - (5 * 60 * 1000), '15min': now - (15 * 60 * 1000), '30min': now - (30 * 60 * 1000) } stats = { 'total_ticks': len(self.ticks), 'oldest_tick': self.ticks[0]['timestamp'] if self.ticks else None, 'newest_tick': self.ticks[-1]['timestamp'] if self.ticks else None, 'time_span_seconds': (self.ticks[-1]['timestamp'] - self.ticks[0]['timestamp']) / 1000 if self.ticks else 0, 'periods': {} } # Calculate stats for each period for period_name, cutoff_time in periods.items(): period_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff_time] if period_ticks: prices = [tick['price'] for tick in period_ticks] volumes = [tick.get('volume', 0) for tick in period_ticks] period_stats = { 'tick_count': len(period_ticks), 'min_price': min(prices) if prices else None, 'max_price': max(prices) if prices else None, 'avg_price': sum(prices) / len(prices) if prices else None, 'last_price': period_ticks[-1]['price'] if period_ticks else None, 'total_volume': sum(volumes), 'ticks_per_second': len(period_ticks) / (int(period_name[:-3]) * 60) if period_ticks else 0 } stats['periods'][period_name] = period_stats logger.debug(f"Generated time-based stats: {len(stats['periods'])} periods") return stats class CandlestickData: def __init__(self, max_length: int = 300): self.timestamps = deque(maxlen=max_length) self.opens = deque(maxlen=max_length) self.highs = deque(maxlen=max_length) self.lows = deque(maxlen=max_length) self.closes = deque(maxlen=max_length) self.volumes = deque(maxlen=max_length) self.current_candle = { 'timestamp': None, 'open': None, 'high': None, 'low': None, 'close': None, 'volume': 0 } self.candle_interval = 1 # 1 second by default def update_from_trade(self, trade: Dict): timestamp = trade['timestamp'] price = trade['price'] volume = trade.get('volume', 0) # Round timestamp to nearest candle interval candle_timestamp = int(timestamp / (self.candle_interval * 1000)) * (self.candle_interval * 1000) if self.current_candle['timestamp'] != candle_timestamp: # Save current candle if it exists if self.current_candle['timestamp'] is not None: self.timestamps.append(self.current_candle['timestamp']) self.opens.append(self.current_candle['open']) self.highs.append(self.current_candle['high']) self.lows.append(self.current_candle['low']) self.closes.append(self.current_candle['close']) self.volumes.append(self.current_candle['volume']) logger.debug(f"New candle saved: {self.current_candle}") # Start new candle self.current_candle = { 'timestamp': candle_timestamp, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': volume } logger.debug(f"New candle started: {self.current_candle}") else: # Update current candle if self.current_candle['high'] is None or price > self.current_candle['high']: self.current_candle['high'] = price if self.current_candle['low'] is None or price < self.current_candle['low']: self.current_candle['low'] = price self.current_candle['close'] = price self.current_candle['volume'] += volume logger.debug(f"Updated current candle: {self.current_candle}") def get_dataframe(self) -> pd.DataFrame: # Include current candle in the dataframe if it exists timestamps = list(self.timestamps) opens = list(self.opens) highs = list(self.highs) lows = list(self.lows) closes = list(self.closes) volumes = list(self.volumes) if self.current_candle['timestamp'] is not None: timestamps.append(self.current_candle['timestamp']) opens.append(self.current_candle['open']) highs.append(self.current_candle['high']) lows.append(self.current_candle['low']) closes.append(self.current_candle['close']) volumes.append(self.current_candle['volume']) df = pd.DataFrame({ 'timestamp': timestamps, 'open': opens, 'high': highs, 'low': lows, 'close': closes, 'volume': volumes }) if not df.empty: df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return df class BinanceWebSocket: """Binance WebSocket implementation for real-time tick data""" def __init__(self, symbol: str): self.symbol = symbol.replace('/', '').lower() self.ws = None self.running = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.message_count = 0 # Binance WebSocket configuration self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade" logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}") async def connect(self): while True: try: logger.info(f"Attempting to connect to {self.ws_url}") self.ws = await websockets.connect(self.ws_url) logger.info("WebSocket connection established") self.running = True self.reconnect_delay = 1 logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}") return True except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) continue async def receive(self) -> Optional[Dict]: if not self.ws: return None try: message = await self.ws.recv() self.message_count += 1 if self.message_count % 100 == 0: # Log every 100th message to avoid spam logger.info(f"Received message #{self.message_count}") logger.debug(f"Raw message: {message[:200]}...") data = json.loads(message) # Process trade data if 'e' in data and data['e'] == 'trade': trade_data = { 'timestamp': data['T'], # Trade time 'price': float(data['p']), # Price 'volume': float(data['q']), # Quantity 'type': 'trade' } logger.debug(f"Processed trade data: {trade_data}") return trade_data return None except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.running = False return None except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") return None except Exception as e: logger.error(f"Error receiving message: {str(e)}") return None async def close(self): """Close the WebSocket connection""" if self.ws: await self.ws.close() self.running = False logger.info("WebSocket connection closed") class BinanceHistoricalData: """Fetch historical candle data from Binance""" def __init__(self): self.base_url = "https://api.binance.com/api/v3/klines" # Create a cache directory if it doesn't exist self.cache_dir = os.path.join(os.getcwd(), "cache") os.makedirs(self.cache_dir, exist_ok=True) logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}") def _get_interval_string(self, interval_seconds: int) -> str: """Convert interval seconds to Binance interval string""" if interval_seconds == 60: # 1m return "1m" elif interval_seconds == 3600: # 1h return "1h" elif interval_seconds == 86400: # 1d return "1d" else: # Default to 1m if not recognized logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m") return "1m" def _get_cache_filename(self, symbol: str, interval: str) -> str: """Generate cache filename for the symbol and interval""" # Replace any slashes in symbol with underscore safe_symbol = symbol.replace("/", "_") return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv") def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]: """Load candle data from cache if available and not expired""" filename = self._get_cache_filename(symbol, interval) if not os.path.exists(filename): logger.debug(f"No cache file found for {symbol} {interval}") return None # Check if cache is fresh (less than 1 hour old for anything but 1d, 1 day for 1d) file_age = time.time() - os.path.getmtime(filename) max_age = 86400 if interval == "1d" else 3600 # 1 day for 1d, 1 hour for others if file_age > max_age: logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)") return None try: df = pd.read_csv(filename) # Convert timestamp string back to datetime df['timestamp'] = pd.to_datetime(df['timestamp']) logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}") return df except Exception as e: logger.error(f"Error loading from cache: {str(e)}") return None def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool: """Save candle data to cache""" if df.empty: logger.warning(f"No data to cache for {symbol} {interval}") return False filename = self._get_cache_filename(symbol, interval) try: df.to_csv(filename, index=False) logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}") return True except Exception as e: logger.error(f"Error saving to cache: {str(e)}") return False def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame: """Get historical candle data for the specified symbol and interval""" # Convert to Binance format clean_symbol = symbol.replace("/", "") interval = self._get_interval_string(interval_seconds) # Try to load from cache first cached_data = self._load_from_cache(symbol, interval) if cached_data is not None and len(cached_data) >= limit: return cached_data.tail(limit) # Fetch from API if not cached or insufficient try: logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API") params = { "symbol": clean_symbol, "interval": interval, "limit": limit } response = requests.get(self.base_url, params=params) response.raise_for_status() # Raise exception for HTTP errors # Process the data candles = response.json() if not candles: logger.warning(f"No candles returned from Binance for {symbol} {interval}") return pd.DataFrame() # Convert to DataFrame - Binance returns data in this format: # [ # [ # 1499040000000, // Open time # "0.01634790", // Open # "0.80000000", // High # "0.01575800", // Low # "0.01577100", // Close # "148976.11427815", // Volume # ... // Ignore the rest # ], # ... # ] df = pd.DataFrame(candles, columns=[ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" ]) # Convert types df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ["open", "high", "low", "close", "volume"]: df[col] = df[col].astype(float) # Keep only needed columns df = df[["timestamp", "open", "high", "low", "close", "volume"]] # Cache the results self._save_to_cache(df, symbol, interval) logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}") return df except Exception as e: logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() class ExchangeWebSocket: """Generic WebSocket interface for cryptocurrency exchanges""" def __init__(self, symbol: str, exchange: str = "binance"): self.symbol = symbol self.exchange = exchange.lower() self.ws = None # Initialize the appropriate WebSocket implementation if self.exchange == "binance": self.ws = BinanceWebSocket(symbol) elif self.exchange == "mexc": self.ws = MEXCWebSocket(symbol) else: raise ValueError(f"Unsupported exchange: {exchange}") async def connect(self): """Connect to the exchange WebSocket""" return await self.ws.connect() async def receive(self) -> Optional[Dict]: """Receive data from the WebSocket""" return await self.ws.receive() async def close(self): """Close the WebSocket connection""" await self.ws.close() @property def running(self): """Check if the WebSocket is running""" return self.ws.running if self.ws else False class CandleCache: def __init__(self, max_candles: int = 5000): self.candles = { '1s': deque(maxlen=max_candles), '1m': deque(maxlen=max_candles), '1h': deque(maxlen=max_candles), '1d': deque(maxlen=max_candles) } logger.info(f"Initialized CandleCache with max candles: {max_candles}") def add_candles(self, interval: str, new_candles: pd.DataFrame): if interval in self.candles and not new_candles.empty: # Convert DataFrame to list of dicts to avoid pandas issues for _, row in new_candles.iterrows(): candle_dict = row.to_dict() self.candles[interval].append(candle_dict) logger.debug(f"Added {len(new_candles)} candles to {interval} cache") def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame: if interval in self.candles and self.candles[interval]: # Convert deque to list of dicts first all_candles = list(self.candles[interval]) # Check if we're requesting more candles than we have if count > len(all_candles): logger.debug(f"Requested {count} candles, but only have {len(all_candles)} for {interval}") count = len(all_candles) recent_candles = all_candles[-count:] logger.debug(f"Returning {len(recent_candles)} recent candles for {interval} (requested {count})") # Create DataFrame and ensure timestamp is datetime type df = pd.DataFrame(recent_candles) if not df.empty and 'timestamp' in df.columns: try: if not pd.api.types.is_datetime64_any_dtype(df['timestamp']): df['timestamp'] = pd.to_datetime(df['timestamp']) except Exception as e: logger.warning(f"Error converting timestamps in get_recent_candles: {str(e)}") return df logger.debug(f"No candles available for {interval}") return pd.DataFrame() def update_cache(self, interval: str, new_candles: pd.DataFrame): if interval not in self.candles: logger.warning(f"Invalid interval {interval} for cache update") return if new_candles.empty: logger.debug(f"No new candles to update {interval} cache") return # Check if timestamp column exists if 'timestamp' not in new_candles.columns: logger.warning(f"No timestamp column in new candles for {interval}") return try: # If we have no candles in cache, add all new candles if not self.candles[interval]: logger.debug(f"No existing candles for {interval}, adding all {len(new_candles)} candles") self.add_candles(interval, new_candles) return # Get the timestamp from the last cached candle last_cached_candle = self.candles[interval][-1] if isinstance(last_cached_candle, dict) and 'timestamp' in last_cached_candle: last_cached_time = last_cached_candle['timestamp'] logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}") # Filter new candles that are after the last cached candle filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time] if not filtered_candles.empty: logger.debug(f"Adding {len(filtered_candles)} new candles for {interval}") self.add_candles(interval, filtered_candles) else: logger.debug(f"No new candles to add for {interval}") else: logger.warning(f"Invalid last cached candle format for {interval}") except Exception as e: logger.error(f"Error updating cache for {interval}: {str(e)}") import traceback logger.error(traceback.format_exc()) class RealTimeChart: def __init__(self, symbol: str): self.symbol = symbol # Create a multi-page Dash app instead of a simple Dash app self.app = dash.Dash(__name__, suppress_callback_exceptions=True, meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}]) self.candlestick_data = CandlestickData() self.tick_storage = TradeTickStorage(max_age_seconds=1800) # Store 30 minutes of ticks self.ohlcv_cache = { # Cache for different intervals '1s': None, '1m': None, '1h': None, '1d': None } self.candle_cache = CandleCache(max_candles=5000) # Increase max candles to 5000 for better history self.historical_data = BinanceHistoricalData() # For fetching historical data self.last_cache_save_time = time.time() # Track last time we saved cache to disk self.first_render = True # Flag to track first render logger.info(f"Initializing RealTimeChart for {symbol}") # Load historical data for longer timeframes at startup self._load_historical_data() # Setup the multi-page layout self._setup_app_layout() # Save the loaded cache immediately to ensure we have the data saved self._save_candles_to_disk(force=True) def _setup_app_layout(self): # Button style button_style = { 'background-color': '#4CAF50', 'color': 'white', 'padding': '10px 15px', 'margin': '5px', 'border': 'none', 'border-radius': '5px', 'font-size': '14px', 'cursor': 'pointer', 'transition': 'background-color 0.3s', 'font-weight': 'bold' } active_button_style = { **button_style, 'background-color': '#2E7D32', 'box-shadow': '0 0 5px #2E7D32' } nav_button_style = { 'background-color': '#3f51b5', 'color': 'white', 'padding': '10px 15px', 'margin': '5px', 'border': 'none', 'border-radius': '5px', 'font-size': '14px', 'cursor': 'pointer', 'font-weight': 'bold' } # Content div for the current page content_div = html.Div(id='page-content') # Initialize the layout with navigation self.app.layout = html.Div([ # Header with symbol and title html.Div([ html.H1(f"{self.symbol} Real-Time Data", style={ 'textAlign': 'center', 'color': '#FFFFFF', 'fontFamily': 'Arial, sans-serif', 'margin': '10px', 'textShadow': '2px 2px 4px #000000' }), # Navigation bar html.Div([ dcc.Link( html.Button('Price Chart', style=nav_button_style), href=f'/{self.symbol.replace("/", "-")}/chart', id='chart-link' ), dcc.Link( html.Button('Raw Ticks', style=nav_button_style), href=f'/{self.symbol.replace("/", "-")}/ticks', id='ticks-link' ), ], style={ 'display': 'flex', 'justifyContent': 'center', 'margin': '10px' }), ], style={ 'backgroundColor': '#1E1E1E', 'padding': '10px', 'borderRadius': '5px', 'marginBottom': '10px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' }), # URL Bar dcc.Location(id='url', refresh=False), # Content div will be populated based on the URL content_div, # Interval component for periodic updates dcc.Interval( id='interval-component', interval=500, # Update every 500ms for smoother display n_intervals=0 ) ], style={ 'backgroundColor': '#121212', 'padding': '20px', 'minHeight': '100vh', 'fontFamily': 'Arial, sans-serif' }) # Register URL callback to update page content @self.app.callback( Output('page-content', 'children'), [Input('url', 'pathname')] ) def display_page(pathname): if pathname is None: pathname = f'/{self.symbol.replace("/", "-")}/chart' symbol_path = self.symbol.replace("/", "-") if pathname == f'/{symbol_path}/chart' or pathname == f'/' or pathname == f'/{symbol_path}': return self._get_chart_layout(button_style, active_button_style) elif pathname == f'/{symbol_path}/ticks': return self._get_ticks_layout() else: return html.Div([ html.H1('404 - Page Not Found', style={'textAlign': 'center', 'color': 'white'}) ]) # Register callback to update the interval selection from button clicks self._setup_interval_callback(button_style, active_button_style) # Register callback to update the chart data self._setup_chart_callback() # Register callback to update the ticks data self._setup_ticks_callback() def _get_chart_layout(self, button_style, active_button_style): # Chart page layout return html.Div([ # Interval selection buttons html.Div([ html.Div("Candlestick Interval:", style={ 'color': '#FFFFFF', 'marginRight': '10px', 'fontSize': '16px', 'fontWeight': 'bold' }), html.Button('1s', id='btn-1s', n_clicks=0, style=active_button_style), html.Button('5s', id='btn-5s', n_clicks=0, style=button_style), html.Button('15s', id='btn-15s', n_clicks=0, style=button_style), html.Button('30s', id='btn-30s', n_clicks=0, style=button_style), html.Button('1m', id='btn-1m', n_clicks=0, style=button_style), ], style={ 'display': 'flex', 'alignItems': 'center', 'justifyContent': 'center', 'margin': '15px', 'backgroundColor': '#2C2C2C', 'padding': '10px', 'borderRadius': '5px' }), # Store for current interval dcc.Store(id='interval-store', data={'interval': 1}), # Main chart dcc.Graph( id='live-chart', style={ 'height': '180vh', 'border': '1px solid #444444', 'borderRadius': '5px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' } ), ]) def _get_ticks_layout(self): # Ticks data page layout return html.Div([ # Header and controls html.Div([ html.H2(f"{self.symbol} Raw Tick Data (Last 5 Minutes)", style={ 'textAlign': 'center', 'color': '#FFFFFF', 'margin': '10px 0' }), # Refresh button html.Button('Refresh Data', id='refresh-ticks-btn', n_clicks=0, style={ 'backgroundColor': '#4CAF50', 'color': 'white', 'padding': '10px 20px', 'margin': '10px auto', 'border': 'none', 'borderRadius': '5px', 'fontSize': '14px', 'cursor': 'pointer', 'display': 'block' }), # Time window selector html.Div([ html.Label("Time Window:", style={'color': 'white', 'marginRight': '10px'}), dcc.Dropdown( id='time-window-dropdown', options=[ {'label': 'Last 1 minute', 'value': 60}, {'label': 'Last 5 minutes', 'value': 300}, {'label': 'Last 15 minutes', 'value': 900}, {'label': 'Last 30 minutes', 'value': 1800}, ], value=300, # Default to 5 minutes style={'width': '200px', 'backgroundColor': '#2C2C2C', 'color': 'black'} ) ], style={ 'display': 'flex', 'alignItems': 'center', 'justifyContent': 'center', 'margin': '10px' }), ], style={ 'backgroundColor': '#2C2C2C', 'padding': '10px', 'borderRadius': '5px', 'marginBottom': '15px' }), # Stats cards html.Div(id='tick-stats-cards', style={ 'display': 'flex', 'flexWrap': 'wrap', 'justifyContent': 'space-around', 'marginBottom': '15px' }), # Ticks data table html.Div(id='ticks-table-container', style={ 'backgroundColor': '#232323', 'padding': '10px', 'borderRadius': '5px', 'overflowX': 'auto' }), # Price movement chart html.Div([ html.H3("Price Movement", style={ 'textAlign': 'center', 'color': '#FFFFFF', 'margin': '10px 0' }), dcc.Graph(id='tick-price-chart') ], style={ 'backgroundColor': '#232323', 'padding': '10px', 'borderRadius': '5px', 'marginTop': '15px' }) ]) def _setup_interval_callback(self, button_style, active_button_style): # Callback to update interval based on button clicks and update button styles @self.app.callback( [Output('interval-store', 'data'), Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), Output('btn-30s', 'style'), Output('btn-1m', 'style')], [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), Input('btn-30s', 'n_clicks'), Input('btn-1m', 'n_clicks')], [dash.dependencies.State('interval-store', 'data')] ) def update_interval(n1, n5, n15, n30, n60, data): ctx = dash.callback_context if not ctx.triggered: # Default state (1s selected) return ({'interval': 1}, active_button_style, button_style, button_style, button_style, button_style) button_id = ctx.triggered[0]['prop_id'].split('.')[0] if button_id == 'btn-1s': return ({'interval': 1}, active_button_style, button_style, button_style, button_style, button_style) elif button_id == 'btn-5s': return ({'interval': 5}, button_style, active_button_style, button_style, button_style, button_style) elif button_id == 'btn-15s': return ({'interval': 15}, button_style, button_style, active_button_style, button_style, button_style) elif button_id == 'btn-30s': return ({'interval': 30}, button_style, button_style, button_style, active_button_style, button_style) elif button_id == 'btn-1m': return ({'interval': 60}, button_style, button_style, button_style, button_style, active_button_style) # Default case - keep current interval and highlight appropriate button current_interval = data.get('interval', 1) styles = [button_style] * 5 # All inactive by default # Set active style based on current interval if current_interval == 1: styles[0] = active_button_style elif current_interval == 5: styles[1] = active_button_style elif current_interval == 15: styles[2] = active_button_style elif current_interval == 30: styles[3] = active_button_style elif current_interval == 60: styles[4] = active_button_style return (data, *styles) def _setup_chart_callback(self): # Callback to update the chart @self.app.callback( Output('live-chart', 'figure'), [Input('interval-component', 'n_intervals'), Input('interval-store', 'data')] ) def update_chart(n, interval_data): try: interval = interval_data.get('interval', 1) logger.debug(f"Updating chart for {self.symbol} with interval {interval}s") # First render flag - used to force cache loading is_first_render = self.first_render if is_first_render: logger.info("First render - ensuring cached data is loaded") self.first_render = False # Check if we have cached data for the requested interval cached_candles = None if interval == 1 and self.ohlcv_cache['1s'] is not None and not self.ohlcv_cache['1s'].empty: cached_candles = self.ohlcv_cache['1s'] logger.info(f"We have {len(cached_candles)} cached 1s candles available") elif interval == 60 and self.ohlcv_cache['1m'] is not None and not self.ohlcv_cache['1m'].empty: cached_candles = self.ohlcv_cache['1m'] logger.info(f"We have {len(cached_candles)} cached 1m candles available") # Get candlesticks from tick storage for real-time data df = self.tick_storage.get_candles(interval_seconds=interval) logger.debug(f"Got {len(df) if not df.empty else 0} candles from tick storage") # If we don't have real-time data yet, use cached data if df.empty and cached_candles is not None and not cached_candles.empty: df = cached_candles logger.info(f"Using {len(df)} cached candles for main chart") # Get current price and stats using our enhanced methods current_price = self.tick_storage.get_latest_price() price_stats = self.tick_storage.get_price_stats() time_stats = self.tick_storage.get_time_based_stats() # Periodically save candles to disk if n % 60 == 0 or is_first_render: # Every 60 chart updates or on first render self._save_candles_to_disk() logger.debug(f"Current price: {current_price}, Stats: {price_stats}") # Create subplot layout - don't include 1s since that's the main chart fig = make_subplots( rows=5, cols=1, vertical_spacing=0.05, subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1m OHLCV', '1h OHLCV', '1d OHLCV'), row_heights=[0.2, 0.2, 0.2, 0.2, 0.2] ) # Process and add main chart data if not df.empty and len(df) > 0: # Limit how many candles we display for better performance display_df = df if len(df) > 500: logger.debug(f"Limiting main chart display from {len(df)} to 500 candles") display_df = df.tail(500) logger.debug(f"Displaying {len(display_df)} candles in main chart") # Add candlestick chart fig.add_trace( go.Candlestick( x=display_df['timestamp'], open=display_df['open'], high=display_df['high'], low=display_df['low'], close=display_df['close'], name='Price', increasing_line_color='#33CC33', # Green decreasing_line_color='#FF4136' # Red ), row=1, col=1 ) # Add volume bars colors = ['#33CC33' if close >= open else '#FF4136' for close, open in zip(display_df['close'], display_df['open'])] fig.add_trace( go.Bar( x=display_df['timestamp'], y=display_df['volume'], name='Volume', marker_color=colors ), row=2, col=1 ) # Add latest price line from the candlestick data latest_price = display_df['close'].iloc[-1] fig.add_shape( type="line", x0=display_df['timestamp'].min(), y0=latest_price, x1=display_df['timestamp'].max(), y1=latest_price, line=dict(color="yellow", width=1, dash="dash"), row=1, col=1 ) # Annotation for last candle close price fig.add_annotation( x=display_df['timestamp'].max(), y=latest_price, text=f"{latest_price:.2f}", showarrow=False, font=dict(size=14, color="yellow"), xshift=50, row=1, col=1 ) # If we have a more recent price from ticks, add that too if current_price and abs(current_price - latest_price) > 0.01: # Add current price line fig.add_shape( type="line", x0=display_df['timestamp'].min(), y0=current_price, x1=display_df['timestamp'].max(), y1=current_price, line=dict(color="cyan", width=1, dash="dot"), row=1, col=1 ) # Add current price annotation fig.add_annotation( x=display_df['timestamp'].max(), y=current_price, text=f"Current: {current_price:.2f}", showarrow=False, font=dict(size=14, color="cyan"), xshift=50, yshift=20, row=1, col=1 ) # Update candle cache for all timeframes if we have new data if not df.empty: try: # Update the 1s cache with current df (if it's 1s data) if interval == 1: self.candle_cache.update_cache('1s', df) self.ohlcv_cache['1s'] = self.candle_cache.get_recent_candles('1s', count=2000) logger.debug(f"Updated 1s cache, now has {len(self.ohlcv_cache['1s'])} candles") # For other intervals, get fresh data from tick storage for interval_key in ['1m', '1h', '1d']: int_seconds = self._interval_to_seconds(interval_key) new_candles = self.tick_storage.get_candles(interval_seconds=int_seconds) if not new_candles.empty: self.candle_cache.update_cache(interval_key, new_candles) self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000) logger.debug(f"Updated cache for {interval_key}, now has {len(self.ohlcv_cache[interval_key])} candles") except Exception as e: logger.error(f"Error updating candle caches: {str(e)}") # Add OHLCV subcharts for 1m, 1h, 1d (not 1s since it's the main chart) timeframe_map = { '1m': (3, '1 Minute'), '1h': (4, '1 Hour'), '1d': (5, '1 Day') } for interval_key, (row_idx, label) in timeframe_map.items(): ohlcv_df = self.ohlcv_cache.get(interval_key) if ohlcv_df is not None and not ohlcv_df.empty: # Limit to last 100 candles to avoid overcrowding if len(ohlcv_df) > 100: ohlcv_df = ohlcv_df.tail(100) fig.add_trace( go.Candlestick( x=ohlcv_df['timestamp'], open=ohlcv_df['open'], high=ohlcv_df['high'], low=ohlcv_df['low'], close=ohlcv_df['close'], name=f'{label}', increasing_line_color='#33CC33', decreasing_line_color='#FF4136', showlegend=False ), row=row_idx, col=1 ) # Add latest price line latest_timeframe_price = ohlcv_df['close'].iloc[-1] if len(ohlcv_df) > 0 else None if latest_timeframe_price: fig.add_shape( type="line", x0=ohlcv_df['timestamp'].min(), y0=latest_timeframe_price, x1=ohlcv_df['timestamp'].max(), y1=latest_timeframe_price, line=dict(color="yellow", width=1, dash="dash"), row=row_idx, col=1 ) # Add price annotation fig.add_annotation( x=ohlcv_df['timestamp'].max(), y=latest_timeframe_price, text=f"{latest_timeframe_price:.2f}", showarrow=False, font=dict(size=12, color="yellow"), xshift=50, row=row_idx, col=1 ) else: # If no data, add a text annotation to the chart logger.warning(f"No data to display for {self.symbol} - tick count: {len(self.tick_storage.ticks)}") # Try to display cached data for each timeframe even if main chart is empty timeframe_map = { '1m': (3, '1 Minute'), '1h': (4, '1 Hour'), '1d': (5, '1 Day') } has_any_data = False for interval_key, (row_idx, label) in timeframe_map.items(): ohlcv_df = self.ohlcv_cache.get(interval_key) if ohlcv_df is not None and not ohlcv_df.empty: has_any_data = True # Limit to last 100 candles if len(ohlcv_df) > 100: ohlcv_df = ohlcv_df.tail(100) fig.add_trace( go.Candlestick( x=ohlcv_df['timestamp'], open=ohlcv_df['open'], high=ohlcv_df['high'], low=ohlcv_df['low'], close=ohlcv_df['close'], name=f'{label}', increasing_line_color='#33CC33', decreasing_line_color='#FF4136', showlegend=False ), row=row_idx, col=1 ) # Add a message to the empty main chart fig.add_annotation( x=0.5, y=0.5, text=f"Waiting for {self.symbol} real-time data...", showarrow=False, font=dict(size=20, color="white"), xref="paper", yref="paper", row=1, col=1 ) if not has_any_data: # If no data at all, show message fig.add_annotation( x=0.5, y=0.5, text=f"No data available for {self.symbol}", showarrow=False, font=dict(size=20, color="white"), xref="paper", yref="paper" ) # Build info box text with all the statistics info_lines = [f"{self.symbol}"] # Add current price if available if current_price: info_lines.append(f"Current: {current_price:.2f} USDT") # Add price statistics if available if price_stats['count'] > 0: # Format time range age_text = f"{price_stats['age_seconds']:.1f}s" if price_stats['age_seconds'] > 60: minutes = int(price_stats['age_seconds'] / 60) seconds = int(price_stats['age_seconds'] % 60) age_text = f"{minutes}m {seconds}s" # Add price range and change if price_stats['min'] is not None and price_stats['max'] is not None: price_range = f"Range: {price_stats['min']:.2f} - {price_stats['max']:.2f}" info_lines.append(price_range) # Add tick count and time range info_lines.append(f"Ticks: {price_stats['count']} in {age_text}") # Add candle count candle_count = len(df) if not df.empty else 0 info_lines.append(f"Candles: {candle_count} ({interval}s)") # Add time-based statistics if time_stats and time_stats['periods']: info_lines.append("Time-Based Stats:") for period, stats in time_stats['periods'].items(): if stats['tick_count'] > 0: info_lines.append(f"{period}: {stats['tick_count']} ticks, {stats['ticks_per_second']:.2f}/s") if stats['min_price'] is not None and stats['max_price'] is not None: price_change = stats['last_price'] - stats['min_price'] change_pct = (price_change / stats['min_price']) * 100 if stats['min_price'] > 0 else 0 info_lines.append(f" Range: {stats['min_price']:.2f}-{stats['max_price']:.2f} ({change_pct:.2f}%)") # Add cache information info_lines.append("Cached Candles:") for interval_key, cache_df in self.ohlcv_cache.items(): count = len(cache_df) if cache_df is not None else 0 info_lines.append(f"{interval_key}: {count}") # Add info box to the chart fig.add_annotation( x=0.01, y=0.99, xref="paper", yref="paper", text="
".join(info_lines), showarrow=False, font=dict(size=12, color="white"), align="left", bgcolor="rgba(0,0,50,0.7)", bordercolor="#3366CC", borderwidth=2, borderpad=5, xanchor="left", yanchor="top" ) # Update layout with improved styling interval_text = { 1: "1 second", 5: "5 seconds", 15: "15 seconds", 30: "30 seconds", 60: "1 minute" }.get(interval, f"{interval}s") fig.update_layout( title_text=f"{self.symbol} Real-Time Data ({interval_text})", title_x=0.5, # Center the title xaxis_rangeslider_visible=False, height=1800, # Increased height for taller display template='plotly_dark', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(25,25,50,1)', font=dict(family="Arial, sans-serif", size=12, color="white"), showlegend=True, legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01 ) ) # Update axes styling fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') return fig except Exception as e: logger.error(f"Error updating chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Create a minimal figure with error message fig = go.Figure() fig.add_annotation( x=0.5, y=0.5, text=f"Error updating chart: {str(e)}", showarrow=False, font=dict(size=14, color="red"), xref="paper", yref="paper" ) fig.update_layout( height=800, template='plotly_dark', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(25,25,50,1)' ) return fig def _setup_ticks_callback(self): # Callbacks to update the ticks data display # Callback to update stats cards @self.app.callback( Output('tick-stats-cards', 'children'), [Input('interval-component', 'n_intervals'), Input('refresh-ticks-btn', 'n_clicks'), Input('time-window-dropdown', 'value')] ) def update_tick_stats(n_intervals, n_clicks, time_window): # Get time window in seconds window_seconds = time_window if time_window else 300 # Calculate time range for filtering now = int(time.time() * 1000) start_time = now - (window_seconds * 1000) # Get filtered ticks filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) if not filtered_ticks: return [html.Div("No tick data available in the selected time window.", style={'color': 'white', 'textAlign': 'center', 'margin': '20px'})] # Calculate stats tick_count = len(filtered_ticks) prices = [tick['price'] for tick in filtered_ticks] min_price = min(prices) if prices else 0 max_price = max(prices) if prices else 0 avg_price = sum(prices) / len(prices) if prices else 0 price_range = max_price - min_price volumes = [tick.get('volume', 0) for tick in filtered_ticks] total_volume = sum(volumes) avg_volume = total_volume / len(volumes) if volumes else 0 # Calculate additional volume stats max_volume = max(volumes) if volumes else 0 min_volume = min(volumes) if volumes else 0 median_volume = sorted(volumes)[len(volumes)//2] if volumes else 0 # Calculate trade value in USDT trade_values = [p * v for p, v in zip(prices, volumes)] total_value = sum(trade_values) avg_trade_value = total_value / len(trade_values) if trade_values else 0 first_timestamp = filtered_ticks[0]['timestamp'] last_timestamp = filtered_ticks[-1]['timestamp'] time_span_ms = last_timestamp - first_timestamp time_span_seconds = time_span_ms / 1000 ticks_per_second = tick_count / time_span_seconds if time_span_seconds > 0 else 0 # Create stat cards card_style = { 'backgroundColor': '#1E2130', 'borderRadius': '5px', 'padding': '15px', 'marginBottom': '10px', 'width': '23%', 'boxShadow': '0 2px 4px rgba(0,0,0,0.2)', 'color': 'white', 'display': 'flex', 'flexDirection': 'column', 'alignItems': 'center', 'justifyContent': 'center' } mobile_card_style = { **card_style, 'width': '100%', 'marginBottom': '10px' } value_style = { 'fontSize': '24px', 'fontWeight': 'bold', 'color': '#4CAF50', 'marginTop': '5px' } label_style = { 'fontSize': '14px', 'color': '#AAAAAA' } return [ html.Div([ html.Div("Tick Count", style=label_style), html.Div(f"{tick_count:,}", style=value_style), html.Div(f"{ticks_per_second:.2f} ticks/sec", style=label_style) ], style=card_style), html.Div([ html.Div("Price Range", style=label_style), html.Div(f"{min_price:.2f} - {max_price:.2f}", style=value_style), html.Div(f"Range: {price_range:.2f}", style=label_style) ], style=card_style), html.Div([ html.Div("Average Price", style=label_style), html.Div(f"{avg_price:.2f}", style=value_style), html.Div("USDT", style=label_style) ], style=card_style), html.Div([ html.Div("Total Volume", style=label_style), html.Div(f"{total_volume:.8f}", style=value_style), html.Div(f"Avg: {avg_volume:.8f}", style=label_style) ], style=card_style), html.Div([ html.Div("Max Volume", style=label_style), html.Div(f"{max_volume:.8f}", style=value_style), html.Div("Median: {:.8f}".format(median_volume), style=label_style) ], style=card_style), html.Div([ html.Div("Total Value", style=label_style), html.Div(f"{total_value:.2f}", style=value_style), html.Div(f"Avg: {avg_trade_value:.2f}", style=label_style) ], style=card_style) ] # Callback to update ticks table @self.app.callback( Output('ticks-table-container', 'children'), [Input('interval-component', 'n_intervals'), Input('refresh-ticks-btn', 'n_clicks'), Input('time-window-dropdown', 'value')] ) def update_ticks_table(n_intervals, n_clicks, time_window): # Get time window in seconds window_seconds = time_window if time_window else 300 # Calculate time range for filtering now = int(time.time() * 1000) start_time = now - (window_seconds * 1000) # Get filtered ticks filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) if not filtered_ticks: return html.Div("No tick data available in the selected time window.", style={'color': 'white', 'textAlign': 'center', 'margin': '20px'}) # Convert to readable format formatted_ticks = [] for tick in filtered_ticks: timestamp_dt = datetime.fromtimestamp(tick['timestamp'] / 1000) formatted_time = timestamp_dt.strftime('%H:%M:%S.%f')[:-3] # Format as HH:MM:SS.mmm formatted_ticks.append({ 'time': formatted_time, 'price': f"{tick['price']:.2f}", 'volume': f"{tick['volume']:.8f}", 'value': f"{tick['price'] * tick['volume']:.2f}" }) # Display only the most recent ticks display_limit = 100 limited_ticks = formatted_ticks[-display_limit:] if len(formatted_ticks) > display_limit else formatted_ticks limited_ticks.reverse() # Show most recent at the top # Create table table_style = { 'width': '100%', 'borderCollapse': 'collapse', 'color': 'white', 'fontFamily': 'monospace' } header_style = { 'backgroundColor': '#1A1A1A', 'fontWeight': 'bold', 'padding': '8px', 'textAlign': 'left', 'borderBottom': '2px solid #444' } cell_style = { 'padding': '6px', 'borderBottom': '1px solid #333', 'textAlign': 'right' } time_cell_style = { **cell_style, 'textAlign': 'left' } # Create table header header = html.Tr([ html.Th("Time", style=header_style), html.Th("Price", style=header_style), html.Th("Volume", style=header_style), html.Th("Value (USDT)", style=header_style) ]) # Create table rows rows = [] for i, tick in enumerate(limited_ticks): row_style = {'backgroundColor': '#1E1E1E'} if i % 2 == 0 else {'backgroundColor': '#252525'} rows.append(html.Tr([ html.Td(tick['time'], style={**time_cell_style, **row_style}), html.Td(tick['price'], style={**cell_style, **row_style}), html.Td(tick['volume'], style={**cell_style, **row_style}), html.Td(tick['value'], style={**cell_style, **row_style}) ])) return [ html.Div([ html.H3(f"Latest {len(limited_ticks)} Ticks (from {len(filtered_ticks)} total)", style={'color': 'white', 'marginBottom': '10px', 'textAlign': 'center'}), html.Table([html.Thead(header), html.Tbody(rows)], style=table_style) ]) ] # Callback to update price chart @self.app.callback( Output('tick-price-chart', 'figure'), [Input('interval-component', 'n_intervals'), Input('refresh-ticks-btn', 'n_clicks'), Input('time-window-dropdown', 'value')] ) def update_price_chart(n_intervals, n_clicks, time_window): # Get time window in seconds window_seconds = time_window if time_window else 300 # Calculate time range for filtering now = int(time.time() * 1000) start_time = now - (window_seconds * 1000) # Get filtered ticks filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) # Create figure with 2 subplots - price and volume fig = make_subplots( rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.03, row_heights=[0.7, 0.3], subplot_titles=(f"Price Movement (Last {window_seconds // 60} minutes)", "Volume") ) if not filtered_ticks: fig.add_annotation( x=0.5, y=0.5, text="No tick data available in the selected time window.", showarrow=False, font=dict(size=14, color="white"), xref="paper", yref="paper" ) else: # Convert timestamps to datetime for better x-axis display timestamps = [datetime.fromtimestamp(tick['timestamp'] / 1000) for tick in filtered_ticks] prices = [tick['price'] for tick in filtered_ticks] volumes = [tick.get('volume', 0) for tick in filtered_ticks] # Add price scatter plot fig.add_trace( go.Scatter( x=timestamps, y=prices, mode='lines', name='Price', line=dict(color='#4CAF50', width=1.5) ), row=1, col=1 ) # Create a volume profile on the right side of the price chart if len(prices) > 5: # Only create profile if we have enough data # Group prices into bins price_min = min(prices) price_max = max(prices) # Create approximately 20 bins based on price range bin_size = max(0.01, (price_max - price_min) / 20) # Create a dictionary to hold volume by price level volume_by_price = {} # Group volumes by price bins for p, v in zip(prices, volumes): bin_key = round(p / bin_size) * bin_size if bin_key in volume_by_price: volume_by_price[bin_key] += v else: volume_by_price[bin_key] = v # Sort by price level sorted_bins = sorted(volume_by_price.items()) profile_prices = [p for p, _ in sorted_bins] profile_volumes = [v for _, v in sorted_bins] # Add separate volume profile trace fig.add_trace( go.Bar( y=profile_prices, x=profile_volumes, orientation='h', name='Volume Profile', marker=dict( color=['#33CC33' if p <= latest_price else '#FF4136' for p in profile_prices], opacity=0.5 ), showlegend=True, hovertemplate='Price: %{y:.2f}
Volume: %{x:.8f}' ), row=1, col=1 ) # Add a line marking the latest price fig.add_shape( type="line", y0=latest_price, y1=latest_price, x0=0, x1=max(profile_volumes) * 1.1, line=dict(color="yellow", width=1, dash="dash"), row=1, col=1 ) # Add volume bars in separate subplot # Color volume bars green for price increases, red for decreases if len(timestamps) > 1: # Compare each price with the previous to determine color colors = [] for i in range(len(prices)): if i == 0: colors.append('#33CC33') # Default to green for first tick else: if prices[i] >= prices[i-1]: colors.append('#33CC33') # Green for price increase/same else: colors.append('#FF4136') # Red for price decrease else: colors = ['#33CC33'] # Default green if only one tick fig.add_trace( go.Bar( x=timestamps, y=volumes, name='Volume', marker=dict(color=colors) ), row=2, col=1 ) # Compute stats for annotations latest_price = prices[-1] if prices else 0 total_volume = sum(volumes) max_volume = max(volumes) if volumes else 0 avg_volume = total_volume / len(volumes) if volumes else 0 # Add annotations for latest price fig.add_annotation( x=timestamps[-1] if timestamps else 0, y=latest_price, text=f"{latest_price:.2f}", showarrow=True, arrowhead=1, arrowsize=1, arrowwidth=2, arrowcolor="#4CAF50", font=dict(size=12, color="#4CAF50"), xshift=50, row=1, col=1 ) # Add annotations for volume stats fig.add_annotation( x=timestamps[-1] if timestamps else 0, y=max_volume, text=f"Max: {max_volume:.8f}", showarrow=False, font=dict(size=10, color="rgba(128, 128, 255, 1)"), xshift=50, row=2, col=1 ) # Update layout fig.update_layout( title_text=f"{self.symbol} Tick Data", title_x=0.5, template='plotly_dark', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(25,25,50,1)', height=600, # Increased height for better visualization margin=dict(l=40, r=40, t=80, b=40), legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), hovermode="x unified" # Show all data points at the same x-coordinate ) # Update x-axis to be shared fig.update_xaxes( showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)', rangeslider_visible=False ) # Update y-axes fig.update_yaxes( title_text="Price (USDT)", showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)', row=1, col=1 ) fig.update_yaxes( title_text="Volume", showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)', row=2, col=1 ) return fig def _interval_to_seconds(self, interval_key: str) -> int: """Convert interval key to seconds""" mapping = { '1s': 1, '1m': 60, '1h': 3600, '1d': 86400 } return mapping.get(interval_key, 1) async def start_websocket(self): ws = ExchangeWebSocket(self.symbol) connection_attempts = 0 max_attempts = 10 # Maximum connection attempts before longer waiting period while True: # Keep trying to maintain connection connection_attempts += 1 if not await ws.connect(): logger.error(f"Failed to connect to exchange for {self.symbol}") # Gradually increase wait time based on number of connection failures wait_time = min(5 * connection_attempts, 60) # Cap at 60 seconds logger.warning(f"Waiting {wait_time} seconds before retry (attempt {connection_attempts})") if connection_attempts >= max_attempts: logger.warning(f"Reached {max_attempts} connection attempts, taking a longer break") await asyncio.sleep(120) # 2 minutes wait after max attempts connection_attempts = 0 # Reset counter else: await asyncio.sleep(wait_time) continue # Successfully connected connection_attempts = 0 try: logger.info(f"WebSocket connected for {self.symbol}, beginning data collection") tick_count = 0 last_tick_count_log = time.time() last_status_report = time.time() # Track stats for reporting price_min = float('inf') price_max = float('-inf') price_last = None volume_total = 0 start_collection_time = time.time() while True: if not ws.running: logger.warning(f"WebSocket connection lost for {self.symbol}, breaking loop") break data = await ws.receive() if data: if data.get('type') == 'kline': # Use kline data directly for candlestick trade_data = { 'timestamp': data['timestamp'], 'price': data['price'], 'volume': data['volume'], 'open': data['open'], 'high': data['high'], 'low': data['low'] } logger.debug(f"Received kline data: {data}") else: # Use trade data trade_data = { 'timestamp': data['timestamp'], 'price': data['price'], 'volume': data['volume'] } # Update stats price = trade_data['price'] volume = trade_data['volume'] price_min = min(price_min, price) price_max = max(price_max, price) price_last = price volume_total += volume # Store raw tick in the tick storage self.tick_storage.add_tick(trade_data) tick_count += 1 # Also update the old candlestick data for backward compatibility self.candlestick_data.update_from_trade(trade_data) # Log tick counts periodically current_time = time.time() if current_time - last_tick_count_log >= 10: # Log every 10 seconds elapsed = current_time - last_tick_count_log tps = tick_count / elapsed if elapsed > 0 else 0 logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {elapsed:.1f}s ({tps:.2f} ticks/sec), total: {len(self.tick_storage.ticks)}") last_tick_count_log = current_time tick_count = 0 # Check if ticks are being converted to candles if len(self.tick_storage.ticks) > 0: sample_df = self.tick_storage.get_candles(interval_seconds=1) logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}") # Periodic status report (every 60 seconds) if current_time - last_status_report >= 60: elapsed_total = current_time - start_collection_time logger.info(f"{self.symbol} Status Report:") logger.info(f" Collection time: {elapsed_total:.1f} seconds") logger.info(f" Price range: {price_min:.2f} - {price_max:.2f} (last: {price_last:.2f})") logger.info(f" Total volume: {volume_total:.8f}") logger.info(f" Active ticks in storage: {len(self.tick_storage.ticks)}") # Reset stats for next period last_status_report = current_time price_min = float('inf') if price_last is None else price_last price_max = float('-inf') if price_last is None else price_last volume_total = 0 await asyncio.sleep(0.01) except websockets.exceptions.ConnectionClosed as e: logger.error(f"WebSocket connection closed for {self.symbol}: {str(e)}") except Exception as e: logger.error(f"Error in WebSocket loop for {self.symbol}: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: logger.info(f"Closing WebSocket connection for {self.symbol}") await ws.close() logger.info(f"Waiting 5 seconds before reconnecting {self.symbol} WebSocket...") await asyncio.sleep(5) def run(self, host='localhost', port=8050): try: logger.info(f"Starting Dash server for {self.symbol} on {host}:{port}") self.app.run(debug=False, host=host, port=port) except Exception as e: logger.error(f"Error running Dash server on port {port}: {str(e)}") # Try an alternative port if the primary one is in use if "Address already in use" in str(e): alt_port = port + 100 logger.warning(f"Port {port} is busy, trying alternative port {alt_port}") try: self.app.run(debug=False, host=host, port=alt_port) except Exception as alt_e: logger.error(f"Error running Dash server on alternative port {alt_port}: {str(alt_e)}") else: # Re-raise other exceptions raise def _load_historical_data(self): """Load historical data for all timeframes from Binance API and local cache""" try: logger.info(f"Loading historical data for {self.symbol}...") # Define intervals to fetch intervals = { '1s': 1, '1m': 60, '1h': 3600, '1d': 86400 } # Track load status load_status = {interval: False for interval in intervals.keys()} # First try to load from local cache files logger.info("Step 1: Loading from local cache files...") for interval_key, interval_seconds in intervals.items(): try: cache_file = os.path.join(self.historical_data.cache_dir, f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv") logger.info(f"Checking for cached {interval_key} data at {cache_file}") if os.path.exists(cache_file): # Check if cache is fresh (less than 1 day old for anything but 1d, 3 days for 1d) file_age = time.time() - os.path.getmtime(cache_file) max_age = 259200 if interval_key == '1d' else 86400 # 3 days for 1d, 1 day for others logger.info(f"Cache file age: {file_age:.1f}s, max allowed: {max_age}s") if file_age <= max_age: logger.info(f"Loading {interval_key} candles from cache") cached_df = pd.read_csv(cache_file) if not cached_df.empty: # Diagnostic info about the loaded data logger.info(f"Loaded {len(cached_df)} candles from {cache_file}") logger.info(f"Columns: {cached_df.columns.tolist()}") logger.info(f"First few rows: {cached_df.head(2).to_dict('records')}") # Convert timestamp string back to datetime if 'timestamp' in cached_df.columns: try: if not pd.api.types.is_datetime64_any_dtype(cached_df['timestamp']): cached_df['timestamp'] = pd.to_datetime(cached_df['timestamp']) logger.info("Successfully converted timestamps to datetime") except Exception as e: logger.warning(f"Could not convert timestamp column for {interval_key}: {str(e)}") # Only keep the last 2000 candles for memory efficiency if len(cached_df) > 2000: cached_df = cached_df.tail(2000) logger.info(f"Truncated to last 2000 candles") # Add to cache for _, row in cached_df.iterrows(): candle_dict = row.to_dict() self.candle_cache.candles[interval_key].append(candle_dict) # Update ohlcv_cache self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000) logger.info(f"Successfully loaded {len(self.ohlcv_cache[interval_key])} cached {interval_key} candles") if len(self.ohlcv_cache[interval_key]) >= 500: load_status[interval_key] = True # Skip fetching from API if we loaded from cache (except for 1d timeframe which we always refresh) if interval_key != '1d': continue else: logger.info(f"Cache file for {interval_key} is too old ({file_age:.1f}s)") else: logger.info(f"No cache file found for {interval_key}") except Exception as e: logger.error(f"Error loading cached {interval_key} candles: {str(e)}") import traceback logger.error(traceback.format_exc()) # For timeframes other than 1s, fetch from API as backup or for fresh data logger.info("Step 2: Fetching data from API for missing timeframes...") for interval_key, interval_seconds in intervals.items(): # Skip 1s for API requests if interval_key == '1s' or load_status[interval_key]: logger.info(f"Skipping API fetch for {interval_key}: already loaded or 1s timeframe") continue # Fetch historical data from API try: logger.info(f"Fetching {interval_key} candles from API for {self.symbol}") historical_df = self.historical_data.get_historical_candles( symbol=self.symbol, interval_seconds=interval_seconds, limit=500 # Get 500 candles ) if not historical_df.empty: logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key} from API") # If we already have data in cache, merge with new data to avoid duplicates if self.ohlcv_cache[interval_key] is not None and not self.ohlcv_cache[interval_key].empty: existing_df = self.ohlcv_cache[interval_key] # Get the latest timestamp from existing data latest_time = existing_df['timestamp'].max() # Only keep newer records from API new_candles = historical_df[historical_df['timestamp'] > latest_time] if not new_candles.empty: logger.info(f"Adding {len(new_candles)} new candles to existing {interval_key} cache") # Add to cache for _, row in new_candles.iterrows(): candle_dict = row.to_dict() self.candle_cache.candles[interval_key].append(candle_dict) else: # No existing data, add all from API for _, row in historical_df.iterrows(): candle_dict = row.to_dict() self.candle_cache.candles[interval_key].append(candle_dict) # Update ohlcv_cache with combined data self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000) logger.info(f"Total {interval_key} candles in cache: {len(self.ohlcv_cache[interval_key])}") if len(self.ohlcv_cache[interval_key]) >= 500: load_status[interval_key] = True else: logger.warning(f"No historical data available from API for {self.symbol} {interval_key}") except Exception as e: logger.error(f"Error fetching {interval_key} data from API: {str(e)}") import traceback logger.error(traceback.format_exc()) # Log summary of loaded data logger.info("Historical data load summary:") for interval_key in intervals.keys(): count = len(self.ohlcv_cache[interval_key]) if self.ohlcv_cache[interval_key] is not None else 0 status = "Success" if load_status[interval_key] else "Failed" if count > 0 and count < 500: status = "Partial" logger.info(f"{interval_key}: {count} candles - {status}") except Exception as e: logger.error(f"Error in _load_historical_data: {str(e)}") import traceback logger.error(traceback.format_exc()) def _save_candles_to_disk(self, force=False): """Save current candle cache to disk for persistence between runs""" try: # Only save if we have data and sufficient time has passed (every 5 minutes) current_time = time.time() if not force and current_time - self.last_cache_save_time < 300: # 5 minutes return # Save each timeframe's candles to disk for interval_key, candles in self.candle_cache.candles.items(): if candles: # Convert to DataFrame df = pd.DataFrame(list(candles)) if not df.empty: # Ensure timestamp is properly formatted if 'timestamp' in df.columns: try: if not pd.api.types.is_datetime64_any_dtype(df['timestamp']): df['timestamp'] = pd.to_datetime(df['timestamp']) except: logger.warning(f"Could not convert timestamp column for {interval_key}") # Save to disk in the cache directory cache_file = os.path.join(self.historical_data.cache_dir, f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv") df.to_csv(cache_file, index=False) logger.info(f"Saved {len(df)} {interval_key} candles to {cache_file}") self.last_cache_save_time = current_time logger.info(f"Saved all candle caches to disk at {datetime.now()}") except Exception as e: logger.error(f"Error saving candles to disk: {str(e)}") import traceback logger.error(traceback.format_exc()) async def main(): symbols = ["ETH/USDT", "BTC/USDT"] logger.info(f"Starting application for symbols: {symbols}") charts = [] websocket_tasks = [] # Create a chart and websocket task for each symbol for symbol in symbols: chart = RealTimeChart(symbol) charts.append(chart) websocket_tasks.append(asyncio.create_task(chart.start_websocket())) # Run Dash in a separate thread to not block the event loop server_threads = [] for i, chart in enumerate(charts): port = 8050 + i # Use different ports for each chart logger.info(f"Starting chart for {chart.symbol} on port {port}") thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) # Ensure correct port is passed thread.daemon = True thread.start() server_threads.append(thread) logger.info(f"Thread started for {chart.symbol} on port {port}") try: # Keep the main task running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: for task in websocket_tasks: task.cancel() try: await task except asyncio.CancelledError: pass if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Application terminated by user")