""" TimescaleDB storage manager implementation. """ import json from datetime import datetime from typing import List, Dict, Optional, Any from ..interfaces.storage_manager import StorageManager from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData, SystemMetrics, PriceLevel from ..utils.logging import get_logger, set_correlation_id from ..utils.exceptions import StorageError, ValidationError from ..utils.timing import get_current_timestamp from .connection_pool import db_pool from .schema import DatabaseSchema logger = get_logger(__name__) class TimescaleManager(StorageManager): """TimescaleDB implementation of StorageManager interface""" def __init__(self): self._schema_initialized = False async def initialize(self) -> None: """Initialize the storage manager""" await db_pool.initialize() await self.setup_database_schema() logger.info("TimescaleDB storage manager initialized") async def close(self) -> None: """Close the storage manager""" await db_pool.close() logger.info("TimescaleDB storage manager closed") def setup_database_schema(self) -> None: """Set up database schema and tables""" async def _setup(): if self._schema_initialized: return try: queries = DatabaseSchema.get_all_creation_queries() for query in queries: try: await db_pool.execute_command(query) logger.debug(f"Executed schema query: {query[:50]}...") except Exception as e: # Log but continue - some queries might fail if already exists logger.warning(f"Schema query failed (continuing): {e}") self._schema_initialized = True logger.info("Database schema setup completed") except Exception as e: logger.error(f"Failed to setup database schema: {e}") raise StorageError(f"Schema setup failed: {e}", "SCHEMA_SETUP_ERROR") # Run async setup import asyncio if asyncio.get_event_loop().is_running(): asyncio.create_task(_setup()) else: asyncio.run(_setup()) async def store_orderbook(self, data: OrderBookSnapshot) -> bool: """Store order book snapshot to database""" try: set_correlation_id() # Convert price levels to JSON bids_json = json.dumps([ {"price": float(level.price), "size": float(level.size), "count": level.count} for level in data.bids ]) asks_json = json.dumps([ {"price": float(level.price), "size": float(level.size), "count": level.count} for level in data.asks ]) query = """ INSERT INTO market_data.order_book_snapshots (symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread, bid_volume, ask_volume) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) """ await db_pool.execute_command( query, data.symbol, data.exchange, data.timestamp, bids_json, asks_json, data.sequence_id, float(data.mid_price) if data.mid_price else None, float(data.spread) if data.spread else None, float(data.bid_volume), float(data.ask_volume) ) logger.debug(f"Stored order book: {data.symbol}@{data.exchange}") return True except Exception as e: logger.error(f"Failed to store order book: {e}") return False async def store_trade(self, data: TradeEvent) -> bool: """Store trade event to database""" try: set_correlation_id() query = """ INSERT INTO market_data.trade_events (symbol, exchange, timestamp, price, size, side, trade_id) VALUES ($1, $2, $3, $4, $5, $6, $7) """ await db_pool.execute_command( query, data.symbol, data.exchange, data.timestamp, float(data.price), float(data.size), data.side, data.trade_id ) logger.debug(f"Stored trade: {data.symbol}@{data.exchange} - {data.trade_id}") return True except Exception as e: logger.error(f"Failed to store trade: {e}") return False async def store_heatmap(self, data: HeatmapData) -> bool: """Store heatmap data to database""" try: set_correlation_id() # Store each heatmap point for point in data.data: query = """ INSERT INTO market_data.heatmap_data (symbol, timestamp, bucket_size, price_bucket, volume, side, exchange_count, exchanges) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (timestamp, symbol, bucket_size, price_bucket, side) DO UPDATE SET volume = EXCLUDED.volume, exchange_count = EXCLUDED.exchange_count, exchanges = EXCLUDED.exchanges """ await db_pool.execute_command( query, data.symbol, data.timestamp, float(data.bucket_size), float(point.price), float(point.volume), point.side, 1, # exchange_count - will be updated by aggregation json.dumps([]) # exchanges - will be updated by aggregation ) logger.debug(f"Stored heatmap: {data.symbol} with {len(data.data)} points") return True except Exception as e: logger.error(f"Failed to store heatmap: {e}") return False async def store_metrics(self, data: SystemMetrics) -> bool: """Store system metrics to database""" try: set_correlation_id() # Store multiple metrics metrics = [ ('cpu_usage', data.cpu_usage), ('memory_usage', data.memory_usage), ('disk_usage', data.disk_usage), ('database_connections', data.database_connections), ('redis_connections', data.redis_connections), ('active_websockets', data.active_websockets), ('messages_per_second', data.messages_per_second), ('processing_latency', data.processing_latency) ] query = """ INSERT INTO market_data.system_metrics (metric_name, timestamp, value, labels) VALUES ($1, $2, $3, $4) """ for metric_name, value in metrics: await db_pool.execute_command( query, metric_name, data.timestamp, float(value), json.dumps(data.network_io) ) logger.debug("Stored system metrics") return True except Exception as e: logger.error(f"Failed to store metrics: {e}") return False async def get_historical_orderbooks(self, symbol: str, exchange: str, start: datetime, end: datetime, limit: Optional[int] = None) -> List[OrderBookSnapshot]: """Retrieve historical order book data""" try: query = """ SELECT symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread FROM market_data.order_book_snapshots WHERE symbol = $1 AND exchange = $2 AND timestamp >= $3 AND timestamp <= $4 ORDER BY timestamp DESC """ if limit: query += f" LIMIT {limit}" rows = await db_pool.execute_query(query, symbol, exchange, start, end) orderbooks = [] for row in rows: # Parse JSON bid/ask data bids_data = json.loads(row['bids']) asks_data = json.loads(row['asks']) bids = [PriceLevel(price=b['price'], size=b['size'], count=b.get('count')) for b in bids_data] asks = [PriceLevel(price=a['price'], size=a['size'], count=a.get('count')) for a in asks_data] orderbook = OrderBookSnapshot( symbol=row['symbol'], exchange=row['exchange'], timestamp=row['timestamp'], bids=bids, asks=asks, sequence_id=row['sequence_id'] ) orderbooks.append(orderbook) logger.debug(f"Retrieved {len(orderbooks)} historical order books") return orderbooks except Exception as e: logger.error(f"Failed to get historical order books: {e}") return [] async def get_historical_trades(self, symbol: str, exchange: str, start: datetime, end: datetime, limit: Optional[int] = None) -> List[TradeEvent]: """Retrieve historical trade data""" try: query = """ SELECT symbol, exchange, timestamp, price, size, side, trade_id FROM market_data.trade_events WHERE symbol = $1 AND exchange = $2 AND timestamp >= $3 AND timestamp <= $4 ORDER BY timestamp DESC """ if limit: query += f" LIMIT {limit}" rows = await db_pool.execute_query(query, symbol, exchange, start, end) trades = [] for row in rows: trade = TradeEvent( symbol=row['symbol'], exchange=row['exchange'], timestamp=row['timestamp'], price=float(row['price']), size=float(row['size']), side=row['side'], trade_id=row['trade_id'] ) trades.append(trade) logger.debug(f"Retrieved {len(trades)} historical trades") return trades except Exception as e: logger.error(f"Failed to get historical trades: {e}") return [] async def get_latest_orderbook(self, symbol: str, exchange: str) -> Optional[OrderBookSnapshot]: """Get latest order book snapshot""" try: query = """ SELECT symbol, exchange, timestamp, bids, asks, sequence_id FROM market_data.order_book_snapshots WHERE symbol = $1 AND exchange = $2 ORDER BY timestamp DESC LIMIT 1 """ rows = await db_pool.execute_query(query, symbol, exchange) if not rows: return None row = rows[0] bids_data = json.loads(row['bids']) asks_data = json.loads(row['asks']) bids = [PriceLevel(price=b['price'], size=b['size'], count=b.get('count')) for b in bids_data] asks = [PriceLevel(price=a['price'], size=a['size'], count=a.get('count')) for a in asks_data] return OrderBookSnapshot( symbol=row['symbol'], exchange=row['exchange'], timestamp=row['timestamp'], bids=bids, asks=asks, sequence_id=row['sequence_id'] ) except Exception as e: logger.error(f"Failed to get latest order book: {e}") return None async def get_latest_heatmap(self, symbol: str, bucket_size: float) -> Optional[HeatmapData]: """Get latest heatmap data""" try: query = """ SELECT price_bucket, volume, side, timestamp FROM market_data.heatmap_data WHERE symbol = $1 AND bucket_size = $2 AND timestamp = ( SELECT MAX(timestamp) FROM market_data.heatmap_data WHERE symbol = $1 AND bucket_size = $2 ) ORDER BY price_bucket """ rows = await db_pool.execute_query(query, symbol, bucket_size) if not rows: return None from ..models.core import HeatmapPoint heatmap = HeatmapData( symbol=symbol, timestamp=rows[0]['timestamp'], bucket_size=bucket_size ) # Calculate max volume for intensity max_volume = max(float(row['volume']) for row in rows) for row in rows: volume = float(row['volume']) intensity = volume / max_volume if max_volume > 0 else 0.0 point = HeatmapPoint( price=float(row['price_bucket']), volume=volume, intensity=intensity, side=row['side'] ) heatmap.data.append(point) return heatmap except Exception as e: logger.error(f"Failed to get latest heatmap: {e}") return None async def get_ohlcv_data(self, symbol: str, exchange: str, timeframe: str, start: datetime, end: datetime) -> List[Dict[str, Any]]: """Get OHLCV candlestick data""" try: query = """ SELECT timestamp, open_price, high_price, low_price, close_price, volume, trade_count, vwap FROM market_data.ohlcv_data WHERE symbol = $1 AND exchange = $2 AND timeframe = $3 AND timestamp >= $4 AND timestamp <= $5 ORDER BY timestamp """ rows = await db_pool.execute_query(query, symbol, exchange, timeframe, start, end) ohlcv_data = [] for row in rows: ohlcv_data.append({ 'timestamp': row['timestamp'], 'open': float(row['open_price']), 'high': float(row['high_price']), 'low': float(row['low_price']), 'close': float(row['close_price']), 'volume': float(row['volume']), 'trade_count': row['trade_count'], 'vwap': float(row['vwap']) if row['vwap'] else None }) logger.debug(f"Retrieved {len(ohlcv_data)} OHLCV records") return ohlcv_data except Exception as e: logger.error(f"Failed to get OHLCV data: {e}") return [] async def batch_store_orderbooks(self, data: List[OrderBookSnapshot]) -> int: """Store multiple order book snapshots in batch""" if not data: return 0 try: set_correlation_id() # Prepare batch data batch_data = [] for orderbook in data: bids_json = json.dumps([ {"price": float(level.price), "size": float(level.size), "count": level.count} for level in orderbook.bids ]) asks_json = json.dumps([ {"price": float(level.price), "size": float(level.size), "count": level.count} for level in orderbook.asks ]) batch_data.append(( orderbook.symbol, orderbook.exchange, orderbook.timestamp, bids_json, asks_json, orderbook.sequence_id, float(orderbook.mid_price) if orderbook.mid_price else None, float(orderbook.spread) if orderbook.spread else None, float(orderbook.bid_volume), float(orderbook.ask_volume) )) query = """ INSERT INTO market_data.order_book_snapshots (symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread, bid_volume, ask_volume) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) """ await db_pool.execute_many(query, batch_data) logger.debug(f"Batch stored {len(data)} order books") return len(data) except Exception as e: logger.error(f"Failed to batch store order books: {e}") return 0 async def batch_store_trades(self, data: List[TradeEvent]) -> int: """Store multiple trade events in batch""" if not data: return 0 try: set_correlation_id() # Prepare batch data batch_data = [ (trade.symbol, trade.exchange, trade.timestamp, float(trade.price), float(trade.size), trade.side, trade.trade_id) for trade in data ] query = """ INSERT INTO market_data.trade_events (symbol, exchange, timestamp, price, size, side, trade_id) VALUES ($1, $2, $3, $4, $5, $6, $7) """ await db_pool.execute_many(query, batch_data) logger.debug(f"Batch stored {len(data)} trades") return len(data) except Exception as e: logger.error(f"Failed to batch store trades: {e}") return 0 async def cleanup_old_data(self, retention_days: int) -> int: """Clean up old data based on retention policy""" try: cutoff_time = get_current_timestamp().replace( day=get_current_timestamp().day - retention_days ) tables = [ 'order_book_snapshots', 'trade_events', 'heatmap_data', 'exchange_status', 'system_metrics' ] total_deleted = 0 for table in tables: query = f""" DELETE FROM market_data.{table} WHERE timestamp < $1 """ result = await db_pool.execute_command(query, cutoff_time) # Extract number from result like "DELETE 1234" deleted = int(result.split()[-1]) if result.split()[-1].isdigit() else 0 total_deleted += deleted logger.debug(f"Cleaned up {deleted} records from {table}") logger.info(f"Cleaned up {total_deleted} total records older than {retention_days} days") return total_deleted except Exception as e: logger.error(f"Failed to cleanup old data: {e}") return 0 async def get_storage_stats(self) -> Dict[str, Any]: """Get storage statistics""" try: stats = {} # Table sizes size_query = """ SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size, pg_total_relation_size(schemaname||'.'||tablename) as size_bytes FROM pg_tables WHERE schemaname = 'market_data' ORDER BY size_bytes DESC """ size_rows = await db_pool.execute_query(size_query) stats['table_sizes'] = [ { 'table': row['tablename'], 'size': row['size'], 'size_bytes': row['size_bytes'] } for row in size_rows ] # Record counts tables = ['order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data', 'exchange_status', 'system_metrics'] record_counts = {} for table in tables: count_query = f"SELECT COUNT(*) as count FROM market_data.{table}" count_rows = await db_pool.execute_query(count_query) record_counts[table] = count_rows[0]['count'] if count_rows else 0 stats['record_counts'] = record_counts # Connection pool stats stats['connection_pool'] = await db_pool.get_pool_stats() return stats except Exception as e: logger.error(f"Failed to get storage stats: {e}") return {} async def health_check(self) -> bool: """Check storage system health""" try: # Check database connection if not await db_pool.health_check(): return False # Check if tables exist query = """ SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = 'market_data' """ rows = await db_pool.execute_query(query) table_count = rows[0]['count'] if rows else 0 if table_count < 6: # We expect 6 main tables logger.warning(f"Expected 6 tables, found {table_count}") return False logger.debug("Storage health check passed") return True except Exception as e: logger.error(f"Storage health check failed: {e}") return False