From 0838a828ce5673cd9669ed5450e6870ae904e381 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 20 Jul 2025 21:23:27 +0300 Subject: [PATCH] refactoring cob ws --- web/clean_dashboard.py | 166 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index a3ddac6..d450756 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -224,7 +224,7 @@ class CleanTradingDashboard: logger.debug("Universal Data Adapter ready for orchestrator data access") # Initialize COB integration with enhanced WebSocket - self._initialize_enhanced_cob_integration() + self._initialize_cob_integration() # Use the working COB integration method # Start signal generation loop to ensure continuous trading signals self._start_signal_generation_loop() @@ -5379,6 +5379,170 @@ class CleanTradingDashboard: logger.warning("Falling back to direct data provider COB collection") self._start_simple_cob_collection() + def _initialize_enhanced_cob_integration(self): + """Initialize enhanced COB integration with WebSocket status monitoring""" + try: + if not COB_INTEGRATION_AVAILABLE: + logger.warning("⚠️ COB integration not available - WebSocket status will show as unavailable") + return + + logger.info("🚀 Initializing Enhanced COB Integration with WebSocket monitoring") + + # Initialize COB integration + self.cob_integration = COBIntegration( + data_provider=self.data_provider, + symbols=['ETH/USDT', 'BTC/USDT'] + ) + + # Add dashboard callback for COB data + self.cob_integration.add_dashboard_callback(self._on_enhanced_cob_update) + + # Start COB integration in background thread + def start_cob_integration(): + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.cob_integration.start()) + loop.run_forever() + except Exception as e: + logger.error(f"❌ Error in COB integration thread: {e}") + + cob_thread = threading.Thread(target=start_cob_integration, daemon=True) + cob_thread.start() + + logger.info("✅ Enhanced COB Integration started with WebSocket monitoring") + + except Exception as e: + logger.error(f"❌ Error initializing Enhanced COB Integration: {e}") + + def _on_enhanced_cob_update(self, symbol: str, data: Dict): + """Handle enhanced COB updates with WebSocket status""" + try: + # Update COB data cache + self.latest_cob_data[symbol] = data + + # Extract WebSocket status if available + if isinstance(data, dict) and 'type' in data: + if data['type'] == 'websocket_status': + status_data = data.get('data', {}) + status = status_data.get('status', 'unknown') + message = status_data.get('message', '') + + # Update COB cache with status + if symbol not in self.cob_cache: + self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + + self.cob_cache[symbol]['websocket_status'] = status + self.cob_cache[symbol]['websocket_message'] = message + self.cob_cache[symbol]['last_status_update'] = time.time() + + logger.info(f"🔌 COB WebSocket status for {symbol}: {status} - {message}") + + elif data['type'] == 'cob_update': + # Regular COB data update + cob_data = data.get('data', {}) + stats = cob_data.get('stats', {}) + + # Update cache + self.cob_cache[symbol]['data'] = cob_data + self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['updates_count'] += 1 + + # Update WebSocket status from stats + websocket_status = stats.get('websocket_status', 'unknown') + source = stats.get('source', 'unknown') + + self.cob_cache[symbol]['websocket_status'] = websocket_status + self.cob_cache[symbol]['source'] = source + + logger.debug(f"📊 Enhanced COB update for {symbol}: {websocket_status} via {source}") + + except Exception as e: + logger.error(f"❌ Error handling enhanced COB update for {symbol}: {e}") + + def get_cob_websocket_status(self) -> Dict[str, Any]: + """Get COB WebSocket status for dashboard display""" + try: + status_summary = { + 'overall_status': 'unknown', + 'symbols': {}, + 'last_update': None, + 'warning_message': None + } + + if not COB_INTEGRATION_AVAILABLE: + status_summary['overall_status'] = 'unavailable' + status_summary['warning_message'] = 'COB integration not available' + return status_summary + + connected_count = 0 + fallback_count = 0 + error_count = 0 + + for symbol in ['ETH/USDT', 'BTC/USDT']: + symbol_status = { + 'status': 'unknown', + 'message': 'No data', + 'last_update': None, + 'source': 'unknown' + } + + if symbol in self.cob_cache: + cache_data = self.cob_cache[symbol] + ws_status = cache_data.get('websocket_status', 'unknown') + source = cache_data.get('source', 'unknown') + last_update = cache_data.get('last_update', 0) + + symbol_status['status'] = ws_status + symbol_status['source'] = source + symbol_status['last_update'] = datetime.fromtimestamp(last_update).isoformat() if last_update > 0 else None + + # Determine status category + if ws_status == 'connected': + connected_count += 1 + symbol_status['message'] = 'WebSocket connected' + elif ws_status == 'fallback' or source == 'rest_fallback': + fallback_count += 1 + symbol_status['message'] = 'Using REST API fallback' + else: + error_count += 1 + symbol_status['message'] = cache_data.get('websocket_message', 'Connection error') + + status_summary['symbols'][symbol] = symbol_status + + # Determine overall status + total_symbols = len(['ETH/USDT', 'BTC/USDT']) + + if connected_count == total_symbols: + status_summary['overall_status'] = 'all_connected' + status_summary['warning_message'] = None + elif connected_count + fallback_count == total_symbols: + status_summary['overall_status'] = 'partial_fallback' + status_summary['warning_message'] = f'⚠️ {fallback_count} symbol(s) using REST fallback - WebSocket connection failed' + elif fallback_count > 0: + status_summary['overall_status'] = 'degraded' + status_summary['warning_message'] = f'⚠️ COB WebSocket degraded - {error_count} error(s), {fallback_count} fallback(s)' + else: + status_summary['overall_status'] = 'error' + status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down' + + # Set last update time + last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()] + if last_updates and max(last_updates) > 0: + status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat() + + return status_summary + + except Exception as e: + logger.error(f"❌ Error getting COB WebSocket status: {e}") + return { + 'overall_status': 'error', + 'warning_message': f'Error getting status: {e}', + 'symbols': {}, + 'last_update': None + } + def _start_simple_cob_collection(self): """Start COB data collection using the centralized data provider""" try: