""" Data Ingestion Pipeline for unified storage system. Handles real-time data ingestion with batch writes to database. """ import asyncio import logging import time from datetime import datetime, timezone from typing import Dict, List, Optional, Any from collections import deque from threading import Lock from .unified_cache_manager import DataCacheManager from .unified_database_manager import DatabaseConnectionManager from .unified_data_validator import DataValidator from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent logger = logging.getLogger(__name__) class DataIngestionPipeline: """ Handles real-time data ingestion from WebSocket sources. Writes to cache immediately, persists to DB asynchronously. Features: - Immediate cache writes (<1ms) - Batch database writes (100 items or 5 seconds) - Data validation before storage - Retry logic with exponential backoff - Performance monitoring """ def __init__( self, cache_manager: DataCacheManager, db_connection_manager: DatabaseConnectionManager, batch_size: int = 100, batch_timeout_seconds: float = 5.0 ): """ Initialize ingestion pipeline. Args: cache_manager: Cache manager instance db_connection_manager: Database connection manager batch_size: Number of items before flushing to DB batch_timeout_seconds: Seconds before flushing to DB """ self.cache = cache_manager self.db = db_connection_manager # Batch write settings self.batch_size = batch_size self.batch_timeout = batch_timeout_seconds # Batch write buffers with locks self.lock = Lock() self.ohlcv_buffer: List[Dict] = [] self.orderbook_buffer: List[Dict] = [] self.imbalance_buffer: List[Dict] = [] self.trade_buffer: List[Dict] = [] # Last flush times self.last_ohlcv_flush = time.time() self.last_orderbook_flush = time.time() self.last_imbalance_flush = time.time() self.last_trade_flush = time.time() # Statistics self.stats = { 'ohlcv_ingested': 0, 'orderbook_ingested': 0, 'imbalance_ingested': 0, 'trade_ingested': 0, 'ohlcv_validated': 0, 'orderbook_validated': 0, 'trade_validated': 0, 'validation_failures': 0, 'db_writes': 0, 'db_write_failures': 0, 'cache_writes': 0, 'total_latency_ms': 0.0 } # Background flush task self.flush_task = None self.is_running = False logger.info(f"DataIngestionPipeline initialized (batch_size={batch_size}, timeout={batch_timeout}s)") def start(self): """Start background flush task.""" if not self.is_running: self.is_running = True self.flush_task = asyncio.create_task(self._background_flush_worker()) logger.info("Ingestion pipeline started") async def stop(self): """Stop background flush task and flush remaining data.""" self.is_running = False if self.flush_task: self.flush_task.cancel() try: await self.flush_task except asyncio.CancelledError: pass # Flush remaining data await self._flush_all_buffers() logger.info("Ingestion pipeline stopped") async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict): """ Ingest OHLCV candle. 1. Validate data 2. Add to cache immediately 3. Buffer for batch write to DB Args: symbol: Trading symbol timeframe: Timeframe candle: Candle dictionary """ start_time = time.time() try: # Validate candle is_valid, error = DataValidator.validate_ohlcv(candle) if not is_valid: logger.warning(f"Invalid OHLCV candle for {symbol} {timeframe}: {error}") self.stats['validation_failures'] += 1 return self.stats['ohlcv_validated'] += 1 # Sanitize data candle = DataValidator.sanitize_ohlcv(candle) # Immediate cache write self.cache.add_ohlcv_candle(symbol, timeframe, candle) self.stats['cache_writes'] += 1 # Buffer for DB write with self.lock: self.ohlcv_buffer.append({ 'symbol': symbol, 'timeframe': timeframe, **candle }) # Check if buffer is full if len(self.ohlcv_buffer) >= self.batch_size: asyncio.create_task(self._flush_ohlcv_buffer()) self.stats['ohlcv_ingested'] += 1 # Track latency latency_ms = (time.time() - start_time) * 1000 self.stats['total_latency_ms'] += latency_ms logger.debug(f"Ingested OHLCV candle: {symbol} {timeframe} ({latency_ms:.2f}ms)") except Exception as e: logger.error(f"Error ingesting OHLCV candle: {e}") async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict): """ Ingest order book snapshot. 1. Validate data 2. Add to cache immediately 3. Buffer for batch write to DB Args: symbol: Trading symbol snapshot: Order book snapshot dictionary """ start_time = time.time() try: # Validate order book is_valid, error = DataValidator.validate_orderbook(snapshot) if not is_valid: logger.warning(f"Invalid order book for {symbol}: {error}") self.stats['validation_failures'] += 1 return self.stats['orderbook_validated'] += 1 # Sanitize data snapshot = DataValidator.sanitize_orderbook(snapshot) # Immediate cache write self.cache.add_orderbook_snapshot(symbol, snapshot) self.stats['cache_writes'] += 1 # Buffer for DB write with self.lock: self.orderbook_buffer.append({ 'symbol': symbol, **snapshot }) # Check if buffer is full if len(self.orderbook_buffer) >= self.batch_size: asyncio.create_task(self._flush_orderbook_buffer()) self.stats['orderbook_ingested'] += 1 # Track latency latency_ms = (time.time() - start_time) * 1000 self.stats['total_latency_ms'] += latency_ms logger.debug(f"Ingested order book snapshot: {symbol} ({latency_ms:.2f}ms)") except Exception as e: logger.error(f"Error ingesting order book snapshot: {e}") async def ingest_imbalance_data(self, symbol: str, imbalance: Dict): """ Ingest order book imbalance metrics. 1. Validate data 2. Add to cache immediately 3. Buffer for batch write to DB Args: symbol: Trading symbol imbalance: Imbalance metrics dictionary """ start_time = time.time() try: # Validate imbalances is_valid, error = DataValidator.validate_imbalances(imbalance) if not is_valid: logger.warning(f"Invalid imbalances for {symbol}: {error}") self.stats['validation_failures'] += 1 return # Immediate cache write self.cache.add_imbalance_data(symbol, imbalance) self.stats['cache_writes'] += 1 # Buffer for DB write with self.lock: self.imbalance_buffer.append({ 'symbol': symbol, **imbalance }) # Check if buffer is full if len(self.imbalance_buffer) >= self.batch_size: asyncio.create_task(self._flush_imbalance_buffer()) self.stats['imbalance_ingested'] += 1 # Track latency latency_ms = (time.time() - start_time) * 1000 self.stats['total_latency_ms'] += latency_ms logger.debug(f"Ingested imbalance data: {symbol} ({latency_ms:.2f}ms)") except Exception as e: logger.error(f"Error ingesting imbalance data: {e}") async def ingest_trade(self, symbol: str, trade: Dict): """ Ingest trade event. 1. Validate data 2. Add to cache immediately 3. Buffer for batch write to DB Args: symbol: Trading symbol trade: Trade event dictionary """ start_time = time.time() try: # Validate trade is_valid, error = DataValidator.validate_trade(trade) if not is_valid: logger.warning(f"Invalid trade for {symbol}: {error}") self.stats['validation_failures'] += 1 return self.stats['trade_validated'] += 1 # Immediate cache write self.cache.add_trade(symbol, trade) self.stats['cache_writes'] += 1 # Buffer for DB write with self.lock: self.trade_buffer.append({ 'symbol': symbol, **trade }) # Check if buffer is full if len(self.trade_buffer) >= self.batch_size: asyncio.create_task(self._flush_trade_buffer()) self.stats['trade_ingested'] += 1 # Track latency latency_ms = (time.time() - start_time) * 1000 self.stats['total_latency_ms'] += latency_ms logger.debug(f"Ingested trade: {symbol} ({latency_ms:.2f}ms)") except Exception as e: logger.error(f"Error ingesting trade: {e}") async def _flush_ohlcv_buffer(self): """Batch write OHLCV data to database.""" with self.lock: if not self.ohlcv_buffer: return # Get buffer contents and clear buffer_copy = self.ohlcv_buffer.copy() self.ohlcv_buffer.clear() self.last_ohlcv_flush = time.time() try: # Prepare batch insert values = [] for item in buffer_copy: values.append(( item.get('timestamp', datetime.now(timezone.utc)), item['symbol'], item['timeframe'], float(item['open_price']), float(item['high_price']), float(item['low_price']), float(item['close_price']), float(item['volume']), int(item.get('trade_count', 0)) )) # Batch insert with conflict handling query = """ INSERT INTO ohlcv_data (timestamp, symbol, timeframe, open_price, high_price, low_price, close_price, volume, trade_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE SET close_price = EXCLUDED.close_price, high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price), low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price), volume = ohlcv_data.volume + EXCLUDED.volume, trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count """ await self.db.executemany(query, values) self.stats['db_writes'] += 1 logger.debug(f"Flushed {len(values)} OHLCV candles to database") except Exception as e: logger.error(f"Error flushing OHLCV buffer: {e}") self.stats['db_write_failures'] += 1 # Re-add to buffer for retry with self.lock: self.ohlcv_buffer.extend(buffer_copy) async def _flush_orderbook_buffer(self): """Batch write order book data to database.""" with self.lock: if not self.orderbook_buffer: return buffer_copy = self.orderbook_buffer.copy() self.orderbook_buffer.clear() self.last_orderbook_flush = time.time() try: # Prepare batch insert values = [] for item in buffer_copy: import json values.append(( item.get('timestamp', datetime.now(timezone.utc)), item['symbol'], item.get('exchange', 'binance'), json.dumps(item.get('bids', [])), json.dumps(item.get('asks', [])), float(item.get('mid_price', 0)), float(item.get('spread', 0)), float(item.get('bid_volume', 0)), float(item.get('ask_volume', 0)), item.get('sequence_id') )) query = """ INSERT INTO order_book_snapshots (timestamp, symbol, exchange, bids, asks, mid_price, spread, bid_volume, ask_volume, sequence_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (timestamp, symbol, exchange) DO UPDATE SET bids = EXCLUDED.bids, asks = EXCLUDED.asks, mid_price = EXCLUDED.mid_price, spread = EXCLUDED.spread, bid_volume = EXCLUDED.bid_volume, ask_volume = EXCLUDED.ask_volume """ await self.db.executemany(query, values) self.stats['db_writes'] += 1 logger.debug(f"Flushed {len(values)} order book snapshots to database") except Exception as e: logger.error(f"Error flushing order book buffer: {e}") self.stats['db_write_failures'] += 1 with self.lock: self.orderbook_buffer.extend(buffer_copy) async def _flush_imbalance_buffer(self): """Batch write imbalance data to database.""" with self.lock: if not self.imbalance_buffer: return buffer_copy = self.imbalance_buffer.copy() self.imbalance_buffer.clear() self.last_imbalance_flush = time.time() try: values = [] for item in buffer_copy: values.append(( item.get('timestamp', datetime.now(timezone.utc)), item['symbol'], float(item.get('imbalance_1s', 0)), float(item.get('imbalance_5s', 0)), float(item.get('imbalance_15s', 0)), float(item.get('imbalance_60s', 0)), float(item.get('volume_imbalance_1s', 0)), float(item.get('volume_imbalance_5s', 0)), float(item.get('volume_imbalance_15s', 0)), float(item.get('volume_imbalance_60s', 0)), float(item.get('price_range', 0)) )) query = """ INSERT INTO order_book_imbalances (timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s, volume_imbalance_1s, volume_imbalance_5s, volume_imbalance_15s, volume_imbalance_60s, price_range) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (timestamp, symbol) DO UPDATE SET imbalance_1s = EXCLUDED.imbalance_1s, imbalance_5s = EXCLUDED.imbalance_5s, imbalance_15s = EXCLUDED.imbalance_15s, imbalance_60s = EXCLUDED.imbalance_60s, volume_imbalance_1s = EXCLUDED.volume_imbalance_1s, volume_imbalance_5s = EXCLUDED.volume_imbalance_5s, volume_imbalance_15s = EXCLUDED.volume_imbalance_15s, volume_imbalance_60s = EXCLUDED.volume_imbalance_60s """ await self.db.executemany(query, values) self.stats['db_writes'] += 1 logger.debug(f"Flushed {len(values)} imbalance entries to database") except Exception as e: logger.error(f"Error flushing imbalance buffer: {e}") self.stats['db_write_failures'] += 1 with self.lock: self.imbalance_buffer.extend(buffer_copy) async def _flush_trade_buffer(self): """Batch write trade data to database.""" with self.lock: if not self.trade_buffer: return buffer_copy = self.trade_buffer.copy() self.trade_buffer.clear() self.last_trade_flush = time.time() try: values = [] for item in buffer_copy: values.append(( item.get('timestamp', datetime.now(timezone.utc)), item['symbol'], item.get('exchange', 'binance'), float(item['price']), float(item['size']), item['side'], str(item['trade_id']), item.get('is_buyer_maker', False) )) query = """ INSERT INTO trade_events (timestamp, symbol, exchange, price, size, side, trade_id, is_buyer_maker) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING """ await self.db.executemany(query, values) self.stats['db_writes'] += 1 logger.debug(f"Flushed {len(values)} trades to database") except Exception as e: logger.error(f"Error flushing trade buffer: {e}") self.stats['db_write_failures'] += 1 with self.lock: self.trade_buffer.extend(buffer_copy) async def _background_flush_worker(self): """Background worker to flush buffers based on timeout.""" logger.info("Background flush worker started") while self.is_running: try: await asyncio.sleep(1) # Check every second current_time = time.time() # Check OHLCV buffer timeout if current_time - self.last_ohlcv_flush >= self.batch_timeout: if self.ohlcv_buffer: await self._flush_ohlcv_buffer() # Check order book buffer timeout if current_time - self.last_orderbook_flush >= self.batch_timeout: if self.orderbook_buffer: await self._flush_orderbook_buffer() # Check imbalance buffer timeout if current_time - self.last_imbalance_flush >= self.batch_timeout: if self.imbalance_buffer: await self._flush_imbalance_buffer() # Check trade buffer timeout if current_time - self.last_trade_flush >= self.batch_timeout: if self.trade_buffer: await self._flush_trade_buffer() # Auto-evict old cache data self.cache.auto_evict_if_needed() except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in background flush worker: {e}") logger.info("Background flush worker stopped") async def _flush_all_buffers(self): """Flush all buffers to database.""" logger.info("Flushing all buffers...") await self._flush_ohlcv_buffer() await self._flush_orderbook_buffer() await self._flush_imbalance_buffer() await self._flush_trade_buffer() logger.info("All buffers flushed") def get_stats(self) -> Dict[str, Any]: """Get ingestion pipeline statistics.""" with self.lock: total_ingested = ( self.stats['ohlcv_ingested'] + self.stats['orderbook_ingested'] + self.stats['imbalance_ingested'] + self.stats['trade_ingested'] ) avg_latency = ( self.stats['total_latency_ms'] / total_ingested if total_ingested > 0 else 0 ) return { **self.stats, 'total_ingested': total_ingested, 'avg_latency_ms': round(avg_latency, 2), 'buffer_sizes': { 'ohlcv': len(self.ohlcv_buffer), 'orderbook': len(self.orderbook_buffer), 'imbalance': len(self.imbalance_buffer), 'trade': len(self.trade_buffer) }, 'is_running': self.is_running }