From 330f0de053f4efa40cca226b55beb0ff802ae4fe Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 20 Jul 2025 20:38:42 +0300 Subject: [PATCH] COB WS fix --- core/cob_integration.py | 186 +++++++++++-- core/enhanced_cob_websocket.py | 488 +++++++++++++++++++++++++++++++++ core/orchestrator.py | 279 ++++++++++++++++++- test_enhanced_cob_websocket.py | 148 ++++++++++ web/clean_dashboard.py | 188 ++++++++++++- web/layout_manager.py | 1 + 6 files changed, 1260 insertions(+), 30 deletions(-) create mode 100644 core/enhanced_cob_websocket.py create mode 100644 test_enhanced_cob_websocket.py diff --git a/core/cob_integration.py b/core/cob_integration.py index 49dc8fe..b7960dd 100644 --- a/core/cob_integration.py +++ b/core/cob_integration.py @@ -26,6 +26,7 @@ from collections import defaultdict from .multi_exchange_cob_provider import MultiExchangeCOBProvider, COBSnapshot, ConsolidatedOrderBookLevel from .data_provider import DataProvider, MarketTick +from .enhanced_cob_websocket import EnhancedCOBWebSocket logger = logging.getLogger(__name__) @@ -48,6 +49,9 @@ class COBIntegration: # Initialize COB provider to None, will be set in start() self.cob_provider = None + # Enhanced WebSocket integration + self.enhanced_websocket: Optional[EnhancedCOBWebSocket] = None + # CNN/DQN integration self.cnn_callbacks: List[Callable] = [] self.dqn_callbacks: List[Callable] = [] @@ -62,43 +66,187 @@ class COBIntegration: self.cob_feature_cache: Dict[str, np.ndarray] = {} self.last_cob_features_update: Dict[str, datetime] = {} + # WebSocket status for dashboard + self.websocket_status: Dict[str, str] = {symbol: 'disconnected' for symbol in self.symbols} + # Initialize signal tracking for symbol in self.symbols: self.cob_signals[symbol] = [] self.liquidity_alerts[symbol] = [] self.arbitrage_opportunities[symbol] = [] - logger.info("COB Integration initialized (provider will be started in async)") + logger.info("COB Integration initialized with Enhanced WebSocket support") logger.info(f"Symbols: {self.symbols}") async def start(self): - """Start COB integration""" - logger.info("Starting COB Integration") + """Start COB integration with Enhanced WebSocket""" + logger.info("šŸš€ Starting COB Integration with Enhanced WebSocket") - # Initialize COB provider here, within the async context - self.cob_provider = MultiExchangeCOBProvider( - symbols=self.symbols, - bucket_size_bps=1.0 # 1 basis point granularity - ) - - # Register callbacks - self.cob_provider.subscribe_to_cob_updates(self._on_cob_update) - self.cob_provider.subscribe_to_bucket_updates(self._on_bucket_update) - - # Start COB provider streaming + # Initialize Enhanced WebSocket first try: - logger.info("Starting COB provider streaming...") - await self.cob_provider.start_streaming() + self.enhanced_websocket = EnhancedCOBWebSocket( + symbols=self.symbols, + dashboard_callback=self._on_websocket_status_update + ) + + # Add COB data callback + self.enhanced_websocket.add_cob_callback(self._on_enhanced_cob_update) + + # Start enhanced WebSocket + await self.enhanced_websocket.start() + logger.info("āœ… Enhanced WebSocket started successfully") + except Exception as e: - logger.error(f"Error starting COB provider streaming: {e}") - # Start a background task instead + logger.error(f"āŒ Error starting Enhanced WebSocket: {e}") + + # Initialize COB provider as fallback + try: + self.cob_provider = MultiExchangeCOBProvider( + symbols=self.symbols, + bucket_size_bps=1.0 # 1 basis point granularity + ) + + # Register callbacks + self.cob_provider.subscribe_to_cob_updates(self._on_cob_update) + self.cob_provider.subscribe_to_bucket_updates(self._on_bucket_update) + + # Start COB provider streaming as backup + logger.info("Starting COB provider as backup...") asyncio.create_task(self._start_cob_provider_background()) + + except Exception as e: + logger.error(f"āŒ Error initializing COB provider: {e}") # Start analysis threads asyncio.create_task(self._continuous_cob_analysis()) asyncio.create_task(self._continuous_signal_generation()) - logger.info("COB Integration started successfully") + logger.info("āœ… COB Integration started successfully with Enhanced WebSocket") + + async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict): + """Handle COB updates from Enhanced WebSocket""" + try: + logger.debug(f"šŸ“Š Enhanced WebSocket COB update for {symbol}") + + # Convert enhanced WebSocket data to COB format for existing callbacks + # Notify CNN callbacks + for callback in self.cnn_callbacks: + try: + callback(symbol, { + 'features': cob_data, + 'timestamp': cob_data.get('timestamp', datetime.now()), + 'type': 'enhanced_cob_features' + }) + except Exception as e: + logger.warning(f"Error in CNN callback: {e}") + + # Notify DQN callbacks + for callback in self.dqn_callbacks: + try: + callback(symbol, { + 'state': cob_data, + 'timestamp': cob_data.get('timestamp', datetime.now()), + 'type': 'enhanced_cob_state' + }) + except Exception as e: + logger.warning(f"Error in DQN callback: {e}") + + # Notify dashboard callbacks + dashboard_data = self._format_enhanced_cob_for_dashboard(symbol, cob_data) + for callback in self.dashboard_callbacks: + try: + if asyncio.iscoroutinefunction(callback): + asyncio.create_task(callback(symbol, dashboard_data)) + else: + callback(symbol, dashboard_data) + except Exception as e: + logger.warning(f"Error in dashboard callback: {e}") + + except Exception as e: + logger.error(f"Error processing Enhanced WebSocket COB update for {symbol}: {e}") + + async def _on_websocket_status_update(self, status_data: Dict): + """Handle WebSocket status updates for dashboard""" + try: + symbol = status_data.get('symbol') + status = status_data.get('status') + message = status_data.get('message', '') + + if symbol: + self.websocket_status[symbol] = status + logger.info(f"šŸ”Œ WebSocket status for {symbol}: {status} - {message}") + + # Notify dashboard callbacks about status change + status_update = { + 'type': 'websocket_status', + 'data': { + 'symbol': symbol, + 'status': status, + 'message': message, + 'timestamp': status_data.get('timestamp', datetime.now().isoformat()) + } + } + + for callback in self.dashboard_callbacks: + try: + if asyncio.iscoroutinefunction(callback): + asyncio.create_task(callback(symbol, status_update)) + else: + callback(symbol, status_update) + except Exception as e: + logger.warning(f"Error in dashboard status callback: {e}") + + except Exception as e: + logger.error(f"Error processing WebSocket status update: {e}") + + def _format_enhanced_cob_for_dashboard(self, symbol: str, cob_data: Dict) -> Dict: + """Format Enhanced WebSocket COB data for dashboard""" + try: + # Extract data from enhanced WebSocket format + bids = cob_data.get('bids', []) + asks = cob_data.get('asks', []) + stats = cob_data.get('stats', {}) + + # Format for dashboard + dashboard_data = { + 'type': 'cob_update', + 'data': { + 'bids': [{'price': bid['price'], 'volume': bid['size'] * bid['price'], 'side': 'bid'} for bid in bids[:100]], + 'asks': [{'price': ask['price'], 'volume': ask['size'] * ask['price'], 'side': 'ask'} for ask in asks[:100]], + 'svp': [], # SVP data not available from WebSocket + 'stats': { + 'symbol': symbol, + 'timestamp': cob_data.get('timestamp', datetime.now()).isoformat() if isinstance(cob_data.get('timestamp'), datetime) else cob_data.get('timestamp', datetime.now().isoformat()), + 'mid_price': stats.get('mid_price', 0), + 'spread_bps': (stats.get('spread', 0) / stats.get('mid_price', 1)) * 10000 if stats.get('mid_price', 0) > 0 else 0, + 'bid_liquidity': stats.get('bid_volume', 0) * stats.get('best_bid', 0), + 'ask_liquidity': stats.get('ask_volume', 0) * stats.get('best_ask', 0), + 'total_bid_liquidity': stats.get('bid_volume', 0) * stats.get('best_bid', 0), + 'total_ask_liquidity': stats.get('ask_volume', 0) * stats.get('best_ask', 0), + 'imbalance': (stats.get('bid_volume', 0) - stats.get('ask_volume', 0)) / (stats.get('bid_volume', 0) + stats.get('ask_volume', 0)) if (stats.get('bid_volume', 0) + stats.get('ask_volume', 0)) > 0 else 0, + 'liquidity_imbalance': (stats.get('bid_volume', 0) - stats.get('ask_volume', 0)) / (stats.get('bid_volume', 0) + stats.get('ask_volume', 0)) if (stats.get('bid_volume', 0) + stats.get('ask_volume', 0)) > 0 else 0, + 'bid_levels': len(bids), + 'ask_levels': len(asks), + 'exchanges_active': [cob_data.get('exchange', 'binance')], + 'bucket_size': 1.0, + 'websocket_status': self.websocket_status.get(symbol, 'unknown'), + 'source': cob_data.get('source', 'enhanced_websocket') + } + } + } + + return dashboard_data + + except Exception as e: + logger.error(f"Error formatting enhanced COB data for dashboard: {e}") + return { + 'type': 'error', + 'data': {'error': str(e)} + } + + def get_websocket_status(self) -> Dict[str, str]: + """Get current WebSocket status for all symbols""" + return self.websocket_status.copy() async def _start_cob_provider_background(self): """Start COB provider in background task""" diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py new file mode 100644 index 0000000..e54ba36 --- /dev/null +++ b/core/enhanced_cob_websocket.py @@ -0,0 +1,488 @@ +#!/usr/bin/env python3 +""" +Enhanced COB WebSocket Implementation + +Robust WebSocket implementation for Consolidated Order Book data with: +- Maximum allowed depth subscription +- Clear error handling and warnings +- Automatic reconnection with exponential backoff +- Fallback to REST API when WebSocket fails +- Dashboard integration with status updates + +This replaces the existing COB WebSocket implementation with a more reliable version. +""" + +import asyncio +import json +import logging +import time +import traceback +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any, Callable +from collections import deque, defaultdict +from dataclasses import dataclass +import aiohttp +import weakref + +try: + import websockets + from websockets.client import connect as websockets_connect + from websockets.exceptions import ConnectionClosed, WebSocketException + WEBSOCKETS_AVAILABLE = True +except ImportError: + websockets = None + websockets_connect = None + ConnectionClosed = Exception + WebSocketException = Exception + WEBSOCKETS_AVAILABLE = False + +logger = logging.getLogger(__name__) + +@dataclass +class COBWebSocketStatus: + """Status tracking for COB WebSocket connections""" + connected: bool = False + last_message_time: Optional[datetime] = None + connection_attempts: int = 0 + last_error: Optional[str] = None + reconnect_delay: float = 1.0 + max_reconnect_delay: float = 60.0 + messages_received: int = 0 + + def reset_reconnect_delay(self): + """Reset reconnect delay on successful connection""" + self.reconnect_delay = 1.0 + + def increase_reconnect_delay(self): + """Increase reconnect delay with exponential backoff""" + self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2) + +class EnhancedCOBWebSocket: + """Enhanced COB WebSocket with robust error handling and fallback""" + + def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None): + """ + Initialize Enhanced COB WebSocket + + Args: + symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT']) + dashboard_callback: Callback function for dashboard status updates + """ + self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] + self.dashboard_callback = dashboard_callback + + # Connection status tracking + self.status: Dict[str, COBWebSocketStatus] = { + symbol: COBWebSocketStatus() for symbol in self.symbols + } + + # Data callbacks + self.cob_callbacks: List[Callable] = [] + self.error_callbacks: List[Callable] = [] + + # Latest data cache + self.latest_cob_data: Dict[str, Dict] = {} + + # WebSocket connections + self.websocket_tasks: Dict[str, asyncio.Task] = {} + + # REST API fallback + self.rest_session: Optional[aiohttp.ClientSession] = None + self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols} + self.rest_tasks: Dict[str, asyncio.Task] = {} + + # Configuration + self.max_depth = 1000 # Maximum depth for order book + self.update_speed = '100ms' # Binance update speed + + logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}") + if not WEBSOCKETS_AVAILABLE: + logger.error("āš ļø WebSockets module not available - COB data will be limited to REST API") + + def add_cob_callback(self, callback: Callable): + """Add callback for COB data updates""" + self.cob_callbacks.append(callback) + + def add_error_callback(self, callback: Callable): + """Add callback for error notifications""" + self.error_callbacks.append(callback) + + async def start(self): + """Start COB WebSocket connections""" + logger.info("šŸš€ Starting Enhanced COB WebSocket system") + + # Initialize REST session for fallback + await self._init_rest_session() + + # Start WebSocket connections for each symbol + for symbol in self.symbols: + await self._start_symbol_websocket(symbol) + + # Start monitoring task + asyncio.create_task(self._monitor_connections()) + + logger.info("āœ… Enhanced COB WebSocket system started") + + async def stop(self): + """Stop all WebSocket connections""" + logger.info("šŸ›‘ Stopping Enhanced COB WebSocket system") + + # Cancel all WebSocket tasks + for symbol, task in self.websocket_tasks.items(): + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Cancel all REST tasks + for symbol, task in self.rest_tasks.items(): + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Close REST session + if self.rest_session: + await self.rest_session.close() + + logger.info("āœ… Enhanced COB WebSocket system stopped") + + async def _init_rest_session(self): + """Initialize REST API session for fallback""" + try: + timeout = aiohttp.ClientTimeout(total=10) + self.rest_session = aiohttp.ClientSession(timeout=timeout) + logger.info("āœ… REST API session initialized for fallback") + except Exception as e: + logger.error(f"āŒ Failed to initialize REST session: {e}") + + async def _start_symbol_websocket(self, symbol: str): + """Start WebSocket connection for a specific symbol""" + if not WEBSOCKETS_AVAILABLE: + logger.warning(f"āš ļø WebSockets not available for {symbol}, starting REST fallback") + await self._start_rest_fallback(symbol) + return + + # Cancel existing task if running + if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): + self.websocket_tasks[symbol].cancel() + + # Start new WebSocket task + self.websocket_tasks[symbol] = asyncio.create_task( + self._websocket_connection_loop(symbol) + ) + + logger.info(f"šŸ”Œ Started WebSocket task for {symbol}") + + async def _websocket_connection_loop(self, symbol: str): + """Main WebSocket connection loop with reconnection logic""" + status = self.status[symbol] + + while True: + try: + logger.info(f"šŸ”Œ Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})") + status.connection_attempts += 1 + + # Create WebSocket URL with maximum depth + ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT + ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@{self.update_speed}" + + logger.info(f"šŸ”— Connecting to: {ws_url}") + + async with websockets_connect(ws_url) as websocket: + # Connection successful + status.connected = True + status.last_error = None + status.reset_reconnect_delay() + + logger.info(f"āœ… WebSocket connected for {symbol}") + await self._notify_dashboard_status(symbol, "connected", "WebSocket connected") + + # Deactivate REST fallback + if self.rest_fallback_active[symbol]: + await self._stop_rest_fallback(symbol) + + # Message receiving loop + async for message in websocket: + try: + data = json.loads(message) + await self._process_websocket_message(symbol, data) + + status.last_message_time = datetime.now() + status.messages_received += 1 + + except json.JSONDecodeError as e: + logger.warning(f"āš ļø Invalid JSON from {symbol} WebSocket: {e}") + except Exception as e: + logger.error(f"āŒ Error processing WebSocket message for {symbol}: {e}") + + except ConnectionClosed as e: + status.connected = False + status.last_error = f"Connection closed: {e}" + logger.warning(f"šŸ”Œ WebSocket connection closed for {symbol}: {e}") + + except WebSocketException as e: + status.connected = False + status.last_error = f"WebSocket error: {e}" + logger.error(f"āŒ WebSocket error for {symbol}: {e}") + + except Exception as e: + status.connected = False + status.last_error = f"Unexpected error: {e}" + logger.error(f"āŒ Unexpected WebSocket error for {symbol}: {e}") + logger.error(traceback.format_exc()) + + # Connection failed or closed - start REST fallback + await self._notify_dashboard_status(symbol, "disconnected", status.last_error) + await self._start_rest_fallback(symbol) + + # Wait before reconnecting + status.increase_reconnect_delay() + logger.info(f"ā³ Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}") + await asyncio.sleep(status.reconnect_delay) + + async def _process_websocket_message(self, symbol: str, data: Dict): + """Process WebSocket message and convert to COB format""" + try: + # Binance depth stream format + if 'b' in data and 'a' in data: # bids and asks + cob_data = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['b']], + 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['a']], + 'source': 'websocket', + 'exchange': 'binance' + } + + # Calculate stats + if cob_data['bids'] and cob_data['asks']: + best_bid = max(cob_data['bids'], key=lambda x: x['price']) + best_ask = min(cob_data['asks'], key=lambda x: x['price']) + + cob_data['stats'] = { + 'best_bid': best_bid['price'], + 'best_ask': best_ask['price'], + 'spread': best_ask['price'] - best_bid['price'], + 'mid_price': (best_bid['price'] + best_ask['price']) / 2, + 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), + 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) + } + + # Update cache + self.latest_cob_data[symbol] = cob_data + + # Notify callbacks + for callback in self.cob_callbacks: + try: + await callback(symbol, cob_data) + except Exception as e: + logger.error(f"āŒ Error in COB callback: {e}") + + logger.debug(f"šŸ“Š Processed WebSocket COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") + + except Exception as e: + logger.error(f"āŒ Error processing WebSocket message for {symbol}: {e}") + + async def _start_rest_fallback(self, symbol: str): + """Start REST API fallback for a symbol""" + if self.rest_fallback_active[symbol]: + return # Already active + + self.rest_fallback_active[symbol] = True + + # Cancel existing REST task + if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): + self.rest_tasks[symbol].cancel() + + # Start new REST task + self.rest_tasks[symbol] = asyncio.create_task( + self._rest_fallback_loop(symbol) + ) + + logger.warning(f"āš ļø Started REST API fallback for {symbol}") + await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback") + + async def _stop_rest_fallback(self, symbol: str): + """Stop REST API fallback for a symbol""" + if not self.rest_fallback_active[symbol]: + return + + self.rest_fallback_active[symbol] = False + + if symbol in self.rest_tasks and not self.rest_tasks[symbol].done(): + self.rest_tasks[symbol].cancel() + + logger.info(f"āœ… Stopped REST API fallback for {symbol}") + + async def _rest_fallback_loop(self, symbol: str): + """REST API fallback loop""" + while self.rest_fallback_active[symbol]: + try: + await self._fetch_rest_orderbook(symbol) + await asyncio.sleep(1) # Update every second + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"āŒ REST fallback error for {symbol}: {e}") + await asyncio.sleep(5) # Wait longer on error + + async def _fetch_rest_orderbook(self, symbol: str): + """Fetch order book data via REST API""" + try: + if not self.rest_session: + return + + # Binance REST API + rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT + url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000" + + async with self.rest_session.get(url) as response: + if response.status == 200: + data = await response.json() + + cob_data = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']], + 'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']], + 'source': 'rest_fallback', + 'exchange': 'binance' + } + + # Calculate stats + if cob_data['bids'] and cob_data['asks']: + best_bid = max(cob_data['bids'], key=lambda x: x['price']) + best_ask = min(cob_data['asks'], key=lambda x: x['price']) + + cob_data['stats'] = { + 'best_bid': best_bid['price'], + 'best_ask': best_ask['price'], + 'spread': best_ask['price'] - best_bid['price'], + 'mid_price': (best_bid['price'] + best_ask['price']) / 2, + 'bid_volume': sum(bid['size'] for bid in cob_data['bids']), + 'ask_volume': sum(ask['size'] for ask in cob_data['asks']) + } + + # Update cache + self.latest_cob_data[symbol] = cob_data + + # Notify callbacks + for callback in self.cob_callbacks: + try: + await callback(symbol, cob_data) + except Exception as e: + logger.error(f"āŒ Error in COB callback: {e}") + + logger.debug(f"šŸ“Š Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks") + + else: + logger.warning(f"āš ļø REST API error for {symbol}: HTTP {response.status}") + + except Exception as e: + logger.error(f"āŒ Error fetching REST order book for {symbol}: {e}") + + async def _monitor_connections(self): + """Monitor WebSocket connections and provide status updates""" + while True: + try: + await asyncio.sleep(10) # Check every 10 seconds + + for symbol in self.symbols: + status = self.status[symbol] + + # Check for stale connections + if status.connected and status.last_message_time: + time_since_last = datetime.now() - status.last_message_time + if time_since_last > timedelta(seconds=30): + logger.warning(f"āš ļø No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s") + await self._notify_dashboard_status(symbol, "stale", "No recent messages") + + # Log status + if status.connected: + logger.debug(f"āœ… {symbol}: Connected, {status.messages_received} messages received") + elif self.rest_fallback_active[symbol]: + logger.debug(f"āš ļø {symbol}: Using REST fallback") + else: + logger.debug(f"āŒ {symbol}: Disconnected, last error: {status.last_error}") + + except Exception as e: + logger.error(f"āŒ Error in connection monitor: {e}") + + async def _notify_dashboard_status(self, symbol: str, status: str, message: str): + """Notify dashboard of status changes""" + try: + if self.dashboard_callback: + await self.dashboard_callback({ + 'type': 'cob_status', + 'symbol': symbol, + 'status': status, + 'message': message, + 'timestamp': datetime.now().isoformat() + }) + except Exception as e: + logger.error(f"āŒ Error notifying dashboard: {e}") + + def get_status_summary(self) -> Dict[str, Any]: + """Get status summary for all symbols""" + summary = { + 'websockets_available': WEBSOCKETS_AVAILABLE, + 'symbols': {}, + 'overall_status': 'unknown' + } + + connected_count = 0 + fallback_count = 0 + + for symbol in self.symbols: + status = self.status[symbol] + symbol_status = { + 'connected': status.connected, + 'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None, + 'connection_attempts': status.connection_attempts, + 'last_error': status.last_error, + 'messages_received': status.messages_received, + 'rest_fallback_active': self.rest_fallback_active[symbol] + } + + if status.connected: + connected_count += 1 + elif self.rest_fallback_active[symbol]: + fallback_count += 1 + + summary['symbols'][symbol] = symbol_status + + # Determine overall status + if connected_count == len(self.symbols): + summary['overall_status'] = 'all_connected' + elif connected_count + fallback_count == len(self.symbols): + summary['overall_status'] = 'partial_fallback' + else: + summary['overall_status'] = 'degraded' + + return summary + +# Global instance for easy access +enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None + +async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None) -> EnhancedCOBWebSocket: + """Get or create the global enhanced COB WebSocket instance""" + global enhanced_cob_websocket + + if enhanced_cob_websocket is None: + enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback) + await enhanced_cob_websocket.start() + + return enhanced_cob_websocket + +async def stop_enhanced_cob_websocket(): + """Stop the global enhanced COB WebSocket instance""" + global enhanced_cob_websocket + + if enhanced_cob_websocket: + await enhanced_cob_websocket.stop() + enhanced_cob_websocket = None \ No newline at end of file diff --git a/core/orchestrator.py b/core/orchestrator.py index 2666ea5..d215a86 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -1369,15 +1369,31 @@ class TradingOrchestrator: reasoning['models_aggregated'] = [pred.model_name for pred in predictions] reasoning['aggregated_confidence'] = best_confidence - # Apply confidence thresholds for signal confirmation + # Calculate dynamic aggressiveness based on recent performance + entry_aggressiveness = self._calculate_dynamic_entry_aggressiveness(symbol) + + # Adjust confidence threshold based on entry aggressiveness + # Higher aggressiveness = lower threshold (more trades) + # entry_aggressiveness: 0.0 = very conservative, 1.0 = very aggressive + base_threshold = self.confidence_threshold + aggressiveness_factor = 1.0 - entry_aggressiveness # Invert: high agg = low factor + dynamic_threshold = base_threshold * aggressiveness_factor + + # Ensure minimum threshold for safety (don't go below 1% confidence) + dynamic_threshold = max(0.01, dynamic_threshold) + + # Apply dynamic confidence threshold for signal confirmation if best_action != 'HOLD': - if best_confidence < self.confidence_threshold: - logger.debug(f"Signal below confidence threshold: {best_action} {symbol} " - f"(confidence: {best_confidence:.3f} < {self.confidence_threshold})") + if best_confidence < dynamic_threshold: + logger.debug(f"Signal below dynamic confidence threshold: {best_action} {symbol} " + f"(confidence: {best_confidence:.3f} < {dynamic_threshold:.3f}, " + f"base: {base_threshold:.3f}, aggressiveness: {entry_aggressiveness:.2f})") best_action = 'HOLD' best_confidence = 0.0 - reasoning['rejected_reason'] = 'low_confidence' else: + logger.info(f"SIGNAL ACCEPTED: {best_action} {symbol} " + f"(confidence: {best_confidence:.3f} >= {dynamic_threshold:.3f}, " + f"aggressiveness: {entry_aggressiveness:.2f})") # Add signal to accumulator for trend confirmation signal_data = { 'action': best_action, @@ -1418,8 +1434,7 @@ class TradingOrchestrator: except Exception: memory_usage = {} - # Calculate dynamic aggressiveness based on recent performance - entry_aggressiveness = self._calculate_dynamic_entry_aggressiveness(symbol) + # Get exit aggressiveness (entry aggressiveness already calculated above) exit_aggressiveness = self._calculate_dynamic_exit_aggressiveness(symbol, current_position_pnl) # Create final decision @@ -1440,6 +1455,9 @@ class TradingOrchestrator: f"entry_agg: {entry_aggressiveness:.2f}, exit_agg: {exit_aggressiveness:.2f}, " f"pnl: ${current_position_pnl:.2f})") + # Trigger training on each decision (especially for executed trades) + self._trigger_training_on_decision(decision, price) + return decision except Exception as e: @@ -2032,6 +2050,253 @@ class TradingOrchestrator: logger.error(f"Error calculating enhanced reward: {e}") return base_pnl + def _trigger_training_on_decision(self, decision: TradingDecision, current_price: float): + """Trigger training on each decision, especially executed trades + + This ensures models learn from every signal outcome, giving more weight + to executed trades as they have real market feedback. + """ + try: + # Only train if training is enabled and we have the enhanced training system + if not self.training_enabled or not self.enhanced_training_system: + return + + symbol = decision.symbol + action = decision.action + confidence = decision.confidence + + # Create training data from the decision + training_data = { + 'symbol': symbol, + 'action': action, + 'confidence': confidence, + 'price': current_price, + 'timestamp': decision.timestamp, + 'executed': action != 'HOLD', # Assume non-HOLD actions are executed + 'entry_aggressiveness': decision.entry_aggressiveness, + 'exit_aggressiveness': decision.exit_aggressiveness, + 'reasoning': decision.reasoning + } + + # Add to enhanced training system for immediate learning + if hasattr(self.enhanced_training_system, 'add_decision_for_training'): + self.enhanced_training_system.add_decision_for_training(training_data) + logger.debug(f"šŸŽ“ Added decision to training queue: {action} {symbol} (conf: {confidence:.3f})") + + # Trigger immediate training for executed trades (higher priority) + if action != 'HOLD': + if hasattr(self.enhanced_training_system, 'trigger_immediate_training'): + self.enhanced_training_system.trigger_immediate_training( + symbol=symbol, + priority='high' if confidence > 0.7 else 'medium' + ) + logger.info(f"šŸš€ Triggered immediate training for executed trade: {action} {symbol}") + + # Train all models on the decision outcome + self._train_models_on_decision(decision, current_price) + + except Exception as e: + logger.error(f"Error triggering training on decision: {e}") + + def _train_models_on_decision(self, decision: TradingDecision, current_price: float): + """Train all models on the decision outcome + + This provides immediate feedback to models about their predictions, + allowing them to learn from each signal they generate. + """ + try: + symbol = decision.symbol + action = decision.action + confidence = decision.confidence + + # Get current market data for training context + market_data = self._get_current_market_data(symbol) + if not market_data: + return + + # Train DQN agent if available + if self.rl_agent and hasattr(self.rl_agent, 'add_experience'): + try: + # Create state representation + state = self._create_state_for_training(symbol, market_data) + + # Map action to DQN action space + action_mapping = {'BUY': 0, 'SELL': 1, 'HOLD': 2} + dqn_action = action_mapping.get(action, 2) + + # Calculate immediate reward based on confidence and execution + immediate_reward = confidence if action != 'HOLD' else 0.0 + + # Add experience to DQN + self.rl_agent.add_experience( + state=state, + action=dqn_action, + reward=immediate_reward, + next_state=state, # Will be updated with actual outcome later + done=False + ) + + logger.debug(f"🧠 Added DQN experience: {action} {symbol} (reward: {immediate_reward:.3f})") + + except Exception as e: + logger.debug(f"Error training DQN on decision: {e}") + + # Train CNN model if available + if self.cnn_model and hasattr(self.cnn_model, 'add_training_sample'): + try: + # Create CNN input features + cnn_features = self._create_cnn_features_for_training(symbol, market_data) + + # Create target based on action + target_mapping = {'BUY': [1, 0, 0], 'SELL': [0, 1, 0], 'HOLD': [0, 0, 1]} + target = target_mapping.get(action, [0, 0, 1]) + + # Add training sample + self.cnn_model.add_training_sample(cnn_features, target, weight=confidence) + + logger.debug(f"šŸ” Added CNN training sample: {action} {symbol}") + + except Exception as e: + logger.debug(f"Error training CNN on decision: {e}") + + # Train COB RL model if available and we have COB data + if self.cob_rl_agent and symbol in self.latest_cob_data: + try: + cob_data = self.latest_cob_data[symbol] + if hasattr(self.cob_rl_agent, 'add_experience'): + # Create COB state representation + cob_state = self._create_cob_state_for_training(symbol, cob_data) + + # Add COB experience + self.cob_rl_agent.add_experience( + state=cob_state, + action=action, + reward=confidence, + symbol=symbol + ) + + logger.debug(f"šŸ“Š Added COB RL experience: {action} {symbol}") + + except Exception as e: + logger.debug(f"Error training COB RL on decision: {e}") + + except Exception as e: + logger.error(f"Error training models on decision: {e}") + + def _get_current_market_data(self, symbol: str) -> Optional[Dict]: + """Get current market data for training context""" + try: + if self.data_provider: + # Get recent data for training + df = self.data_provider.get_historical_data(symbol, '1m', limit=100) + if df is not None and not df.empty: + return { + 'ohlcv': df.tail(50).to_dict('records'), # Last 50 candles + 'current_price': float(df['close'].iloc[-1]), + 'volume': float(df['volume'].iloc[-1]), + 'timestamp': df.index[-1] + } + return None + except Exception as e: + logger.debug(f"Error getting market data for training: {e}") + return None + + def _create_state_for_training(self, symbol: str, market_data: Dict) -> np.ndarray: + """Create state representation for DQN training""" + try: + # Create a basic state representation + ohlcv_data = market_data.get('ohlcv', []) + if not ohlcv_data: + return np.zeros(100) # Default state size + + # Extract features from recent candles + features = [] + for candle in ohlcv_data[-20:]: # Last 20 candles + features.extend([ + candle.get('open', 0), + candle.get('high', 0), + candle.get('low', 0), + candle.get('close', 0), + candle.get('volume', 0) + ]) + + # Pad or truncate to expected size + state = np.array(features[:100]) + if len(state) < 100: + state = np.pad(state, (0, 100 - len(state)), 'constant') + + return state + + except Exception as e: + logger.debug(f"Error creating state for training: {e}") + return np.zeros(100) + + def _create_cnn_features_for_training(self, symbol: str, market_data: Dict) -> np.ndarray: + """Create CNN features for training""" + try: + # Similar to state creation but formatted for CNN + ohlcv_data = market_data.get('ohlcv', []) + if not ohlcv_data: + return np.zeros((1, 100)) + + # Create feature matrix + features = [] + for candle in ohlcv_data[-20:]: + features.extend([ + candle.get('open', 0), + candle.get('high', 0), + candle.get('low', 0), + candle.get('close', 0), + candle.get('volume', 0) + ]) + + # Reshape for CNN input + cnn_features = np.array(features[:100]).reshape(1, -1) + if cnn_features.shape[1] < 100: + cnn_features = np.pad(cnn_features, ((0, 0), (0, 100 - cnn_features.shape[1])), 'constant') + + return cnn_features + + except Exception as e: + logger.debug(f"Error creating CNN features for training: {e}") + return np.zeros((1, 100)) + + def _create_cob_state_for_training(self, symbol: str, cob_data: Dict) -> np.ndarray: + """Create COB state representation for training""" + try: + # Extract COB features for training + features = [] + + # Add bid/ask data + bids = cob_data.get('bids', [])[:10] # Top 10 bids + asks = cob_data.get('asks', [])[:10] # Top 10 asks + + for bid in bids: + features.extend([bid.get('price', 0), bid.get('size', 0)]) + for ask in asks: + features.extend([ask.get('price', 0), ask.get('size', 0)]) + + # Add market stats + stats = cob_data.get('stats', {}) + features.extend([ + stats.get('spread', 0), + stats.get('mid_price', 0), + stats.get('bid_volume', 0), + stats.get('ask_volume', 0), + stats.get('imbalance', 0) + ]) + + # Pad to expected COB state size (2000 features) + cob_state = np.array(features[:2000]) + if len(cob_state) < 2000: + cob_state = np.pad(cob_state, (0, 2000 - len(cob_state)), 'constant') + + return cob_state + + except Exception as e: + logger.debug(f"Error creating COB state for training: {e}") + return np.zeros(2000) + def _check_signal_confirmation(self, symbol: str, signal_data: Dict) -> Optional[str]: """Check if we have enough signal confirmations for trend confirmation with rate limiting""" try: diff --git a/test_enhanced_cob_websocket.py b/test_enhanced_cob_websocket.py new file mode 100644 index 0000000..367ec95 --- /dev/null +++ b/test_enhanced_cob_websocket.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Test Enhanced COB WebSocket Implementation + +This script tests the enhanced COB WebSocket system to ensure: +1. WebSocket connections work properly +2. Fallback to REST API when WebSocket fails +3. Dashboard status updates are working +4. Clear error messages and warnings are displayed +""" + +import asyncio +import logging +import sys +import time +from datetime import datetime + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Import the enhanced COB WebSocket +try: + from core.enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket + print("āœ… Enhanced COB WebSocket imported successfully") +except ImportError as e: + print(f"āŒ Failed to import Enhanced COB WebSocket: {e}") + sys.exit(1) + +async def test_dashboard_callback(status_data): + """Test dashboard callback function""" + print(f"šŸ“Š Dashboard callback received: {status_data}") + +async def test_cob_callback(symbol, cob_data): + """Test COB data callback function""" + stats = cob_data.get('stats', {}) + mid_price = stats.get('mid_price', 0) + bid_levels = len(cob_data.get('bids', [])) + ask_levels = len(cob_data.get('asks', [])) + source = cob_data.get('source', 'unknown') + + print(f"šŸ“ˆ COB data for {symbol}: ${mid_price:.2f}, {bid_levels} bids, {ask_levels} asks (via {source})") + +async def main(): + """Main test function""" + print("šŸš€ Testing Enhanced COB WebSocket System") + print("=" * 60) + + # Test 1: Initialize Enhanced COB WebSocket + print("\n1. Initializing Enhanced COB WebSocket...") + try: + cob_ws = EnhancedCOBWebSocket( + symbols=['BTC/USDT', 'ETH/USDT'], + dashboard_callback=test_dashboard_callback + ) + + # Add callbacks + cob_ws.add_cob_callback(test_cob_callback) + + print("āœ… Enhanced COB WebSocket initialized") + except Exception as e: + print(f"āŒ Failed to initialize: {e}") + return + + # Test 2: Start WebSocket connections + print("\n2. Starting WebSocket connections...") + try: + await cob_ws.start() + print("āœ… WebSocket connections started") + except Exception as e: + print(f"āŒ Failed to start connections: {e}") + return + + # Test 3: Monitor connections for 30 seconds + print("\n3. Monitoring connections for 30 seconds...") + start_time = time.time() + + while time.time() - start_time < 30: + try: + # Get status summary + status = cob_ws.get_status_summary() + overall_status = status.get('overall_status', 'unknown') + + print(f"ā±ļø Status: {overall_status}") + + # Print symbol-specific status + for symbol, symbol_status in status.get('symbols', {}).items(): + connected = symbol_status.get('connected', False) + fallback = symbol_status.get('rest_fallback_active', False) + messages = symbol_status.get('messages_received', 0) + + if connected: + print(f" {symbol}: āœ… Connected ({messages} messages)") + elif fallback: + print(f" {symbol}: āš ļø REST fallback active") + else: + error = symbol_status.get('last_error', 'Unknown error') + print(f" {symbol}: āŒ Error - {error}") + + await asyncio.sleep(5) # Check every 5 seconds + + except KeyboardInterrupt: + print("\nā¹ļø Test interrupted by user") + break + except Exception as e: + print(f"āŒ Error during monitoring: {e}") + break + + # Test 4: Final status check + print("\n4. Final status check...") + try: + final_status = cob_ws.get_status_summary() + print(f"Final overall status: {final_status.get('overall_status', 'unknown')}") + + for symbol, symbol_status in final_status.get('symbols', {}).items(): + print(f" {symbol}:") + print(f" Connected: {symbol_status.get('connected', False)}") + print(f" Messages received: {symbol_status.get('messages_received', 0)}") + print(f" REST fallback: {symbol_status.get('rest_fallback_active', False)}") + if symbol_status.get('last_error'): + print(f" Last error: {symbol_status.get('last_error')}") + + except Exception as e: + print(f"āŒ Error getting final status: {e}") + + # Test 5: Stop connections + print("\n5. Stopping connections...") + try: + await cob_ws.stop() + print("āœ… Connections stopped successfully") + except Exception as e: + print(f"āŒ Error stopping connections: {e}") + + print("\n" + "=" * 60) + print("šŸ Enhanced COB WebSocket test completed") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nā¹ļø Test interrupted") + except Exception as e: + print(f"āŒ Test failed: {e}") + import traceback + traceback.print_exc() \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 3074796..a3ddac6 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -223,8 +223,8 @@ class CleanTradingDashboard: # Universal Data Adapter is managed by orchestrator logger.debug("Universal Data Adapter ready for orchestrator data access") - # Initialize COB integration with high-frequency data handling - self._initialize_cob_integration() + # Initialize COB integration with enhanced WebSocket + self._initialize_enhanced_cob_integration() # Start signal generation loop to ensure continuous trading signals self._start_signal_generation_loop() @@ -497,6 +497,7 @@ class CleanTradingDashboard: Output('trade-count', 'children'), Output('portfolio-value', 'children'), Output('profitability-multiplier', 'children'), + Output('cob-websocket-status', 'children'), Output('mexc-status', 'children')], [Input('interval-component', 'n_intervals')] ) @@ -622,11 +623,27 @@ class CleanTradingDashboard: if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode: mexc_status = "LIVE+SYNC" # Indicate live trading with position sync - return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, mexc_status + # COB WebSocket status + cob_status = self.get_cob_websocket_status() + overall_status = cob_status.get('overall_status', 'unknown') + warning_message = cob_status.get('warning_message') + + if overall_status == 'all_connected': + cob_status_str = "Connected" + elif overall_status == 'partial_fallback': + cob_status_str = "Fallback" + elif overall_status == 'degraded': + cob_status_str = "Degraded" + elif overall_status == 'unavailable': + cob_status_str = "N/A" + else: + cob_status_str = "Error" + + return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, cob_status_str, mexc_status except Exception as e: logger.error(f"Error updating metrics: {e}") - return "Error", "$0.00", "Error", "0", "$100.00", "0.0x", "ERROR" + return "Error", "$0.00", "Error", "0", "$100.00", "0.0x", "Error", "ERROR" @self.app.callback( Output('recent-decisions', 'children'), @@ -7296,3 +7313,166 @@ def create_clean_dashboard(data_provider: Optional[DataProvider] = None, orchest # test edit + def _initialize_enhanced_cob_integration(self): + """Initialize enhanced COB integration with WebSocket status monitoring""" + try: + if not COB_INTEGRATION_AVAILABLE: + logger.warning("āš ļø COB integration not available - WebSocket status will show as unavailable") + return + + logger.info("šŸš€ Initializing Enhanced COB Integration with WebSocket monitoring") + + # Initialize COB integration + self.cob_integration = COBIntegration( + data_provider=self.data_provider, + symbols=['ETH/USDT', 'BTC/USDT'] + ) + + # Add dashboard callback for COB data + self.cob_integration.add_dashboard_callback(self._on_enhanced_cob_update) + + # Start COB integration in background thread + def start_cob_integration(): + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.cob_integration.start()) + loop.run_forever() + except Exception as e: + logger.error(f"āŒ Error in COB integration thread: {e}") + + cob_thread = threading.Thread(target=start_cob_integration, daemon=True) + cob_thread.start() + + logger.info("āœ… Enhanced COB Integration started with WebSocket monitoring") + + except Exception as e: + logger.error(f"āŒ Error initializing Enhanced COB Integration: {e}") + + def _on_enhanced_cob_update(self, symbol: str, data: Dict): + """Handle enhanced COB updates with WebSocket status""" + try: + # Update COB data cache + self.latest_cob_data[symbol] = data + + # Extract WebSocket status if available + if isinstance(data, dict) and 'type' in data: + if data['type'] == 'websocket_status': + status_data = data.get('data', {}) + status = status_data.get('status', 'unknown') + message = status_data.get('message', '') + + # Update COB cache with status + if symbol not in self.cob_cache: + self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} + + self.cob_cache[symbol]['websocket_status'] = status + self.cob_cache[symbol]['websocket_message'] = message + self.cob_cache[symbol]['last_status_update'] = time.time() + + logger.info(f"šŸ”Œ COB WebSocket status for {symbol}: {status} - {message}") + + elif data['type'] == 'cob_update': + # Regular COB data update + cob_data = data.get('data', {}) + stats = cob_data.get('stats', {}) + + # Update cache + self.cob_cache[symbol]['data'] = cob_data + self.cob_cache[symbol]['last_update'] = time.time() + self.cob_cache[symbol]['updates_count'] += 1 + + # Update WebSocket status from stats + websocket_status = stats.get('websocket_status', 'unknown') + source = stats.get('source', 'unknown') + + self.cob_cache[symbol]['websocket_status'] = websocket_status + self.cob_cache[symbol]['source'] = source + + logger.debug(f"šŸ“Š Enhanced COB update for {symbol}: {websocket_status} via {source}") + + except Exception as e: + logger.error(f"āŒ Error handling enhanced COB update for {symbol}: {e}") + + def get_cob_websocket_status(self) -> Dict[str, Any]: + """Get COB WebSocket status for dashboard display""" + try: + status_summary = { + 'overall_status': 'unknown', + 'symbols': {}, + 'last_update': None, + 'warning_message': None + } + + if not COB_INTEGRATION_AVAILABLE: + status_summary['overall_status'] = 'unavailable' + status_summary['warning_message'] = 'COB integration not available' + return status_summary + + connected_count = 0 + fallback_count = 0 + error_count = 0 + + for symbol in ['ETH/USDT', 'BTC/USDT']: + symbol_status = { + 'status': 'unknown', + 'message': 'No data', + 'last_update': None, + 'source': 'unknown' + } + + if symbol in self.cob_cache: + cache_data = self.cob_cache[symbol] + ws_status = cache_data.get('websocket_status', 'unknown') + source = cache_data.get('source', 'unknown') + last_update = cache_data.get('last_update', 0) + + symbol_status['status'] = ws_status + symbol_status['source'] = source + symbol_status['last_update'] = datetime.fromtimestamp(last_update).isoformat() if last_update > 0 else None + + # Determine status category + if ws_status == 'connected': + connected_count += 1 + symbol_status['message'] = 'WebSocket connected' + elif ws_status == 'fallback' or source == 'rest_fallback': + fallback_count += 1 + symbol_status['message'] = 'Using REST API fallback' + else: + error_count += 1 + symbol_status['message'] = cache_data.get('websocket_message', 'Connection error') + + status_summary['symbols'][symbol] = symbol_status + + # Determine overall status + total_symbols = len(['ETH/USDT', 'BTC/USDT']) + + if connected_count == total_symbols: + status_summary['overall_status'] = 'all_connected' + status_summary['warning_message'] = None + elif connected_count + fallback_count == total_symbols: + status_summary['overall_status'] = 'partial_fallback' + status_summary['warning_message'] = f'āš ļø {fallback_count} symbol(s) using REST fallback - WebSocket connection failed' + elif fallback_count > 0: + status_summary['overall_status'] = 'degraded' + status_summary['warning_message'] = f'āš ļø COB WebSocket degraded - {error_count} error(s), {fallback_count} fallback(s)' + else: + status_summary['overall_status'] = 'error' + status_summary['warning_message'] = 'āŒ COB WebSocket failed - All connections down' + + # Set last update time + last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()] + if last_updates and max(last_updates) > 0: + status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat() + + return status_summary + + except Exception as e: + logger.error(f"āŒ Error getting COB WebSocket status: {e}") + return { + 'overall_status': 'error', + 'warning_message': f'Error getting status: {e}', + 'symbols': {}, + 'last_update': None + } \ No newline at end of file diff --git a/web/layout_manager.py b/web/layout_manager.py index 302105e..ff683f3 100644 --- a/web/layout_manager.py +++ b/web/layout_manager.py @@ -94,6 +94,7 @@ class DashboardLayoutManager: ("trade-count", "Trades", "text-warning"), ("portfolio-value", "Portfolio", "text-secondary"), ("profitability-multiplier", "Profit Boost", "text-primary"), + ("cob-websocket-status", "COB WebSocket", "text-warning"), ("mexc-status", f"{exchange_name} API", "text-info") ]