From 1084b7f5b5d7897e9e67636f9039ab23a8be9e8b Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 28 Jul 2025 10:31:24 +0300 Subject: [PATCH] cob buffered --- core/enhanced_cob_websocket.py | 350 +++++++++++++++++++++++++++------ 1 file changed, 291 insertions(+), 59 deletions(-) diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index 2167924..670a181 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -124,11 +124,16 @@ class EnhancedCOBWebSocket: # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} - # Order book management for proper diff depth stream handling + # Order book management for proper diff depth stream handling (Binance compliant) self.order_books: Dict[str, Dict] = {} # {symbol: {'bids': {price: qty}, 'asks': {price: qty}}} self.last_update_ids: Dict[str, int] = {} # Track last update IDs for synchronization self.order_book_initialized: Dict[str, bool] = {} # Track if order book is properly initialized + # Event buffering for proper Binance order book synchronization + self.event_buffers: Dict[str, List[Dict]] = {} # Buffer events before snapshot + self.first_event_u: Dict[str, int] = {} # Track first event U for synchronization + self.snapshot_in_progress: Dict[str, bool] = {} # Track snapshot initialization + # Rate limiting for message processing (Binance: max 5 messages per second) self.last_message_time: Dict[str, datetime] = {} self.min_message_interval = 0.2 # 200ms = 5 messages per second compliance @@ -217,35 +222,78 @@ class EnhancedCOBWebSocket: logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): - """Initialize REST API session for fallback and snapshots""" + """Initialize REST API session for fallback and snapshots - Windows compatible""" + import platform + try: - # Windows-compatible configuration without aiodns - timeout = aiohttp.ClientTimeout(total=10, connect=5) - connector = aiohttp.TCPConnector( - limit=100, - limit_per_host=10, - enable_cleanup_closed=True, - use_dns_cache=False, # Disable DNS cache to avoid aiodns - family=0 # Use default family - ) - self.rest_session = aiohttp.ClientSession( - timeout=timeout, - connector=connector, - headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} - ) - logger.info("✅ REST API session initialized (Windows compatible)") + # Detect Windows and use specific configuration + is_windows = platform.system().lower() == 'windows' + + if is_windows: + # Windows-specific configuration to avoid aiodns issues + import asyncio + + # Check if we're using ProactorEventLoop (Windows default) + loop = asyncio.get_event_loop() + loop_type = type(loop).__name__ + logger.debug(f"Event loop type: {loop_type}") + + # Use basic connector without DNS caching for Windows + connector = aiohttp.TCPConnector( + limit=50, + limit_per_host=10, + enable_cleanup_closed=True, + use_dns_cache=False, # Critical: disable DNS cache + resolver=None, # Use default resolver, not aiodns + family=0, # Use default address family + ssl=False # Disable SSL verification for simplicity + ) + + timeout = aiohttp.ClientTimeout(total=15, connect=10) + + self.rest_session = aiohttp.ClientSession( + timeout=timeout, + connector=connector, + headers={ + 'User-Agent': 'Enhanced-COB-WebSocket/1.0', + 'Accept': 'application/json' + } + ) + logger.info("✅ REST API session initialized (Windows ProactorEventLoop compatible)") + + else: + # Unix/Linux configuration (can use more advanced features) + connector = aiohttp.TCPConnector( + limit=100, + limit_per_host=20, + enable_cleanup_closed=True, + use_dns_cache=True + ) + + timeout = aiohttp.ClientTimeout(total=10, connect=5) + + self.rest_session = aiohttp.ClientSession( + timeout=timeout, + connector=connector, + headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} + ) + logger.info("✅ REST API session initialized (Unix/Linux)") + except Exception as e: logger.warning(f"⚠️ Failed to initialize REST session: {e}") - # Try with minimal configuration + + # Fallback: Ultra-minimal configuration try: + # Most basic configuration possible self.rest_session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=10), - connector=aiohttp.TCPConnector(use_dns_cache=False) + timeout=aiohttp.ClientTimeout(total=20), + headers={'User-Agent': 'COB-WebSocket/1.0'} ) - logger.info("✅ REST API session initialized with minimal config") + logger.info("✅ REST API session initialized with ultra-minimal config") + except Exception as e2: - logger.warning(f"⚠️ Failed to initialize minimal REST session: {e2}") - # Continue without REST session - WebSocket only + logger.error(f"❌ Failed to initialize any REST session: {e2}") + logger.info("🔄 Continuing without REST session - WebSocket-only mode") self.rest_session = None async def _get_order_book_snapshot(self, symbol: str): @@ -707,7 +755,17 @@ class EnhancedCOBWebSocket: logger.debug(traceback.format_exc()) async def _handle_depth_update(self, symbol: str, data: Dict): - """Handle Binance depth update events according to documentation""" + """Handle Binance depth update events following official recommendations + + Binance Order Book Management Procedure: + 1. Buffer events received from stream, note U of first event + 2. Get depth snapshot from REST API + 3. If snapshot lastUpdateId < first event U, get new snapshot + 4. Discard buffered events where u <= snapshot lastUpdateId + 5. First buffered event should have lastUpdateId within [U;u] range + 6. Set local order book to snapshot + 7. Apply buffered events, then subsequent events + """ try: # Extract update IDs first_update_id = data.get('U') # First update ID in event @@ -717,29 +775,188 @@ class EnhancedCOBWebSocket: logger.warning(f"Missing update IDs in depth update for {symbol}") return - # Initialize order book if needed - if symbol not in self.order_book_initialized or not self.order_book_initialized[symbol]: - # Get initial snapshot from REST API - await self._initialize_order_book(symbol) + # Initialize buffers if needed + if symbol not in self.event_buffers: + self.event_buffers[symbol] = [] + self.first_event_u[symbol] = first_update_id + self.snapshot_in_progress[symbol] = False + logger.debug(f"Started buffering events for {symbol}, first U: {first_update_id}") - # Check if we have a valid order book - if symbol not in self.order_books: - logger.warning(f"Order book not initialized for {symbol}, skipping update") + # Check if order book is initialized and synchronized + if not self.order_book_initialized.get(symbol, False): + # Buffer this event + self.event_buffers[symbol].append(data) + logger.debug(f"Buffered event for {symbol}: U={first_update_id}, u={final_update_id}") + + # Start snapshot initialization if not already in progress + if not self.snapshot_in_progress.get(symbol, False): + self.snapshot_in_progress[symbol] = True + await self._initialize_order_book_with_buffering(symbol) + return - # Get last update ID for this symbol + # Order book is initialized, apply update directly + await self._apply_depth_update(symbol, data) + + except Exception as e: + logger.error(f"Error handling depth update for {symbol}: {e}") + + async def _initialize_order_book_with_buffering(self, symbol: str): + """Initialize order book following Binance recommendations with event buffering""" + try: + max_attempts = 3 + attempt = 0 + + while attempt < max_attempts: + attempt += 1 + logger.info(f"Initializing order book for {symbol} (attempt {attempt})") + + # Get snapshot from REST API + snapshot_data = await self._get_order_book_snapshot_data(symbol) + if not snapshot_data: + logger.error(f"Failed to get snapshot for {symbol}") + await asyncio.sleep(1) + continue + + snapshot_last_update_id = snapshot_data.get('lastUpdateId', 0) + first_buffered_u = self.first_event_u.get(symbol, 0) + + logger.debug(f"Snapshot lastUpdateId: {snapshot_last_update_id}, First buffered U: {first_buffered_u}") + + # Check if snapshot is valid (step 3 in Binance procedure) + if snapshot_last_update_id < first_buffered_u: + logger.warning(f"Snapshot too old for {symbol}: {snapshot_last_update_id} < {first_buffered_u}, retrying...") + await asyncio.sleep(0.5) + continue + + # Snapshot is valid, set up order book + await self._setup_order_book_from_snapshot(symbol, snapshot_data) + + # Process buffered events (steps 4-7 in Binance procedure) + await self._process_buffered_events(symbol, snapshot_last_update_id) + + # Mark as initialized + self.order_book_initialized[symbol] = True + self.snapshot_in_progress[symbol] = False + + logger.info(f"✅ Order book initialized for {symbol} with {len(self.event_buffers[symbol])} buffered events") + return + + logger.error(f"❌ Failed to initialize order book for {symbol} after {max_attempts} attempts") + self.snapshot_in_progress[symbol] = False + + except Exception as e: + logger.error(f"Error initializing order book with buffering for {symbol}: {e}") + self.snapshot_in_progress[symbol] = False + + async def _get_order_book_snapshot_data(self, symbol: str) -> Optional[Dict]: + """Get order book snapshot data from REST API""" + try: + if not self.rest_session: + await self._init_rest_session() + + if not self.rest_session: + return None + + # Convert symbol format for Binance API + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000" + + async with self.rest_session.get(url) as response: + if response.status == 200: + return await response.json() + else: + logger.error(f"Failed to get snapshot for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting snapshot data for {symbol}: {e}") + return None + + async def _setup_order_book_from_snapshot(self, symbol: str, snapshot_data: Dict): + """Set up local order book from REST API snapshot""" + try: + # Initialize order book structure + if symbol not in self.order_books: + self.order_books[symbol] = {'bids': {}, 'asks': {}} + + # Clear existing data and populate with snapshot + self.order_books[symbol]['bids'] = { + float(price): float(qty) for price, qty in snapshot_data['bids'] if float(qty) > 0 + } + self.order_books[symbol]['asks'] = { + float(price): float(qty) for price, qty in snapshot_data['asks'] if float(qty) > 0 + } + + # Set last update ID from snapshot + self.last_update_ids[symbol] = snapshot_data['lastUpdateId'] + + logger.debug(f"Set up order book for {symbol}: {len(self.order_books[symbol]['bids'])} bids, {len(self.order_books[symbol]['asks'])} asks, lastUpdateId: {snapshot_data['lastUpdateId']}") + + except Exception as e: + logger.error(f"Error setting up order book from snapshot for {symbol}: {e}") + + async def _process_buffered_events(self, symbol: str, snapshot_last_update_id: int): + """Process buffered events according to Binance procedure""" + try: + buffered_events = self.event_buffers.get(symbol, []) + valid_events = [] + + # Step 4: Discard events where u <= lastUpdateId of snapshot + for event in buffered_events: + event_final_u = event.get('u', 0) + if event_final_u > snapshot_last_update_id: + valid_events.append(event) + else: + logger.debug(f"Discarded buffered event for {symbol}: u={event_final_u} <= snapshot lastUpdateId={snapshot_last_update_id}") + + # Step 5: Validate first buffered event + if valid_events: + first_event = valid_events[0] + first_u = first_event.get('U', 0) + first_final_u = first_event.get('u', 0) + + if not (first_u <= snapshot_last_update_id <= first_final_u): + logger.error(f"❌ Synchronization error for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} not in range [{first_u}, {first_final_u}]") + # Reset and try again + self.order_book_initialized[symbol] = False + self.event_buffers[symbol] = [] + return + + logger.debug(f"✅ First buffered event valid for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} in range [{first_u}, {first_final_u}]") + + # Step 6-7: Apply all valid buffered events + for event in valid_events: + await self._apply_depth_update(symbol, event) + + # Clear buffer + self.event_buffers[symbol] = [] + + logger.info(f"Processed {len(valid_events)} buffered events for {symbol}") + + except Exception as e: + logger.error(f"Error processing buffered events for {symbol}: {e}") + + async def _apply_depth_update(self, symbol: str, data: Dict): + """Apply a single depth update to the local order book""" + try: + first_update_id = data.get('U', 0) + final_update_id = data.get('u', 0) last_update_id = self.last_update_ids.get(symbol, 0) - # Apply Binance synchronization rules + # Validate update sequence if final_update_id <= last_update_id: # Event is older than our current state, ignore + logger.debug(f"Ignoring old event for {symbol}: u={final_update_id} <= lastUpdateId={last_update_id}") return if first_update_id > last_update_id + 1: # Gap detected, need to reinitialize - logger.warning(f"Gap detected in depth updates for {symbol}, reinitializing order book") + logger.warning(f"Gap detected in depth updates for {symbol}: U={first_update_id} > lastUpdateId+1={last_update_id+1}") self.order_book_initialized[symbol] = False - await self._initialize_order_book(symbol) + self.event_buffers[symbol] = [] + self.first_event_u[symbol] = first_update_id + await self._initialize_order_book_with_buffering(symbol) return # Apply updates to local order book @@ -778,16 +995,22 @@ class EnhancedCOBWebSocket: # Convert to COB format and notify callbacks await self._create_cob_from_order_book(symbol) + logger.debug(f"Applied depth update for {symbol}: U={first_update_id}, u={final_update_id}") + except Exception as e: - logger.error(f"Error handling depth update for {symbol}: {e}") + logger.error(f"Error applying depth update for {symbol}: {e}") async def _initialize_order_book(self, symbol: str): - """Initialize order book from REST API snapshot""" + """Initialize order book from REST API snapshot (legacy method)""" try: - # Get snapshot (this method already exists) - await self._get_order_book_snapshot(symbol) - self.order_book_initialized[symbol] = True - logger.info(f"Order book initialized for {symbol}") + # Use the new buffering approach for proper synchronization + if symbol not in self.event_buffers: + self.event_buffers[symbol] = [] + self.first_event_u[symbol] = 0 + self.snapshot_in_progress[symbol] = False + + await self._initialize_order_book_with_buffering(symbol) + except Exception as e: logger.error(f"Failed to initialize order book for {symbol}: {e}") self.order_book_initialized[symbol] = False @@ -1202,9 +1425,11 @@ class EnhancedCOBWebSocket: status.last_error = f"Stale connection: {time_since_last:.1f}s without messages" await self._notify_dashboard_status(symbol, "stale", status.last_error) - # Log connection status + # Log connection status and order book sync status if status.connected: - logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") + ob_status = "✅ Synced" if self.order_book_initialized.get(symbol, False) else "🔄 Syncing" + buffered_count = len(self.event_buffers.get(symbol, [])) + logger.debug(f"{symbol}: Connected, {status.messages_received} messages, OB: {ob_status}, Buffered: {buffered_count}") else: logger.debug(f"{symbol}: Disconnected - {status.last_error}") @@ -1358,22 +1583,29 @@ class EnhancedCOBWebSocket: except Exception as e: logger.error(f"Error getting kline with timezone for {symbol}: {e}") - return None status.last_message_time: - time_since_last = datetime.now() - status.last_message_time - if time_since_last > timedelta(seconds=30): - logger.warning(f"No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") - await self._notify_dashboard_status(symbol, "stale", "No recent messages") - - # Log status - if status.connected: - logger.debug(f"{symbol}: Connected, {status.messages_received} messages received") - elif self.rest_fallback_active[symbol]: - logger.debug(f"{symbol}: Using REST fallback") - else: - logger.debug(f"{symbol}: Disconnected, last error: {status.last_error}") - - except Exception as e: - logger.error(f"Error in connection monitor: {e}") + return None + + def get_order_book_status(self, symbol: str) -> Dict: + """Get detailed order book synchronization status""" + try: + return { + 'symbol': symbol, + 'initialized': self.order_book_initialized.get(symbol, False), + 'snapshot_in_progress': self.snapshot_in_progress.get(symbol, False), + 'buffered_events': len(self.event_buffers.get(symbol, [])), + 'first_event_u': self.first_event_u.get(symbol, 0), + 'last_update_id': self.last_update_ids.get(symbol, 0), + 'bid_levels': len(self.order_books.get(symbol, {}).get('bids', {})), + 'ask_levels': len(self.order_books.get(symbol, {}).get('asks', {})), + 'timestamp': datetime.now().isoformat() + } + except Exception as e: + logger.error(f"Error getting order book status for {symbol}: {e}") + return {'error': str(e)} + + def get_all_order_book_status(self) -> Dict[str, Dict]: + """Get order book status for all symbols""" + return {symbol: self.get_order_book_status(symbol) for symbol in self.symbols} async def _notify_dashboard_status(self, symbol: str, status: str, message: str): """Notify dashboard of status changes"""