From 77a96030ba8d34d014e05d882393e89ad1f69062 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 24 Jun 2025 15:53:11 +0300 Subject: [PATCH] cob dash integration --- run_integrated_rl_cob_dashboard.py | 513 +++++++++++++++++++++++++++++ web/enhanced_cob_dashboard.html | Bin 0 -> 3298 bytes 2 files changed, 513 insertions(+) create mode 100644 run_integrated_rl_cob_dashboard.py create mode 100644 web/enhanced_cob_dashboard.html diff --git a/run_integrated_rl_cob_dashboard.py b/run_integrated_rl_cob_dashboard.py new file mode 100644 index 0000000..7f6731d --- /dev/null +++ b/run_integrated_rl_cob_dashboard.py @@ -0,0 +1,513 @@ +#!/usr/bin/env python3 +""" +Integrated Real-time RL COB Trading System with Dashboard + +This script starts both: +1. RealtimeRLCOBTrader - 1B parameter RL model with real-time training +2. COB Dashboard - Real-time visualization with RL predictions + +The RL predictions are integrated into the dashboard for live visualization. +""" + +import asyncio +import logging +import signal +import sys +import json +import os +from datetime import datetime +from typing import Dict, Any +from aiohttp import web + +# Local imports +from core.realtime_rl_cob_trader import RealtimeRLCOBTrader, PredictionResult +from core.trading_executor import TradingExecutor +from core.config import load_config +from web.cob_realtime_dashboard import COBDashboardServer + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/integrated_rl_cob_system.log'), + logging.StreamHandler(sys.stdout) + ] +) + +logger = logging.getLogger(__name__) + +class IntegratedRLCOBSystem: + """ + Integrated Real-time RL COB Trading System with Dashboard + """ + + def __init__(self, config_path: str = "config.yaml"): + """Initialize integrated system with configuration""" + self.config = load_config(config_path) + self.trader = None + self.dashboard = None + self.trading_executor = None + self.running = False + + # RL prediction storage for dashboard + self.rl_predictions: Dict[str, list] = {} + self.prediction_history: Dict[str, list] = {} + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + logger.info("IntegratedRLCOBSystem initialized") + + def _signal_handler(self, signum, frame): + """Handle shutdown signals""" + logger.info(f"Received signal {signum}, initiating graceful shutdown...") + self.running = False + + async def start(self): + """Start the integrated RL COB trading system with dashboard""" + try: + logger.info("=" * 60) + logger.info("INTEGRATED RL COB SYSTEM STARTING") + logger.info("šŸ”„ Real-time RL Trading + Dashboard") + logger.info("=" * 60) + + # Initialize trading executor + await self._initialize_trading_executor() + + # Initialize RL trader with prediction callback + await self._initialize_rl_trader() + + # Initialize dashboard with RL integration + await self._initialize_dashboard() + + # Start the integrated system + await self._start_integrated_system() + + # Run main loop + await self._run_main_loop() + + except Exception as e: + logger.error(f"Critical error in integrated system: {e}") + raise + finally: + await self.stop() + + async def _initialize_trading_executor(self): + """Initialize the trading executor""" + logger.info("Initializing Trading Executor...") + + # Get trading configuration + trading_config = self.config.get('trading', {}) + mexc_config = self.config.get('mexc', {}) + + # Determine if we should run in simulation mode + simulation_mode = mexc_config.get('simulation_mode', True) + + if simulation_mode: + logger.info("Running in SIMULATION mode - no real trades will be executed") + else: + logger.warning("Running in LIVE TRADING mode - real money at risk!") + + # Add safety confirmation for live trading + confirmation = input("Type 'CONFIRM_LIVE_TRADING' to proceed with live trading: ") + if confirmation != 'CONFIRM_LIVE_TRADING': + logger.info("Live trading not confirmed, switching to simulation mode") + simulation_mode = True + + # Initialize trading executor + self.trading_executor = TradingExecutor( + simulation_mode=simulation_mode, + mexc_config=mexc_config + ) + + logger.info(f"Trading Executor initialized in {'SIMULATION' if simulation_mode else 'LIVE'} mode") + + async def _initialize_rl_trader(self): + """Initialize the RL trader with prediction callbacks""" + logger.info("Initializing Real-time RL COB Trader...") + + # Get RL configuration + rl_config = self.config.get('realtime_rl', {}) + + # Trading symbols + symbols = rl_config.get('symbols', ['BTC/USDT', 'ETH/USDT']) + + # Initialize prediction storage + for symbol in symbols: + self.rl_predictions[symbol] = [] + self.prediction_history[symbol] = [] + + # RL parameters + inference_interval_ms = rl_config.get('inference_interval_ms', 200) + min_confidence_threshold = rl_config.get('min_confidence_threshold', 0.7) + required_confident_predictions = rl_config.get('required_confident_predictions', 3) + model_checkpoint_dir = rl_config.get('model_checkpoint_dir', 'models/realtime_rl_cob') + + # Initialize RL trader + self.trader = RealtimeRLCOBTrader( + symbols=symbols, + trading_executor=self.trading_executor, + model_checkpoint_dir=model_checkpoint_dir, + inference_interval_ms=inference_interval_ms, + min_confidence_threshold=min_confidence_threshold, + required_confident_predictions=required_confident_predictions + ) + + # Monkey-patch the trader to capture predictions + original_add_signal = self.trader._add_signal + def enhanced_add_signal(symbol: str, prediction: PredictionResult): + # Call original method + original_add_signal(symbol, prediction) + # Capture prediction for dashboard + self._on_rl_prediction(symbol, prediction) + + self.trader._add_signal = enhanced_add_signal + + logger.info(f"RL Trader initialized for symbols: {symbols}") + logger.info(f"Inference interval: {inference_interval_ms}ms") + logger.info(f"Confidence threshold: {min_confidence_threshold}") + logger.info(f"Required predictions: {required_confident_predictions}") + + def _on_rl_prediction(self, symbol: str, prediction: PredictionResult): + """Handle RL predictions for dashboard integration""" + try: + # Convert prediction to dashboard format + prediction_data = { + 'timestamp': prediction.timestamp.isoformat(), + 'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP + 'confidence': prediction.confidence, + 'predicted_change': prediction.predicted_change, + 'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction], + 'color': ['red', 'gray', 'green'][prediction.predicted_direction] + } + + # Add to current predictions (for live display) + self.rl_predictions[symbol].append(prediction_data) + if len(self.rl_predictions[symbol]) > 100: # Keep last 100 + self.rl_predictions[symbol] = self.rl_predictions[symbol][-100:] + + # Add to history (for chart overlay) + self.prediction_history[symbol].append(prediction_data) + if len(self.prediction_history[symbol]) > 1000: # Keep last 1000 + self.prediction_history[symbol] = self.prediction_history[symbol][-1000:] + + logger.debug(f"Captured RL prediction for {symbol}: {prediction.predicted_direction} " + f"(confidence: {prediction.confidence:.3f})") + + except Exception as e: + logger.error(f"Error capturing RL prediction: {e}") + + async def _initialize_dashboard(self): + """Initialize the COB dashboard with RL integration""" + logger.info("Initializing COB Dashboard with RL Integration...") + + # Get dashboard configuration + dashboard_config = self.config.get('dashboard', {}) + host = dashboard_config.get('host', 'localhost') + port = dashboard_config.get('port', 8053) + + # Create enhanced dashboard server + self.dashboard = EnhancedCOBDashboardServer( + host=host, + port=port, + rl_system=self # Pass reference to get predictions + ) + + logger.info(f"COB Dashboard initialized at http://{host}:{port}") + + async def _start_integrated_system(self): + """Start the complete integrated system""" + logger.info("Starting Integrated RL COB System...") + + # Start RL trader first (this initializes COB integration) + await self.trader.start() + logger.info("āœ… RL Trader started") + + # Start dashboard (uses same COB integration) + await self.dashboard.start() + logger.info("āœ… COB Dashboard started") + + self.running = True + + logger.info("šŸŽ‰ INTEGRATED SYSTEM FULLY OPERATIONAL!") + logger.info("šŸ”„ 1B parameter RL model: ACTIVE") + logger.info("šŸ“Š Real-time COB data: STREAMING") + logger.info("šŸŽÆ Signal accumulation: ACTIVE") + logger.info("šŸ’¹ Live predictions: VISIBLE IN DASHBOARD") + logger.info("⚔ Continuous training: ACTIVE") + logger.info(f"🌐 Dashboard URL: http://{self.dashboard.host}:{self.dashboard.port}") + + async def _run_main_loop(self): + """Main monitoring and statistics loop""" + logger.info("Starting integrated system monitoring...") + + last_stats_time = datetime.now() + stats_interval = 60 # Print stats every 60 seconds + + while self.running: + try: + # Sleep for a bit + await asyncio.sleep(10) + + # Print periodic statistics + current_time = datetime.now() + if (current_time - last_stats_time).total_seconds() >= stats_interval: + await self._print_integrated_stats() + last_stats_time = current_time + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in main loop: {e}") + await asyncio.sleep(5) + + logger.info("Integrated system monitoring stopped") + + async def _print_integrated_stats(self): + """Print comprehensive integrated system statistics""" + try: + logger.info("=" * 80) + logger.info("šŸ”„ INTEGRATED RL COB SYSTEM STATISTICS") + logger.info("=" * 80) + + # RL Trader Statistics + if self.trader: + rl_stats = self.trader.get_performance_stats() + logger.info("\nšŸ¤– RL TRADER PERFORMANCE:") + + for symbol in self.trader.symbols: + training_stats = rl_stats.get('training_stats', {}).get(symbol, {}) + inference_stats = rl_stats.get('inference_stats', {}).get(symbol, {}) + signal_stats = rl_stats.get('signal_stats', {}).get(symbol, {}) + + logger.info(f"\n šŸ“ˆ {symbol}:") + logger.info(f" Predictions: {training_stats.get('total_predictions', 0)}") + logger.info(f" Success Rate: {signal_stats.get('success_rate', 0):.1%}") + logger.info(f" Avg Inference: {inference_stats.get('average_inference_time_ms', 0):.1f}ms") + logger.info(f" Current Signals: {signal_stats.get('current_signals', 0)}") + + # RL prediction stats for dashboard + recent_predictions = len(self.rl_predictions.get(symbol, [])) + total_predictions = len(self.prediction_history.get(symbol, [])) + logger.info(f" Dashboard Predictions: {recent_predictions} recent, {total_predictions} total") + + # Dashboard Statistics + if self.dashboard: + logger.info(f"\n🌐 DASHBOARD STATISTICS:") + logger.info(f" Active Connections: {len(self.dashboard.websocket_connections)}") + logger.info(f" Server Status: {'RUNNING' if self.dashboard.site else 'STOPPED'}") + logger.info(f" URL: http://{self.dashboard.host}:{self.dashboard.port}") + + # Trading Executor Statistics + if self.trading_executor: + positions = self.trading_executor.get_positions() + trade_history = self.trading_executor.get_trade_history() + + logger.info(f"\nšŸ’° TRADING STATISTICS:") + logger.info(f" Active Positions: {len(positions)}") + logger.info(f" Total Trades: {len(trade_history)}") + + if trade_history: + total_pnl = sum(trade.pnl for trade in trade_history) + profitable_trades = sum(1 for trade in trade_history if trade.pnl > 0) + win_rate = (profitable_trades / len(trade_history)) * 100 + + logger.info(f" Total P&L: ${total_pnl:.2f}") + logger.info(f" Win Rate: {win_rate:.1f}%") + + logger.info("=" * 80) + + except Exception as e: + logger.error(f"Error printing integrated stats: {e}") + + async def stop(self): + """Stop the integrated system gracefully""" + if not self.running: + return + + logger.info("Stopping Integrated RL COB System...") + + self.running = False + + # Stop dashboard + if self.dashboard: + await self.dashboard.stop() + logger.info("āœ… Dashboard stopped") + + # Stop RL trader + if self.trader: + await self.trader.stop() + logger.info("āœ… RL Trader stopped") + + logger.info("šŸ Integrated system stopped successfully") + + def get_rl_predictions(self, symbol: str) -> Dict[str, Any]: + """Get RL predictions for dashboard display""" + return { + 'recent_predictions': self.rl_predictions.get(symbol, []), + 'prediction_history': self.prediction_history.get(symbol, []), + 'total_predictions': len(self.prediction_history.get(symbol, [])), + 'recent_count': len(self.rl_predictions.get(symbol, [])) + } + +class EnhancedCOBDashboardServer(COBDashboardServer): + """Enhanced COB Dashboard with RL prediction integration""" + + def __init__(self, host: str = 'localhost', port: int = 8053, rl_system: IntegratedRLCOBSystem = None): + super().__init__(host, port) + self.rl_system = rl_system + + # Add RL prediction routes + self._setup_rl_routes() + + logger.info("Enhanced COB Dashboard with RL predictions initialized") + + async def serve_dashboard(self, request): + """Serve the enhanced dashboard HTML with RL predictions""" + try: + # Read the enhanced dashboard HTML + dashboard_path = os.path.join(os.path.dirname(__file__), 'enhanced_cob_dashboard.html') + + if os.path.exists(dashboard_path): + with open(dashboard_path, 'r', encoding='utf-8') as f: + html_content = f.read() + return web.Response(text=html_content, content_type='text/html') + else: + # Fallback to basic dashboard + logger.warning("Enhanced dashboard HTML not found, using basic dashboard") + return await super().serve_dashboard(request) + + except Exception as e: + logger.error(f"Error serving enhanced dashboard: {e}") + return web.Response(text="Dashboard error", status=500) + + def _setup_rl_routes(self): + """Setup additional routes for RL predictions""" + self.app.router.add_get('/api/rl-predictions/{symbol}', self.get_rl_predictions) + self.app.router.add_get('/api/rl-status', self.get_rl_status) + + async def get_rl_predictions(self, request): + """Get RL predictions for a symbol""" + try: + symbol = request.match_info['symbol'] + symbol = symbol.replace('%2F', '/') + + if symbol not in self.symbols: + return web.json_response({ + 'error': f'Symbol {symbol} not supported' + }, status=400) + + if not self.rl_system: + return web.json_response({ + 'error': 'RL system not available' + }, status=503) + + predictions = self.rl_system.get_rl_predictions(symbol) + + return web.json_response({ + 'symbol': symbol, + 'timestamp': datetime.now().isoformat(), + 'predictions': predictions + }) + + except Exception as e: + logger.error(f"Error getting RL predictions: {e}") + return web.json_response({ + 'error': str(e) + }, status=500) + + async def get_rl_status(self, request): + """Get RL system status""" + try: + if not self.rl_system or not self.rl_system.trader: + return web.json_response({ + 'status': 'inactive', + 'error': 'RL system not available' + }) + + rl_stats = self.rl_system.trader.get_performance_stats() + + status = { + 'status': 'active', + 'symbols': self.rl_system.trader.symbols, + 'model_info': rl_stats.get('model_info', {}), + 'inference_interval_ms': self.rl_system.trader.inference_interval_ms, + 'confidence_threshold': self.rl_system.trader.min_confidence_threshold, + 'required_predictions': self.rl_system.trader.required_confident_predictions, + 'device': str(self.rl_system.trader.device), + 'running': self.rl_system.trader.running + } + + return web.json_response(status) + + except Exception as e: + logger.error(f"Error getting RL status: {e}") + return web.json_response({ + 'status': 'error', + 'error': str(e) + }, status=500) + + async def _broadcast_cob_update(self, symbol: str, data: Dict): + """Enhanced COB update broadcast with RL predictions""" + try: + # Get RL predictions if available + rl_data = {} + if self.rl_system: + rl_predictions = self.rl_system.get_rl_predictions(symbol) + rl_data = { + 'rl_predictions': rl_predictions.get('recent_predictions', [])[-10:], # Last 10 + 'prediction_count': rl_predictions.get('total_predictions', 0) + } + + # Enhanced data with RL predictions + enhanced_data = { + **data, + 'rl_data': rl_data + } + + # Broadcast to all WebSocket connections + message = json.dumps({ + 'type': 'cob_update', + 'symbol': symbol, + 'timestamp': datetime.now().isoformat(), + 'data': enhanced_data + }, default=str) + + # Send to all connected clients + disconnected = [] + for ws in self.websocket_connections: + try: + await ws.send_str(message) + except Exception as e: + logger.debug(f"WebSocket send failed: {e}") + disconnected.append(ws) + + # Remove disconnected clients + for ws in disconnected: + self.websocket_connections.discard(ws) + + except Exception as e: + logger.error(f"Error broadcasting enhanced COB update: {e}") + +async def main(): + """Main entry point for integrated RL COB system""" + # Create logs directory + os.makedirs('logs', exist_ok=True) + + # Initialize and start integrated system + system = IntegratedRLCOBSystem() + + try: + await system.start() + except KeyboardInterrupt: + logger.info("Received keyboard interrupt") + except Exception as e: + logger.error(f"System error: {e}") + raise + finally: + await system.stop() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/web/enhanced_cob_dashboard.html b/web/enhanced_cob_dashboard.html new file mode 100644 index 0000000000000000000000000000000000000000..04f29eb749facedd018170654db25b7e7295f179 GIT binary patch literal 3298 zcmchaZA%+L6ov0+q5olMIu)9XB_}AM$XC~vu zWZgz6Maa(1%zZoO%-!+#ua-TxUHfQlJG9TXZ*MHJQ!DJ;&TPx(tabkuG1IZWo!F)| ztjDNvzoW-W;@%?c_1D5f4?i8oU3_#bv)s+d`V-OKTGL*Sd{kNWv5^n$A6m-j$_93U zpMn{AyzdHrhF6}u9~r@ST1%c{$>IkvKhWr{`0zLW@Otmf>}NU>Hdw&kg;w6}0O zwm_)w>ghY*3H}ajkKZ;Z?(jS$L!qH4C%o_SmSb^5zCs}Znap5|$0_lHid&AY z#BL@ut*nNQmx48~mkCzFUPXCZ8=L3;!X% zIhLW`fn!2lqU;ula+KI^Lqi(^YHUG4NiAJeM{60)wvQpEWlowOF$@LPZy=> zE1(`96HmSEjmsd+?x2kKcu^(~)wzI$&b~TgX7A|CA6%#21oK8kU)G8Acn-0MIEkUR z?^og-5!Zi#PFUBe@SMrY_mUgpR8=-zt9rWkSALmN`|0vHa+(y^<+rq`z$-AMFNf;5 z=BZkrj$^gzg{Nnr4Ku#-YvKs`)fyg!{}p+ZCun|@pZeZyMI}K2I`sprb(R~jE;f6N z7He$$x1{52>U@V7y44)v^$hhW&R6?Z>p$3qIQ~`>Y-foWJd}}reNvgfvD8->?87r; z@|5g@t!^c{ljwwqRh`Rhg!c>AJrla4Z(SMI8{b#Id-#l@ogjIU^ z>S(lPYi_iR=2qwrjT**7Eq5?CeX`2cI@MgDvv7Vrm$NXbE}Ubj(~!fR`i592sQQv| zUBr;rIS5GC`mXA^?L79}2{~ZSXC<$?KS-ss`7Lj?v)2=H^7oTlV(Mg^jPfe%3vc4{ lVK$N7)!-g{*6pcUyPNnbRNYxMCk2O_n!m?||6ZPv)gLn=&+q^M literal 0 HcmV?d00001