"""
Simplified Data Integration for Orchestrator

Replaces complex FIFO queues with simple cache-based data access.
Integrates with SmartDataUpdater for efficient data management.
"""

import logging
from datetime import datetime
from typing import Dict, List, Optional, Any

import pandas as pd

from .data_cache import get_data_cache
from .smart_data_updater import SmartDataUpdater
from .data_models import BaseDataInput, OHLCVBar

logger = logging.getLogger(__name__)


class SimplifiedDataIntegration:
    """
    Simplified data integration that replaces FIFO queues with efficient caching.

    The class wires a data provider to a ``SmartDataUpdater``, forwards tick
    and WebSocket updates to it, and assembles ``BaseDataInput`` objects from
    the shared data cache.
    """

    # Timeframes that must be present before a BaseDataInput is built.
    REQUIRED_TIMEFRAMES = ('1s', '1m', '1h', '1d')
    # Maximum age (seconds) passed to the cache when checking for fresh data.
    MAX_DATA_AGE_SECONDS = 300
    # Number of bars requested per timeframe for a BaseDataInput.
    BARS_PER_TIMEFRAME = 300
    # Minimum historical bars accepted by has_sufficient_data() as a fallback.
    MIN_HISTORICAL_BARS = 50
    # Symbol whose 1s bars are attached as reference data.
    BTC_REFERENCE_SYMBOL = 'BTC/USDT'
    # Model identifiers whose cached predictions are collected.
    PREDICTION_MODEL_TYPES = ('cnn', 'rl', 'extrema')

    def __init__(self, data_provider, symbols: List[str]):
        """
        Create the integration and hook it up to the provider's callbacks.

        Args:
            data_provider: Source of market data; optionally exposes
                ``register_tick_callback`` / ``register_websocket_callback``.
            symbols: Trading symbols handed to the SmartDataUpdater.
        """
        self.data_provider = data_provider
        self.symbols = symbols
        self.cache = get_data_cache()

        # Initialize smart data updater
        self.data_updater = SmartDataUpdater(data_provider, symbols)

        # Register for tick data if available (needs self.data_updater to exist)
        self._setup_tick_integration()

        logger.info("SimplifiedDataIntegration initialized for %s", symbols)

    def start(self):
        """Start the data integration system"""
        self.data_updater.start()
        logger.info("SimplifiedDataIntegration started")

    def stop(self):
        """Stop the data integration system"""
        self.data_updater.stop()
        logger.info("SimplifiedDataIntegration stopped")

    def _setup_tick_integration(self):
        """
        Register this object's handlers with the data provider.

        Registration is best-effort: a provider without the callback hooks is
        skipped, and any registration error is logged as a warning instead of
        being raised.
        """
        try:
            # Register callbacks for tick data if available
            if hasattr(self.data_provider, 'register_tick_callback'):
                self.data_provider.register_tick_callback(self._on_tick_data)

            # Register for WebSocket data if available
            if hasattr(self.data_provider, 'register_websocket_callback'):
                self.data_provider.register_websocket_callback(self._on_websocket_data)

        except Exception as e:
            logger.warning("Tick integration setup failed: %s", e)

    def _on_tick_data(self, symbol: str, price: float, volume: float,
                      timestamp: Optional[datetime] = None):
        """Handle incoming tick data by forwarding it to the data updater"""
        self.data_updater.add_tick(symbol, price, volume, timestamp)

    def _on_websocket_data(self, symbol: str, data: Dict[str, Any]):
        """
        Handle WebSocket data updates.

        Only messages carrying both ``'price'`` and ``'volume'`` keys are
        forwarded as ticks; anything else is ignored. Errors are logged and
        not propagated.
        """
        try:
            # Extract price and volume from WebSocket data
            if 'price' in data and 'volume' in data:
                self.data_updater.add_tick(symbol, data['price'], data['volume'])
        except Exception as e:
            logger.error("Error processing WebSocket data: %s", e)

    def build_base_data_input(self, symbol: str) -> Optional[BaseDataInput]:
        """
        Build BaseDataInput from cached data (much simpler than FIFO queues)

        Args:
            symbol: Trading symbol

        Returns:
            BaseDataInput with consistent data structure, or None when a
            required timeframe cannot be served from the cache or historical
            data, when validation fails, or when an error occurs.
        """
        try:
            # Check if we have minimum required data
            missing_timeframes = [
                timeframe
                for timeframe in self.REQUIRED_TIMEFRAMES
                if not self.cache.has_data(
                    f'ohlcv_{timeframe}', symbol,
                    max_age_seconds=self.MAX_DATA_AGE_SECONDS,
                )
            ]

            if missing_timeframes:
                logger.warning("Missing data for %s: %s", symbol, missing_timeframes)

                # Try to use historical data as fallback
                if not self._try_historical_fallback(symbol, missing_timeframes):
                    return None

            # Get current OHLCV data
            bar_count = self.BARS_PER_TIMEFRAME
            ohlcv_1s_list = self._get_ohlcv_data_list(symbol, '1s', bar_count)
            ohlcv_1m_list = self._get_ohlcv_data_list(symbol, '1m', bar_count)
            ohlcv_1h_list = self._get_ohlcv_data_list(symbol, '1h', bar_count)
            ohlcv_1d_list = self._get_ohlcv_data_list(symbol, '1d', bar_count)

            # Get BTC reference data (reuse the bars already fetched when the
            # requested symbol is the reference symbol itself)
            btc_symbol = self.BTC_REFERENCE_SYMBOL
            if symbol == btc_symbol:
                btc_ohlcv_1s_list = ohlcv_1s_list
            else:
                btc_ohlcv_1s_list = self._get_ohlcv_data_list(btc_symbol, '1s', bar_count)

            if not btc_ohlcv_1s_list:
                # No BTC bars could be produced: fall back to the requested
                # symbol's own 1s bars
                btc_ohlcv_1s_list = ohlcv_1s_list
                logger.debug("Using %s data as BTC fallback", symbol)

            # Get technical indicators
            technical_indicators = self.cache.get('technical_indicators', symbol) or {}

            # Get COB data if available
            cob_data = self.cache.get('cob_data', symbol)

            # Get recent model predictions
            last_predictions = self._get_recent_predictions(symbol)

            # Build BaseDataInput
            base_data = BaseDataInput(
                symbol=symbol,
                timestamp=datetime.now(),
                ohlcv_1s=ohlcv_1s_list,
                ohlcv_1m=ohlcv_1m_list,
                ohlcv_1h=ohlcv_1h_list,
                ohlcv_1d=ohlcv_1d_list,
                btc_ohlcv_1s=btc_ohlcv_1s_list,
                technical_indicators=technical_indicators,
                cob_data=cob_data,
                last_predictions=last_predictions
            )

            # Validate the data
            if not base_data.validate():
                logger.warning("BaseDataInput validation failed for %s", symbol)
                return None

            return base_data

        except Exception as e:
            logger.error("Error building BaseDataInput for %s: %s", symbol, e)
            return None

    @staticmethod
    def _bar_from_row(symbol: str, timeframe: str, index_value: Any, row: Any) -> OHLCVBar:
        """
        Convert one historical DataFrame row into an OHLCVBar.

        Args:
            symbol: Trading symbol stored on the bar.
            timeframe: Timeframe label stored on the bar.
            index_value: Index label of the row; used as the bar timestamp
                when it exposes ``to_pydatetime``, otherwise the current time
                is used.
            row: Mapping-like row with 'open', 'high', 'low', 'close' and
                'volume' entries.
        """
        timestamp = index_value if hasattr(index_value, 'to_pydatetime') else datetime.now()
        return OHLCVBar(
            symbol=symbol,
            timestamp=timestamp,
            open=float(row['open']),
            high=float(row['high']),
            low=float(row['low']),
            close=float(row['close']),
            volume=float(row['volume']),
            timeframe=timeframe
        )

    def _get_ohlcv_data_list(self, symbol: str, timeframe: str, max_count: int) -> List[OHLCVBar]:
        """
        Get OHLCV data list from cache and historical data

        The list is built from the most recent ``max_count - 1`` historical
        bars followed by the current cached bar, then padded to exactly
        ``max_count`` entries.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe label such as '1s' or '1m'
            max_count: Number of bars to return

        Returns:
            List of ``max_count`` bars, or an empty list when ``max_count`` is
            not positive or an error occurs.
        """
        # A non-positive count would make tail()/slicing return nearly
        # everything instead of nothing, so answer it explicitly.
        if max_count <= 0:
            return []

        try:
            data_list: List[OHLCVBar] = []

            # Get historical data first
            historical_df = self.cache.get_historical_data(symbol, timeframe)
            if historical_df is not None and not historical_df.empty:
                # Leave one slot for the current bar appended below
                recent_rows = historical_df.tail(max_count - 1)
                data_list.extend(
                    self._bar_from_row(symbol, timeframe, idx, row)
                    for idx, row in recent_rows.iterrows()
                )

            # Add current data from cache
            current_ohlcv = self.cache.get(f'ohlcv_{timeframe}', symbol)
            if current_ohlcv and isinstance(current_ohlcv, OHLCVBar):
                data_list.append(current_ohlcv)

            # Nothing available at all: seed the list with an all-zero bar
            if not data_list:
                data_list.append(OHLCVBar(
                    symbol=symbol,
                    timestamp=datetime.now(),
                    open=0.0,
                    high=0.0,
                    low=0.0,
                    close=0.0,
                    volume=0.0,
                    timeframe=timeframe
                ))

            # Pad with flat, zero-volume copies of the last available bar
            last_bar = data_list[-1]
            while len(data_list) < max_count:
                data_list.append(OHLCVBar(
                    symbol=symbol,
                    timestamp=last_bar.timestamp,
                    open=last_bar.close,
                    high=last_bar.close,
                    low=last_bar.close,
                    close=last_bar.close,
                    volume=0.0,
                    timeframe=timeframe
                ))

            return data_list[-max_count:]  # Return last max_count items

        except Exception as e:
            logger.error("Error getting OHLCV data list for %s %s: %s", symbol, timeframe, e)
            return []

    def _try_historical_fallback(self, symbol: str, missing_timeframes: List[str]) -> bool:
        """
        Try to use historical data for missing timeframes

        For each missing timeframe the latest historical row is written to the
        cache as the current bar, tagged 'historical_fallback'.

        Args:
            symbol: Trading symbol
            missing_timeframes: Timeframes lacking fresh cached data

        Returns:
            True only if every missing timeframe could be filled from
            historical data; False if any could not or an error occurred.
        """
        try:
            all_filled = True

            for timeframe in missing_timeframes:
                historical_df = self.cache.get_historical_data(symbol, timeframe)
                if historical_df is None or historical_df.empty:
                    # Keep going so the remaining timeframes are still
                    # refreshed, but report the fallback as incomplete.
                    logger.warning(
                        "No historical fallback available for %s %s", symbol, timeframe
                    )
                    all_filled = False
                    continue

                # Use latest historical data as current data
                ohlcv_bar = self._bar_from_row(
                    symbol, timeframe, historical_df.index[-1], historical_df.iloc[-1]
                )

                self.cache.update(f'ohlcv_{timeframe}', symbol, ohlcv_bar, 'historical_fallback')
                logger.info("Used historical fallback for %s %s", symbol, timeframe)

            return all_filled

        except Exception as e:
            logger.error("Error in historical fallback: %s", e)
            return False

    def _get_recent_predictions(self, symbol: str) -> Dict[str, Any]:
        """
        Get recent model predictions

        Returns:
            Mapping of model type to its cached prediction; model types with
            no (or falsy) cached value are omitted. Empty on error.
        """
        try:
            predictions = {}

            # Get predictions from cache
            for model_type in self.PREDICTION_MODEL_TYPES:
                prediction_data = self.cache.get(f'prediction_{model_type}', symbol)
                if prediction_data:
                    predictions[model_type] = prediction_data

            return predictions

        except Exception as e:
            logger.error("Error getting recent predictions for %s: %s", symbol, e)
            return {}

    def update_model_prediction(self, model_name: str, symbol: str, prediction_data: Any):
        """Update model prediction in cache under ``prediction_<model_name>``"""
        self.cache.update(f'prediction_{model_name}', symbol, prediction_data, model_name)

    def get_current_price(self, symbol: str) -> Optional[float]:
        """Get current price for a symbol (delegates to the data updater)"""
        return self.data_updater.get_current_price(symbol)

    def get_cache_status(self) -> Dict[str, Any]:
        """Get cache status for monitoring"""
        return {
            'cache_status': self.cache.get_status(),
            'updater_status': self.data_updater.get_status()
        }

    def has_sufficient_data(self, symbol: str) -> bool:
        """
        Check if we have sufficient data for model predictions

        Every required timeframe must have either fresh cached data or at
        least ``MIN_HISTORICAL_BARS`` historical bars.
        """
        for timeframe in self.REQUIRED_TIMEFRAMES:
            if self.cache.has_data(
                f'ohlcv_{timeframe}', symbol,
                max_age_seconds=self.MAX_DATA_AGE_SECONDS,
            ):
                continue

            # Check historical data as fallback
            if not self.cache.has_historical_data(
                symbol, timeframe, min_bars=self.MIN_HISTORICAL_BARS
            ):
                return False

        return True