324 lines
13 KiB
Python
324 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Real-time RL COB Trader Launcher
|
|
|
|
Launch script for the real-time reinforcement learning trader that:
|
|
1. Uses COB data for training a 1B parameter model
|
|
2. Performs inference every 200ms
|
|
3. Accumulates confident signals for trade execution
|
|
4. Trains continuously in real-time based on outcomes
|
|
|
|
This script provides a complete trading system integration.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Local imports
|
|
from core.realtime_rl_cob_trader import RealtimeRLCOBTrader
|
|
from core.trading_executor import TradingExecutor
|
|
from core.config import load_config
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('logs/realtime_rl_cob_trader.log'),
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class RealtimeRLCOBTraderLauncher:
|
|
"""
|
|
Launcher for Real-time RL COB Trader system
|
|
"""
|
|
|
|
def __init__(self, config_path: str = "config.yaml"):
|
|
"""Initialize launcher with configuration"""
|
|
self.config_path = config_path
|
|
self.config = load_config(config_path)
|
|
self.trader: Optional[RealtimeRLCOBTrader] = None
|
|
self.trading_executor: Optional[TradingExecutor] = None
|
|
self.running = False
|
|
|
|
# Setup signal handlers for graceful shutdown
|
|
signal.signal(signal.SIGINT, self._signal_handler)
|
|
signal.signal(signal.SIGTERM, self._signal_handler)
|
|
|
|
logger.info("RealtimeRLCOBTraderLauncher initialized")
|
|
|
|
def _signal_handler(self, signum, frame):
|
|
"""Handle shutdown signals"""
|
|
logger.info(f"Received signal {signum}, initiating graceful shutdown...")
|
|
self.running = False
|
|
|
|
async def start(self):
|
|
"""Start the real-time RL COB trading system"""
|
|
try:
|
|
logger.info("=" * 60)
|
|
logger.info("REAL-TIME RL COB TRADER SYSTEM STARTING")
|
|
logger.info("=" * 60)
|
|
|
|
# Initialize trading executor
|
|
await self._initialize_trading_executor()
|
|
|
|
# Initialize RL trader
|
|
await self._initialize_rl_trader()
|
|
|
|
# Start the trading system
|
|
await self._start_trading_system()
|
|
|
|
# Run main loop
|
|
await self._run_main_loop()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Critical error in trader launcher: {e}")
|
|
raise
|
|
finally:
|
|
await self.stop()
|
|
|
|
async def _initialize_trading_executor(self):
|
|
"""Initialize the trading executor"""
|
|
logger.info("Initializing Trading Executor...")
|
|
|
|
# Get trading configuration
|
|
trading_config = self.config.get('trading', {})
|
|
mexc_config = self.config.get('mexc', {})
|
|
|
|
# Determine if we should run in simulation mode
|
|
simulation_mode = mexc_config.get('simulation_mode', True)
|
|
|
|
if simulation_mode:
|
|
logger.info("Running in SIMULATION mode - no real trades will be executed")
|
|
else:
|
|
logger.warning("Running in LIVE TRADING mode - real money at risk!")
|
|
|
|
# Add safety confirmation for live trading
|
|
confirmation = input("Type 'CONFIRM_LIVE_TRADING' to proceed with live trading: ")
|
|
if confirmation != 'CONFIRM_LIVE_TRADING':
|
|
logger.info("Live trading not confirmed, switching to simulation mode")
|
|
simulation_mode = True
|
|
|
|
# Initialize trading executor
|
|
self.trading_executor = TradingExecutor(self.config_path)
|
|
|
|
logger.info(f"Trading Executor initialized in {'SIMULATION' if simulation_mode else 'LIVE'} mode")
|
|
|
|
async def _initialize_rl_trader(self):
|
|
"""Initialize the RL trader"""
|
|
logger.info("Initializing Real-time RL COB Trader...")
|
|
|
|
# Get RL configuration
|
|
rl_config = self.config.get('realtime_rl', {})
|
|
|
|
# Trading symbols
|
|
symbols = rl_config.get('symbols', ['BTC/USDT', 'ETH/USDT'])
|
|
|
|
# RL parameters
|
|
inference_interval_ms = rl_config.get('inference_interval_ms', 200)
|
|
min_confidence_threshold = rl_config.get('min_confidence_threshold', 0.7)
|
|
required_confident_predictions = rl_config.get('required_confident_predictions', 3)
|
|
model_checkpoint_dir = rl_config.get('model_checkpoint_dir', 'models/realtime_rl_cob')
|
|
|
|
# Initialize RL trader
|
|
if self.trading_executor is None:
|
|
raise RuntimeError("Trading executor not initialized")
|
|
|
|
self.trader = RealtimeRLCOBTrader(
|
|
symbols=symbols,
|
|
trading_executor=self.trading_executor,
|
|
model_checkpoint_dir=model_checkpoint_dir,
|
|
inference_interval_ms=inference_interval_ms,
|
|
min_confidence_threshold=min_confidence_threshold,
|
|
required_confident_predictions=required_confident_predictions
|
|
)
|
|
|
|
logger.info(f"RL Trader initialized for symbols: {symbols}")
|
|
logger.info(f"Inference interval: {inference_interval_ms}ms")
|
|
logger.info(f"Confidence threshold: {min_confidence_threshold}")
|
|
logger.info(f"Required predictions: {required_confident_predictions}")
|
|
|
|
async def _start_trading_system(self):
|
|
"""Start the complete trading system"""
|
|
logger.info("Starting Real-time RL COB Trading System...")
|
|
|
|
# Start RL trader (this will start COB integration internally)
|
|
if self.trader is None:
|
|
raise RuntimeError("RL trader not initialized")
|
|
await self.trader.start()
|
|
|
|
self.running = True
|
|
|
|
logger.info("✅ Real-time RL COB Trading System started successfully!")
|
|
logger.info("🔥 1B parameter model training and inference active")
|
|
logger.info("📊 COB data streaming and processing")
|
|
logger.info("🎯 Signal accumulation and trade execution ready")
|
|
logger.info("⚡ Real-time training on prediction outcomes")
|
|
|
|
async def _run_main_loop(self):
|
|
"""Main monitoring and statistics loop"""
|
|
logger.info("Starting main monitoring loop...")
|
|
|
|
last_stats_time = datetime.now()
|
|
stats_interval = 60 # Print stats every 60 seconds
|
|
|
|
while self.running:
|
|
try:
|
|
# Sleep for a bit
|
|
await asyncio.sleep(10)
|
|
|
|
# Print periodic statistics
|
|
current_time = datetime.now()
|
|
if (current_time - last_stats_time).total_seconds() >= stats_interval:
|
|
await self._print_performance_stats()
|
|
last_stats_time = current_time
|
|
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error in main loop: {e}")
|
|
await asyncio.sleep(5)
|
|
|
|
logger.info("Main monitoring loop stopped")
|
|
|
|
async def _print_performance_stats(self):
|
|
"""Print comprehensive performance statistics"""
|
|
try:
|
|
if not self.trader:
|
|
return
|
|
|
|
stats = self.trader.get_performance_stats()
|
|
|
|
logger.info("=" * 80)
|
|
logger.info("🔥 REAL-TIME RL COB TRADER PERFORMANCE STATISTICS")
|
|
logger.info("=" * 80)
|
|
|
|
# Model information
|
|
logger.info("📊 Model Information:")
|
|
for symbol, model_info in stats.get('model_info', {}).items():
|
|
total_params = model_info.get('total_parameters', 0)
|
|
logger.info(f" {symbol}: {total_params:,} parameters ({total_params/1e9:.2f}B)")
|
|
|
|
# Training statistics
|
|
logger.info("\n🧠 Training Statistics:")
|
|
for symbol, training_stats in stats.get('training_stats', {}).items():
|
|
total_preds = training_stats.get('total_predictions', 0)
|
|
successful_preds = training_stats.get('successful_predictions', 0)
|
|
success_rate = (successful_preds / max(1, total_preds)) * 100
|
|
avg_loss = training_stats.get('average_loss', 0.0)
|
|
training_steps = training_stats.get('total_training_steps', 0)
|
|
last_training = training_stats.get('last_training_time')
|
|
|
|
logger.info(f" {symbol}:")
|
|
logger.info(f" Predictions: {total_preds} (Success: {success_rate:.1f}%)")
|
|
logger.info(f" Training Steps: {training_steps}")
|
|
logger.info(f" Average Loss: {avg_loss:.6f}")
|
|
if last_training:
|
|
logger.info(f" Last Training: {last_training}")
|
|
|
|
# Inference statistics
|
|
logger.info("\n⚡ Inference Statistics:")
|
|
for symbol, inference_stats in stats.get('inference_stats', {}).items():
|
|
total_inferences = inference_stats.get('total_inferences', 0)
|
|
avg_time = inference_stats.get('average_inference_time_ms', 0.0)
|
|
last_inference = inference_stats.get('last_inference_time')
|
|
|
|
logger.info(f" {symbol}:")
|
|
logger.info(f" Total Inferences: {total_inferences}")
|
|
logger.info(f" Average Time: {avg_time:.1f}ms")
|
|
if last_inference:
|
|
logger.info(f" Last Inference: {last_inference}")
|
|
|
|
# Signal statistics
|
|
logger.info("\n🎯 Signal Accumulation:")
|
|
for symbol, signal_stats in stats.get('signal_stats', {}).items():
|
|
current_signals = signal_stats.get('current_signals', 0)
|
|
confidence_sum = signal_stats.get('confidence_sum', 0.0)
|
|
success_rate = signal_stats.get('success_rate', 0.0) * 100
|
|
|
|
logger.info(f" {symbol}:")
|
|
logger.info(f" Current Signals: {current_signals}")
|
|
logger.info(f" Confidence Sum: {confidence_sum:.2f}")
|
|
logger.info(f" Historical Success Rate: {success_rate:.1f}%")
|
|
|
|
# Trading executor statistics
|
|
if self.trading_executor:
|
|
positions = self.trading_executor.get_positions()
|
|
trade_history = self.trading_executor.get_trade_history()
|
|
|
|
logger.info("\n💰 Trading Statistics:")
|
|
logger.info(f" Active Positions: {len(positions)}")
|
|
logger.info(f" Total Trades: {len(trade_history)}")
|
|
|
|
if trade_history:
|
|
# Calculate P&L statistics
|
|
total_pnl = sum(trade.pnl for trade in trade_history)
|
|
profitable_trades = sum(1 for trade in trade_history if trade.pnl > 0)
|
|
win_rate = (profitable_trades / len(trade_history)) * 100
|
|
|
|
logger.info(f" Total P&L: ${total_pnl:.2f}")
|
|
logger.info(f" Win Rate: {win_rate:.1f}%")
|
|
|
|
# Show active positions
|
|
if positions:
|
|
logger.info("\n📍 Active Positions:")
|
|
for symbol, position in positions.items():
|
|
logger.info(f" {symbol}: {position.side} {position.quantity:.6f} @ ${position.entry_price:.2f}")
|
|
|
|
logger.info("=" * 80)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error printing performance stats: {e}")
|
|
|
|
async def stop(self):
|
|
"""Stop the trading system gracefully"""
|
|
if not self.running:
|
|
return
|
|
|
|
logger.info("Stopping Real-time RL COB Trading System...")
|
|
|
|
self.running = False
|
|
|
|
# Stop RL trader
|
|
if self.trader:
|
|
await self.trader.stop()
|
|
logger.info("✅ RL Trader stopped")
|
|
|
|
# Print final statistics
|
|
if self.trader:
|
|
logger.info("\n📊 Final Performance Summary:")
|
|
await self._print_performance_stats()
|
|
|
|
logger.info("Real-time RL COB Trading System stopped successfully")
|
|
|
|
async def main():
|
|
"""Main entry point"""
|
|
try:
|
|
# Create logs directory if it doesn't exist
|
|
os.makedirs('logs', exist_ok=True)
|
|
|
|
# Initialize and start launcher
|
|
launcher = RealtimeRLCOBTraderLauncher()
|
|
await launcher.start()
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Received keyboard interrupt, shutting down...")
|
|
except Exception as e:
|
|
logger.error(f"Critical error: {e}")
|
|
raise
|
|
|
|
if __name__ == "__main__":
|
|
# Set event loop policy for Windows compatibility
|
|
if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'):
|
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
|
|
|
|
asyncio.run(main()) |