"""
|
|
Data Ingestion Pipeline for unified storage system.
|
|
Handles real-time data ingestion with batch writes to database.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
from collections import deque
|
|
from threading import Lock
|
|
|
|
from .unified_cache_manager import DataCacheManager
|
|
from .unified_database_manager import DatabaseConnectionManager
|
|
from .unified_data_validator import DataValidator
|
|
from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DataIngestionPipeline:
    """
    Handles real-time data ingestion from WebSocket sources.
    Writes to the cache immediately, persists to the database asynchronously.

    Features:
    - Immediate cache writes (<1ms)
    - Batched database writes (default: 100 items or 5 seconds)
    - Data validation before storage
    - Failed batch writes are re-buffered and retried on the next flush
    - Performance monitoring
    """

    def __init__(
        self,
        cache_manager: DataCacheManager,
        db_connection_manager: DatabaseConnectionManager,
        batch_size: int = 100,
        batch_timeout_seconds: float = 5.0
    ):
        """
        Initialize the ingestion pipeline.

        Args:
            cache_manager: Cache manager instance
            db_connection_manager: Database connection manager
            batch_size: Number of buffered items that triggers a flush to the DB
            batch_timeout_seconds: Maximum age of a buffer before it is flushed to the DB
        """
        self.cache = cache_manager
        self.db = db_connection_manager

        # Batch write settings
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_seconds

        # Batch write buffers, guarded by a lock. Critical sections are
        # kept short and never await while the lock is held.
        self.lock = Lock()
        self.ohlcv_buffer: List[Dict] = []
        self.orderbook_buffer: List[Dict] = []
        self.imbalance_buffer: List[Dict] = []
        self.trade_buffer: List[Dict] = []

        # Last flush times
        self.last_ohlcv_flush = time.time()
        self.last_orderbook_flush = time.time()
        self.last_imbalance_flush = time.time()
        self.last_trade_flush = time.time()

        # Statistics
        self.stats = {
            'ohlcv_ingested': 0,
            'orderbook_ingested': 0,
            'imbalance_ingested': 0,
            'trade_ingested': 0,
            'ohlcv_validated': 0,
            'orderbook_validated': 0,
            'trade_validated': 0,
            'validation_failures': 0,
            'db_writes': 0,
            'db_write_failures': 0,
            'cache_writes': 0,
            'total_latency_ms': 0.0
        }

        # Background flush task
        self.flush_task: Optional[asyncio.Task] = None
        self.is_running = False

        logger.info(
            f"DataIngestionPipeline initialized "
            f"(batch_size={batch_size}, timeout={batch_timeout_seconds}s)"
        )

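    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    # start() schedules the background flush worker on the current event
    # loop; stop() cancels it and drains anything still buffered.
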
    def start(self):
        """Start the background flush task (must be called from a running event loop)."""
        if not self.is_running:
            self.is_running = True
            self.flush_task = asyncio.create_task(self._background_flush_worker())
            logger.info("Ingestion pipeline started")

    async def stop(self):
        """Stop the background flush task and flush remaining data."""
        self.is_running = False

        if self.flush_task:
            self.flush_task.cancel()
            try:
                await self.flush_task
            except asyncio.CancelledError:
                pass

        # Flush remaining data
        await self._flush_all_buffers()

        logger.info("Ingestion pipeline stopped")

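    # ------------------------------------------------------------------
    # Ingestion entry points
    # ------------------------------------------------------------------
    # Each ingest_* method follows the same hot path: validate, write to
    # the cache synchronously, buffer the row under the lock, and schedule
    # an async DB flush once the buffer reaches batch_size. Callers never
    # wait on the database.
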
    async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """
        Ingest an OHLCV candle.

        1. Validate data
        2. Add to cache immediately
        3. Buffer for batch write to DB

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            candle: Candle dictionary
        """
        start_time = time.time()

        try:
            # Validate candle
            is_valid, error = DataValidator.validate_ohlcv(candle)
            if not is_valid:
                logger.warning(f"Invalid OHLCV candle for {symbol} {timeframe}: {error}")
                self.stats['validation_failures'] += 1
                return

            self.stats['ohlcv_validated'] += 1

            # Sanitize data
            candle = DataValidator.sanitize_ohlcv(candle)

            # Immediate cache write
            self.cache.add_ohlcv_candle(symbol, timeframe, candle)
            self.stats['cache_writes'] += 1

            # Buffer for DB write
            with self.lock:
                self.ohlcv_buffer.append({
                    'symbol': symbol,
                    'timeframe': timeframe,
                    **candle
                })

            # Flush asynchronously once the buffer is full (fire-and-forget;
            # the flush runs after this coroutine yields control)
            if len(self.ohlcv_buffer) >= self.batch_size:
                asyncio.create_task(self._flush_ohlcv_buffer())

            self.stats['ohlcv_ingested'] += 1

            # Track latency
            latency_ms = (time.time() - start_time) * 1000
            self.stats['total_latency_ms'] += latency_ms

            logger.debug(f"Ingested OHLCV candle: {symbol} {timeframe} ({latency_ms:.2f}ms)")

        except Exception as e:
            logger.error(f"Error ingesting OHLCV candle: {e}")

    async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """
        Ingest an order book snapshot.

        1. Validate data
        2. Add to cache immediately
        3. Buffer for batch write to DB

        Args:
            symbol: Trading symbol
            snapshot: Order book snapshot dictionary
        """
        start_time = time.time()

        try:
            # Validate order book
            is_valid, error = DataValidator.validate_orderbook(snapshot)
            if not is_valid:
                logger.warning(f"Invalid order book for {symbol}: {error}")
                self.stats['validation_failures'] += 1
                return

            self.stats['orderbook_validated'] += 1

            # Sanitize data
            snapshot = DataValidator.sanitize_orderbook(snapshot)

            # Immediate cache write
            self.cache.add_orderbook_snapshot(symbol, snapshot)
            self.stats['cache_writes'] += 1

            # Buffer for DB write
            with self.lock:
                self.orderbook_buffer.append({
                    'symbol': symbol,
                    **snapshot
                })

            # Check if buffer is full
            if len(self.orderbook_buffer) >= self.batch_size:
                asyncio.create_task(self._flush_orderbook_buffer())

            self.stats['orderbook_ingested'] += 1

            # Track latency
            latency_ms = (time.time() - start_time) * 1000
            self.stats['total_latency_ms'] += latency_ms

            logger.debug(f"Ingested order book snapshot: {symbol} ({latency_ms:.2f}ms)")

        except Exception as e:
            logger.error(f"Error ingesting order book snapshot: {e}")

    async def ingest_imbalance_data(self, symbol: str, imbalance: Dict):
        """
        Ingest order book imbalance metrics.

        1. Validate data
        2. Add to cache immediately
        3. Buffer for batch write to DB

        Args:
            symbol: Trading symbol
            imbalance: Imbalance metrics dictionary
        """
        start_time = time.time()

        try:
            # Validate imbalances
            is_valid, error = DataValidator.validate_imbalances(imbalance)
            if not is_valid:
                logger.warning(f"Invalid imbalances for {symbol}: {error}")
                self.stats['validation_failures'] += 1
                return

            # Immediate cache write
            self.cache.add_imbalance_data(symbol, imbalance)
            self.stats['cache_writes'] += 1

            # Buffer for DB write
            with self.lock:
                self.imbalance_buffer.append({
                    'symbol': symbol,
                    **imbalance
                })

            # Check if buffer is full
            if len(self.imbalance_buffer) >= self.batch_size:
                asyncio.create_task(self._flush_imbalance_buffer())

            self.stats['imbalance_ingested'] += 1

            # Track latency
            latency_ms = (time.time() - start_time) * 1000
            self.stats['total_latency_ms'] += latency_ms

            logger.debug(f"Ingested imbalance data: {symbol} ({latency_ms:.2f}ms)")

        except Exception as e:
            logger.error(f"Error ingesting imbalance data: {e}")

    async def ingest_trade(self, symbol: str, trade: Dict):
        """
        Ingest a trade event.

        1. Validate data
        2. Add to cache immediately
        3. Buffer for batch write to DB

        Args:
            symbol: Trading symbol
            trade: Trade event dictionary
        """
        start_time = time.time()

        try:
            # Validate trade
            is_valid, error = DataValidator.validate_trade(trade)
            if not is_valid:
                logger.warning(f"Invalid trade for {symbol}: {error}")
                self.stats['validation_failures'] += 1
                return

            self.stats['trade_validated'] += 1

            # Immediate cache write
            self.cache.add_trade(symbol, trade)
            self.stats['cache_writes'] += 1

            # Buffer for DB write
            with self.lock:
                self.trade_buffer.append({
                    'symbol': symbol,
                    **trade
                })

            # Check if buffer is full
            if len(self.trade_buffer) >= self.batch_size:
                asyncio.create_task(self._flush_trade_buffer())

            self.stats['trade_ingested'] += 1

            # Track latency
            latency_ms = (time.time() - start_time) * 1000
            self.stats['total_latency_ms'] += latency_ms

            logger.debug(f"Ingested trade: {symbol} ({latency_ms:.2f}ms)")

        except Exception as e:
            logger.error(f"Error ingesting trade: {e}")

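    # ------------------------------------------------------------------
    # Batch flush helpers
    # ------------------------------------------------------------------
    # Each flush copies and clears its buffer while holding the lock, then
    # performs the database write outside the lock so a slow round trip
    # never blocks ingestion. On failure the copied batch is re-appended
    # and retried on the next flush cycle.
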
    async def _flush_ohlcv_buffer(self):
        """Batch write OHLCV data to the database."""
        with self.lock:
            if not self.ohlcv_buffer:
                return

            # Get buffer contents and clear
            buffer_copy = self.ohlcv_buffer.copy()
            self.ohlcv_buffer.clear()
            self.last_ohlcv_flush = time.time()

        try:
            # Prepare batch insert
            values = []
            for item in buffer_copy:
                values.append((
                    item.get('timestamp', datetime.now(timezone.utc)),
                    item['symbol'],
                    item['timeframe'],
                    float(item['open_price']),
                    float(item['high_price']),
                    float(item['low_price']),
                    float(item['close_price']),
                    float(item['volume']),
                    int(item.get('trade_count', 0))
                ))

            # Batch upsert: on conflict, merge the incoming candle into the
            # stored one (latest close, widest high/low, summed volume and
            # trade count)
            query = """
                INSERT INTO ohlcv_data
                (timestamp, symbol, timeframe, open_price, high_price,
                 low_price, close_price, volume, trade_count)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE
                SET close_price = EXCLUDED.close_price,
                    high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price),
                    low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price),
                    volume = ohlcv_data.volume + EXCLUDED.volume,
                    trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count
            """

            await self.db.executemany(query, values)

            self.stats['db_writes'] += 1
            logger.debug(f"Flushed {len(values)} OHLCV candles to database")

        except Exception as e:
            logger.error(f"Error flushing OHLCV buffer: {e}")
            self.stats['db_write_failures'] += 1

            # Re-add to buffer for retry
            with self.lock:
                self.ohlcv_buffer.extend(buffer_copy)

    async def _flush_orderbook_buffer(self):
        """Batch write order book data to the database."""
        with self.lock:
            if not self.orderbook_buffer:
                return

            buffer_copy = self.orderbook_buffer.copy()
            self.orderbook_buffer.clear()
            self.last_orderbook_flush = time.time()

        try:
            # Prepare batch insert (bids/asks are serialized as JSON)
            values = []
            for item in buffer_copy:
                values.append((
                    item.get('timestamp', datetime.now(timezone.utc)),
                    item['symbol'],
                    item.get('exchange', 'binance'),
                    json.dumps(item.get('bids', [])),
                    json.dumps(item.get('asks', [])),
                    float(item.get('mid_price', 0)),
                    float(item.get('spread', 0)),
                    float(item.get('bid_volume', 0)),
                    float(item.get('ask_volume', 0)),
                    item.get('sequence_id')
                ))

            query = """
                INSERT INTO order_book_snapshots
                (timestamp, symbol, exchange, bids, asks, mid_price, spread,
                 bid_volume, ask_volume, sequence_id)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (timestamp, symbol, exchange) DO UPDATE
                SET bids = EXCLUDED.bids,
                    asks = EXCLUDED.asks,
                    mid_price = EXCLUDED.mid_price,
                    spread = EXCLUDED.spread,
                    bid_volume = EXCLUDED.bid_volume,
                    ask_volume = EXCLUDED.ask_volume
            """

            await self.db.executemany(query, values)

            self.stats['db_writes'] += 1
            logger.debug(f"Flushed {len(values)} order book snapshots to database")

        except Exception as e:
            logger.error(f"Error flushing order book buffer: {e}")
            self.stats['db_write_failures'] += 1

            with self.lock:
                self.orderbook_buffer.extend(buffer_copy)

    async def _flush_imbalance_buffer(self):
        """Batch write imbalance data to the database."""
        with self.lock:
            if not self.imbalance_buffer:
                return

            buffer_copy = self.imbalance_buffer.copy()
            self.imbalance_buffer.clear()
            self.last_imbalance_flush = time.time()

        try:
            values = []
            for item in buffer_copy:
                values.append((
                    item.get('timestamp', datetime.now(timezone.utc)),
                    item['symbol'],
                    float(item.get('imbalance_1s', 0)),
                    float(item.get('imbalance_5s', 0)),
                    float(item.get('imbalance_15s', 0)),
                    float(item.get('imbalance_60s', 0)),
                    float(item.get('volume_imbalance_1s', 0)),
                    float(item.get('volume_imbalance_5s', 0)),
                    float(item.get('volume_imbalance_15s', 0)),
                    float(item.get('volume_imbalance_60s', 0)),
                    float(item.get('price_range', 0))
                ))

            query = """
                INSERT INTO order_book_imbalances
                (timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s,
                 volume_imbalance_1s, volume_imbalance_5s, volume_imbalance_15s, volume_imbalance_60s,
                 price_range)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ON CONFLICT (timestamp, symbol) DO UPDATE
                SET imbalance_1s = EXCLUDED.imbalance_1s,
                    imbalance_5s = EXCLUDED.imbalance_5s,
                    imbalance_15s = EXCLUDED.imbalance_15s,
                    imbalance_60s = EXCLUDED.imbalance_60s,
                    volume_imbalance_1s = EXCLUDED.volume_imbalance_1s,
                    volume_imbalance_5s = EXCLUDED.volume_imbalance_5s,
                    volume_imbalance_15s = EXCLUDED.volume_imbalance_15s,
                    volume_imbalance_60s = EXCLUDED.volume_imbalance_60s
            """

            await self.db.executemany(query, values)

            self.stats['db_writes'] += 1
            logger.debug(f"Flushed {len(values)} imbalance entries to database")

        except Exception as e:
            logger.error(f"Error flushing imbalance buffer: {e}")
            self.stats['db_write_failures'] += 1

            with self.lock:
                self.imbalance_buffer.extend(buffer_copy)

    async def _flush_trade_buffer(self):
        """Batch write trade data to the database."""
        with self.lock:
            if not self.trade_buffer:
                return

            buffer_copy = self.trade_buffer.copy()
            self.trade_buffer.clear()
            self.last_trade_flush = time.time()

        try:
            values = []
            for item in buffer_copy:
                values.append((
                    item.get('timestamp', datetime.now(timezone.utc)),
                    item['symbol'],
                    item.get('exchange', 'binance'),
                    float(item['price']),
                    float(item['size']),
                    item['side'],
                    str(item['trade_id']),
                    item.get('is_buyer_maker', False)
                ))

            query = """
                INSERT INTO trade_events
                (timestamp, symbol, exchange, price, size, side, trade_id, is_buyer_maker)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING
            """

            await self.db.executemany(query, values)

            self.stats['db_writes'] += 1
            logger.debug(f"Flushed {len(values)} trades to database")

        except Exception as e:
            logger.error(f"Error flushing trade buffer: {e}")
            self.stats['db_write_failures'] += 1

            with self.lock:
                self.trade_buffer.extend(buffer_copy)

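    # The worker below complements the size-based flushes above: it wakes
    # once per second and flushes any buffer whose last flush is older than
    # batch_timeout, so low-volume streams still reach the database.
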
    async def _background_flush_worker(self):
        """Background worker that flushes buffers based on timeout."""
        logger.info("Background flush worker started")

        while self.is_running:
            try:
                await asyncio.sleep(1)  # Check every second

                current_time = time.time()

                # Check OHLCV buffer timeout
                if current_time - self.last_ohlcv_flush >= self.batch_timeout:
                    if self.ohlcv_buffer:
                        await self._flush_ohlcv_buffer()

                # Check order book buffer timeout
                if current_time - self.last_orderbook_flush >= self.batch_timeout:
                    if self.orderbook_buffer:
                        await self._flush_orderbook_buffer()

                # Check imbalance buffer timeout
                if current_time - self.last_imbalance_flush >= self.batch_timeout:
                    if self.imbalance_buffer:
                        await self._flush_imbalance_buffer()

                # Check trade buffer timeout
                if current_time - self.last_trade_flush >= self.batch_timeout:
                    if self.trade_buffer:
                        await self._flush_trade_buffer()

                # Auto-evict old cache data
                self.cache.auto_evict_if_needed()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in background flush worker: {e}")

        logger.info("Background flush worker stopped")

    async def _flush_all_buffers(self):
        """Flush all buffers to the database."""
        logger.info("Flushing all buffers...")

        await self._flush_ohlcv_buffer()
        await self._flush_orderbook_buffer()
        await self._flush_imbalance_buffer()
        await self._flush_trade_buffer()

        logger.info("All buffers flushed")

    def get_stats(self) -> Dict[str, Any]:
        """Get ingestion pipeline statistics."""
        with self.lock:
            total_ingested = (
                self.stats['ohlcv_ingested'] +
                self.stats['orderbook_ingested'] +
                self.stats['imbalance_ingested'] +
                self.stats['trade_ingested']
            )

            avg_latency = (
                self.stats['total_latency_ms'] / total_ingested
                if total_ingested > 0 else 0
            )

            return {
                **self.stats,
                'total_ingested': total_ingested,
                'avg_latency_ms': round(avg_latency, 2),
                'buffer_sizes': {
                    'ohlcv': len(self.ohlcv_buffer),
                    'orderbook': len(self.orderbook_buffer),
                    'imbalance': len(self.imbalance_buffer),
                    'trade': len(self.trade_buffer)
                },
                'is_running': self.is_running
            }
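

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): wiring the pipeline into an async app.
# The DataCacheManager() and DatabaseConnectionManager() constructor calls
# below are assumptions; see those modules for their real signatures.
# ---------------------------------------------------------------------------
#
# async def main():
#     cache = DataCacheManager()            # assumed no-arg constructor
#     db = DatabaseConnectionManager()      # assumed no-arg constructor
#     pipeline = DataIngestionPipeline(cache, db, batch_size=100,
#                                      batch_timeout_seconds=5.0)
#     pipeline.start()  # requires a running event loop
#     try:
#         await pipeline.ingest_trade('BTCUSDT', {
#             'timestamp': datetime.now(timezone.utc),
#             'price': 50000.0,
#             'size': 0.01,
#             'side': 'buy',
#             'trade_id': '12345',
#         })
#         print(pipeline.get_stats())
#     finally:
#         await pipeline.stop()  # cancels the worker and drains buffers
#
# asyncio.run(main())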