From 7c508ab536ebf5a7e47b75fffe4656034478f49f Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 28 Jul 2025 11:12:42 +0300 Subject: [PATCH] cob --- core/cob_integration.py | 2 +- core/enhanced_cob_websocket.py | 351 +++++++++++++++++++++++---------- web/clean_dashboard.py | 6 +- 3 files changed, 251 insertions(+), 108 deletions(-) diff --git a/core/cob_integration.py b/core/cob_integration.py index b2c3b50..6daa7b3 100644 --- a/core/cob_integration.py +++ b/core/cob_integration.py @@ -163,7 +163,7 @@ class COBIntegration: if symbol: self.websocket_status[symbol] = status - logger.info(f"🔌 WebSocket status for {symbol}: {status} - {message}") + logger.info(f"WebSocket status for {symbol}: {status} - {message}") # Notify dashboard callbacks about status change status_update = { diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index 670a181..07dbf85 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -154,11 +154,11 @@ class EnhancedCOBWebSocket: # Timezone configuration if self.timezone_offset == '+08:00': - logger.info("🕐 Configured for UTC+8 timezone (Asian markets)") + logger.info("Configured for UTC+8 timezone (Asian markets)") elif self.timezone_offset: - logger.info(f"🕐 Configured for {self.timezone_offset} timezone") + logger.info(f"Configured for {self.timezone_offset} timezone") else: - logger.info("🕐 Configured for UTC timezone (default)") + logger.info("Configured for UTC timezone (default)") logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: @@ -222,79 +222,39 @@ class EnhancedCOBWebSocket: logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): - """Initialize REST API session for fallback and snapshots - Windows compatible""" + """Initialize REST API session - Windows uses requests-only mode""" import platform + # On Windows, completely skip aiohttp to avoid aiodns issues + is_windows = platform.system().lower() == 'windows' + + if is_windows: + logger.info("Windows detected - using WebSocket-only mode (skipping REST API due to rate limits)") + self.rest_session = None # Force requests fallback + return + + # Non-Windows: try aiohttp try: - # Detect Windows and use specific configuration - is_windows = platform.system().lower() == 'windows' + connector = aiohttp.TCPConnector( + limit=100, + limit_per_host=20, + enable_cleanup_closed=True, + use_dns_cache=True + ) + + timeout = aiohttp.ClientTimeout(total=10, connect=5) + + self.rest_session = aiohttp.ClientSession( + timeout=timeout, + connector=connector, + headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} + ) + logger.info("REST API session initialized (Unix/Linux)") - if is_windows: - # Windows-specific configuration to avoid aiodns issues - import asyncio - - # Check if we're using ProactorEventLoop (Windows default) - loop = asyncio.get_event_loop() - loop_type = type(loop).__name__ - logger.debug(f"Event loop type: {loop_type}") - - # Use basic connector without DNS caching for Windows - connector = aiohttp.TCPConnector( - limit=50, - limit_per_host=10, - enable_cleanup_closed=True, - use_dns_cache=False, # Critical: disable DNS cache - resolver=None, # Use default resolver, not aiodns - family=0, # Use default address family - ssl=False # Disable SSL verification for simplicity - ) - - timeout = aiohttp.ClientTimeout(total=15, connect=10) - - self.rest_session = aiohttp.ClientSession( - timeout=timeout, - connector=connector, - headers={ - 'User-Agent': 'Enhanced-COB-WebSocket/1.0', - 'Accept': 'application/json' - } - ) - logger.info("✅ REST API session initialized (Windows ProactorEventLoop compatible)") - - else: - # Unix/Linux configuration (can use more advanced features) - connector = aiohttp.TCPConnector( - limit=100, - limit_per_host=20, - enable_cleanup_closed=True, - use_dns_cache=True - ) - - timeout = aiohttp.ClientTimeout(total=10, connect=5) - - self.rest_session = aiohttp.ClientSession( - timeout=timeout, - connector=connector, - headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'} - ) - logger.info("✅ REST API session initialized (Unix/Linux)") - except Exception as e: - logger.warning(f"⚠️ Failed to initialize REST session: {e}") - - # Fallback: Ultra-minimal configuration - try: - # Most basic configuration possible - self.rest_session = aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=20), - headers={'User-Agent': 'COB-WebSocket/1.0'} - ) - logger.info("✅ REST API session initialized with ultra-minimal config") - - except Exception as e2: - logger.error(f"❌ Failed to initialize any REST session: {e2}") - logger.info("🔄 Continuing without REST session - WebSocket-only mode") - self.rest_session = None + logger.warning(f"Failed to initialize aiohttp session: {e}") + logger.info("Falling back to requests-only mode") + self.rest_session = None async def _get_order_book_snapshot(self, symbol: str): """Get initial order book snapshot from REST API @@ -308,7 +268,7 @@ class EnhancedCOBWebSocket: await self._init_rest_session() if not self.rest_session: - logger.warning(f"⚠️ Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only") + logger.warning(f"Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only") return # Convert symbol format for Binance API @@ -317,7 +277,7 @@ class EnhancedCOBWebSocket: # Get order book snapshot with maximum depth url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000" - logger.debug(f"🔍 Getting order book snapshot for {symbol} from {url}") + logger.debug(f"Getting order book snapshot for {symbol} from {url}") async with self.rest_session.get(url) as response: if response.status == 200: @@ -325,7 +285,7 @@ class EnhancedCOBWebSocket: # Validate response structure if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data: - logger.error(f"❌ Invalid order book snapshot response for {symbol}: missing bids/asks") + logger.error(f"Invalid order book snapshot response for {symbol}: missing bids/asks") return # Initialize order book state for proper WebSocket synchronization @@ -344,7 +304,7 @@ class EnhancedCOBWebSocket: # Mark as initialized self.order_book_initialized[symbol] = True - logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") + logger.info(f"Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") # Create initial COB data from snapshot bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0] @@ -401,21 +361,21 @@ class EnhancedCOBWebSocket: except Exception as e: logger.error(f"❌ Error in COB callback: {e}") - logger.debug(f"📊 Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps") + logger.debug(f"Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps") else: - logger.warning(f"⚠️ No valid bid/ask data in snapshot for {symbol}") + logger.warning(f"No valid bid/ask data in snapshot for {symbol}") elif response.status == 429: - logger.warning(f"⚠️ Rate limited getting snapshot for {symbol}, will continue with WebSocket only") + logger.warning(f"Rate limited getting snapshot for {symbol}, will continue with WebSocket only") else: - logger.error(f"❌ Failed to get order book snapshot for {symbol}: HTTP {response.status}") + logger.error(f"Failed to get order book snapshot for {symbol}: HTTP {response.status}") response_text = await response.text() logger.debug(f"Response: {response_text}") except asyncio.TimeoutError: - logger.warning(f"⚠️ Timeout getting order book snapshot for {symbol}, will continue with WebSocket only") + logger.warning(f"Timeout getting order book snapshot for {symbol}, will continue with WebSocket only") except Exception as e: - logger.warning(f"⚠️ Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only") + logger.warning(f"Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only") logger.debug(f"Snapshot error details: {e}") # Don't fail the entire connection due to snapshot issues @@ -796,7 +756,24 @@ class EnhancedCOBWebSocket: return # Order book is initialized, apply update directly - await self._apply_depth_update(symbol, data) + # In WebSocket-only mode (no REST snapshot), be more lenient with update IDs + last_update_id = self.last_update_ids.get(symbol, 0) + + if last_update_id == 0: + # WebSocket-only mode: accept any update as starting point + logger.debug(f"WebSocket-only mode for {symbol}: accepting update as starting point (U={first_update_id}, u={final_update_id})") + await self._apply_depth_update(symbol, data) + elif final_update_id <= last_update_id: + # Event is older than our current state, ignore + logger.debug(f"Ignoring old update for {symbol}: {final_update_id} <= {last_update_id}") + return + elif first_update_id > last_update_id + 1: + # Gap detected - in WebSocket-only mode, just continue (less strict) + logger.warning(f"Gap detected for {symbol}: {first_update_id} > {last_update_id + 1}, continuing anyway (WebSocket-only mode)") + await self._apply_depth_update(symbol, data) + else: + # Normal update + await self._apply_depth_update(symbol, data) except Exception as e: logger.error(f"Error handling depth update for {symbol}: {e}") @@ -804,6 +781,13 @@ class EnhancedCOBWebSocket: async def _initialize_order_book_with_buffering(self, symbol: str): """Initialize order book following Binance recommendations with event buffering""" try: + # On Windows, skip REST API and go directly to simplified mode to avoid rate limits + import platform + if platform.system().lower() == 'windows': + logger.info(f"Windows detected - using WebSocket-only mode for {symbol} (avoiding REST API rate limits)") + await self._init_simplified_mode(symbol) + return + max_attempts = 3 attempt = 0 @@ -839,38 +823,185 @@ class EnhancedCOBWebSocket: self.order_book_initialized[symbol] = True self.snapshot_in_progress[symbol] = False - logger.info(f"✅ Order book initialized for {symbol} with {len(self.event_buffers[symbol])} buffered events") + logger.info(f"Order book initialized for {symbol} with {len(self.event_buffers[symbol])} buffered events") return - logger.error(f"❌ Failed to initialize order book for {symbol} after {max_attempts} attempts") - self.snapshot_in_progress[symbol] = False + logger.error(f"Failed to initialize order book for {symbol} after {max_attempts} attempts") + logger.info(f"Switching to simplified mode for {symbol} (no order book sync)") + + # Use simplified mode that doesn't maintain order book + await self._init_simplified_mode(symbol) except Exception as e: logger.error(f"Error initializing order book with buffering for {symbol}: {e}") self.snapshot_in_progress[symbol] = False - async def _get_order_book_snapshot_data(self, symbol: str) -> Optional[Dict]: - """Get order book snapshot data from REST API""" + async def _init_simplified_mode(self, symbol: str): + """Initialize simplified mode that processes WebSocket data without order book sync""" try: + # Mark as using simplified mode + if not hasattr(self, 'simplified_mode'): + self.simplified_mode = {} + self.simplified_mode[symbol] = True + + # Clear any existing state + if symbol in self.event_buffers: + self.event_buffers[symbol] = [] + if symbol in self.order_books: + del self.order_books[symbol] + + # Mark as initialized + self.order_book_initialized[symbol] = True + self.snapshot_in_progress[symbol] = False + + logger.info(f"Simplified mode initialized for {symbol} - will process raw WebSocket data") + + except Exception as e: + logger.error(f"Error initializing simplified mode for {symbol}: {e}") + + async def _process_simplified_depth_data(self, symbol: str, data: Dict): + """Process depth data in simplified mode without order book synchronization""" + try: + # Extract bids and asks directly from the update + bids_data = data.get('b', []) + asks_data = data.get('a', []) + + if not bids_data and not asks_data: + return + + # Convert to simple format + bids = [] + asks = [] + + for bid in bids_data: + if len(bid) >= 2: + price = float(bid[0]) + size = float(bid[1]) + if size > 0: # Only include non-zero quantities + bids.append({'price': price, 'size': size}) + + for ask in asks_data: + if len(ask) >= 2: + price = float(ask[0]) + size = float(ask[1]) + if size > 0: # Only include non-zero quantities + asks.append({'price': price, 'size': size}) + + if not bids and not asks: + return + + # Sort for best bid/ask calculation + if bids: + bids.sort(key=lambda x: x['price'], reverse=True) + if asks: + asks.sort(key=lambda x: x['price']) + + # Create simplified COB data + cob_data = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': bids, + 'asks': asks, + 'source': 'simplified_depth_stream', + 'exchange': 'binance' + } + + # Calculate basic stats if we have data + if bids and asks: + best_bid = bids[0]['price'] + best_ask = asks[0]['price'] + mid_price = (best_bid + best_ask) / 2 + spread = best_ask - best_bid + spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0 + + cob_data['stats'] = { + 'best_bid': best_bid, + 'best_ask': best_ask, + 'mid_price': mid_price, + 'spread': spread, + 'spread_bps': spread_bps, + 'bid_levels': len(bids), + 'ask_levels': len(asks), + 'timestamp': datetime.now().isoformat(), + 'mode': 'simplified' + } + + # Update cache + self.latest_cob_data[symbol] = cob_data + + # Notify callbacks + for callback in self.cob_callbacks: + try: + await callback(symbol, cob_data) + except Exception as e: + logger.error(f"Error in COB callback: {e}") + + logger.debug(f"{symbol}: Simplified depth - {len(bids)} bids, {len(asks)} asks") + + except Exception as e: + logger.error(f"Error processing simplified depth data for {symbol}: {e}") + + async def _get_order_book_snapshot_data(self, symbol: str) -> Optional[Dict]: + """Get order book snapshot data from REST API with Windows fallback""" + try: + # Try aiohttp first if not self.rest_session: await self._init_rest_session() - if not self.rest_session: + if self.rest_session: + # Convert symbol format for Binance API + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000" + + async with self.rest_session.get(url) as response: + if response.status == 200: + return await response.json() + else: + logger.error(f"Failed to get snapshot for {symbol}: HTTP {response.status}") + return None + else: + # Fallback to synchronous requests (Windows compatible) + logger.info(f"Using synchronous HTTP fallback for {symbol} snapshot") + return await self._get_snapshot_with_requests(symbol) + + except Exception as e: + logger.error(f"Error getting snapshot data for {symbol}: {e}") + # Try synchronous fallback + try: + return await self._get_snapshot_with_requests(symbol) + except Exception as e2: + logger.error(f"Fallback also failed for {symbol}: {e2}") return None + + async def _get_snapshot_with_requests(self, symbol: str) -> Optional[Dict]: + """Fallback method using synchronous requests library (Windows compatible)""" + try: + import requests + import asyncio # Convert symbol format for Binance API binance_symbol = symbol.replace('/', '') url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000" - async with self.rest_session.get(url) as response: - if response.status == 200: - return await response.json() - else: - logger.error(f"Failed to get snapshot for {symbol}: HTTP {response.status}") - return None - + # Run in thread pool to avoid blocking + loop = asyncio.get_event_loop() + response = await loop.run_in_executor( + None, + lambda: requests.get(url, timeout=10, headers={'User-Agent': 'COB-WebSocket/1.0'}) + ) + + if response.status_code == 200: + logger.info(f"Got snapshot for {symbol} using requests fallback") + return response.json() + else: + logger.error(f"Requests fallback failed for {symbol}: HTTP {response.status_code}") + return None + + except ImportError: + logger.error("requests library not available for fallback") + return None except Exception as e: - logger.error(f"Error getting snapshot data for {symbol}: {e}") + logger.error(f"Error in requests fallback for {symbol}: {e}") return None async def _setup_order_book_from_snapshot(self, symbol: str, snapshot_data: Dict): @@ -917,13 +1048,13 @@ class EnhancedCOBWebSocket: first_final_u = first_event.get('u', 0) if not (first_u <= snapshot_last_update_id <= first_final_u): - logger.error(f"❌ Synchronization error for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} not in range [{first_u}, {first_final_u}]") + logger.error(f"Synchronization error for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} not in range [{first_u}, {first_final_u}]") # Reset and try again self.order_book_initialized[symbol] = False self.event_buffers[symbol] = [] - return - - logger.debug(f"✅ First buffered event valid for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} in range [{first_u}, {first_final_u}]") + return + + logger.debug(f"First buffered event valid for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} in range [{first_u}, {first_final_u}]") # Step 6-7: Apply all valid buffered events for event in valid_events: @@ -1126,7 +1257,13 @@ class EnhancedCOBWebSocket: async def _handle_depth_stream(self, symbol: str, data: Dict): """Handle depth stream data (order book updates)""" try: - # Check if this is a depth update event + # Check if we're in simplified mode + if hasattr(self, 'simplified_mode') and self.simplified_mode.get(symbol, False): + # Simplified mode: process raw data without order book sync + await self._process_simplified_depth_data(symbol, data) + return + + # Normal mode: try to maintain order book if data.get('e') == 'depthUpdate': await self._handle_depth_update(symbol, data) else: @@ -1134,6 +1271,12 @@ class EnhancedCOBWebSocket: await self._process_websocket_message(symbol, data) except Exception as e: logger.error(f"Error handling depth stream for {symbol}: {e}") + # Fall back to simplified mode on repeated errors + if not hasattr(self, 'simplified_mode'): + self.simplified_mode = {} + if not self.simplified_mode.get(symbol, False): + logger.info(f"Switching {symbol} to simplified mode due to errors") + await self._init_simplified_mode(symbol) async def _handle_kline_stream(self, symbol: str, data: Dict): """Handle 1-second kline/candlestick data with timezone support""" @@ -1397,9 +1540,9 @@ class EnhancedCOBWebSocket: try: await callback(symbol, cob_data) except Exception as e: - logger.error(f"❌ Error in COB callback: {e}") + logger.error(f"Error in COB callback: {e}") - logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") + logger.debug(f"Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") else: logger.warning(f"REST API error for {symbol}: HTTP {response.status}") @@ -1427,7 +1570,7 @@ class EnhancedCOBWebSocket: # Log connection status and order book sync status if status.connected: - ob_status = "✅ Synced" if self.order_book_initialized.get(symbol, False) else "🔄 Syncing" + ob_status = "Synced" if self.order_book_initialized.get(symbol, False) else "Syncing" buffered_count = len(self.event_buffers.get(symbol, [])) logger.debug(f"{symbol}: Connected, {status.messages_received} messages, OB: {ob_status}, Buffered: {buffered_count}") else: diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 5b84f51..181ea7c 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -369,7 +369,7 @@ class CleanTradingDashboard: else: self.cob_cache[symbol]['update_rate'] = 0.0 - logger.info(f"📊 Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})") + logger.debug(f"Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})") # Continue with existing logic # Update latest COB data cache @@ -6634,10 +6634,10 @@ class CleanTradingDashboard: self.cob_cache[symbol]['websocket_status'] = 'connected' self.cob_cache[symbol]['source'] = 'orchestrator' - logger.info(f"📊 Updated COB cache for {symbol} from orchestrator: {self.cob_cache[symbol]['websocket_status']} (updates: {self.cob_cache[symbol]['updates_count']})") + logger.debug(f"Updated COB cache for {symbol} from orchestrator: {self.cob_cache[symbol]['websocket_status']} (updates: {self.cob_cache[symbol]['updates_count']})") except Exception as e: - logger.error(f"❌ Error updating COB cache from orchestrator for {symbol}: {e}") + logger.error(f"Error updating COB cache from orchestrator for {symbol}: {e}") def _on_enhanced_cob_update(self, symbol: str, data: Dict): """Handle enhanced COB updates with WebSocket status"""