""" Unified Data Storage Schema Manager Extends COBY schema with additional tables for unified data storage system. """ import logging from typing import Dict, List, Optional from datetime import datetime import asyncpg logger = logging.getLogger(__name__) class UnifiedStorageSchemaManager: """ Manages unified data storage schema with TimescaleDB. Extends existing COBY schema with additional tables for: - Enhanced OHLCV with technical indicators - Order book 1s aggregations with price buckets - Multi-timeframe order book imbalances """ def __init__(self, connection_pool): self.pool = connection_pool self.current_version = "2.0.0" async def initialize_schema_tracking(self) -> None: """Initialize schema version tracking table.""" try: async with self.pool.acquire() as conn: await conn.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( version VARCHAR(20) PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), description TEXT, checksum VARCHAR(64) ); """) logger.info("Schema tracking initialized") except Exception as e: logger.error(f"Failed to initialize schema tracking: {e}") raise async def apply_migration(self, version: str, description: str, sql_commands: List[str]) -> bool: """Apply a database migration.""" try: async with self.pool.acquire() as conn: async with conn.transaction(): # Check if migration already applied existing = await conn.fetchval(""" SELECT version FROM schema_migrations WHERE version = $1 """, version) if existing: logger.info(f"Migration {version} already applied") return True # Apply migration commands for sql_command in sql_commands: try: await conn.execute(sql_command) except Exception as cmd_error: logger.error(f"Error executing command: {sql_command[:100]}... Error: {cmd_error}") raise # Record migration await conn.execute(""" INSERT INTO schema_migrations (version, description) VALUES ($1, $2) """, version, description) logger.info(f"Applied migration {version}: {description}") return True except Exception as e: logger.error(f"Failed to apply migration {version}: {e}") return False async def create_enhanced_ohlcv_table(self) -> bool: """Create enhanced OHLCV table with technical indicators.""" migration_commands = [ """ CREATE TABLE IF NOT EXISTS ohlcv_data ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, timeframe VARCHAR(10) NOT NULL, open_price DECIMAL(20,8) NOT NULL, high_price DECIMAL(20,8) NOT NULL, low_price DECIMAL(20,8) NOT NULL, close_price DECIMAL(20,8) NOT NULL, volume DECIMAL(30,8) NOT NULL, trade_count INTEGER DEFAULT 0, -- Technical Indicators (pre-calculated) rsi_14 DECIMAL(10,4), macd DECIMAL(20,8), macd_signal DECIMAL(20,8), macd_histogram DECIMAL(20,8), bb_upper DECIMAL(20,8), bb_middle DECIMAL(20,8), bb_lower DECIMAL(20,8), ema_12 DECIMAL(20,8), ema_26 DECIMAL(20,8), sma_20 DECIMAL(20,8), PRIMARY KEY (timestamp, symbol, timeframe) ); """ ] return await self.apply_migration( "2.0.0", "Create enhanced OHLCV table with technical indicators", migration_commands ) async def create_order_book_tables(self) -> bool: """Create order book related tables.""" migration_commands = [ # Order book snapshots """ CREATE TABLE IF NOT EXISTS order_book_snapshots ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, exchange VARCHAR(20) NOT NULL DEFAULT 'binance', bids JSONB NOT NULL, asks JSONB NOT NULL, mid_price DECIMAL(20,8), spread DECIMAL(20,8), bid_volume DECIMAL(30,8), ask_volume DECIMAL(30,8), sequence_id BIGINT, PRIMARY KEY (timestamp, symbol, exchange) ); """, # Order book 1s aggregations with price buckets """ 
    async def create_order_book_tables(self) -> bool:
        """Create order book related tables."""
        migration_commands = [
            # Order book snapshots
            """
            CREATE TABLE IF NOT EXISTS order_book_snapshots (
                timestamp TIMESTAMPTZ NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                exchange VARCHAR(20) NOT NULL DEFAULT 'binance',
                bids JSONB NOT NULL,
                asks JSONB NOT NULL,
                mid_price DECIMAL(20,8),
                spread DECIMAL(20,8),
                bid_volume DECIMAL(30,8),
                ask_volume DECIMAL(30,8),
                sequence_id BIGINT,
                PRIMARY KEY (timestamp, symbol, exchange)
            );
            """,
            # Order book 1s aggregations with price buckets
            """
            CREATE TABLE IF NOT EXISTS order_book_1s_agg (
                timestamp TIMESTAMPTZ NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                price_bucket DECIMAL(20,2) NOT NULL,
                bid_volume DECIMAL(30,8) DEFAULT 0,
                ask_volume DECIMAL(30,8) DEFAULT 0,
                bid_count INTEGER DEFAULT 0,
                ask_count INTEGER DEFAULT 0,
                imbalance DECIMAL(10,6) DEFAULT 0,
                PRIMARY KEY (timestamp, symbol, price_bucket)
            );
            """,
            # Multi-timeframe order book imbalances
            """
            CREATE TABLE IF NOT EXISTS order_book_imbalances (
                timestamp TIMESTAMPTZ NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                imbalance_1s DECIMAL(10,6) DEFAULT 0,
                imbalance_5s DECIMAL(10,6) DEFAULT 0,
                imbalance_15s DECIMAL(10,6) DEFAULT 0,
                imbalance_60s DECIMAL(10,6) DEFAULT 0,
                volume_imbalance_1s DECIMAL(10,6) DEFAULT 0,
                volume_imbalance_5s DECIMAL(10,6) DEFAULT 0,
                volume_imbalance_15s DECIMAL(10,6) DEFAULT 0,
                volume_imbalance_60s DECIMAL(10,6) DEFAULT 0,
                price_range DECIMAL(10,2),
                PRIMARY KEY (timestamp, symbol)
            );
            """
        ]
        return await self.apply_migration(
            "2.0.1",
            "Create order book tables (snapshots, 1s aggregations, imbalances)",
            migration_commands
        )

    async def create_trade_events_table(self) -> bool:
        """Create the trade events table."""
        migration_commands = [
            """
            CREATE TABLE IF NOT EXISTS trade_events (
                timestamp TIMESTAMPTZ NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                exchange VARCHAR(20) NOT NULL DEFAULT 'binance',
                price DECIMAL(20,8) NOT NULL,
                size DECIMAL(30,8) NOT NULL,
                side VARCHAR(4) NOT NULL,
                trade_id VARCHAR(100) NOT NULL,
                is_buyer_maker BOOLEAN,
                PRIMARY KEY (timestamp, symbol, exchange, trade_id)
            );
            """
        ]
        return await self.apply_migration(
            "2.0.2",
            "Create trade events table",
            migration_commands
        )

    async def create_hypertables(self) -> bool:
        """Convert tables to TimescaleDB hypertables."""
        hypertable_commands = [
            "SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('order_book_1s_agg', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('order_book_imbalances', 'timestamp', if_not_exists => TRUE);",
            "SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);"
        ]
        return await self.apply_migration(
            "2.0.3",
            "Convert tables to TimescaleDB hypertables",
            hypertable_commands
        )
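    # A sketch of the read path the 1s aggregation table is designed for:
    # rolling the per-second price buckets up into a recent imbalance profile.
    # The method name and the make_interval() window are illustrative
    # assumptions, not part of this manager's API.
    async def fetch_bucket_imbalance(self, symbol: str,
                                     minutes: int = 5) -> List[Dict]:
        """Hypothetical sketch: per-bucket volume imbalance over a window."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT price_bucket,
                       sum(bid_volume) AS bid_volume,
                       sum(ask_volume) AS ask_volume,
                       CASE WHEN sum(bid_volume) + sum(ask_volume) > 0
                            THEN (sum(bid_volume) - sum(ask_volume))
                                 / (sum(bid_volume) + sum(ask_volume))
                            ELSE 0
                       END AS imbalance
                FROM order_book_1s_agg
                WHERE symbol = $1
                  AND timestamp > NOW() - make_interval(mins => $2)
                GROUP BY price_bucket
                ORDER BY price_bucket
            """, symbol, minutes)
            return [dict(r) for r in rows]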
    async def create_indexes(self) -> bool:
        """Create performance indexes."""
        index_commands = [
            # OHLCV indexes
            "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_tf_ts ON ohlcv_data (symbol, timeframe, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_ts ON ohlcv_data (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_ohlcv_tf_ts ON ohlcv_data (timeframe, timestamp DESC);",

            # Order book snapshot indexes
            "CREATE INDEX IF NOT EXISTS idx_obs_symbol_ts ON order_book_snapshots (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_obs_exchange_ts ON order_book_snapshots (exchange, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange_ts ON order_book_snapshots (symbol, exchange, timestamp DESC);",

            # Order book 1s aggregation indexes
            "CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_ts ON order_book_1s_agg (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_bucket_ts ON order_book_1s_agg (symbol, price_bucket, timestamp DESC);",

            # Order book imbalance indexes
            "CREATE INDEX IF NOT EXISTS idx_obi_symbol_ts ON order_book_imbalances (symbol, timestamp DESC);",

            # Trade event indexes
            "CREATE INDEX IF NOT EXISTS idx_trades_symbol_ts ON trade_events (symbol, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_trades_exchange_ts ON trade_events (exchange, timestamp DESC);",
            "CREATE INDEX IF NOT EXISTS idx_trades_symbol_side_ts ON trade_events (symbol, side, timestamp DESC);"
        ]
        return await self.apply_migration(
            "2.0.4",
            "Create performance indexes",
            index_commands
        )

    async def create_continuous_aggregates(self) -> bool:
        """Create continuous aggregates for multi-timeframe data."""
        aggregate_commands = [
            # 1m OHLCV from 1s data
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1m_continuous
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('1 minute', timestamp) AS timestamp,
                symbol,
                '1m' AS timeframe,
                first(open_price, timestamp) AS open_price,
                max(high_price) AS high_price,
                min(low_price) AS low_price,
                last(close_price, timestamp) AS close_price,
                sum(volume) AS volume,
                sum(trade_count) AS trade_count
            FROM ohlcv_data
            WHERE timeframe = '1s'
            GROUP BY time_bucket('1 minute', timestamp), symbol
            WITH NO DATA;
            """,
            # 5m OHLCV from 1m data
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_5m_continuous
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('5 minutes', timestamp) AS timestamp,
                symbol,
                '5m' AS timeframe,
                first(open_price, timestamp) AS open_price,
                max(high_price) AS high_price,
                min(low_price) AS low_price,
                last(close_price, timestamp) AS close_price,
                sum(volume) AS volume,
                sum(trade_count) AS trade_count
            FROM ohlcv_data
            WHERE timeframe = '1m'
            GROUP BY time_bucket('5 minutes', timestamp), symbol
            WITH NO DATA;
            """,
            # 15m OHLCV from 5m data
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_15m_continuous
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('15 minutes', timestamp) AS timestamp,
                symbol,
                '15m' AS timeframe,
                first(open_price, timestamp) AS open_price,
                max(high_price) AS high_price,
                min(low_price) AS low_price,
                last(close_price, timestamp) AS close_price,
                sum(volume) AS volume,
                sum(trade_count) AS trade_count
            FROM ohlcv_data
            WHERE timeframe = '5m'
            GROUP BY time_bucket('15 minutes', timestamp), symbol
            WITH NO DATA;
            """,
            # 1h OHLCV from 15m data
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1h_continuous
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('1 hour', timestamp) AS timestamp,
                symbol,
                '1h' AS timeframe,
                first(open_price, timestamp) AS open_price,
                max(high_price) AS high_price,
                min(low_price) AS low_price,
                last(close_price, timestamp) AS close_price,
                sum(volume) AS volume,
                sum(trade_count) AS trade_count
            FROM ohlcv_data
            WHERE timeframe = '15m'
            GROUP BY time_bucket('1 hour', timestamp), symbol
            WITH NO DATA;
            """,
            # 1d OHLCV from 1h data
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1d_continuous
            WITH (timescaledb.continuous) AS
            SELECT
                time_bucket('1 day', timestamp) AS timestamp,
                symbol,
                '1d' AS timeframe,
                first(open_price, timestamp) AS open_price,
                max(high_price) AS high_price,
                min(low_price) AS low_price,
                last(close_price, timestamp) AS close_price,
                sum(volume) AS volume,
                sum(trade_count) AS trade_count
            FROM ohlcv_data
            WHERE timeframe = '1h'
            GROUP BY time_bucket('1 day', timestamp), symbol
            WITH NO DATA;
            """
        ]
        # Continuous aggregate creation cannot run inside a transaction block
        return await self.apply_migration(
            "2.0.5",
            "Create continuous aggregates for multi-timeframe OHLCV",
            aggregate_commands,
            use_transaction=False
        )
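    # The aggregates above are created WITH NO DATA, so they stay empty until
    # a refresh runs. A sketch of scheduling those refreshes follows; the
    # version number, offsets, and cadences are illustrative assumptions, and
    # this method is not wired into setup_complete_schema().
    async def setup_refresh_policies(self) -> bool:
        """Optional sketch: schedule continuous aggregate refreshes."""
        refresh_commands = [
            "SELECT add_continuous_aggregate_policy('ohlcv_1m_continuous', start_offset => INTERVAL '1 hour', end_offset => INTERVAL '1 minute', schedule_interval => INTERVAL '1 minute', if_not_exists => TRUE);",
            "SELECT add_continuous_aggregate_policy('ohlcv_5m_continuous', start_offset => INTERVAL '4 hours', end_offset => INTERVAL '5 minutes', schedule_interval => INTERVAL '5 minutes', if_not_exists => TRUE);",
            "SELECT add_continuous_aggregate_policy('ohlcv_15m_continuous', start_offset => INTERVAL '12 hours', end_offset => INTERVAL '15 minutes', schedule_interval => INTERVAL '15 minutes', if_not_exists => TRUE);",
            "SELECT add_continuous_aggregate_policy('ohlcv_1h_continuous', start_offset => INTERVAL '2 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour', if_not_exists => TRUE);",
            "SELECT add_continuous_aggregate_policy('ohlcv_1d_continuous', start_offset => INTERVAL '1 week', end_offset => INTERVAL '1 day', schedule_interval => INTERVAL '1 day', if_not_exists => TRUE);"
        ]
        return await self.apply_migration(
            "2.0.8",  # assumed next slot in the chain
            "Setup continuous aggregate refresh policies (sketch)",
            refresh_commands
        )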
    async def setup_compression_policies(self) -> bool:
        """Set up compression policies for efficient storage."""
        compression_commands = [
            # Compression must be enabled on a hypertable before a policy can
            # be attached to it; segmenting by symbol is an assumed default
            "ALTER TABLE ohlcv_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_snapshots SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_1s_agg SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_imbalances SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE trade_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",

            # Compress OHLCV data older than 7 days
            "SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days', if_not_exists => TRUE);",
            # Compress order book snapshots older than 1 day
            "SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day', if_not_exists => TRUE);",
            # Compress order book 1s aggregations older than 2 days
            "SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days', if_not_exists => TRUE);",
            # Compress order book imbalances older than 2 days
            "SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days', if_not_exists => TRUE);",
            # Compress trade events older than 7 days
            "SELECT add_compression_policy('trade_events', INTERVAL '7 days', if_not_exists => TRUE);"
        ]
        return await self.apply_migration(
            "2.0.6",
            "Setup compression policies",
            compression_commands
        )

    async def setup_retention_policies(self) -> bool:
        """Set up data retention policies."""
        retention_commands = [
            # Retain OHLCV data for 2 years
            "SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years', if_not_exists => TRUE);",
            # Retain order book snapshots for 30 days
            "SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);",
            # Retain order book 1s aggregations for 60 days
            "SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days', if_not_exists => TRUE);",
            # Retain order book imbalances for 60 days
            "SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days', if_not_exists => TRUE);",
            # Retain trade events for 90 days
            "SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);"
        ]
        return await self.apply_migration(
            "2.0.7",
            "Setup retention policies",
            retention_commands
        )

    async def setup_complete_schema(self) -> bool:
        """Set up the complete unified storage schema."""
        try:
            logger.info("Setting up unified storage schema...")

            # Initialize schema tracking
            await self.initialize_schema_tracking()

            # Apply all migrations in order
            migrations = [
                ("Enhanced OHLCV table", self.create_enhanced_ohlcv_table),
                ("Order book tables", self.create_order_book_tables),
                ("Trade events table", self.create_trade_events_table),
                ("Hypertables", self.create_hypertables),
                ("Indexes", self.create_indexes),
                ("Continuous aggregates", self.create_continuous_aggregates),
                ("Compression policies", self.setup_compression_policies),
                ("Retention policies", self.setup_retention_policies),
            ]

            for name, migration_func in migrations:
                logger.info(f"Applying migration: {name}")
                success = await migration_func()
                if not success:
                    logger.error(f"Failed to apply migration: {name}")
                    return False
                logger.info(f"Successfully applied migration: {name}")

            logger.info("Complete unified storage schema setup successful")
            return True

        except Exception as e:
            logger.error(f"Failed to set up complete schema: {e}")
            return False
    async def get_schema_info(self) -> Dict:
        """Get information about the current schema state."""
        try:
            async with self.pool.acquire() as conn:
                # Applied migrations
                migrations = await conn.fetch("""
                    SELECT version, applied_at, description
                    FROM schema_migrations
                    ORDER BY applied_at
                """)

                # Table sizes
                tables = await conn.fetch("""
                    SELECT schemaname, tablename,
                           pg_size_pretty(pg_total_relation_size(schemaname || '.' || tablename)) AS size
                    FROM pg_tables
                    WHERE schemaname = 'public'
                      AND tablename IN (
                          'ohlcv_data', 'order_book_snapshots', 'order_book_1s_agg',
                          'order_book_imbalances', 'trade_events'
                      )
                    ORDER BY tablename
                """)

                # Hypertable stats; sizes come from hypertable_size(), since
                # the TimescaleDB 2.x information view has no size columns
                hypertables = await conn.fetch("""
                    SELECT hypertable_name,
                           num_chunks,
                           compression_enabled,
                           pg_size_pretty(hypertable_size(format('%I.%I', hypertable_schema, hypertable_name)::regclass)) AS total_size
                    FROM timescaledb_information.hypertables
                    WHERE hypertable_schema = 'public'
                    ORDER BY hypertable_name
                """)

                # Continuous aggregates, sized via their materialization hypertables
                continuous_aggs = await conn.fetch("""
                    SELECT view_name,
                           materialization_hypertable_name,
                           pg_size_pretty(hypertable_size(format('%I.%I', materialization_hypertable_schema, materialization_hypertable_name)::regclass)) AS size
                    FROM timescaledb_information.continuous_aggregates
                    WHERE view_schema = 'public'
                    ORDER BY view_name
                """)

                return {
                    "migrations": [dict(m) for m in migrations],
                    "tables": [dict(t) for t in tables],
                    "hypertables": [dict(h) for h in hypertables],
                    "continuous_aggregates": [dict(c) for c in continuous_aggs]
                }

        except Exception as e:
            logger.error(f"Failed to get schema info: {e}")
            return {}

    async def verify_schema(self) -> bool:
        """Verify that all required tables and hypertables exist."""
        try:
            async with self.pool.acquire() as conn:
                required_tables = [
                    'ohlcv_data', 'order_book_snapshots', 'order_book_1s_agg',
                    'order_book_imbalances', 'trade_events'
                ]

                # Check that the required tables exist
                for table in required_tables:
                    exists = await conn.fetchval("""
                        SELECT EXISTS (
                            SELECT FROM pg_tables
                            WHERE schemaname = 'public' AND tablename = $1
                        )
                    """, table)
                    if not exists:
                        logger.error(f"Required table missing: {table}")
                        return False

                # Check that each table was converted to a hypertable
                for table in required_tables:
                    is_hypertable = await conn.fetchval("""
                        SELECT EXISTS (
                            SELECT FROM timescaledb_information.hypertables
                            WHERE hypertable_schema = 'public'
                              AND hypertable_name = $1
                        )
                    """, table)
                    if not is_hypertable:
                        logger.error(f"Table is not a hypertable: {table}")
                        return False

                logger.info("Schema verification successful")
                return True

        except Exception as e:
            logger.error(f"Schema verification failed: {e}")
            return False
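
# Usage sketch: wiring the manager to an asyncpg pool and running the full
# migration chain. The DSN below is an assumed placeholder, not a value this
# module prescribes.
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)

    async def _demo() -> None:
        # Assumed local connection string; replace with real credentials
        pool = await asyncpg.create_pool(
            "postgresql://postgres:postgres@localhost:5432/market_data"
        )
        try:
            manager = UnifiedStorageSchemaManager(pool)
            if await manager.setup_complete_schema():
                await manager.verify_schema()
        finally:
            await pool.close()

    asyncio.run(_demo())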