Files
gogo2/core/unified_ingestion_pipeline.py
2025-10-20 16:17:43 +03:00

603 lines
22 KiB
Python

"""
Data Ingestion Pipeline for unified storage system.
Handles real-time data ingestion with batch writes to database.
"""
import asyncio
import logging
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from collections import deque
from threading import Lock
from .unified_cache_manager import DataCacheManager
from .unified_database_manager import DatabaseConnectionManager
from .unified_data_validator import DataValidator
from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent
logger = logging.getLogger(__name__)
class DataIngestionPipeline:
"""
Handles real-time data ingestion from WebSocket sources.
Writes to cache immediately, persists to DB asynchronously.
Features:
- Immediate cache writes (<1ms)
- Batch database writes (100 items or 5 seconds)
- Data validation before storage
- Retry logic with exponential backoff
- Performance monitoring
"""
def __init__(
self,
cache_manager: DataCacheManager,
db_connection_manager: DatabaseConnectionManager,
batch_size: int = 100,
batch_timeout_seconds: float = 5.0
):
"""
Initialize ingestion pipeline.
Args:
cache_manager: Cache manager instance
db_connection_manager: Database connection manager
batch_size: Number of items before flushing to DB
batch_timeout_seconds: Seconds before flushing to DB
"""
self.cache = cache_manager
self.db = db_connection_manager
# Batch write settings
self.batch_size = batch_size
self.batch_timeout = batch_timeout_seconds
# Batch write buffers with locks
self.lock = Lock()
self.ohlcv_buffer: List[Dict] = []
self.orderbook_buffer: List[Dict] = []
self.imbalance_buffer: List[Dict] = []
self.trade_buffer: List[Dict] = []
# Last flush times
self.last_ohlcv_flush = time.time()
self.last_orderbook_flush = time.time()
self.last_imbalance_flush = time.time()
self.last_trade_flush = time.time()
# Statistics
self.stats = {
'ohlcv_ingested': 0,
'orderbook_ingested': 0,
'imbalance_ingested': 0,
'trade_ingested': 0,
'ohlcv_validated': 0,
'orderbook_validated': 0,
'trade_validated': 0,
'validation_failures': 0,
'db_writes': 0,
'db_write_failures': 0,
'cache_writes': 0,
'total_latency_ms': 0.0
}
# Background flush task
self.flush_task = None
self.is_running = False
logger.info(f"DataIngestionPipeline initialized (batch_size={batch_size}, timeout={batch_timeout}s)")
def start(self):
"""Start background flush task."""
if not self.is_running:
self.is_running = True
self.flush_task = asyncio.create_task(self._background_flush_worker())
logger.info("Ingestion pipeline started")
async def stop(self):
"""Stop background flush task and flush remaining data."""
self.is_running = False
if self.flush_task:
self.flush_task.cancel()
try:
await self.flush_task
except asyncio.CancelledError:
pass
# Flush remaining data
await self._flush_all_buffers()
logger.info("Ingestion pipeline stopped")
async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
"""
Ingest OHLCV candle.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
timeframe: Timeframe
candle: Candle dictionary
"""
start_time = time.time()
try:
# Validate candle
is_valid, error = DataValidator.validate_ohlcv(candle)
if not is_valid:
logger.warning(f"Invalid OHLCV candle for {symbol} {timeframe}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['ohlcv_validated'] += 1
# Sanitize data
candle = DataValidator.sanitize_ohlcv(candle)
# Immediate cache write
self.cache.add_ohlcv_candle(symbol, timeframe, candle)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.ohlcv_buffer.append({
'symbol': symbol,
'timeframe': timeframe,
**candle
})
# Check if buffer is full
if len(self.ohlcv_buffer) >= self.batch_size:
asyncio.create_task(self._flush_ohlcv_buffer())
self.stats['ohlcv_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested OHLCV candle: {symbol} {timeframe} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting OHLCV candle: {e}")
async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict):
"""
Ingest order book snapshot.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
snapshot: Order book snapshot dictionary
"""
start_time = time.time()
try:
# Validate order book
is_valid, error = DataValidator.validate_orderbook(snapshot)
if not is_valid:
logger.warning(f"Invalid order book for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['orderbook_validated'] += 1
# Sanitize data
snapshot = DataValidator.sanitize_orderbook(snapshot)
# Immediate cache write
self.cache.add_orderbook_snapshot(symbol, snapshot)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.orderbook_buffer.append({
'symbol': symbol,
**snapshot
})
# Check if buffer is full
if len(self.orderbook_buffer) >= self.batch_size:
asyncio.create_task(self._flush_orderbook_buffer())
self.stats['orderbook_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested order book snapshot: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting order book snapshot: {e}")
async def ingest_imbalance_data(self, symbol: str, imbalance: Dict):
"""
Ingest order book imbalance metrics.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
imbalance: Imbalance metrics dictionary
"""
start_time = time.time()
try:
# Validate imbalances
is_valid, error = DataValidator.validate_imbalances(imbalance)
if not is_valid:
logger.warning(f"Invalid imbalances for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
# Immediate cache write
self.cache.add_imbalance_data(symbol, imbalance)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.imbalance_buffer.append({
'symbol': symbol,
**imbalance
})
# Check if buffer is full
if len(self.imbalance_buffer) >= self.batch_size:
asyncio.create_task(self._flush_imbalance_buffer())
self.stats['imbalance_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested imbalance data: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting imbalance data: {e}")
async def ingest_trade(self, symbol: str, trade: Dict):
"""
Ingest trade event.
1. Validate data
2. Add to cache immediately
3. Buffer for batch write to DB
Args:
symbol: Trading symbol
trade: Trade event dictionary
"""
start_time = time.time()
try:
# Validate trade
is_valid, error = DataValidator.validate_trade(trade)
if not is_valid:
logger.warning(f"Invalid trade for {symbol}: {error}")
self.stats['validation_failures'] += 1
return
self.stats['trade_validated'] += 1
# Immediate cache write
self.cache.add_trade(symbol, trade)
self.stats['cache_writes'] += 1
# Buffer for DB write
with self.lock:
self.trade_buffer.append({
'symbol': symbol,
**trade
})
# Check if buffer is full
if len(self.trade_buffer) >= self.batch_size:
asyncio.create_task(self._flush_trade_buffer())
self.stats['trade_ingested'] += 1
# Track latency
latency_ms = (time.time() - start_time) * 1000
self.stats['total_latency_ms'] += latency_ms
logger.debug(f"Ingested trade: {symbol} ({latency_ms:.2f}ms)")
except Exception as e:
logger.error(f"Error ingesting trade: {e}")
async def _flush_ohlcv_buffer(self):
"""Batch write OHLCV data to database."""
with self.lock:
if not self.ohlcv_buffer:
return
# Get buffer contents and clear
buffer_copy = self.ohlcv_buffer.copy()
self.ohlcv_buffer.clear()
self.last_ohlcv_flush = time.time()
try:
# Prepare batch insert
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item['timeframe'],
float(item['open_price']),
float(item['high_price']),
float(item['low_price']),
float(item['close_price']),
float(item['volume']),
int(item.get('trade_count', 0))
))
# Batch insert with conflict handling
query = """
INSERT INTO ohlcv_data
(timestamp, symbol, timeframe, open_price, high_price,
low_price, close_price, volume, trade_count)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE
SET close_price = EXCLUDED.close_price,
high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price),
low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price),
volume = ohlcv_data.volume + EXCLUDED.volume,
trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} OHLCV candles to database")
except Exception as e:
logger.error(f"Error flushing OHLCV buffer: {e}")
self.stats['db_write_failures'] += 1
# Re-add to buffer for retry
with self.lock:
self.ohlcv_buffer.extend(buffer_copy)
async def _flush_orderbook_buffer(self):
"""Batch write order book data to database."""
with self.lock:
if not self.orderbook_buffer:
return
buffer_copy = self.orderbook_buffer.copy()
self.orderbook_buffer.clear()
self.last_orderbook_flush = time.time()
try:
# Prepare batch insert
values = []
for item in buffer_copy:
import json
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item.get('exchange', 'binance'),
json.dumps(item.get('bids', [])),
json.dumps(item.get('asks', [])),
float(item.get('mid_price', 0)),
float(item.get('spread', 0)),
float(item.get('bid_volume', 0)),
float(item.get('ask_volume', 0)),
item.get('sequence_id')
))
query = """
INSERT INTO order_book_snapshots
(timestamp, symbol, exchange, bids, asks, mid_price, spread,
bid_volume, ask_volume, sequence_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (timestamp, symbol, exchange) DO UPDATE
SET bids = EXCLUDED.bids,
asks = EXCLUDED.asks,
mid_price = EXCLUDED.mid_price,
spread = EXCLUDED.spread,
bid_volume = EXCLUDED.bid_volume,
ask_volume = EXCLUDED.ask_volume
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} order book snapshots to database")
except Exception as e:
logger.error(f"Error flushing order book buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.orderbook_buffer.extend(buffer_copy)
async def _flush_imbalance_buffer(self):
"""Batch write imbalance data to database."""
with self.lock:
if not self.imbalance_buffer:
return
buffer_copy = self.imbalance_buffer.copy()
self.imbalance_buffer.clear()
self.last_imbalance_flush = time.time()
try:
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
float(item.get('imbalance_1s', 0)),
float(item.get('imbalance_5s', 0)),
float(item.get('imbalance_15s', 0)),
float(item.get('imbalance_60s', 0)),
float(item.get('volume_imbalance_1s', 0)),
float(item.get('volume_imbalance_5s', 0)),
float(item.get('volume_imbalance_15s', 0)),
float(item.get('volume_imbalance_60s', 0)),
float(item.get('price_range', 0))
))
query = """
INSERT INTO order_book_imbalances
(timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s,
volume_imbalance_1s, volume_imbalance_5s, volume_imbalance_15s, volume_imbalance_60s,
price_range)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (timestamp, symbol) DO UPDATE
SET imbalance_1s = EXCLUDED.imbalance_1s,
imbalance_5s = EXCLUDED.imbalance_5s,
imbalance_15s = EXCLUDED.imbalance_15s,
imbalance_60s = EXCLUDED.imbalance_60s,
volume_imbalance_1s = EXCLUDED.volume_imbalance_1s,
volume_imbalance_5s = EXCLUDED.volume_imbalance_5s,
volume_imbalance_15s = EXCLUDED.volume_imbalance_15s,
volume_imbalance_60s = EXCLUDED.volume_imbalance_60s
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} imbalance entries to database")
except Exception as e:
logger.error(f"Error flushing imbalance buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.imbalance_buffer.extend(buffer_copy)
async def _flush_trade_buffer(self):
"""Batch write trade data to database."""
with self.lock:
if not self.trade_buffer:
return
buffer_copy = self.trade_buffer.copy()
self.trade_buffer.clear()
self.last_trade_flush = time.time()
try:
values = []
for item in buffer_copy:
values.append((
item.get('timestamp', datetime.now(timezone.utc)),
item['symbol'],
item.get('exchange', 'binance'),
float(item['price']),
float(item['size']),
item['side'],
str(item['trade_id']),
item.get('is_buyer_maker', False)
))
query = """
INSERT INTO trade_events
(timestamp, symbol, exchange, price, size, side, trade_id, is_buyer_maker)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING
"""
await self.db.executemany(query, values)
self.stats['db_writes'] += 1
logger.debug(f"Flushed {len(values)} trades to database")
except Exception as e:
logger.error(f"Error flushing trade buffer: {e}")
self.stats['db_write_failures'] += 1
with self.lock:
self.trade_buffer.extend(buffer_copy)
async def _background_flush_worker(self):
"""Background worker to flush buffers based on timeout."""
logger.info("Background flush worker started")
while self.is_running:
try:
await asyncio.sleep(1) # Check every second
current_time = time.time()
# Check OHLCV buffer timeout
if current_time - self.last_ohlcv_flush >= self.batch_timeout:
if self.ohlcv_buffer:
await self._flush_ohlcv_buffer()
# Check order book buffer timeout
if current_time - self.last_orderbook_flush >= self.batch_timeout:
if self.orderbook_buffer:
await self._flush_orderbook_buffer()
# Check imbalance buffer timeout
if current_time - self.last_imbalance_flush >= self.batch_timeout:
if self.imbalance_buffer:
await self._flush_imbalance_buffer()
# Check trade buffer timeout
if current_time - self.last_trade_flush >= self.batch_timeout:
if self.trade_buffer:
await self._flush_trade_buffer()
# Auto-evict old cache data
self.cache.auto_evict_if_needed()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in background flush worker: {e}")
logger.info("Background flush worker stopped")
async def _flush_all_buffers(self):
"""Flush all buffers to database."""
logger.info("Flushing all buffers...")
await self._flush_ohlcv_buffer()
await self._flush_orderbook_buffer()
await self._flush_imbalance_buffer()
await self._flush_trade_buffer()
logger.info("All buffers flushed")
def get_stats(self) -> Dict[str, Any]:
"""Get ingestion pipeline statistics."""
with self.lock:
total_ingested = (
self.stats['ohlcv_ingested'] +
self.stats['orderbook_ingested'] +
self.stats['imbalance_ingested'] +
self.stats['trade_ingested']
)
avg_latency = (
self.stats['total_latency_ms'] / total_ingested
if total_ingested > 0 else 0
)
return {
**self.stats,
'total_ingested': total_ingested,
'avg_latency_ms': round(avg_latency, 2),
'buffer_sizes': {
'ohlcv': len(self.ohlcv_buffer),
'orderbook': len(self.orderbook_buffer),
'imbalance': len(self.imbalance_buffer),
'trade': len(self.trade_buffer)
},
'is_running': self.is_running
}