""" Simplified Data Integration for Orchestrator Replaces complex FIFO queues with simple cache-based data access. Integrates with SmartDataUpdater for efficient data management. """ import logging import threading from datetime import datetime, timedelta from typing import Dict, List, Optional, Any import pandas as pd from .data_cache import get_data_cache from .smart_data_updater import SmartDataUpdater from .data_models import BaseDataInput, OHLCVBar logger = logging.getLogger(__name__) class SimplifiedDataIntegration: """ Simplified data integration that replaces FIFO queues with efficient caching """ def __init__(self, data_provider, symbols: List[str]): self.data_provider = data_provider self.symbols = symbols self.cache = get_data_cache() # Initialize smart data updater self.data_updater = SmartDataUpdater(data_provider, symbols) # Pre-built OHLCV data cache for instant access self._ohlcv_cache = {} # {symbol: {timeframe: List[OHLCVBar]}} self._ohlcv_cache_lock = threading.RLock() self._last_cache_update = {} # {symbol: {timeframe: datetime}} # Register for tick data if available self._setup_tick_integration() logger.info(f"SimplifiedDataIntegration initialized for {symbols}") def start(self): """Start the data integration system""" self.data_updater.start() logger.info("SimplifiedDataIntegration started") def stop(self): """Stop the data integration system""" self.data_updater.stop() logger.info("SimplifiedDataIntegration stopped") def _setup_tick_integration(self): """Setup integration with tick data sources""" try: # Register callbacks for tick data if available if hasattr(self.data_provider, 'register_tick_callback'): self.data_provider.register_tick_callback(self._on_tick_data) # Register for WebSocket data if available if hasattr(self.data_provider, 'register_websocket_callback'): self.data_provider.register_websocket_callback(self._on_websocket_data) except Exception as e: logger.warning(f"Tick integration setup failed: {e}") def _on_tick_data(self, symbol: str, price: float, volume: float, timestamp: datetime = None): """Handle incoming tick data""" self.data_updater.add_tick(symbol, price, volume, timestamp) # Invalidate OHLCV cache for this symbol self._invalidate_ohlcv_cache(symbol) def _on_websocket_data(self, symbol: str, data: Dict[str, Any]): """Handle WebSocket data updates""" try: # Extract price and volume from WebSocket data if 'price' in data and 'volume' in data: self.data_updater.add_tick(symbol, data['price'], data['volume']) # Invalidate OHLCV cache for this symbol self._invalidate_ohlcv_cache(symbol) except Exception as e: logger.error(f"Error processing WebSocket data: {e}") def _invalidate_ohlcv_cache(self, symbol: str): """Invalidate OHLCV cache for a symbol when new data arrives""" try: with self._ohlcv_cache_lock: # Remove cached data for all timeframes of this symbol keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")] for key in keys_to_remove: if key in self._ohlcv_cache: del self._ohlcv_cache[key] if key in self._last_cache_update: del self._last_cache_update[key] except Exception as e: logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}") def build_base_data_input(self, symbol: str) -> Optional[BaseDataInput]: """ Build BaseDataInput from cached data (optimized for speed) Args: symbol: Trading symbol Returns: BaseDataInput with consistent data structure """ try: # Get OHLCV data directly from optimized cache (no validation checks for speed) ohlcv_1s_list = self._get_ohlcv_data_list(symbol, '1s', 300) ohlcv_1m_list = self._get_ohlcv_data_list(symbol, '1m', 300) ohlcv_1h_list = self._get_ohlcv_data_list(symbol, '1h', 300) ohlcv_1d_list = self._get_ohlcv_data_list(symbol, '1d', 300) # Get BTC reference data btc_symbol = 'BTC/USDT' btc_ohlcv_1s_list = self._get_ohlcv_data_list(btc_symbol, '1s', 300) if not btc_ohlcv_1s_list: # Use ETH data as fallback btc_ohlcv_1s_list = ohlcv_1s_list # Get cached data (fast lookups) technical_indicators = self.cache.get('technical_indicators', symbol) or {} cob_data = self.cache.get('cob_data', symbol) last_predictions = self._get_recent_predictions(symbol) # Build BaseDataInput (no validation for speed - assume data is good) base_data = BaseDataInput( symbol=symbol, timestamp=datetime.now(), ohlcv_1s=ohlcv_1s_list, ohlcv_1m=ohlcv_1m_list, ohlcv_1h=ohlcv_1h_list, ohlcv_1d=ohlcv_1d_list, btc_ohlcv_1s=btc_ohlcv_1s_list, technical_indicators=technical_indicators, cob_data=cob_data, last_predictions=last_predictions ) return base_data except Exception as e: logger.error(f"Error building BaseDataInput for {symbol}: {e}") return None def _get_ohlcv_data_list(self, symbol: str, timeframe: str, max_count: int) -> List[OHLCVBar]: """Get OHLCV data list from pre-built cache for instant access""" try: with self._ohlcv_cache_lock: cache_key = f"{symbol}_{timeframe}" # Check if we have fresh cached data (updated within last 5 seconds) last_update = self._last_cache_update.get(cache_key) if (last_update and (datetime.now() - last_update).total_seconds() < 5 and cache_key in self._ohlcv_cache): cached_data = self._ohlcv_cache[cache_key] return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data # Need to rebuild cache for this symbol/timeframe data_list = self._build_ohlcv_cache(symbol, timeframe, max_count) # Cache the result self._ohlcv_cache[cache_key] = data_list self._last_cache_update[cache_key] = datetime.now() return data_list[-max_count:] if len(data_list) >= max_count else data_list except Exception as e: logger.error(f"Error getting OHLCV data list for {symbol}/{timeframe}: {e}") return self._create_dummy_data_list(symbol, timeframe, max_count) def _build_ohlcv_cache(self, symbol: str, timeframe: str, max_count: int) -> List[OHLCVBar]: """Build OHLCV cache from historical and current data""" try: data_list = [] # Get historical data first (this should be fast as it's already cached) historical_df = self.cache.get_historical_data(symbol, timeframe) if historical_df is not None and not historical_df.empty: # Convert historical data to OHLCVBar objects for idx, row in historical_df.tail(max_count - 1).iterrows(): bar = OHLCVBar( symbol=symbol, timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(), open=float(row['open']), high=float(row['high']), low=float(row['low']), close=float(row['close']), volume=float(row['volume']), timeframe=timeframe ) data_list.append(bar) # Add current data from cache current_ohlcv = self.cache.get(f'ohlcv_{timeframe}', symbol) if current_ohlcv and isinstance(current_ohlcv, OHLCVBar): data_list.append(current_ohlcv) # Ensure we have the right amount of data (pad if necessary) while len(data_list) < max_count: data_list.extend(self._create_dummy_data_list(symbol, timeframe, max_count - len(data_list))) return data_list except Exception as e: logger.error(f"Error building OHLCV cache for {symbol}/{timeframe}: {e}") return self._create_dummy_data_list(symbol, timeframe, max_count) def _try_historical_fallback(self, symbol: str, missing_timeframes: List[str]) -> bool: """Try to use historical data for missing timeframes""" try: for timeframe in missing_timeframes: historical_df = self.cache.get_historical_data(symbol, timeframe) if historical_df is not None and not historical_df.empty: # Use latest historical data as current data latest_row = historical_df.iloc[-1] ohlcv_bar = OHLCVBar( symbol=symbol, timestamp=historical_df.index[-1] if hasattr(historical_df.index[-1], 'to_pydatetime') else datetime.now(), open=float(latest_row['open']), high=float(latest_row['high']), low=float(latest_row['low']), close=float(latest_row['close']), volume=float(latest_row['volume']), timeframe=timeframe ) self.cache.update(f'ohlcv_{timeframe}', symbol, ohlcv_bar, 'historical_fallback') logger.info(f"Used historical fallback for {symbol} {timeframe}") return True except Exception as e: logger.error(f"Error in historical fallback: {e}") return False def _get_recent_predictions(self, symbol: str) -> Dict[str, Any]: """Get recent model predictions""" try: predictions = {} # Get predictions from cache for model_type in ['cnn', 'rl', 'extrema']: prediction_data = self.cache.get(f'prediction_{model_type}', symbol) if prediction_data: predictions[model_type] = prediction_data return predictions except Exception as e: logger.error(f"Error getting recent predictions for {symbol}: {e}") return {} def update_model_prediction(self, model_name: str, symbol: str, prediction_data: Any): """Update model prediction in cache""" self.cache.update(f'prediction_{model_name}', symbol, prediction_data, model_name) def get_current_price(self, symbol: str) -> Optional[float]: """Get current price for a symbol""" return self.data_updater.get_current_price(symbol) def get_cache_status(self) -> Dict[str, Any]: """Get cache status for monitoring""" return { 'cache_status': self.cache.get_status(), 'updater_status': self.data_updater.get_status() } def has_sufficient_data(self, symbol: str) -> bool: """Check if we have sufficient data for model predictions""" required_data = ['ohlcv_1s', 'ohlcv_1m', 'ohlcv_1h', 'ohlcv_1d'] for data_type in required_data: if not self.cache.has_data(data_type, symbol, max_age_seconds=300): # Check historical data as fallback timeframe = data_type.split('_')[1] if not self.cache.has_historical_data(symbol, timeframe, min_bars=50): return False return True