#!/usr/bin/env python3 """ Enhanced COB WebSocket Implementation Robust WebSocket implementation for Consolidated Order Book data with: - Maximum allowed depth subscription - Clear error handling and warnings - Automatic reconnection with exponential backoff - Fallback to REST API when WebSocket fails - Dashboard integration with status updates This replaces the existing COB WebSocket implementation with a more reliable version. binance DOCS: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams """ import asyncio import json import logging import time import traceback from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass import aiohttp import weakref try: import websockets from websockets.client import connect as websockets_connect from websockets.exceptions import ConnectionClosed, WebSocketException WEBSOCKETS_AVAILABLE = True except ImportError: websockets = None websockets_connect = None ConnectionClosed = Exception WebSocketException = Exception WEBSOCKETS_AVAILABLE = False logger = logging.getLogger(__name__) @dataclass class COBWebSocketStatus: """Status tracking for COB WebSocket connections""" connected: bool = False last_message_time: Optional[datetime] = None connection_attempts: int = 0 last_error: Optional[str] = None reconnect_delay: float = 1.0 max_reconnect_delay: float = 60.0 messages_received: int = 0 def reset_reconnect_delay(self): """Reset reconnect delay on successful connection""" self.reconnect_delay = 1.0 def increase_reconnect_delay(self): """Increase reconnect delay with exponential backoff""" self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) class EnhancedCOBWebSocket: """Enhanced COB WebSocket with multi-stream support and robust error handling Subscribes to multiple Binance WebSocket streams: - Order book depth updates (@depth@1000ms) - 1-second candlestick data (@kline_1s) with timezone support - 24hr ticker statistics (@ticker) - includes volume data - Aggregated trade data (@aggTrade) - for real-time volume analysis Features: - Binance WebSocket compliance (ping/pong, rate limiting, 24hr reconnection) - Proper order book synchronization with REST API snapshots - Real-time volume metrics and trade analysis - Timezone offset support for kline streams (UTC or UTC+8) - Fallback to REST API when WebSocket fails Usage Examples: # UTC timezone (default) cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT']) # UTC+8 timezone for kline streams cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT'], timezone_offset='+08:00') # Get kline data with timezone conversion kline_utc8 = cob_ws.get_kline_with_timezone('BTC/USDT', '+08:00') """ _instances = {} # Track instances by symbols to prevent duplicates def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None): """ Initialize Enhanced COB WebSocket Args: symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) dashboard_callback: Callback function for dashboard status updates timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback self.timezone_offset = timezone_offset # None for UTC, '+08:00' for UTC+8 # Check for existing instances to prevent duplicate connections symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: logger.warning(f"EnhancedCOBWebSocket already exists for symbols {self.symbols} - reusing existing instance") existing = EnhancedCOBWebSocket._instances[symbols_key] # Copy existing instance data self.__dict__.update(existing.__dict__) return # Register this instance EnhancedCOBWebSocket._instances[symbols_key] = self # Connection status tracking self.status: Dict[str, COBWebSocketStatus] = { symbol: COBWebSocketStatus() for symbol in self.symbols } # Data callbacks self.cob_callbacks: List[Callable] = [] self.error_callbacks: List[Callable] = [] # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} # Order book management for proper diff depth stream handling self.order_books: Dict[str, Dict] = {} # {symbol: {'bids': {price: qty}, 'asks': {price: qty}}} self.last_update_ids: Dict[str, int] = {} # Track last update IDs for synchronization self.order_book_initialized: Dict[str, bool] = {} # Track if order book is properly initialized # Rate limiting for message processing (Binance: max 5 messages per second) self.last_message_time: Dict[str, datetime] = {} self.min_message_interval = 0.2 # 200ms = 5 messages per second compliance self.message_count: Dict[str, int] = {} self.message_window_start: Dict[str, datetime] = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} # REST API fallback self.rest_session: Optional[aiohttp.ClientSession] = None self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols} self.rest_tasks: Dict[str, asyncio.Task] = {} # Configuration self.max_depth = 1000 # Maximum depth for order book self.update_speed = '1000ms' # Binance update speed - reduced for stability # Timezone configuration if self.timezone_offset == '+08:00': logger.info("🕐 Configured for UTC+8 timezone (Asian markets)") elif self.timezone_offset: logger.info(f"🕐 Configured for {self.timezone_offset} timezone") else: logger.info("🕐 Configured for UTC timezone (default)") logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: logger.error("WebSockets module not available - COB data will be limited to REST API") def add_cob_callback(self, callback: Callable): """Add callback for COB data updates""" self.cob_callbacks.append(callback) def add_error_callback(self, callback: Callable): """Add callback for error notifications""" self.error_callbacks.append(callback) async def start(self): """Start COB WebSocket connections""" logger.info("Starting Enhanced COB WebSocket system") # Initialize REST session for fallback await self._init_rest_session() # Start WebSocket connections for each symbol for symbol in self.symbols: await self._start_symbol_websocket(symbol) # Start monitoring task asyncio.create_task(self._monitor_connections()) logger.info("Enhanced COB WebSocket system started") async def stop(self): """Stop all WebSocket connections""" logger.info("Stopping Enhanced COB WebSocket system") # Cancel all WebSocket tasks for symbol, task in self.websocket_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Cancel all REST tasks for symbol, task in self.rest_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Close REST session if self.rest_session: await self.rest_session.close() # Remove from instances registry symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: del EnhancedCOBWebSocket._instances[symbols_key] logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): """Initialize REST API session for fallback and snapshots""" try: # Windows-compatible configuration without aiodns timeout = aiohttp.ClientTimeout(total=10, connect=5) connector = aiohttp.TCPConnector( limit=100, limit_per_host=10, enable_cleanup_closed=True, use_dns_cache=False, # Disable DNS cache to avoid aiodns family=0 # Use default family ) self.rest_session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} ) logger.info("✅ REST API session initialized (Windows compatible)") except Exception as e: logger.warning(f"⚠️ Failed to initialize REST session: {e}") # Try with minimal configuration try: self.rest_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=10), connector=aiohttp.TCPConnector(use_dns_cache=False) ) logger.info("✅ REST API session initialized with minimal config") except Exception as e2: logger.warning(f"⚠️ Failed to initialize minimal REST session: {e2}") # Continue without REST session - WebSocket only self.rest_session = None async def _get_order_book_snapshot(self, symbol: str): """Get initial order book snapshot from REST API This is necessary for properly maintaining the order book state with the WebSocket depth stream. """ try: # Ensure REST session is available if not self.rest_session: await self._init_rest_session() if not self.rest_session: logger.warning(f"⚠️ Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only") return # Convert symbol format for Binance API binance_symbol = symbol.replace('/', '') # Get order book snapshot with maximum depth url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000" logger.debug(f"🔍 Getting order book snapshot for {symbol} from {url}") async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() # Validate response structure if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data: logger.error(f"❌ Invalid order book snapshot response for {symbol}: missing bids/asks") return # Initialize order book state for proper WebSocket synchronization if symbol not in self.order_books: self.order_books[symbol] = {'bids': {}, 'asks': {}} # Clear existing data and populate with snapshot self.order_books[symbol]['bids'] = {float(price): float(qty) for price, qty in data['bids'] if float(qty) > 0} self.order_books[symbol]['asks'] = {float(price): float(qty) for price, qty in data['asks'] if float(qty) > 0} # Store last update ID for synchronization if 'lastUpdateId' in data: self.last_update_ids[symbol] = data['lastUpdateId'] logger.debug(f"Order book snapshot for {symbol}: lastUpdateId = {data['lastUpdateId']}") # Mark as initialized self.order_book_initialized[symbol] = True logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") # Create initial COB data from snapshot bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0] asks = [{'price': float(price), 'size': float(qty)} for price, qty in data['asks'] if float(qty) > 0] # Sort bids (descending) and asks (ascending) bids.sort(key=lambda x: x['price'], reverse=True) asks.sort(key=lambda x: x['price']) # Create COB data structure if we have valid data if bids and asks: best_bid = bids[0] best_ask = asks[0] mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Calculate volumes bid_volume = sum(bid['size'] * bid['price'] for bid in bids) ask_volume = sum(ask['size'] * ask['price'] for ask in asks) total_volume = bid_volume + ask_volume cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids, 'asks': asks, 'source': 'rest_snapshot', 'exchange': 'binance', 'stats': { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'imbalance': (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0, 'bid_levels': len(bids), 'ask_levels': len(asks), 'timestamp': datetime.now().isoformat() } } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps") else: logger.warning(f"⚠️ No valid bid/ask data in snapshot for {symbol}") elif response.status == 429: logger.warning(f"⚠️ Rate limited getting snapshot for {symbol}, will continue with WebSocket only") else: logger.error(f"❌ Failed to get order book snapshot for {symbol}: HTTP {response.status}") response_text = await response.text() logger.debug(f"Response: {response_text}") except asyncio.TimeoutError: logger.warning(f"⚠️ Timeout getting order book snapshot for {symbol}, will continue with WebSocket only") except Exception as e: logger.warning(f"⚠️ Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only") logger.debug(f"Snapshot error details: {e}") # Don't fail the entire connection due to snapshot issues async def _start_symbol_websocket(self, symbol: str): """Start WebSocket connection for a specific symbol""" if not WEBSOCKETS_AVAILABLE: logger.warning(f"WebSockets not available for {symbol}, starting REST fallback") await self._start_rest_fallback(symbol) return # Cancel existing task if running if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): self.websocket_tasks[symbol].cancel() # Start new WebSocket task self.websocket_tasks[symbol] = asyncio.create_task( self._websocket_connection_loop(symbol) ) logger.info(f"Started WebSocket task for {symbol}") async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic Subscribes to multiple streams via combined stream endpoint: - Order book depth (@depth@1000ms) - 1-second candlesticks (@kline_1s) - 24hr ticker with volume (@ticker) - Aggregated trades (@aggTrade) """ status = self.status[symbol] while True: try: logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 # Create WebSocket URL with combined streams for multiple data types ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT # Subscribe to multiple streams: # 1. depth@1000ms - Order book depth updates # 2. kline_1s - 1-second candlestick data (with optional timezone offset) # 3. ticker - 24hr ticker statistics (includes volume) # 4. aggTrade - Aggregated trade data for volume analysis # Configure kline stream with timezone offset if specified if self.timezone_offset: kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}" logger.info(f"Using timezone offset {self.timezone_offset} for kline stream") else: kline_stream = f"{ws_symbol}@kline_1s" logger.info("Using UTC timezone for kline stream") streams = [ f"{ws_symbol}@depth@1000ms", # Order book depth kline_stream, # 1-second candlesticks (with timezone) f"{ws_symbol}@ticker", # 24hr ticker with volume f"{ws_symbol}@aggTrade" # Aggregated trades ] # Use combined stream endpoint streams_param = "/".join(streams) ws_url = f"wss://stream.binance.com:9443/stream?streams={streams_param}" logger.info(f"Connecting to: {ws_url}") # Add ping/pong handling and proper connection management async with websockets_connect( ws_url, ping_interval=20, # Binance sends ping every 20 seconds ping_timeout=60, # Binance disconnects after 1 minute without pong close_timeout=10 ) as websocket: # Connection successful status.connected = True status.last_error = None status.reset_reconnect_delay() logger.info(f"WebSocket connected for {symbol} with ping/pong handling") await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") # Deactivate REST fallback if self.rest_fallback_active[symbol]: await self._stop_rest_fallback(symbol) # Track connection start time for 24-hour limit compliance connection_start = datetime.now() # Message receiving loop with proper ping/pong handling try: async for message in websocket: try: # Check for 24-hour connection limit (Binance requirement) if (datetime.now() - connection_start).total_seconds() > 23.5 * 3600: # 23.5 hours logger.info(f"Approaching 24-hour limit for {symbol}, reconnecting...") break # Handle different message types if isinstance(message, bytes): # Handle ping frames (though websockets library handles this automatically) continue # Rate limiting: Binance allows max 5 messages per second now = datetime.now() # Initialize rate limiting tracking if symbol not in self.message_window_start: self.message_window_start[symbol] = now self.message_count[symbol] = 0 # Reset counter every second if (now - self.message_window_start[symbol]).total_seconds() >= 1.0: self.message_window_start[symbol] = now self.message_count[symbol] = 0 # Check rate limit (5 messages per second) if self.message_count[symbol] >= 5: continue # Skip this message to comply with rate limit self.message_count[symbol] += 1 self.last_message_time[symbol] = now # Parse JSON message data = json.loads(message) # Handle combined stream format if 'stream' in data and 'data' in data: # Combined stream format: {"stream":"","data":} stream_name = data['stream'] stream_data = data['data'] await self._process_combined_stream_message(symbol, stream_name, stream_data) else: # Single stream format (fallback) await self._process_websocket_message(symbol, data) status.last_message_time = now status.messages_received += 1 except json.JSONDecodeError as e: logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") except websockets.exceptions.ConnectionClosedOK: logger.info(f"WebSocket connection closed normally for {symbol}") except websockets.exceptions.ConnectionClosedError as e: logger.warning(f"WebSocket connection closed with error for {symbol}: {e}") raise except ConnectionClosed as e: status.connected = False status.last_error = f"Connection closed: {e}" logger.warning(f"WebSocket connection closed for {symbol}: {e}") except WebSocketException as e: status.connected = False status.last_error = f"WebSocket error: {e}" logger.error(f"WebSocket error for {symbol}: {e}") except Exception as e: status.connected = False status.last_error = f"Unexpected error: {e}" logger.error(f"Unexpected WebSocket error for {symbol}: {e}") logger.error(traceback.format_exc()) # Connection failed or closed - start REST fallback await self._notify_dashboard_status(symbol, "disconnected", status.last_error) await self._start_rest_fallback(symbol) # Wait before reconnecting status.increase_reconnect_delay() logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}") await asyncio.sleep(status.reconnect_delay) async def _process_websocket_message(self, symbol: str, data: Dict): """Process WebSocket message and convert to COB format Properly handles Binance depth stream format according to documentation: - depthUpdate events with U (first update ID) and u (final update ID) - Maintains local order book with proper synchronization """ try: # Check if this is a depth update event if data.get('e') == 'depthUpdate': # Handle diff depth stream according to Binance documentation await self._handle_depth_update(symbol, data) return # Fallback: handle partial book depth format (depth5, depth10, depth20) # Extract bids and asks from the message - handle all possible formats bids_data = data.get('bids', data.get('b', [])) asks_data = data.get('asks', data.get('a', [])) # Process the order book data - filter out zero quantities # Binance uses 0 quantity to indicate removal from the book valid_bids = [] valid_asks = [] # Process bids for bid in bids_data: try: if len(bid) >= 2: price = float(bid[0]) size = float(bid[1]) if size > 0: # Only include non-zero quantities valid_bids.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Process asks for ask in asks_data: try: if len(ask) >= 2: price = float(ask[0]) size = float(ask[1]) if size > 0: # Only include non-zero quantities valid_asks.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Sort bids (descending) and asks (ascending) for proper order book valid_bids.sort(key=lambda x: x['price'], reverse=True) valid_asks.sort(key=lambda x: x['price']) # Limit to maximum depth (1000 levels for maximum DOM) max_depth = 1000 if len(valid_bids) > max_depth: valid_bids = valid_bids[:max_depth] if len(valid_asks) > max_depth: valid_asks = valid_asks[:max_depth] # Create COB data structure matching the working dashboard format cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': valid_bids, 'asks': valid_asks, 'source': 'enhanced_websocket', 'exchange': 'binance' } # Calculate comprehensive stats if we have valid data if valid_bids and valid_asks: best_bid = valid_bids[0] # Already sorted, first is highest best_ask = valid_asks[0] # Already sorted, first is lowest # Core price metrics mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Volume calculations (notional value) - limit to top 20 levels for performance top_bids = valid_bids[:20] top_asks = valid_asks[:20] bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) # Size calculations (base currency) bid_size = sum(bid['size'] for bid in top_bids) ask_size = sum(ask['size'] for ask in top_asks) # Imbalance calculations total_volume = bid_volume + ask_volume volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 total_size = bid_size + ask_size size_imbalance = (bid_size - ask_size) / total_size if total_size > 0 else 0 cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'bid_liquidity': bid_volume, # Add liquidity fields 'ask_liquidity': ask_volume, 'total_bid_liquidity': bid_volume, 'total_ask_liquidity': ask_volume, 'bid_size': bid_size, 'ask_size': ask_size, 'volume_imbalance': volume_imbalance, 'size_imbalance': size_imbalance, 'imbalance': volume_imbalance, # Default to volume imbalance 'bid_levels': len(valid_bids), 'ask_levels': len(valid_asks), 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), # Binance update ID 'event_time': data.get('E', 0) # Binance event time } else: # Provide default stats if no valid data cob_data['stats'] = { 'best_bid': 0, 'best_ask': 0, 'mid_price': 0, 'spread': 0, 'spread_bps': 0, 'bid_volume': 0, 'ask_volume': 0, 'total_bid_volume': 0, 'total_ask_volume': 0, 'bid_size': 0, 'ask_size': 0, 'volume_imbalance': 0, 'size_imbalance': 0, 'imbalance': 0, 'bid_levels': 0, 'ask_levels': 0, 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), 'event_time': data.get('E', 0) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB callback: {e}") # Log success with key metrics (only for non-empty updates) if valid_bids and valid_asks: logger.debug(f"{symbol}: ${cob_data['stats']['mid_price']:.2f}, {len(valid_bids)} bids, {len(valid_asks)} asks, spread: {cob_data['stats']['spread_bps']:.1f} bps") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") import traceback logger.debug(traceback.format_exc()) async def _handle_depth_update(self, symbol: str, data: Dict): """Handle Binance depth update events according to documentation""" try: # Extract update IDs first_update_id = data.get('U') # First update ID in event final_update_id = data.get('u') # Final update ID in event if first_update_id is None or final_update_id is None: logger.warning(f"Missing update IDs in depth update for {symbol}") return # Initialize order book if needed if symbol not in self.order_book_initialized or not self.order_book_initialized[symbol]: # Get initial snapshot from REST API await self._initialize_order_book(symbol) # Check if we have a valid order book if symbol not in self.order_books: logger.warning(f"Order book not initialized for {symbol}, skipping update") return # Get last update ID for this symbol last_update_id = self.last_update_ids.get(symbol, 0) # Apply Binance synchronization rules if final_update_id <= last_update_id: # Event is older than our current state, ignore return if first_update_id > last_update_id + 1: # Gap detected, need to reinitialize logger.warning(f"Gap detected in depth updates for {symbol}, reinitializing order book") self.order_book_initialized[symbol] = False await self._initialize_order_book(symbol) return # Apply updates to local order book bids_updates = data.get('b', []) asks_updates = data.get('a', []) # Update bids for bid_update in bids_updates: if len(bid_update) >= 2: price = float(bid_update[0]) quantity = float(bid_update[1]) if quantity == 0: # Remove price level self.order_books[symbol]['bids'].pop(price, None) else: # Update price level self.order_books[symbol]['bids'][price] = quantity # Update asks for ask_update in asks_updates: if len(ask_update) >= 2: price = float(ask_update[0]) quantity = float(ask_update[1]) if quantity == 0: # Remove price level self.order_books[symbol]['asks'].pop(price, None) else: # Update price level self.order_books[symbol]['asks'][price] = quantity # Update last update ID self.last_update_ids[symbol] = final_update_id # Convert to COB format and notify callbacks await self._create_cob_from_order_book(symbol) except Exception as e: logger.error(f"Error handling depth update for {symbol}: {e}") async def _initialize_order_book(self, symbol: str): """Initialize order book from REST API snapshot""" try: # Get snapshot (this method already exists) await self._get_order_book_snapshot(symbol) self.order_book_initialized[symbol] = True logger.info(f"Order book initialized for {symbol}") except Exception as e: logger.error(f"Failed to initialize order book for {symbol}: {e}") self.order_book_initialized[symbol] = False async def _create_cob_from_order_book(self, symbol: str): """Create COB data from maintained order book""" try: if symbol not in self.order_books: return order_book = self.order_books[symbol] # Sort and convert to list format bids = [{'price': price, 'size': qty} for price, qty in order_book['bids'].items()] asks = [{'price': price, 'size': qty} for price, qty in order_book['asks'].items()] # Sort bids (descending) and asks (ascending) bids.sort(key=lambda x: x['price'], reverse=True) asks.sort(key=lambda x: x['price']) # Limit depth max_depth = 1000 if len(bids) > max_depth: bids = bids[:max_depth] if len(asks) > max_depth: asks = asks[:max_depth] # Create COB data structure cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids, 'asks': asks, 'source': 'binance_depth_stream', 'exchange': 'binance' } # Calculate stats if we have valid data if bids and asks: best_bid = bids[0] best_ask = asks[0] mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Calculate volumes (top 20 levels for performance) top_bids = bids[:20] top_asks = asks[:20] bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) total_volume = bid_volume + ask_volume volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'imbalance': volume_imbalance, 'bid_levels': len(bids), 'ask_levels': len(asks), 'timestamp': datetime.now().isoformat(), 'update_id': self.last_update_ids.get(symbol, 0) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB callback: {e}") logger.debug(f"{symbol}: Depth stream update - ${cob_data.get('stats', {}).get('mid_price', 0):.2f}") except Exception as e: logger.error(f"Error creating COB from order book for {symbol}: {e}") async def _process_combined_stream_message(self, symbol: str, stream_name: str, data: Dict): """Process messages from combined stream format""" try: # Extract stream type from stream name (e.g., "btcusdt@depth@1000ms" -> "depth") stream_parts = stream_name.split('@') if len(stream_parts) < 2: logger.warning(f"Invalid stream name format: {stream_name}") return stream_type = stream_parts[1] # Route to appropriate handler based on stream type if stream_type == 'depth': await self._handle_depth_stream(symbol, data) elif stream_type == 'kline_1s': await self._handle_kline_stream(symbol, data) elif stream_type == 'ticker': await self._handle_ticker_stream(symbol, data) elif stream_type == 'aggTrade': await self._handle_aggtrade_stream(symbol, data) else: logger.debug(f"Unhandled stream type: {stream_type} for {symbol}") except Exception as e: logger.error(f"Error processing combined stream message for {symbol}: {e}") async def _handle_depth_stream(self, symbol: str, data: Dict): """Handle depth stream data (order book updates)""" try: # Check if this is a depth update event if data.get('e') == 'depthUpdate': await self._handle_depth_update(symbol, data) else: # Handle partial book depth format await self._process_websocket_message(symbol, data) except Exception as e: logger.error(f"Error handling depth stream for {symbol}: {e}") async def _handle_kline_stream(self, symbol: str, data: Dict): """Handle 1-second kline/candlestick data with timezone support""" try: if data.get('e') != 'kline': return kline_data = data.get('k', {}) if not kline_data: return # Extract timestamps (always in UTC milliseconds according to Binance docs) open_time_ms = kline_data.get('t', 0) close_time_ms = kline_data.get('T', 0) event_time_ms = data.get('E', 0) # Convert timestamps to datetime objects open_time_utc = datetime.fromtimestamp(open_time_ms / 1000) if open_time_ms else None close_time_utc = datetime.fromtimestamp(close_time_ms / 1000) if close_time_ms else None event_time_utc = datetime.fromtimestamp(event_time_ms / 1000) if event_time_ms else datetime.now() # Extract candlestick data candlestick = { 'symbol': symbol, 'timestamp': datetime.now(), 'event_time': event_time_utc, 'open_time': open_time_ms, # Raw timestamp (ms) 'close_time': close_time_ms, # Raw timestamp (ms) 'open_time_utc': open_time_utc, # Converted to UTC datetime 'close_time_utc': close_time_utc, # Converted to UTC datetime 'interval': kline_data.get('i', '1s'), 'open_price': float(kline_data.get('o', 0)), 'high_price': float(kline_data.get('h', 0)), 'low_price': float(kline_data.get('l', 0)), 'close_price': float(kline_data.get('c', 0)), 'volume': float(kline_data.get('v', 0)), # Base asset volume 'quote_volume': float(kline_data.get('q', 0)), # Quote asset volume 'trades_count': kline_data.get('n', 0), 'taker_buy_volume': float(kline_data.get('V', 0)), # Taker buy base asset volume 'taker_buy_quote_volume': float(kline_data.get('Q', 0)), # Taker buy quote asset volume 'is_closed': kline_data.get('x', False), # Is this kline closed? 'first_trade_id': kline_data.get('f', 0), 'last_trade_id': kline_data.get('L', 0), 'timezone_offset': self.timezone_offset, # Timezone info 'source': f'binance_kline_1s_{self.timezone_offset or "utc"}' } # Store latest candlestick data if not hasattr(self, 'latest_candlestick_data'): self.latest_candlestick_data = {} self.latest_candlestick_data[symbol] = candlestick # Notify callbacks with candlestick data for callback in self.cob_callbacks: try: await callback(symbol, {'type': 'candlestick', 'data': candlestick}) except Exception as e: logger.error(f"Error in candlestick callback: {e}") logger.debug(f"{symbol}: 1s Kline - O:{candlestick['open_price']:.2f} H:{candlestick['high_price']:.2f} L:{candlestick['low_price']:.2f} C:{candlestick['close_price']:.2f} V:{candlestick['volume']:.2f}") except Exception as e: logger.error(f"Error handling kline stream for {symbol}: {e}") async def _handle_ticker_stream(self, symbol: str, data: Dict): """Handle 24hr ticker data (includes volume statistics)""" try: if data.get('e') != '24hrTicker': return # Extract ticker data ticker = { 'symbol': symbol, 'timestamp': datetime.now(), 'price_change': float(data.get('p', 0)), 'price_change_percent': float(data.get('P', 0)), 'weighted_avg_price': float(data.get('w', 0)), 'last_price': float(data.get('c', 0)), 'last_qty': float(data.get('Q', 0)), 'best_bid_price': float(data.get('b', 0)), 'best_bid_qty': float(data.get('B', 0)), 'best_ask_price': float(data.get('a', 0)), 'best_ask_qty': float(data.get('A', 0)), 'open_price': float(data.get('o', 0)), 'high_price': float(data.get('h', 0)), 'low_price': float(data.get('l', 0)), 'volume': float(data.get('v', 0)), # 24hr volume 'quote_volume': float(data.get('q', 0)), # 24hr quote volume 'open_time': data.get('O', 0), 'close_time': data.get('C', 0), 'first_trade_id': data.get('F', 0), 'last_trade_id': data.get('L', 0), 'trades_count': data.get('n', 0), 'source': 'binance_24hr_ticker' } # Store latest ticker data if not hasattr(self, 'latest_ticker_data'): self.latest_ticker_data = {} self.latest_ticker_data[symbol] = ticker # Notify callbacks with ticker data for callback in self.cob_callbacks: try: await callback(symbol, {'type': 'ticker', 'data': ticker}) except Exception as e: logger.error(f"Error in ticker callback: {e}") logger.debug(f"{symbol}: 24hr Ticker - Price:{ticker['last_price']:.2f} Volume:{ticker['volume']:.2f} Change:{ticker['price_change_percent']:.2f}%") except Exception as e: logger.error(f"Error handling ticker stream for {symbol}: {e}") async def _handle_aggtrade_stream(self, symbol: str, data: Dict): """Handle aggregated trade data for real-time volume analysis""" try: if data.get('e') != 'aggTrade': return # Extract aggregated trade data agg_trade = { 'symbol': symbol, 'timestamp': datetime.now(), 'trade_id': data.get('a', 0), 'price': float(data.get('p', 0)), 'quantity': float(data.get('q', 0)), 'first_trade_id': data.get('f', 0), 'last_trade_id': data.get('l', 0), 'trade_time': data.get('T', 0), 'is_buyer_maker': data.get('m', False), # True if buyer is market maker 'notional_value': float(data.get('p', 0)) * float(data.get('q', 0)), 'source': 'binance_aggtrade' } # Store latest trade data if not hasattr(self, 'latest_trade_data'): self.latest_trade_data = {} if symbol not in self.latest_trade_data: self.latest_trade_data[symbol] = [] # Keep last 100 trades for volume analysis self.latest_trade_data[symbol].append(agg_trade) if len(self.latest_trade_data[symbol]) > 100: self.latest_trade_data[symbol] = self.latest_trade_data[symbol][-100:] # Calculate real-time volume metrics recent_trades = self.latest_trade_data[symbol][-10:] # Last 10 trades buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) total_volume = buy_volume + sell_volume volume_metrics = { 'buy_volume': buy_volume, 'sell_volume': sell_volume, 'total_volume': total_volume, 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0 } # Notify callbacks with trade data for callback in self.cob_callbacks: try: await callback(symbol, { 'type': 'aggtrade', 'data': agg_trade, 'volume_metrics': volume_metrics }) except Exception as e: logger.error(f"Error in aggtrade callback: {e}") logger.debug(f"{symbol}: AggTrade - Price:{agg_trade['price']:.2f} Qty:{agg_trade['quantity']:.4f} BuyerMaker:{agg_trade['is_buyer_maker']}") except Exception as e: logger.error(f"Error handling aggtrade stream for {symbol}: {e}") async def _start_rest_fallback(self, symbol: str): """Start REST API fallback for a symbol""" if self.rest_fallback_active[symbol]: return # Already active self.rest_fallback_active[symbol] = True # Cancel existing REST task if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() # Start new REST task self.rest_tasks[symbol] = asyncio.create_task( self._rest_fallback_loop(symbol) ) logger.warning(f"Started REST API fallback for {symbol}") await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback") async def _stop_rest_fallback(self, symbol: str): """Stop REST API fallback for a symbol""" if not self.rest_fallback_active[symbol]: return self.rest_fallback_active[symbol] = False if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() logger.info(f"Stopped REST API fallback for {symbol}") async def _rest_fallback_loop(self, symbol: str): """REST API fallback loop""" while self.rest_fallback_active[symbol]: try: await self._fetch_rest_orderbook(symbol) await asyncio.sleep(1) # Update every second except asyncio.CancelledError: break except Exception as e: logger.error(f"REST fallback error for {symbol}: {e}") await asyncio.sleep(5) # Wait longer on error async def _fetch_rest_orderbook(self, symbol: str): """Fetch order book data via REST API""" try: if not self.rest_session: return # Binance REST API rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000" async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']], 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']], 'source': 'rest_fallback', 'exchange': 'binance' } # Calculate stats if cob_data['bids'] and cob_data['asks']: best_bid = max(cob_data['bids'], key=lambda x: x['price']) best_ask = min(cob_data['asks'], key=lambda x: x['price']) cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'spread': best_ask['price'] - best_bid['price'], 'mid_price': (best_bid['price'] + best_ask['price']) / 2, 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") else: logger.warning(f"REST API error for {symbol}: HTTP {response.status}") except Exception as e: logger.error(f"Error fetching REST order book for {symbol}: {e}") async def _monitor_connections(self): """Monitor WebSocket connections and provide status updates""" while True: try: await asyncio.sleep(10) # Check every 10 seconds for symbol in self.symbols: status = self.status[symbol] # Check for stale connections (no messages for 30 seconds) if status.connected and status.last_message_time: time_since_last = (datetime.now() - status.last_message_time).total_seconds() if time_since_last > 30: logger.warning(f"Stale connection detected for {symbol}: {time_since_last:.1f}s since last message") status.connected = False status.last_error = f"Stale connection: {time_since_last:.1f}s without messages" await self._notify_dashboard_status(symbol, "stale", status.last_error) # Log connection status if status.connected: logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") else: logger.debug(f"{symbol}: Disconnected - {status.last_error}") # Log overall system status connected_count = sum(1 for status in self.status.values() if status.connected) logger.debug(f"COB WebSocket status: {connected_count}/{len(self.symbols)} symbols connected") except Exception as e: logger.error(f"Error in connection monitor: {e}") await asyncio.sleep(10) async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of WebSocket status changes""" try: if self.dashboard_callback: status_data = { 'type': 'websocket_status', 'data': { 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() } } await self.dashboard_callback(symbol, status_data) except Exception as e: logger.error(f"Error notifying dashboard status for {symbol}: {e}") def get_latest_candlestick(self, symbol: str) -> Optional[Dict]: """Get the latest 1-second candlestick data for a symbol""" if hasattr(self, 'latest_candlestick_data'): return self.latest_candlestick_data.get(symbol) return None def get_latest_ticker(self, symbol: str) -> Optional[Dict]: """Get the latest 24hr ticker data for a symbol""" if hasattr(self, 'latest_ticker_data'): return self.latest_ticker_data.get(symbol) return None def get_latest_trades(self, symbol: str, count: int = 10) -> List[Dict]: """Get the latest aggregated trades for a symbol""" if hasattr(self, 'latest_trade_data') and symbol in self.latest_trade_data: return self.latest_trade_data[symbol][-count:] return [] def get_volume_metrics(self, symbol: str, timeframe_seconds: int = 60) -> Dict: """Calculate volume metrics for a given timeframe""" try: if not hasattr(self, 'latest_trade_data') or symbol not in self.latest_trade_data: return {'error': 'No trade data available'} current_time = datetime.now().timestamp() * 1000 # Convert to milliseconds cutoff_time = current_time - (timeframe_seconds * 1000) # Filter trades within timeframe recent_trades = [ trade for trade in self.latest_trade_data[symbol] if trade['trade_time'] >= cutoff_time ] if not recent_trades: return {'error': 'No recent trades in timeframe'} # Calculate metrics buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker']) sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker']) total_volume = buy_volume + sell_volume trade_count = len(recent_trades) avg_trade_size = total_volume / trade_count if trade_count > 0 else 0 return { 'timeframe_seconds': timeframe_seconds, 'trade_count': trade_count, 'buy_volume': buy_volume, 'sell_volume': sell_volume, 'total_volume': total_volume, 'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'), 'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0, 'avg_trade_size': avg_trade_size, 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error calculating volume metrics for {symbol}: {e}") return {'error': str(e)} def _convert_timestamp_to_timezone(self, timestamp_ms: int, target_timezone: str = None) -> datetime: """Convert UTC timestamp to specified timezone Args: timestamp_ms: UTC timestamp in milliseconds target_timezone: Target timezone (e.g., '+08:00' for UTC+8) Returns: datetime object in target timezone """ try: from datetime import timezone, timedelta # Convert to UTC datetime utc_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if not target_timezone: return utc_dt # Parse timezone offset (e.g., '+08:00') if target_timezone.startswith('+') or target_timezone.startswith('-'): sign = 1 if target_timezone.startswith('+') else -1 hours, minutes = map(int, target_timezone[1:].split(':')) offset = timedelta(hours=sign * hours, minutes=sign * minutes) target_tz = timezone(offset) return utc_dt.astimezone(target_tz) return utc_dt except Exception as e: logger.error(f"Error converting timestamp to timezone: {e}") return datetime.fromtimestamp(timestamp_ms / 1000) def get_kline_with_timezone(self, symbol: str, target_timezone: str = None) -> Optional[Dict]: """Get latest kline data with timezone conversion Args: symbol: Trading symbol target_timezone: Target timezone (e.g., '+08:00' for UTC+8) Returns: Kline data with timezone-converted timestamps """ try: candlestick = self.get_latest_candlestick(symbol) if not candlestick: return None # Create a copy with timezone conversions result = candlestick.copy() if target_timezone and candlestick.get('open_time'): result['open_time_local'] = self._convert_timestamp_to_timezone( candlestick['open_time'], target_timezone ) result['close_time_local'] = self._convert_timestamp_to_timezone( candlestick['close_time'], target_timezone ) result['target_timezone'] = target_timezone return result except Exception as e: logger.error(f"Error getting kline with timezone for {symbol}: {e}") return None status.last_message_time: time_since_last = datetime.now() - status.last_message_time if time_since_last > timedelta(seconds=30): logger.warning(f"No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") await self._notify_dashboard_status(symbol, "stale", "No recent messages") # Log status if status.connected: logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") elif self.rest_fallback_active[symbol]: logger.debug(f"{symbol}: Using REST fallback") else: logger.debug(f"{symbol}: Disconnected, last error: {status.last_error}") except Exception as e: logger.error(f"Error in connection monitor: {e}") async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of status changes""" try: if self.dashboard_callback: status_data = { 'type': 'cob_status', 'symbol': symbol, 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() } # Check if callback is async or sync if asyncio.iscoroutinefunction(self.dashboard_callback): await self.dashboard_callback(status_data) else: # Call sync function directly self.dashboard_callback(status_data) except Exception as e: logger.error(f"Error notifying dashboard: {e}") def get_status_summary(self) -> Dict[str, Any]: """Get status summary for all symbols""" summary = { 'websockets_available': WEBSOCKETS_AVAILABLE, 'symbols': {}, 'overall_status': 'unknown' } connected_count = 0 fallback_count = 0 for symbol in self.symbols: status = self.status[symbol] symbol_status = { 'connected': status.connected, 'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None, 'connection_attempts': status.connection_attempts, 'last_error': status.last_error, 'messages_received': status.messages_received, 'rest_fallback_active': self.rest_fallback_active[symbol] } if status.connected: connected_count += 1 elif self.rest_fallback_active[symbol]: fallback_count += 1 summary['symbols'][symbol] = symbol_status # Determine overall status if connected_count == len(self.symbols): summary['overall_status'] = 'all_connected' elif connected_count + fallback_count == len(self.symbols): summary['overall_status'] = 'partial_fallback' else: summary['overall_status'] = 'degraded' return summary # Global instance for easy access enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None) -> EnhancedCOBWebSocket: """Get or create the global enhanced COB WebSocket instance Args: symbols: List of symbols to monitor dashboard_callback: Callback function for dashboard updates timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8) """ global enhanced_cob_websocket if enhanced_cob_websocket is None: enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback, timezone_offset) await enhanced_cob_websocket.start() return enhanced_cob_websocket async def stop_enhanced_cob_websocket(): """Stop the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket: await enhanced_cob_websocket.stop() enhanced_cob_websocket = None