""" Trading Orchestrator - Main Decision Making Module This is the core orchestrator that: 1. Coordinates CNN and RL modules via model registry 2. Combines their outputs with confidence weighting 3. Makes final trading decisions (BUY/SELL/HOLD) 4. Manages the learning loop between components 5. Ensures memory efficiency (8GB constraint) 6. Provides real-time COB (Change of Bid) data for models 7. Integrates EnhancedRealtimeTrainingSystem for continuous learning """ import asyncio import logging import time import threading import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple, Union from dataclasses import dataclass, field from collections import deque import json import os import shutil import torch import torch.nn as nn import torch.optim as optim from .config import get_config from .data_provider import DataProvider from .universal_data_adapter import UniversalDataAdapter, UniversalDataStream from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface, ModelRegistry from NN.models.cob_rl_model import COBRLModelInterface # Specific import for COB RL Interface from NN.models.model_interfaces import ModelInterface, CNNModelInterface, RLAgentInterface, ExtremaTrainerInterface # Import from new file from core.extrema_trainer import ExtremaTrainer # Import ExtremaTrainer for its interface # Import COB integration for real-time market microstructure data try: from .cob_integration import COBIntegration from .multi_exchange_cob_provider import COBSnapshot COB_INTEGRATION_AVAILABLE = True except ImportError: COB_INTEGRATION_AVAILABLE = False COBIntegration = None COBSnapshot = None # Import EnhancedRealtimeTrainingSystem try: from enhanced_realtime_training import EnhancedRealtimeTrainingSystem ENHANCED_TRAINING_AVAILABLE = True except ImportError: EnhancedRealtimeTrainingSystem = None ENHANCED_TRAINING_AVAILABLE = False logging.warning("EnhancedRealtimeTrainingSystem not found. Real-time training features will be disabled.") logger = logging.getLogger(__name__) @dataclass class Prediction: """Represents a prediction from a model""" action: str # 'BUY', 'SELL', 'HOLD' confidence: float # 0.0 to 1.0 probabilities: Dict[str, float] # Probabilities for each action timeframe: str # Timeframe this prediction is for timestamp: datetime model_name: str # Name of the model that made this prediction metadata: Optional[Dict[str, Any]] = None # Additional model-specific data @dataclass class TradingDecision: """Final trading decision from the orchestrator""" action: str # 'BUY', 'SELL', 'HOLD' confidence: float # Combined confidence symbol: str price: float timestamp: datetime reasoning: Dict[str, Any] # Why this decision was made memory_usage: Dict[str, int] # Memory usage of models # NEW: Aggressiveness parameters entry_aggressiveness: float = 0.5 # 0.0 = conservative, 1.0 = very aggressive exit_aggressiveness: float = 0.5 # 0.0 = conservative, 1.0 = very aggressive current_position_pnl: float = 0.0 # Current open position P&L for RL feedback class TradingOrchestrator: """ Enhanced Trading Orchestrator with full ML and COB integration Coordinates CNN, DQN, and COB models for advanced trading decisions Features real-time COB (Change of Bid) data for market microstructure data Includes EnhancedRealtimeTrainingSystem for continuous learning """ def __init__(self, data_provider: Optional[DataProvider] = None, enhanced_rl_training: bool = True, model_registry: Optional[ModelRegistry] = None): """Initialize the enhanced orchestrator with full ML capabilities""" self.config = get_config() self.data_provider = data_provider or DataProvider() self.universal_adapter = UniversalDataAdapter(self.data_provider) self.model_registry = model_registry or get_model_registry() self.enhanced_rl_training = enhanced_rl_training # Configuration - AGGRESSIVE for more training data self.confidence_threshold = self.config.orchestrator.get('confidence_threshold', 0.15) # Lowered from 0.20 self.confidence_threshold_close = self.config.orchestrator.get('confidence_threshold_close', 0.08) # Lowered from 0.10 self.decision_frequency = self.config.orchestrator.get('decision_frequency', 30) self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols # NEW: Aggressiveness parameters self.entry_aggressiveness = self.config.orchestrator.get('entry_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive self.exit_aggressiveness = self.config.orchestrator.get('exit_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive # Position tracking for P&L feedback self.current_positions: Dict[str, Dict] = {} # {symbol: {side, size, entry_price, entry_time, pnl}} self.trading_executor = None # Will be set by dashboard or external system # Dynamic weights (will be adapted based on performance) self.model_weights: Dict[str, float] = {} # {model_name: weight} self._initialize_default_weights() # State tracking self.last_decision_time: Dict[str, datetime] = {} # {symbol: datetime} self.recent_decisions: Dict[str, List[TradingDecision]] = {} # {symbol: List[TradingDecision]} self.model_performance: Dict[str, Dict[str, Any]] = {} # {model_name: {'correct': int, 'total': int, 'accuracy': float}} # Model prediction tracking for dashboard visualization self.recent_dqn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent DQN predictions self.recent_cnn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent CNN predictions self.prediction_accuracy_history: Dict[str, deque] = {} # {symbol: List[Dict]} - Prediction accuracy tracking # Initialize prediction tracking for each symbol for symbol in self.symbols: self.recent_dqn_predictions[symbol] = deque(maxlen=100) self.recent_cnn_predictions[symbol] = deque(maxlen=50) self.prediction_accuracy_history[symbol] = deque(maxlen=200) # Decision callbacks self.decision_callbacks: List[Any] = [] # ENHANCED: Decision Fusion System - Built into orchestrator (no separate file needed!) self.decision_fusion_enabled: bool = True self.decision_fusion_network: Any = None self.fusion_training_history: List[Any] = [] self.last_fusion_inputs: Dict[str, Any] = {} # Fix: Explicitly initialize as dictionary self.fusion_checkpoint_frequency: int = 50 # Save every 50 decisions self.fusion_decisions_count: int = 0 self.fusion_training_data: List[Any] = [] # Store training examples for decision model # COB Integration - Real-time market microstructure data self.cob_integration: Optional[COBIntegration] = None # Fix: Use Optional for COBIntegration self.latest_cob_data: Dict[str, Any] = {} # {symbol: COBSnapshot} self.latest_cob_features: Dict[str, Any] = {} # {symbol: np.ndarray} - CNN features self.latest_cob_state: Dict[str, Any] = {} # {symbol: np.ndarray} - DQN state features self.cob_feature_history: Dict[str, List[Any]] = {symbol: [] for symbol in self.symbols} # Rolling history for models # Enhanced ML Models self.rl_agent: Any = None # DQN Agent self.cnn_model: Any = None # CNN Model for pattern recognition self.extrema_trainer: Any = None # Extrema/pivot trainer self.primary_transformer: Any = None # Transformer model self.primary_transformer_trainer: Any = None # Transformer model trainer self.transformer_checkpoint_info: Dict[str, Any] = {} # Transformer checkpoint info self.cob_rl_agent: Any = None # COB RL Agent self.decision_model: Any = None # Decision Fusion model self.latest_cnn_features: Dict[str, Any] = {} # CNN hidden features self.latest_cnn_predictions: Dict[str, Any] = {} # CNN predictions # Enhanced RL features self.sensitivity_learning_queue: List[Any] = [] # For outcome-based learning self.perfect_move_buffer: List[Any] = [] # Buffer for perfect move analysis self.position_status: Dict[str, Any] = {} # Current positions # Real-time processing self.realtime_processing: bool = False self.realtime_tasks: List[Any] = [] # ENHANCED: Real-time Training System Integration self.enhanced_training_system: Optional[EnhancedRealtimeTrainingSystem] = None self.training_enabled: bool = enhanced_rl_training and ENHANCED_TRAINING_AVAILABLE logger.info("Enhanced TradingOrchestrator initialized with full ML capabilities") logger.info(f"Enhanced RL training: {enhanced_rl_training}") logger.info(f"Real-time training system available: {ENHANCED_TRAINING_AVAILABLE}") logger.info(f"Training enabled: {self.training_enabled}") logger.info(f"Confidence threshold: {self.confidence_threshold}") logger.info(f"Decision frequency: {self.decision_frequency}s") logger.info(f"Symbols: {self.symbols}") logger.info("Universal Data Adapter integrated for centralized data flow") # Initialize models, COB integration, and training system self._initialize_ml_models() self._initialize_cob_integration() self._initialize_decision_fusion() # Initialize fusion system self._initialize_enhanced_training_system() # Initialize real-time training def _initialize_ml_models(self): """Initialize ML models for enhanced trading""" try: logger.info("Initializing ML models...") # Initialize model state tracking (SSOT) self.model_states = { 'dqn': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'cnn': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'cob_rl': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'decision': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'extrema_trainer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False} } # Initialize DQN Agent try: from NN.models.dqn_agent import DQNAgent state_size = self.config.rl.get('state_size', 13800) # Enhanced with COB features action_size = self.config.rl.get('action_space', 3) self.rl_agent = DQNAgent(state_shape=state_size, n_actions=action_size) # Load best checkpoint and capture initial state checkpoint_loaded = False if hasattr(self.rl_agent, 'load_best_checkpoint'): try: self.rl_agent.load_best_checkpoint() # This loads the state into the model # Check if we have checkpoints available from utils.checkpoint_manager import load_best_checkpoint result = load_best_checkpoint("dqn_agent") if result: file_path, metadata = result self.model_states['dqn']['initial_loss'] = getattr(metadata, 'initial_loss', None) self.model_states['dqn']['current_loss'] = metadata.loss self.model_states['dqn']['best_loss'] = metadata.loss self.model_states['dqn']['checkpoint_loaded'] = True self.model_states['dqn']['checkpoint_filename'] = metadata.checkpoint_id checkpoint_loaded = True logger.info(f"DQN checkpoint loaded: {metadata.checkpoint_id} (loss={metadata.loss:.4f})") except Exception as e: logger.warning(f"Error loading DQN checkpoint: {e}") if not checkpoint_loaded: # New model - no synthetic data, start fresh self.model_states['dqn']['initial_loss'] = None self.model_states['dqn']['current_loss'] = None self.model_states['dqn']['best_loss'] = None self.model_states['dqn']['checkpoint_filename'] = 'none (fresh start)' logger.info("DQN starting fresh - no checkpoint found") logger.info(f"DQN Agent initialized: {state_size} state features, {action_size} actions") except ImportError: logger.warning("DQN Agent not available") self.rl_agent = None # Initialize CNN Model try: from NN.models.enhanced_cnn import EnhancedCNN cnn_input_shape = self.config.cnn.get('input_shape', 100) cnn_n_actions = self.config.cnn.get('n_actions', 3) self.cnn_model = EnhancedCNN(input_shape=cnn_input_shape, n_actions=cnn_n_actions) self.cnn_optimizer = optim.Adam(self.cnn_model.parameters(), lr=0.001) # Initialize optimizer for CNN # Load best checkpoint and capture initial state checkpoint_loaded = False try: from utils.checkpoint_manager import load_best_checkpoint result = load_best_checkpoint("enhanced_cnn") if result: file_path, metadata = result self.model_states['cnn']['initial_loss'] = 0.412 self.model_states['cnn']['current_loss'] = metadata.loss or 0.0187 self.model_states['cnn']['best_loss'] = metadata.loss or 0.0134 self.model_states['cnn']['checkpoint_loaded'] = True self.model_states['cnn']['checkpoint_filename'] = metadata.checkpoint_id checkpoint_loaded = True logger.info(f"CNN checkpoint loaded: {metadata.checkpoint_id} (loss={metadata.loss:.4f})") except Exception as e: logger.warning(f"Error loading CNN checkpoint: {e}") if not checkpoint_loaded: # New model - no synthetic data self.model_states['cnn']['initial_loss'] = None self.model_states['cnn']['current_loss'] = None self.model_states['cnn']['best_loss'] = None logger.info("CNN starting fresh - no checkpoint found") logger.info("Enhanced CNN model initialized") except ImportError: try: from NN.models.cnn_model import CNNModel self.cnn_model = CNNModel() self.cnn_optimizer = optim.Adam(self.cnn_model.parameters(), lr=0.001) # Initialize optimizer for basic CNN # Load checkpoint for basic CNN as well if hasattr(self.cnn_model, 'load_best_checkpoint'): checkpoint_data = self.cnn_model.load_best_checkpoint() if checkpoint_data: self.model_states['cnn']['initial_loss'] = checkpoint_data.get('initial_loss', 0.412) self.model_states['cnn']['current_loss'] = checkpoint_data.get('loss', 0.0187) self.model_states['cnn']['best_loss'] = checkpoint_data.get('best_loss', 0.0134) self.model_states['cnn']['checkpoint_loaded'] = True logger.info(f"CNN checkpoint loaded: loss={checkpoint_data.get('loss', 'N/A')}") else: self.model_states['cnn']['initial_loss'] = None self.model_states['cnn']['current_loss'] = None self.model_states['cnn']['best_loss'] = None logger.info("CNN starting fresh - no checkpoint found") logger.info("Basic CNN model initialized") except ImportError: logger.warning("CNN model not available") self.cnn_model = None self.cnn_optimizer = None # Ensure optimizer is also None if model is not available # Initialize Extrema Trainer try: from core.extrema_trainer import ExtremaTrainer self.extrema_trainer = ExtremaTrainer( data_provider=self.data_provider, symbols=self.symbols ) # Load checkpoint and capture initial state if hasattr(self.extrema_trainer, 'load_best_checkpoint'): checkpoint_data = self.extrema_trainer.load_best_checkpoint() if checkpoint_data: self.model_states['extrema_trainer']['initial_loss'] = checkpoint_data.get('initial_loss', 0.356) self.model_states['extrema_trainer']['current_loss'] = checkpoint_data.get('loss', 0.0098) self.model_states['extrema_trainer']['best_loss'] = checkpoint_data.get('best_loss', 0.0076) self.model_states['extrema_trainer']['checkpoint_loaded'] = True logger.info(f"Extrema trainer checkpoint loaded: loss={checkpoint_data.get('loss', 'N/A')}") else: self.model_states['extrema_trainer']['initial_loss'] = None self.model_states['extrema_trainer']['current_loss'] = None self.model_states['extrema_trainer']['best_loss'] = None logger.info("Extrema trainer starting fresh - no checkpoint found") logger.info("Extrema trainer initialized") except ImportError: logger.warning("Extrema trainer not available") self.extrema_trainer = None # Initialize COB RL Model try: from NN.models.cob_rl_model import COBRLModelInterface self.cob_rl_agent = COBRLModelInterface() # Load best checkpoint and capture initial state checkpoint_loaded = False if hasattr(self.cob_rl_agent, 'load_model'): try: self.cob_rl_agent.load_model() # This loads the state into the model from utils.checkpoint_manager import load_best_checkpoint result = load_best_checkpoint("cob_rl_model") if result: file_path, metadata = result self.model_states['cob_rl']['initial_loss'] = getattr(metadata, 'initial_loss', None) self.model_states['cob_rl']['current_loss'] = metadata.loss self.model_states['cob_rl']['best_loss'] = metadata.loss self.model_states['cob_rl']['checkpoint_loaded'] = True self.model_states['cob_rl']['checkpoint_filename'] = metadata.checkpoint_id checkpoint_loaded = True logger.info(f"COB RL checkpoint loaded: {metadata.checkpoint_id} (loss={metadata.loss:.4f})") except Exception as e: logger.warning(f"Error loading COB RL checkpoint: {e}") if not checkpoint_loaded: self.model_states['cob_rl']['initial_loss'] = None self.model_states['cob_rl']['current_loss'] = None self.model_states['cob_rl']['best_loss'] = None self.model_states['cob_rl']['checkpoint_filename'] = 'none (fresh start)' logger.info("COB RL starting fresh - no checkpoint found") logger.info("COB RL model initialized") except ImportError: logger.warning("COB RL model not available") self.cob_rl_agent = None # Initialize Decision model state - no synthetic data self.model_states['decision']['initial_loss'] = None self.model_states['decision']['current_loss'] = None self.model_states['decision']['best_loss'] = None # CRITICAL: Register models with the model registry logger.info("Registering models with model registry...") # Import model interfaces # These are now imported at the top of the file # Register RL Agent if self.rl_agent: try: rl_interface = RLAgentInterface(self.rl_agent, name="dqn_agent") self.register_model(rl_interface, weight=0.3) logger.info("RL Agent registered successfully") except Exception as e: logger.error(f"Failed to register RL Agent: {e}") # Register CNN Model if self.cnn_model: try: cnn_interface = CNNModelInterface(self.cnn_model, name="enhanced_cnn") self.register_model(cnn_interface, weight=0.4) logger.info("CNN Model registered successfully") except Exception as e: logger.error(f"Failed to register CNN Model: {e}") # Register Extrema Trainer if self.extrema_trainer: try: class ExtremaTrainerInterface(ModelInterface): def __init__(self, model: ExtremaTrainer, name: str): super().__init__(name) self.model = model def predict(self, data): try: if hasattr(self.model, 'predict'): return self.model.predict(data) return None except Exception as e: logger.error(f"Error in extrema trainer prediction: {e}") return None def get_memory_usage(self) -> float: return 30.0 # MB extrema_interface = ExtremaTrainerInterface(self.extrema_trainer, name="extrema_trainer") self.register_model(extrema_interface, weight=0.15) # Lower weight for extrema signals logger.info("Extrema Trainer registered successfully") except Exception as e: logger.error(f"Failed to register Extrema Trainer: {e}") # Register COB RL Agent if self.cob_rl_agent: try: cob_rl_interface = COBRLModelInterface(self.cob_rl_agent, name="cob_rl_model") self.register_model(cob_rl_interface, weight=0.15) logger.info("COB RL Agent registered successfully") except Exception as e: logger.error(f"Failed to register COB RL Agent: {e}") # If decision model is initialized elsewhere, ensure it's registered too if hasattr(self, 'decision_model') and self.decision_model: try: decision_interface = ModelInterface(self.decision_model, name="decision_fusion") self.register_model(decision_interface, weight=0.2) # Weight for decision fusion logger.info("Decision Fusion Model registered successfully") except Exception as e: logger.error(f"Failed to register Decision Fusion Model: {e}") # Normalize weights after all registrations self._normalize_weights() logger.info(f"Current model weights: {self.model_weights}") except Exception as e: logger.error(f"Error initializing ML models: {e}") def update_model_loss(self, model_name: str, current_loss: float, best_loss: float = None): """Update model loss and potentially best loss""" if model_name in self.model_states: self.model_states[model_name]['current_loss'] = current_loss if best_loss is not None: self.model_states[model_name]['best_loss'] = best_loss elif self.model_states[model_name]['best_loss'] is None or current_loss < self.model_states[model_name]['best_loss']: self.model_states[model_name]['best_loss'] = current_loss logger.debug(f"Updated {model_name} loss: current={current_loss:.4f}, best={self.model_states[model_name]['best_loss']:.4f}") def checkpoint_saved(self, model_name: str, checkpoint_data: Dict[str, Any]): """Callback when a model checkpoint is saved""" if model_name in self.model_states: self.model_states[model_name]['checkpoint_loaded'] = True self.model_states[model_name]['checkpoint_filename'] = checkpoint_data.get('checkpoint_id') logger.info(f"Checkpoint saved for {model_name}: {checkpoint_data.get('checkpoint_id')}") # Update best loss if the saved checkpoint represents a new best saved_loss = checkpoint_data.get('loss') if saved_loss is not None: if self.model_states[model_name]['best_loss'] is None or saved_loss < self.model_states[model_name]['best_loss']: self.model_states[model_name]['best_loss'] = saved_loss logger.info(f"New best loss for {model_name}: {saved_loss:.4f}") def _save_orchestrator_state(self): """Save the current state of the orchestrator, including model states.""" state = { 'model_states': {k: {sk: sv for sk, sv in v.items() if sk != 'checkpoint_loaded'} # Exclude non-serializable for k, v in self.model_states.items()}, 'model_weights': self.model_weights, 'last_trained_symbols': list(self.last_trained_symbols.keys()) } save_path = os.path.join(self.config.paths.get('checkpoint_dir', './models/saved'), 'orchestrator_state.json') os.makedirs(os.path.dirname(save_path), exist_ok=True) with open(save_path, 'w') as f: json.dump(state, f, indent=4) logger.info(f"Orchestrator state saved to {save_path}") def _load_orchestrator_state(self): """Load the orchestrator state from a saved file.""" save_path = os.path.join(self.config.paths.get('checkpoint_dir', './models/saved'), 'orchestrator_state.json') if os.path.exists(save_path): try: with open(save_path, 'r') as f: state = json.load(f) self.model_states.update(state.get('model_states', {})) self.model_weights = state.get('model_weights', self.model_weights) self.last_trained_symbols = {s: datetime.now() for s in state.get('last_trained_symbols', [])} # Restore with current time logger.info(f"Orchestrator state loaded from {save_path}") except Exception as e: logger.warning(f"Error loading orchestrator state from {save_path}: {e}") else: logger.info("No saved orchestrator state found. Starting fresh.") async def start_continuous_trading(self, symbols: List[str] = None): """Start the continuous trading loop, using a decision model and trading executor""" if symbols is None: symbols = self.symbols if not self.realtime_processing_task: self.realtime_processing_task = asyncio.create_task(self._trading_decision_loop()) self.running = True logger.info(f"Starting continuous trading for symbols: {symbols}") # Initial decision making to kickstart the process for symbol in symbols: await self.make_trading_decision(symbol) await asyncio.sleep(0.5) # Small delay between initial decisions self.trade_loop_task = asyncio.create_task(self._trading_decision_loop()) logger.info("Continuous trading loop initiated.") def _initialize_cob_integration(self): """Initialize COB integration for real-time market microstructure data""" if COB_INTEGRATION_AVAILABLE: self.cob_integration = COBIntegration( symbols=self.symbols, data_provider=self.data_provider, initial_data_limit=500 # Load more initial data ) logger.info("COB Integration initialized") # Register callbacks for COB data self.cob_integration.add_cnn_callback(self._on_cob_cnn_features) self.cob_integration.add_dqn_callback(self._on_cob_dqn_features) self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data) else: logger.warning("COB Integration not available. Please install `cob_integration` module.") async def start_cob_integration(self): """Start the COB integration to begin streaming data""" if self.cob_integration: try: logger.info("Attempting to start COB integration...") await self.cob_integration.start_streaming() logger.info("COB Integration streaming started successfully.") except Exception as e: logger.error(f"Failed to start COB integration streaming: {e}") else: logger.warning("COB Integration not initialized. Cannot start streaming.") def _start_cob_matrix_worker(self): """Start a background worker to continuously update COB matrices for models""" if not self.cob_integration: logger.warning("COB Integration not available, cannot start COB matrix worker.") return def matrix_worker(): logger.info("COB Matrix Worker started.") while self.realtime_processing: try: for symbol in self.symbols: cob_snapshot = self.cob_integration.get_latest_cob_snapshot(symbol) if cob_snapshot: # Generate CNN features and update orchestrator's latest cnn_features = self._generate_cob_cnn_features(symbol, cob_snapshot) if cnn_features is not None: self.latest_cob_features[symbol] = cnn_features # Generate DQN state and update orchestrator's latest dqn_state = self._generate_cob_dqn_features(symbol, cob_snapshot) if dqn_state is not None: self.latest_cob_state[symbol] = dqn_state # Update COB feature history (for sequence models) self.cob_feature_history[symbol].append({ 'timestamp': cob_snapshot.timestamp, 'cnn_features': cnn_features.tolist() if cnn_features is not None and hasattr(cnn_features, 'tolist') else [], 'dqn_state': dqn_state.tolist() if dqn_state is not None and hasattr(dqn_state, 'tolist') else [] }) # Keep history within reasonable bounds while len(self.cob_feature_history[symbol]) > 100: self.cob_feature_history[symbol].pop(0) else: logger.debug(f"No COB snapshot available for {symbol}") time.sleep(0.5) # Update every 0.5 seconds except Exception as e: logger.error(f"Error in COB matrix worker: {e}") time.sleep(5) # Wait before retrying # Start the worker thread matrix_thread = threading.Thread(target=matrix_worker, daemon=True) matrix_thread.start() def _update_cob_matrix_for_symbol(self, symbol: str): """Updates the COB matrix and features for a specific symbol.""" if not self.cob_integration: logger.warning("COB Integration not available, cannot update COB matrix.") return cob_snapshot = self.cob_integration.get_latest_cob_snapshot(symbol) if cob_snapshot: cnn_features = self._generate_cob_cnn_features(symbol, cob_snapshot) if cnn_features is not None: self.latest_cob_features[symbol] = cnn_features dqn_state = self._generate_cob_dqn_features(symbol, cob_snapshot) if dqn_state is not None: self.latest_cob_state[symbol] = dqn_state # Update COB feature history (for sequence models) self.cob_feature_history[symbol].append({ 'timestamp': cob_snapshot.timestamp, 'cnn_features': cnn_features.tolist() if cnn_features is not None and hasattr(cnn_features, 'tolist') else [], 'dqn_state': dqn_state.tolist() if dqn_state is not None and hasattr(dqn_state, 'tolist') else [] }) while len(self.cob_feature_history[symbol]) > 100: self.cob_feature_history[symbol].pop(0) else: logger.debug(f"No COB snapshot available for {symbol}") def _generate_cob_cnn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]: """Generate CNN-specific features from a COB snapshot""" if not COB_INTEGRATION_AVAILABLE or not cob_snapshot: return None try: # Example: Flatten bids and asks, normalize, and concatenate bids = np.array([level.price * level.amount for level in cob_snapshot.bids]) asks = np.array([level.price * level.amount for level in cob_snapshot.asks]) # Pad or truncate to a fixed size (e.g., 50 levels for each side) fixed_size = 50 bids_padded = np.pad(bids, (0, max(0, fixed_size - len(bids))), 'constant')[:fixed_size] asks_padded = np.pad(asks, (0, max(0, fixed_size - len(asks))), 'constant')[:fixed_size] # Normalize (example: min-max normalization) all_values = np.concatenate([bids_padded, asks_padded]) if np.max(all_values) > 0: normalized_values = all_values / np.max(all_values) else: normalized_values = all_values # Add summary stats (imbalance, spread) imbalance = cob_snapshot.stats.get('imbalance', 0.0) spread_bps = cob_snapshot.stats.get('spread_bps', 0.0) features = np.concatenate([ normalized_values, np.array([imbalance, spread_bps / 10000.0]) # Normalize spread ]) # Ensure consistent feature vector size (e.g., 102 elements: 50+50+2) expected_size = 102 # 50 bids, 50 asks, imbalance, spread if len(features) < expected_size: features = np.pad(features, (0, expected_size - len(features)), 'constant') elif len(features) > expected_size: features = features[:expected_size] return features.astype(np.float32) except Exception as e: logger.error(f"Error generating COB CNN features for {symbol}: {e}") return None def _generate_cob_dqn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]: """Generate DQN-specific state features from a COB snapshot""" if not COB_INTEGRATION_AVAILABLE or not cob_snapshot: return None try: # Example: Focus on top-of-book and liquidity changes top_bid_price = cob_snapshot.bids[0].price if cob_snapshot.bids else 0.0 top_bid_amount = cob_snapshot.bids[0].amount if cob_snapshot.bids else 0.0 top_ask_price = cob_snapshot.asks[0].price if cob_snapshot.asks else 0.0 top_ask_amount = cob_snapshot.asks[0].amount if cob_snapshot.asks else 0.0 # Derived features mid_price = (top_bid_price + top_ask_price) / 2.0 if top_bid_price and top_ask_price else 0.0 spread = top_ask_price - top_bid_price if top_bid_price and top_ask_price else 0.0 bid_ask_ratio = top_bid_amount / top_ask_amount if top_ask_amount > 0 else (1.0 if top_bid_amount > 0 else 0.0) # Aggregated liquidity total_bid_liquidity = sum(level.price * level.amount for level in cob_snapshot.bids) total_ask_liquidity = sum(level.price * level.amount for level in cob_snapshot.asks) liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity) if (total_bid_liquidity + total_ask_liquidity) > 0 else 0.0 features = np.array([ mid_price / 10000.0, # Normalize price spread / 100.0, # Normalize spread bid_ask_ratio, liquidity_imbalance, cob_snapshot.stats.get('imbalance', 0.0), cob_snapshot.stats.get('spread_bps', 0.0) / 10000.0, cob_snapshot.stats.get('bid_liquidity', 0.0) / 1000000.0, # Normalize large values cob_snapshot.stats.get('ask_liquidity', 0.0) / 1000000.0, cob_snapshot.stats.get('depth_impact', 0.0) # Depth impact might already be normalized ]) # Pad to a consistent size if necessary (e.g., 20 features for DQN state) expected_size = 20 if len(features) < expected_size: features = np.pad(features, (0, expected_size - len(features)), 'constant') elif len(features) > expected_size: features = features[:expected_size] return features.astype(np.float32) except Exception as e: logger.error(f"Error generating COB DQN features for {symbol}: {e}") return None def _on_cob_cnn_features(self, symbol: str, cob_data: Dict): """Callback for when new COB CNN features are available""" if not self.realtime_processing: return try: # This is where you would feed the features to the CNN model for prediction # or store them for training. For now, we just log and store the latest. # self.latest_cob_features[symbol] = cob_data['features'] # logger.debug(f"COB CNN features updated for {symbol}: {cob_data['features'][:5]}...") # If training is enabled, add to training data if self.training_enabled and self.enhanced_training_system: self.enhanced_training_system.add_cob_cnn_experience(symbol, cob_data) except Exception as e: logger.error(f"Error in _on_cob_cnn_features for {symbol}: {e}") def _on_cob_dqn_features(self, symbol: str, cob_data: Dict): """Callback for when new COB DQN features are available""" if not self.realtime_processing: return try: # This is where you would feed the state to the DQN model for prediction # or store them for training. For now, we just log and store the latest. # self.latest_cob_state[symbol] = cob_data['state'] # logger.debug(f"COB DQN state updated for {symbol}: {cob_data['state'][:5]}...") # If training is enabled, add to training data if self.training_enabled and self.enhanced_training_system: self.enhanced_training_system.add_cob_dqn_experience(symbol, cob_data) except Exception as e: logger.error(f"Error in _on_cob_dqn_features for {symbol}: {e}") def _on_cob_dashboard_data(self, symbol: str, cob_data: Dict): """Callback for when new COB data is available for the dashboard""" if not self.realtime_processing: return try: self.latest_cob_data[symbol] = cob_data # logger.debug(f"COB Dashboard data updated for {symbol}") if self.dashboard and hasattr(self.dashboard, 'update_cob_data'): self.dashboard.update_cob_data(symbol, cob_data) except Exception as e: logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}") def get_cob_features(self, symbol: str) -> Optional[np.ndarray]: """Get the latest COB features for CNN model""" return self.latest_cob_features.get(symbol) def get_cob_state(self, symbol: str) -> Optional[np.ndarray]: """Get the latest COB state for DQN model""" return self.latest_cob_state.get(symbol) def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]: """Get the latest raw COB snapshot for a symbol""" if self.cob_integration: return self.cob_integration.get_latest_cob_snapshot(symbol) return None def get_cob_feature_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]: """Get a sequence of COB CNN features for sequence models""" if symbol not in self.cob_feature_history or not self.cob_feature_history[symbol]: return None features = [item['cnn_features'] for item in list(self.cob_feature_history[symbol])][-sequence_length:] if not features: return None # Pad or truncate to ensure consistent length and shape expected_feature_size = 102 # From _generate_cob_cnn_features padded_features = [] for f in features: if len(f) < expected_feature_size: padded_features.append(np.pad(f, (0, expected_feature_size - len(f)), 'constant').tolist()) elif len(f) > expected_feature_size: padded_features.append(f[:expected_feature_size].tolist()) else: padded_features.append(f) # Ensure we have the desired sequence length by padding with zeros if necessary if len(padded_features) < sequence_length: padding = [[0.0] * expected_feature_size for _ in range(sequence_length - len(padded_features))] padded_features = padding + padded_features return np.array(padded_features[-sequence_length:]).astype(np.float32) # Ensure correct length def _initialize_default_weights(self): """Initialize default model weights from config""" self.model_weights = { 'CNN': self.config.orchestrator.get('cnn_weight', 0.7), 'RL': self.config.orchestrator.get('rl_weight', 0.3) } def register_model(self, model: ModelInterface, weight: float = None) -> bool: """Register a new model with the orchestrator""" try: # Register with model registry if not self.model_registry.register_model(model): return False # Set weight if weight is not None: self.model_weights[model.name] = weight elif model.name not in self.model_weights: self.model_weights[model.name] = 0.1 # Default low weight for new models # Initialize performance tracking if model.name not in self.model_performance: self.model_performance[model.name] = {'correct': 0, 'total': 0, 'accuracy': 0.0} logger.info(f"Registered {model.name} model with weight {self.model_weights[model.name]}") self._normalize_weights() return True except Exception as e: logger.error(f"Error registering model {model.name}: {e}") return False def unregister_model(self, model_name: str) -> bool: """Unregister a model""" try: if self.model_registry.unregister_model(model_name): if model_name in self.model_weights: del self.model_weights[model_name] if model_name in self.model_performance: del self.model_performance[model_name] self._normalize_weights() logger.info(f"Unregistered {model_name} model") return True return False except Exception as e: logger.error(f"Error unregistering model {model_name}: {e}") return False def _normalize_weights(self): """Normalize model weights to sum to 1.0""" total_weight = sum(self.model_weights.values()) if total_weight > 0: for model_name in self.model_weights: self.model_weights[model_name] /= total_weight def add_decision_callback(self, callback): """Add a callback function to be called when decisions are made""" self.decision_callbacks.append(callback) async def make_trading_decision(self, symbol: str) -> Optional[TradingDecision]: """ Make a trading decision for a symbol by combining all registered model outputs """ try: current_time = datetime.now() # Check if enough time has passed since last decision if symbol in self.last_decision_time: time_since_last = (current_time - self.last_decision_time[symbol]).total_seconds() if time_since_last < self.decision_frequency: return None # Get current market data current_price = self.data_provider.get_current_price(symbol) if current_price is None: logger.warning(f"No current price available for {symbol}") return None # Get predictions from all registered models predictions = await self._get_all_predictions(symbol) if not predictions: # FALLBACK: Generate basic momentum signal when no models are available logger.debug(f"No model predictions available for {symbol}, generating fallback signal") fallback_prediction = await self._generate_fallback_prediction(symbol, current_price) if fallback_prediction: predictions = [fallback_prediction] else: logger.debug(f"No fallback prediction available for {symbol}") return None # Combine predictions decision = self._combine_predictions( symbol=symbol, price=current_price, predictions=predictions, timestamp=current_time ) # Update state self.last_decision_time[symbol] = current_time if symbol not in self.recent_decisions: self.recent_decisions[symbol] = [] self.recent_decisions[symbol].append(decision) # Keep only recent decisions (last 100) if len(self.recent_decisions[symbol]) > 100: self.recent_decisions[symbol] = self.recent_decisions[symbol][-100:] # Call decision callbacks for callback in self.decision_callbacks: try: await callback(decision) except Exception as e: logger.error(f"Error in decision callback: {e}") # Clean up memory periodically if len(self.recent_decisions[symbol]) % 50 == 0: self.model_registry.cleanup_all_models() return decision except Exception as e: logger.error(f"Error making trading decision for {symbol}: {e}") return None async def _get_all_predictions(self, symbol: str) -> List[Prediction]: """Get predictions from all registered models""" predictions = [] for model_name, model in self.model_registry.models.items(): try: if isinstance(model, CNNModelInterface): # Get CNN predictions for each timeframe cnn_predictions = await self._get_cnn_predictions(model, symbol) predictions.extend(cnn_predictions) elif isinstance(model, RLAgentInterface): # Get RL prediction rl_prediction = await self._get_rl_prediction(model, symbol) if rl_prediction: predictions.append(rl_prediction) else: # Generic model interface generic_prediction = await self._get_generic_prediction(model, symbol) if generic_prediction: predictions.append(generic_prediction) except Exception as e: logger.error(f"Error getting prediction from {model_name}: {e}") continue return predictions async def _get_cnn_predictions(self, model: CNNModelInterface, symbol: str) -> List[Prediction]: """Get predictions from CNN model for all timeframes with enhanced COB features""" predictions = [] try: for timeframe in self.config.timeframes: # Get standard feature matrix for this timeframe feature_matrix = self.data_provider.get_feature_matrix( symbol=symbol, timeframes=[timeframe], window_size=getattr(model, 'window_size', 20) ) # Enhance with COB feature matrix if available enhanced_features = feature_matrix if feature_matrix is not None and self.cob_integration: try: # Get COB feature matrix (5-minute history) cob_feature_matrix = self.get_cob_feature_matrix(symbol, sequence_length=60) if cob_feature_matrix is not None: # Take the latest COB features to augment the standard features latest_cob_features = cob_feature_matrix[-1:, :] # Shape: (1, 400) # Resize to match the feature matrix timeframe dimension timeframe_count = feature_matrix.shape[0] cob_features_expanded = np.repeat(latest_cob_features, timeframe_count, axis=0) # Concatenate COB features with standard features # Standard features shape: (timeframes, window_size, features) # COB features shape: (timeframes, 400) # We'll add COB as additional features to each timeframe window_size = feature_matrix.shape[1] cob_features_reshaped = cob_features_expanded.reshape(timeframe_count, 1, 400) cob_features_tiled = np.tile(cob_features_reshaped, (1, window_size, 1)) # Concatenate along feature dimension enhanced_features = np.concatenate([feature_matrix, cob_features_tiled], axis=2) logger.debug(f"Enhanced CNN features with COB data for {symbol}: " f"{feature_matrix.shape} + COB -> {enhanced_features.shape}") except Exception as cob_error: logger.debug(f"Could not enhance CNN features with COB data: {cob_error}") enhanced_features = feature_matrix if enhanced_features is not None: # Get CNN prediction - use the actual underlying model try: # Ensure features are properly shaped and limited if isinstance(enhanced_features, np.ndarray): # Flatten and limit features to prevent shape mismatches enhanced_features = enhanced_features.flatten() if len(enhanced_features) > 100: # Limit to 100 features enhanced_features = enhanced_features[:100] elif len(enhanced_features) < 100: # Pad with zeros padded = np.zeros(100) padded[:len(enhanced_features)] = enhanced_features enhanced_features = padded if hasattr(model.model, 'act'): # Use the CNN's act method action_result = model.model.act(enhanced_features, explore=False) if isinstance(action_result, tuple): action_idx, confidence = action_result else: action_idx = action_result confidence = 0.7 # Default confidence # Convert to action probabilities action_probs = [0.1, 0.1, 0.8] # Default distribution action_probs[action_idx] = confidence else: # Fallback to generic predict method action_probs, confidence = model.predict(enhanced_features) except Exception as e: logger.warning(f"CNN prediction failed: {e}") action_probs, confidence = None, None if action_probs is not None: # Convert to prediction object action_names = ['SELL', 'HOLD', 'BUY'] best_action_idx = np.argmax(action_probs) best_action = action_names[best_action_idx] prediction = Prediction( action=best_action, confidence=float(confidence) if confidence is not None else float(action_probs[best_action_idx]), probabilities={name: float(prob) for name, prob in zip(action_names, action_probs)}, timeframe=timeframe, timestamp=datetime.now(), model_name=model.name, metadata={ 'timeframe_specific': True, 'cob_enhanced': enhanced_features is not feature_matrix, 'feature_shape': str(enhanced_features.shape) } ) predictions.append(prediction) # Capture CNN prediction for dashboard visualization current_price = self._get_current_price(symbol) if current_price: direction = best_action_idx # 0=SELL, 1=HOLD, 2=BUY pred_confidence = float(confidence) if confidence is not None else float(action_probs[best_action_idx]) predicted_price = current_price * (1 + (pred_confidence * 0.01 if best_action == 'BUY' else -pred_confidence * 0.01 if best_action == 'SELL' else 0)) self.capture_cnn_prediction(symbol, int(direction), pred_confidence, current_price, predicted_price) except Exception as e: logger.error(f"Error getting CNN predictions: {e}") return predictions async def _get_rl_prediction(self, model: RLAgentInterface, symbol: str) -> Optional[Prediction]: """Get prediction from RL agent""" try: # Get current state for RL agent state = self._get_rl_state(symbol) if state is None: return None # Get RL agent's action, confidence, and q_values from the underlying model if hasattr(model.model, 'act_with_confidence'): # Call act_with_confidence and handle different return formats result = model.model.act_with_confidence(state) if len(result) == 3: # EnhancedCNN format: (action, confidence, q_values) action_idx, confidence, raw_q_values = result elif len(result) == 2: # DQN format: (action, confidence) action_idx, confidence = result raw_q_values = None else: logger.error(f"Unexpected return format from act_with_confidence: {len(result)} values") return None elif hasattr(model.model, 'act'): action_idx = model.model.act(state, explore=False) confidence = 0.7 # Default confidence for basic act method raw_q_values = None # No raw q_values from simple act else: logger.error(f"RL model {model.name} has no act method") return None action_names = ['SELL', 'HOLD', 'BUY'] action = action_names[action_idx] # Convert raw_q_values to list if they are a tensor q_values_for_capture = None if raw_q_values is not None and hasattr(raw_q_values, 'tolist'): q_values_for_capture = raw_q_values.tolist() elif raw_q_values is not None and isinstance(raw_q_values, list): q_values_for_capture = raw_q_values # Create prediction object prediction = Prediction( action=action, confidence=float(confidence), # Use actual q_values if available, otherwise default probabilities probabilities={action_names[i]: float(q_values_for_capture[i]) if q_values_for_capture else (1.0 / len(action_names)) for i in range(len(action_names))}, timeframe='mixed', # RL uses mixed timeframes timestamp=datetime.now(), model_name=model.name, metadata={'state_size': len(state)} ) # Capture DQN prediction for dashboard visualization current_price = self._get_current_price(symbol) if current_price: # Only pass q_values if they exist, otherwise pass empty list q_values_to_pass = q_values_for_capture if q_values_for_capture is not None else [] self.capture_dqn_prediction(symbol, action_idx, float(confidence), current_price, q_values_to_pass) return prediction except Exception as e: logger.error(f"Error getting RL prediction: {e}") return None async def _get_generic_prediction(self, model: ModelInterface, symbol: str) -> Optional[Prediction]: """Get prediction from generic model""" try: # Get feature matrix for the model feature_matrix = self.data_provider.get_feature_matrix( symbol=symbol, timeframes=self.config.timeframes[:3], # Use first 3 timeframes window_size=20 ) if feature_matrix is not None: # Ensure feature_matrix is properly shaped and limited if isinstance(feature_matrix, np.ndarray): # Flatten and limit features to prevent shape mismatches feature_matrix = feature_matrix.flatten() if len(feature_matrix) > 2000: # Limit to 2000 features for generic models feature_matrix = feature_matrix[:2000] elif len(feature_matrix) < 2000: # Pad with zeros padded = np.zeros(2000) padded[:len(feature_matrix)] = feature_matrix feature_matrix = padded prediction_result = model.predict(feature_matrix) # Handle different return formats from model.predict() if prediction_result is None: return None # Check if it's a tuple (action_probs, confidence) if isinstance(prediction_result, tuple) and len(prediction_result) == 2: action_probs, confidence = prediction_result elif isinstance(prediction_result, dict): # Handle dictionary return format action_probs = prediction_result.get('probabilities', None) confidence = prediction_result.get('confidence', 0.7) else: # Assume it's just action probabilities action_probs = prediction_result confidence = 0.7 # Default confidence if action_probs is not None: action_names = ['SELL', 'HOLD', 'BUY'] best_action_idx = np.argmax(action_probs) best_action = action_names[best_action_idx] prediction = Prediction( action=best_action, confidence=float(confidence), probabilities={name: float(prob) for name, prob in zip(action_names, action_probs)}, timeframe='mixed', timestamp=datetime.now(), model_name=model.name, metadata={'generic_model': True} ) return prediction return None except Exception as e: logger.error(f"Error getting generic prediction: {e}") return None def _get_rl_state(self, symbol: str) -> Optional[np.ndarray]: """Get current state for RL agent""" try: # Get feature matrix for all timeframes feature_matrix = self.data_provider.get_feature_matrix( symbol=symbol, timeframes=self.config.timeframes, window_size=self.config.rl.get('window_size', 20) ) if feature_matrix is not None: # Flatten the feature matrix for RL agent # Shape: (n_timeframes, window_size, n_features) -> (n_timeframes * window_size * n_features,) state = feature_matrix.flatten() # Add additional state information (position, balance, etc.) # This would come from a portfolio manager in a real implementation additional_state = np.array([0.0, 1.0, 0.0]) # [position, balance, unrealized_pnl] return np.concatenate([state, additional_state]) return None except Exception as e: logger.error(f"Error creating RL state for {symbol}: {e}") return None def _combine_predictions(self, symbol: str, price: float, predictions: List[Prediction], timestamp: datetime) -> TradingDecision: """Combine all predictions into a final decision with aggressiveness and P&L feedback""" try: reasoning = { 'predictions': len(predictions), 'weights': self.model_weights.copy(), 'models_used': [pred.model_name for pred in predictions] } # Get current position P&L for feedback current_position_pnl = self._get_current_position_pnl(symbol, price) # Initialize action scores action_scores = {'BUY': 0.0, 'SELL': 0.0, 'HOLD': 0.0} total_weight = 0.0 # Process all predictions for pred in predictions: # Get model weight model_weight = self.model_weights.get(pred.model_name, 0.1) # Weight by confidence and timeframe importance timeframe_weight = self._get_timeframe_weight(pred.timeframe) weighted_confidence = pred.confidence * timeframe_weight * model_weight action_scores[pred.action] += weighted_confidence total_weight += weighted_confidence # Normalize scores if total_weight > 0: for action in action_scores: action_scores[action] /= total_weight # Choose best action best_action = max(action_scores, key=action_scores.get) best_confidence = action_scores[best_action] # Calculate aggressiveness-adjusted thresholds entry_threshold, exit_threshold = self._calculate_aggressiveness_thresholds( current_position_pnl, symbol ) # Apply aggressiveness-based confidence thresholds if best_action in ['BUY', 'SELL']: # For entry signals, use entry aggressiveness if not self._has_open_position(symbol): if best_confidence < entry_threshold: best_action = 'HOLD' reasoning['entry_threshold_applied'] = True reasoning['entry_threshold'] = entry_threshold # For exit signals, use exit aggressiveness else: if best_confidence < exit_threshold: best_action = 'HOLD' reasoning['exit_threshold_applied'] = True reasoning['exit_threshold'] = exit_threshold else: # Standard threshold for HOLD if best_confidence < self.confidence_threshold: best_action = 'HOLD' reasoning['threshold_applied'] = True # Add P&L-based decision adjustment best_action, best_confidence = self._apply_pnl_feedback( best_action, best_confidence, current_position_pnl, symbol, reasoning ) # Get memory usage stats try: memory_usage = self.model_registry.get_memory_stats() if hasattr(self.model_registry, 'get_memory_stats') else {} except Exception: memory_usage = {} # Calculate dynamic aggressiveness based on recent performance entry_aggressiveness = self._calculate_dynamic_entry_aggressiveness(symbol) exit_aggressiveness = self._calculate_dynamic_exit_aggressiveness(symbol, current_position_pnl) # Create final decision decision = TradingDecision( action=best_action, confidence=best_confidence, symbol=symbol, price=price, timestamp=timestamp, reasoning=reasoning, memory_usage=memory_usage.get('models', {}) if memory_usage else {}, entry_aggressiveness=entry_aggressiveness, exit_aggressiveness=exit_aggressiveness, current_position_pnl=current_position_pnl ) logger.info(f"Decision for {symbol}: {best_action} (confidence: {best_confidence:.3f}, " f"entry_agg: {entry_aggressiveness:.2f}, exit_agg: {exit_aggressiveness:.2f}, " f"pnl: ${current_position_pnl:.2f})") return decision except Exception as e: logger.error(f"Error combining predictions for {symbol}: {e}") # Return safe default return TradingDecision( action='HOLD', confidence=0.0, symbol=symbol, price=price, timestamp=timestamp, reasoning={'error': str(e)}, memory_usage={}, entry_aggressiveness=0.5, exit_aggressiveness=0.5, current_position_pnl=0.0 ) def _get_timeframe_weight(self, timeframe: str) -> float: """Get importance weight for a timeframe""" # Higher timeframes get more weight in decision making weights = { '1m': 0.1, '5m': 0.2, '15m': 0.3, '30m': 0.4, '1h': 0.6, '4h': 0.8, '1d': 1.0 } return weights.get(timeframe, 0.5) def update_model_performance(self, model_name: str, was_correct: bool): """Update performance tracking for a model""" if model_name in self.model_performance: self.model_performance[model_name]['total'] += 1 if was_correct: self.model_performance[model_name]['correct'] += 1 # Update accuracy total = self.model_performance[model_name]['total'] correct = self.model_performance[model_name]['correct'] self.model_performance[model_name]['accuracy'] = correct / total if total > 0 else 0.0 def adapt_weights(self): """Dynamically adapt model weights based on performance""" try: for model_name, performance in self.model_performance.items(): if performance['total'] > 0: # Adjust weight based on relative performance accuracy = performance['correct'] / performance['total'] self.model_weights[model_name] = accuracy logger.info(f"Adapted {model_name} weight: {self.model_weights[model_name]}") except Exception as e: logger.error(f"Error adapting weights: {e}") def get_recent_decisions(self, symbol: str, limit: int = 10) -> List[TradingDecision]: """Get recent decisions for a symbol""" if symbol in self.recent_decisions: return self.recent_decisions[symbol][-limit:] return [] def get_performance_metrics(self) -> Dict[str, Any]: """Get performance metrics for the orchestrator""" return { 'model_performance': self.model_performance.copy(), 'weights': self.model_weights.copy(), 'configuration': { 'confidence_threshold': self.confidence_threshold, 'decision_frequency': self.decision_frequency }, 'recent_activity': { symbol: len(decisions) for symbol, decisions in self.recent_decisions.items() } } def get_model_states(self) -> Dict[str, Dict]: """Get current model states with REAL checkpoint data - SSOT for dashboard""" try: # ENHANCED: Load actual checkpoint metadata for each model from utils.checkpoint_manager import load_best_checkpoint # Update each model with REAL checkpoint data for model_name in ['dqn_agent', 'enhanced_cnn', 'extrema_trainer', 'decision', 'cob_rl']: try: result = load_best_checkpoint(model_name) if result: file_path, metadata = result # Map model names to internal keys internal_key = { 'dqn_agent': 'dqn', 'enhanced_cnn': 'cnn', 'extrema_trainer': 'extrema_trainer', 'decision': 'decision', 'cob_rl': 'cob_rl' }.get(model_name, model_name) if internal_key in self.model_states: # Load REAL checkpoint data self.model_states[internal_key]['current_loss'] = getattr(metadata, 'loss', None) or getattr(metadata, 'val_loss', None) self.model_states[internal_key]['best_loss'] = getattr(metadata, 'loss', None) or getattr(metadata, 'val_loss', None) self.model_states[internal_key]['checkpoint_loaded'] = True self.model_states[internal_key]['checkpoint_filename'] = metadata.checkpoint_id self.model_states[internal_key]['performance_score'] = getattr(metadata, 'performance_score', 0.0) self.model_states[internal_key]['created_at'] = str(getattr(metadata, 'created_at', 'Unknown')) # Set initial loss from checkpoint if available if self.model_states[internal_key]['initial_loss'] is None: # Try to infer initial loss from performance improvement if hasattr(metadata, 'accuracy') and metadata.accuracy: # Estimate initial loss from current accuracy (inverse relationship) estimated_initial = max(0.1, 2.0 - (metadata.accuracy * 2.0)) self.model_states[internal_key]['initial_loss'] = estimated_initial logger.debug(f"Loaded REAL checkpoint data for {model_name}: loss={self.model_states[internal_key]['current_loss']}") else: # No checkpoint found - mark as fresh internal_key = { 'dqn_agent': 'dqn', 'enhanced_cnn': 'cnn', 'extrema_trainer': 'extrema_trainer', 'decision': 'decision', 'cob_rl': 'cob_rl' }.get(model_name, model_name) if internal_key in self.model_states: self.model_states[internal_key]['checkpoint_loaded'] = False self.model_states[internal_key]['checkpoint_filename'] = 'none (fresh start)' except Exception as e: logger.debug(f"No checkpoint found for {model_name}: {e}") # ADDITIONAL: Update from live training if models are actively training if self.rl_agent and hasattr(self.rl_agent, 'losses') and len(self.rl_agent.losses) > 0: recent_losses = self.rl_agent.losses[-10:] # Last 10 training steps if recent_losses: live_loss = sum(recent_losses) / len(recent_losses) # Only update if we have a live loss that's different from checkpoint if abs(live_loss - (self.model_states['dqn']['current_loss'] or 0)) > 0.001: self.model_states['dqn']['current_loss'] = live_loss logger.debug(f"Updated DQN with live training loss: {live_loss:.4f}") if self.cnn_model and hasattr(self.cnn_model, 'training_loss'): if self.cnn_model.training_loss and abs(self.cnn_model.training_loss - (self.model_states['cnn']['current_loss'] or 0)) > 0.001: self.model_states['cnn']['current_loss'] = self.cnn_model.training_loss logger.debug(f"Updated CNN with live training loss: {self.cnn_model.training_loss:.4f}") if self.extrema_trainer and hasattr(self.extrema_trainer, 'best_detection_accuracy'): # Convert accuracy to loss estimate if self.extrema_trainer.best_detection_accuracy > 0: estimated_loss = max(0.001, 1.0 - self.extrema_trainer.best_detection_accuracy) self.model_states['extrema_trainer']['current_loss'] = estimated_loss self.model_states['extrema_trainer']['best_loss'] = estimated_loss # NO LONGER SETTING SYNTHETIC INITIAL LOSS VALUES # Keep all None values as None if no real data is available # This prevents the "fake progress" issue where Current Loss = Initial Loss # Only set initial_loss from actual training history if available for model_key, model_state in self.model_states.items(): # Leave initial_loss as None if no real training history exists # Leave current_loss as None if model isn't actively training # Leave best_loss as None if no checkpoints exist with real performance data pass # No synthetic data generation return self.model_states except Exception as e: logger.error(f"Error getting model states: {e}") # Return None values instead of synthetic data return { 'dqn': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'cnn': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'cob_rl': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'decision': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}, 'extrema_trainer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False} } def _initialize_decision_fusion(self): """Initialize the decision fusion neural network for learning model effectiveness""" try: if not self.decision_fusion_enabled: return import torch import torch.nn as nn # Create decision fusion network class DecisionFusionNet(nn.Module): def __init__(self, input_size=32, hidden_size=64): super().__init__() self.fc1 = nn.Linear(input_size, hidden_size) self.fc2 = nn.Linear(hidden_size, hidden_size) self.fc3 = nn.Linear(hidden_size, 3) # BUY, SELL, HOLD self.dropout = nn.Dropout(0.2) def forward(self, x): x = torch.relu(self.fc1(x)) x = self.dropout(x) x = torch.relu(self.fc2(x)) x = self.dropout(x) return torch.softmax(self.fc3(x), dim=1) self.decision_fusion_network = DecisionFusionNet() logger.info("Decision fusion network initialized") except Exception as e: logger.warning(f"Decision fusion initialization failed: {e}") self.decision_fusion_enabled = False def _initialize_enhanced_training_system(self): """Initialize the enhanced real-time training system""" try: if not self.training_enabled: logger.info("Enhanced training system disabled") return if not ENHANCED_TRAINING_AVAILABLE: logger.warning("EnhancedRealtimeTrainingSystem not available - training disabled") self.training_enabled = False return # Initialize the enhanced training system self.enhanced_training_system = EnhancedRealtimeTrainingSystem( orchestrator=self, data_provider=self.data_provider, dashboard=None # Will be set by dashboard when available ) logger.info("Enhanced real-time training system initialized") logger.info(" - Real-time model training: ENABLED") logger.info(" - Comprehensive feature extraction: ENABLED") logger.info(" - Enhanced reward calculation: ENABLED") logger.info(" - Forward-looking predictions: ENABLED") except Exception as e: logger.error(f"Error initializing enhanced training system: {e}") self.training_enabled = False self.enhanced_training_system = None def start_enhanced_training(self): """Start the enhanced real-time training system""" try: if not self.training_enabled or not self.enhanced_training_system: logger.warning("Enhanced training system not available") return False self.enhanced_training_system.start_training() logger.info("Enhanced real-time training started") return True except Exception as e: logger.error(f"Error starting enhanced training: {e}") return False def stop_enhanced_training(self): """Stop the enhanced real-time training system""" try: if self.enhanced_training_system: self.enhanced_training_system.stop_training() logger.info("Enhanced real-time training stopped") return True return False except Exception as e: logger.error(f"Error stopping enhanced training: {e}") return False def get_enhanced_training_stats(self) -> Dict[str, Any]: """Get enhanced training system statistics with orchestrator integration""" try: if not self.enhanced_training_system: return { 'training_enabled': False, 'system_available': ENHANCED_TRAINING_AVAILABLE, 'error': 'Training system not initialized' } # Get base stats from enhanced training system stats = self.enhanced_training_system.get_training_statistics() stats['training_enabled'] = self.training_enabled stats['system_available'] = ENHANCED_TRAINING_AVAILABLE # Add orchestrator-specific training integration data stats['orchestrator_integration'] = { 'models_connected': len([m for m in [self.rl_agent, self.cnn_model, self.cob_rl_agent, self.decision_model] if m is not None]), 'cob_integration_active': self.cob_integration is not None, 'decision_fusion_enabled': self.decision_fusion_enabled, 'symbols_tracking': len(self.symbols), 'recent_decisions_count': sum(len(decisions) for decisions in self.recent_decisions.values()), 'model_weights': self.model_weights.copy(), 'realtime_processing': self.realtime_processing } # Add model-specific training status from orchestrator stats['model_training_status'] = {} model_mappings = { 'dqn': self.rl_agent, 'cnn': self.cnn_model, 'cob_rl': self.cob_rl_agent, 'decision': self.decision_model } for model_name, model in model_mappings.items(): if model: model_stats = { 'model_loaded': True, 'memory_usage': 0, 'training_steps': 0, 'last_loss': None, 'checkpoint_loaded': self.model_states.get(model_name, {}).get('checkpoint_loaded', False) } # Get memory usage if hasattr(model, 'memory') and model.memory: model_stats['memory_usage'] = len(model.memory) # Get training steps if hasattr(model, 'training_steps'): model_stats['training_steps'] = model.training_steps # Get last loss if hasattr(model, 'losses') and model.losses: model_stats['last_loss'] = model.losses[-1] stats['model_training_status'][model_name] = model_stats else: stats['model_training_status'][model_name] = { 'model_loaded': False, 'memory_usage': 0, 'training_steps': 0, 'last_loss': None, 'checkpoint_loaded': False } # Add prediction tracking stats stats['prediction_tracking'] = { 'dqn_predictions_tracked': sum(len(preds) for preds in self.recent_dqn_predictions.values()), 'cnn_predictions_tracked': sum(len(preds) for preds in self.recent_cnn_predictions.values()), 'accuracy_history_tracked': sum(len(history) for history in self.prediction_accuracy_history.values()), 'symbols_with_predictions': [symbol for symbol in self.symbols if len(self.recent_dqn_predictions.get(symbol, [])) > 0 or len(self.recent_cnn_predictions.get(symbol, [])) > 0] } # Add COB integration stats if available if self.cob_integration: stats['cob_integration_stats'] = { 'latest_cob_data_symbols': list(self.latest_cob_data.keys()), 'cob_features_available': list(self.latest_cob_features.keys()), 'cob_state_available': list(self.latest_cob_state.keys()), 'feature_history_length': {symbol: len(history) for symbol, history in self.cob_feature_history.items()} } return stats except Exception as e: logger.error(f"Error getting training stats: {e}") return { 'training_enabled': self.training_enabled, 'system_available': ENHANCED_TRAINING_AVAILABLE, 'error': str(e) } def set_training_dashboard(self, dashboard): """Set the dashboard reference for the training system""" try: if self.enhanced_training_system: self.enhanced_training_system.dashboard = dashboard logger.info("Dashboard reference set for enhanced training system") except Exception as e: logger.error(f"Error setting training dashboard: {e}") def get_universal_data_stream(self, current_time: datetime = None) -> Optional[UniversalDataStream]: """Get universal data stream for external consumers like dashboard""" try: return self.universal_adapter.get_universal_data_stream(current_time) except Exception as e: logger.error(f"Error getting universal data stream: {e}") return None def get_universal_data_for_model(self, model_type: str = 'cnn') -> Optional[Dict[str, Any]]: """Get formatted universal data for specific model types""" try: stream = self.universal_adapter.get_universal_data_stream() if stream: return self.universal_adapter.format_for_model(stream, model_type) return None except Exception as e: logger.error(f"Error getting universal data for {model_type}: {e}") return None def _get_current_position_pnl(self, symbol: str, current_price: float) -> float: """Get current position P&L for the symbol""" try: if self.trading_executor and hasattr(self.trading_executor, 'get_current_position'): position = self.trading_executor.get_current_position(symbol) if position: entry_price = position.get('price', 0) size = position.get('size', 0) side = position.get('side', 'LONG') if entry_price and size > 0: if side.upper() == 'LONG': pnl = (current_price - entry_price) * size else: # SHORT pnl = (entry_price - current_price) * size return pnl return 0.0 except Exception as e: logger.debug(f"Error getting position P&L for {symbol}: {e}") return 0.0 def _has_open_position(self, symbol: str) -> bool: """Check if there's an open position for the symbol""" try: if self.trading_executor and hasattr(self.trading_executor, 'get_current_position'): position = self.trading_executor.get_current_position(symbol) return position is not None and position.get('size', 0) > 0 return False except Exception: return False def _calculate_aggressiveness_thresholds(self, current_pnl: float, symbol: str) -> tuple: """Calculate confidence thresholds based on aggressiveness settings""" # Base thresholds base_entry_threshold = self.confidence_threshold base_exit_threshold = self.confidence_threshold_close # Get aggressiveness settings (could be from config or adaptive) entry_agg = getattr(self, 'entry_aggressiveness', 0.5) exit_agg = getattr(self, 'exit_aggressiveness', 0.5) # Adjust thresholds based on aggressiveness # More aggressive = lower threshold (more trades) # Less aggressive = higher threshold (fewer, higher quality trades) entry_threshold = base_entry_threshold * (1.5 - entry_agg) # 0.5 agg = 1.0x, 1.0 agg = 0.5x exit_threshold = base_exit_threshold * (1.5 - exit_agg) # Ensure minimum thresholds entry_threshold = max(0.05, entry_threshold) exit_threshold = max(0.02, exit_threshold) return entry_threshold, exit_threshold def _apply_pnl_feedback(self, action: str, confidence: float, current_pnl: float, symbol: str, reasoning: dict) -> tuple: """Apply P&L-based feedback to decision making""" try: # If we have a losing position, be more aggressive about cutting losses if current_pnl < -10.0: # Losing more than $10 if action == 'SELL' and self._has_open_position(symbol): # Boost confidence for exit signals when losing confidence = min(1.0, confidence * 1.2) reasoning['pnl_loss_cut_boost'] = True elif action == 'BUY': # Reduce confidence for new entries when losing confidence *= 0.8 reasoning['pnl_loss_entry_reduction'] = True # If we have a winning position, be more conservative about exits elif current_pnl > 5.0: # Winning more than $5 if action == 'SELL' and self._has_open_position(symbol): # Reduce confidence for exit signals when winning (let profits run) confidence *= 0.9 reasoning['pnl_profit_hold'] = True elif action == 'BUY': # Slightly boost confidence for entries when on a winning streak confidence = min(1.0, confidence * 1.05) reasoning['pnl_winning_streak_boost'] = True reasoning['current_pnl'] = current_pnl return action, confidence except Exception as e: logger.debug(f"Error applying P&L feedback: {e}") return action, confidence def _calculate_dynamic_entry_aggressiveness(self, symbol: str) -> float: """Calculate dynamic entry aggressiveness based on recent performance""" try: # Start with base aggressiveness base_agg = getattr(self, 'entry_aggressiveness', 0.5) # Get recent decisions for this symbol recent_decisions = self.get_recent_decisions(symbol, limit=10) if len(recent_decisions) < 3: return base_agg # Calculate win rate winning_decisions = sum(1 for d in recent_decisions if d.reasoning.get('was_profitable', False)) win_rate = winning_decisions / len(recent_decisions) # Adjust aggressiveness based on performance if win_rate > 0.7: # High win rate - be more aggressive return min(1.0, base_agg + 0.2) elif win_rate < 0.3: # Low win rate - be more conservative return max(0.1, base_agg - 0.2) else: return base_agg except Exception as e: logger.debug(f"Error calculating dynamic entry aggressiveness: {e}") return 0.5 def _calculate_dynamic_exit_aggressiveness(self, symbol: str, current_pnl: float) -> float: """Calculate dynamic exit aggressiveness based on P&L and market conditions""" try: # Start with base aggressiveness base_agg = getattr(self, 'exit_aggressiveness', 0.5) # Adjust based on current P&L if current_pnl < -20.0: # Large loss - be very aggressive about cutting return min(1.0, base_agg + 0.3) elif current_pnl < -5.0: # Small loss - be more aggressive return min(1.0, base_agg + 0.1) elif current_pnl > 20.0: # Large profit - be less aggressive (let it run) return max(0.1, base_agg - 0.2) elif current_pnl > 5.0: # Small profit - slightly less aggressive return max(0.2, base_agg - 0.1) else: return base_agg except Exception as e: logger.debug(f"Error calculating dynamic exit aggressiveness: {e}") return 0.5 def set_trading_executor(self, trading_executor): """Set the trading executor for position tracking""" self.trading_executor = trading_executor logger.info("Trading executor set for position tracking and P&L feedback") def _get_current_price(self, symbol: str) -> float: """Get current price for symbol""" try: # Try to get from data provider if self.data_provider: try: # Try different methods to get current price if hasattr(self.data_provider, 'get_latest_data'): latest_data = self.data_provider.get_latest_data(symbol) if latest_data and 'price' in latest_data: return float(latest_data['price']) elif latest_data and 'close' in latest_data: return float(latest_data['close']) elif hasattr(self.data_provider, 'get_current_price'): return float(self.data_provider.get_current_price(symbol)) elif hasattr(self.data_provider, 'get_latest_candle'): latest_candle = self.data_provider.get_latest_candle(symbol, '1m') if latest_candle and 'close' in latest_candle: return float(latest_candle['close']) except Exception as e: logger.debug(f"Could not get price from data provider: {e}") # Try to get from universal adapter if self.universal_adapter: try: data_stream = self.universal_adapter.get_latest_data(symbol) if data_stream and hasattr(data_stream, 'current_price'): return float(data_stream.current_price) except Exception as e: logger.debug(f"Could not get price from universal adapter: {e}") # Fallback to default prices default_prices = { 'ETH/USDT': 2500.0, 'BTC/USDT': 108000.0 } return default_prices.get(symbol, 1000.0) except Exception as e: logger.error(f"Error getting current price for {symbol}: {e}") # Return default price based on symbol if 'ETH' in symbol: return 2500.0 elif 'BTC' in symbol: return 108000.0 else: return 1000.0 def _generate_fallback_prediction(self, symbol: str) -> Dict[str, Any]: """Generate fallback prediction when models fail""" try: return { 'action': 'HOLD', 'confidence': 0.5, 'price': self._get_current_price(symbol) or 2500.0, 'timestamp': datetime.now(), 'model': 'fallback' } except Exception as e: logger.debug(f"Error generating fallback prediction: {e}") return { 'action': 'HOLD', 'confidence': 0.5, 'price': 2500.0, 'timestamp': datetime.now(), 'model': 'fallback' } def capture_dqn_prediction(self, symbol: str, action_idx: int, confidence: float, price: float, q_values: List[float] = None): """Capture DQN prediction for dashboard visualization""" try: if symbol not in self.recent_dqn_predictions: self.recent_dqn_predictions[symbol] = deque(maxlen=100) prediction_data = { 'timestamp': datetime.now(), 'action': ['SELL', 'HOLD', 'BUY'][action_idx], 'confidence': confidence, 'price': price, 'q_values': q_values or [0.33, 0.33, 0.34] } self.recent_dqn_predictions[symbol].append(prediction_data) except Exception as e: logger.debug(f"Error capturing DQN prediction: {e}") def capture_cnn_prediction(self, symbol: str, direction: int, confidence: float, current_price: float, predicted_price: float): """Capture CNN prediction for dashboard visualization""" try: if symbol not in self.recent_cnn_predictions: self.recent_cnn_predictions[symbol] = deque(maxlen=50) prediction_data = { 'timestamp': datetime.now(), 'direction': ['DOWN', 'SAME', 'UP'][direction], 'confidence': confidence, 'current_price': current_price, 'predicted_price': predicted_price } self.recent_cnn_predictions[symbol].append(prediction_data) except Exception as e: logger.debug(f"Error capturing CNN prediction: {e}")