"""
|
|
Unified Data Provider Extension
|
|
Extends existing DataProvider with unified storage system capabilities.
|
|
Provides single endpoint for real-time and historical data access.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
import pandas as pd
|
|
|
|
from .unified_cache_manager import DataCacheManager
|
|
from .unified_database_manager import DatabaseConnectionManager, UnifiedDatabaseQueryManager
|
|
from .unified_ingestion_pipeline import DataIngestionPipeline
|
|
from .unified_data_models import InferenceDataFrame, OrderBookDataFrame, OHLCVCandle
|
|
from .config import get_config
|
|
|
|
logger = logging.getLogger(__name__)


class UnifiedDataProviderExtension:
    """
    Extension for DataProvider that adds unified storage capabilities.
    Provides single endpoint for real-time and historical data access.

    Key Features:
    - Single get_inference_data() method for all data access
    - Automatic routing: cache for real-time, database for historical
    - Multi-timeframe data retrieval
    - Order book and imbalance data access
    - Seamless integration with existing DataProvider
    """

    def __init__(self, data_provider_instance):
        """
        Initialize unified data provider extension.

        Args:
            data_provider_instance: Existing DataProvider instance
        """
        self.data_provider = data_provider_instance
        self.config = get_config()

        # Unified storage components
        self.cache_manager: Optional[DataCacheManager] = None
        self.db_connection: Optional[DatabaseConnectionManager] = None
        self.db_query_manager: Optional[UnifiedDatabaseQueryManager] = None
        self.ingestion_pipeline: Optional[DataIngestionPipeline] = None

        # Initialization state
        self._initialized = False
        self._initialization_lock = asyncio.Lock()

        logger.info("UnifiedDataProviderExtension created")

    async def initialize_unified_storage(self):
        """Initialize unified storage components. Returns True on success, False on failure."""
        async with self._initialization_lock:
            if self._initialized:
                logger.info("Unified storage already initialized")
                return True

            try:
                logger.info("Initializing unified storage system...")

                # Initialize cache manager (5-minute rolling cache)
                self.cache_manager = DataCacheManager(cache_duration_seconds=300)
                logger.info("✓ Cache manager initialized")

                # Initialize database connection
                self.db_connection = DatabaseConnectionManager(self.config)
                success = await self.db_connection.initialize()

                if not success:
                    logger.error("Failed to initialize database connection")
                    return False

                logger.info("✓ Database connection initialized")

                # Initialize query manager
                self.db_query_manager = UnifiedDatabaseQueryManager(self.db_connection)
                logger.info("✓ Query manager initialized")

                # Initialize ingestion pipeline
                self.ingestion_pipeline = DataIngestionPipeline(
                    cache_manager=self.cache_manager,
                    db_connection_manager=self.db_connection,
                    batch_size=100,
                    batch_timeout_seconds=5.0
                )

                # Start ingestion pipeline
                self.ingestion_pipeline.start()
                logger.info("✓ Ingestion pipeline started")

                self._initialized = True
                logger.info("Unified storage system initialized successfully")

                return True

            except Exception as e:
                logger.error(f"Failed to initialize unified storage: {e}")
                self._initialized = False
                return False
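
    # Note: initialization is guarded by _initialization_lock plus the
    # _initialized flag, so concurrent callers may safely await this method;
    # only the first performs setup and the rest return early. For example
    # (`ext` is a hypothetical instance of this class):
    #
    #   await asyncio.gather(*(ext.initialize_unified_storage() for _ in range(3)))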

    async def shutdown_unified_storage(self):
        """Shut down unified storage components."""
        try:
            logger.info("Shutting down unified storage system...")

            if self.ingestion_pipeline:
                await self.ingestion_pipeline.stop()
                logger.info("✓ Ingestion pipeline stopped")

            if self.db_connection:
                await self.db_connection.close()
                logger.info("✓ Database connection closed")

            self._initialized = False
            logger.info("Unified storage system shutdown complete")

        except Exception as e:
            logger.error(f"Error shutting down unified storage: {e}")

    async def get_inference_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        context_window_minutes: int = 5
    ) -> InferenceDataFrame:
        """
        Get complete inference data for a symbol at a specific time.

        This is the MAIN UNIFIED ENDPOINT for all data access.
        - If timestamp is None: returns latest real-time data from cache.
        - If timestamp is provided: returns historical data from the database.

        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timestamp: Target timestamp (None = latest real-time data)
            context_window_minutes: Minutes of context data before/after timestamp

        Returns:
            InferenceDataFrame with complete market data
        """
        start_time = time.time()

        try:
            if not self._initialized:
                logger.warning("Unified storage not initialized, initializing now...")
                # Fail fast if storage could not be brought up; the except
                # block below converts this into an 'error' data frame.
                if not await self.initialize_unified_storage():
                    raise RuntimeError("Unified storage initialization failed")

            # Determine data source and route the query
            if timestamp is None:
                # Real-time data from cache
                data_source = 'cache'
                inference_data = await self._get_realtime_inference_data(symbol)
            else:
                # Historical data from database
                data_source = 'database'
                inference_data = await self._get_historical_inference_data(
                    symbol, timestamp, context_window_minutes
                )

            # Set metadata
            inference_data.data_source = data_source
            inference_data.query_latency_ms = (time.time() - start_time) * 1000

            logger.debug(f"Retrieved inference data for {symbol} from {data_source} "
                         f"({inference_data.query_latency_ms:.2f}ms)")

            return inference_data

        except Exception as e:
            logger.error(f"Error getting inference data for {symbol}: {e}")
            # Return an empty inference data frame so callers always get a result
            return InferenceDataFrame(
                symbol=symbol,
                timestamp=timestamp or datetime.now(timezone.utc),
                data_source='error',
                query_latency_ms=(time.time() - start_time) * 1000
            )
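
    # Minimal usage sketch for the unified endpoint (assumes an already
    # initialized instance `unified` and an async context):
    #
    #   latest = await unified.get_inference_data('ETH/USDT')
    #   print(latest.data_source)        # 'cache' on the real-time path
    #
    #   at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # hypothetical time
    #   replay = await unified.get_inference_data('ETH/USDT', timestamp=at)
    #   print(replay.data_source)        # 'database' on the historical path
    #   candles_1m = replay.ohlcv_1m     # pandas DataFrame of 1m candles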

    async def _get_realtime_inference_data(self, symbol: str) -> InferenceDataFrame:
        """Get real-time inference data from cache."""
        try:
            # Get OHLCV data from cache for all timeframes
            ohlcv_1s = self.cache_manager.get_ohlcv_dataframe(symbol, '1s', limit=100)
            ohlcv_1m = self.cache_manager.get_ohlcv_dataframe(symbol, '1m', limit=100)
            ohlcv_5m = self.cache_manager.get_ohlcv_dataframe(symbol, '5m', limit=50)
            ohlcv_15m = self.cache_manager.get_ohlcv_dataframe(symbol, '15m', limit=30)
            ohlcv_1h = self.cache_manager.get_ohlcv_dataframe(symbol, '1h', limit=24)
            ohlcv_1d = self.cache_manager.get_ohlcv_dataframe(symbol, '1d', limit=30)

            # Get order book data from cache
            orderbook_snapshot = self.cache_manager.get_latest_orderbook(symbol)

            # Get imbalances from cache
            imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=60)
            imbalances_df = pd.DataFrame(imbalances_list) if imbalances_list else pd.DataFrame()

            # Get current timestamp
            current_timestamp = datetime.now(timezone.utc)
            if not ohlcv_1s.empty and 'timestamp' in ohlcv_1s.columns:
                current_timestamp = ohlcv_1s.iloc[-1]['timestamp']

            # Extract technical indicators from latest candle
            indicators = {}
            if not ohlcv_1m.empty:
                latest_candle = ohlcv_1m.iloc[-1]
                for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']:
                    if col in latest_candle:
                        indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0

            # Create inference data frame
            inference_data = InferenceDataFrame(
                symbol=symbol,
                timestamp=current_timestamp,
                ohlcv_1s=ohlcv_1s,
                ohlcv_1m=ohlcv_1m,
                ohlcv_5m=ohlcv_5m,
                ohlcv_15m=ohlcv_15m,
                ohlcv_1h=ohlcv_1h,
                ohlcv_1d=ohlcv_1d,
                orderbook_snapshot=orderbook_snapshot,
                imbalances=imbalances_df,
                indicators=indicators,
                data_source='cache'
            )

            return inference_data

        except Exception as e:
            logger.error(f"Error getting real-time inference data: {e}")
            return InferenceDataFrame(
                symbol=symbol,
                timestamp=datetime.now(timezone.utc),
                data_source='cache_error'
            )

    async def _get_historical_inference_data(
        self,
        symbol: str,
        timestamp: datetime,
        context_window_minutes: int
    ) -> InferenceDataFrame:
        """Get historical inference data from the database."""
        try:
            # Calculate time range around the target timestamp
            start_time = timestamp - timedelta(minutes=context_window_minutes)
            end_time = timestamp + timedelta(minutes=context_window_minutes)

            # Query OHLCV data for all timeframes
            ohlcv_1s = await self.db_query_manager.query_ohlcv_data(
                symbol, '1s', start_time, end_time, limit=300
            )
            ohlcv_1m = await self.db_query_manager.query_ohlcv_data(
                symbol, '1m', start_time, end_time, limit=100
            )
            ohlcv_5m = await self.db_query_manager.query_ohlcv_data(
                symbol, '5m', start_time, end_time, limit=50
            )
            ohlcv_15m = await self.db_query_manager.query_ohlcv_data(
                symbol, '15m', start_time, end_time, limit=30
            )
            ohlcv_1h = await self.db_query_manager.query_ohlcv_data(
                symbol, '1h', start_time, end_time, limit=24
            )
            ohlcv_1d = await self.db_query_manager.query_ohlcv_data(
                symbol, '1d', start_time, end_time, limit=30
            )

            # Query order book snapshots; keep the most recent one in the window
            orderbook_snapshots = await self.db_query_manager.query_orderbook_snapshots(
                symbol, start_time, end_time, limit=10
            )
            orderbook_snapshot = orderbook_snapshots[-1] if orderbook_snapshots else None

            # Query imbalances
            imbalances_df = await self.db_query_manager.query_orderbook_imbalances(
                symbol, start_time, end_time, limit=60
            )

            # Extract technical indicators from latest candle
            indicators = {}
            if not ohlcv_1m.empty:
                latest_candle = ohlcv_1m.iloc[-1]
                for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']:
                    if col in latest_candle:
                        indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0

            # Create context data (all 1m candles in the window)
            context_data = ohlcv_1m.copy() if not ohlcv_1m.empty else None

            # Create inference data frame
            inference_data = InferenceDataFrame(
                symbol=symbol,
                timestamp=timestamp,
                ohlcv_1s=ohlcv_1s,
                ohlcv_1m=ohlcv_1m,
                ohlcv_5m=ohlcv_5m,
                ohlcv_15m=ohlcv_15m,
                ohlcv_1h=ohlcv_1h,
                ohlcv_1d=ohlcv_1d,
                orderbook_snapshot=orderbook_snapshot,
                imbalances=imbalances_df,
                indicators=indicators,
                context_data=context_data,
                data_source='database'
            )

            return inference_data

        except Exception as e:
            logger.error(f"Error getting historical inference data: {e}")
            return InferenceDataFrame(
                symbol=symbol,
                timestamp=timestamp,
                data_source='database_error'
            )
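
    # Worked example of the context window: with the default
    # context_window_minutes=5 and timestamp=12:00:00 UTC, the queries above
    # span 11:55:00 through 12:05:00, a 10-minute window centered on the
    # target. At 1s resolution that is up to 600 candles, capped at 300 by
    # the query limit used above.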

    async def get_multi_timeframe_data(
        self,
        symbol: str,
        timeframes: List[str],
        timestamp: Optional[datetime] = None,
        limit: int = 100
    ) -> Dict[str, pd.DataFrame]:
        """
        Get aligned multi-timeframe candlestick data.

        Args:
            symbol: Trading symbol
            timeframes: List of timeframes to retrieve
            timestamp: Target timestamp (None = latest)
            limit: Number of candles per timeframe

        Returns:
            Dictionary mapping timeframe to DataFrame
        """
        try:
            if not self._initialized:
                await self.initialize_unified_storage()

            if timestamp is None:
                # Get from cache
                result = {}
                for tf in timeframes:
                    result[tf] = self.cache_manager.get_ohlcv_dataframe(symbol, tf, limit)
                return result
            else:
                # Get from database
                return await self.db_query_manager.query_multi_timeframe_ohlcv(
                    symbol, timeframes, timestamp, limit
                )

        except Exception as e:
            logger.error(f"Error getting multi-timeframe data: {e}")
            return {}
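
    # Usage sketch (assumes an initialized instance `unified`):
    #
    #   frames = await unified.get_multi_timeframe_data(
    #       'ETH/USDT', timeframes=['1m', '5m', '1h'], limit=50)
    #   df_5m = frames.get('5m')   # pandas DataFrame with up to 50 candles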

    async def get_order_book_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        aggregation: str = '1s',
        limit: int = 300
    ) -> OrderBookDataFrame:
        """
        Get order book data with imbalance metrics.

        Args:
            symbol: Trading symbol
            timestamp: Target timestamp (None = latest)
            aggregation: Aggregation level ('raw', '1s', '1m'). Currently unused
                by this implementation, which always returns a single snapshot.
            limit: Number of data points. Also currently unused; a single
                snapshot is returned.

        Returns:
            OrderBookDataFrame with bids, asks, imbalances
        """
        try:
            if not self._initialized:
                await self.initialize_unified_storage()

            if timestamp is None:
                # Get latest from cache
                snapshot = self.cache_manager.get_latest_orderbook(symbol)
                imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=1)

                if not snapshot:
                    return OrderBookDataFrame(
                        symbol=symbol,
                        timestamp=datetime.now(timezone.utc)
                    )

                # Extract imbalances
                imbalances = imbalances_list[0] if imbalances_list else {}

                orderbook_df = OrderBookDataFrame(
                    symbol=symbol,
                    timestamp=snapshot.get('timestamp', datetime.now(timezone.utc)),
                    bids=snapshot.get('bids', []),
                    asks=snapshot.get('asks', []),
                    mid_price=snapshot.get('mid_price', 0.0),
                    spread=snapshot.get('spread', 0.0),
                    bid_volume=snapshot.get('bid_volume', 0.0),
                    ask_volume=snapshot.get('ask_volume', 0.0),
                    imbalance_1s=imbalances.get('imbalance_1s', 0.0),
                    imbalance_5s=imbalances.get('imbalance_5s', 0.0),
                    imbalance_15s=imbalances.get('imbalance_15s', 0.0),
                    imbalance_60s=imbalances.get('imbalance_60s', 0.0)
                )

                return orderbook_df
            else:
                # Get from database
                snapshots = await self.db_query_manager.query_orderbook_snapshots(
                    symbol, timestamp, timestamp, limit=1
                )

                if not snapshots:
                    return OrderBookDataFrame(
                        symbol=symbol,
                        timestamp=timestamp
                    )

                snapshot = snapshots[0]

                # Get imbalances
                imbalances_df = await self.db_query_manager.query_orderbook_imbalances(
                    symbol, timestamp, timestamp, limit=1
                )

                imbalances = {}
                if not imbalances_df.empty:
                    imbalances = imbalances_df.iloc[0].to_dict()

                orderbook_df = OrderBookDataFrame(
                    symbol=symbol,
                    timestamp=snapshot.get('timestamp', timestamp),
                    bids=snapshot.get('bids', []),
                    asks=snapshot.get('asks', []),
                    mid_price=snapshot.get('mid_price', 0.0),
                    spread=snapshot.get('spread', 0.0),
                    bid_volume=snapshot.get('bid_volume', 0.0),
                    ask_volume=snapshot.get('ask_volume', 0.0),
                    imbalance_1s=imbalances.get('imbalance_1s', 0.0),
                    imbalance_5s=imbalances.get('imbalance_5s', 0.0),
                    imbalance_15s=imbalances.get('imbalance_15s', 0.0),
                    imbalance_60s=imbalances.get('imbalance_60s', 0.0)
                )

                return orderbook_df

        except Exception as e:
            logger.error(f"Error getting order book data: {e}")
            return OrderBookDataFrame(
                symbol=symbol,
                timestamp=timestamp or datetime.now(timezone.utc)
            )
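
    # Usage sketch (assumes an initialized instance `unified`; attribute names
    # follow the OrderBookDataFrame constructor arguments above):
    #
    #   ob = await unified.get_order_book_data('ETH/USDT')
    #   top_bid = ob.bids[0] if ob.bids else None
    #   skew = ob.imbalance_5s   # rolling 5-second imbalance metric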

    def get_unified_stats(self) -> Dict[str, Any]:
        """Get statistics from all unified storage components."""
        stats = {
            'initialized': self._initialized,
            'cache': None,
            'database': None,
            'ingestion': None
        }

        if self.cache_manager:
            stats['cache'] = self.cache_manager.get_cache_stats()

        if self.db_connection:
            stats['database'] = self.db_connection.get_stats()

        if self.ingestion_pipeline:
            stats['ingestion'] = self.ingestion_pipeline.get_stats()

        return stats
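

# End-to-end lifecycle sketch (illustrative only; ExistingDataProvider is a
# hypothetical stand-in for the project's real DataProvider class):
#
#   async def main():
#       provider = ExistingDataProvider()
#       unified = UnifiedDataProviderExtension(provider)
#       if not await unified.initialize_unified_storage():
#           raise RuntimeError("unified storage failed to start")
#       try:
#           data = await unified.get_inference_data('ETH/USDT')
#           print(unified.get_unified_stats())
#       finally:
#           await unified.shutdown_unified_storage()
#
#   asyncio.run(main())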