""" Unified Data Provider Extension Extends existing DataProvider with unified storage system capabilities. Provides single endpoint for real-time and historical data access. """ import asyncio import logging import time from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional, Any import pandas as pd from .unified_cache_manager import DataCacheManager from .unified_database_manager import DatabaseConnectionManager, UnifiedDatabaseQueryManager from .unified_ingestion_pipeline import DataIngestionPipeline from .unified_data_models import InferenceDataFrame, OrderBookDataFrame, OHLCVCandle from .config import get_config logger = logging.getLogger(__name__) class UnifiedDataProviderExtension: """ Extension for DataProvider that adds unified storage capabilities. Provides single endpoint for real-time and historical data access. Key Features: - Single get_inference_data() method for all data access - Automatic routing: cache for real-time, database for historical - Multi-timeframe data retrieval - Order book and imbalance data access - Seamless integration with existing DataProvider """ def __init__(self, data_provider_instance): """ Initialize unified data provider extension. Args: data_provider_instance: Existing DataProvider instance """ self.data_provider = data_provider_instance self.config = get_config() # Unified storage components self.cache_manager: Optional[DataCacheManager] = None self.db_connection: Optional[DatabaseConnectionManager] = None self.db_query_manager: Optional[UnifiedDatabaseQueryManager] = None self.ingestion_pipeline: Optional[DataIngestionPipeline] = None # Initialization state self._initialized = False self._initialization_lock = asyncio.Lock() logger.info("UnifiedDataProviderExtension created") async def initialize_unified_storage(self): """Initialize unified storage components.""" async with self._initialization_lock: if self._initialized: logger.info("Unified storage already initialized") return True try: logger.info("Initializing unified storage system...") # Initialize cache manager self.cache_manager = DataCacheManager(cache_duration_seconds=300) logger.info("✓ Cache manager initialized") # Initialize database connection self.db_connection = DatabaseConnectionManager(self.config) success = await self.db_connection.initialize() if not success: logger.error("Failed to initialize database connection") return False logger.info("✓ Database connection initialized") # Initialize query manager self.db_query_manager = UnifiedDatabaseQueryManager(self.db_connection) logger.info("✓ Query manager initialized") # Initialize ingestion pipeline self.ingestion_pipeline = DataIngestionPipeline( cache_manager=self.cache_manager, db_connection_manager=self.db_connection, batch_size=100, batch_timeout_seconds=5.0 ) # Start ingestion pipeline self.ingestion_pipeline.start() logger.info("✓ Ingestion pipeline started") self._initialized = True logger.info("✅ Unified storage system initialized successfully") return True except Exception as e: logger.error(f"Failed to initialize unified storage: {e}") self._initialized = False return False async def shutdown_unified_storage(self): """Shutdown unified storage components.""" try: logger.info("Shutting down unified storage system...") if self.ingestion_pipeline: await self.ingestion_pipeline.stop() logger.info("✓ Ingestion pipeline stopped") if self.db_connection: await self.db_connection.close() logger.info("✓ Database connection closed") self._initialized = False logger.info("✅ Unified storage system shutdown complete") except Exception as e: logger.error(f"Error shutting down unified storage: {e}") async def get_inference_data( self, symbol: str, timestamp: Optional[datetime] = None, context_window_minutes: int = 5 ) -> InferenceDataFrame: """ Get complete inference data for a symbol at a specific time. This is the MAIN UNIFIED ENDPOINT for all data access. - If timestamp is None: Returns latest real-time data from cache - If timestamp is provided: Returns historical data from database Args: symbol: Trading symbol (e.g., 'ETH/USDT') timestamp: Target timestamp (None = latest real-time data) context_window_minutes: Minutes of context data before/after timestamp Returns: InferenceDataFrame with complete market data """ start_time = time.time() try: if not self._initialized: logger.warning("Unified storage not initialized, initializing now...") await self.initialize_unified_storage() # Determine data source if timestamp is None: # Real-time data from cache data_source = 'cache' inference_data = await self._get_realtime_inference_data(symbol) else: # Historical data from database data_source = 'database' inference_data = await self._get_historical_inference_data( symbol, timestamp, context_window_minutes ) # Set metadata inference_data.data_source = data_source inference_data.query_latency_ms = (time.time() - start_time) * 1000 logger.debug(f"Retrieved inference data for {symbol} from {data_source} " f"({inference_data.query_latency_ms:.2f}ms)") return inference_data except Exception as e: logger.error(f"Error getting inference data for {symbol}: {e}") # Return empty inference data frame return InferenceDataFrame( symbol=symbol, timestamp=timestamp or datetime.now(timezone.utc), data_source='error', query_latency_ms=(time.time() - start_time) * 1000 ) async def _get_realtime_inference_data(self, symbol: str) -> InferenceDataFrame: """Get real-time inference data from cache.""" try: # Get OHLCV data from cache for all timeframes ohlcv_1s = self.cache_manager.get_ohlcv_dataframe(symbol, '1s', limit=100) ohlcv_1m = self.cache_manager.get_ohlcv_dataframe(symbol, '1m', limit=100) ohlcv_5m = self.cache_manager.get_ohlcv_dataframe(symbol, '5m', limit=50) ohlcv_15m = self.cache_manager.get_ohlcv_dataframe(symbol, '15m', limit=30) ohlcv_1h = self.cache_manager.get_ohlcv_dataframe(symbol, '1h', limit=24) ohlcv_1d = self.cache_manager.get_ohlcv_dataframe(symbol, '1d', limit=30) # Get order book data from cache orderbook_snapshot = self.cache_manager.get_latest_orderbook(symbol) # Get imbalances from cache imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=60) imbalances_df = pd.DataFrame(imbalances_list) if imbalances_list else pd.DataFrame() # Get current timestamp current_timestamp = datetime.now(timezone.utc) if not ohlcv_1s.empty and 'timestamp' in ohlcv_1s.columns: current_timestamp = ohlcv_1s.iloc[-1]['timestamp'] # Extract technical indicators from latest candle indicators = {} if not ohlcv_1m.empty: latest_candle = ohlcv_1m.iloc[-1] for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']: if col in latest_candle: indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0 # Create inference data frame inference_data = InferenceDataFrame( symbol=symbol, timestamp=current_timestamp, ohlcv_1s=ohlcv_1s, ohlcv_1m=ohlcv_1m, ohlcv_5m=ohlcv_5m, ohlcv_15m=ohlcv_15m, ohlcv_1h=ohlcv_1h, ohlcv_1d=ohlcv_1d, orderbook_snapshot=orderbook_snapshot, imbalances=imbalances_df, indicators=indicators, data_source='cache' ) return inference_data except Exception as e: logger.error(f"Error getting real-time inference data: {e}") return InferenceDataFrame( symbol=symbol, timestamp=datetime.now(timezone.utc), data_source='cache_error' ) async def _get_historical_inference_data( self, symbol: str, timestamp: datetime, context_window_minutes: int ) -> InferenceDataFrame: """Get historical inference data from database.""" try: # Calculate time range start_time = timestamp - timedelta(minutes=context_window_minutes) end_time = timestamp + timedelta(minutes=context_window_minutes) # Query OHLCV data for all timeframes ohlcv_1s = await self.db_query_manager.query_ohlcv_data( symbol, '1s', start_time, end_time, limit=300 ) ohlcv_1m = await self.db_query_manager.query_ohlcv_data( symbol, '1m', start_time, end_time, limit=100 ) ohlcv_5m = await self.db_query_manager.query_ohlcv_data( symbol, '5m', start_time, end_time, limit=50 ) ohlcv_15m = await self.db_query_manager.query_ohlcv_data( symbol, '15m', start_time, end_time, limit=30 ) ohlcv_1h = await self.db_query_manager.query_ohlcv_data( symbol, '1h', start_time, end_time, limit=24 ) ohlcv_1d = await self.db_query_manager.query_ohlcv_data( symbol, '1d', start_time, end_time, limit=30 ) # Query order book snapshots orderbook_snapshots = await self.db_query_manager.query_orderbook_snapshots( symbol, start_time, end_time, limit=10 ) orderbook_snapshot = orderbook_snapshots[-1] if orderbook_snapshots else None # Query imbalances imbalances_df = await self.db_query_manager.query_orderbook_imbalances( symbol, start_time, end_time, limit=60 ) # Extract technical indicators from latest candle indicators = {} if not ohlcv_1m.empty: latest_candle = ohlcv_1m.iloc[-1] for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']: if col in latest_candle: indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0 # Create context data (all 1m candles in window) context_data = ohlcv_1m.copy() if not ohlcv_1m.empty else None # Create inference data frame inference_data = InferenceDataFrame( symbol=symbol, timestamp=timestamp, ohlcv_1s=ohlcv_1s, ohlcv_1m=ohlcv_1m, ohlcv_5m=ohlcv_5m, ohlcv_15m=ohlcv_15m, ohlcv_1h=ohlcv_1h, ohlcv_1d=ohlcv_1d, orderbook_snapshot=orderbook_snapshot, imbalances=imbalances_df, indicators=indicators, context_data=context_data, data_source='database' ) return inference_data except Exception as e: logger.error(f"Error getting historical inference data: {e}") return InferenceDataFrame( symbol=symbol, timestamp=timestamp, data_source='database_error' ) async def get_multi_timeframe_data( self, symbol: str, timeframes: List[str], timestamp: Optional[datetime] = None, limit: int = 100 ) -> Dict[str, pd.DataFrame]: """ Get aligned multi-timeframe candlestick data. Args: symbol: Trading symbol timeframes: List of timeframes to retrieve timestamp: Target timestamp (None = latest) limit: Number of candles per timeframe Returns: Dictionary mapping timeframe to DataFrame """ try: if not self._initialized: await self.initialize_unified_storage() if timestamp is None: # Get from cache result = {} for tf in timeframes: result[tf] = self.cache_manager.get_ohlcv_dataframe(symbol, tf, limit) return result else: # Get from database return await self.db_query_manager.query_multi_timeframe_ohlcv( symbol, timeframes, timestamp, limit ) except Exception as e: logger.error(f"Error getting multi-timeframe data: {e}") return {} async def get_order_book_data( self, symbol: str, timestamp: Optional[datetime] = None, aggregation: str = '1s', limit: int = 300 ) -> OrderBookDataFrame: """ Get order book data with imbalance metrics. Args: symbol: Trading symbol timestamp: Target timestamp (None = latest) aggregation: Aggregation level ('raw', '1s', '1m') limit: Number of data points Returns: OrderBookDataFrame with bids, asks, imbalances """ try: if not self._initialized: await self.initialize_unified_storage() if timestamp is None: # Get latest from cache snapshot = self.cache_manager.get_latest_orderbook(symbol) imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=1) if not snapshot: return OrderBookDataFrame( symbol=symbol, timestamp=datetime.now(timezone.utc) ) # Extract imbalances imbalances = imbalances_list[0] if imbalances_list else {} orderbook_df = OrderBookDataFrame( symbol=symbol, timestamp=snapshot.get('timestamp', datetime.now(timezone.utc)), bids=snapshot.get('bids', []), asks=snapshot.get('asks', []), mid_price=snapshot.get('mid_price', 0.0), spread=snapshot.get('spread', 0.0), bid_volume=snapshot.get('bid_volume', 0.0), ask_volume=snapshot.get('ask_volume', 0.0), imbalance_1s=imbalances.get('imbalance_1s', 0.0), imbalance_5s=imbalances.get('imbalance_5s', 0.0), imbalance_15s=imbalances.get('imbalance_15s', 0.0), imbalance_60s=imbalances.get('imbalance_60s', 0.0) ) return orderbook_df else: # Get from database snapshots = await self.db_query_manager.query_orderbook_snapshots( symbol, timestamp, timestamp, limit=1 ) if not snapshots: return OrderBookDataFrame( symbol=symbol, timestamp=timestamp ) snapshot = snapshots[0] # Get imbalances imbalances_df = await self.db_query_manager.query_orderbook_imbalances( symbol, timestamp, timestamp, limit=1 ) imbalances = {} if not imbalances_df.empty: imbalances = imbalances_df.iloc[0].to_dict() orderbook_df = OrderBookDataFrame( symbol=symbol, timestamp=snapshot.get('timestamp', timestamp), bids=snapshot.get('bids', []), asks=snapshot.get('asks', []), mid_price=snapshot.get('mid_price', 0.0), spread=snapshot.get('spread', 0.0), bid_volume=snapshot.get('bid_volume', 0.0), ask_volume=snapshot.get('ask_volume', 0.0), imbalance_1s=imbalances.get('imbalance_1s', 0.0), imbalance_5s=imbalances.get('imbalance_5s', 0.0), imbalance_15s=imbalances.get('imbalance_15s', 0.0), imbalance_60s=imbalances.get('imbalance_60s', 0.0) ) return orderbook_df except Exception as e: logger.error(f"Error getting order book data: {e}") return OrderBookDataFrame( symbol=symbol, timestamp=timestamp or datetime.now(timezone.utc) ) def get_unified_stats(self) -> Dict[str, Any]: """Get statistics from all unified storage components.""" stats = { 'initialized': self._initialized, 'cache': None, 'database': None, 'ingestion': None } if self.cache_manager: stats['cache'] = self.cache_manager.get_cache_stats() if self.db_connection: stats['database'] = self.db_connection.get_stats() if self.ingestion_pipeline: stats['ingestion'] = self.ingestion_pipeline.get_stats() return stats