#!/usr/bin/env python3 """ Enhanced COB WebSocket Implementation Robust WebSocket implementation for Consolidated Order Book data with: - Maximum allowed depth subscription - Clear error handling and warnings - Automatic reconnection with exponential backoff - Fallback to REST API when WebSocket fails - Dashboard integration with status updates This replaces the existing COB WebSocket implementation with a more reliable version. binance DOCS: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams """ import asyncio import json import logging import time import traceback from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass import aiohttp import weakref try: import websockets from websockets.client import connect as websockets_connect from websockets.exceptions import ConnectionClosed, WebSocketException WEBSOCKETS_AVAILABLE = True except ImportError: websockets = None websockets_connect = None ConnectionClosed = Exception WebSocketException = Exception WEBSOCKETS_AVAILABLE = False logger = logging.getLogger(__name__) @dataclass class COBWebSocketStatus: """Status tracking for COB WebSocket connections""" connected: bool = False last_message_time: Optional[datetime] = None connection_attempts: int = 0 last_error: Optional[str] = None reconnect_delay: float = 1.0 max_reconnect_delay: float = 60.0 messages_received: int = 0 def reset_reconnect_delay(self): """Reset reconnect delay on successful connection""" self.reconnect_delay = 1.0 def increase_reconnect_delay(self): """Increase reconnect delay with exponential backoff""" self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) class EnhancedCOBWebSocket: """Enhanced COB WebSocket with multi-stream support and robust error handling Subscribes to multiple Binance WebSocket streams: - Order book depth updates (@depth@1000ms) - 1-second candlestick data (@kline_1s) with timezone support - 24hr ticker statistics (@ticker) - includes volume data - Aggregated trade data (@aggTrade) - for real-time volume analysis Features: - Binance WebSocket compliance (ping/pong, rate limiting, 24hr reconnection) - Proper order book synchronization with REST API snapshots - Real-time volume metrics and trade analysis - Timezone offset support for kline streams (UTC or UTC+8) - Fallback to REST API when WebSocket fails Usage Examples: # UTC timezone (default) cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT']) # UTC+8 timezone for kline streams cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT'], timezone_offset='+08:00') # Get kline data with timezone conversion kline_utc8 = cob_ws.get_kline_with_timezone('BTC/USDT', '+08:00') """ _instances = {} # Track instances by symbols to prevent duplicates def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None): """ Initialize Enhanced COB WebSocket Args: symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) dashboard_callback: Callback function for dashboard status updates timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback self.timezone_offset = timezone_offset # None for UTC, '+08:00' for UTC+8 # Check for existing instances to prevent duplicate connections symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: logger.warning(f"EnhancedCOBWebSocket already exists for symbols {self.symbols} - reusing existing instance") existing = EnhancedCOBWebSocket._instances[symbols_key] # Copy existing instance data self.__dict__.update(existing.__dict__) return # Register this instance EnhancedCOBWebSocket._instances[symbols_key] = self # Connection status tracking self.status: Dict[str, COBWebSocketStatus] = { symbol: COBWebSocketStatus() for symbol in self.symbols } # Data callbacks self.cob_callbacks: List[Callable] = [] self.error_callbacks: List[Callable] = [] # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} # Order book management for proper diff depth stream handling (Binance compliant) self.order_books: Dict[str, Dict] = {} # {symbol: {'bids': {price: qty}, 'asks': {price: qty}}} self.last_update_ids: Dict[str, int] = {} # Track last update IDs for synchronization self.order_book_initialized: Dict[str, bool] = {} # Track if order book is properly initialized # Event buffering for proper Binance order book synchronization self.event_buffers: Dict[str, List[Dict]] = {} # Buffer events before snapshot self.first_event_u: Dict[str, int] = {} # Track first event U for synchronization self.snapshot_in_progress: Dict[str, bool] = {} # Track snapshot initialization # Rate limiting for message processing (Binance: max 5 messages per second) self.last_message_time: Dict[str, datetime] = {} self.min_message_interval = 0.2 # 200ms = 5 messages per second compliance self.message_count: Dict[str, int] = {} self.message_window_start: Dict[str, datetime] = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} # REST API fallback self.rest_session: Optional[aiohttp.ClientSession] = None self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols} self.rest_tasks: Dict[str, asyncio.Task] = {} # Configuration self.max_depth = 1000 # Maximum depth for order book self.update_speed = '1000ms' # Binance update speed - reduced for stability # Timezone configuration if self.timezone_offset == '+08:00': logger.info("🕐 Configured for UTC+8 timezone (Asian markets)") elif self.timezone_offset: logger.info(f"🕐 Configured for {self.timezone_offset} timezone") else: logger.info("🕐 Configured for UTC timezone (default)") logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: logger.error("WebSockets module not available - COB data will be limited to REST API") def add_cob_callback(self, callback: Callable): """Add callback for COB data updates""" self.cob_callbacks.append(callback) def add_error_callback(self, callback: Callable): """Add callback for error notifications""" self.error_callbacks.append(callback) async def start(self): """Start COB WebSocket connections""" logger.info("Starting Enhanced COB WebSocket system") # Initialize REST session for fallback await self._init_rest_session() # Start WebSocket connections for each symbol for symbol in self.symbols: await self._start_symbol_websocket(symbol) # Start monitoring task asyncio.create_task(self._monitor_connections()) logger.info("Enhanced COB WebSocket system started") async def stop(self): """Stop all WebSocket connections""" logger.info("Stopping Enhanced COB WebSocket system") # Cancel all WebSocket tasks for symbol, task in self.websocket_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Cancel all REST tasks for symbol, task in self.rest_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Close REST session if self.rest_session: await self.rest_session.close() # Remove from instances registry symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: del EnhancedCOBWebSocket._instances[symbols_key] logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): """Initialize REST API session for fallback and snapshots - Windows compatible""" import platform try: # Detect Windows and use specific configuration is_windows = platform.system().lower() == 'windows' if is_windows: # Windows-specific configuration to avoid aiodns issues import asyncio # Check if we're using ProactorEventLoop (Windows default) loop = asyncio.get_event_loop() loop_type = type(loop).__name__ logger.debug(f"Event loop type: {loop_type}") # Use basic connector without DNS caching for Windows connector = aiohttp.TCPConnector( limit=50, limit_per_host=10, enable_cleanup_closed=True, use_dns_cache=False, # Critical: disable DNS cache resolver=None, # Use default resolver, not aiodns family=0, # Use default address family ssl=False # Disable SSL verification for simplicity ) timeout = aiohttp.ClientTimeout(total=15, connect=10) self.rest_session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={ 'User-Agent': 'Enhanced-COB-WebSocket/1.0', 'Accept': 'application/json' } ) logger.info("✅ REST API session initialized (Windows ProactorEventLoop compatible)") else: # Unix/Linux configuration (can use more advanced features) connector = aiohttp.TCPConnector( limit=100, limit_per_host=20, enable_cleanup_closed=True, use_dns_cache=True ) timeout = aiohttp.ClientTimeout(total=10, connect=5) self.rest_session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} ) logger.info("✅ REST API session initialized (Unix/Linux)") except Exception as e: logger.warning(f"⚠️ Failed to initialize REST session: {e}") # Fallback: Ultra-minimal configuration try: # Most basic configuration possible self.rest_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=20), headers={'User-Agent': 'COB-WebSocket/1.0'} ) logger.info("✅ REST API session initialized with ultra-minimal config") except Exception as e2: logger.error(f"❌ Failed to initialize any REST session: {e2}") logger.info("🔄 Continuing without REST session - WebSocket-only mode") self.rest_session = None async def _get_order_book_snapshot(self, symbol: str): """Get initial order book snapshot from REST API This is necessary for properly maintaining the order book state with the WebSocket depth stream. """ try: # Ensure REST session is available if not self.rest_session: await self._init_rest_session() if not self.rest_session: logger.warning(f"⚠️ Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only") return # Convert symbol format for Binance API binance_symbol = symbol.replace('/', '') # Get order book snapshot with maximum depth url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000" logger.debug(f"🔍 Getting order book snapshot for {symbol} from {url}") async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() # Validate response structure if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data: logger.error(f"❌ Invalid order book snapshot response for {symbol}: missing bids/asks") return # Initialize order book state for proper WebSocket synchronization if symbol not in self.order_books: self.order_books[symbol] = {'bids': {}, 'asks': {}} # Clear existing data and populate with snapshot self.order_books[symbol]['bids'] = {float(price): float(qty) for price, qty in data['bids'] if float(qty) > 0} self.order_books[symbol]['asks'] = {float(price): float(qty) for price, qty in data['asks'] if float(qty) > 0} # Store last update ID for synchronization if 'lastUpdateId' in data: self.last_update_ids[symbol] = data['lastUpdateId'] logger.debug(f"Order book snapshot for {symbol}: lastUpdateId = {data['lastUpdateId']}") # Mark as initialized self.order_book_initialized[symbol] = True logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") # Create initial COB data from snapshot bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0] asks = [{'price': float(price), 'size': float(qty)} for price, qty in data['asks'] if float(qty) > 0] # Sort bids (descending) and asks (ascending) bids.sort(key=lambda x: x['price'], reverse=True) asks.sort(key=lambda x: x['price']) # Create COB data structure if we have valid data if bids and asks: best_bid = bids[0] best_ask = asks[0] mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Calculate volumes bid_volume = sum(bid['size'] * bid['price'] for bid in bids) ask_volume = sum(ask['size'] * ask['price'] for ask in asks) total_volume = bid_volume + ask_volume cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids, 'asks': asks, 'source': 'rest_snapshot', 'exchange': 'binance', 'stats': { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'imbalance': (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0, 'bid_levels': len(bids), 'ask_levels': len(asks), 'timestamp': datetime.now().isoformat() } } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps") else: logger.warning(f"⚠️ No valid bid/ask data in snapshot for {symbol}") elif response.status == 429: logger.warning(f"⚠️ Rate limited getting snapshot for {symbol}, will continue with WebSocket only") else: logger.error(f"❌ Failed to get order book snapshot for {symbol}: HTTP {response.status}") response_text = await response.text() logger.debug(f"Response: {response_text}") except asyncio.TimeoutError: logger.warning(f"⚠️ Timeout getting order book snapshot for {symbol}, will continue with WebSocket only") except Exception as e: logger.warning(f"⚠️ Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only") logger.debug(f"Snapshot error details: {e}") # Don't fail the entire connection due to snapshot issues async def _start_symbol_websocket(self, symbol: str): """Start WebSocket connection for a specific symbol""" if not WEBSOCKETS_AVAILABLE: logger.warning(f"WebSockets not available for {symbol}, starting REST fallback") await self._start_rest_fallback(symbol) return # Cancel existing task if running if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): self.websocket_tasks[symbol].cancel() # Start new WebSocket task self.websocket_tasks[symbol] = asyncio.create_task( self._websocket_connection_loop(symbol) ) logger.info(f"Started WebSocket task for {symbol}") async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic Subscribes to multiple streams via combined stream endpoint: - Order book depth (@depth@1000ms) - 1-second candlesticks (@kline_1s) - 24hr ticker with volume (@ticker) - Aggregated trades (@aggTrade) """ status = self.status[symbol] while True: try: logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 # Create WebSocket URL with combined streams for multiple data types ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT # Subscribe to multiple streams: # 1. depth@1000ms - Order book depth updates # 2. kline_1s - 1-second candlestick data (with optional timezone offset) # 3. ticker - 24hr ticker statistics (includes volume) # 4. aggTrade - Aggregated trade data for volume analysis # Configure kline stream with timezone offset if specified if self.timezone_offset: kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}" logger.info(f"Using timezone offset {self.timezone_offset} for kline stream") else: kline_stream = f"{ws_symbol}@kline_1s" logger.info("Using UTC timezone for kline stream") streams = [ f"{ws_symbol}@depth@1000ms", # Order book depth kline_stream, # 1-second candlesticks (with timezone) f"{ws_symbol}@ticker", # 24hr ticker with volume f"{ws_symbol}@aggTrade" # Aggregated trades ] # Use combined stream endpoint streams_param = "/".join(streams) ws_url = f"wss://stream.binance.com:9443/stream?streams={streams_param}" logger.info(f"Connecting to: {ws_url}") # Add ping/pong handling and proper connection management async with websockets_connect( ws_url, ping_interval=20, # Binance sends ping every 20 seconds ping_timeout=60, # Binance disconnects after 1 minute without pong close_timeout=10 ) as websocket: # Connection successful status.connected = True status.last_error = None status.reset_reconnect_delay() logger.info(f"WebSocket connected for {symbol} with ping/pong handling") await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") # Deactivate REST fallback if self.rest_fallback_active[symbol]: await self._stop_rest_fallback(symbol) # Track connection start time for 24-hour limit compliance connection_start = datetime.now() # Message receiving loop with proper ping/pong handling try: async for message in websocket: try: # Check for 24-hour connection limit (Binance requirement) if (datetime.now() - connection_start).total_seconds() > 23.5 * 3600: # 23.5 hours logger.info(f"Approaching 24-hour limit for {symbol}, reconnecting...") break # Handle different message types if isinstance(message, bytes): # Handle ping frames (though websockets library handles this automatically) continue # Rate limiting: Binance allows max 5 messages per second now = datetime.now() # Initialize rate limiting tracking if symbol not in self.message_window_start: self.message_window_start[symbol] = now self.message_count[symbol] = 0 # Reset counter every second if (now - self.message_window_start[symbol]).total_seconds() >= 1.0: self.message_window_start[symbol] = now self.message_count[symbol] = 0 # Check rate limit (5 messages per second) if self.message_count[symbol] >= 5: continue # Skip this message to comply with rate limit self.message_count[symbol] += 1 self.last_message_time[symbol] = now # Parse JSON message data = json.loads(message) # Handle combined stream format if 'stream' in data and 'data' in data: # Combined stream format: {"stream":"","data":} stream_name = data['stream'] stream_data = data['data'] await self._process_combined_stream_message(symbol, stream_name, stream_data) else: # Single stream format (fallback) await self._process_websocket_message(symbol, data) status.last_message_time = now status.messages_received += 1 except json.JSONDecodeError as e: logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") except websockets.exceptions.ConnectionClosedOK: logger.info(f"WebSocket connection closed normally for {symbol}") except websockets.exceptions.ConnectionClosedError as e: logger.warning(f"WebSocket connection closed with error for {symbol}: {e}") raise except ConnectionClosed as e: status.connected = False status.last_error = f"Connection closed: {e}" logger.warning(f"WebSocket connection closed for {symbol}: {e}") except WebSocketException as e: status.connected = False status.last_error = f"WebSocket error: {e}" logger.error(f"WebSocket error for {symbol}: {e}") except Exception as e: status.connected = False status.last_error = f"Unexpected error: {e}" logger.error(f"Unexpected WebSocket error for {symbol}: {e}") logger.error(traceback.format_exc()) # Connection failed or closed - start REST fallback await self._notify_dashboard_status(symbol, "disconnected", status.last_error) await self._start_rest_fallback(symbol) # Wait before reconnecting status.increase_reconnect_delay() logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}") await asyncio.sleep(status.reconnect_delay) async def _process_websocket_message(self, symbol: str, data: Dict): """Process WebSocket message and convert to COB format Properly handles Binance depth stream format according to documentation: - depthUpdate events with U (first update ID) and u (final update ID) - Maintains local order book with proper synchronization """ try: # Check if this is a depth update event if data.get('e') == 'depthUpdate': # Handle diff depth stream according to Binance documentation await self._handle_depth_update(symbol, data) return # Fallback: handle partial book depth format (depth5, depth10, depth20) # Extract bids and asks from the message - handle all possible formats bids_data = data.get('bids', data.get('b', [])) asks_data = data.get('asks', data.get('a', [])) # Process the order book data - filter out zero quantities # Binance uses 0 quantity to indicate removal from the book valid_bids = [] valid_asks = [] # Process bids for bid in bids_data: try: if len(bid) >= 2: price = float(bid[0]) size = float(bid[1]) if size > 0: # Only include non-zero quantities valid_bids.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Process asks for ask in asks_data: try: if len(ask) >= 2: price = float(ask[0]) size = float(ask[1]) if size > 0: # Only include non-zero quantities valid_asks.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Sort bids (descending) and asks (ascending) for proper order book valid_bids.sort(key=lambda x: x['price'], reverse=True) valid_asks.sort(key=lambda x: x['price']) # Limit to maximum depth (1000 levels for maximum DOM) max_depth = 1000 if len(valid_bids) > max_depth: valid_bids = valid_bids[:max_depth] if len(valid_asks) > max_depth: valid_asks = valid_asks[:max_depth] # Create COB data structure matching the working dashboard format cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': valid_bids, 'asks': valid_asks, 'source': 'enhanced_websocket', 'exchange': 'binance' } # Calculate comprehensive stats if we have valid data if valid_bids and valid_asks: best_bid = valid_bids[0] # Already sorted, first is highest best_ask = valid_asks[0] # Already sorted, first is lowest # Core price metrics mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Volume calculations (notional value) - limit to top 20 levels for performance top_bids = valid_bids[:20] top_asks = valid_asks[:20] bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) # Size calculations (base currency) bid_size = sum(bid['size'] for bid in top_bids) ask_size = sum(ask['size'] for ask in top_asks) # Imbalance calculations total_volume = bid_volume + ask_volume volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 total_size = bid_size + ask_size size_imbalance = (bid_size - ask_size) / total_size if total_size > 0 else 0 cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'bid_liquidity': bid_volume, # Add liquidity fields 'ask_liquidity': ask_volume, 'total_bid_liquidity': bid_volume, 'total_ask_liquidity': ask_volume, 'bid_size': bid_size, 'ask_size': ask_size, 'volume_imbalance': volume_imbalance, 'size_imbalance': size_imbalance, 'imbalance': volume_imbalance, # Default to volume imbalance 'bid_levels': len(valid_bids), 'ask_levels': len(valid_asks), 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), # Binance update ID 'event_time': data.get('E', 0) # Binance event time } else: # Provide default stats if no valid data cob_data['stats'] = { 'best_bid': 0, 'best_ask': 0, 'mid_price': 0, 'spread': 0, 'spread_bps': 0, 'bid_volume': 0, 'ask_volume': 0, 'total_bid_volume': 0, 'total_ask_volume': 0, 'bid_size': 0, 'ask_size': 0, 'volume_imbalance': 0, 'size_imbalance': 0, 'imbalance': 0, 'bid_levels': 0, 'ask_levels': 0, 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), 'event_time': data.get('E', 0) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB callback: {e}") # Log success with key metrics (only for non-empty updates) if valid_bids and valid_asks: logger.debug(f"{symbol}: ${cob_data['stats']['mid_price']:.2f}, {len(valid_bids)} bids, {len(valid_asks)} asks, spread: {cob_data['stats']['spread_bps']:.1f} bps") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") import traceback logger.debug(traceback.format_exc()) async def _handle_depth_update(self, symbol: str, data: Dict): """Handle Binance depth update events following official recommendations Binance Order Book Management Procedure: 1. Buffer events received from stream, note U of first event 2. Get depth snapshot from REST API 3. If snapshot lastUpdateId < first event U, get new snapshot 4. Discard buffered events where u <= snapshot lastUpdateId 5. First buffered event should have lastUpdateId within [U;u] range 6. Set local order book to snapshot 7. Apply buffered events, then subsequent events """ try: # Extract update IDs first_update_id = data.get('U') # First update ID in event final_update_id = data.get('u') # Final update ID in event if first_update_id is None or final_update_id is None: logger.warning(f"Missing update IDs in depth update for {symbol}") return # Initialize buffers if needed if symbol not in self.event_buffers: self.event_buffers[symbol] = [] self.first_event_u[symbol] = first_update_id self.snapshot_in_progress[symbol] = False logger.debug(f"Started buffering events for {symbol}, first U: {first_update_id}") # Check if order book is initialized and synchronized if not self.order_book_initialized.get(symbol, False): # Buffer this event self.event_buffers[symbol].append(data) logger.debug(f"Buffered event for {symbol}: U={first_update_id}, u={final_update_id}") # Start snapshot initialization if not already in progress if not self.snapshot_in_progress.get(symbol, False): self.snapshot_in_progress[symbol] = True await self._initialize_order_book_with_buffering(symbol) return # Order book is initialized, apply update directly await self._apply_depth_update(symbol, data) except Exception as e: logger.error(f"Error handling depth update for {symbol}: {e}") async def _initialize_order_book_with_buffering(self, symbol: str): """Initialize order book following Binance recommendations with event buffering""" try: max_attempts = 3 attempt = 0 while attempt < max_attempts: attempt += 1 logger.info(f"Initializing order book for {symbol} (attempt {attempt})") # Get snapshot from REST API snapshot_data = await self._get_order_book_snapshot_data(symbol) if not snapshot_data: logger.error(f"Failed to get snapshot for {symbol}") await asyncio.sleep(1) continue snapshot_last_update_id = snapshot_data.get('lastUpdateId', 0) first_buffered_u = self.first_event_u.get(symbol, 0) logger.debug(f"Snapshot lastUpdateId: {snapshot_last_update_id}, First buffered U: {first_buffered_u}") # Check if snapshot is valid (step 3 in Binance procedure) if snapshot_last_update_id < first_buffered_u: logger.warning(f"Snapshot too old for {symbol}: {snapshot_last_update_id} < {first_buffered_u}, retrying...") await asyncio.sleep(0.5) continue # Snapshot is valid, set up order book await self._setup_order_book_from_snapshot(symbol, snapshot_data) # Process buffered events (steps 4-7 in Binance procedure) await self._process_buffered_events(symbol, snapshot_last_update_id) # Mark as initialized self.order_book_initialized[symbol] = True self.snapshot_in_progress[symbol] = False logger.info(f"✅ Order book initialized for {symbol} with {len(self.event_buffers[symbol])} buffered events") return logger.error(f"❌ Failed to initialize order book for {symbol} after {max_attempts} attempts") self.snapshot_in_progress[symbol] = False except Exception as e: logger.error(f"Error initializing order book with buffering for {symbol}: {e}") self.snapshot_in_progress[symbol] = False async def _get_order_book_snapshot_data(self, symbol: str) -> Optional[Dict]: """Get order book snapshot data from REST API""" try: if not self.rest_session: await self._init_rest_session() if not self.rest_session: return None # Convert symbol format for Binance API binance_symbol = symbol.replace('/', '') url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000" async with self.rest_session.get(url) as response: if response.status == 200: return await response.json() else: logger.error(f"Failed to get snapshot for {symbol}: HTTP {response.status}") return None except Exception as e: logger.error(f"Error getting snapshot data for {symbol}: {e}") return None async def _setup_order_book_from_snapshot(self, symbol: str, snapshot_data: Dict): """Set up local order book from REST API snapshot""" try: # Initialize order book structure if symbol not in self.order_books: self.order_books[symbol] = {'bids': {}, 'asks': {}} # Clear existing data and populate with snapshot self.order_books[symbol]['bids'] = { float(price): float(qty) for price, qty in snapshot_data['bids'] if float(qty) > 0 } self.order_books[symbol]['asks'] = { float(price): float(qty) for price, qty in snapshot_data['asks'] if float(qty) > 0 } # Set last update ID from snapshot self.last_update_ids[symbol] = snapshot_data['lastUpdateId'] logger.debug(f"Set up order book for {symbol}: {len(self.order_books[symbol]['bids'])} bids, {len(self.order_books[symbol]['asks'])} asks, lastUpdateId: {snapshot_data['lastUpdateId']}") except Exception as e: logger.error(f"Error setting up order book from snapshot for {symbol}: {e}") async def _process_buffered_events(self, symbol: str, snapshot_last_update_id: int): """Process buffered events according to Binance procedure""" try: buffered_events = self.event_buffers.get(symbol, []) valid_events = [] # Step 4: Discard events where u <= lastUpdateId of snapshot for event in buffered_events: event_final_u = event.get('u', 0) if event_final_u > snapshot_last_update_id: valid_events.append(event) else: logger.debug(f"Discarded buffered event for {symbol}: u={event_final_u} <= snapshot lastUpdateId={snapshot_last_update_id}") # Step 5: Validate first buffered event if valid_events: first_event = valid_events[0] first_u = first_event.get('U', 0) first_final_u = first_event.get('u', 0) if not (first_u <= snapshot_last_update_id <= first_final_u): logger.error(f"❌ Synchronization error for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} not in range [{first_u}, {first_final_u}]") # Reset and try again self.order_book_initialized[symbol] = False self.event_buffers[symbol] = [] return logger.debug(f"✅ First buffered event valid for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} in range [{first_u}, {first_final_u}]") # Step 6-7: Apply all valid buffered events for event in valid_events: await self._apply_depth_update(symbol, event) # Clear buffer self.event_buffers[symbol] = [] logger.info(f"Processed {len(valid_events)} buffered events for {symbol}") except Exception as e: logger.error(f"Error processing buffered events for {symbol}: {e}") async def _apply_depth_update(self, symbol: str, data: Dict): """Apply a single depth update to the local order book""" try: first_update_id = data.get('U', 0) final_update_id = data.get('u', 0) last_update_id = self.last_update_ids.get(symbol, 0) # Validate update sequence if final_update_id <= last_update_id: # Event is older than our current state, ignore logger.debug(f"Ignoring old event for {symbol}: u={final_update_id} <= lastUpdateId={last_update_id}") return if first_update_id > last_update_id + 1: # Gap detected, need to reinitialize logger.warning(f"Gap detected in depth updates for {symbol}: U={first_update_id} > lastUpdateId+1={last_update_id+1}") self.order_book_initialized[symbol] = False self.event_buffers[symbol] = [] self.first_event_u[symbol] = first_update_id await self._initialize_order_book_with_buffering(symbol) return # Apply updates to local order book bids_updates = data.get('b', []) asks_updates = data.get('a', []) # Update bids for bid_update in bids_updates: if len(bid_update) >= 2: price = float(bid_update[0]) quantity = float(bid_update[1]) if quantity == 0: # Remove price level self.order_books[symbol]['bids'].pop(price, None) else: # Update price level self.order_books[symbol]['bids'][price] = quantity # Update asks for ask_update in asks_updates: if len(ask_update) >= 2: price = float(ask_update[0]) quantity = float(ask_update[1]) if quantity == 0: # Remove price level self.order_books[symbol]['asks'].pop(price, None) else: # Update price level self.order_books[symbol]['asks'][price] = quantity # Update last update ID self.last_update_ids[symbol] = final_update_id # Convert to COB format and notify callbacks await self._create_cob_from_order_book(symbol) logger.debug(f"Applied depth update for {symbol}: U={first_update_id}, u={final_update_id}") except Exception as e: logger.error(f"Error applying depth update for {symbol}: {e}") async def _initialize_order_book(self, symbol: str): """Initialize order book from REST API snapshot (legacy method)""" try: # Use the new buffering approach for proper synchronization if symbol not in self.event_buffers: self.event_buffers[symbol] = [] self.first_event_u[symbol] = 0 self.snapshot_in_progress[symbol] = False await self._initialize_order_book_with_buffering(symbol) except Exception as e: logger.error(f"Failed to initialize order book for {symbol}: {e}") self.order_book_initialized[symbol] = False async def _create_cob_from_order_book(self, symbol: str): """Create COB data from maintained order book""" try: if symbol not in self.order_books: return order_book = self.order_books[symbol] # Sort and convert to list format bids = [{'price': price, 'size': qty} for price, qty in order_book['bids'].items()] asks = [{'price': price, 'size': qty} for price, qty in order_book['asks'].items()] # Sort bids (descending) and asks (ascending) bids.sort(key=lambda x: x['price'], reverse=True) asks.sort(key=lambda x: x['price']) # Limit depth max_depth = 1000 if len(bids) > max_depth: bids = bids[:max_depth] if len(asks) > max_depth: asks = asks[:max_depth] # Create COB data structure cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids, 'asks': asks, 'source': 'binance_depth_stream', 'exchange': 'binance' } # Calculate stats if we have valid data if bids and asks: best_bid = bids[0] best_ask = asks[0] mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Calculate volumes (top 20 levels for performance) top_bids = bids[:20] top_asks = asks[:20] bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) total_volume = bid_volume + ask_volume volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'imbalance': volume_imbalance, 'bid_levels': len(bids), 'ask_levels': len(asks), 'timestamp': datetime.now().isoformat(), 'update_id': self.last_update_ids.get(symbol, 0) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB callback: {e}") logger.debug(f"{symbol}: Depth stream update - ${cob_data.get('stats', {}).get('mid_price', 0):.2f}") except Exception as e: logger.error(f"Error creating COB from order book for {symbol}: {e}") async def _process_combined_stream_message(self, symbol: str, stream_name: str, data: Dict): """Process messages from combined stream format""" try: # Extract stream type from stream name (e.g., "btcusdt@depth@1000ms" -> "depth") stream_parts = stream_name.split('@') if len(stream_parts) < 2: logger.warning(f"Invalid stream name format: {stream_name}") return stream_type = stream_parts[1] # Route to appropriate handler based on stream type if stream_type == 'depth': await self._handle_depth_stream(symbol, data) elif stream_type == 'kline_1s': await self._handle_kline_stream(symbol, data) elif stream_type == 'ticker': await self._handle_ticker_stream(symbol, data) elif stream_type == 'aggTrade': await self._handle_aggtrade_stream(symbol, data) else: logger.debug(f"Unhandled stream type: {stream_type} for {symbol}") except Exception as e: logger.error(f"Error processing combined stream message for {symbol}: {e}") async def _handle_depth_stream(self, symbol: str, data: Dict): """Handle depth stream data (order book updates)""" try: # Check if this is a depth update event if data.get('e') == 'depthUpdate': await self._handle_depth_update(symbol, data) else: # Handle partial book depth format await self._process_websocket_message(symbol, data) except Exception as e: logger.error(f"Error handling depth stream for {symbol}: {e}") async def _handle_kline_stream(self, symbol: str, data: Dict): """Handle 1-second kline/candlestick data with timezone support""" try: if data.get('e') != 'kline': return kline_data = data.get('k', {}) if not kline_data: return # Extract timestamps (always in UTC milliseconds according to Binance docs) open_time_ms = kline_data.get('t', 0) close_time_ms = kline_data.get('T', 0) event_time_ms = data.get('E', 0) # Convert timestamps to datetime objects open_time_utc = datetime.fromtimestamp(open_time_ms / 1000) if open_time_ms else None close_time_utc = datetime.fromtimestamp(close_time_ms / 1000) if close_time_ms else None event_time_utc = datetime.fromtimestamp(event_time_ms / 1000) if event_time_ms else datetime.now() # Extract candlestick data candlestick = { 'symbol': symbol, 'timestamp': datetime.now(), 'event_time': event_time_utc, 'open_time': open_time_ms, # Raw timestamp (ms) 'close_time': close_time_ms, # Raw timestamp (ms) 'open_time_utc': open_time_utc, # Converted to UTC datetime 'close_time_utc': close_time_utc, # Converted to UTC datetime 'interval': kline_data.get('i', '1s'), 'open_price': float(kline_data.get('o', 0)), 'high_price': float(kline_data.get('h', 0)), 'low_price': float(kline_data.get('l', 0)), 'close_price': float(kline_data.get('c', 0)), 'volume': float(kline_data.get('v', 0)), # Base asset volume 'quote_volume': float(kline_data.get('q', 0)), # Quote asset volume 'trades_count': kline_data.get('n', 0), 'taker_buy_volume': float(kline_data.get('V', 0)), # Taker buy base asset volume 'taker_buy_quote_volume': float(kline_data.get('Q', 0)), # Taker buy quote asset volume 'is_closed': kline_data.get('x', False), # Is this kline closed? 'first_trade_id': kline_data.get('f', 0), 'last_trade_id': kline_data.get('L', 0), 'timezone_offset': self.timezone_offset, # Timezone info 'source': f'binance_kline_1s_{self.timezone_offset or "utc"}' } # Store latest candlestick data if not hasattr(self, 'latest_candlestick_data'): self.latest_candlestick_data = {} self.latest_candlestick_data[symbol] = candlestick # Notify callbacks with candlestick data for callback in self.cob_callbacks: try: await callback(symbol, {'type': 'candlestick', 'data': candlestick}) except Exception as e: logger.error(f"Error in candlestick callback: {e}") logger.debug(f"{symbol}: 1s Kline - O:{candlestick['open_price']:.2f} H:{candlestick['high_price']:.2f} L:{candlestick['low_price']:.2f} C:{candlestick['close_price']:.2f} V:{candlestick['volume']:.2f}") except Exception as e: logger.error(f"Error handling kline stream for {symbol}: {e}") async def _handle_ticker_stream(self, symbol: str, data: Dict): """Handle 24hr ticker data (includes volume statistics)""" try: if data.get('e') != '24hrTicker': return # Extract ticker data ticker = { 'symbol': symbol, 'timestamp': datetime.now(), 'price_change': float(data.get('p', 0)), 'price_change_percent': float(data.get('P', 0)), 'weighted_avg_price': float(data.get('w', 0)), 'last_price': float(data.get('c', 0)), 'last_qty': float(data.get('Q', 0)), 'best_bid_price': float(data.get('b', 0)), 'best_bid_qty': float(data.get('B', 0)), 'best_ask_price': float(data.get('a', 0)), 'best_ask_qty': float(data.get('A', 0)), 'open_price': float(data.get('o', 0)), 'high_price': float(data.get('h', 0)), 'low_price': float(data.get('l', 0)), 'volume': float(data.get('v', 0)), # 24hr volume 'quote_volume': float(data.get('q', 0)), # 24hr quote volume 'open_time': data.get('O', 0), 'close_time': data.get('C', 0), 'first_trade_id': data.get('F', 0), 'last_trade_id': data.get('L', 0), 'trades_count': data.get('n', 0), 'source': 'binance_24hr_ticker' } # Store latest ticker data if not hasattr(self, 'latest_ticker_data'): self.latest_ticker_data = {} self.latest_ticker_data[symbol] = ticker # Notify callbacks with ticker data for callback in self.cob_callbacks: try: await callback(symbol, {'type': 'ticker', 'data': ticker}) except Exception as e: logger.error(f"Error in ticker callback: {e}") logger.debug(f"{symbol}: 24hr Ticker - Price:{ticker['last_price']:.2f} Volume:{ticker['volume']:.2f} Change:{ticker['price_change_percent']:.2f}%") except Exception as e: logger.error(f"Error handling ticker stream for {symbol}: {e}") async def _handle_aggtrade_stream(self, symbol: str, data: Dict): """Handle aggregated trade data for real-time volume analysis""" try: if data.get('e') != 'aggTrade': return # Extract aggregated trade data agg_trade = { 'symbol': symbol, 'timestamp': datetime.now(), 'trade_id': data.get('a', 0), 'price': float(data.get('p', 0)), 'quantity': float(data.get('q', 0)), 'first_trade_id': data.get('f', 0), 'last_trade_id': data.get('l', 0), 'trade_time': data.get('T', 0), 'is_buyer_maker': data.get('m', False), # True if buyer is market maker 'notional_value': float(data.get('p', 0)) * float(data.get('q', 0)), 'source': 'binance_aggtrade' } # Store latest trade data if not hasattr(self, 'latest_trade_data'): self.latest_trade_data = {} if symbol not in self.latest_trade_data: self.latest_trade_data[symbol] = [] # Keep last 100 trades for volume analysis self.latest_trade_data[symbol].append(agg_trade) if len(self.latest_trade_data[symbol]) > 100: self.latest_trade_data[symbol] = self.latest_trade_data[symbol][-100:] # Calculate real-time volume metrics recent_trades = self.latest_trade_data[symbol][-10:] # Last 10 trades buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) total_volume = buy_volume + sell_volume volume_metrics = { 'buy_volume': buy_volume, 'sell_volume': sell_volume, 'total_volume': total_volume, 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0 } # Notify callbacks with trade data for callback in self.cob_callbacks: try: await callback(symbol, { 'type': 'aggtrade', 'data': agg_trade, 'volume_metrics': volume_metrics }) except Exception as e: logger.error(f"Error in aggtrade callback: {e}") logger.debug(f"{symbol}: AggTrade - Price:{agg_trade['price']:.2f} Qty:{agg_trade['quantity']:.4f} BuyerMaker:{agg_trade['is_buyer_maker']}") except Exception as e: logger.error(f"Error handling aggtrade stream for {symbol}: {e}") async def _start_rest_fallback(self, symbol: str): """Start REST API fallback for a symbol""" if self.rest_fallback_active[symbol]: return # Already active self.rest_fallback_active[symbol] = True # Cancel existing REST task if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() # Start new REST task self.rest_tasks[symbol] = asyncio.create_task( self._rest_fallback_loop(symbol) ) logger.warning(f"Started REST API fallback for {symbol}") await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback") async def _stop_rest_fallback(self, symbol: str): """Stop REST API fallback for a symbol""" if not self.rest_fallback_active[symbol]: return self.rest_fallback_active[symbol] = False if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() logger.info(f"Stopped REST API fallback for {symbol}") async def _rest_fallback_loop(self, symbol: str): """REST API fallback loop""" while self.rest_fallback_active[symbol]: try: await self._fetch_rest_orderbook(symbol) await asyncio.sleep(1) # Update every second except asyncio.CancelledError: break except Exception as e: logger.error(f"REST fallback error for {symbol}: {e}") await asyncio.sleep(5) # Wait longer on error async def _fetch_rest_orderbook(self, symbol: str): """Fetch order book data via REST API""" try: if not self.rest_session: return # Binance REST API rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000" async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']], 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']], 'source': 'rest_fallback', 'exchange': 'binance' } # Calculate stats if cob_data['bids'] and cob_data['asks']: best_bid = max(cob_data['bids'], key=lambda x: x['price']) best_ask = min(cob_data['asks'], key=lambda x: x['price']) cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'spread': best_ask['price'] - best_bid['price'], 'mid_price': (best_bid['price'] + best_ask['price']) / 2, 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") else: logger.warning(f"REST API error for {symbol}: HTTP {response.status}") except Exception as e: logger.error(f"Error fetching REST order book for {symbol}: {e}") async def _monitor_connections(self): """Monitor WebSocket connections and provide status updates""" while True: try: await asyncio.sleep(10) # Check every 10 seconds for symbol in self.symbols: status = self.status[symbol] # Check for stale connections (no messages for 30 seconds) if status.connected and status.last_message_time: time_since_last = (datetime.now() - status.last_message_time).total_seconds() if time_since_last > 30: logger.warning(f"Stale connection detected for {symbol}: {time_since_last:.1f}s since last message") status.connected = False status.last_error = f"Stale connection: {time_since_last:.1f}s without messages" await self._notify_dashboard_status(symbol, "stale", status.last_error) # Log connection status and order book sync status if status.connected: ob_status = "✅ Synced" if self.order_book_initialized.get(symbol, False) else "🔄 Syncing" buffered_count = len(self.event_buffers.get(symbol, [])) logger.debug(f"{symbol}: Connected, {status.messages_received} messages, OB: {ob_status}, Buffered: {buffered_count}") else: logger.debug(f"{symbol}: Disconnected - {status.last_error}") # Log overall system status connected_count = sum(1 for status in self.status.values() if status.connected) logger.debug(f"COB WebSocket status: {connected_count}/{len(self.symbols)} symbols connected") except Exception as e: logger.error(f"Error in connection monitor: {e}") await asyncio.sleep(10) async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of WebSocket status changes""" try: if self.dashboard_callback: status_data = { 'type': 'websocket_status', 'data': { 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() } } await self.dashboard_callback(symbol, status_data) except Exception as e: logger.error(f"Error notifying dashboard status for {symbol}: {e}") def get_latest_candlestick(self, symbol: str) -> Optional[Dict]: """Get the latest 1-second candlestick data for a symbol""" if hasattr(self, 'latest_candlestick_data'): return self.latest_candlestick_data.get(symbol) return None def get_latest_ticker(self, symbol: str) -> Optional[Dict]: """Get the latest 24hr ticker data for a symbol""" if hasattr(self, 'latest_ticker_data'): return self.latest_ticker_data.get(symbol) return None def get_latest_trades(self, symbol: str, count: int = 10) -> List[Dict]: """Get the latest aggregated trades for a symbol""" if hasattr(self, 'latest_trade_data') and symbol in self.latest_trade_data: return self.latest_trade_data[symbol][-count:] return [] def get_volume_metrics(self, symbol: str, timeframe_seconds: int = 60) -> Dict: """Calculate volume metrics for a given timeframe""" try: if not hasattr(self, 'latest_trade_data') or symbol not in self.latest_trade_data: return {'error': 'No trade data available'} current_time = datetime.now().timestamp() * 1000 # Convert to milliseconds cutoff_time = current_time - (timeframe_seconds * 1000) # Filter trades within timeframe recent_trades = [ trade for trade in self.latest_trade_data[symbol] if trade['trade_time'] >= cutoff_time ] if not recent_trades: return {'error': 'No recent trades in timeframe'} # Calculate metrics buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) total_volume = buy_volume + sell_volume trade_count = len(recent_trades) avg_trade_size = total_volume / trade_count if trade_count > 0 else 0 return { 'timeframe_seconds': timeframe_seconds, 'trade_count': trade_count, 'buy_volume': buy_volume, 'sell_volume': sell_volume, 'total_volume': total_volume, 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0, 'avg_trade_size': avg_trade_size, 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error calculating volume metrics for {symbol}: {e}") return {'error': str(e)} def _convert_timestamp_to_timezone(self, timestamp_ms: int, target_timezone: str = None) -> datetime: """Convert UTC timestamp to specified timezone Args: timestamp_ms: UTC timestamp in milliseconds target_timezone: Target timezone (e.g., '+08:00' for UTC+8) Returns: datetime object in target timezone """ try: from datetime import timezone, timedelta # Convert to UTC datetime utc_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if not target_timezone: return utc_dt # Parse timezone offset (e.g., '+08:00') if target_timezone.startswith('+') or target_timezone.startswith('-'): sign = 1 if target_timezone.startswith('+') else -1 hours, minutes = map(int, target_timezone[1:].split(':')) offset = timedelta(hours=sign * hours, minutes=sign * minutes) target_tz = timezone(offset) return utc_dt.astimezone(target_tz) return utc_dt except Exception as e: logger.error(f"Error converting timestamp to timezone: {e}") return datetime.fromtimestamp(timestamp_ms / 1000) def get_kline_with_timezone(self, symbol: str, target_timezone: str = None) -> Optional[Dict]: """Get latest kline data with timezone conversion Args: symbol: Trading symbol target_timezone: Target timezone (e.g., '+08:00' for UTC+8) Returns: Kline data with timezone-converted timestamps """ try: candlestick = self.get_latest_candlestick(symbol) if not candlestick: return None # Create a copy with timezone conversions result = candlestick.copy() if target_timezone and candlestick.get('open_time'): result['open_time_local'] = self._convert_timestamp_to_timezone( candlestick['open_time'], target_timezone ) result['close_time_local'] = self._convert_timestamp_to_timezone( candlestick['close_time'], target_timezone ) result['target_timezone'] = target_timezone return result except Exception as e: logger.error(f"Error getting kline with timezone for {symbol}: {e}") return None def get_order_book_status(self, symbol: str) -> Dict: """Get detailed order book synchronization status""" try: return { 'symbol': symbol, 'initialized': self.order_book_initialized.get(symbol, False), 'snapshot_in_progress': self.snapshot_in_progress.get(symbol, False), 'buffered_events': len(self.event_buffers.get(symbol, [])), 'first_event_u': self.first_event_u.get(symbol, 0), 'last_update_id': self.last_update_ids.get(symbol, 0), 'bid_levels': len(self.order_books.get(symbol, {}).get('bids', {})), 'ask_levels': len(self.order_books.get(symbol, {}).get('asks', {})), 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error getting order book status for {symbol}: {e}") return {'error': str(e)} def get_all_order_book_status(self) -> Dict[str, Dict]: """Get order book status for all symbols""" return {symbol: self.get_order_book_status(symbol) for symbol in self.symbols} async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of status changes""" try: if self.dashboard_callback: status_data = { 'type': 'cob_status', 'symbol': symbol, 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() } # Check if callback is async or sync if asyncio.iscoroutinefunction(self.dashboard_callback): await self.dashboard_callback(status_data) else: # Call sync function directly self.dashboard_callback(status_data) except Exception as e: logger.error(f"Error notifying dashboard: {e}") def get_status_summary(self) -> Dict[str, Any]: """Get status summary for all symbols""" summary = { 'websockets_available': WEBSOCKETS_AVAILABLE, 'symbols': {}, 'overall_status': 'unknown' } connected_count = 0 fallback_count = 0 for symbol in self.symbols: status = self.status[symbol] symbol_status = { 'connected': status.connected, 'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None, 'connection_attempts': status.connection_attempts, 'last_error': status.last_error, 'messages_received': status.messages_received, 'rest_fallback_active': self.rest_fallback_active[symbol] } if status.connected: connected_count += 1 elif self.rest_fallback_active[symbol]: fallback_count += 1 summary['symbols'][symbol] = symbol_status # Determine overall status if connected_count == len(self.symbols): summary['overall_status'] = 'all_connected' elif connected_count + fallback_count == len(self.symbols): summary['overall_status'] = 'partial_fallback' else: summary['overall_status'] = 'degraded' return summary # Global instance for easy access enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None) -> EnhancedCOBWebSocket: """Get or create the global enhanced COB WebSocket instance Args: symbols: List of symbols to monitor dashboard_callback: Callback function for dashboard updates timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) """ global enhanced_cob_websocket if enhanced_cob_websocket is None: enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback, timezone_offset) await enhanced_cob_websocket.start() return enhanced_cob_websocket async def stop_enhanced_cob_websocket(): """Stop the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket: await enhanced_cob_websocket.stop() enhanced_cob_websocket = None