#!/usr/bin/env python3
"""
Consolidated Order Book (COB) Real-time Dashboard Server

Provides a web interface for visualizing:
- Consolidated order book across multiple exchanges
- Session Volume Profile (SVP) from actual trades
- Real-time statistics for neural network models
- Hybrid WebSocket + REST API order book data

Windows-compatible implementation with proper error handling.
"""

import asyncio
import json
import logging
import weakref
from datetime import datetime, timedelta
from collections import deque
from pathlib import Path
from typing import Dict, List, Optional, Any, Set
import traceback

# Windows-compatible imports
try:
    from aiohttp import web, WSMsgType
    import aiohttp_cors
except ImportError as e:
    logging.error(f"Required dependencies missing: {e}")
    raise

from core.cob_integration import COBIntegration

logger = logging.getLogger(__name__)


class COBDashboardServer:
    """
    Real-time COB Dashboard Server with Windows compatibility

    Serves a dashboard page, a small REST API and a WebSocket endpoint.
    COB updates arrive through a callback registered on ``COBIntegration``;
    each update is cached, folded into 1-second candles built from the mid
    price, and broadcast to every connected WebSocket client.
    """

    def __init__(self, host: str = 'localhost', port: int = 8053):
        """Create the aiohttp application and register routes and CORS.

        Args:
            host: Interface the HTTP server binds to.
            port: TCP port the HTTP server listens on.
        """
        self.host = host
        self.port = port
        self.app = web.Application()
        self.symbols = ['BTC/USDT', 'ETH/USDT']

        # COB components (created in start())
        self.cob_integration: Optional[COBIntegration] = None

        # Web server components (created in start())
        self.runner = None
        self.site = None

        # WebSocket connections for real-time updates
        self.websocket_connections = weakref.WeakSet()

        # Strong references to fire-and-forget tasks.  The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: Set[asyncio.Task] = set()

        # Latest data cache for quick serving
        self.latest_cob_data: Dict[str, Dict] = {}
        self.latest_stats: Dict = {}

        # Update timestamps for monitoring
        self.update_timestamps: Dict[str, deque] = {
            symbol: deque(maxlen=100) for symbol in self.symbols
        }

        # OHLCV data for mini charts (5 minutes = 300 1-second candles)
        self.ohlcv_data: Dict[str, deque] = {
            symbol: deque(maxlen=300) for symbol in self.symbols
        }

        # Current candle data (building 1-second candles)
        self.current_candles: Dict[str, Dict] = {}

        # Setup routes and CORS
        self._setup_routes()
        self._setup_cors()

        logger.info(f"COB Dashboard Server initialized for {self.symbols}")

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a task and hold a reference until it completes.

        Raises:
            RuntimeError: If there is no running event loop.  The coroutine
                is closed first so it does not trigger a "never awaited"
                warning.
        """
        try:
            task = asyncio.create_task(coro)
        except RuntimeError:
            coro.close()
            raise
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _setup_routes(self):
        """Setup HTTP routes"""
        # Static files
        self.app.router.add_get('/', self.serve_dashboard)

        # API endpoints
        self.app.router.add_get('/api/symbols', self.get_symbols)
        self.app.router.add_get('/api/cob/{symbol}', self.get_cob_data)
        self.app.router.add_get('/api/realtime/{symbol}', self.get_realtime_stats)
        self.app.router.add_get('/api/status', self.get_status)

        # WebSocket endpoint
        self.app.router.add_get('/ws', self.websocket_handler)

    def _setup_cors(self):
        """Setup CORS for cross-origin requests"""
        # NOTE(review): every origin is allowed *with* credentials; confirm
        # this is intended before exposing the server beyond localhost.
        cors = aiohttp_cors.setup(self.app, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*",
                allow_methods="*"
            )
        })

        # Add CORS to all routes (snapshot the routes before registering)
        for route in list(self.app.router.routes()):
            cors.add(route)

    async def start(self):
        """Start the dashboard server

        Brings up the HTTP site, creates the COB integration, schedules the
        streaming and housekeeping tasks and then sleeps in a loop, so this
        coroutine only returns by raising (including on cancellation).

        Raises:
            Exception: Any start-up error, after it has been logged.
        """
        try:
            logger.info(f"Starting COB Dashboard Server on {self.host}:{self.port}")

            # Start web server first
            self.runner = web.AppRunner(self.app)
            await self.runner.setup()

            self.site = web.TCPSite(self.runner, self.host, self.port)
            await self.site.start()

            logger.info(f"COB Dashboard Server running at http://{self.host}:{self.port}")

            # Initialize COB integration
            self.cob_integration = COBIntegration(symbols=self.symbols)
            self.cob_integration.add_dashboard_callback(self._sync_cob_update_wrapper)

            # Start COB data streaming as background task
            self._spawn(self.cob_integration.start())

            # Start periodic tasks as background tasks
            self._spawn(self._periodic_stats_update())
            self._spawn(self._cleanup_old_data())

            # Keep the server running
            while True:
                await asyncio.sleep(1)

        except Exception as e:
            logger.error(f"Error starting COB Dashboard Server: {e}")
            logger.error(traceback.format_exc())
            raise

    async def stop(self):
        """Stop the dashboard server

        Closes WebSocket clients, shuts down the HTTP site and runner, stops
        the COB integration and finally cancels any background task that is
        still running.
        """
        logger.info("Stopping COB Dashboard Server")

        # Close all WebSocket connections
        for ws in list(self.websocket_connections):
            try:
                await ws.close()
            except Exception as e:
                logger.warning(f"Error closing WebSocket: {e}")

        # Stop web server
        if self.site:
            await self.site.stop()
        if self.runner:
            await self.runner.cleanup()

        # Stop COB integration
        if self.cob_integration:
            await self.cob_integration.stop()

        # Cancel whatever is still running.  The current task is skipped so
        # that stop() never cancels its own caller.
        current = asyncio.current_task()
        pending = [
            task for task in self._background_tasks
            if task is not current and not task.done()
        ]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        logger.info("COB Dashboard Server stopped")

    @staticmethod
    def _symbol_from_request(request) -> str:
        """Return the ``{symbol}`` path parameter with an encoded '/' restored.

        Percent-encoding is case-insensitive, so both ``%2F`` and ``%2f``
        are accepted.
        """
        raw = request.match_info['symbol']
        return raw.replace('%2F', '/').replace('%2f', '/')

    async def serve_dashboard(self, request):
        """Serve the main dashboard HTML page

        Returns a plain-text 404 response when the HTML file is missing.
        """
        # Check up front: constructing FileResponse does not open the file,
        # so a try/except around the constructor cannot detect a missing file.
        dashboard_path = Path('web/cob_dashboard.html')
        if not dashboard_path.is_file():
            return web.Response(
                text="Dashboard HTML file not found",
                status=404,
                content_type='text/plain'
            )
        return web.FileResponse(dashboard_path)

    async def get_symbols(self, request):
        """Get available symbols"""
        return web.json_response({
            'symbols': self.symbols,
            'default': self.symbols[0] if self.symbols else None
        })

    async def get_cob_data(self, request):
        """Get consolidated order book data for a symbol

        Responds 400 for an unsupported symbol and 500 on any unexpected
        error.  Cached data is returned with the OHLCV history attached;
        without a cache entry an empty structure is returned.
        """
        try:
            symbol = self._symbol_from_request(request)

            if symbol not in self.symbols:
                return web.json_response({
                    'error': f'Symbol {symbol} not supported',
                    'available_symbols': self.symbols
                }, status=400)

            # Get latest data from cache or COB integration
            if symbol in self.latest_cob_data:
                # Shallow copy so the cached dict is not given an 'ohlcv' key
                data = self.latest_cob_data[symbol].copy()
                # Add OHLCV data to REST response
                if symbol in self.ohlcv_data:
                    data['ohlcv'] = list(self.ohlcv_data[symbol])
                    logger.debug(f"REST API: Added {len(data['ohlcv'])} OHLCV candles for {symbol}")
            elif self.cob_integration:
                data = await self._generate_dashboard_data(symbol)
            else:
                data = self._get_empty_data(symbol)

            return web.json_response({
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),
                'data': data
            })

        except Exception as e:
            logger.error(f"Error getting COB data: {e}")
            return web.json_response({
                'error': str(e)
            }, status=500)

    async def get_realtime_stats(self, request):
        """Get real-time statistics for neural network models

        Responds 400 for an unsupported symbol and 500 on any unexpected
        error.  ``stats`` is an empty dict until the COB integration exists.
        """
        try:
            symbol = self._symbol_from_request(request)

            if symbol not in self.symbols:
                return web.json_response({
                    'error': f'Symbol {symbol} not supported'
                }, status=400)

            stats = {}
            if self.cob_integration:
                stats = self.cob_integration.get_realtime_stats_for_nn(symbol)

            return web.json_response({
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),
                'stats': stats
            })

        except Exception as e:
            logger.error(f"Error getting realtime stats: {e}")
            return web.json_response({
                'error': str(e)
            }, status=500)

    async def get_status(self, request):
        """Get server status"""
        status = {
            'server': 'running',
            'symbols': self.symbols,
            'websocket_connections': len(self.websocket_connections),
            'cob_integration': 'active' if self.cob_integration else 'inactive',
            'last_updates': {}
        }

        # Add last update times (symbols without any update are omitted)
        for symbol in self.symbols:
            timestamps = self.update_timestamps.get(symbol)
            if timestamps:
                status['last_updates'][symbol] = timestamps[-1].isoformat()

        return web.json_response(status)

    async def websocket_handler(self, request):
        """Handle WebSocket connections

        Registers the socket, pushes the cached COB data for every symbol,
        then processes incoming text messages until the socket closes.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        # Add to connections
        self.websocket_connections.add(ws)
        logger.info(f"WebSocket connected. Total connections: {len(self.websocket_connections)}")

        try:
            # Send initial data
            for symbol in self.symbols:
                if symbol in self.latest_cob_data:
                    await self._send_websocket_data(ws, 'cob_update', symbol, self.latest_cob_data[symbol])

            # Handle incoming messages
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                        await self._handle_websocket_message(ws, data)
                    except json.JSONDecodeError:
                        await ws.send_str(json.dumps({
                            'type': 'error',
                            'message': 'Invalid JSON'
                        }))
                elif msg.type == WSMsgType.ERROR:
                    logger.error(f'WebSocket error: {ws.exception()}')
                    break

        except Exception as e:
            logger.error(f"WebSocket error: {e}")

        finally:
            # Remove from connections
            self.websocket_connections.discard(ws)
            logger.info(f"WebSocket disconnected. Remaining connections: {len(self.websocket_connections)}")

        return ws

    async def _handle_websocket_message(self, ws, data):
        """Handle incoming WebSocket messages

        ``subscribe`` re-sends the cached data for a supported symbol and
        ``ping`` is answered with ``pong``; other types are ignored.
        """
        try:
            message_type = data.get('type')

            if message_type == 'subscribe':
                symbol = data.get('symbol')
                if symbol in self.symbols and symbol in self.latest_cob_data:
                    await self._send_websocket_data(ws, 'cob_update', symbol, self.latest_cob_data[symbol])

            elif message_type == 'ping':
                await ws.send_str(json.dumps({
                    'type': 'pong',
                    'timestamp': datetime.now().isoformat()
                }))

        except Exception as e:
            logger.error(f"Error handling WebSocket message: {e}")

    def _sync_cob_update_wrapper(self, symbol: str, data: Dict):
        """Sync wrapper for async COB update handler

        Schedules ``_on_cob_update`` on the running loop; a scheduling
        failure is logged rather than raised into the integration.
        """
        try:
            # Create async task to handle the update
            self._spawn(self._on_cob_update(symbol, data))
        except Exception as e:
            logger.error(f"Error in COB update wrapper for {symbol}: {e}")

    async def _on_cob_update(self, symbol: str, data: Dict):
        """Handle COB updates from integration"""
        try:
            logger.debug(f"Received COB update for {symbol}")

            # Process OHLCV data from mid price
            await self._process_ohlcv_update(symbol, data)

            # Update cache
            self.latest_cob_data[symbol] = data
            self.update_timestamps[symbol].append(datetime.now())

            # Broadcast to WebSocket clients
            await self._broadcast_cob_update(symbol, data)

            logger.debug(f"Broadcasted COB update for {symbol} to {len(self.websocket_connections)} connections")

        except Exception as e:
            logger.error(f"Error handling COB update for {symbol}: {e}")

    @staticmethod
    def _new_candle(timestamp: datetime, price) -> Dict:
        """Return a fresh 1-second candle opened at *price*."""
        return {
            'timestamp': timestamp,
            'open': price,
            'high': price,
            'low': price,
            'close': price,
            'volume': 0,  # We don't have volume from order book, use tick count
            'tick_count': 1
        }

    async def _process_ohlcv_update(self, symbol: str, data: Dict):
        """Process price updates into 1-second OHLCV candles

        Reads ``data['stats']['mid_price']``; updates with a missing or
        non-positive mid price are ignored.
        """
        try:
            stats = data.get('stats', {})
            mid_price = stats.get('mid_price', 0)

            if mid_price <= 0:
                return

            now = datetime.now()
            current_second = now.replace(microsecond=0)

            current_candle = self.current_candles.get(symbol)

            # First tick for this symbol: open the very first candle
            if current_candle is None:
                self.current_candles[symbol] = self._new_candle(current_second, mid_price)
                return

            # Check if we need to close current candle and start new one
            if current_second > current_candle['timestamp']:
                # Close previous candle
                finished_candle = {
                    'timestamp': current_candle['timestamp'].isoformat(),
                    'open': current_candle['open'],
                    'high': current_candle['high'],
                    'low': current_candle['low'],
                    'close': current_candle['close'],
                    'volume': current_candle['tick_count'],  # Use tick count as volume
                    'tick_count': current_candle['tick_count']
                }

                # Add to OHLCV history
                self.ohlcv_data[symbol].append(finished_candle)

                # Start new candle
                self.current_candles[symbol] = self._new_candle(current_second, mid_price)
            else:
                # Update current candle
                current_candle['high'] = max(current_candle['high'], mid_price)
                current_candle['low'] = min(current_candle['low'], mid_price)
                current_candle['close'] = mid_price
                current_candle['tick_count'] += 1

        except Exception as e:
            logger.error(f"Error processing OHLCV update for {symbol}: {e}")

    async def _broadcast_cob_update(self, symbol: str, data: Dict):
        """Broadcast COB update to all connected WebSocket clients

        Raises:
            TypeError, ValueError: If the payload cannot be serialized to
                JSON.  This is raised before any socket is touched, so a bad
                payload is not mistaken for a dead connection.
        """
        if not self.websocket_connections:
            return

        # Add OHLCV data to the broadcast
        enhanced_data = data.copy()
        if symbol in self.ohlcv_data:
            enhanced_data['ohlcv'] = list(self.ohlcv_data[symbol])

        # Serialize once; the payload is identical for every client
        payload = json.dumps({
            'type': 'cob_update',
            'symbol': symbol,
            'timestamp': datetime.now().isoformat(),
            'data': enhanced_data
        })

        # Send to all connections.  Iterate over a snapshot: each await
        # yields to other coroutines that may add or discard connections,
        # and mutating a set while iterating it raises RuntimeError.
        dead_connections = []
        for ws in list(self.websocket_connections):
            try:
                await ws.send_str(payload)
            except Exception as e:
                logger.warning(f"Failed to send to WebSocket: {e}")
                dead_connections.append(ws)

        # Clean up dead connections
        for ws in dead_connections:
            self.websocket_connections.discard(ws)

    async def _send_websocket_data(self, ws, msg_type: str, symbol: str, data: Dict):
        """Send data to a specific WebSocket connection

        Failures are logged and swallowed.
        """
        try:
            message = {
                'type': msg_type,
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),
                'data': data
            }
            await ws.send_str(json.dumps(message))
        except Exception as e:
            logger.error(f"Error sending WebSocket data: {e}")

    async def _generate_dashboard_data(self, symbol: str) -> Dict:
        """Generate dashboard data for a symbol

        Returns the cached COB data, or an empty structure when nothing has
        been cached for *symbol* yet.
        """
        try:
            # Return cached data from COB integration callbacks
            if symbol in self.latest_cob_data:
                return self.latest_cob_data[symbol]
            return self._get_empty_data(symbol)

        except Exception as e:
            logger.error(f"Error generating dashboard data for {symbol}: {e}")
            return self._get_empty_data(symbol)

    def _get_empty_data(self, symbol: str) -> Dict:
        """Get empty data structure"""
        return {
            'symbol': symbol,
            'bids': [],
            'asks': [],
            'svp': {'data': []},
            'ohlcv': [],
            'stats': {
                'mid_price': 0,
                'spread_bps': 0,
                'bid_liquidity': 0,
                'ask_liquidity': 0,
                'bid_levels': 0,
                'ask_levels': 0,
                'imbalance': 0
            }
        }

    async def _periodic_stats_update(self):
        """Periodically update and broadcast statistics

        Once per second, derives an update frequency for each symbol from
        its two most recent update timestamps and broadcasts the result.
        After an error the loop waits 5 seconds before retrying.
        """
        while True:
            try:
                # Calculate update frequencies
                update_frequencies = {}
                for symbol in self.symbols:
                    timestamps = self.update_timestamps.get(symbol)
                    if not timestamps or len(timestamps) < 2:
                        continue
                    time_diff = (timestamps[-1] - timestamps[-2]).total_seconds()
                    if time_diff > 0:
                        update_frequencies[symbol] = 1.0 / time_diff

                # Broadcast stats if needed
                if update_frequencies:
                    # Serialize once; the payload is identical for every client
                    payload = json.dumps({
                        'type': 'stats_update',
                        'timestamp': datetime.now().isoformat(),
                        'update_frequencies': update_frequencies
                    })

                    for ws in list(self.websocket_connections):
                        try:
                            await ws.send_str(payload)
                        except Exception:
                            # Best effort: a failed send drops the connection
                            self.websocket_connections.discard(ws)

                await asyncio.sleep(1)  # Update every 1 second for real-time responsiveness

            except Exception as e:
                logger.error(f"Error in periodic stats update: {e}")
                await asyncio.sleep(5)

    async def _cleanup_old_data(self):
        """Clean up old data to prevent memory leaks

        Every 5 minutes, drops update timestamps older than one hour.
        """
        while True:
            try:
                cutoff_time = datetime.now() - timedelta(hours=1)

                # Clean up old timestamps
                for symbol in self.symbols:
                    if symbol in self.update_timestamps:
                        timestamps = self.update_timestamps[symbol]
                        while timestamps and timestamps[0] < cutoff_time:
                            timestamps.popleft()

                await asyncio.sleep(300)  # Clean up every 5 minutes

            except Exception as e:
                logger.error(f"Error in cleanup: {e}")
                await asyncio.sleep(300)


async def main():
    """Main entry point

    Configures logging, runs the server until it fails or is interrupted and
    always stops it afterwards if it was created.
    """
    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    logger.info("Starting COB Dashboard Server")

    server = None
    try:
        server = COBDashboardServer()
        await server.start()

    except KeyboardInterrupt:
        logger.info("COB Dashboard Server interrupted by user")
    except Exception as e:
        logger.error(f"COB Dashboard Server failed: {e}")
        logger.error(traceback.format_exc())
    finally:
        if server is not None:
            await server.stop()


def _configure_event_loop_policy():
    """Select the Windows proactor event loop policy where it exists.

    Must run before ``asyncio.run``: a policy set from inside a running loop
    has no effect on that loop.
    """
    # Windows event loop policy fix
    if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


if __name__ == "__main__":
    _configure_event_loop_policy()
    asyncio.run(main())