cob dash integration

This commit is contained in:
Dobromir Popov
2025-06-24 15:53:11 +03:00
parent ec420c2a5f
commit 77a96030ba
2 changed files with 513 additions and 0 deletions

View File

@ -0,0 +1,513 @@
#!/usr/bin/env python3
"""
Integrated Real-time RL COB Trading System with Dashboard
This script starts both:
1. RealtimeRLCOBTrader - 1B parameter RL model with real-time training
2. COB Dashboard - Real-time visualization with RL predictions
The RL predictions are integrated into the dashboard for live visualization.
"""
import asyncio
import logging
import signal
import sys
import json
import os
from datetime import datetime
from typing import Dict, Any
from aiohttp import web
# Local imports
from core.realtime_rl_cob_trader import RealtimeRLCOBTrader, PredictionResult
from core.trading_executor import TradingExecutor
from core.config import load_config
from web.cob_realtime_dashboard import COBDashboardServer
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/integrated_rl_cob_system.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class IntegratedRLCOBSystem:
"""
Integrated Real-time RL COB Trading System with Dashboard
"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialize integrated system with configuration"""
self.config = load_config(config_path)
self.trader = None
self.dashboard = None
self.trading_executor = None
self.running = False
# RL prediction storage for dashboard
self.rl_predictions: Dict[str, list] = {}
self.prediction_history: Dict[str, list] = {}
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
logger.info("IntegratedRLCOBSystem initialized")
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
logger.info(f"Received signal {signum}, initiating graceful shutdown...")
self.running = False
async def start(self):
"""Start the integrated RL COB trading system with dashboard"""
try:
logger.info("=" * 60)
logger.info("INTEGRATED RL COB SYSTEM STARTING")
logger.info("🔥 Real-time RL Trading + Dashboard")
logger.info("=" * 60)
# Initialize trading executor
await self._initialize_trading_executor()
# Initialize RL trader with prediction callback
await self._initialize_rl_trader()
# Initialize dashboard with RL integration
await self._initialize_dashboard()
# Start the integrated system
await self._start_integrated_system()
# Run main loop
await self._run_main_loop()
except Exception as e:
logger.error(f"Critical error in integrated system: {e}")
raise
finally:
await self.stop()
async def _initialize_trading_executor(self):
"""Initialize the trading executor"""
logger.info("Initializing Trading Executor...")
# Get trading configuration
trading_config = self.config.get('trading', {})
mexc_config = self.config.get('mexc', {})
# Determine if we should run in simulation mode
simulation_mode = mexc_config.get('simulation_mode', True)
if simulation_mode:
logger.info("Running in SIMULATION mode - no real trades will be executed")
else:
logger.warning("Running in LIVE TRADING mode - real money at risk!")
# Add safety confirmation for live trading
confirmation = input("Type 'CONFIRM_LIVE_TRADING' to proceed with live trading: ")
if confirmation != 'CONFIRM_LIVE_TRADING':
logger.info("Live trading not confirmed, switching to simulation mode")
simulation_mode = True
# Initialize trading executor
self.trading_executor = TradingExecutor(
simulation_mode=simulation_mode,
mexc_config=mexc_config
)
logger.info(f"Trading Executor initialized in {'SIMULATION' if simulation_mode else 'LIVE'} mode")
async def _initialize_rl_trader(self):
"""Initialize the RL trader with prediction callbacks"""
logger.info("Initializing Real-time RL COB Trader...")
# Get RL configuration
rl_config = self.config.get('realtime_rl', {})
# Trading symbols
symbols = rl_config.get('symbols', ['BTC/USDT', 'ETH/USDT'])
# Initialize prediction storage
for symbol in symbols:
self.rl_predictions[symbol] = []
self.prediction_history[symbol] = []
# RL parameters
inference_interval_ms = rl_config.get('inference_interval_ms', 200)
min_confidence_threshold = rl_config.get('min_confidence_threshold', 0.7)
required_confident_predictions = rl_config.get('required_confident_predictions', 3)
model_checkpoint_dir = rl_config.get('model_checkpoint_dir', 'models/realtime_rl_cob')
# Initialize RL trader
self.trader = RealtimeRLCOBTrader(
symbols=symbols,
trading_executor=self.trading_executor,
model_checkpoint_dir=model_checkpoint_dir,
inference_interval_ms=inference_interval_ms,
min_confidence_threshold=min_confidence_threshold,
required_confident_predictions=required_confident_predictions
)
# Monkey-patch the trader to capture predictions
original_add_signal = self.trader._add_signal
def enhanced_add_signal(symbol: str, prediction: PredictionResult):
# Call original method
original_add_signal(symbol, prediction)
# Capture prediction for dashboard
self._on_rl_prediction(symbol, prediction)
self.trader._add_signal = enhanced_add_signal
logger.info(f"RL Trader initialized for symbols: {symbols}")
logger.info(f"Inference interval: {inference_interval_ms}ms")
logger.info(f"Confidence threshold: {min_confidence_threshold}")
logger.info(f"Required predictions: {required_confident_predictions}")
def _on_rl_prediction(self, symbol: str, prediction: PredictionResult):
"""Handle RL predictions for dashboard integration"""
try:
# Convert prediction to dashboard format
prediction_data = {
'timestamp': prediction.timestamp.isoformat(),
'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': prediction.confidence,
'predicted_change': prediction.predicted_change,
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction],
'color': ['red', 'gray', 'green'][prediction.predicted_direction]
}
# Add to current predictions (for live display)
self.rl_predictions[symbol].append(prediction_data)
if len(self.rl_predictions[symbol]) > 100: # Keep last 100
self.rl_predictions[symbol] = self.rl_predictions[symbol][-100:]
# Add to history (for chart overlay)
self.prediction_history[symbol].append(prediction_data)
if len(self.prediction_history[symbol]) > 1000: # Keep last 1000
self.prediction_history[symbol] = self.prediction_history[symbol][-1000:]
logger.debug(f"Captured RL prediction for {symbol}: {prediction.predicted_direction} "
f"(confidence: {prediction.confidence:.3f})")
except Exception as e:
logger.error(f"Error capturing RL prediction: {e}")
async def _initialize_dashboard(self):
"""Initialize the COB dashboard with RL integration"""
logger.info("Initializing COB Dashboard with RL Integration...")
# Get dashboard configuration
dashboard_config = self.config.get('dashboard', {})
host = dashboard_config.get('host', 'localhost')
port = dashboard_config.get('port', 8053)
# Create enhanced dashboard server
self.dashboard = EnhancedCOBDashboardServer(
host=host,
port=port,
rl_system=self # Pass reference to get predictions
)
logger.info(f"COB Dashboard initialized at http://{host}:{port}")
async def _start_integrated_system(self):
"""Start the complete integrated system"""
logger.info("Starting Integrated RL COB System...")
# Start RL trader first (this initializes COB integration)
await self.trader.start()
logger.info("✅ RL Trader started")
# Start dashboard (uses same COB integration)
await self.dashboard.start()
logger.info("✅ COB Dashboard started")
self.running = True
logger.info("🎉 INTEGRATED SYSTEM FULLY OPERATIONAL!")
logger.info("🔥 1B parameter RL model: ACTIVE")
logger.info("📊 Real-time COB data: STREAMING")
logger.info("🎯 Signal accumulation: ACTIVE")
logger.info("💹 Live predictions: VISIBLE IN DASHBOARD")
logger.info("⚡ Continuous training: ACTIVE")
logger.info(f"🌐 Dashboard URL: http://{self.dashboard.host}:{self.dashboard.port}")
async def _run_main_loop(self):
"""Main monitoring and statistics loop"""
logger.info("Starting integrated system monitoring...")
last_stats_time = datetime.now()
stats_interval = 60 # Print stats every 60 seconds
while self.running:
try:
# Sleep for a bit
await asyncio.sleep(10)
# Print periodic statistics
current_time = datetime.now()
if (current_time - last_stats_time).total_seconds() >= stats_interval:
await self._print_integrated_stats()
last_stats_time = current_time
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in main loop: {e}")
await asyncio.sleep(5)
logger.info("Integrated system monitoring stopped")
async def _print_integrated_stats(self):
"""Print comprehensive integrated system statistics"""
try:
logger.info("=" * 80)
logger.info("🔥 INTEGRATED RL COB SYSTEM STATISTICS")
logger.info("=" * 80)
# RL Trader Statistics
if self.trader:
rl_stats = self.trader.get_performance_stats()
logger.info("\n🤖 RL TRADER PERFORMANCE:")
for symbol in self.trader.symbols:
training_stats = rl_stats.get('training_stats', {}).get(symbol, {})
inference_stats = rl_stats.get('inference_stats', {}).get(symbol, {})
signal_stats = rl_stats.get('signal_stats', {}).get(symbol, {})
logger.info(f"\n 📈 {symbol}:")
logger.info(f" Predictions: {training_stats.get('total_predictions', 0)}")
logger.info(f" Success Rate: {signal_stats.get('success_rate', 0):.1%}")
logger.info(f" Avg Inference: {inference_stats.get('average_inference_time_ms', 0):.1f}ms")
logger.info(f" Current Signals: {signal_stats.get('current_signals', 0)}")
# RL prediction stats for dashboard
recent_predictions = len(self.rl_predictions.get(symbol, []))
total_predictions = len(self.prediction_history.get(symbol, []))
logger.info(f" Dashboard Predictions: {recent_predictions} recent, {total_predictions} total")
# Dashboard Statistics
if self.dashboard:
logger.info(f"\n🌐 DASHBOARD STATISTICS:")
logger.info(f" Active Connections: {len(self.dashboard.websocket_connections)}")
logger.info(f" Server Status: {'RUNNING' if self.dashboard.site else 'STOPPED'}")
logger.info(f" URL: http://{self.dashboard.host}:{self.dashboard.port}")
# Trading Executor Statistics
if self.trading_executor:
positions = self.trading_executor.get_positions()
trade_history = self.trading_executor.get_trade_history()
logger.info(f"\n💰 TRADING STATISTICS:")
logger.info(f" Active Positions: {len(positions)}")
logger.info(f" Total Trades: {len(trade_history)}")
if trade_history:
total_pnl = sum(trade.pnl for trade in trade_history)
profitable_trades = sum(1 for trade in trade_history if trade.pnl > 0)
win_rate = (profitable_trades / len(trade_history)) * 100
logger.info(f" Total P&L: ${total_pnl:.2f}")
logger.info(f" Win Rate: {win_rate:.1f}%")
logger.info("=" * 80)
except Exception as e:
logger.error(f"Error printing integrated stats: {e}")
async def stop(self):
"""Stop the integrated system gracefully"""
if not self.running:
return
logger.info("Stopping Integrated RL COB System...")
self.running = False
# Stop dashboard
if self.dashboard:
await self.dashboard.stop()
logger.info("✅ Dashboard stopped")
# Stop RL trader
if self.trader:
await self.trader.stop()
logger.info("✅ RL Trader stopped")
logger.info("🏁 Integrated system stopped successfully")
def get_rl_predictions(self, symbol: str) -> Dict[str, Any]:
"""Get RL predictions for dashboard display"""
return {
'recent_predictions': self.rl_predictions.get(symbol, []),
'prediction_history': self.prediction_history.get(symbol, []),
'total_predictions': len(self.prediction_history.get(symbol, [])),
'recent_count': len(self.rl_predictions.get(symbol, []))
}
class EnhancedCOBDashboardServer(COBDashboardServer):
"""Enhanced COB Dashboard with RL prediction integration"""
def __init__(self, host: str = 'localhost', port: int = 8053, rl_system: IntegratedRLCOBSystem = None):
super().__init__(host, port)
self.rl_system = rl_system
# Add RL prediction routes
self._setup_rl_routes()
logger.info("Enhanced COB Dashboard with RL predictions initialized")
async def serve_dashboard(self, request):
"""Serve the enhanced dashboard HTML with RL predictions"""
try:
# Read the enhanced dashboard HTML
dashboard_path = os.path.join(os.path.dirname(__file__), 'enhanced_cob_dashboard.html')
if os.path.exists(dashboard_path):
with open(dashboard_path, 'r', encoding='utf-8') as f:
html_content = f.read()
return web.Response(text=html_content, content_type='text/html')
else:
# Fallback to basic dashboard
logger.warning("Enhanced dashboard HTML not found, using basic dashboard")
return await super().serve_dashboard(request)
except Exception as e:
logger.error(f"Error serving enhanced dashboard: {e}")
return web.Response(text="Dashboard error", status=500)
def _setup_rl_routes(self):
"""Setup additional routes for RL predictions"""
self.app.router.add_get('/api/rl-predictions/{symbol}', self.get_rl_predictions)
self.app.router.add_get('/api/rl-status', self.get_rl_status)
async def get_rl_predictions(self, request):
"""Get RL predictions for a symbol"""
try:
symbol = request.match_info['symbol']
symbol = symbol.replace('%2F', '/')
if symbol not in self.symbols:
return web.json_response({
'error': f'Symbol {symbol} not supported'
}, status=400)
if not self.rl_system:
return web.json_response({
'error': 'RL system not available'
}, status=503)
predictions = self.rl_system.get_rl_predictions(symbol)
return web.json_response({
'symbol': symbol,
'timestamp': datetime.now().isoformat(),
'predictions': predictions
})
except Exception as e:
logger.error(f"Error getting RL predictions: {e}")
return web.json_response({
'error': str(e)
}, status=500)
async def get_rl_status(self, request):
"""Get RL system status"""
try:
if not self.rl_system or not self.rl_system.trader:
return web.json_response({
'status': 'inactive',
'error': 'RL system not available'
})
rl_stats = self.rl_system.trader.get_performance_stats()
status = {
'status': 'active',
'symbols': self.rl_system.trader.symbols,
'model_info': rl_stats.get('model_info', {}),
'inference_interval_ms': self.rl_system.trader.inference_interval_ms,
'confidence_threshold': self.rl_system.trader.min_confidence_threshold,
'required_predictions': self.rl_system.trader.required_confident_predictions,
'device': str(self.rl_system.trader.device),
'running': self.rl_system.trader.running
}
return web.json_response(status)
except Exception as e:
logger.error(f"Error getting RL status: {e}")
return web.json_response({
'status': 'error',
'error': str(e)
}, status=500)
async def _broadcast_cob_update(self, symbol: str, data: Dict):
"""Enhanced COB update broadcast with RL predictions"""
try:
# Get RL predictions if available
rl_data = {}
if self.rl_system:
rl_predictions = self.rl_system.get_rl_predictions(symbol)
rl_data = {
'rl_predictions': rl_predictions.get('recent_predictions', [])[-10:], # Last 10
'prediction_count': rl_predictions.get('total_predictions', 0)
}
# Enhanced data with RL predictions
enhanced_data = {
**data,
'rl_data': rl_data
}
# Broadcast to all WebSocket connections
message = json.dumps({
'type': 'cob_update',
'symbol': symbol,
'timestamp': datetime.now().isoformat(),
'data': enhanced_data
}, default=str)
# Send to all connected clients
disconnected = []
for ws in self.websocket_connections:
try:
await ws.send_str(message)
except Exception as e:
logger.debug(f"WebSocket send failed: {e}")
disconnected.append(ws)
# Remove disconnected clients
for ws in disconnected:
self.websocket_connections.discard(ws)
except Exception as e:
logger.error(f"Error broadcasting enhanced COB update: {e}")
async def main():
"""Main entry point for integrated RL COB system"""
# Create logs directory
os.makedirs('logs', exist_ok=True)
# Initialize and start integrated system
system = IntegratedRLCOBSystem()
try:
await system.start()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
except Exception as e:
logger.error(f"System error: {e}")
raise
finally:
await system.stop()
if __name__ == "__main__":
asyncio.run(main())

Binary file not shown.