#!/usr/bin/env python3 """ Enhanced COB WebSocket Implementation Robust WebSocket implementation for Consolidated Order Book data with: - Maximum allowed depth subscription - Clear error handling and warnings - Automatic reconnection with exponential backoff - Fallback to REST API when WebSocket fails - Dashboard integration with status updates This replaces the existing COB WebSocket implementation with a more reliable version. """ import asyncio import json import logging import time import traceback from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass import aiohttp import weakref try: import websockets from websockets.client import connect as websockets_connect from websockets.exceptions import ConnectionClosed, WebSocketException WEBSOCKETS_AVAILABLE = True except ImportError: websockets = None websockets_connect = None ConnectionClosed = Exception WebSocketException = Exception WEBSOCKETS_AVAILABLE = False logger = logging.getLogger(__name__) @dataclass class COBWebSocketStatus: """Status tracking for COB WebSocket connections""" connected: bool = False last_message_time: Optional[datetime] = None connection_attempts: int = 0 last_error: Optional[str] = None reconnect_delay: float = 1.0 max_reconnect_delay: float = 60.0 messages_received: int = 0 def reset_reconnect_delay(self): """Reset reconnect delay on successful connection""" self.reconnect_delay = 1.0 def increase_reconnect_delay(self): """Increase reconnect delay with exponential backoff""" self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) class EnhancedCOBWebSocket: """Enhanced COB WebSocket with robust error handling and fallback""" _instances = {} # Track instances by symbols to prevent duplicates def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None): """ Initialize Enhanced COB WebSocket Args: symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) dashboard_callback: Callback function for dashboard status updates """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback # Check for existing instances to prevent duplicate connections symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: logger.warning(f"EnhancedCOBWebSocket already exists for symbols {self.symbols} - reusing existing instance") existing = EnhancedCOBWebSocket._instances[symbols_key] # Copy existing instance data self.__dict__.update(existing.__dict__) return # Register this instance EnhancedCOBWebSocket._instances[symbols_key] = self # Connection status tracking self.status: Dict[str, COBWebSocketStatus] = { symbol: COBWebSocketStatus() for symbol in self.symbols } # Data callbacks self.cob_callbacks: List[Callable] = [] self.error_callbacks: List[Callable] = [] # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} # Rate limiting for message processing self.last_message_time: Dict[str, datetime] = {} self.min_message_interval = 0.1 # Minimum 100ms between messages per symbol # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} # REST API fallback self.rest_session: Optional[aiohttp.ClientSession] = None self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols} self.rest_tasks: Dict[str, asyncio.Task] = {} # Configuration self.max_depth = 1000 # Maximum depth for order book self.update_speed = '1000ms' # Binance update speed - reduced for stability logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: logger.error("WebSockets module not available - COB data will be limited to REST API") def add_cob_callback(self, callback: Callable): """Add callback for COB data updates""" self.cob_callbacks.append(callback) def add_error_callback(self, callback: Callable): """Add callback for error notifications""" self.error_callbacks.append(callback) async def start(self): """Start COB WebSocket connections""" logger.info("Starting Enhanced COB WebSocket system") # Initialize REST session for fallback await self._init_rest_session() # Start WebSocket connections for each symbol for symbol in self.symbols: await self._start_symbol_websocket(symbol) # Start monitoring task asyncio.create_task(self._monitor_connections()) logger.info("Enhanced COB WebSocket system started") async def stop(self): """Stop all WebSocket connections""" logger.info("Stopping Enhanced COB WebSocket system") # Cancel all WebSocket tasks for symbol, task in self.websocket_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Cancel all REST tasks for symbol, task in self.rest_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Close REST session if self.rest_session: await self.rest_session.close() # Remove from instances registry symbols_key = tuple(sorted(self.symbols)) if symbols_key in EnhancedCOBWebSocket._instances: del EnhancedCOBWebSocket._instances[symbols_key] logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): """Initialize REST API session for fallback and snapshots""" try: # Windows-compatible configuration without aiodns timeout = aiohttp.ClientTimeout(total=10, connect=5) connector = aiohttp.TCPConnector( limit=100, limit_per_host=10, enable_cleanup_closed=True, use_dns_cache=False, # Disable DNS cache to avoid aiodns family=0 # Use default family ) self.rest_session = aiohttp.ClientSession( timeout=timeout, connector=connector, headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} ) logger.info("✅ REST API session initialized (Windows compatible)") except Exception as e: logger.warning(f"⚠️ Failed to initialize REST session: {e}") # Try with minimal configuration try: self.rest_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=10), connector=aiohttp.TCPConnector(use_dns_cache=False) ) logger.info("✅ REST API session initialized with minimal config") except Exception as e2: logger.warning(f"⚠️ Failed to initialize minimal REST session: {e2}") # Continue without REST session - WebSocket only self.rest_session = None async def _get_order_book_snapshot(self, symbol: str): """Get initial order book snapshot from REST API This is necessary for properly maintaining the order book state with the WebSocket depth stream. """ try: # Ensure REST session is available if not self.rest_session: await self._init_rest_session() if not self.rest_session: logger.warning(f"⚠️ Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only") return # Convert symbol format for Binance API binance_symbol = symbol.replace('/', '') # Get order book snapshot with maximum depth url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000" logger.debug(f"🔍 Getting order book snapshot for {symbol} from {url}") async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() # Validate response structure if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data: logger.error(f"❌ Invalid order book snapshot response for {symbol}: missing bids/asks") return # Initialize order book state for proper WebSocket synchronization self.order_books[symbol] = { 'bids': {float(price): float(qty) for price, qty in data['bids']}, 'asks': {float(price): float(qty) for price, qty in data['asks']} } # Store last update ID for synchronization if 'lastUpdateId' in data: self.last_update_ids[symbol] = data['lastUpdateId'] logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") # Create initial COB data from snapshot bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0] asks = [{'price': float(price), 'size': float(qty)} for price, qty in data['asks'] if float(qty) > 0] # Sort bids (descending) and asks (ascending) bids.sort(key=lambda x: x['price'], reverse=True) asks.sort(key=lambda x: x['price']) # Create COB data structure if we have valid data if bids and asks: best_bid = bids[0] best_ask = asks[0] mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Calculate volumes bid_volume = sum(bid['size'] * bid['price'] for bid in bids) ask_volume = sum(ask['size'] * ask['price'] for ask in asks) total_volume = bid_volume + ask_volume cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids, 'asks': asks, 'source': 'rest_snapshot', 'exchange': 'binance', 'stats': { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'imbalance': (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0, 'bid_levels': len(bids), 'ask_levels': len(asks), 'timestamp': datetime.now().isoformat() } } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps") else: logger.warning(f"⚠️ No valid bid/ask data in snapshot for {symbol}") elif response.status == 429: logger.warning(f"⚠️ Rate limited getting snapshot for {symbol}, will continue with WebSocket only") else: logger.error(f"❌ Failed to get order book snapshot for {symbol}: HTTP {response.status}") response_text = await response.text() logger.debug(f"Response: {response_text}") except asyncio.TimeoutError: logger.warning(f"⚠️ Timeout getting order book snapshot for {symbol}, will continue with WebSocket only") except Exception as e: logger.warning(f"⚠️ Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only") logger.debug(f"Snapshot error details: {e}") # Don't fail the entire connection due to snapshot issues async def _start_symbol_websocket(self, symbol: str): """Start WebSocket connection for a specific symbol""" if not WEBSOCKETS_AVAILABLE: logger.warning(f"WebSockets not available for {symbol}, starting REST fallback") await self._start_rest_fallback(symbol) return # Cancel existing task if running if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): self.websocket_tasks[symbol].cancel() # Start new WebSocket task self.websocket_tasks[symbol] = asyncio.create_task( self._websocket_connection_loop(symbol) ) logger.info(f"Started WebSocket task for {symbol}") async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic Uses depth@1000ms for stable updates with maximum depth. """ status = self.status[symbol] while True: try: logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 # Create WebSocket URL with reasonable update rate - use depth@1000ms for stable updates ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@1000ms" logger.info(f"Connecting to: {ws_url}") async with websockets_connect(ws_url) as websocket: # Connection successful status.connected = True status.last_error = None status.reset_reconnect_delay() logger.info(f"WebSocket connected for {symbol}") await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") # Deactivate REST fallback if self.rest_fallback_active[symbol]: await self._stop_rest_fallback(symbol) # Message receiving loop async for message in websocket: try: # Rate limiting: skip messages that come too frequently now = datetime.now() if symbol in self.last_message_time: time_since_last = (now - self.last_message_time[symbol]).total_seconds() if time_since_last < self.min_message_interval: continue # Skip this message self.last_message_time[symbol] = now data = json.loads(message) await self._process_websocket_message(symbol, data) status.last_message_time = now status.messages_received += 1 except json.JSONDecodeError as e: logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") except ConnectionClosed as e: status.connected = False status.last_error = f"Connection closed: {e}" logger.warning(f"WebSocket connection closed for {symbol}: {e}") except WebSocketException as e: status.connected = False status.last_error = f"WebSocket error: {e}" logger.error(f"WebSocket error for {symbol}: {e}") except Exception as e: status.connected = False status.last_error = f"Unexpected error: {e}" logger.error(f"Unexpected WebSocket error for {symbol}: {e}") logger.error(traceback.format_exc()) # Connection failed or closed - start REST fallback await self._notify_dashboard_status(symbol, "disconnected", status.last_error) await self._start_rest_fallback(symbol) # Wait before reconnecting status.increase_reconnect_delay() logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}") await asyncio.sleep(status.reconnect_delay) async def _process_websocket_message(self, symbol: str, data: Dict): """Process WebSocket message and convert to COB format Based on the working implementation from cob_realtime_dashboard.py Using maximum depth for best performance - no order book maintenance needed. """ try: # Extract bids and asks from the message - handle all possible formats bids_data = data.get('b', []) asks_data = data.get('a', []) # Process the order book data - filter out zero quantities # Binance uses 0 quantity to indicate removal from the book valid_bids = [] valid_asks = [] # Process bids for bid in bids_data: try: if len(bid) >= 2: price = float(bid[0]) size = float(bid[1]) if size > 0: # Only include non-zero quantities valid_bids.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Process asks for ask in asks_data: try: if len(ask) >= 2: price = float(ask[0]) size = float(ask[1]) if size > 0: # Only include non-zero quantities valid_asks.append({'price': price, 'size': size}) except (IndexError, ValueError, TypeError): continue # Sort bids (descending) and asks (ascending) for proper order book valid_bids.sort(key=lambda x: x['price'], reverse=True) valid_asks.sort(key=lambda x: x['price']) # Limit to maximum depth (1000 levels for maximum DOM) max_depth = 1000 if len(valid_bids) > max_depth: valid_bids = valid_bids[:max_depth] if len(valid_asks) > max_depth: valid_asks = valid_asks[:max_depth] # Create COB data structure matching the working dashboard format cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': valid_bids, 'asks': valid_asks, 'source': 'enhanced_websocket', 'exchange': 'binance' } # Calculate comprehensive stats if we have valid data if valid_bids and valid_asks: best_bid = valid_bids[0] # Already sorted, first is highest best_ask = valid_asks[0] # Already sorted, first is lowest # Core price metrics mid_price = (best_bid['price'] + best_ask['price']) / 2 spread = best_ask['price'] - best_bid['price'] spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 # Volume calculations (notional value) - limit to top 20 levels for performance top_bids = valid_bids[:20] top_asks = valid_asks[:20] bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids) ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks) # Size calculations (base currency) bid_size = sum(bid['size'] for bid in top_bids) ask_size = sum(ask['size'] for ask in top_asks) # Imbalance calculations total_volume = bid_volume + ask_volume volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0 total_size = bid_size + ask_size size_imbalance = (bid_size - ask_size) / total_size if total_size > 0 else 0 cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_volume': bid_volume, 'ask_volume': ask_volume, 'total_bid_volume': bid_volume, 'total_ask_volume': ask_volume, 'bid_liquidity': bid_volume, # Add liquidity fields 'ask_liquidity': ask_volume, 'total_bid_liquidity': bid_volume, 'total_ask_liquidity': ask_volume, 'bid_size': bid_size, 'ask_size': ask_size, 'volume_imbalance': volume_imbalance, 'size_imbalance': size_imbalance, 'imbalance': volume_imbalance, # Default to volume imbalance 'bid_levels': len(valid_bids), 'ask_levels': len(valid_asks), 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), # Binance update ID 'event_time': data.get('E', 0) # Binance event time } else: # Provide default stats if no valid data cob_data['stats'] = { 'best_bid': 0, 'best_ask': 0, 'mid_price': 0, 'spread': 0, 'spread_bps': 0, 'bid_volume': 0, 'ask_volume': 0, 'total_bid_volume': 0, 'total_ask_volume': 0, 'bid_size': 0, 'ask_size': 0, 'volume_imbalance': 0, 'size_imbalance': 0, 'imbalance': 0, 'bid_levels': 0, 'ask_levels': 0, 'timestamp': datetime.now().isoformat(), 'update_id': data.get('u', 0), 'event_time': data.get('E', 0) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB callback: {e}") # Log success with key metrics (only for non-empty updates) if valid_bids and valid_asks: logger.debug(f"{symbol}: ${cob_data['stats']['mid_price']:.2f}, {len(valid_bids)} bids, {len(valid_asks)} asks, spread: {cob_data['stats']['spread_bps']:.1f} bps") except Exception as e: logger.error(f"Error processing WebSocket message for {symbol}: {e}") import traceback logger.debug(traceback.format_exc()) async def _start_rest_fallback(self, symbol: str): """Start REST API fallback for a symbol""" if self.rest_fallback_active[symbol]: return # Already active self.rest_fallback_active[symbol] = True # Cancel existing REST task if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() # Start new REST task self.rest_tasks[symbol] = asyncio.create_task( self._rest_fallback_loop(symbol) ) logger.warning(f"Started REST API fallback for {symbol}") await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback") async def _stop_rest_fallback(self, symbol: str): """Stop REST API fallback for a symbol""" if not self.rest_fallback_active[symbol]: return self.rest_fallback_active[symbol] = False if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() logger.info(f"Stopped REST API fallback for {symbol}") async def _rest_fallback_loop(self, symbol: str): """REST API fallback loop""" while self.rest_fallback_active[symbol]: try: await self._fetch_rest_orderbook(symbol) await asyncio.sleep(1) # Update every second except asyncio.CancelledError: break except Exception as e: logger.error(f"REST fallback error for {symbol}: {e}") await asyncio.sleep(5) # Wait longer on error async def _fetch_rest_orderbook(self, symbol: str): """Fetch order book data via REST API""" try: if not self.rest_session: return # Binance REST API rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000" async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']], 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']], 'source': 'rest_fallback', 'exchange': 'binance' } # Calculate stats if cob_data['bids'] and cob_data['asks']: best_bid = max(cob_data['bids'], key=lambda x: x['price']) best_ask = min(cob_data['asks'], key=lambda x: x['price']) cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'spread': best_ask['price'] - best_bid['price'], 'mid_price': (best_bid['price'] + best_ask['price']) / 2, 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") else: logger.warning(f"REST API error for {symbol}: HTTP {response.status}") except Exception as e: logger.error(f"Error fetching REST order book for {symbol}: {e}") async def _monitor_connections(self): """Monitor WebSocket connections and provide status updates""" while True: try: await asyncio.sleep(10) # Check every 10 seconds for symbol in self.symbols: status = self.status[symbol] # Check for stale connections if status.connected and status.last_message_time: time_since_last = datetime.now() - status.last_message_time if time_since_last > timedelta(seconds=30): logger.warning(f"No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") await self._notify_dashboard_status(symbol, "stale", "No recent messages") # Log status if status.connected: logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") elif self.rest_fallback_active[symbol]: logger.debug(f"{symbol}: Using REST fallback") else: logger.debug(f"{symbol}: Disconnected, last error: {status.last_error}") except Exception as e: logger.error(f"Error in connection monitor: {e}") async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of status changes""" try: if self.dashboard_callback: status_data = { 'type': 'cob_status', 'symbol': symbol, 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() } # Check if callback is async or sync if asyncio.iscoroutinefunction(self.dashboard_callback): await self.dashboard_callback(status_data) else: # Call sync function directly self.dashboard_callback(status_data) except Exception as e: logger.error(f"Error notifying dashboard: {e}") def get_status_summary(self) -> Dict[str, Any]: """Get status summary for all symbols""" summary = { 'websockets_available': WEBSOCKETS_AVAILABLE, 'symbols': {}, 'overall_status': 'unknown' } connected_count = 0 fallback_count = 0 for symbol in self.symbols: status = self.status[symbol] symbol_status = { 'connected': status.connected, 'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None, 'connection_attempts': status.connection_attempts, 'last_error': status.last_error, 'messages_received': status.messages_received, 'rest_fallback_active': self.rest_fallback_active[symbol] } if status.connected: connected_count += 1 elif self.rest_fallback_active[symbol]: fallback_count += 1 summary['symbols'][symbol] = symbol_status # Determine overall status if connected_count == len(self.symbols): summary['overall_status'] = 'all_connected' elif connected_count + fallback_count == len(self.symbols): summary['overall_status'] = 'partial_fallback' else: summary['overall_status'] = 'degraded' return summary # Global instance for easy access enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None) -> EnhancedCOBWebSocket: """Get or create the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket is None: enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback) await enhanced_cob_websocket.start() return enhanced_cob_websocket async def stop_enhanced_cob_websocket(): """Stop the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket: await enhanced_cob_websocket.stop() enhanced_cob_websocket = None