diff --git a/.kiro/specs/multi-modal-trading-system/design.md b/.kiro/specs/multi-modal-trading-system/design.md index 469e1aa..3fec0a5 100644 --- a/.kiro/specs/multi-modal-trading-system/design.md +++ b/.kiro/specs/multi-modal-trading-system/design.md @@ -74,6 +74,16 @@ Based on the existing implementation in `core/data_provider.py`, we'll enhance i - 1,5,15 and 60s MA of the COB imbalance counting +- 5 COB buckets - ***OUTPUTS***: suggested trade action (BUY/SELL) +# Standardized input for all models: +{ + 'primary_symbol': 'ETH/USDT', + 'reference_symbol': 'BTC/USDT', + 'eth_data': {'ETH_1s': df, 'ETH_1m': df, 'ETH_1h': df, 'ETH_1d': df}, + 'btc_data': {'BTC_1s': df}, + 'current_prices': {'ETH': price, 'BTC': price}, + 'data_completeness': {...} +} + ### 2. CNN Model The CNN Model is responsible for analyzing patterns in market data and predicting pivot points across multiple timeframes. diff --git a/.kiro/specs/multi-modal-trading-system/tasks.md b/.kiro/specs/multi-modal-trading-system/tasks.md index 5e6b345..be0b4d9 100644 --- a/.kiro/specs/multi-modal-trading-system/tasks.md +++ b/.kiro/specs/multi-modal-trading-system/tasks.md @@ -197,7 +197,8 @@ - Ensure validation occurs before any model inference - _Requirements: 9.1, 9.4_ -- [ ] 5.2. Implement persistent inference history storage +- [x] 5.2. Implement persistent inference history storage + - Create InferenceHistoryStore class for persistent storage - Store complete input data packages with each prediction - Include timestamp, symbol, input features, prediction outputs, confidence scores diff --git a/core/orchestrator.py b/core/orchestrator.py index 7ddb660..a4483e6 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -110,7 +110,9 @@ class TradingOrchestrator: self.confidence_threshold_close = self.config.orchestrator.get('confidence_threshold_close', 0.08) # Lowered from 0.10 # Decision frequency limit to prevent excessive trading self.decision_frequency = self.config.orchestrator.get('decision_frequency', 30) - self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols + + self.symbol = self.config.get('symbol', "ETH/USDT") # main symbol we wre trading and making predictions on. only one! + self.ref_symbols = self.config.get('ref_symbols', [ 'BTC/USDT']) # Enhanced to support multiple reference symbols. ToDo: we can add 'SOL/USDT' later # NEW: Aggressiveness parameters self.entry_aggressiveness = self.config.orchestrator.get('entry_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive @@ -153,12 +155,11 @@ class TradingOrchestrator: self.recent_cnn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent CNN predictions self.prediction_accuracy_history: Dict[str, deque] = {} # {symbol: List[Dict]} - Prediction accuracy tracking - # Initialize prediction tracking for each symbol - for symbol in self.symbols: - self.recent_dqn_predictions[symbol] = deque(maxlen=100) - self.recent_cnn_predictions[symbol] = deque(maxlen=50) - self.prediction_accuracy_history[symbol] = deque(maxlen=200) - self.signal_accumulator[symbol] = [] + # Initialize prediction tracking for the primary trading symbol only + self.recent_dqn_predictions[self.symbol] = deque(maxlen=100) + self.recent_cnn_predictions[self.symbol] = deque(maxlen=50) + self.prediction_accuracy_history[self.symbol] = deque(maxlen=200) + self.signal_accumulator[self.symbol] = [] # Decision callbacks self.decision_callbacks: List[Any] = [] @@ -177,7 +178,7 @@ class TradingOrchestrator: self.latest_cob_data: Dict[str, Any] = {} # {symbol: COBSnapshot} self.latest_cob_features: Dict[str, Any] = {} # {symbol: np.ndarray} - CNN features self.latest_cob_state: Dict[str, Any] = {} # {symbol: np.ndarray} - DQN state features - self.cob_feature_history: Dict[str, List[Any]] = {symbol: [] for symbol in self.symbols} # Rolling history for models + self.cob_feature_history: Dict[str, List[Any]] = {self.symbol: []} # Rolling history for primary trading symbol # Enhanced ML Models self.rl_agent: Any = None # DQN Agent @@ -204,13 +205,13 @@ class TradingOrchestrator: # Training tracking self.last_trained_symbols: Dict[str, datetime] = {} - # INFERENCE DATA STORAGE - Store model inputs and outputs for training - self.inference_history: Dict[str, deque] = {} # {symbol: deque of inference records} - self.max_inference_history = 1000 # Keep last 1000 inference records per symbol + # INFERENCE DATA STORAGE - Per-model storage with memory optimization + self.inference_history: Dict[str, deque] = {} # {model_name: deque of last 5 inference records} + self.max_memory_inferences = 5 # Keep only last 5 inferences in memory per model + self.max_disk_files_per_model = 200 # Cap disk files per model - # Initialize inference history for each symbol - for symbol in self.symbols: - self.inference_history[symbol] = deque(maxlen=self.max_inference_history) + # Initialize inference history for each model (will be populated as models make predictions) + # We'll create entries dynamically as models are used # ENHANCED: Real-time Training System Integration self.enhanced_training_system = None # Will be set to EnhancedRealtimeTrainingSystem if available @@ -1071,9 +1072,9 @@ class TradingOrchestrator: # Store input data for generic model model_input = input_data.get('generic_input') - # Store inference data for training + # Store inference data for training (per-model, async) if prediction and model_input is not None: - self._store_inference_data(symbol, model_name, model_input, prediction, current_time) + await self._store_inference_data_async(model_name, model_input, prediction, current_time) except Exception as e: logger.error(f"Error getting prediction from {model_name}: {e}") @@ -1085,61 +1086,134 @@ class TradingOrchestrator: return predictions async def _collect_model_input_data(self, symbol: str) -> Dict[str, Any]: - """Collect comprehensive input data for all models""" + """Collect standardized input data for all models - ETH primary + BTC reference""" try: - input_data = {} + # Only collect data for ETH (primary symbol) - we inference only for ETH + if symbol != 'ETH/USDT': + return {} - # Get current market data from data provider - current_price = self.data_provider.get_current_price(symbol) + # Standardized input: 4 ETH timeframes + 1s BTC reference + eth_data = {} + eth_timeframes = ['1s', '1m', '1h', '1d'] - # Collect OHLCV data for multiple timeframes - ohlcv_data = {} - timeframes = ['1s', '1m', '1h', '1d'] - for tf in timeframes: - df = self.data_provider.get_historical_data(symbol, tf, limit=300) + # Collect ETH data for all timeframes + for tf in eth_timeframes: + df = self.data_provider.get_historical_data('ETH/USDT', tf, limit=300) if df is not None and not df.empty: - ohlcv_data[tf] = df + eth_data[f'ETH_{tf}'] = df - # Collect COB data if available - cob_data = self.get_cob_snapshot(symbol) + # Collect BTC 1s reference data + btc_1s = self.data_provider.get_historical_data('BTC/USDT', '1s', limit=300) + btc_data = {} + if btc_1s is not None and not btc_1s.empty: + btc_data['BTC_1s'] = btc_1s - # Collect technical indicators - technical_indicators = {} - if '1h' in ohlcv_data: - df = ohlcv_data['1h'] - if len(df) > 20: - technical_indicators['sma_20'] = df['close'].rolling(20).mean().iloc[-1] - technical_indicators['rsi'] = self._calculate_rsi(df['close']) + # Get current prices + eth_price = self.data_provider.get_current_price('ETH/USDT') + btc_price = self.data_provider.get_current_price('BTC/USDT') - # Prepare CNN input - cnn_input = self._prepare_cnn_input_data(ohlcv_data, cob_data, technical_indicators) - - # Prepare RL input - rl_input = self._prepare_rl_input_data(ohlcv_data, cob_data, technical_indicators) - - # Prepare generic input - generic_input = { - 'symbol': symbol, - 'current_price': current_price, - 'ohlcv_data': ohlcv_data, - 'cob_data': cob_data, - 'technical_indicators': technical_indicators - } - - input_data = { - 'cnn_input': cnn_input, - 'rl_input': rl_input, - 'generic_input': generic_input, + # Create standardized input package + standardized_input = { 'timestamp': datetime.now(), - 'symbol': symbol + 'primary_symbol': 'ETH/USDT', + 'reference_symbol': 'BTC/USDT', + 'eth_data': eth_data, + 'btc_data': btc_data, + 'current_prices': { + 'ETH': eth_price, + 'BTC': btc_price + }, + 'data_completeness': { + 'eth_timeframes': len(eth_data), + 'btc_reference': len(btc_data), + 'total_expected': 5 # 4 ETH + 1 BTC + } } - return input_data + return standardized_input except Exception as e: - logger.error(f"Error collecting model input data for {symbol}: {e}") + logger.error(f"Error collecting standardized model input data: {e}") return {} + async def _store_inference_data_async(self, model_name: str, model_input: Any, prediction: Prediction, timestamp: datetime): + """Store inference data per-model with async file operations and memory optimization""" + try: + # Create comprehensive inference record + inference_record = { + 'timestamp': timestamp.isoformat(), + 'model_name': model_name, + 'model_input': model_input, + 'prediction': { + 'action': prediction.action, + 'confidence': prediction.confidence, + 'probabilities': prediction.probabilities, + 'timeframe': prediction.timeframe + }, + 'metadata': prediction.metadata or {} + } + + # Store in memory (only last 5 per model) + if model_name not in self.inference_history: + self.inference_history[model_name] = deque(maxlen=self.max_memory_inferences) + + self.inference_history[model_name].append(inference_record) + + # Async file storage (don't wait for completion) + asyncio.create_task(self._save_inference_to_disk_async(model_name, inference_record)) + + logger.debug(f"Stored inference data for {model_name} (memory: {len(self.inference_history[model_name])}/5)") + + except Exception as e: + logger.error(f"Error storing inference data for {model_name}: {e}") + + async def _save_inference_to_disk_async(self, model_name: str, inference_record: Dict): + """Async save inference record to disk with file capping""" + try: + # Create model-specific directory + model_dir = Path(f"training_data/inference_history/{model_name}") + model_dir.mkdir(parents=True, exist_ok=True) + + # Create filename with timestamp + timestamp_str = datetime.fromisoformat(inference_record['timestamp']).strftime('%Y%m%d_%H%M%S_%f')[:-3] + filename = f"inference_{timestamp_str}.json" + filepath = model_dir / filename + + # Convert to JSON-serializable format + serializable_record = self._make_json_serializable(inference_record) + + # Save to file + with open(filepath, 'w') as f: + json.dump(serializable_record, f, indent=2) + + # Cap files per model (keep only latest 200) + await self._cap_model_files(model_dir) + + logger.debug(f"Saved inference record to disk: {filepath}") + + except Exception as e: + logger.error(f"Error saving inference to disk for {model_name}: {e}") + + async def _cap_model_files(self, model_dir: Path): + """Cap the number of files per model to max_disk_files_per_model""" + try: + # Get all inference files + files = list(model_dir.glob("inference_*.json")) + + if len(files) > self.max_disk_files_per_model: + # Sort by modification time (oldest first) + files.sort(key=lambda x: x.stat().st_mtime) + + # Remove oldest files + files_to_remove = files[:-self.max_disk_files_per_model] + for file_path in files_to_remove: + file_path.unlink() + + logger.debug(f"Removed {len(files_to_remove)} old inference files from {model_dir.name}") + + except Exception as e: + logger.error(f"Error capping model files in {model_dir}: {e}") + def _prepare_cnn_input_data(self, ohlcv_data: Dict, cob_data: Any, technical_indicators: Dict) -> np.ndarray: """Prepare standardized input data for CNN models""" try: @@ -1350,6 +1424,35 @@ class TradingOrchestrator: logger.error(f"Error loading inference history from disk: {e}") return [] + async def load_model_inference_history(self, model_name: str, limit: int = 50) -> List[Dict]: + """Load inference history for a specific model from disk""" + try: + model_dir = Path(f"training_data/inference_history/{model_name}") + if not model_dir.exists(): + return [] + + # Get all inference files + files = list(model_dir.glob("inference_*.json")) + files.sort(key=lambda x: x.stat().st_mtime, reverse=True) # Newest first + + # Load up to 'limit' files + inference_records = [] + for filepath in files[:limit]: + try: + with open(filepath, 'r') as f: + record = json.load(f) + inference_records.append(record) + except Exception as e: + logger.warning(f"Error loading inference file {filepath}: {e}") + continue + + logger.info(f"Loaded {len(inference_records)} inference records for {model_name}") + return inference_records + + except Exception as e: + logger.error(f"Error loading model inference history for {model_name}: {e}") + return [] + def get_model_training_data(self, model_name: str, symbol: str = None) -> List[Dict]: """Get training data for a specific model""" try: diff --git a/docs/requirements.md b/docs/requirements.md index 3c55acb..4e20e35 100644 --- a/docs/requirements.md +++ b/docs/requirements.md @@ -16,6 +16,18 @@ ETH: 300s of 1s OHLCV data (5 min) 300 OHLCV + indicatros bars of each 1m 1h 1d and 1s BTC +so: + +# Standardized input for all models: +{ + 'primary_symbol': 'ETH/USDT', + 'reference_symbol': 'BTC/USDT', + 'eth_data': {'ETH_1s': df, 'ETH_1m': df, 'ETH_1h': df, 'ETH_1d': df}, + 'btc_data': {'BTC_1s': df}, + 'current_prices': {'ETH': price, 'BTC': price}, + 'data_completeness': {...} +} + RL model should have also access of the last hidden layers of the CNN model where patterns are learned. it can be empty if CNN model is not active or missing. as well as the output (predictions) of the CNN model for each timeframe (1s 1m 1h 1d) and next expected pivot point ## CNN model