From f5416c4f1ed66ec4d27d94fecf5223ea3c547add Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 28 Jul 2025 09:46:49 +0300 Subject: [PATCH] cob update fix --- core/enhanced_cob_websocket.py | 42 +++++++++++++++--- core/orchestrator.py | 7 ++- core/realtime_rl_cob_trader.py | 2 +- web/clean_dashboard.py | 78 ++++++++++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 8 deletions(-) diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index c6042c0..b00ccf3 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -60,6 +60,8 @@ class COBWebSocketStatus: class EnhancedCOBWebSocket: """Enhanced COB WebSocket with robust error handling and fallback""" + _instances = {} # Track instances by symbols to prevent duplicates + def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None): """ Initialize Enhanced COB WebSocket @@ -71,6 +73,18 @@ class EnhancedCOBWebSocket: self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.dashboard_callback = dashboard_callback + # Check for existing instances to prevent duplicate connections + symbols_key = tuple(sorted(self.symbols)) + if symbols_key in EnhancedCOBWebSocket._instances: + logger.warning(f"EnhancedCOBWebSocket already exists for symbols {self.symbols} - reusing existing instance") + existing = EnhancedCOBWebSocket._instances[symbols_key] + # Copy existing instance data + self.__dict__.update(existing.__dict__) + return + + # Register this instance + EnhancedCOBWebSocket._instances[symbols_key] = self + # Connection status tracking self.status: Dict[str, COBWebSocketStatus] = { symbol: COBWebSocketStatus() for symbol in self.symbols @@ -83,6 +97,10 @@ class EnhancedCOBWebSocket: # Latest data cache self.latest_cob_data: Dict[str, Dict] = {} + # Rate limiting for message processing + self.last_message_time: Dict[str, datetime] = {} + self.min_message_interval = 0.1 # Minimum 100ms between messages per symbol + # WebSocket connections self.websocket_tasks: Dict[str, asyncio.Task] = {} @@ -93,7 +111,7 @@ class EnhancedCOBWebSocket: # Configuration self.max_depth = 1000 # Maximum depth for order book - self.update_speed = '100ms' # Binance update speed + self.update_speed = '1000ms' # Binance update speed - reduced for stability logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") if not WEBSOCKETS_AVAILABLE: @@ -149,6 +167,11 @@ class EnhancedCOBWebSocket: if self.rest_session: await self.rest_session.close() + # Remove from instances registry + symbols_key = tuple(sorted(self.symbols)) + if symbols_key in EnhancedCOBWebSocket._instances: + del EnhancedCOBWebSocket._instances[symbols_key] + logger.info("Enhanced COB WebSocket system stopped") async def _init_rest_session(self): @@ -321,7 +344,7 @@ class EnhancedCOBWebSocket: async def _websocket_connection_loop(self, symbol: str): """Main WebSocket connection loop with reconnection logic - Uses depth@100ms for fastest updates with maximum depth. + Uses depth@1000ms for stable updates with maximum depth. """ status = self.status[symbol] @@ -330,9 +353,9 @@ class EnhancedCOBWebSocket: logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") status.connection_attempts += 1 - # Create WebSocket URL with maximum depth - use depth@100ms for fastest updates + # Create WebSocket URL with reasonable update rate - use depth@1000ms for stable updates ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT - ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@100ms" + ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@1000ms" logger.info(f"Connecting to: {ws_url}") @@ -352,10 +375,19 @@ class EnhancedCOBWebSocket: # Message receiving loop async for message in websocket: try: + # Rate limiting: skip messages that come too frequently + now = datetime.now() + if symbol in self.last_message_time: + time_since_last = (now - self.last_message_time[symbol]).total_seconds() + if time_since_last < self.min_message_interval: + continue # Skip this message + + self.last_message_time[symbol] = now + data = json.loads(message) await self._process_websocket_message(symbol, data) - status.last_message_time = datetime.now() + status.last_message_time = now status.messages_received += 1 except json.JSONDecodeError as e: diff --git a/core/orchestrator.py b/core/orchestrator.py index e109168..cc7b8a9 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -1231,8 +1231,11 @@ class TradingOrchestrator: logger.debug(f"Invalidated data provider cache for {symbol} due to COB update") # Update dashboard - if self.dashboard and hasattr(self.dashboard, 'update_cob_data'): - self.dashboard.update_cob_data(symbol, cob_data) + if self.dashboard and hasattr(self.dashboard, 'update_cob_data_from_orchestrator'): + self.dashboard.update_cob_data_from_orchestrator(symbol, cob_data) + logger.debug(f"📊 Sent COB data for {symbol} to dashboard") + else: + logger.debug(f"📊 No dashboard connected to receive COB data for {symbol}") except Exception as e: logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}") diff --git a/core/realtime_rl_cob_trader.py b/core/realtime_rl_cob_trader.py index 31f3af7..64cc97e 100644 --- a/core/realtime_rl_cob_trader.py +++ b/core/realtime_rl_cob_trader.py @@ -597,7 +597,7 @@ class RealtimeRLCOBTrader: for symbol in self.symbols: await self._process_signals(symbol) - await asyncio.sleep(0.1) # Process signals every 100ms + await asyncio.sleep(0.5) # Process signals every 500ms to reduce load except Exception as e: logger.error(f"Error in signal processing loop: {e}") diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 3f59bda..6255b06 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -141,6 +141,31 @@ class CleanTradingDashboard: self.orchestrator.set_trading_executor(self.trading_executor) logger.info("Trading executor connected to orchestrator for signal execution") + # Connect dashboard to orchestrator for COB data updates + if hasattr(self.orchestrator, 'set_dashboard'): + self.orchestrator.set_dashboard(self) + logger.info("✅ Dashboard connected to orchestrator for COB data updates") + + # Start orchestrator's real-time processing to ensure COB data flows + if hasattr(self.orchestrator, 'start_continuous_trading'): + try: + # Start in background thread to avoid blocking dashboard startup + import threading + def start_orchestrator_trading(): + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.orchestrator.start_continuous_trading()) + except Exception as e: + logger.error(f"Error starting orchestrator trading: {e}") + + trading_thread = threading.Thread(target=start_orchestrator_trading, daemon=True) + trading_thread.start() + logger.info("✅ Started orchestrator real-time processing for COB data") + except Exception as e: + logger.error(f"Failed to start orchestrator trading: {e}") + # Initialize enhanced training system for predictions self.training_system = None self._initialize_enhanced_training_system() @@ -292,6 +317,23 @@ class CleanTradingDashboard: def _on_cob_data_update(self, symbol: str, cob_data: dict): """Handle COB data updates from data provider""" try: + # Also update the COB cache for status display + if not hasattr(self, 'cob_cache'): + self.cob_cache = {} + + if symbol not in self.cob_cache: + self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + + # Update cache + self.cob_cache[symbol]['data'] = cob_data + self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['updates_count'] += 1 + self.cob_cache[symbol]['websocket_status'] = 'connected' + self.cob_cache[symbol]['source'] = 'data_provider' + + logger.info(f"📊 Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})") + + # Continue with existing logic # Update latest COB data cache if not hasattr(self, 'latest_cob_data'): self.latest_cob_data = {} @@ -6498,6 +6540,42 @@ class CleanTradingDashboard: except Exception as e: logger.error(f"❌ Error initializing Enhanced COB Integration: {e}") + def update_cob_data_from_orchestrator(self, symbol: str, cob_data: Dict): + """Update COB cache from orchestrator data - called by orchestrator""" + try: + # Initialize cache if needed + if not hasattr(self, 'cob_cache'): + self.cob_cache = {} + + if symbol not in self.cob_cache: + self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + + # Update cache with orchestrator data + self.cob_cache[symbol]['data'] = cob_data + self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['updates_count'] += 1 + + # Set WebSocket status based on data source + if isinstance(cob_data, dict) and 'stats' in cob_data: + source = cob_data['stats'].get('source', 'unknown') + if 'websocket' in source.lower(): + self.cob_cache[symbol]['websocket_status'] = 'connected' + self.cob_cache[symbol]['source'] = source + elif 'rest' in source.lower() or 'fallback' in source.lower(): + self.cob_cache[symbol]['websocket_status'] = 'fallback' + self.cob_cache[symbol]['source'] = source + else: + self.cob_cache[symbol]['websocket_status'] = 'unknown' + self.cob_cache[symbol]['source'] = source + else: + self.cob_cache[symbol]['websocket_status'] = 'connected' + self.cob_cache[symbol]['source'] = 'orchestrator' + + logger.info(f"📊 Updated COB cache for {symbol} from orchestrator: {self.cob_cache[symbol]['websocket_status']} (updates: {self.cob_cache[symbol]['updates_count']})") + + except Exception as e: + logger.error(f"❌ Error updating COB cache from orchestrator for {symbol}: {e}") + def _on_enhanced_cob_update(self, symbol: str, data: Dict): """Handle enhanced COB updates with WebSocket status""" try: