"""
Database schema management and migration system.

Handles schema versioning, migrations, and database structure updates
for a TimescaleDB-backed market-data store.
"""
import hashlib
import logging
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class SchemaManager:
    """Manages database schema versions and migrations.

    Expects an asyncpg-style connection pool: ``pool.acquire()`` is an async
    context manager yielding a connection that provides ``execute`` /
    ``fetch`` / ``fetchval`` and a ``transaction()`` context manager.
    """

    def __init__(self, connection_pool):
        # Pool is stored as-is; every method assumes asyncpg semantics.
        self.pool = connection_pool
        # Version that the base-schema migration records (see create_base_schema).
        self.current_version = "1.0.0"

    @staticmethod
    def _checksum(sql_commands: List[str]) -> str:
        """Return a SHA-256 hex digest over a migration's SQL commands.

        Stored in schema_migrations.checksum so that later runs can detect
        when the SQL of an already-applied migration has been edited.
        """
        digest = hashlib.sha256()
        for command in sql_commands:
            digest.update(command.encode("utf-8"))
            digest.update(b"\n")  # separator so ["ab","c"] != ["a","bc"]
        return digest.hexdigest()

    async def initialize_schema_tracking(self) -> None:
        """Create the schema_migrations tracking table if it does not exist.

        BUG FIX: the original implementation also pre-inserted version
        "1.0.0" here, which made ``apply_migration("1.0.0", ...)`` inside
        ``create_base_schema`` conclude the base schema was already applied
        and skip creating the core tables entirely. The tracking table is
        now created empty; each migration records its own version row when
        it actually runs.

        Raises:
            Exception: re-raises any database error after logging it, since
                nothing else can proceed without the tracking table.
        """
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS schema_migrations (
                        version VARCHAR(20) PRIMARY KEY,
                        applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                        description TEXT,
                        checksum VARCHAR(64)
                    );
                """)
            logger.info("Schema tracking initialized")
        except Exception as e:
            logger.error("Failed to initialize schema tracking: %s", e)
            raise

    async def get_current_schema_version(self) -> Optional[str]:
        """Return the most recently applied schema version, or None.

        "Most recent" is by ``applied_at`` timestamp (version strings are
        not compared semantically). Returns None both when no migration has
        been applied yet and when the query itself fails.
        """
        try:
            async with self.pool.acquire() as conn:
                return await conn.fetchval("""
                    SELECT version FROM schema_migrations
                    ORDER BY applied_at DESC LIMIT 1
                """)
        except Exception as e:
            logger.error("Failed to get schema version: %s", e)
            return None

    async def apply_migration(self, version: str, description: str,
                              sql_commands: List[str]) -> bool:
        """Apply a database migration atomically.

        All commands plus the bookkeeping INSERT run in one transaction, so
        a half-applied migration is rolled back. Idempotent: if ``version``
        is already recorded, the migration is skipped and True is returned.

        Args:
            version: unique migration version string (<= 20 chars).
            description: human-readable summary stored with the record.
            sql_commands: SQL statements executed in order.

        Returns:
            True on success or when already applied; False on any error.
        """
        try:
            async with self.pool.acquire() as conn:
                async with conn.transaction():
                    # Skip if this version was recorded by a previous run.
                    existing = await conn.fetchval("""
                        SELECT version FROM schema_migrations WHERE version = $1
                    """, version)
                    if existing:
                        logger.info(f"Migration {version} already applied")
                        return True

                    for sql_command in sql_commands:
                        await conn.execute(sql_command)

                    # Record the migration, including a checksum of its SQL
                    # (the checksum column was previously never populated).
                    await conn.execute("""
                        INSERT INTO schema_migrations (version, description, checksum)
                        VALUES ($1, $2, $3)
                    """, version, description, self._checksum(sql_commands))

                    logger.info(f"Applied migration {version}: {description}")
                    return True
        except Exception as e:
            logger.error("Failed to apply migration %s: %s", version, e)
            return False

    async def create_base_schema(self) -> bool:
        """Create the base database schema with all tables (migration 1.0.0)."""
        migration_commands = [
            # Enable TimescaleDB extension
            "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;",

            # Order book snapshots table
            """
            CREATE TABLE IF NOT EXISTS order_book_snapshots (
                id BIGSERIAL,
                symbol VARCHAR(20) NOT NULL,
                exchange VARCHAR(20) NOT NULL,
                timestamp TIMESTAMPTZ NOT NULL,
                bids JSONB NOT NULL,
                asks JSONB NOT NULL,
                sequence_id BIGINT,
                mid_price DECIMAL(20,8),
                spread DECIMAL(20,8),
                bid_volume DECIMAL(30,8),
                ask_volume DECIMAL(30,8),
                PRIMARY KEY (timestamp, symbol, exchange)
            );
            """,

            # Trade events table
            """
            CREATE TABLE IF NOT EXISTS trade_events (
                id BIGSERIAL,
                symbol VARCHAR(20) NOT NULL,
                exchange VARCHAR(20) NOT NULL,
                timestamp TIMESTAMPTZ NOT NULL,
                price DECIMAL(20,8) NOT NULL,
                size DECIMAL(30,8) NOT NULL,
                side VARCHAR(4) NOT NULL,
                trade_id VARCHAR(100) NOT NULL,
                PRIMARY KEY (timestamp, symbol, exchange, trade_id)
            );
            """,

            # Heatmap data table
            """
            CREATE TABLE IF NOT EXISTS heatmap_data (
                symbol VARCHAR(20) NOT NULL,
                timestamp TIMESTAMPTZ NOT NULL,
                bucket_size DECIMAL(10,2) NOT NULL,
                price_bucket DECIMAL(20,8) NOT NULL,
                volume DECIMAL(30,8) NOT NULL,
                side VARCHAR(3) NOT NULL,
                exchange_count INTEGER NOT NULL,
                PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side)
            );
            """,

            # OHLCV data table
            """
            CREATE TABLE IF NOT EXISTS ohlcv_data (
                symbol VARCHAR(20) NOT NULL,
                timestamp TIMESTAMPTZ NOT NULL,
                timeframe VARCHAR(10) NOT NULL,
                open_price DECIMAL(20,8) NOT NULL,
                high_price DECIMAL(20,8) NOT NULL,
                low_price DECIMAL(20,8) NOT NULL,
                close_price DECIMAL(20,8) NOT NULL,
                volume DECIMAL(30,8) NOT NULL,
                trade_count INTEGER,
                PRIMARY KEY (timestamp, symbol, timeframe)
            );
            """
        ]

        return await self.apply_migration(
            "1.0.0",
            "Create base schema with core tables",
            migration_commands
        )

    async def create_hypertables(self) -> bool:
        """Convert the core tables to TimescaleDB hypertables (migration 1.0.1)."""
        hypertable_commands = [
            "SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('heatmap_data', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);"
        ]

        return await self.apply_migration(
            "1.0.1",
            "Convert tables to hypertables",
            hypertable_commands
        )

    async def create_indexes(self) -> bool:
        """Create performance indexes on the core tables (migration 1.0.2)."""
        index_commands = [
            # Order book snapshots indexes
            "CREATE INDEX IF NOT EXISTS idx_obs_symbol_time ON order_book_snapshots (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_obs_exchange_time ON order_book_snapshots (exchange, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange ON order_book_snapshots (symbol, exchange, timestamp DESC);",

            # Trade events indexes
            "CREATE INDEX IF NOT EXISTS idx_trades_symbol_time ON trade_events (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_trades_exchange_time ON trade_events (exchange, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_trades_price ON trade_events (symbol, price, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_trades_side ON trade_events (symbol, side, timestamp DESC);",

            # Heatmap data indexes
            "CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_time ON heatmap_data (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_heatmap_bucket ON heatmap_data (symbol, bucket_size, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_heatmap_side ON heatmap_data (symbol, side, timestamp DESC);",

            # OHLCV indexes
            "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON ohlcv_data (symbol, timeframe, timestamp DESC);"
        ]

        return await self.apply_migration(
            "1.0.2",
            "Create performance indexes",
            index_commands
        )

    async def setup_retention_policies(self) -> bool:
        """Set up TimescaleDB data retention policies (migration 1.0.3)."""
        retention_commands = [
            "SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);",
            "SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);",
            "SELECT add_retention_policy('heatmap_data', INTERVAL '60 days', if_not_exists => TRUE);",
            "SELECT add_retention_policy('ohlcv_data', INTERVAL '1 year', if_not_exists => TRUE);"
        ]

        return await self.apply_migration(
            "1.0.3",
            "Setup data retention policies",
            retention_commands
        )

    async def create_continuous_aggregates(self) -> bool:
        """Create continuous aggregates for query performance (migration 1.0.4)."""
        aggregate_commands = [
            # 1-minute OHLCV aggregates from trades
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS trades_1m
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('1 minute', timestamp) AS bucket,
                symbol,
                exchange,
                first(price, timestamp) AS open_price,
                max(price) AS high_price,
                min(price) AS low_price,
                last(price, timestamp) AS close_price,
                sum(size) AS volume,
                count(*) AS trade_count
            FROM trade_events
            GROUP BY bucket, symbol, exchange;
            """,

            # 5-minute order book statistics
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS orderbook_stats_5m
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('5 minutes', timestamp) AS bucket,
                symbol,
                exchange,
                avg(mid_price) AS avg_mid_price,
                avg(spread) AS avg_spread,
                avg(bid_volume) AS avg_bid_volume,
                avg(ask_volume) AS avg_ask_volume,
                count(*) AS snapshot_count
            FROM order_book_snapshots
            WHERE mid_price IS NOT NULL
            GROUP BY bucket, symbol, exchange;
            """
        ]

        return await self.apply_migration(
            "1.0.4",
            "Create continuous aggregates",
            aggregate_commands
        )

    async def setup_complete_schema(self) -> bool:
        """Set up the complete database schema with all components.

        Runs schema tracking initialization, then each migration in order,
        stopping at the first failure.

        Returns:
            True if every migration succeeded, False otherwise.
        """
        try:
            await self.initialize_schema_tracking()

            # Migrations must run in this order: tables before hypertables,
            # hypertables before indexes/retention/aggregates.
            migrations = [
                self.create_base_schema,
                self.create_hypertables,
                self.create_indexes,
                self.setup_retention_policies,
                self.create_continuous_aggregates
            ]

            for migration in migrations:
                success = await migration()
                if not success:
                    logger.error("Failed to apply migration: %s", migration.__name__)
                    return False

            logger.info("Complete database schema setup successful")
            return True
        except Exception as e:
            logger.error("Failed to setup complete schema: %s", e)
            return False

    async def get_schema_info(self) -> Dict:
        """Return applied migrations, table sizes, and hypertable stats.

        Returns:
            Dict with keys "migrations", "tables", "hypertables" (lists of
            row dicts), or an empty dict if any query fails.
        """
        try:
            async with self.pool.acquire() as conn:
                # Applied migrations, oldest first.
                migrations = await conn.fetch("""
                    SELECT version, applied_at, description
                    FROM schema_migrations ORDER BY applied_at
                """)

                # On-disk sizes of the core tables.
                tables = await conn.fetch("""
                    SELECT schemaname, tablename,
                           pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
                    FROM pg_tables
                    WHERE schemaname = 'public'
                    AND tablename IN ('order_book_snapshots', 'trade_events',
                                      'heatmap_data', 'ohlcv_data')
                """)

                # TimescaleDB hypertable metadata.
                hypertables = await conn.fetch("""
                    SELECT hypertable_name, num_chunks, compression_enabled
                    FROM timescaledb_information.hypertables
                    WHERE hypertable_schema = 'public'
                """)

                return {
                    "migrations": [dict(m) for m in migrations],
                    "tables": [dict(t) for t in tables],
                    "hypertables": [dict(h) for h in hypertables]
                }
        except Exception as e:
            logger.error("Failed to get schema info: %s", e)
            return {}