#!/usr/bin/env python3 """ Enhanced COB WebSocket Implementation Robust WebSocket implementation for Consolidated Order Book data with: - Maximum allowed depth subscription - Clear error handling and warnings - Automatic reconnection with exponential backoff - Fallback to REST API when WebSocket fails - Dashboard integration with status updates This replaces the existing COB WebSocket implementation with a more reliable version. """ import asyncio import json import logging import time import traceback from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Callable from collections import deque, defaultdict from dataclasses import dataclass import aiohttp import weakref try: import websockets from websockets.client import connect as websockets_connect from websockets.exceptions import ConnectionClosed, WebSocketException WEBSOCKETS_AVAILABLE = True except ImportError: websockets = None websockets_connect = None ConnectionClosed = Exception WebSocketException = Exception WEBSOCKETS_AVAILABLE = False logger = logging.getLogger(__name__) @dataclass class COBWebSocketStatus: """Status tracking for COB WebSocket connections""" connected: bool = False last_message_time: Optional[datetime] = None connection_attempts: int = 0 last_error: Optional[str] = None reconnect_delay: float = 1.0 max_reconnect_delay: float = 60.0 messages_received: int = 0 def reset_reconnect_delay(self): """Reset reconnect delay on successful connection""" self.reconnect_delay = 1.0 def increase_reconnect_delay(self): """Increase reconnect delay with exponential backoff""" self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) class EnhancedCOBWebSocket: """Enhanced COB WebSocket with robust error handling and fallback""" def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None): """ Initialize Enhanced COB WebSocket Args: symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) dashboard_callback: Callback function for dashboard status updates """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback # Connection status tracking self.status: Dict[str, COBWebSocketStatus] = { symbol: COBWebSocketStatus() for symbol in self.symbols } # Data callbacks self.cob_callbacks: List[Callable] = [] self.error_callbacks: List[Callable] = [] # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} # REST API fallback self.rest_session: Optional[aiohttp.ClientSession] = None self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols} self.rest_tasks: Dict[str, asyncio.Task] = {} # Configuration self.max_depth = 1000 # Maximum depth for order book self.update_speed = '100ms' # Binance update speed logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: logger.error("⚠️ WebSockets module not available - COB data will be limited to REST API") def add_cob_callback(self, callback: Callable): """Add callback for COB data updates""" self.cob_callbacks.append(callback) def add_error_callback(self, callback: Callable): """Add callback for error notifications""" self.error_callbacks.append(callback) async def start(self): """Start COB WebSocket connections""" logger.info("🚀 Starting Enhanced COB WebSocket system") # Initialize REST session for fallback await self._init_rest_session() # Start WebSocket connections for each symbol for symbol in self.symbols: await self._start_symbol_websocket(symbol) # Start monitoring task asyncio.create_task(self._monitor_connections()) logger.info("✅ Enhanced COB WebSocket system started") async def stop(self): """Stop all WebSocket connections""" logger.info("🛑 Stopping Enhanced COB WebSocket system") # Cancel all WebSocket tasks for symbol, task in self.websocket_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Cancel all REST tasks for symbol, task in self.rest_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Close REST session if self.rest_session: await self.rest_session.close() logger.info("✅ Enhanced COB WebSocket system stopped") async def _init_rest_session(self): """Initialize REST API session for fallback""" try: timeout = aiohttp.ClientTimeout(total=10) self.rest_session = aiohttp.ClientSession(timeout=timeout) logger.info("✅ REST API session initialized for fallback") except Exception as e: logger.error(f"❌ Failed to initialize REST session: {e}") async def _start_symbol_websocket(self, symbol: str): """Start WebSocket connection for a specific symbol""" if not WEBSOCKETS_AVAILABLE: logger.warning(f"⚠️ WebSockets not available for {symbol}, starting REST fallback") await self._start_rest_fallback(symbol) return # Cancel existing task if running if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): self.websocket_tasks[symbol].cancel() # Start new WebSocket task self.websocket_tasks[symbol] = asyncio.create_task( self._websocket_connection_loop(symbol) ) logger.info(f"🔌 Started WebSocket task for {symbol}") async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic""" status = self.status[symbol] while True: try: logger.info(f"🔌 Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 # Create WebSocket URL with maximum depth ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@{self.update_speed}" logger.info(f"🔗 Connecting to: {ws_url}") async with websockets_connect(ws_url) as websocket: # Connection successful status.connected = True status.last_error = None status.reset_reconnect_delay() logger.info(f"✅ WebSocket connected for {symbol}") await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") # Deactivate REST fallback if self.rest_fallback_active[symbol]: await self._stop_rest_fallback(symbol) # Message receiving loop async for message in websocket: try: data = json.loads(message) await self._process_websocket_message(symbol, data) status.last_message_time = datetime.now() status.messages_received += 1 except json.JSONDecodeError as e: logger.warning(f"⚠️ Invalid JSON from {symbol} WebSocket: {e}") except Exception as e: logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}") except ConnectionClosed as e: status.connected = False status.last_error = f"Connection closed: {e}" logger.warning(f"🔌 WebSocket connection closed for {symbol}: {e}") except WebSocketException as e: status.connected = False status.last_error = f"WebSocket error: {e}" logger.error(f"❌ WebSocket error for {symbol}: {e}") except Exception as e: status.connected = False status.last_error = f"Unexpected error: {e}" logger.error(f"❌ Unexpected WebSocket error for {symbol}: {e}") logger.error(traceback.format_exc()) # Connection failed or closed - start REST fallback await self._notify_dashboard_status(symbol, "disconnected", status.last_error) await self._start_rest_fallback(symbol) # Wait before reconnecting status.increase_reconnect_delay() logger.info(f"⏳ Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}") await asyncio.sleep(status.reconnect_delay) async def _process_websocket_message(self, symbol: str, data: Dict): """Process WebSocket message and convert to COB format""" try: # Binance depth stream format if 'b' in data and 'a' in data: # bids and asks cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['b']], 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['a']], 'source': 'websocket', 'exchange': 'binance' } # Calculate stats if cob_data['bids'] and cob_data['asks']: best_bid = max(cob_data['bids'], key=lambda x: x['price']) best_ask = min(cob_data['asks'], key=lambda x: x['price']) cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'spread': best_ask['price'] - best_bid['price'], 'mid_price': (best_bid['price'] + best_ask['price']) / 2, 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Processed WebSocket COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") except Exception as e: logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}") async def _start_rest_fallback(self, symbol: str): """Start REST API fallback for a symbol""" if self.rest_fallback_active[symbol]: return # Already active self.rest_fallback_active[symbol] = True # Cancel existing REST task if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() # Start new REST task self.rest_tasks[symbol] = asyncio.create_task( self._rest_fallback_loop(symbol) ) logger.warning(f"⚠️ Started REST API fallback for {symbol}") await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback") async def _stop_rest_fallback(self, symbol: str): """Stop REST API fallback for a symbol""" if not self.rest_fallback_active[symbol]: return self.rest_fallback_active[symbol] = False if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): self.rest_tasks[symbol].cancel() logger.info(f"✅ Stopped REST API fallback for {symbol}") async def _rest_fallback_loop(self, symbol: str): """REST API fallback loop""" while self.rest_fallback_active[symbol]: try: await self._fetch_rest_orderbook(symbol) await asyncio.sleep(1) # Update every second except asyncio.CancelledError: break except Exception as e: logger.error(f"❌ REST fallback error for {symbol}: {e}") await asyncio.sleep(5) # Wait longer on error async def _fetch_rest_orderbook(self, symbol: str): """Fetch order book data via REST API""" try: if not self.rest_session: return # Binance REST API rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000" async with self.rest_session.get(url) as response: if response.status == 200: data = await response.json() cob_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']], 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']], 'source': 'rest_fallback', 'exchange': 'binance' } # Calculate stats if cob_data['bids'] and cob_data['asks']: best_bid = max(cob_data['bids'], key=lambda x: x['price']) best_ask = min(cob_data['asks'], key=lambda x: x['price']) cob_data['stats'] = { 'best_bid': best_bid['price'], 'best_ask': best_ask['price'], 'spread': best_ask['price'] - best_bid['price'], 'mid_price': (best_bid['price'] + best_ask['price']) / 2, 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) } # Update cache self.latest_cob_data[symbol] = cob_data # Notify callbacks for callback in self.cob_callbacks: try: await callback(symbol, cob_data) except Exception as e: logger.error(f"❌ Error in COB callback: {e}") logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") else: logger.warning(f"⚠️ REST API error for {symbol}: HTTP {response.status}") except Exception as e: logger.error(f"❌ Error fetching REST order book for {symbol}: {e}") async def _monitor_connections(self): """Monitor WebSocket connections and provide status updates""" while True: try: await asyncio.sleep(10) # Check every 10 seconds for symbol in self.symbols: status = self.status[symbol] # Check for stale connections if status.connected and status.last_message_time: time_since_last = datetime.now() - status.last_message_time if time_since_last > timedelta(seconds=30): logger.warning(f"⚠️ No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") await self._notify_dashboard_status(symbol, "stale", "No recent messages") # Log status if status.connected: logger.debug(f"✅ {symbol}: Connected, {status.messages_received} messages received") elif self.rest_fallback_active[symbol]: logger.debug(f"⚠️ {symbol}: Using REST fallback") else: logger.debug(f"❌ {symbol}: Disconnected, last error: {status.last_error}") except Exception as e: logger.error(f"❌ Error in connection monitor: {e}") async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of status changes""" try: if self.dashboard_callback: await self.dashboard_callback({ 'type': 'cob_status', 'symbol': symbol, 'status': status, 'message': message, 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.error(f"❌ Error notifying dashboard: {e}") def get_status_summary(self) -> Dict[str, Any]: """Get status summary for all symbols""" summary = { 'websockets_available': WEBSOCKETS_AVAILABLE, 'symbols': {}, 'overall_status': 'unknown' } connected_count = 0 fallback_count = 0 for symbol in self.symbols: status = self.status[symbol] symbol_status = { 'connected': status.connected, 'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None, 'connection_attempts': status.connection_attempts, 'last_error': status.last_error, 'messages_received': status.messages_received, 'rest_fallback_active': self.rest_fallback_active[symbol] } if status.connected: connected_count += 1 elif self.rest_fallback_active[symbol]: fallback_count += 1 summary['symbols'][symbol] = symbol_status # Determine overall status if connected_count == len(self.symbols): summary['overall_status'] = 'all_connected' elif connected_count + fallback_count == len(self.symbols): summary['overall_status'] = 'partial_fallback' else: summary['overall_status'] = 'degraded' return summary # Global instance for easy access enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None) -> EnhancedCOBWebSocket: """Get or create the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket is None: enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback) await enhanced_cob_websocket.start() return enhanced_cob_websocket async def stop_enhanced_cob_websocket(): """Stop the global enhanced COB WebSocket instance""" global enhanced_cob_websocket if enhanced_cob_websocket: await enhanced_cob_websocket.stop() enhanced_cob_websocket = None