From 6d55061e8676236d6360ebca815b32509e34b531 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Thu, 17 Jul 2025 02:51:20 +0300 Subject: [PATCH] wip training --- audit_training_system.py | 0 core/overnight_training_coordinator.py | 710 +++++++++++++++++++++++++ start_overnight_training.py | 179 +++++++ web/clean_dashboard.py | 51 ++ 4 files changed, 940 insertions(+) create mode 100644 audit_training_system.py create mode 100644 core/overnight_training_coordinator.py create mode 100644 start_overnight_training.py diff --git a/audit_training_system.py b/audit_training_system.py new file mode 100644 index 0000000..e69de29 diff --git a/core/overnight_training_coordinator.py b/core/overnight_training_coordinator.py new file mode 100644 index 0000000..9049794 --- /dev/null +++ b/core/overnight_training_coordinator.py @@ -0,0 +1,710 @@ +""" +Overnight Training Coordinator + +This module coordinates comprehensive training for CNN and COB RL models during overnight sessions. +It ensures that: +1. Training passes occur on each signal when predictions change +2. Trades are executed and recorded in simulation mode +3. Performance statistics are tracked and logged +4. Models learn from both successful and unsuccessful trades +""" + +import logging +import time +import threading +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any, Tuple +from dataclasses import dataclass, field +from collections import deque +import numpy as np +import json +import os + +logger = logging.getLogger(__name__) + +@dataclass +class TrainingSession: + """Represents a training session for a model""" + model_name: str + symbol: str + start_time: datetime + end_time: Optional[datetime] = None + training_samples: int = 0 + initial_loss: Optional[float] = None + final_loss: Optional[float] = None + improvement: Optional[float] = None + trades_executed: int = 0 + successful_trades: int = 0 + total_pnl: float = 0.0 + +@dataclass +class SignalTradeRecord: + """Records a signal and its corresponding trade execution""" + timestamp: datetime + symbol: str + signal_action: str + signal_confidence: float + model_source: str + executed: bool = False + execution_price: Optional[float] = None + trade_pnl: Optional[float] = None + training_triggered: bool = False + training_loss: Optional[float] = None + +class OvernightTrainingCoordinator: + """ + Coordinates comprehensive overnight training for all models + """ + + def __init__(self, orchestrator, data_provider, trading_executor, dashboard=None): + self.orchestrator = orchestrator + self.data_provider = data_provider + self.trading_executor = trading_executor + self.dashboard = dashboard + + # Training configuration + self.config = { + 'training_on_signal_change': True, # Train when prediction changes + 'min_confidence_for_trade': 0.3, # Minimum confidence to execute trade + 'max_trades_per_hour': 20, # Rate limiting + 'training_batch_size': 32, # Training batch size + 'performance_tracking_window': 100, # Number of trades to track for performance + 'model_checkpoint_interval': 50, # Save checkpoints every N trades + } + + # State tracking + self.is_running = False + self.training_thread = None + self.last_predictions: Dict[str, Dict[str, Any]] = {} # {symbol: {model: prediction}} + self.signal_trade_records: deque = deque(maxlen=1000) + self.training_sessions: Dict[str, TrainingSession] = {} + + # Performance tracking + self.performance_stats = { + 'total_signals': 0, + 'total_trades': 0, + 'successful_trades': 0, + 'total_pnl': 0.0, + 'training_sessions': 0, + 'models_trained': set(), + 'hourly_stats': deque(maxlen=24) # Last 24 hours + } + + # Rate limiting + self.last_trade_time: Dict[str, datetime] = {} + self.trades_this_hour: Dict[str, int] = {} + self.hour_reset_time = datetime.now().replace(minute=0, second=0, microsecond=0) + + logger.info("Overnight Training Coordinator initialized") + + def start_overnight_training(self): + """Start the overnight training session""" + if self.is_running: + logger.warning("Training coordinator already running") + return + + self.is_running = True + self.training_thread = threading.Thread(target=self._training_loop, daemon=True) + self.training_thread.start() + + logger.info("šŸŒ™ OVERNIGHT TRAINING SESSION STARTED") + logger.info("=" * 60) + logger.info("Features enabled:") + logger.info("āœ… CNN training on signal changes") + logger.info("āœ… COB RL training on market microstructure") + logger.info("āœ… Trade execution and recording") + logger.info("āœ… Performance tracking and statistics") + logger.info("āœ… Model checkpointing") + logger.info("=" * 60) + + def stop_overnight_training(self): + """Stop the overnight training session""" + self.is_running = False + if self.training_thread: + self.training_thread.join(timeout=10) + + # Generate final report + self._generate_training_report() + + logger.info("šŸŒ… OVERNIGHT TRAINING SESSION COMPLETED") + + def _training_loop(self): + """Main training loop that monitors signals and triggers training""" + while self.is_running: + try: + # Reset hourly counters if needed + self._reset_hourly_counters() + + # Process signals from orchestrator + self._process_orchestrator_signals() + + # Check for model training opportunities + self._check_training_opportunities() + + # Update performance statistics + self._update_performance_stats() + + # Sleep briefly to avoid overwhelming the system + time.sleep(0.5) + + except Exception as e: + logger.error(f"Error in training loop: {e}") + time.sleep(5) + + def _process_orchestrator_signals(self): + """Process signals from the orchestrator and trigger training/trading""" + try: + # Get recent decisions from orchestrator + if not hasattr(self.orchestrator, 'recent_decisions'): + return + + for symbol in self.orchestrator.symbols: + if symbol not in self.orchestrator.recent_decisions: + continue + + recent_decisions = self.orchestrator.recent_decisions[symbol] + if not recent_decisions: + continue + + # Get the latest decision + latest_decision = recent_decisions[-1] + + # Check if this is a new signal that requires processing + if self._is_new_signal_requiring_action(symbol, latest_decision): + self._process_new_signal(symbol, latest_decision) + + except Exception as e: + logger.error(f"Error processing orchestrator signals: {e}") + + def _is_new_signal_requiring_action(self, symbol: str, decision) -> bool: + """Check if this signal requires training or trading action""" + try: + # Get current prediction for comparison + current_action = decision.action + current_confidence = decision.confidence + current_time = decision.timestamp + + # Check if we have a previous prediction for this symbol + if symbol not in self.last_predictions: + self.last_predictions[symbol] = {} + + # Check if prediction has changed significantly + last_action = self.last_predictions[symbol].get('action') + last_confidence = self.last_predictions[symbol].get('confidence', 0.0) + last_time = self.last_predictions[symbol].get('timestamp') + + # Determine if action is required + action_changed = last_action != current_action + confidence_changed = abs(current_confidence - last_confidence) > 0.1 + time_elapsed = not last_time or (current_time - last_time).total_seconds() > 30 + + # Update last prediction + self.last_predictions[symbol] = { + 'action': current_action, + 'confidence': current_confidence, + 'timestamp': current_time + } + + return action_changed or confidence_changed or time_elapsed + + except Exception as e: + logger.error(f"Error checking if signal requires action: {e}") + return False + + def _process_new_signal(self, symbol: str, decision): + """Process a new signal by triggering training and potentially executing trade""" + try: + signal_record = SignalTradeRecord( + timestamp=decision.timestamp, + symbol=symbol, + signal_action=decision.action, + signal_confidence=decision.confidence, + model_source=getattr(decision, 'reasoning', {}).get('primary_model', 'orchestrator') + ) + + # 1. Trigger training on signal change + if self.config['training_on_signal_change']: + training_loss = self._trigger_model_training(symbol, decision) + signal_record.training_triggered = True + signal_record.training_loss = training_loss + + # 2. Execute trade if confidence is sufficient + if (decision.confidence >= self.config['min_confidence_for_trade'] and + decision.action in ['BUY', 'SELL'] and + self._can_execute_trade(symbol)): + + trade_executed, execution_price, trade_pnl = self._execute_signal_trade(symbol, decision) + signal_record.executed = trade_executed + signal_record.execution_price = execution_price + signal_record.trade_pnl = trade_pnl + + # Update performance stats + self.performance_stats['total_trades'] += 1 + if trade_pnl and trade_pnl > 0: + self.performance_stats['successful_trades'] += 1 + if trade_pnl: + self.performance_stats['total_pnl'] += trade_pnl + + # 3. Record the signal + self.signal_trade_records.append(signal_record) + self.performance_stats['total_signals'] += 1 + + # 4. Log the action + status = "EXECUTED" if signal_record.executed else "SIGNAL_ONLY" + logger.info(f"[{status}] {symbol} {decision.action} " + f"(conf: {decision.confidence:.3f}, " + f"training: {'āœ…' if signal_record.training_triggered else 'āŒ'}, " + f"pnl: {signal_record.trade_pnl:.2f if signal_record.trade_pnl else 'N/A'})") + + except Exception as e: + logger.error(f"Error processing new signal for {symbol}: {e}") + + def _trigger_model_training(self, symbol: str, decision) -> Optional[float]: + """Trigger training for all relevant models""" + try: + training_losses = [] + + # 1. Train CNN model + if hasattr(self.orchestrator, 'cnn_model') and self.orchestrator.cnn_model: + cnn_loss = self._train_cnn_model(symbol, decision) + if cnn_loss is not None: + training_losses.append(cnn_loss) + self.performance_stats['models_trained'].add('CNN') + + # 2. Train COB RL model + if hasattr(self.orchestrator, 'cob_rl_agent') and self.orchestrator.cob_rl_agent: + cob_rl_loss = self._train_cob_rl_model(symbol, decision) + if cob_rl_loss is not None: + training_losses.append(cob_rl_loss) + self.performance_stats['models_trained'].add('COB_RL') + + # 3. Train DQN model + if hasattr(self.orchestrator, 'rl_agent') and self.orchestrator.rl_agent: + dqn_loss = self._train_dqn_model(symbol, decision) + if dqn_loss is not None: + training_losses.append(dqn_loss) + self.performance_stats['models_trained'].add('DQN') + + # Return average loss + return np.mean(training_losses) if training_losses else None + + except Exception as e: + logger.error(f"Error triggering model training: {e}") + return None + + def _train_cnn_model(self, symbol: str, decision) -> Optional[float]: + """Train CNN model on current market data""" + try: + # Get market data for training + df = self.data_provider.get_historical_data(symbol, '1m', limit=100) + if df is None or len(df) < 50: + return None + + # Prepare training data + features = self._prepare_cnn_features(df) + target = self._prepare_cnn_target(decision) + + if features is None or target is None: + return None + + # Train the model + if hasattr(self.orchestrator.cnn_model, 'train_on_batch'): + loss = self.orchestrator.cnn_model.train_on_batch(features, target) + logger.debug(f"CNN training loss for {symbol}: {loss:.4f}") + return loss + + return None + + except Exception as e: + logger.error(f"Error training CNN model: {e}") + return None + + def _train_cob_rl_model(self, symbol: str, decision) -> Optional[float]: + """Train COB RL model on market microstructure data""" + try: + # Get COB data if available + if not hasattr(self.dashboard, 'latest_cob_data') or symbol not in self.dashboard.latest_cob_data: + return None + + cob_data = self.dashboard.latest_cob_data[symbol] + + # Prepare COB features + features = self._prepare_cob_features(cob_data) + reward = self._calculate_cob_reward(decision) + + if features is None: + return None + + # Train the model + if hasattr(self.orchestrator.cob_rl_agent, 'train'): + loss = self.orchestrator.cob_rl_agent.train(features, reward) + logger.debug(f"COB RL training loss for {symbol}: {loss:.4f}") + return loss + + return None + + except Exception as e: + logger.error(f"Error training COB RL model: {e}") + return None + + def _train_dqn_model(self, symbol: str, decision) -> Optional[float]: + """Train DQN model on trading decision""" + try: + # Get state features + state_features = self._prepare_dqn_state(symbol) + action = self._map_action_to_index(decision.action) + reward = decision.confidence # Use confidence as immediate reward + + if state_features is None: + return None + + # Add experience to replay buffer + if hasattr(self.orchestrator.rl_agent, 'remember'): + # We'll use a dummy next_state for now + next_state = state_features # Simplified + done = False + self.orchestrator.rl_agent.remember(state_features, action, reward, next_state, done) + + # Train if we have enough experiences + if hasattr(self.orchestrator.rl_agent, 'replay'): + loss = self.orchestrator.rl_agent.replay() + if loss is not None: + logger.debug(f"DQN training loss for {symbol}: {loss:.4f}") + return loss + + return None + + except Exception as e: + logger.error(f"Error training DQN model: {e}") + return None + + def _execute_signal_trade(self, symbol: str, decision) -> Tuple[bool, Optional[float], Optional[float]]: + """Execute a trade based on the signal""" + try: + if not self.trading_executor: + return False, None, None + + # Get current price + current_price = self.data_provider.get_current_price(symbol) + if not current_price: + return False, None, None + + # Execute the trade + success = self.trading_executor.execute_signal( + symbol=symbol, + action=decision.action, + confidence=decision.confidence, + current_price=current_price + ) + + if success: + # Calculate PnL (simplified - in real implementation this would be more complex) + trade_pnl = self._calculate_trade_pnl(symbol, decision.action, current_price) + + # Update rate limiting + self.last_trade_time[symbol] = datetime.now() + if symbol not in self.trades_this_hour: + self.trades_this_hour[symbol] = 0 + self.trades_this_hour[symbol] += 1 + + return True, current_price, trade_pnl + + return False, None, None + + except Exception as e: + logger.error(f"Error executing signal trade: {e}") + return False, None, None + + def _can_execute_trade(self, symbol: str) -> bool: + """Check if we can execute a trade based on rate limiting""" + try: + # Check hourly limit + if symbol in self.trades_this_hour: + if self.trades_this_hour[symbol] >= self.config['max_trades_per_hour']: + return False + + # Check minimum time between trades (30 seconds) + if symbol in self.last_trade_time: + time_since_last = (datetime.now() - self.last_trade_time[symbol]).total_seconds() + if time_since_last < 30: + return False + + return True + + except Exception as e: + logger.error(f"Error checking if can execute trade: {e}") + return False + + def _prepare_cnn_features(self, df) -> Optional[np.ndarray]: + """Prepare features for CNN training""" + try: + # Use OHLCV data as features + features = df[['open', 'high', 'low', 'close', 'volume']].values + + # Normalize features + features = (features - features.mean(axis=0)) / (features.std(axis=0) + 1e-8) + + # Reshape for CNN (add batch and channel dimensions) + features = features.reshape(1, features.shape[0], features.shape[1]) + + return features.astype(np.float32) + + except Exception as e: + logger.error(f"Error preparing CNN features: {e}") + return None + + def _prepare_cnn_target(self, decision) -> Optional[np.ndarray]: + """Prepare target for CNN training""" + try: + # Map action to target + action_map = {'BUY': [1, 0, 0], 'SELL': [0, 1, 0], 'HOLD': [0, 0, 1]} + target = action_map.get(decision.action, [0, 0, 1]) + + return np.array([target], dtype=np.float32) + + except Exception as e: + logger.error(f"Error preparing CNN target: {e}") + return None + + def _prepare_cob_features(self, cob_data) -> Optional[np.ndarray]: + """Prepare COB features for training""" + try: + # Extract key COB features + features = [] + + # Order book imbalance + imbalance = cob_data.get('stats', {}).get('imbalance', 0) + features.append(imbalance) + + # Bid/Ask liquidity + bid_liquidity = cob_data.get('stats', {}).get('bid_liquidity', 0) + ask_liquidity = cob_data.get('stats', {}).get('ask_liquidity', 0) + features.extend([bid_liquidity, ask_liquidity]) + + # Spread + spread = cob_data.get('stats', {}).get('spread_bps', 0) + features.append(spread) + + # Pad to expected size (2000 features for COB RL) + while len(features) < 2000: + features.append(0.0) + + return np.array(features[:2000], dtype=np.float32) + + except Exception as e: + logger.error(f"Error preparing COB features: {e}") + return None + + def _calculate_cob_reward(self, decision) -> float: + """Calculate reward for COB RL training""" + try: + # Use confidence as base reward + base_reward = decision.confidence + + # Adjust based on action + if decision.action in ['BUY', 'SELL']: + return base_reward + else: + return base_reward * 0.1 # Lower reward for HOLD + + except Exception as e: + logger.error(f"Error calculating COB reward: {e}") + return 0.0 + + def _prepare_dqn_state(self, symbol: str) -> Optional[np.ndarray]: + """Prepare state features for DQN training""" + try: + # Get market data + df = self.data_provider.get_historical_data(symbol, '1m', limit=50) + if df is None or len(df) < 10: + return None + + # Prepare basic features + features = [] + + # Price features + close_prices = df['close'].values + features.extend(close_prices[-10:]) # Last 10 prices + + # Technical indicators + if len(close_prices) >= 20: + sma_20 = np.mean(close_prices[-20:]) + features.append(sma_20) + else: + features.append(close_prices[-1]) + + # Volume features + volumes = df['volume'].values + features.extend(volumes[-5:]) # Last 5 volumes + + # Pad to expected size (100 features for DQN) + while len(features) < 100: + features.append(0.0) + + return np.array(features[:100], dtype=np.float32) + + except Exception as e: + logger.error(f"Error preparing DQN state: {e}") + return None + + def _map_action_to_index(self, action: str) -> int: + """Map action string to index""" + action_map = {'BUY': 0, 'SELL': 1, 'HOLD': 2} + return action_map.get(action, 2) + + def _calculate_trade_pnl(self, symbol: str, action: str, price: float) -> float: + """Calculate simplified PnL for a trade""" + try: + # This is a simplified PnL calculation + # In a real implementation, this would track actual position changes + + # Get previous price for comparison + df = self.data_provider.get_historical_data(symbol, '1m', limit=2) + if df is None or len(df) < 2: + return 0.0 + + prev_price = df['close'].iloc[-2] + current_price = price + + # Calculate price change + price_change = (current_price - prev_price) / prev_price + + # Apply action direction + if action == 'BUY': + return price_change * 100 # Simplified PnL + elif action == 'SELL': + return -price_change * 100 # Simplified PnL + else: + return 0.0 + + except Exception as e: + logger.error(f"Error calculating trade PnL: {e}") + return 0.0 + + def _check_training_opportunities(self): + """Check for additional training opportunities""" + try: + # Check if we should save model checkpoints + if (self.performance_stats['total_trades'] > 0 and + self.performance_stats['total_trades'] % self.config['model_checkpoint_interval'] == 0): + self._save_model_checkpoints() + + except Exception as e: + logger.error(f"Error checking training opportunities: {e}") + + def _save_model_checkpoints(self): + """Save model checkpoints""" + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # Save CNN model + if hasattr(self.orchestrator, 'cnn_model') and self.orchestrator.cnn_model: + if hasattr(self.orchestrator.cnn_model, 'save'): + checkpoint_path = f"models/overnight_cnn_{timestamp}.pth" + self.orchestrator.cnn_model.save(checkpoint_path) + logger.info(f"CNN checkpoint saved: {checkpoint_path}") + + # Save COB RL model + if hasattr(self.orchestrator, 'cob_rl_agent') and self.orchestrator.cob_rl_agent: + if hasattr(self.orchestrator.cob_rl_agent, 'save_model'): + checkpoint_path = f"models/overnight_cob_rl_{timestamp}.pth" + self.orchestrator.cob_rl_agent.save_model(checkpoint_path) + logger.info(f"COB RL checkpoint saved: {checkpoint_path}") + + # Save DQN model + if hasattr(self.orchestrator, 'rl_agent') and self.orchestrator.rl_agent: + if hasattr(self.orchestrator.rl_agent, 'save'): + checkpoint_path = f"models/overnight_dqn_{timestamp}.pth" + self.orchestrator.rl_agent.save(checkpoint_path) + logger.info(f"DQN checkpoint saved: {checkpoint_path}") + + except Exception as e: + logger.error(f"Error saving model checkpoints: {e}") + + def _reset_hourly_counters(self): + """Reset hourly trade counters""" + try: + current_hour = datetime.now().replace(minute=0, second=0, microsecond=0) + if current_hour > self.hour_reset_time: + self.trades_this_hour = {} + self.hour_reset_time = current_hour + logger.info("Hourly trade counters reset") + + except Exception as e: + logger.error(f"Error resetting hourly counters: {e}") + + def _update_performance_stats(self): + """Update performance statistics""" + try: + # Update hourly stats every hour + current_hour = datetime.now().replace(minute=0, second=0, microsecond=0) + + # Check if we need to add a new hourly stat + if not self.performance_stats['hourly_stats'] or self.performance_stats['hourly_stats'][-1]['hour'] != current_hour: + hourly_stat = { + 'hour': current_hour, + 'signals': 0, + 'trades': 0, + 'pnl': 0.0, + 'models_trained': set() + } + self.performance_stats['hourly_stats'].append(hourly_stat) + + except Exception as e: + logger.error(f"Error updating performance stats: {e}") + + def _generate_training_report(self): + """Generate a comprehensive training report""" + try: + logger.info("=" * 80) + logger.info("šŸŒ… OVERNIGHT TRAINING SESSION REPORT") + logger.info("=" * 80) + + # Overall statistics + logger.info(f"šŸ“Š OVERALL STATISTICS:") + logger.info(f" Total Signals Processed: {self.performance_stats['total_signals']}") + logger.info(f" Total Trades Executed: {self.performance_stats['total_trades']}") + logger.info(f" Successful Trades: {self.performance_stats['successful_trades']}") + logger.info(f" Success Rate: {(self.performance_stats['successful_trades'] / max(1, self.performance_stats['total_trades']) * 100):.1f}%") + logger.info(f" Total P&L: ${self.performance_stats['total_pnl']:.2f}") + + # Model training statistics + logger.info(f"🧠 MODEL TRAINING:") + logger.info(f" Models Trained: {', '.join(self.performance_stats['models_trained'])}") + logger.info(f" Training Sessions: {len(self.training_sessions)}") + + # Recent performance + if self.signal_trade_records: + recent_records = list(self.signal_trade_records)[-20:] # Last 20 records + executed_trades = [r for r in recent_records if r.executed] + successful_trades = [r for r in executed_trades if r.trade_pnl and r.trade_pnl > 0] + + logger.info(f"šŸ“ˆ RECENT PERFORMANCE (Last 20 signals):") + logger.info(f" Signals: {len(recent_records)}") + logger.info(f" Executed: {len(executed_trades)}") + logger.info(f" Successful: {len(successful_trades)}") + if executed_trades: + recent_pnl = sum(r.trade_pnl for r in executed_trades if r.trade_pnl) + logger.info(f" Recent P&L: ${recent_pnl:.2f}") + + logger.info("=" * 80) + + except Exception as e: + logger.error(f"Error generating training report: {e}") + + def get_performance_summary(self) -> Dict[str, Any]: + """Get current performance summary""" + try: + return { + 'total_signals': self.performance_stats['total_signals'], + 'total_trades': self.performance_stats['total_trades'], + 'successful_trades': self.performance_stats['successful_trades'], + 'success_rate': (self.performance_stats['successful_trades'] / max(1, self.performance_stats['total_trades'])), + 'total_pnl': self.performance_stats['total_pnl'], + 'models_trained': list(self.performance_stats['models_trained']), + 'is_running': self.is_running, + 'recent_signals': len(self.signal_trade_records) + } + except Exception as e: + logger.error(f"Error getting performance summary: {e}") + return {} \ No newline at end of file diff --git a/start_overnight_training.py b/start_overnight_training.py new file mode 100644 index 0000000..2479e85 --- /dev/null +++ b/start_overnight_training.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +""" +Start Overnight Training Session + +This script starts a comprehensive overnight training session that: +1. Ensures CNN and COB RL training processes are implemented and running +2. Executes training passes on each signal when predictions change +3. Calculates PnL and records trades in SIM mode +4. Tracks model performance statistics +5. Converts signals to actual trades for performance tracking +""" + +import os +import sys +import time +import logging +from datetime import datetime +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent +sys.path.insert(0, str(project_root)) + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(f'overnight_training_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'), + logging.StreamHandler() + ] +) + +logger = logging.getLogger(__name__) + +def main(): + """Start the overnight training session""" + try: + logger.info("šŸŒ™ STARTING OVERNIGHT TRAINING SESSION") + logger.info("=" * 80) + + # Import required components + from core.config import get_config, setup_logging + from core.data_provider import DataProvider + from core.orchestrator import TradingOrchestrator + from core.trading_executor import TradingExecutor + from web.clean_dashboard import CleanTradingDashboard + + # Setup logging + setup_logging() + + # Initialize components + logger.info("Initializing components...") + + # Create data provider + data_provider = DataProvider() + logger.info("āœ… Data Provider initialized") + + # Create trading executor in simulation mode + trading_executor = TradingExecutor() + trading_executor.simulation_mode = True # Ensure we're in simulation mode + logger.info("āœ… Trading Executor initialized (SIMULATION MODE)") + + # Create orchestrator with enhanced training + orchestrator = TradingOrchestrator( + data_provider=data_provider, + enhanced_rl_training=True + ) + logger.info("āœ… Trading Orchestrator initialized") + + # Connect trading executor to orchestrator + if hasattr(orchestrator, 'set_trading_executor'): + orchestrator.set_trading_executor(trading_executor) + logger.info("āœ… Trading Executor connected to Orchestrator") + + # Create dashboard (this initializes the overnight training coordinator) + dashboard = CleanTradingDashboard( + data_provider=data_provider, + orchestrator=orchestrator, + trading_executor=trading_executor + ) + logger.info("āœ… Dashboard initialized with Overnight Training Coordinator") + + # Start the overnight training session + logger.info("Starting overnight training session...") + success = dashboard.start_overnight_training() + + if success: + logger.info("šŸŒ™ OVERNIGHT TRAINING SESSION STARTED SUCCESSFULLY") + logger.info("=" * 80) + logger.info("Training Features Active:") + logger.info("āœ… CNN training on signal changes") + logger.info("āœ… COB RL training on market microstructure") + logger.info("āœ… DQN training on trading decisions") + logger.info("āœ… Trade execution and recording (SIMULATION)") + logger.info("āœ… Performance tracking and statistics") + logger.info("āœ… Model checkpointing every 50 trades") + logger.info("āœ… Signal-to-trade conversion with PnL calculation") + logger.info("=" * 80) + + # Monitor training progress + logger.info("Monitoring training progress...") + logger.info("Press Ctrl+C to stop the training session") + + # Keep the session running and periodically report progress + start_time = datetime.now() + last_report_time = start_time + + while True: + try: + time.sleep(60) # Check every minute + + current_time = datetime.now() + elapsed_time = current_time - start_time + + # Get performance summary every 10 minutes + if (current_time - last_report_time).total_seconds() >= 600: # 10 minutes + performance = dashboard.get_training_performance_summary() + + logger.info("=" * 60) + logger.info(f"šŸŒ™ TRAINING PROGRESS REPORT - {elapsed_time}") + logger.info("=" * 60) + logger.info(f"Total Signals: {performance.get('total_signals', 0)}") + logger.info(f"Total Trades: {performance.get('total_trades', 0)}") + logger.info(f"Successful Trades: {performance.get('successful_trades', 0)}") + logger.info(f"Success Rate: {performance.get('success_rate', 0):.1%}") + logger.info(f"Total P&L: ${performance.get('total_pnl', 0):.2f}") + logger.info(f"Models Trained: {', '.join(performance.get('models_trained', []))}") + logger.info(f"Training Status: {'ACTIVE' if performance.get('is_running', False) else 'INACTIVE'}") + logger.info("=" * 60) + + last_report_time = current_time + + except KeyboardInterrupt: + logger.info("\nšŸ›‘ Training session interrupted by user") + break + except Exception as e: + logger.error(f"Error during training monitoring: {e}") + time.sleep(30) # Wait 30 seconds before retrying + + # Stop the training session + logger.info("Stopping overnight training session...") + dashboard.stop_overnight_training() + + # Final report + final_performance = dashboard.get_training_performance_summary() + total_time = datetime.now() - start_time + + logger.info("=" * 80) + logger.info("šŸŒ… OVERNIGHT TRAINING SESSION COMPLETED") + logger.info("=" * 80) + logger.info(f"Total Duration: {total_time}") + logger.info(f"Final Statistics:") + logger.info(f" Total Signals: {final_performance.get('total_signals', 0)}") + logger.info(f" Total Trades: {final_performance.get('total_trades', 0)}") + logger.info(f" Successful Trades: {final_performance.get('successful_trades', 0)}") + logger.info(f" Success Rate: {final_performance.get('success_rate', 0):.1%}") + logger.info(f" Total P&L: ${final_performance.get('total_pnl', 0):.2f}") + logger.info(f" Models Trained: {', '.join(final_performance.get('models_trained', []))}") + logger.info("=" * 80) + + else: + logger.error("āŒ Failed to start overnight training session") + return 1 + + except KeyboardInterrupt: + logger.info("\nšŸ›‘ Training session interrupted by user") + return 0 + except Exception as e: + logger.error(f"āŒ Error in overnight training session: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == "__main__": + exit_code = main() + sys.exit(exit_code) \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index c680587..48ebf09 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -80,6 +80,9 @@ except ImportError: # Import RL COB trader for 1B parameter model integration from core.realtime_rl_cob_trader import RealtimeRLCOBTrader, PredictionResult +# Import overnight training coordinator +from core.overnight_training_coordinator import OvernightTrainingCoordinator + # Single unified orchestrator with full ML capabilities class CleanTradingDashboard: @@ -220,10 +223,58 @@ class CleanTradingDashboard: if not self.trading_executor.simulation_mode: threading.Thread(target=self._monitor_order_execution, daemon=True).start() + # Initialize overnight training coordinator + self.overnight_training_coordinator = OvernightTrainingCoordinator( + orchestrator=self.orchestrator, + data_provider=self.data_provider, + trading_executor=self.trading_executor, + dashboard=self + ) + # Start training sessions if models are showing FRESH status threading.Thread(target=self._delayed_training_check, daemon=True).start() logger.debug("Clean Trading Dashboard initialized with HIGH-FREQUENCY COB integration and signal generation") + logger.info("šŸŒ™ Overnight Training Coordinator ready - call start_overnight_training() to begin") + + def start_overnight_training(self): + """Start the overnight training session""" + try: + if hasattr(self, 'overnight_training_coordinator'): + self.overnight_training_coordinator.start_overnight_training() + logger.info("šŸŒ™ OVERNIGHT TRAINING SESSION STARTED") + return True + else: + logger.error("Overnight training coordinator not available") + return False + except Exception as e: + logger.error(f"Error starting overnight training: {e}") + return False + + def stop_overnight_training(self): + """Stop the overnight training session""" + try: + if hasattr(self, 'overnight_training_coordinator'): + self.overnight_training_coordinator.stop_overnight_training() + logger.info("šŸŒ… OVERNIGHT TRAINING SESSION STOPPED") + return True + else: + logger.error("Overnight training coordinator not available") + return False + except Exception as e: + logger.error(f"Error stopping overnight training: {e}") + return False + + def get_training_performance_summary(self) -> Dict[str, Any]: + """Get training performance summary""" + try: + if hasattr(self, 'overnight_training_coordinator'): + return self.overnight_training_coordinator.get_performance_summary() + else: + return {'error': 'Training coordinator not available'} + except Exception as e: + logger.error(f"Error getting training performance summary: {e}") + return {'error': str(e)} def _get_universal_data_from_orchestrator(self) -> Optional[UniversalDataStream]: """Get universal data through orchestrator as per architecture."""