import asyncio import json import logging import datetime from typing import Dict, List, Optional import websockets import plotly.graph_objects as go from plotly.subplots import make_subplots import dash from dash import html, dcc from dash.dependencies import Input, Output import pandas as pd import numpy as np from collections import deque import time from threading import Thread # Configure logging with more detailed format logging.basicConfig( level=logging.DEBUG, # Changed to DEBUG for more detailed logs format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('realtime_chart.log') ] ) logger = logging.getLogger(__name__) class TradeTickStorage: """Store and manage raw trade ticks for display and candle formation""" def __init__(self, max_age_seconds: int = 300): # 5 minutes by default self.ticks = [] self.max_age_seconds = max_age_seconds self.last_cleanup_time = time.time() self.cleanup_interval = 10 # Run cleanup every 10 seconds to avoid doing it on every tick self.last_tick = None logger.info(f"Initialized TradeTickStorage with max age: {max_age_seconds} seconds") def add_tick(self, tick: Dict): """Add a new trade tick to storage""" self.ticks.append(tick) self.last_tick = tick logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}") # Only clean up periodically rather than on every tick current_time = time.time() if current_time - self.last_cleanup_time > self.cleanup_interval: self._cleanup() self.last_cleanup_time = current_time def _cleanup(self): """Remove ticks older than max_age_seconds""" now = int(time.time() * 1000) # Current time in ms cutoff = now - (self.max_age_seconds * 1000) old_count = len(self.ticks) self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff] removed = old_count - len(self.ticks) if removed > 0: logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}") def get_latest_price(self) -> Optional[float]: """Get the latest price from the most recent tick""" if self.last_tick: return self.last_tick.get('price') elif self.ticks: # If last_tick not available but ticks exist, use the last tick self.last_tick = self.ticks[-1] return self.last_tick.get('price') return None def get_price_stats(self) -> Dict: """Get stats about the prices in storage""" if not self.ticks: return { 'min': None, 'max': None, 'latest': None, 'count': 0, 'age_seconds': 0 } prices = [tick['price'] for tick in self.ticks] latest_timestamp = self.ticks[-1]['timestamp'] oldest_timestamp = self.ticks[0]['timestamp'] return { 'min': min(prices), 'max': max(prices), 'latest': prices[-1], 'count': len(prices), 'age_seconds': (latest_timestamp - oldest_timestamp) / 1000 } def get_ticks_as_df(self) -> pd.DataFrame: """Return ticks as a DataFrame""" if not self.ticks: logger.warning("No ticks available for DataFrame conversion") return pd.DataFrame() # Ensure we have fresh data self._cleanup() df = pd.DataFrame(self.ticks) if not df.empty: logger.debug(f"Converting timestamps for {len(df)} ticks") # Ensure timestamp column exists if 'timestamp' not in df.columns: logger.error("Tick data missing timestamp column") return pd.DataFrame() # Check timestamp datatype before conversion sample_ts = df['timestamp'].iloc[0] if len(df) > 0 else None logger.debug(f"Sample timestamp before conversion: {sample_ts}, type: {type(sample_ts)}") # Convert timestamps to datetime try: df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') logger.debug(f"Timestamps converted to datetime successfully") if len(df) > 0: logger.debug(f"Sample converted timestamp: {df['timestamp'].iloc[0]}") except Exception as e: logger.error(f"Error converting timestamps: {str(e)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() return df def get_candles(self, interval_seconds: int = 1) -> pd.DataFrame: """Convert ticks to OHLCV candles at specified interval""" if not self.ticks: logger.warning("No ticks available for candle formation") return pd.DataFrame() # Ensure ticks are up to date self._cleanup() # Convert to DataFrame df = self.get_ticks_as_df() if df.empty: logger.warning("Tick DataFrame is empty after conversion") return pd.DataFrame() logger.info(f"Preparing to create candles from {len(df)} ticks") try: # Use timestamp column for resampling df = df.set_index('timestamp') # Create interval string for resampling - use 's' instead of deprecated 'S' interval_str = f'{interval_seconds}s' # Resample to create OHLCV candles logger.debug(f"Resampling with interval: {interval_str}") candles = df.resample(interval_str).agg({ 'price': ['first', 'max', 'min', 'last'], 'volume': 'sum' }) # Check if resampling produced any data if candles.empty: logger.warning("Resampling produced empty dataframe - check timestamp distribution") # Show timestamp ranges to diagnose potential resampling issues if not df.empty: min_time = df.index.min() max_time = df.index.max() logger.info(f"Tick timestamp range: {min_time} to {max_time}") return pd.DataFrame() # Flatten MultiIndex columns candles.columns = ['open', 'high', 'low', 'close', 'volume'] # Reset index to get timestamp as column candles = candles.reset_index() # Ensure no NaN values candles = candles.dropna() logger.debug(f"Generated {len(candles)} candles from {len(self.ticks)} ticks") return candles except Exception as e: logger.error(f"Error in candle formation: {str(e)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() class CandlestickData: def __init__(self, max_length: int = 300): self.timestamps = deque(maxlen=max_length) self.opens = deque(maxlen=max_length) self.highs = deque(maxlen=max_length) self.lows = deque(maxlen=max_length) self.closes = deque(maxlen=max_length) self.volumes = deque(maxlen=max_length) self.current_candle = { 'timestamp': None, 'open': None, 'high': None, 'low': None, 'close': None, 'volume': 0 } self.candle_interval = 1 # 1 second by default def update_from_trade(self, trade: Dict): timestamp = trade['timestamp'] price = trade['price'] volume = trade.get('volume', 0) # Round timestamp to nearest candle interval candle_timestamp = int(timestamp / (self.candle_interval * 1000)) * (self.candle_interval * 1000) if self.current_candle['timestamp'] != candle_timestamp: # Save current candle if it exists if self.current_candle['timestamp'] is not None: self.timestamps.append(self.current_candle['timestamp']) self.opens.append(self.current_candle['open']) self.highs.append(self.current_candle['high']) self.lows.append(self.current_candle['low']) self.closes.append(self.current_candle['close']) self.volumes.append(self.current_candle['volume']) logger.debug(f"New candle saved: {self.current_candle}") # Start new candle self.current_candle = { 'timestamp': candle_timestamp, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': volume } logger.debug(f"New candle started: {self.current_candle}") else: # Update current candle if self.current_candle['high'] is None or price > self.current_candle['high']: self.current_candle['high'] = price if self.current_candle['low'] is None or price < self.current_candle['low']: self.current_candle['low'] = price self.current_candle['close'] = price self.current_candle['volume'] += volume logger.debug(f"Updated current candle: {self.current_candle}") def get_dataframe(self) -> pd.DataFrame: # Include current candle in the dataframe if it exists timestamps = list(self.timestamps) opens = list(self.opens) highs = list(self.highs) lows = list(self.lows) closes = list(self.closes) volumes = list(self.volumes) if self.current_candle['timestamp'] is not None: timestamps.append(self.current_candle['timestamp']) opens.append(self.current_candle['open']) highs.append(self.current_candle['high']) lows.append(self.current_candle['low']) closes.append(self.current_candle['close']) volumes.append(self.current_candle['volume']) df = pd.DataFrame({ 'timestamp': timestamps, 'open': opens, 'high': highs, 'low': lows, 'close': closes, 'volume': volumes }) if not df.empty: df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return df class MEXCWebSocket: """MEXC-specific WebSocket implementation""" def __init__(self, symbol: str): self.symbol = symbol.replace('/', '').upper() self.ws = None self.running = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.ping_interval = 20 self.last_ping_time = 0 self.message_count = 0 # MEXC WebSocket configuration self.ws_url = "wss://wbs-api.mexc.com/ws" self.ws_sub_params = [ f"spot@public.kline.v3.api@{self.symbol}@Min1" ] self.subscribe_msgs = [ { "method": "SUBSCRIPTION", "params": self.ws_sub_params } ] logger.info(f"Initialized MEXC WebSocket for symbol: {self.symbol}") logger.debug(f"Subscribe messages: {json.dumps(self.subscribe_msgs)}") async def connect(self): while True: try: logger.info(f"Attempting to connect to {self.ws_url}") self.ws = await websockets.connect(self.ws_url) logger.info("WebSocket connection established") # Subscribe to the streams for msg in self.subscribe_msgs: logger.info(f"Sending subscription message: {json.dumps(msg)}") await self.ws.send(json.dumps(msg)) # Wait for subscription confirmation response = await self.ws.recv() logger.info(f"Subscription response: {response}") if "Not Subscribed" in response: logger.error(f"Subscription error: {response}") await self.unsubscribe() await self.close() return False self.running = True self.reconnect_delay = 1 logger.info(f"Successfully connected to MEXC WebSocket for {self.symbol}") # Start ping task asyncio.create_task(self.ping_loop()) return True except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await self.unsubscribe() await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) continue async def ping_loop(self): """Send ping messages to keep connection alive""" while self.running: try: current_time = time.time() if current_time - self.last_ping_time >= self.ping_interval: ping_msg = {"method": "PING"} logger.debug("Sending ping") await self.ws.send(json.dumps(ping_msg)) self.last_ping_time = current_time await asyncio.sleep(1) except Exception as e: logger.error(f"Error in ping loop: {str(e)}") break async def receive(self) -> Optional[Dict]: if not self.ws: return None try: message = await self.ws.recv() self.message_count += 1 if self.message_count % 10 == 0: logger.info(f"Received message #{self.message_count}") logger.debug(f"Raw message: {message[:200]}...") if isinstance(message, bytes): return None data = json.loads(message) # Handle PONG response if isinstance(data, dict) and data.get('msg') == 'PONG': logger.debug("Received pong") return None # Handle kline data if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list): kline = data['data'][0] if len(kline) >= 6: kline_data = { 'timestamp': int(kline[0]), # Timestamp 'open': float(kline[1]), # Open 'high': float(kline[2]), # High 'low': float(kline[3]), # Low 'price': float(kline[4]), # Close 'volume': float(kline[5]), # Volume 'type': 'kline' } logger.info(f"Processed kline data: {kline_data}") return kline_data return None except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.running = False return None except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") return None except Exception as e: logger.error(f"Error receiving message: {str(e)}") return None async def unsubscribe(self): """Unsubscribe from all channels""" if self.ws: for msg in self.subscribe_msgs: unsub_msg = { "method": "UNSUBSCRIPTION", "params": msg["params"] } try: await self.ws.send(json.dumps(unsub_msg)) except: pass async def close(self): """Close the WebSocket connection""" if self.ws: await self.unsubscribe() await self.ws.close() self.running = False logger.info("WebSocket connection closed") class BinanceWebSocket: """Binance WebSocket implementation for real-time tick data""" def __init__(self, symbol: str): self.symbol = symbol.replace('/', '').lower() self.ws = None self.running = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.message_count = 0 # Binance WebSocket configuration self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade" logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}") async def connect(self): while True: try: logger.info(f"Attempting to connect to {self.ws_url}") self.ws = await websockets.connect(self.ws_url) logger.info("WebSocket connection established") self.running = True self.reconnect_delay = 1 logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}") return True except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) continue async def receive(self) -> Optional[Dict]: if not self.ws: return None try: message = await self.ws.recv() self.message_count += 1 if self.message_count % 100 == 0: # Log every 100th message to avoid spam logger.info(f"Received message #{self.message_count}") logger.debug(f"Raw message: {message[:200]}...") data = json.loads(message) # Process trade data if 'e' in data and data['e'] == 'trade': trade_data = { 'timestamp': data['T'], # Trade time 'price': float(data['p']), # Price 'volume': float(data['q']), # Quantity 'type': 'trade' } logger.debug(f"Processed trade data: {trade_data}") return trade_data return None except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.running = False return None except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") return None except Exception as e: logger.error(f"Error receiving message: {str(e)}") return None async def close(self): """Close the WebSocket connection""" if self.ws: await self.ws.close() self.running = False logger.info("WebSocket connection closed") class ExchangeWebSocket: """Generic WebSocket interface for cryptocurrency exchanges""" def __init__(self, symbol: str, exchange: str = "binance"): self.symbol = symbol self.exchange = exchange.lower() self.ws = None # Initialize the appropriate WebSocket implementation if self.exchange == "binance": self.ws = BinanceWebSocket(symbol) elif self.exchange == "mexc": self.ws = MEXCWebSocket(symbol) else: raise ValueError(f"Unsupported exchange: {exchange}") async def connect(self): """Connect to the exchange WebSocket""" return await self.ws.connect() async def receive(self) -> Optional[Dict]: """Receive data from the WebSocket""" return await self.ws.receive() async def close(self): """Close the WebSocket connection""" await self.ws.close() @property def running(self): """Check if the WebSocket is running""" return self.ws.running if self.ws else False class RealTimeChart: def __init__(self, symbol: str): self.symbol = symbol self.app = dash.Dash(__name__) self.candlestick_data = CandlestickData() self.tick_storage = TradeTickStorage(max_age_seconds=300) # Store 5 minutes of ticks self.ohlcv_cache = { # Cache for different intervals '1s': None, '1m': None, '1h': None, '1d': None } logger.info(f"Initializing RealTimeChart for {symbol}") # Button style button_style = { 'background-color': '#4CAF50', 'color': 'white', 'padding': '10px 15px', 'margin': '5px', 'border': 'none', 'border-radius': '5px', 'font-size': '14px', 'cursor': 'pointer', 'transition': 'background-color 0.3s', 'font-weight': 'bold' } active_button_style = { **button_style, 'background-color': '#2E7D32', 'box-shadow': '0 0 5px #2E7D32' } # Initialize the layout with improved styling self.app.layout = html.Div([ # Header with symbol and title html.Div([ html.H1(f"{symbol} Real-Time Price Chart", style={ 'textAlign': 'center', 'color': '#FFFFFF', 'fontFamily': 'Arial, sans-serif', 'margin': '10px', 'textShadow': '2px 2px 4px #000000' }), ], style={ 'backgroundColor': '#1E1E1E', 'padding': '10px', 'borderRadius': '5px', 'marginBottom': '10px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' }), # Interval selection buttons html.Div([ html.Div("Candlestick Interval:", style={ 'color': '#FFFFFF', 'marginRight': '10px', 'fontSize': '16px', 'fontWeight': 'bold' }), html.Button('1s', id='btn-1s', n_clicks=0, style=active_button_style), html.Button('5s', id='btn-5s', n_clicks=0, style=button_style), html.Button('15s', id='btn-15s', n_clicks=0, style=button_style), html.Button('30s', id='btn-30s', n_clicks=0, style=button_style), html.Button('1m', id='btn-1m', n_clicks=0, style=button_style), ], style={ 'display': 'flex', 'alignItems': 'center', 'justifyContent': 'center', 'margin': '15px', 'backgroundColor': '#2C2C2C', 'padding': '10px', 'borderRadius': '5px' }), # Store for current interval dcc.Store(id='interval-store', data={'interval': 1}), # Main chart dcc.Graph( id='live-chart', style={ 'height': '80vh', 'border': '1px solid #444444', 'borderRadius': '5px', 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' } ), # Update interval dcc.Interval( id='interval-component', interval=500, # Update every 500ms for smoother display n_intervals=0 ) ], style={ 'backgroundColor': '#121212', 'padding': '20px', 'height': '100vh', 'fontFamily': 'Arial, sans-serif' }) # Callback to update interval based on button clicks and update button styles @self.app.callback( [Output('interval-store', 'data'), Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), Output('btn-30s', 'style'), Output('btn-1m', 'style')], [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), Input('btn-30s', 'n_clicks'), Input('btn-1m', 'n_clicks')], [dash.dependencies.State('interval-store', 'data')] ) def update_interval(n1, n5, n15, n30, n60, data): ctx = dash.callback_context if not ctx.triggered: # Default state (1s selected) return ({'interval': 1}, active_button_style, button_style, button_style, button_style, button_style) button_id = ctx.triggered[0]['prop_id'].split('.')[0] if button_id == 'btn-1s': return ({'interval': 1}, active_button_style, button_style, button_style, button_style, button_style) elif button_id == 'btn-5s': return ({'interval': 5}, button_style, active_button_style, button_style, button_style, button_style) elif button_id == 'btn-15s': return ({'interval': 15}, button_style, button_style, active_button_style, button_style, button_style) elif button_id == 'btn-30s': return ({'interval': 30}, button_style, button_style, button_style, active_button_style, button_style) elif button_id == 'btn-1m': return ({'interval': 60}, button_style, button_style, button_style, button_style, active_button_style) # Default case - keep current interval and highlight appropriate button current_interval = data.get('interval', 1) styles = [button_style] * 5 # All inactive by default # Set active style based on current interval if current_interval == 1: styles[0] = active_button_style elif current_interval == 5: styles[1] = active_button_style elif current_interval == 15: styles[2] = active_button_style elif current_interval == 30: styles[3] = active_button_style elif current_interval == 60: styles[4] = active_button_style return (data, *styles) # Callback to update the chart @self.app.callback( Output('live-chart', 'figure'), [Input('interval-component', 'n_intervals'), Input('interval-store', 'data')] ) def update_chart(n, interval_data): try: interval = interval_data.get('interval', 1) logger.debug(f"Updating chart for {self.symbol} with interval {interval}s") # Get candlesticks from tick storage df = self.tick_storage.get_candles(interval_seconds=interval) # Get current price and stats using our enhanced methods current_price = self.tick_storage.get_latest_price() price_stats = self.tick_storage.get_price_stats() logger.debug(f"Current price: {current_price}, Stats: {price_stats}") fig = make_subplots( rows=6, cols=1, # Adjusted to accommodate new subcharts vertical_spacing=0.03, subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1s OHLCV', '1m OHLCV', '1h OHLCV', '1d OHLCV'), row_heights=[0.4, 0.1, 0.1, 0.1, 0.1, 0.1] # Adjusted heights ) if not df.empty and len(df) > 0: logger.debug(f"Candles dataframe shape: {df.shape}, columns: {df.columns.tolist()}") # Add candlestick chart fig.add_trace( go.Candlestick( x=df['timestamp'], open=df['open'], high=df['high'], low=df['low'], close=df['close'], name='Price', increasing_line_color='#33CC33', # Green decreasing_line_color='#FF4136' # Red ), row=1, col=1 ) # Add volume bars colors = ['#33CC33' if close >= open else '#FF4136' for close, open in zip(df['close'], df['open'])] fig.add_trace( go.Bar( x=df['timestamp'], y=df['volume'], name='Volume', marker_color=colors ), row=2, col=1 ) # Add latest price line from the candlestick data latest_price = df['close'].iloc[-1] fig.add_shape( type="line", x0=df['timestamp'].min(), y0=latest_price, x1=df['timestamp'].max(), y1=latest_price, line=dict(color="yellow", width=1, dash="dash"), row=1, col=1 ) # Annotation for last candle close price fig.add_annotation( x=df['timestamp'].max(), y=latest_price, text=f"{latest_price:.2f}", showarrow=False, font=dict(size=14, color="yellow"), xshift=50, row=1, col=1 ) # If we have a more recent price from ticks, add that too if current_price and abs(current_price - latest_price) > 0.01: # Add current price line fig.add_shape( type="line", x0=df['timestamp'].min(), y0=current_price, x1=df['timestamp'].max(), y1=current_price, line=dict(color="cyan", width=1, dash="dot"), row=1, col=1 ) # Add current price annotation fig.add_annotation( x=df['timestamp'].max(), y=current_price, text=f"Current: {current_price:.2f}", showarrow=False, font=dict(size=14, color="cyan"), xshift=50, yshift=20, row=1, col=1 ) # Fetch and cache OHLCV data for different intervals for interval_key in self.ohlcv_cache.keys(): if self.ohlcv_cache[interval_key] is None: self.ohlcv_cache[interval_key] = self.tick_storage.get_candles(interval_seconds=self._interval_to_seconds(interval_key)) # Add OHLCV subcharts for i, (interval_key, ohlcv_df) in enumerate(self.ohlcv_cache.items(), start=3): if ohlcv_df is not None and not ohlcv_df.empty: fig.add_trace( go.Candlestick( x=ohlcv_df['timestamp'], open=ohlcv_df['open'], high=ohlcv_df['high'], low=ohlcv_df['low'], close=ohlcv_df['close'], name=f'{interval_key} OHLCV', increasing_line_color='#33CC33', decreasing_line_color='#FF4136' ), row=i, col=1 ) else: # If no data, add a text annotation to the chart logger.warning(f"No data to display for {self.symbol} - tick count: {len(self.tick_storage.ticks)}") # Add a message to the empty chart fig.add_annotation( x=0.5, y=0.5, text=f"Waiting for {self.symbol} data...", showarrow=False, font=dict(size=20, color="white"), xref="paper", yref="paper" ) # Build info box text with all the statistics info_lines = [f"{self.symbol}"] # Add current price if available if current_price: info_lines.append(f"Current: {current_price:.2f} USDT") # Add price statistics if available if price_stats['count'] > 0: # Format time range age_text = f"{price_stats['age_seconds']:.1f}s" if price_stats['age_seconds'] > 60: minutes = int(price_stats['age_seconds'] / 60) seconds = int(price_stats['age_seconds'] % 60) age_text = f"{minutes}m {seconds}s" # Add price range and change if price_stats['min'] is not None and price_stats['max'] is not None: price_range = f"Range: {price_stats['min']:.2f} - {price_stats['max']:.2f}" info_lines.append(price_range) # Add tick count and time range info_lines.append(f"Ticks: {price_stats['count']} in {age_text}") # Add candle count candle_count = len(df) if not df.empty else 0 info_lines.append(f"Candles: {candle_count} ({interval}s)") # Add info box to the chart fig.add_annotation( x=0.01, y=0.99, xref="paper", yref="paper", text="
".join(info_lines), showarrow=False, font=dict(size=12, color="white"), align="left", bgcolor="rgba(0,0,50,0.7)", bordercolor="#3366CC", borderwidth=2, borderpad=5, xanchor="left", yanchor="top" ) # Update layout with improved styling interval_text = { 1: "1 second", 5: "5 seconds", 15: "15 seconds", 30: "30 seconds", 60: "1 minute" }.get(interval, f"{interval}s") fig.update_layout( title_text=f"{self.symbol} Real-Time Data ({interval_text})", title_x=0.5, # Center the title xaxis_rangeslider_visible=False, height=1200, # Adjusted height for new subcharts template='plotly_dark', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(25,25,50,1)', font=dict(family="Arial, sans-serif", size=12, color="white"), showlegend=True, legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01 ) ) # Update axes styling fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') return fig except Exception as e: logger.error(f"Error updating chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Create a minimal figure with error message fig = go.Figure() fig.add_annotation( x=0.5, y=0.5, text=f"Error updating chart: {str(e)}", showarrow=False, font=dict(size=14, color="red"), xref="paper", yref="paper" ) fig.update_layout( height=800, template='plotly_dark', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(25,25,50,1)' ) return fig def _interval_to_seconds(self, interval_key: str) -> int: """Convert interval key to seconds""" mapping = { '1s': 1, '1m': 60, '1h': 3600, '1d': 86400 } return mapping.get(interval_key, 1) async def start_websocket(self): ws = ExchangeWebSocket(self.symbol) connection_attempts = 0 max_attempts = 10 # Maximum connection attempts before longer waiting period while True: # Keep trying to maintain connection connection_attempts += 1 if not await ws.connect(): logger.error(f"Failed to connect to exchange for {self.symbol}") # Gradually increase wait time based on number of connection failures wait_time = min(5 * connection_attempts, 60) # Cap at 60 seconds logger.warning(f"Waiting {wait_time} seconds before retry (attempt {connection_attempts})") if connection_attempts >= max_attempts: logger.warning(f"Reached {max_attempts} connection attempts, taking a longer break") await asyncio.sleep(120) # 2 minutes wait after max attempts connection_attempts = 0 # Reset counter else: await asyncio.sleep(wait_time) continue # Successfully connected connection_attempts = 0 try: logger.info(f"WebSocket connected for {self.symbol}, beginning data collection") tick_count = 0 last_tick_count_log = time.time() last_status_report = time.time() # Track stats for reporting price_min = float('inf') price_max = float('-inf') price_last = None volume_total = 0 start_collection_time = time.time() while True: if not ws.running: logger.warning(f"WebSocket connection lost for {self.symbol}, breaking loop") break data = await ws.receive() if data: if data.get('type') == 'kline': # Use kline data directly for candlestick trade_data = { 'timestamp': data['timestamp'], 'price': data['price'], 'volume': data['volume'], 'open': data['open'], 'high': data['high'], 'low': data['low'] } logger.debug(f"Received kline data: {data}") else: # Use trade data trade_data = { 'timestamp': data['timestamp'], 'price': data['price'], 'volume': data['volume'] } # Update stats price = trade_data['price'] volume = trade_data['volume'] price_min = min(price_min, price) price_max = max(price_max, price) price_last = price volume_total += volume # Store raw tick in the tick storage self.tick_storage.add_tick(trade_data) tick_count += 1 # Also update the old candlestick data for backward compatibility self.candlestick_data.update_from_trade(trade_data) # Log tick counts periodically current_time = time.time() if current_time - last_tick_count_log >= 10: # Log every 10 seconds elapsed = current_time - last_tick_count_log tps = tick_count / elapsed if elapsed > 0 else 0 logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {elapsed:.1f}s ({tps:.2f} ticks/sec), total: {len(self.tick_storage.ticks)}") last_tick_count_log = current_time tick_count = 0 # Check if ticks are being converted to candles if len(self.tick_storage.ticks) > 0: sample_df = self.tick_storage.get_candles(interval_seconds=1) logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}") # Periodic status report (every 60 seconds) if current_time - last_status_report >= 60: elapsed_total = current_time - start_collection_time logger.info(f"{self.symbol} Status Report:") logger.info(f" Collection time: {elapsed_total:.1f} seconds") logger.info(f" Price range: {price_min:.2f} - {price_max:.2f} (last: {price_last:.2f})") logger.info(f" Total volume: {volume_total:.8f}") logger.info(f" Active ticks in storage: {len(self.tick_storage.ticks)}") # Reset stats for next period last_status_report = current_time price_min = float('inf') if price_last is None else price_last price_max = float('-inf') if price_last is None else price_last volume_total = 0 await asyncio.sleep(0.01) except websockets.exceptions.ConnectionClosed as e: logger.error(f"WebSocket connection closed for {self.symbol}: {str(e)}") except Exception as e: logger.error(f"Error in WebSocket loop for {self.symbol}: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: logger.info(f"Closing WebSocket connection for {self.symbol}") await ws.close() logger.info(f"Waiting 5 seconds before reconnecting {self.symbol} WebSocket...") await asyncio.sleep(5) def run(self, host='localhost', port=8050): try: logger.info(f"Starting Dash server for {self.symbol} on {host}:{port}") self.app.run(debug=False, host=host, port=port) except Exception as e: logger.error(f"Error running Dash server on port {port}: {str(e)}") # Try an alternative port if the primary one is in use if "Address already in use" in str(e): alt_port = port + 100 logger.warning(f"Port {port} is busy, trying alternative port {alt_port}") try: self.app.run(debug=False, host=host, port=alt_port) except Exception as alt_e: logger.error(f"Error running Dash server on alternative port {alt_port}: {str(alt_e)}") else: # Re-raise other exceptions raise async def main(): symbols = ["ETH/USDT", "BTC/USDT"] logger.info(f"Starting application for symbols: {symbols}") charts = [] websocket_tasks = [] # Create a chart and websocket task for each symbol for symbol in symbols: chart = RealTimeChart(symbol) charts.append(chart) websocket_tasks.append(asyncio.create_task(chart.start_websocket())) # Run Dash in a separate thread to not block the event loop server_threads = [] for i, chart in enumerate(charts): port = 8050 + i # Use different ports for each chart logger.info(f"Starting chart for {chart.symbol} on port {port}") thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) # Ensure correct port is passed thread.daemon = True thread.start() server_threads.append(thread) logger.info(f"Thread started for {chart.symbol} on port {port}") try: # Keep the main task running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: for task in websocket_tasks: task.cancel() try: await task except asyncio.CancelledError: pass if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Application terminated by user")