From 619e39ac9b3985e82fa4b9d135c603ae296406a3 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 28 Jul 2025 10:26:47 +0300 Subject: [PATCH] binance WS api enhanced --- core/enhanced_cob_websocket.py | 767 +++++++++++++++++++++++++++++++-- web/clean_dashboard.py | 101 ++++- 2 files changed, 811 insertions(+), 57 deletions(-) diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index b00ccf3..2167924 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -10,6 +10,7 @@ Robust WebSocket implementation for Consolidated Order Book data with: - Dashboard integration with status updates This replaces the existing COB WebSocket implementation with a more reliable version. +binance DOCS: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams """ import asyncio @@ -58,20 +59,46 @@ class COBWebSocketStatus: self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) class EnhancedCOBWebSocket: - """Enhanced COB WebSocket with robust error handling and fallback""" + """Enhanced COB WebSocket with multi-stream support and robust error handling + + Subscribes to multiple Binance WebSocket streams: + - Order book depth updates (@depth@1000ms) + - 1-second candlestick data (@kline_1s) with timezone support + - 24hr ticker statistics (@ticker) - includes volume data + - Aggregated trade data (@aggTrade) - for real-time volume analysis + + Features: + - Binance WebSocket compliance (ping/pong, rate limiting, 24hr reconnection) + - Proper order book synchronization with REST API snapshots + - Real-time volume metrics and trade analysis + - Timezone offset support for kline streams (UTC or UTC+8) + - Fallback to REST API when WebSocket fails + + Usage Examples: + # UTC timezone (default) + cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT']) + + # UTC+8 timezone for kline streams + cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT'], timezone_offset='+08:00') + + # Get kline data with timezone conversion + kline_utc8 = cob_ws.get_kline_with_timezone('BTC/USDT', '+08:00') + """ _instances = {} # Track instances by symbols to prevent duplicates - def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None): + def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None): """ Initialize Enhanced COB WebSocket Args: symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) dashboard_callback: Callback function for dashboard status updates + timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback + self.timezone_offset = timezone_offset # None for UTC, '+08:00' for UTC+8 # Check for existing instances to prevent duplicate connections symbols_key = tuple(sorted(self.symbols)) @@ -97,9 +124,16 @@ class EnhancedCOBWebSocket: # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} - # Rate limiting for message processing + # Order book management for proper diff depth stream handling + self.order_books: Dict[str, Dict] = {} # {symbol: {'bids': {price: qty}, 'asks': {price: qty}}} + self.last_update_ids: Dict[str, int] = {} # Track last update IDs for synchronization + self.order_book_initialized: Dict[str, bool] = {} # Track if order book is properly initialized + + # Rate limiting for message processing (Binance: max 5 messages per second) self.last_message_time: Dict[str, datetime] = {} - self.min_message_interval = 0.1 # Minimum 100ms between messages per symbol + self.min_message_interval = 0.2 # 200ms = 5 messages per second compliance + self.message_count: Dict[str, int] = {} + self.message_window_start: Dict[str, datetime] = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} @@ -113,6 +147,14 @@ class EnhancedCOBWebSocket: self.max_depth = 1000 # Maximum depth for order book self.update_speed = '1000ms' # Binance update speed - reduced for stability + # Timezone configuration + if self.timezone_offset == '+08:00': + logger.info("🕐 Configured for UTC+8 timezone (Asian markets)") + elif self.timezone_offset: + logger.info(f"🕐 Configured for {self.timezone_offset} timezone") + else: + logger.info("🕐 Configured for UTC timezone (default)") + logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: logger.error("WebSockets module not available - COB data will be limited to REST API") @@ -239,14 +281,20 @@ class EnhancedCOBWebSocket: return # Initialize order book state for proper WebSocket synchronization - self.order_books[symbol] = { - 'bids': {float(price): float(qty) for price, qty in data['bids']}, - 'asks': {float(price): float(qty) for price, qty in data['asks']} - } + if symbol not in self.order_books: + self.order_books[symbol] = {'bids': {}, 'asks': {}} + + # Clear existing data and populate with snapshot + self.order_books[symbol]['bids'] = {float(price): float(qty) for price, qty in data['bids'] if float(qty) > 0} + self.order_books[symbol]['asks'] = {float(price): float(qty) for price, qty in data['asks'] if float(qty) > 0} # Store last update ID for synchronization if 'lastUpdateId' in data: self.last_update_ids[symbol] = data['lastUpdateId'] + logger.debug(f"Order book snapshot for {symbol}: lastUpdateId = {data['lastUpdateId']}") + + # Mark as initialized + self.order_book_initialized[symbol] = True logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") @@ -344,7 +392,11 @@ class EnhancedCOBWebSocket: async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic - Uses depth@1000ms for stable updates with maximum depth. + Subscribes to multiple streams via combined stream endpoint: + - Order book depth (@depth@1000ms) + - 1-second candlesticks (@kline_1s) + - 24hr ticker with volume (@ticker) + - Aggregated trades (@aggTrade) """ status = self.status[symbol] @@ -353,47 +405,118 @@ class EnhancedCOBWebSocket: logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 - # Create WebSocket URL with reasonable update rate - use depth@1000ms for stable updates + # Create WebSocket URL with combined streams for multiple data types ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT - ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@1000ms" + + # Subscribe to multiple streams: + # 1. depth@1000ms - Order book depth updates + # 2. kline_1s - 1-second candlestick data (with optional timezone offset) + # 3. ticker - 24hr ticker statistics (includes volume) + # 4. aggTrade - Aggregated trade data for volume analysis + + # Configure kline stream with timezone offset if specified + if self.timezone_offset: + kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}" + logger.info(f"Using timezone offset {self.timezone_offset} for kline stream") + else: + kline_stream = f"{ws_symbol}@kline_1s" + logger.info("Using UTC timezone for kline stream") + + streams = [ + f"{ws_symbol}@depth@1000ms", # Order book depth + kline_stream, # 1-second candlesticks (with timezone) + f"{ws_symbol}@ticker", # 24hr ticker with volume + f"{ws_symbol}@aggTrade" # Aggregated trades + ] + + # Use combined stream endpoint + streams_param = "/".join(streams) + ws_url = f"wss://stream.binance.com:9443/stream?streams={streams_param}" logger.info(f"Connecting to: {ws_url}") - async with websockets_connect(ws_url) as websocket: + # Add ping/pong handling and proper connection management + async with websockets_connect( + ws_url, + ping_interval=20, # Binance sends ping every 20 seconds + ping_timeout=60, # Binance disconnects after 1 minute without pong + close_timeout=10 + ) as websocket: # Connection successful status.connected = True status.last_error = None status.reset_reconnect_delay() - logger.info(f"WebSocket connected for {symbol}") + logger.info(f"WebSocket connected for {symbol} with ping/pong handling") await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") # Deactivate REST fallback if self.rest_fallback_active[symbol]: await self._stop_rest_fallback(symbol) - # Message receiving loop - async for message in websocket: - try: - # Rate limiting: skip messages that come too frequently - now = datetime.now() - if symbol in self.last_message_time: - time_since_last = (now - self.last_message_time[symbol]).total_seconds() - if time_since_last < self.min_message_interval: - continue # Skip this message - - self.last_message_time[symbol] = now - - data = json.loads(message) - await self._process_websocket_message(symbol, data) - - status.last_message_time = now - status.messages_received += 1 - - except json.JSONDecodeError as e: - logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}") - except Exception as e: - logger.error(f"Error processing WebSocket message for {symbol}: {e}") + # Track connection start time for 24-hour limit compliance + connection_start = datetime.now() + + # Message receiving loop with proper ping/pong handling + try: + async for message in websocket: + try: + # Check for 24-hour connection limit (Binance requirement) + if (datetime.now() - connection_start).total_seconds() > 23.5 * 3600: # 23.5 hours + logger.info(f"Approaching 24-hour limit for {symbol}, reconnecting...") + break + + # Handle different message types + if isinstance(message, bytes): + # Handle ping frames (though websockets library handles this automatically) + continue + + # Rate limiting: Binance allows max 5 messages per second + now = datetime.now() + + # Initialize rate limiting tracking + if symbol not in self.message_window_start: + self.message_window_start[symbol] = now + self.message_count[symbol] = 0 + + # Reset counter every second + if (now - self.message_window_start[symbol]).total_seconds() >= 1.0: + self.message_window_start[symbol] = now + self.message_count[symbol] = 0 + + # Check rate limit (5 messages per second) + if self.message_count[symbol] >= 5: + continue # Skip this message to comply with rate limit + + self.message_count[symbol] += 1 + self.last_message_time[symbol] = now + + # Parse JSON message + data = json.loads(message) + + # Handle combined stream format + if 'stream' in data and 'data' in data: + # Combined stream format: {"stream":"","data":} + stream_name = data['stream'] + stream_data = data['data'] + await self._process_combined_stream_message(symbol, stream_name, stream_data) + else: + # Single stream format (fallback) + await self._process_websocket_message(symbol, data) + + status.last_message_time = now + status.messages_received += 1 + + except json.JSONDecodeError as e: + logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}") + except Exception as e: + logger.error(f"Error processing WebSocket message for {symbol}: {e}") + + except websockets.exceptions.ConnectionClosedOK: + logger.info(f"WebSocket connection closed normally for {symbol}") + except websockets.exceptions.ConnectionClosedError as e: + logger.warning(f"WebSocket connection closed with error for {symbol}: {e}") + raise except ConnectionClosed as e: status.connected = False @@ -423,13 +546,21 @@ class EnhancedCOBWebSocket: async def _process_websocket_message(self, symbol: str, data: Dict): """Process WebSocket message and convert to COB format - Based on the working implementation from cob_realtime_dashboard.py - Using maximum depth for best performance - no order book maintenance needed. + Properly handles Binance depth stream format according to documentation: + - depthUpdate events with U (first update ID) and u (final update ID) + - Maintains local order book with proper synchronization """ try: + # Check if this is a depth update event + if data.get('e') == 'depthUpdate': + # Handle diff depth stream according to Binance documentation + await self._handle_depth_update(symbol, data) + return + + # Fallback: handle partial book depth format (depth5, depth10, depth20) # Extract bids and asks from the message - handle all possible formats - bids_data = data.get('b', []) - asks_data = data.get('a', []) + bids_data = data.get('bids', data.get('b', [])) + asks_data = data.get('asks', data.get('a', [])) # Process the order book data - filter out zero quantities # Binance uses 0 quantity to indicate removal from the book @@ -575,6 +706,386 @@ class EnhancedCOBWebSocket: import traceback logger.debug(traceback.format_exc()) + async def _handle_depth_update(self, symbol: str, data: Dict): + """Handle Binance depth update events according to documentation""" + try: + # Extract update IDs + first_update_id = data.get('U') # First update ID in event + final_update_id = data.get('u') # Final update ID in event + + if first_update_id is None or final_update_id is None: + logger.warning(f"Missing update IDs in depth update for {symbol}") + return + + # Initialize order book if needed + if symbol not in self.order_book_initialized or not self.order_book_initialized[symbol]: + # Get initial snapshot from REST API + await self._initialize_order_book(symbol) + + # Check if we have a valid order book + if symbol not in self.order_books: + logger.warning(f"Order book not initialized for {symbol}, skipping update") + return + + # Get last update ID for this symbol + last_update_id = self.last_update_ids.get(symbol, 0) + + # Apply Binance synchronization rules + if final_update_id <= last_update_id: + # Event is older than our current state, ignore + return + + if first_update_id > last_update_id + 1: + # Gap detected, need to reinitialize + logger.warning(f"Gap detected in depth updates for {symbol}, reinitializing order book") + self.order_book_initialized[symbol] = False + await self._initialize_order_book(symbol) + return + + # Apply updates to local order book + bids_updates = data.get('b', []) + asks_updates = data.get('a', []) + + # Update bids + for bid_update in bids_updates: + if len(bid_update) >= 2: + price = float(bid_update[0]) + quantity = float(bid_update[1]) + + if quantity == 0: + # Remove price level + self.order_books[symbol]['bids'].pop(price, None) + else: + # Update price level + self.order_books[symbol]['bids'][price] = quantity + + # Update asks + for ask_update in asks_updates: + if len(ask_update) >= 2: + price = float(ask_update[0]) + quantity = float(ask_update[1]) + + if quantity == 0: + # Remove price level + self.order_books[symbol]['asks'].pop(price, None) + else: + # Update price level + self.order_books[symbol]['asks'][price] = quantity + + # Update last update ID + self.last_update_ids[symbol] = final_update_id + + # Convert to COB format and notify callbacks + await self._create_cob_from_order_book(symbol) + + except Exception as e: + logger.error(f"Error handling depth update for {symbol}: {e}") + + async def _initialize_order_book(self, symbol: str): + """Initialize order book from REST API snapshot""" + try: + # Get snapshot (this method already exists) + await self._get_order_book_snapshot(symbol) + self.order_book_initialized[symbol] = True + logger.info(f"Order book initialized for {symbol}") + except Exception as e: + logger.error(f"Failed to initialize order book for {symbol}: {e}") + self.order_book_initialized[symbol] = False + + async def _create_cob_from_order_book(self, symbol: str): + """Create COB data from maintained order book""" + try: + if symbol not in self.order_books: + return + + order_book = self.order_books[symbol] + + # Sort and convert to list format + bids = [{'price': price, 'size': qty} for price, qty in order_book['bids'].items()] + asks = [{'price': price, 'size': qty} for price, qty in order_book['asks'].items()] + + # Sort bids (descending) and asks (ascending) + bids.sort(key=lambda x: x['price'], reverse=True) + asks.sort(key=lambda x: x['price']) + + # Limit depth + max_depth = 1000 + if len(bids) > max_depth: + bids = bids[:max_depth] + if len(asks) > max_depth: + asks = asks[:max_depth] + + # Create COB data structure + cob_data = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': bids, + 'asks': asks, + 'source': 'binance_depth_stream', + 'exchange': 'binance' + } + + # Calculate stats if we have valid data + if bids and asks: + best_bid = bids[0] + best_ask = asks[0] + + mid_price = (best_bid['price'] + best_ask['price']) / 2 + spread = best_ask['price'] - best_bid['price'] + spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 + + # Calculate volumes (top 20 levels for performance) + top_bids = bids[:20] + top_asks = asks[:20] + + bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) + ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) + + total_volume = bid_volume + ask_volume + volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 + + cob_data['stats'] = { + 'best_bid': best_bid['price'], + 'best_ask': best_ask['price'], + 'mid_price': mid_price, + 'spread': spread, + 'spread_bps': spread_bps, + 'bid_volume': bid_volume, + 'ask_volume': ask_volume, + 'imbalance': volume_imbalance, + 'bid_levels': len(bids), + 'ask_levels': len(asks), + 'timestamp': datetime.now().isoformat(), + 'update_id': self.last_update_ids.get(symbol, 0) + } + + # Update cache + self.latest_cob_data[symbol] = cob_data + + # Notify callbacks + for callback in self.cob_callbacks: + try: + await callback(symbol, cob_data) + except Exception as e: + logger.error(f"Error in COB callback: {e}") + + logger.debug(f"{symbol}: Depth stream update - ${cob_data.get('stats', {}).get('mid_price', 0):.2f}") + + except Exception as e: + logger.error(f"Error creating COB from order book for {symbol}: {e}") + + async def _process_combined_stream_message(self, symbol: str, stream_name: str, data: Dict): + """Process messages from combined stream format""" + try: + # Extract stream type from stream name (e.g., "btcusdt@depth@1000ms" -> "depth") + stream_parts = stream_name.split('@') + if len(stream_parts) < 2: + logger.warning(f"Invalid stream name format: {stream_name}") + return + + stream_type = stream_parts[1] + + # Route to appropriate handler based on stream type + if stream_type == 'depth': + await self._handle_depth_stream(symbol, data) + elif stream_type == 'kline_1s': + await self._handle_kline_stream(symbol, data) + elif stream_type == 'ticker': + await self._handle_ticker_stream(symbol, data) + elif stream_type == 'aggTrade': + await self._handle_aggtrade_stream(symbol, data) + else: + logger.debug(f"Unhandled stream type: {stream_type} for {symbol}") + + except Exception as e: + logger.error(f"Error processing combined stream message for {symbol}: {e}") + + async def _handle_depth_stream(self, symbol: str, data: Dict): + """Handle depth stream data (order book updates)""" + try: + # Check if this is a depth update event + if data.get('e') == 'depthUpdate': + await self._handle_depth_update(symbol, data) + else: + # Handle partial book depth format + await self._process_websocket_message(symbol, data) + except Exception as e: + logger.error(f"Error handling depth stream for {symbol}: {e}") + + async def _handle_kline_stream(self, symbol: str, data: Dict): + """Handle 1-second kline/candlestick data with timezone support""" + try: + if data.get('e') != 'kline': + return + + kline_data = data.get('k', {}) + if not kline_data: + return + + # Extract timestamps (always in UTC milliseconds according to Binance docs) + open_time_ms = kline_data.get('t', 0) + close_time_ms = kline_data.get('T', 0) + event_time_ms = data.get('E', 0) + + # Convert timestamps to datetime objects + open_time_utc = datetime.fromtimestamp(open_time_ms / 1000) if open_time_ms else None + close_time_utc = datetime.fromtimestamp(close_time_ms / 1000) if close_time_ms else None + event_time_utc = datetime.fromtimestamp(event_time_ms / 1000) if event_time_ms else datetime.now() + + # Extract candlestick data + candlestick = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'event_time': event_time_utc, + 'open_time': open_time_ms, # Raw timestamp (ms) + 'close_time': close_time_ms, # Raw timestamp (ms) + 'open_time_utc': open_time_utc, # Converted to UTC datetime + 'close_time_utc': close_time_utc, # Converted to UTC datetime + 'interval': kline_data.get('i', '1s'), + 'open_price': float(kline_data.get('o', 0)), + 'high_price': float(kline_data.get('h', 0)), + 'low_price': float(kline_data.get('l', 0)), + 'close_price': float(kline_data.get('c', 0)), + 'volume': float(kline_data.get('v', 0)), # Base asset volume + 'quote_volume': float(kline_data.get('q', 0)), # Quote asset volume + 'trades_count': kline_data.get('n', 0), + 'taker_buy_volume': float(kline_data.get('V', 0)), # Taker buy base asset volume + 'taker_buy_quote_volume': float(kline_data.get('Q', 0)), # Taker buy quote asset volume + 'is_closed': kline_data.get('x', False), # Is this kline closed? + 'first_trade_id': kline_data.get('f', 0), + 'last_trade_id': kline_data.get('L', 0), + 'timezone_offset': self.timezone_offset, # Timezone info + 'source': f'binance_kline_1s_{self.timezone_offset or "utc"}' + } + + # Store latest candlestick data + if not hasattr(self, 'latest_candlestick_data'): + self.latest_candlestick_data = {} + self.latest_candlestick_data[symbol] = candlestick + + # Notify callbacks with candlestick data + for callback in self.cob_callbacks: + try: + await callback(symbol, {'type': 'candlestick', 'data': candlestick}) + except Exception as e: + logger.error(f"Error in candlestick callback: {e}") + + logger.debug(f"{symbol}: 1s Kline - O:{candlestick['open_price']:.2f} H:{candlestick['high_price']:.2f} L:{candlestick['low_price']:.2f} C:{candlestick['close_price']:.2f} V:{candlestick['volume']:.2f}") + + except Exception as e: + logger.error(f"Error handling kline stream for {symbol}: {e}") + + async def _handle_ticker_stream(self, symbol: str, data: Dict): + """Handle 24hr ticker data (includes volume statistics)""" + try: + if data.get('e') != '24hrTicker': + return + + # Extract ticker data + ticker = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'price_change': float(data.get('p', 0)), + 'price_change_percent': float(data.get('P', 0)), + 'weighted_avg_price': float(data.get('w', 0)), + 'last_price': float(data.get('c', 0)), + 'last_qty': float(data.get('Q', 0)), + 'best_bid_price': float(data.get('b', 0)), + 'best_bid_qty': float(data.get('B', 0)), + 'best_ask_price': float(data.get('a', 0)), + 'best_ask_qty': float(data.get('A', 0)), + 'open_price': float(data.get('o', 0)), + 'high_price': float(data.get('h', 0)), + 'low_price': float(data.get('l', 0)), + 'volume': float(data.get('v', 0)), # 24hr volume + 'quote_volume': float(data.get('q', 0)), # 24hr quote volume + 'open_time': data.get('O', 0), + 'close_time': data.get('C', 0), + 'first_trade_id': data.get('F', 0), + 'last_trade_id': data.get('L', 0), + 'trades_count': data.get('n', 0), + 'source': 'binance_24hr_ticker' + } + + # Store latest ticker data + if not hasattr(self, 'latest_ticker_data'): + self.latest_ticker_data = {} + self.latest_ticker_data[symbol] = ticker + + # Notify callbacks with ticker data + for callback in self.cob_callbacks: + try: + await callback(symbol, {'type': 'ticker', 'data': ticker}) + except Exception as e: + logger.error(f"Error in ticker callback: {e}") + + logger.debug(f"{symbol}: 24hr Ticker - Price:{ticker['last_price']:.2f} Volume:{ticker['volume']:.2f} Change:{ticker['price_change_percent']:.2f}%") + + except Exception as e: + logger.error(f"Error handling ticker stream for {symbol}: {e}") + + async def _handle_aggtrade_stream(self, symbol: str, data: Dict): + """Handle aggregated trade data for real-time volume analysis""" + try: + if data.get('e') != 'aggTrade': + return + + # Extract aggregated trade data + agg_trade = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'trade_id': data.get('a', 0), + 'price': float(data.get('p', 0)), + 'quantity': float(data.get('q', 0)), + 'first_trade_id': data.get('f', 0), + 'last_trade_id': data.get('l', 0), + 'trade_time': data.get('T', 0), + 'is_buyer_maker': data.get('m', False), # True if buyer is market maker + 'notional_value': float(data.get('p', 0)) * float(data.get('q', 0)), + 'source': 'binance_aggtrade' + } + + # Store latest trade data + if not hasattr(self, 'latest_trade_data'): + self.latest_trade_data = {} + if symbol not in self.latest_trade_data: + self.latest_trade_data[symbol] = [] + + # Keep last 100 trades for volume analysis + self.latest_trade_data[symbol].append(agg_trade) + if len(self.latest_trade_data[symbol]) > 100: + self.latest_trade_data[symbol] = self.latest_trade_data[symbol][-100:] + + # Calculate real-time volume metrics + recent_trades = self.latest_trade_data[symbol][-10:] # Last 10 trades + buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) + sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) + total_volume = buy_volume + sell_volume + + volume_metrics = { + 'buy_volume': buy_volume, + 'sell_volume': sell_volume, + 'total_volume': total_volume, + 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), + 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0 + } + + # Notify callbacks with trade data + for callback in self.cob_callbacks: + try: + await callback(symbol, { + 'type': 'aggtrade', + 'data': agg_trade, + 'volume_metrics': volume_metrics + }) + except Exception as e: + logger.error(f"Error in aggtrade callback: {e}") + + logger.debug(f"{symbol}: AggTrade - Price:{agg_trade['price']:.2f} Qty:{agg_trade['quantity']:.4f} BuyerMaker:{agg_trade['is_buyer_maker']}") + + except Exception as e: + logger.error(f"Error handling aggtrade stream for {symbol}: {e}") + async def _start_rest_fallback(self, symbol: str): """Start REST API fallback for a symbol""" if self.rest_fallback_active[symbol]: @@ -682,8 +1193,172 @@ class EnhancedCOBWebSocket: for symbol in self.symbols: status = self.status[symbol] - # Check for stale connections + # Check for stale connections (no messages for 30 seconds) if status.connected and status.last_message_time: + time_since_last = (datetime.now() - status.last_message_time).total_seconds() + if time_since_last > 30: + logger.warning(f"Stale connection detected for {symbol}: {time_since_last:.1f}s since last message") + status.connected = False + status.last_error = f"Stale connection: {time_since_last:.1f}s without messages" + await self._notify_dashboard_status(symbol, "stale", status.last_error) + + # Log connection status + if status.connected: + logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") + else: + logger.debug(f"{symbol}: Disconnected - {status.last_error}") + + # Log overall system status + connected_count = sum(1 for status in self.status.values() if status.connected) + logger.debug(f"COB WebSocket status: {connected_count}/{len(self.symbols)} symbols connected") + + except Exception as e: + logger.error(f"Error in connection monitor: {e}") + + await asyncio.sleep(10) + + async def _notify_dashboard_status(self, symbol: str, status: str, message: str): + """Notify dashboard of WebSocket status changes""" + try: + if self.dashboard_callback: + status_data = { + 'type': 'websocket_status', + 'data': { + 'status': status, + 'message': message, + 'timestamp': datetime.now().isoformat() + } + } + await self.dashboard_callback(symbol, status_data) + except Exception as e: + logger.error(f"Error notifying dashboard status for {symbol}: {e}") + + def get_latest_candlestick(self, symbol: str) -> Optional[Dict]: + """Get the latest 1-second candlestick data for a symbol""" + if hasattr(self, 'latest_candlestick_data'): + return self.latest_candlestick_data.get(symbol) + return None + + def get_latest_ticker(self, symbol: str) -> Optional[Dict]: + """Get the latest 24hr ticker data for a symbol""" + if hasattr(self, 'latest_ticker_data'): + return self.latest_ticker_data.get(symbol) + return None + + def get_latest_trades(self, symbol: str, count: int = 10) -> List[Dict]: + """Get the latest aggregated trades for a symbol""" + if hasattr(self, 'latest_trade_data') and symbol in self.latest_trade_data: + return self.latest_trade_data[symbol][-count:] + return [] + + def get_volume_metrics(self, symbol: str, timeframe_seconds: int = 60) -> Dict: + """Calculate volume metrics for a given timeframe""" + try: + if not hasattr(self, 'latest_trade_data') or symbol not in self.latest_trade_data: + return {'error': 'No trade data available'} + + current_time = datetime.now().timestamp() * 1000 # Convert to milliseconds + cutoff_time = current_time - (timeframe_seconds * 1000) + + # Filter trades within timeframe + recent_trades = [ + trade for trade in self.latest_trade_data[symbol] + if trade['trade_time'] >= cutoff_time + ] + + if not recent_trades: + return {'error': 'No recent trades in timeframe'} + + # Calculate metrics + buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) + sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) + total_volume = buy_volume + sell_volume + trade_count = len(recent_trades) + + avg_trade_size = total_volume / trade_count if trade_count > 0 else 0 + + return { + 'timeframe_seconds': timeframe_seconds, + 'trade_count': trade_count, + 'buy_volume': buy_volume, + 'sell_volume': sell_volume, + 'total_volume': total_volume, + 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), + 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0, + 'avg_trade_size': avg_trade_size, + 'timestamp': datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error calculating volume metrics for {symbol}: {e}") + return {'error': str(e)} + + def _convert_timestamp_to_timezone(self, timestamp_ms: int, target_timezone: str = None) -> datetime: + """Convert UTC timestamp to specified timezone + + Args: + timestamp_ms: UTC timestamp in milliseconds + target_timezone: Target timezone (e.g., '+08:00' for UTC+8) + + Returns: + datetime object in target timezone + """ + try: + from datetime import timezone, timedelta + + # Convert to UTC datetime + utc_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) + + if not target_timezone: + return utc_dt + + # Parse timezone offset (e.g., '+08:00') + if target_timezone.startswith('+') or target_timezone.startswith('-'): + sign = 1 if target_timezone.startswith('+') else -1 + hours, minutes = map(int, target_timezone[1:].split(':')) + offset = timedelta(hours=sign * hours, minutes=sign * minutes) + target_tz = timezone(offset) + + return utc_dt.astimezone(target_tz) + + return utc_dt + + except Exception as e: + logger.error(f"Error converting timestamp to timezone: {e}") + return datetime.fromtimestamp(timestamp_ms / 1000) + + def get_kline_with_timezone(self, symbol: str, target_timezone: str = None) -> Optional[Dict]: + """Get latest kline data with timezone conversion + + Args: + symbol: Trading symbol + target_timezone: Target timezone (e.g., '+08:00' for UTC+8) + + Returns: + Kline data with timezone-converted timestamps + """ + try: + candlestick = self.get_latest_candlestick(symbol) + if not candlestick: + return None + + # Create a copy with timezone conversions + result = candlestick.copy() + + if target_timezone and candlestick.get('open_time'): + result['open_time_local'] = self._convert_timestamp_to_timezone( + candlestick['open_time'], target_timezone + ) + result['close_time_local'] = self._convert_timestamp_to_timezone( + candlestick['close_time'], target_timezone + ) + result['target_timezone'] = target_timezone + + return result + + except Exception as e: + logger.error(f"Error getting kline with timezone for {symbol}: {e}") + return None status.last_message_time: time_since_last = datetime.now() - status.last_message_time if time_since_last > timedelta(seconds=30): logger.warning(f"No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") @@ -763,12 +1438,18 @@ class EnhancedCOBWebSocket: # Global instance for easy access enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None -async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None) -> EnhancedCOBWebSocket: - """Get or create the global enhanced COB WebSocket instance""" +async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None) -> EnhancedCOBWebSocket: + """Get or create the global enhanced COB WebSocket instance + + Args: + symbols: List of symbols to monitor + dashboard_callback: Callback function for dashboard updates + timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) + """ global enhanced_cob_websocket if enhanced_cob_websocket is None: - enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback) + enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback, timezone_offset) await enhanced_cob_websocket.start() return enhanced_cob_websocket diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 6255b06..5b84f51 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -227,8 +227,20 @@ class CleanTradingDashboard: # COB data cache - enhanced with price buckets and memory system self.cob_cache: dict = { - 'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}, - 'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0} + 'ETH/USDT': { + 'last_update': 0, + 'data': None, + 'updates_count': 0, + 'update_times': [], + 'update_rate': 0.0 + }, + 'BTC/USDT': { + 'last_update': 0, + 'data': None, + 'updates_count': 0, + 'update_times': [], + 'update_rate': 0.0 + } } self.latest_cob_data: dict = {} # Cache for COB integration data self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display) @@ -317,20 +329,46 @@ class CleanTradingDashboard: def _on_cob_data_update(self, symbol: str, cob_data: dict): """Handle COB data updates from data provider""" try: + # Also update the COB cache for status display if not hasattr(self, 'cob_cache'): self.cob_cache = {} if symbol not in self.cob_cache: - self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + self.cob_cache[symbol] = { + 'last_update': 0, + 'data': None, + 'updates_count': 0, + 'update_times': [], # Track recent update times for rate calculation + 'update_rate': 0.0 + } # Update cache + current_time = time.time() self.cob_cache[symbol]['data'] = cob_data - self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['last_update'] = current_time self.cob_cache[symbol]['updates_count'] += 1 self.cob_cache[symbol]['websocket_status'] = 'connected' self.cob_cache[symbol]['source'] = 'data_provider' + # Track update times for rate calculation (keep last 60 seconds) + self.cob_cache[symbol]['update_times'].append(current_time) + # Remove updates older than 60 seconds + cutoff_time = current_time - 60 + self.cob_cache[symbol]['update_times'] = [ + t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time + ] + + # Calculate update rate (updates per second) + if len(self.cob_cache[symbol]['update_times']) > 1: + time_span = current_time - self.cob_cache[symbol]['update_times'][0] + if time_span > 0: + self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span + else: + self.cob_cache[symbol]['update_rate'] = 0.0 + else: + self.cob_cache[symbol]['update_rate'] = 0.0 + logger.info(f"📊 Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})") # Continue with existing logic @@ -374,7 +412,6 @@ class CleanTradingDashboard: if not hasattr(self, 'cob_last_update'): self.cob_last_update = {} - import time self.cob_last_update[symbol] = time.time() # Update current price from COB data @@ -809,21 +846,22 @@ class CleanTradingDashboard: if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode: mexc_status = "LIVE+SYNC" # Indicate live trading with position sync - # COB WebSocket status + # COB WebSocket status with update rate cob_status = self.get_cob_websocket_status() overall_status = cob_status.get('overall_status', 'unknown') warning_message = cob_status.get('warning_message') + update_rate = cob_status.get('update_rate', 0.0) if overall_status == 'all_connected': - cob_status_str = "Connected" + cob_status_str = f"Connected ({update_rate:.1f}/s)" elif overall_status == 'partial_fallback': - cob_status_str = "Fallback" + cob_status_str = f"Fallback ({update_rate:.1f}/s)" elif overall_status == 'degraded': - cob_status_str = "Degraded" + cob_status_str = f"Degraded ({update_rate:.1f}/s)" elif overall_status == 'unavailable': cob_status_str = "N/A" else: - cob_status_str = "Error" + cob_status_str = f"Error ({update_rate:.1f}/s)" return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, cob_status_str, mexc_status @@ -6548,13 +6586,38 @@ class CleanTradingDashboard: self.cob_cache = {} if symbol not in self.cob_cache: - self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + self.cob_cache[symbol] = { + 'last_update': 0, + 'data': None, + 'updates_count': 0, + 'update_times': [], # Track recent update times for rate calculation + 'update_rate': 0.0 + } # Update cache with orchestrator data + current_time = time.time() self.cob_cache[symbol]['data'] = cob_data - self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['last_update'] = current_time self.cob_cache[symbol]['updates_count'] += 1 + # Track update times for rate calculation (keep last 60 seconds) + self.cob_cache[symbol]['update_times'].append(current_time) + # Remove updates older than 60 seconds + cutoff_time = current_time - 60 + self.cob_cache[symbol]['update_times'] = [ + t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time + ] + + # Calculate update rate (updates per second) + if len(self.cob_cache[symbol]['update_times']) > 1: + time_span = current_time - self.cob_cache[symbol]['update_times'][0] + if time_span > 0: + self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span + else: + self.cob_cache[symbol]['update_rate'] = 0.0 + else: + self.cob_cache[symbol]['update_rate'] = 0.0 + # Set WebSocket status based on data source if isinstance(cob_data, dict) and 'stats' in cob_data: source = cob_data['stats'].get('source', 'unknown') @@ -6591,7 +6654,13 @@ class CleanTradingDashboard: # Update COB cache with status if symbol not in self.cob_cache: - self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + self.cob_cache[symbol] = { + 'last_update': 0, + 'data': None, + 'updates_count': 0, + 'update_times': [], + 'update_rate': 0.0 + } self.cob_cache[symbol]['websocket_status'] = status self.cob_cache[symbol]['websocket_message'] = message @@ -6687,11 +6756,15 @@ class CleanTradingDashboard: status_summary['overall_status'] = 'error' status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down' - # Set last update time + # Set last update time and calculate overall update rate last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()] if last_updates and max(last_updates) > 0: status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat() + # Calculate overall update rate (sum of all symbols) + total_update_rate = sum(cache.get('update_rate', 0.0) for cache in self.cob_cache.values()) + status_summary['update_rate'] = total_update_rate + return status_summary except Exception as e: