fix pivots display

Dobromir Popov
2025-10-20 16:17:43 +03:00
parent e993bc2831
commit a8ea9b24c0
6 changed files with 1314 additions and 139 deletions


@@ -0,0 +1,481 @@
"""
Unified Data Provider Extension
Extends existing DataProvider with unified storage system capabilities.
Provides single endpoint for real-time and historical data access.
"""
import asyncio
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any
import pandas as pd
from .unified_cache_manager import DataCacheManager
from .unified_database_manager import DatabaseConnectionManager, UnifiedDatabaseQueryManager
from .unified_ingestion_pipeline import DataIngestionPipeline
from .unified_data_models import InferenceDataFrame, OrderBookDataFrame, OHLCVCandle
from .config import get_config
logger = logging.getLogger(__name__)
class UnifiedDataProviderExtension:
"""
Extension for DataProvider that adds unified storage capabilities.
Provides single endpoint for real-time and historical data access.
Key Features:
- Single get_inference_data() method for all data access
- Automatic routing: cache for real-time, database for historical
- Multi-timeframe data retrieval
- Order book and imbalance data access
- Seamless integration with existing DataProvider
"""
def __init__(self, data_provider_instance):
"""
Initialize unified data provider extension.
Args:
data_provider_instance: Existing DataProvider instance
"""
self.data_provider = data_provider_instance
self.config = get_config()
# Unified storage components
self.cache_manager: Optional[DataCacheManager] = None
self.db_connection: Optional[DatabaseConnectionManager] = None
self.db_query_manager: Optional[UnifiedDatabaseQueryManager] = None
self.ingestion_pipeline: Optional[DataIngestionPipeline] = None
# Initialization state
self._initialized = False
self._initialization_lock = asyncio.Lock()
logger.info("UnifiedDataProviderExtension created")
async def initialize_unified_storage(self):
"""Initialize unified storage components."""
async with self._initialization_lock:
if self._initialized:
logger.info("Unified storage already initialized")
return True
try:
logger.info("Initializing unified storage system...")
# Initialize cache manager
self.cache_manager = DataCacheManager(cache_duration_seconds=300)
logger.info("✓ Cache manager initialized")
# Initialize database connection
self.db_connection = DatabaseConnectionManager(self.config)
success = await self.db_connection.initialize()
if not success:
logger.error("Failed to initialize database connection")
return False
logger.info("✓ Database connection initialized")
# Initialize query manager
self.db_query_manager = UnifiedDatabaseQueryManager(self.db_connection)
logger.info("✓ Query manager initialized")
# Initialize ingestion pipeline
self.ingestion_pipeline = DataIngestionPipeline(
cache_manager=self.cache_manager,
db_connection_manager=self.db_connection,
batch_size=100,
batch_timeout_seconds=5.0
)
# Start ingestion pipeline
self.ingestion_pipeline.start()
logger.info("✓ Ingestion pipeline started")
self._initialized = True
logger.info("✅ Unified storage system initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize unified storage: {e}")
self._initialized = False
return False
async def shutdown_unified_storage(self):
"""Shutdown unified storage components."""
try:
logger.info("Shutting down unified storage system...")
if self.ingestion_pipeline:
await self.ingestion_pipeline.stop()
logger.info("✓ Ingestion pipeline stopped")
if self.db_connection:
await self.db_connection.close()
logger.info("✓ Database connection closed")
self._initialized = False
logger.info("✅ Unified storage system shutdown complete")
except Exception as e:
logger.error(f"Error shutting down unified storage: {e}")
async def get_inference_data(
self,
symbol: str,
timestamp: Optional[datetime] = None,
context_window_minutes: int = 5
) -> InferenceDataFrame:
"""
Get complete inference data for a symbol at a specific time.
This is the MAIN UNIFIED ENDPOINT for all data access.
- If timestamp is None: Returns latest real-time data from cache
- If timestamp is provided: Returns historical data from database
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
timestamp: Target timestamp (None = latest real-time data)
context_window_minutes: Minutes of context data before/after timestamp
Returns:
InferenceDataFrame with complete market data
"""
start_time = time.time()
try:
if not self._initialized:
logger.warning("Unified storage not initialized, initializing now...")
await self.initialize_unified_storage()
# Determine data source
if timestamp is None:
# Real-time data from cache
data_source = 'cache'
inference_data = await self._get_realtime_inference_data(symbol)
else:
# Historical data from database
data_source = 'database'
inference_data = await self._get_historical_inference_data(
symbol, timestamp, context_window_minutes
)
# Set metadata
inference_data.data_source = data_source
inference_data.query_latency_ms = (time.time() - start_time) * 1000
logger.debug(f"Retrieved inference data for {symbol} from {data_source} "
f"({inference_data.query_latency_ms:.2f}ms)")
return inference_data
except Exception as e:
logger.error(f"Error getting inference data for {symbol}: {e}")
# Return empty inference data frame
return InferenceDataFrame(
symbol=symbol,
timestamp=timestamp or datetime.now(timezone.utc),
data_source='error',
query_latency_ms=(time.time() - start_time) * 1000
)
async def _get_realtime_inference_data(self, symbol: str) -> InferenceDataFrame:
"""Get real-time inference data from cache."""
try:
# Get OHLCV data from cache for all timeframes
ohlcv_1s = self.cache_manager.get_ohlcv_dataframe(symbol, '1s', limit=100)
ohlcv_1m = self.cache_manager.get_ohlcv_dataframe(symbol, '1m', limit=100)
ohlcv_5m = self.cache_manager.get_ohlcv_dataframe(symbol, '5m', limit=50)
ohlcv_15m = self.cache_manager.get_ohlcv_dataframe(symbol, '15m', limit=30)
ohlcv_1h = self.cache_manager.get_ohlcv_dataframe(symbol, '1h', limit=24)
ohlcv_1d = self.cache_manager.get_ohlcv_dataframe(symbol, '1d', limit=30)
# Get order book data from cache
orderbook_snapshot = self.cache_manager.get_latest_orderbook(symbol)
# Get imbalances from cache
imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=60)
imbalances_df = pd.DataFrame(imbalances_list) if imbalances_list else pd.DataFrame()
# Get current timestamp
current_timestamp = datetime.now(timezone.utc)
if not ohlcv_1s.empty and 'timestamp' in ohlcv_1s.columns:
current_timestamp = ohlcv_1s.iloc[-1]['timestamp']
# Extract technical indicators from latest candle
indicators = {}
if not ohlcv_1m.empty:
latest_candle = ohlcv_1m.iloc[-1]
for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']:
if col in latest_candle:
indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0
# Create inference data frame
inference_data = InferenceDataFrame(
symbol=symbol,
timestamp=current_timestamp,
ohlcv_1s=ohlcv_1s,
ohlcv_1m=ohlcv_1m,
ohlcv_5m=ohlcv_5m,
ohlcv_15m=ohlcv_15m,
ohlcv_1h=ohlcv_1h,
ohlcv_1d=ohlcv_1d,
orderbook_snapshot=orderbook_snapshot,
imbalances=imbalances_df,
indicators=indicators,
data_source='cache'
)
return inference_data
except Exception as e:
logger.error(f"Error getting real-time inference data: {e}")
return InferenceDataFrame(
symbol=symbol,
timestamp=datetime.now(timezone.utc),
data_source='cache_error'
)
async def _get_historical_inference_data(
self,
symbol: str,
timestamp: datetime,
context_window_minutes: int
) -> InferenceDataFrame:
"""Get historical inference data from database."""
try:
# Calculate time range
start_time = timestamp - timedelta(minutes=context_window_minutes)
end_time = timestamp + timedelta(minutes=context_window_minutes)
# Query OHLCV data for all timeframes
ohlcv_1s = await self.db_query_manager.query_ohlcv_data(
symbol, '1s', start_time, end_time, limit=300
)
ohlcv_1m = await self.db_query_manager.query_ohlcv_data(
symbol, '1m', start_time, end_time, limit=100
)
ohlcv_5m = await self.db_query_manager.query_ohlcv_data(
symbol, '5m', start_time, end_time, limit=50
)
ohlcv_15m = await self.db_query_manager.query_ohlcv_data(
symbol, '15m', start_time, end_time, limit=30
)
ohlcv_1h = await self.db_query_manager.query_ohlcv_data(
symbol, '1h', start_time, end_time, limit=24
)
ohlcv_1d = await self.db_query_manager.query_ohlcv_data(
symbol, '1d', start_time, end_time, limit=30
)
# Query order book snapshots
orderbook_snapshots = await self.db_query_manager.query_orderbook_snapshots(
symbol, start_time, end_time, limit=10
)
orderbook_snapshot = orderbook_snapshots[-1] if orderbook_snapshots else None
# Query imbalances
imbalances_df = await self.db_query_manager.query_orderbook_imbalances(
symbol, start_time, end_time, limit=60
)
# Extract technical indicators from latest candle
indicators = {}
if not ohlcv_1m.empty:
latest_candle = ohlcv_1m.iloc[-1]
for col in ['rsi_14', 'macd', 'macd_signal', 'bb_upper', 'bb_middle', 'bb_lower']:
if col in latest_candle:
indicators[col] = float(latest_candle[col]) if pd.notna(latest_candle[col]) else 0.0
# Create context data (all 1m candles in window)
context_data = ohlcv_1m.copy() if not ohlcv_1m.empty else None
# Create inference data frame
inference_data = InferenceDataFrame(
symbol=symbol,
timestamp=timestamp,
ohlcv_1s=ohlcv_1s,
ohlcv_1m=ohlcv_1m,
ohlcv_5m=ohlcv_5m,
ohlcv_15m=ohlcv_15m,
ohlcv_1h=ohlcv_1h,
ohlcv_1d=ohlcv_1d,
orderbook_snapshot=orderbook_snapshot,
imbalances=imbalances_df,
indicators=indicators,
context_data=context_data,
data_source='database'
)
return inference_data
except Exception as e:
logger.error(f"Error getting historical inference data: {e}")
return InferenceDataFrame(
symbol=symbol,
timestamp=timestamp,
data_source='database_error'
)
async def get_multi_timeframe_data(
self,
symbol: str,
timeframes: List[str],
timestamp: Optional[datetime] = None,
limit: int = 100
) -> Dict[str, pd.DataFrame]:
"""
Get aligned multi-timeframe candlestick data.
Args:
symbol: Trading symbol
timeframes: List of timeframes to retrieve
timestamp: Target timestamp (None = latest)
limit: Number of candles per timeframe
Returns:
Dictionary mapping timeframe to DataFrame
"""
try:
if not self._initialized:
await self.initialize_unified_storage()
if timestamp is None:
# Get from cache
result = {}
for tf in timeframes:
result[tf] = self.cache_manager.get_ohlcv_dataframe(symbol, tf, limit)
return result
else:
# Get from database
return await self.db_query_manager.query_multi_timeframe_ohlcv(
symbol, timeframes, timestamp, limit
)
except Exception as e:
logger.error(f"Error getting multi-timeframe data: {e}")
return {}
async def get_order_book_data(
self,
symbol: str,
timestamp: Optional[datetime] = None,
aggregation: str = '1s',
limit: int = 300
) -> OrderBookDataFrame:
"""
Get order book data with imbalance metrics.
Args:
symbol: Trading symbol
timestamp: Target timestamp (None = latest)
aggregation: Aggregation level ('raw', '1s', '1m')
limit: Number of data points
Returns:
OrderBookDataFrame with bids, asks, imbalances
"""
try:
if not self._initialized:
await self.initialize_unified_storage()
if timestamp is None:
# Get latest from cache
snapshot = self.cache_manager.get_latest_orderbook(symbol)
imbalances_list = self.cache_manager.get_latest_imbalances(symbol, limit=1)
if not snapshot:
return OrderBookDataFrame(
symbol=symbol,
timestamp=datetime.now(timezone.utc)
)
# Extract imbalances
imbalances = imbalances_list[0] if imbalances_list else {}
orderbook_df = OrderBookDataFrame(
symbol=symbol,
timestamp=snapshot.get('timestamp', datetime.now(timezone.utc)),
bids=snapshot.get('bids', []),
asks=snapshot.get('asks', []),
mid_price=snapshot.get('mid_price', 0.0),
spread=snapshot.get('spread', 0.0),
bid_volume=snapshot.get('bid_volume', 0.0),
ask_volume=snapshot.get('ask_volume', 0.0),
imbalance_1s=imbalances.get('imbalance_1s', 0.0),
imbalance_5s=imbalances.get('imbalance_5s', 0.0),
imbalance_15s=imbalances.get('imbalance_15s', 0.0),
imbalance_60s=imbalances.get('imbalance_60s', 0.0)
)
return orderbook_df
else:
# Get from database
snapshots = await self.db_query_manager.query_orderbook_snapshots(
symbol, timestamp, timestamp, limit=1
)
if not snapshots:
return OrderBookDataFrame(
symbol=symbol,
timestamp=timestamp
)
snapshot = snapshots[0]
# Get imbalances
imbalances_df = await self.db_query_manager.query_orderbook_imbalances(
symbol, timestamp, timestamp, limit=1
)
imbalances = {}
if not imbalances_df.empty:
imbalances = imbalances_df.iloc[0].to_dict()
orderbook_df = OrderBookDataFrame(
symbol=symbol,
timestamp=snapshot.get('timestamp', timestamp),
bids=snapshot.get('bids', []),
asks=snapshot.get('asks', []),
mid_price=snapshot.get('mid_price', 0.0),
spread=snapshot.get('spread', 0.0),
bid_volume=snapshot.get('bid_volume', 0.0),
ask_volume=snapshot.get('ask_volume', 0.0),
imbalance_1s=imbalances.get('imbalance_1s', 0.0),
imbalance_5s=imbalances.get('imbalance_5s', 0.0),
imbalance_15s=imbalances.get('imbalance_15s', 0.0),
imbalance_60s=imbalances.get('imbalance_60s', 0.0)
)
return orderbook_df
except Exception as e:
logger.error(f"Error getting order book data: {e}")
return OrderBookDataFrame(
symbol=symbol,
timestamp=timestamp or datetime.now(timezone.utc)
)
def get_unified_stats(self) -> Dict[str, Any]:
"""Get statistics from all unified storage components."""
stats = {
'initialized': self._initialized,
'cache': None,
'database': None,
'ingestion': None
}
if self.cache_manager:
stats['cache'] = self.cache_manager.get_cache_stats()
if self.db_connection:
stats['database'] = self.db_connection.get_stats()
if self.ingestion_pipeline:
stats['ingestion'] = self.ingestion_pipeline.get_stats()
return stats
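A minimal usage sketch of the unified endpoint defined above. The `data_provider` instance and the symbol are placeholders, and only methods defined in this file are called; this is an illustrative assumption, not part of the committed file:

import asyncio
from datetime import datetime, timezone

async def example(data_provider):
    # data_provider: an existing DataProvider instance (assumed to be created elsewhere)
    unified = UnifiedDataProviderExtension(data_provider)
    await unified.initialize_unified_storage()

    # No timestamp -> latest real-time data, served from cache
    latest = await unified.get_inference_data('ETH/USDT')

    # Explicit timestamp -> historical data, served from the database
    historical = await unified.get_inference_data(
        'ETH/USDT',
        timestamp=datetime(2025, 10, 20, 12, 0, tzinfo=timezone.utc),
        context_window_minutes=5,
    )
    print(latest.data_source, historical.data_source)  # typically 'cache' and 'database'

    # Aligned multi-timeframe candles (cache is used when timestamp is None)
    frames = await unified.get_multi_timeframe_data('ETH/USDT', ['1m', '5m', '1h'], limit=50)

    await unified.shutdown_unified_storage()

# asyncio.run(example(dp))  # given an existing DataProvider instance dp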


@@ -0,0 +1,602 @@
"""
Data Ingestion Pipeline for unified storage system.
Handles real-time data ingestion with batch writes to database.
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from collections import deque
from threading import Lock
from .unified_cache_manager import DataCacheManager
from .unified_database_manager import DatabaseConnectionManager
from .unified_data_validator import DataValidator
from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent
logger = logging.getLogger(__name__)
class DataIngestionPipeline:
"""
Handles real-time data ingestion from WebSocket sources.
Writes to cache immediately, persists to DB asynchronously.
Features:
- Immediate cache writes (<1ms)
- Batch database writes (100 items or 5 seconds)
- Data validation before storage
- Failed batches re-buffered and retried on the next flush
- Performance monitoring
"""
def __init__(
self,
cache_manager: DataCacheManager,
db_connection_manager: DatabaseConnectionManager,
batch_size: int = 100,
batch_timeout_seconds: float = 5.0
):
"""
Initialize ingestion pipeline.
Args:
cache_manager: Cache manager instance
db_connection_manager: Database connection manager
batch_size: Number of items before flushing to DB
batch_timeout_seconds: Seconds before flushing to DB
"""
self.cache = cache_manager
self.db = db_connection_manager
# Batch write settings
self.batch_size = batch_size
self.batch_timeout = batch_timeout_seconds
# Batch write buffers with locks
self.lock = Lock()
self.ohlcv_buffer: List[Dict] = []
self.orderbook_buffer: List[Dict] = []
self.imbalance_buffer: List[Dict] = []
self.trade_buffer: List[Dict] = []
# Last flush times
self.last_ohlcv_flush = time.time()
self.last_orderbook_flush = time.time()
self.last_imbalance_flush = time.time()
self.last_trade_flush = time.time()
# Statistics
self.stats = {
'ohlcv_ingested': 0,
'orderbook_ingested': 0,
'imbalance_ingested': 0,
'trade_ingested': 0,
'ohlcv_validated': 0,
'orderbook_validated': 0,
'trade_validated': 0,
'validation_failures': 0,
'db_writes': 0,
'db_write_failures': 0,
'cache_writes': 0,
'total_latency_ms': 0.0
}
# Background flush task
self.flush_task = None
self.is_running = False
logger.info(f"DataIngestionPipeline initialized (batch_size={batch_size}, timeout={batch_timeout}s)")
def start(self):
"""Start background flush task."""
if not self.is_running:
self.is_running = True
self.flush_task = asyncio.create_task(self._background_flush_worker())
logger.info("Ingestion pipeline started")
async def stop(self):
"""Stop background flush task and flush remaining data."""
self.is_running = False
if self.flush_task:
self.flush_task.cancel()
try:
await self.flush_task
except asyncio.CancelledError:
pass
# Flush remaining data
await self._flush_all_buffers()
logger.info("Ingestion pipeline stopped")
async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
"""
Ingest OHLCV candle.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
timeframe: Candle timeframe (e.g., '1s', '1m')
candle: Candle dictionary
"""
start_time = time.time()
try:
# Validate candle
is_valid, error = DataValidator.validate_ohlcv(candle)
if not is_valid:
logger.warning(f"Invalid OHLCV candle for {symbol} {timeframe}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['ohlcv_validated'] += 1
# Sanitize data
candle = DataValidator.sanitize_ohlcv(candle)
# Immediate cache write
self.cache.add_ohlcv_candle(symbol, timeframe, candle)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.ohlcv_buffer.append({
'symbol': symbol,
'timeframe': timeframe,
**candle
})
# Check if buffer is full
if len(self.ohlcv_buffer) >= self.batch_size:
asyncio.create_task(self._flush_ohlcv_buffer())
self.stats['ohlcv_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested OHLCV candle: {symbol} {timeframe} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting OHLCV candle: {e}")
async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict):
"""
Ingest order book snapshot.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
snapshot: Order book snapshot dictionary
"""
start_time = time.time()
try:
# Validate order book
is_valid, error = DataValidator.validate_orderbook(snapshot)
if not is_valid:
logger.warning(f"Invalid order book for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['orderbook_validated'] += 1
# Sanitize data
snapshot = DataValidator.sanitize_orderbook(snapshot)
# Immediate cache write
self.cache.add_orderbook_snapshot(symbol, snapshot)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.orderbook_buffer.append({
'symbol': symbol,
**snapshot
})
# Check if buffer is full
if len(self.orderbook_buffer) >= self.batch_size:
asyncio.create_task(self._flush_orderbook_buffer())
self.stats['orderbook_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested order book snapshot: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting order book snapshot: {e}")
async def ingest_imbalance_data(self, symbol: str, imbalance: Dict):
"""
Ingest order book imbalance metrics.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
imbalance: Imbalance metrics dictionary
"""
start_time = time.time()
try:
# Validate imbalances
is_valid, error = DataValidator.validate_imbalances(imbalance)
if not is_valid:
logger.warning(f"Invalid imbalances for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
# Immediate cache write
self.cache.add_imbalance_data(symbol, imbalance)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.imbalance_buffer.append({
'symbol': symbol,
**imbalance
})
# Check if buffer is full
if len(self.imbalance_buffer) >= self.batch_size:
asyncio.create_task(self._flush_imbalance_buffer())
self.stats['imbalance_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested imbalance data: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting imbalance data: {e}")
async def ingest_trade(self, symbol: str, trade: Dict):
"""
Ingest trade event.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
trade: Trade event dictionary
"""
start_time = time.time()
try:
# Validate trade
is_valid, error = DataValidator.validate_trade(trade)
if not is_valid:
logger.warning(f"Invalid trade for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['trade_validated'] += 1
# Immediate cache write
self.cache.add_trade(symbol, trade)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.trade_buffer.append({
'symbol': symbol,
**trade
})
# Check if buffer is full
if len(self.trade_buffer) >= self.batch_size:
asyncio.create_task(self._flush_trade_buffer())
self.stats['trade_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested trade: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting trade: {e}")
async def _flush_ohlcv_buffer(self):
"""Batch write OHLCV data to database."""
with self.lock:
if not self.ohlcv_buffer:
return
# Get buffer contents and clear
buffer_copy = self.ohlcv_buffer.copy()
self.ohlcv_buffer.clear()
self.last_ohlcv_flush = time.time()
try:
# Prepare batch insert
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item['timeframe'],
float(item['open_price']),
float(item['high_price']),
float(item['low_price']),
float(item['close_price']),
float(item['volume']),
int(item.get('trade_count', 0))
))
# Batch insert with conflict handling
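# On a duplicate (timestamp, symbol, timeframe) the incoming partial candle is merged
# into the existing row: close is replaced, high/low are widened, and volume and
# trade_count are accumulated.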
query = """
INSERT INTO ohlcv_data
(timestamp, symbol, timeframe, open_price, high_price,
low_price, close_price, volume, trade_count)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE
SET close_price = EXCLUDED.close_price,
high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price),
low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price),
volume = ohlcv_data.volume + EXCLUDED.volume,
trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} OHLCV candles to database")
except Exception as e:
logger.error(f"Error flushing OHLCV buffer: {e}")
self.stats['db_write_failures'] += 1
# Re-add to buffer for retry
with self.lock:
self.ohlcv_buffer.extend(buffer_copy)
async def _flush_orderbook_buffer(self):
"""Batch write order book data to database."""
with self.lock:
if not self.orderbook_buffer:
return
buffer_copy = self.orderbook_buffer.copy()
self.orderbook_buffer.clear()
self.last_orderbook_flush = time.time()
try:
# Prepare batch insert
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item.get('exchange', 'binance'),
json.dumps(item.get('bids', [])),
json.dumps(item.get('asks', [])),
float(item.get('mid_price', 0)),
float(item.get('spread', 0)),
float(item.get('bid_volume', 0)),
float(item.get('ask_volume', 0)),
item.get('sequence_id')
))
query = """
INSERT INTO order_book_snapshots
(timestamp, symbol, exchange, bids, asks, mid_price, spread,
bid_volume, ask_volume, sequence_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (timestamp, symbol, exchange) DO UPDATE
SET bids = EXCLUDED.bids,
asks = EXCLUDED.asks,
mid_price = EXCLUDED.mid_price,
spread = EXCLUDED.spread,
bid_volume = EXCLUDED.bid_volume,
ask_volume = EXCLUDED.ask_volume
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} order book snapshots to database")
except Exception as e:
logger.error(f"Error flushing order book buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.orderbook_buffer.extend(buffer_copy)
async def _flush_imbalance_buffer(self):
"""Batch write imbalance data to database."""
with self.lock:
if not self.imbalance_buffer:
return
buffer_copy = self.imbalance_buffer.copy()
self.imbalance_buffer.clear()
self.last_imbalance_flush = time.time()
try:
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
float(item.get('imbalance_1s', 0)),
float(item.get('imbalance_5s', 0)),
float(item.get('imbalance_15s', 0)),
float(item.get('imbalance_60s', 0)),
float(item.get('volume_imbalance_1s', 0)),
float(item.get('volume_imbalance_5s', 0)),
float(item.get('volume_imbalance_15s', 0)),
float(item.get('volume_imbalance_60s', 0)),
float(item.get('price_range', 0))
))
query = """
INSERT INTO order_book_imbalances
(timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s,
volume_imbalance_1s, volume_imbalance_5s, volume_imbalance_15s, volume_imbalance_60s,
price_range)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (timestamp, symbol) DO UPDATE
SET imbalance_1s = EXCLUDED.imbalance_1s,
imbalance_5s = EXCLUDED.imbalance_5s,
imbalance_15s = EXCLUDED.imbalance_15s,
imbalance_60s = EXCLUDED.imbalance_60s,
volume_imbalance_1s = EXCLUDED.volume_imbalance_1s,
volume_imbalance_5s = EXCLUDED.volume_imbalance_5s,
volume_imbalance_15s = EXCLUDED.volume_imbalance_15s,
volume_imbalance_60s = EXCLUDED.volume_imbalance_60s
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} imbalance entries to database")
except Exception as e:
logger.error(f"Error flushing imbalance buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.imbalance_buffer.extend(buffer_copy)
async def _flush_trade_buffer(self):
"""Batch write trade data to database."""
with self.lock:
if not self.trade_buffer:
return
buffer_copy = self.trade_buffer.copy()
self.trade_buffer.clear()
self.last_trade_flush = time.time()
try:
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item.get('exchange', 'binance'),
float(item['price']),
float(item['size']),
item['side'],
str(item['trade_id']),
item.get('is_buyer_maker', False)
))
query = """
INSERT INTO trade_events
(timestamp, symbol, exchange, price, size, side, trade_id, is_buyer_maker)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} trades to database")
except Exception as e:
logger.error(f"Error flushing trade buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.trade_buffer.extend(buffer_copy)
async def _background_flush_worker(self):
"""Background worker to flush buffers based on timeout."""
logger.info("Background flush worker started")
while self.is_running:
try:
await asyncio.sleep(1) # Check every second
current_time = time.time()
# Check OHLCV buffer timeout
if current_time - self.last_ohlcv_flush >= self.batch_timeout:
if self.ohlcv_buffer:
await self._flush_ohlcv_buffer()
# Check order book buffer timeout
if current_time - self.last_orderbook_flush >= self.batch_timeout:
if self.orderbook_buffer:
await self._flush_orderbook_buffer()
# Check imbalance buffer timeout
if current_time - self.last_imbalance_flush >= self.batch_timeout:
if self.imbalance_buffer:
await self._flush_imbalance_buffer()
# Check trade buffer timeout
if current_time - self.last_trade_flush >= self.batch_timeout:
if self.trade_buffer:
await self._flush_trade_buffer()
# Auto-evict old cache data
self.cache.auto_evict_if_needed()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in background flush worker: {e}")
logger.info("Background flush worker stopped")
async def _flush_all_buffers(self):
"""Flush all buffers to database."""
logger.info("Flushing all buffers...")
await self._flush_ohlcv_buffer()
await self._flush_orderbook_buffer()
await self._flush_imbalance_buffer()
await self._flush_trade_buffer()
logger.info("All buffers flushed")
def get_stats(self) -> Dict[str, Any]:
"""Get ingestion pipeline statistics."""
with self.lock:
total_ingested = (
self.stats['ohlcv_ingested'] +
self.stats['orderbook_ingested'] +
self.stats['imbalance_ingested'] +
self.stats['trade_ingested']
)
avg_latency = (
self.stats['total_latency_ms'] / total_ingested
if total_ingested > 0 else 0
)
return {
**self.stats,
'total_ingested': total_ingested,
'avg_latency_ms': round(avg_latency, 2),
'buffer_sizes': {
'ohlcv': len(self.ohlcv_buffer),
'orderbook': len(self.orderbook_buffer),
'imbalance': len(self.imbalance_buffer),
'trade': len(self.trade_buffer)
},
'is_running': self.is_running
}
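A minimal ingestion sketch, assuming `cache_manager` and `db_connection` are already-initialized DataCacheManager and DatabaseConnectionManager instances. The candle keys mirror those read by _flush_ohlcv_buffer; the exact shape DataValidator accepts is an assumption:

import asyncio
from datetime import datetime, timezone

async def example(cache_manager, db_connection):
    pipeline = DataIngestionPipeline(
        cache_manager=cache_manager,
        db_connection_manager=db_connection,
        batch_size=100,
        batch_timeout_seconds=5.0,
    )
    pipeline.start()

    candle = {
        'timestamp': datetime.now(timezone.utc),
        'open_price': 4000.0,
        'high_price': 4005.0,
        'low_price': 3995.0,
        'close_price': 4002.0,
        'volume': 12.5,
        'trade_count': 42,
    }
    # Cache write happens immediately; the row is buffered and flushed to the database
    # once the buffer reaches batch_size or after batch_timeout_seconds elapse.
    await pipeline.ingest_ohlcv_candle('ETH/USDT', '1s', candle)

    print(pipeline.get_stats()['buffer_sizes'])
    await pipeline.stop()  # cancels the flush worker and flushes any remaining buffers

# asyncio.run(example(cache_manager, db_connection))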