568 lines
21 KiB
Python
568 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Consolidated Order Book (COB) Real-time Dashboard Server
|
|
|
|
Provides a web interface for visualizing:
|
|
- Consolidated order book across multiple exchanges
|
|
- Session Volume Profile (SVP) from actual trades
|
|
- Real-time statistics for neural network models
|
|
- Hybrid WebSocket + REST API order book data
|
|
|
|
Windows-compatible implementation with proper error handling.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import weakref
|
|
from datetime import datetime, timedelta
|
|
from collections import deque
|
|
from typing import Dict, List, Optional, Any
|
|
import traceback
|
|
|
|
# Windows-compatible imports
|
|
try:
|
|
from aiohttp import web, WSMsgType
|
|
import aiohttp_cors
|
|
except ImportError as e:
|
|
logging.error(f"Required dependencies missing: {e}")
|
|
raise
|
|
|
|
from core.cob_integration import COBIntegration
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class COBDashboardServer:
|
|
"""
|
|
Real-time COB Dashboard Server with Windows compatibility
|
|
"""
|
|
|
|
def __init__(self, host: str = 'localhost', port: int = 8053):
|
|
self.host = host
|
|
self.port = port
|
|
self.app = web.Application()
|
|
self.symbols = ['BTC/USDT', 'ETH/USDT']
|
|
|
|
# COB components
|
|
self.cob_integration: Optional[COBIntegration] = None
|
|
|
|
# Web server components
|
|
self.runner = None
|
|
self.site = None
|
|
|
|
# WebSocket connections for real-time updates
|
|
self.websocket_connections = weakref.WeakSet()
|
|
|
|
# Latest data cache for quick serving
|
|
self.latest_cob_data: Dict[str, Dict] = {}
|
|
self.latest_stats: Dict = {}
|
|
|
|
# Update timestamps for monitoring
|
|
self.update_timestamps: Dict[str, deque] = {
|
|
symbol: deque(maxlen=100) for symbol in self.symbols
|
|
}
|
|
|
|
# OHLCV data for mini charts (5 minutes = 300 1-second candles)
|
|
self.ohlcv_data: Dict[str, deque] = {
|
|
symbol: deque(maxlen=300) for symbol in self.symbols
|
|
}
|
|
|
|
# Current candle data (building 1-second candles)
|
|
self.current_candles: Dict[str, Dict] = {}
|
|
|
|
# Setup routes and CORS
|
|
self._setup_routes()
|
|
self._setup_cors()
|
|
|
|
logger.info(f"COB Dashboard Server initialized for {self.symbols}")
|
|
|
|
def _setup_routes(self):
|
|
"""Setup HTTP routes"""
|
|
# Static files
|
|
self.app.router.add_get('/', self.serve_dashboard)
|
|
|
|
# API endpoints
|
|
self.app.router.add_get('/api/symbols', self.get_symbols)
|
|
self.app.router.add_get('/api/cob/{symbol}', self.get_cob_data)
|
|
self.app.router.add_get('/api/realtime/{symbol}', self.get_realtime_stats)
|
|
self.app.router.add_get('/api/status', self.get_status)
|
|
|
|
# WebSocket endpoint
|
|
self.app.router.add_get('/ws', self.websocket_handler)
|
|
|
|
def _setup_cors(self):
|
|
"""Setup CORS for cross-origin requests"""
|
|
cors = aiohttp_cors.setup(self.app, defaults={
|
|
"*": aiohttp_cors.ResourceOptions(
|
|
allow_credentials=True,
|
|
expose_headers="*",
|
|
allow_headers="*",
|
|
allow_methods="*"
|
|
)
|
|
})
|
|
|
|
# Add CORS to all routes
|
|
for route in list(self.app.router.routes()):
|
|
cors.add(route)
|
|
|
|
async def start(self):
|
|
"""Start the dashboard server"""
|
|
try:
|
|
logger.info(f"Starting COB Dashboard Server on {self.host}:{self.port}")
|
|
|
|
# Start web server first
|
|
self.runner = web.AppRunner(self.app)
|
|
await self.runner.setup()
|
|
|
|
self.site = web.TCPSite(self.runner, self.host, self.port)
|
|
await self.site.start()
|
|
|
|
logger.info(f"COB Dashboard Server running at http://{self.host}:{self.port}")
|
|
|
|
# Initialize COB integration
|
|
self.cob_integration = COBIntegration(symbols=self.symbols)
|
|
self.cob_integration.add_dashboard_callback(self._sync_cob_update_wrapper)
|
|
|
|
# Start COB data streaming as background task
|
|
asyncio.create_task(self.cob_integration.start())
|
|
|
|
# Start periodic tasks as background tasks
|
|
asyncio.create_task(self._periodic_stats_update())
|
|
asyncio.create_task(self._cleanup_old_data())
|
|
|
|
# Keep the server running
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting COB Dashboard Server: {e}")
|
|
logger.error(traceback.format_exc())
|
|
raise
|
|
|
|
async def stop(self):
|
|
"""Stop the dashboard server"""
|
|
logger.info("Stopping COB Dashboard Server")
|
|
|
|
# Close all WebSocket connections
|
|
for ws in list(self.websocket_connections):
|
|
try:
|
|
await ws.close()
|
|
except Exception as e:
|
|
logger.warning(f"Error closing WebSocket: {e}")
|
|
|
|
# Stop web server
|
|
if self.site:
|
|
await self.site.stop()
|
|
if self.runner:
|
|
await self.runner.cleanup()
|
|
|
|
# Stop COB integration
|
|
if self.cob_integration:
|
|
await self.cob_integration.stop()
|
|
|
|
logger.info("COB Dashboard Server stopped")
|
|
|
|
async def serve_dashboard(self, request):
|
|
"""Serve the main dashboard HTML page"""
|
|
try:
|
|
return web.FileResponse('web/cob_dashboard.html')
|
|
except FileNotFoundError:
|
|
return web.Response(
|
|
text="Dashboard HTML file not found",
|
|
status=404,
|
|
content_type='text/plain'
|
|
)
|
|
|
|
async def get_symbols(self, request):
|
|
"""Get available symbols"""
|
|
return web.json_response({
|
|
'symbols': self.symbols,
|
|
'default': self.symbols[0] if self.symbols else None
|
|
})
|
|
|
|
async def get_cob_data(self, request):
|
|
"""Get consolidated order book data for a symbol"""
|
|
try:
|
|
symbol = request.match_info['symbol']
|
|
symbol = symbol.replace('%2F', '/') # URL decode
|
|
|
|
if symbol not in self.symbols:
|
|
return web.json_response({
|
|
'error': f'Symbol {symbol} not supported',
|
|
'available_symbols': self.symbols
|
|
}, status=400)
|
|
|
|
# Get latest data from cache or COB integration
|
|
if symbol in self.latest_cob_data:
|
|
data = self.latest_cob_data[symbol].copy()
|
|
# Add OHLCV data to REST response
|
|
if symbol in self.ohlcv_data:
|
|
data['ohlcv'] = list(self.ohlcv_data[symbol])
|
|
logger.debug(f"REST API: Added {len(data['ohlcv'])} OHLCV candles for {symbol}")
|
|
elif self.cob_integration:
|
|
data = await self._generate_dashboard_data(symbol)
|
|
else:
|
|
data = self._get_empty_data(symbol)
|
|
|
|
return web.json_response({
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'data': data
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting COB data: {e}")
|
|
return web.json_response({
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
async def get_realtime_stats(self, request):
|
|
"""Get real-time statistics for neural network models"""
|
|
try:
|
|
symbol = request.match_info['symbol']
|
|
symbol = symbol.replace('%2F', '/')
|
|
|
|
if symbol not in self.symbols:
|
|
return web.json_response({
|
|
'error': f'Symbol {symbol} not supported'
|
|
}, status=400)
|
|
|
|
stats = {}
|
|
if self.cob_integration:
|
|
stats = self.cob_integration.get_realtime_stats_for_nn(symbol)
|
|
|
|
return web.json_response({
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'stats': stats
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting realtime stats: {e}")
|
|
return web.json_response({
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
async def get_status(self, request):
|
|
"""Get server status"""
|
|
status = {
|
|
'server': 'running',
|
|
'symbols': self.symbols,
|
|
'websocket_connections': len(self.websocket_connections),
|
|
'cob_integration': 'active' if self.cob_integration else 'inactive',
|
|
'last_updates': {}
|
|
}
|
|
|
|
# Add last update times
|
|
for symbol in self.symbols:
|
|
if symbol in self.update_timestamps and self.update_timestamps[symbol]:
|
|
status['last_updates'][symbol] = self.update_timestamps[symbol][-1].isoformat()
|
|
|
|
return web.json_response(status)
|
|
|
|
async def websocket_handler(self, request):
|
|
"""Handle WebSocket connections"""
|
|
ws = web.WebSocketResponse()
|
|
await ws.prepare(request)
|
|
|
|
# Add to connections
|
|
self.websocket_connections.add(ws)
|
|
logger.info(f"WebSocket connected. Total connections: {len(self.websocket_connections)}")
|
|
|
|
try:
|
|
# Send initial data
|
|
for symbol in self.symbols:
|
|
if symbol in self.latest_cob_data:
|
|
await self._send_websocket_data(ws, 'cob_update', symbol, self.latest_cob_data[symbol])
|
|
|
|
# Handle incoming messages
|
|
async for msg in ws:
|
|
if msg.type == WSMsgType.TEXT:
|
|
try:
|
|
data = json.loads(msg.data)
|
|
await self._handle_websocket_message(ws, data)
|
|
except json.JSONDecodeError:
|
|
await ws.send_str(json.dumps({
|
|
'type': 'error',
|
|
'message': 'Invalid JSON'
|
|
}))
|
|
elif msg.type == WSMsgType.ERROR:
|
|
logger.error(f'WebSocket error: {ws.exception()}')
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error: {e}")
|
|
|
|
finally:
|
|
# Remove from connections
|
|
self.websocket_connections.discard(ws)
|
|
logger.info(f"WebSocket disconnected. Remaining connections: {len(self.websocket_connections)}")
|
|
|
|
return ws
|
|
|
|
async def _handle_websocket_message(self, ws, data):
|
|
"""Handle incoming WebSocket messages"""
|
|
try:
|
|
message_type = data.get('type')
|
|
|
|
if message_type == 'subscribe':
|
|
symbol = data.get('symbol')
|
|
if symbol in self.symbols and symbol in self.latest_cob_data:
|
|
await self._send_websocket_data(ws, 'cob_update', symbol, self.latest_cob_data[symbol])
|
|
|
|
elif message_type == 'ping':
|
|
await ws.send_str(json.dumps({
|
|
'type': 'pong',
|
|
'timestamp': datetime.now().isoformat()
|
|
}))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling WebSocket message: {e}")
|
|
|
|
def _sync_cob_update_wrapper(self, symbol: str, data: Dict):
|
|
"""Sync wrapper for async COB update handler"""
|
|
try:
|
|
# Create async task to handle the update
|
|
asyncio.create_task(self._on_cob_update(symbol, data))
|
|
except Exception as e:
|
|
logger.error(f"Error in COB update wrapper for {symbol}: {e}")
|
|
|
|
async def _on_cob_update(self, symbol: str, data: Dict):
|
|
"""Handle COB updates from integration"""
|
|
try:
|
|
logger.debug(f"Received COB update for {symbol}")
|
|
|
|
# Process OHLCV data from mid price
|
|
await self._process_ohlcv_update(symbol, data)
|
|
|
|
# Update cache
|
|
self.latest_cob_data[symbol] = data
|
|
self.update_timestamps[symbol].append(datetime.now())
|
|
|
|
# Broadcast to WebSocket clients
|
|
await self._broadcast_cob_update(symbol, data)
|
|
|
|
logger.debug(f"Broadcasted COB update for {symbol} to {len(self.websocket_connections)} connections")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling COB update for {symbol}: {e}")
|
|
|
|
async def _process_ohlcv_update(self, symbol: str, data: Dict):
|
|
"""Process price updates into 1-second OHLCV candles"""
|
|
try:
|
|
stats = data.get('stats', {})
|
|
mid_price = stats.get('mid_price', 0)
|
|
|
|
if mid_price <= 0:
|
|
return
|
|
|
|
now = datetime.now()
|
|
current_second = now.replace(microsecond=0)
|
|
|
|
# Get or create current candle
|
|
if symbol not in self.current_candles:
|
|
self.current_candles[symbol] = {
|
|
'timestamp': current_second,
|
|
'open': mid_price,
|
|
'high': mid_price,
|
|
'low': mid_price,
|
|
'close': mid_price,
|
|
'volume': 0, # We don't have volume from order book, use tick count
|
|
'tick_count': 1
|
|
}
|
|
else:
|
|
current_candle = self.current_candles[symbol]
|
|
|
|
# Check if we need to close current candle and start new one
|
|
if current_second > current_candle['timestamp']:
|
|
# Close previous candle
|
|
finished_candle = {
|
|
'timestamp': current_candle['timestamp'].isoformat(),
|
|
'open': current_candle['open'],
|
|
'high': current_candle['high'],
|
|
'low': current_candle['low'],
|
|
'close': current_candle['close'],
|
|
'volume': current_candle['tick_count'], # Use tick count as volume
|
|
'tick_count': current_candle['tick_count']
|
|
}
|
|
|
|
# Add to OHLCV history
|
|
self.ohlcv_data[symbol].append(finished_candle)
|
|
|
|
# Start new candle
|
|
self.current_candles[symbol] = {
|
|
'timestamp': current_second,
|
|
'open': mid_price,
|
|
'high': mid_price,
|
|
'low': mid_price,
|
|
'close': mid_price,
|
|
'volume': 0,
|
|
'tick_count': 1
|
|
}
|
|
else:
|
|
# Update current candle
|
|
current_candle['high'] = max(current_candle['high'], mid_price)
|
|
current_candle['low'] = min(current_candle['low'], mid_price)
|
|
current_candle['close'] = mid_price
|
|
current_candle['tick_count'] += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing OHLCV update for {symbol}: {e}")
|
|
|
|
async def _broadcast_cob_update(self, symbol: str, data: Dict):
|
|
"""Broadcast COB update to all connected WebSocket clients"""
|
|
if not self.websocket_connections:
|
|
return
|
|
|
|
# Add OHLCV data to the broadcast
|
|
enhanced_data = data.copy()
|
|
if symbol in self.ohlcv_data:
|
|
enhanced_data['ohlcv'] = list(self.ohlcv_data[symbol])
|
|
|
|
message = {
|
|
'type': 'cob_update',
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'data': enhanced_data
|
|
}
|
|
|
|
# Send to all connections
|
|
dead_connections = []
|
|
for ws in self.websocket_connections:
|
|
try:
|
|
await ws.send_str(json.dumps(message))
|
|
except Exception as e:
|
|
logger.warning(f"Failed to send to WebSocket: {e}")
|
|
dead_connections.append(ws)
|
|
|
|
# Clean up dead connections
|
|
for ws in dead_connections:
|
|
self.websocket_connections.discard(ws)
|
|
|
|
async def _send_websocket_data(self, ws, msg_type: str, symbol: str, data: Dict):
|
|
"""Send data to a specific WebSocket connection"""
|
|
try:
|
|
message = {
|
|
'type': msg_type,
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'data': data
|
|
}
|
|
await ws.send_str(json.dumps(message))
|
|
except Exception as e:
|
|
logger.error(f"Error sending WebSocket data: {e}")
|
|
|
|
async def _generate_dashboard_data(self, symbol: str) -> Dict:
|
|
"""Generate dashboard data for a symbol"""
|
|
try:
|
|
# Return cached data from COB integration callbacks
|
|
if symbol in self.latest_cob_data:
|
|
return self.latest_cob_data[symbol]
|
|
else:
|
|
return self._get_empty_data(symbol)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generating dashboard data for {symbol}: {e}")
|
|
return self._get_empty_data(symbol)
|
|
|
|
def _get_empty_data(self, symbol: str) -> Dict:
|
|
"""Get empty data structure"""
|
|
return {
|
|
'symbol': symbol,
|
|
'bids': [],
|
|
'asks': [],
|
|
'svp': {'data': []},
|
|
'ohlcv': [],
|
|
'stats': {
|
|
'mid_price': 0,
|
|
'spread_bps': 0,
|
|
'bid_liquidity': 0,
|
|
'ask_liquidity': 0,
|
|
'bid_levels': 0,
|
|
'ask_levels': 0,
|
|
'imbalance': 0
|
|
}
|
|
}
|
|
|
|
async def _periodic_stats_update(self):
|
|
"""Periodically update and broadcast statistics"""
|
|
while True:
|
|
try:
|
|
# Calculate update frequencies
|
|
update_frequencies = {}
|
|
for symbol in self.symbols:
|
|
if symbol in self.update_timestamps and len(self.update_timestamps[symbol]) > 1:
|
|
timestamps = list(self.update_timestamps[symbol])
|
|
if len(timestamps) >= 2:
|
|
time_diff = (timestamps[-1] - timestamps[-2]).total_seconds()
|
|
if time_diff > 0:
|
|
update_frequencies[symbol] = 1.0 / time_diff
|
|
|
|
# Broadcast stats if needed
|
|
if update_frequencies:
|
|
stats_message = {
|
|
'type': 'stats_update',
|
|
'timestamp': datetime.now().isoformat(),
|
|
'update_frequencies': update_frequencies
|
|
}
|
|
|
|
for ws in list(self.websocket_connections):
|
|
try:
|
|
await ws.send_str(json.dumps(stats_message))
|
|
except Exception:
|
|
self.websocket_connections.discard(ws)
|
|
|
|
await asyncio.sleep(1) # Update every 1 second for real-time responsiveness
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in periodic stats update: {e}")
|
|
await asyncio.sleep(5)
|
|
|
|
async def _cleanup_old_data(self):
|
|
"""Clean up old data to prevent memory leaks"""
|
|
while True:
|
|
try:
|
|
cutoff_time = datetime.now() - timedelta(hours=1)
|
|
|
|
# Clean up old timestamps
|
|
for symbol in self.symbols:
|
|
if symbol in self.update_timestamps:
|
|
timestamps = self.update_timestamps[symbol]
|
|
while timestamps and timestamps[0] < cutoff_time:
|
|
timestamps.popleft()
|
|
|
|
await asyncio.sleep(300) # Clean up every 5 minutes
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in cleanup: {e}")
|
|
await asyncio.sleep(300)
|
|
|
|
async def main():
|
|
"""Main entry point"""
|
|
# Set up logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
logger.info("Starting COB Dashboard Server")
|
|
|
|
try:
|
|
# Windows event loop policy fix
|
|
if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'):
|
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
|
|
|
server = COBDashboardServer()
|
|
await server.start()
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("COB Dashboard Server interrupted by user")
|
|
except Exception as e:
|
|
logger.error(f"COB Dashboard Server failed: {e}")
|
|
logger.error(traceback.format_exc())
|
|
finally:
|
|
if 'server' in locals():
|
|
await server.stop()
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |