diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index 4b7e6ff..6055d65 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -11,6 +11,8 @@ + + - _Requirements: 1.1, 6.1, 7.3_ - [ ] 2. Implement TimescaleDB integration and database schema diff --git a/COBY/storage/__init__.py b/COBY/storage/__init__.py index 9d35ca4..e0c978e 100644 --- a/COBY/storage/__init__.py +++ b/COBY/storage/__init__.py @@ -1,11 +1,11 @@ """ -Storage layer for the COBY system. +Storage layer for the multi-exchange data aggregation system. +Provides TimescaleDB integration, connection pooling, and schema management. """ from .timescale_manager import TimescaleManager -from .connection_pool import DatabaseConnectionPool +from .connection_pool import ConnectionPoolManager +from .schema import SchemaManager +from .storage_manager import StorageManager -__all__ = [ - 'TimescaleManager', - 'DatabaseConnectionPool' -] \ No newline at end of file +__all__ = ['TimescaleManager', 'ConnectionPoolManager', 'SchemaManager', 'StorageManager'] \ No newline at end of file diff --git a/COBY/storage/connection_pool.py b/COBY/storage/connection_pool.py index 407785a..36350ce 100644 --- a/COBY/storage/connection_pool.py +++ b/COBY/storage/connection_pool.py @@ -1,140 +1,219 @@ """ -Database connection pool management for TimescaleDB. +Database connection pool management with health monitoring and automatic recovery. """ import asyncio -import asyncpg +import logging from typing import Optional, Dict, Any -from contextlib import asynccontextmanager -from ..config import config -from ..utils.logging import get_logger -from ..utils.exceptions import StorageError +from datetime import datetime, timedelta +import asyncpg +from asyncpg import Pool -logger = get_logger(__name__) +from ..config import Config +from ..utils.exceptions import ConnectionError + +logger = logging.getLogger(__name__) -class DatabaseConnectionPool: - """Manages database connection pool for TimescaleDB""" +class ConnectionPoolManager: + """Manages database connection pools with health monitoring and recovery.""" - def __init__(self): - self._pool: Optional[asyncpg.Pool] = None - self._is_initialized = False + def __init__(self, config: Config): + self.config = config + self.pool: Optional[Pool] = None + self._connection_string = self._build_connection_string() + self._health_check_interval = 30 # seconds + self._health_check_task: Optional[asyncio.Task] = None + self._last_health_check = datetime.utcnow() + self._connection_failures = 0 + self._max_failures = 5 + + def _build_connection_string(self) -> str: + """Build PostgreSQL connection string from config.""" + return ( + f"postgresql://{self.config.database.user}:{self.config.database.password}" + f"@{self.config.database.host}:{self.config.database.port}/{self.config.database.name}" + ) async def initialize(self) -> None: - """Initialize the connection pool""" - if self._is_initialized: - return - + """Initialize connection pool with health monitoring.""" try: - # Build connection string - dsn = ( - f"postgresql://{config.database.user}:{config.database.password}" - f"@{config.database.host}:{config.database.port}/{config.database.name}" - ) + logger.info("Creating database connection pool...") - # Create connection pool - self._pool = await asyncpg.create_pool( - dsn, + self.pool = await asyncpg.create_pool( + self._connection_string, min_size=5, - 
max_size=config.database.pool_size, - max_queries=50000, - max_inactive_connection_lifetime=300, - command_timeout=config.database.pool_timeout, + max_size=self.config.database.pool_size, + command_timeout=60, server_settings={ - 'search_path': config.database.schema, - 'timezone': 'UTC' - } + 'jit': 'off', + 'timezone': 'UTC', + 'statement_timeout': '30s', + 'idle_in_transaction_session_timeout': '60s' + }, + init=self._init_connection ) - self._is_initialized = True - logger.info(f"Database connection pool initialized with {config.database.pool_size} connections") + # Test initial connection + await self._test_connection() - # Test connection - await self.health_check() + # Start health monitoring + self._health_check_task = asyncio.create_task(self._health_monitor()) + + logger.info(f"Database connection pool initialized with {self.config.db_min_connections}-{self.config.db_max_connections} connections") except Exception as e: - logger.error(f"Failed to initialize database connection pool: {e}") - raise StorageError(f"Database connection failed: {e}", "DB_INIT_ERROR") + logger.error(f"Failed to initialize connection pool: {e}") + raise ConnectionError(f"Connection pool initialization failed: {e}") - async def close(self) -> None: - """Close the connection pool""" - if self._pool: - await self._pool.close() - self._pool = None - self._is_initialized = False - logger.info("Database connection pool closed") - - @asynccontextmanager - async def get_connection(self): - """Get a database connection from the pool""" - if not self._is_initialized: - await self.initialize() - - if not self._pool: - raise StorageError("Connection pool not initialized", "POOL_NOT_READY") - - async with self._pool.acquire() as connection: - try: - yield connection - except Exception as e: - logger.error(f"Database operation failed: {e}") - raise - - @asynccontextmanager - async def get_transaction(self): - """Get a database transaction""" - async with self.get_connection() as conn: - async with conn.transaction(): - yield conn - - async def execute_query(self, query: str, *args) -> Any: - """Execute a query and return results""" - async with self.get_connection() as conn: - return await conn.fetch(query, *args) - - async def execute_command(self, command: str, *args) -> str: - """Execute a command and return status""" - async with self.get_connection() as conn: - return await conn.execute(command, *args) - - async def execute_many(self, command: str, args_list) -> None: - """Execute a command multiple times with different arguments""" - async with self.get_connection() as conn: - await conn.executemany(command, args_list) - - async def health_check(self) -> bool: - """Check database health""" + async def _init_connection(self, conn: asyncpg.Connection) -> None: + """Initialize individual database connections.""" try: - async with self.get_connection() as conn: - result = await conn.fetchval("SELECT 1") - if result == 1: - logger.debug("Database health check passed") - return True - else: - logger.warning("Database health check returned unexpected result") - return False + # Set connection-specific settings + await conn.execute("SET timezone = 'UTC'") + await conn.execute("SET statement_timeout = '30s'") + + # Test TimescaleDB extension + result = await conn.fetchval("SELECT extname FROM pg_extension WHERE extname = 'timescaledb'") + if not result: + logger.warning("TimescaleDB extension not found in database") + except Exception as e: - logger.error(f"Database health check failed: {e}") + logger.error(f"Failed to 
initialize connection: {e}") + raise + + async def _test_connection(self) -> bool: + """Test database connection health.""" + try: + async with self.pool.acquire() as conn: + await conn.execute('SELECT 1') + self._connection_failures = 0 + return True + + except Exception as e: + self._connection_failures += 1 + logger.error(f"Connection test failed (attempt {self._connection_failures}): {e}") + + if self._connection_failures >= self._max_failures: + logger.critical("Maximum connection failures reached, attempting pool recreation") + await self._recreate_pool() + return False - async def get_pool_stats(self) -> Dict[str, Any]: - """Get connection pool statistics""" - if not self._pool: - return {} + async def _recreate_pool(self) -> None: + """Recreate connection pool after failures.""" + try: + if self.pool: + await self.pool.close() + self.pool = None + + # Wait before recreating + await asyncio.sleep(5) + + self.pool = await asyncpg.create_pool( + self._connection_string, + min_size=5, + max_size=self.config.database.pool_size, + command_timeout=60, + server_settings={ + 'jit': 'off', + 'timezone': 'UTC' + }, + init=self._init_connection + ) + + self._connection_failures = 0 + logger.info("Connection pool recreated successfully") + + except Exception as e: + logger.error(f"Failed to recreate connection pool: {e}") + # Will retry on next health check + + async def _health_monitor(self) -> None: + """Background task to monitor connection pool health.""" + while True: + try: + await asyncio.sleep(self._health_check_interval) + + if self.pool: + await self._test_connection() + self._last_health_check = datetime.utcnow() + + # Log pool statistics periodically + if datetime.utcnow().minute % 5 == 0: # Every 5 minutes + stats = self.get_pool_stats() + logger.debug(f"Connection pool stats: {stats}") + + except asyncio.CancelledError: + logger.info("Health monitor task cancelled") + break + except Exception as e: + logger.error(f"Health monitor error: {e}") + + async def close(self) -> None: + """Close connection pool and stop monitoring.""" + if self._health_check_task: + self._health_check_task.cancel() + try: + await self._health_check_task + except asyncio.CancelledError: + pass + + if self.pool: + await self.pool.close() + logger.info("Database connection pool closed") + + def get_pool_stats(self) -> Dict[str, Any]: + """Get connection pool statistics.""" + if not self.pool: + return {"status": "not_initialized"} return { - 'size': self._pool.get_size(), - 'min_size': self._pool.get_min_size(), - 'max_size': self._pool.get_max_size(), - 'idle_size': self._pool.get_idle_size(), - 'is_closing': self._pool.is_closing() + "status": "active", + "size": self.pool.get_size(), + "max_size": self.pool.get_max_size(), + "min_size": self.pool.get_min_size(), + "connection_failures": self._connection_failures, + "last_health_check": self._last_health_check.isoformat(), + "health_check_interval": self._health_check_interval } - @property - def is_initialized(self) -> bool: - """Check if pool is initialized""" - return self._is_initialized - - -# Global connection pool instance -db_pool = DatabaseConnectionPool() \ No newline at end of file + def is_healthy(self) -> bool: + """Check if connection pool is healthy.""" + if not self.pool: + return False + + # Check if health check is recent + time_since_check = datetime.utcnow() - self._last_health_check + if time_since_check > timedelta(seconds=self._health_check_interval * 2): + return False + + # Check failure count + return self._connection_failures < 
self._max_failures + + async def acquire(self): + """Acquire a connection from the pool.""" + if not self.pool: + raise ConnectionError("Connection pool not initialized") + + return self.pool.acquire() + + async def execute(self, query: str, *args) -> None: + """Execute a query using a pooled connection.""" + async with self.acquire() as conn: + return await conn.execute(query, *args) + + async def fetch(self, query: str, *args) -> list: + """Fetch multiple rows using a pooled connection.""" + async with self.acquire() as conn: + return await conn.fetch(query, *args) + + async def fetchrow(self, query: str, *args): + """Fetch a single row using a pooled connection.""" + async with self.acquire() as conn: + return await conn.fetchrow(query, *args) + + async def fetchval(self, query: str, *args): + """Fetch a single value using a pooled connection.""" + async with self.acquire() as conn: + return await conn.fetchval(query, *args) \ No newline at end of file diff --git a/COBY/storage/schema.py b/COBY/storage/schema.py index f4e3413..46bba9f 100644 --- a/COBY/storage/schema.py +++ b/COBY/storage/schema.py @@ -1,29 +1,103 @@ """ -Database schema management for TimescaleDB. +Database schema management and migration system. +Handles schema versioning, migrations, and database structure updates. """ -from typing import List -from ..utils.logging import get_logger +import logging +from typing import Dict, List, Optional +from datetime import datetime +import asyncpg -logger = get_logger(__name__) +logger = logging.getLogger(__name__) -class DatabaseSchema: - """Manages database schema creation and migrations""" +class SchemaManager: + """Manages database schema versions and migrations.""" - @staticmethod - def get_schema_creation_queries() -> List[str]: - """Get list of queries to create the database schema""" - return [ - # Create TimescaleDB extension - "CREATE EXTENSION IF NOT EXISTS timescaledb;", - - # Create schema - "CREATE SCHEMA IF NOT EXISTS market_data;", + def __init__(self, connection_pool): + self.pool = connection_pool + self.current_version = "1.0.0" + + async def initialize_schema_tracking(self) -> None: + """Initialize schema version tracking table.""" + try: + async with self.pool.acquire() as conn: + await conn.execute(""" + CREATE TABLE IF NOT EXISTS schema_migrations ( + version VARCHAR(20) PRIMARY KEY, + applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + description TEXT, + checksum VARCHAR(64) + ); + """) + + # Record initial schema version + await conn.execute(""" + INSERT INTO schema_migrations (version, description) + VALUES ($1, $2) + ON CONFLICT (version) DO NOTHING + """, self.current_version, "Initial schema setup") + + logger.info("Schema tracking initialized") + + except Exception as e: + logger.error(f"Failed to initialize schema tracking: {e}") + raise + + async def get_current_schema_version(self) -> Optional[str]: + """Get the current schema version from database.""" + try: + async with self.pool.acquire() as conn: + version = await conn.fetchval(""" + SELECT version FROM schema_migrations + ORDER BY applied_at DESC LIMIT 1 + """) + return version + + except Exception as e: + logger.error(f"Failed to get schema version: {e}") + return None + + async def apply_migration(self, version: str, description: str, sql_commands: List[str]) -> bool: + """Apply a database migration.""" + try: + async with self.pool.acquire() as conn: + async with conn.transaction(): + # Check if migration already applied + existing = await conn.fetchval(""" + SELECT version FROM 
schema_migrations WHERE version = $1 + """, version) + + if existing: + logger.info(f"Migration {version} already applied") + return True + + # Apply migration commands + for sql_command in sql_commands: + await conn.execute(sql_command) + + # Record migration + await conn.execute(""" + INSERT INTO schema_migrations (version, description) + VALUES ($1, $2) + """, version, description) + + logger.info(f"Applied migration {version}: {description}") + return True + + except Exception as e: + logger.error(f"Failed to apply migration {version}: {e}") + return False + + async def create_base_schema(self) -> bool: + """Create the base database schema with all tables and indexes.""" + migration_commands = [ + # Enable TimescaleDB extension + "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;", # Order book snapshots table """ - CREATE TABLE IF NOT EXISTS market_data.order_book_snapshots ( + CREATE TABLE IF NOT EXISTS order_book_snapshots ( id BIGSERIAL, symbol VARCHAR(20) NOT NULL, exchange VARCHAR(20) NOT NULL, @@ -35,14 +109,13 @@ class DatabaseSchema: spread DECIMAL(20,8), bid_volume DECIMAL(30,8), ask_volume DECIMAL(30,8), - created_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (timestamp, symbol, exchange) ); """, # Trade events table """ - CREATE TABLE IF NOT EXISTS market_data.trade_events ( + CREATE TABLE IF NOT EXISTS trade_events ( id BIGSERIAL, symbol VARCHAR(20) NOT NULL, exchange VARCHAR(20) NOT NULL, @@ -51,14 +124,13 @@ class DatabaseSchema: size DECIMAL(30,8) NOT NULL, side VARCHAR(4) NOT NULL, trade_id VARCHAR(100) NOT NULL, - created_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (timestamp, symbol, exchange, trade_id) ); """, - # Aggregated heatmap data table + # Heatmap data table """ - CREATE TABLE IF NOT EXISTS market_data.heatmap_data ( + CREATE TABLE IF NOT EXISTS heatmap_data ( symbol VARCHAR(20) NOT NULL, timestamp TIMESTAMPTZ NOT NULL, bucket_size DECIMAL(10,2) NOT NULL, @@ -66,15 +138,13 @@ class DatabaseSchema: volume DECIMAL(30,8) NOT NULL, side VARCHAR(3) NOT NULL, exchange_count INTEGER NOT NULL, - exchanges JSONB, - created_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side) ); """, # OHLCV data table """ - CREATE TABLE IF NOT EXISTS market_data.ohlcv_data ( + CREATE TABLE IF NOT EXISTS ohlcv_data ( symbol VARCHAR(20) NOT NULL, timestamp TIMESTAMPTZ NOT NULL, timeframe VARCHAR(10) NOT NULL, @@ -84,173 +154,185 @@ class DatabaseSchema: close_price DECIMAL(20,8) NOT NULL, volume DECIMAL(30,8) NOT NULL, trade_count INTEGER, - vwap DECIMAL(20,8), - created_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (timestamp, symbol, timeframe) ); - """, - - # Exchange status tracking table - """ - CREATE TABLE IF NOT EXISTS market_data.exchange_status ( - exchange VARCHAR(20) NOT NULL, - timestamp TIMESTAMPTZ NOT NULL, - status VARCHAR(20) NOT NULL, - last_message_time TIMESTAMPTZ, - error_message TEXT, - connection_count INTEGER DEFAULT 0, - created_at TIMESTAMPTZ DEFAULT NOW(), - PRIMARY KEY (timestamp, exchange) - ); - """, - - # System metrics table - """ - CREATE TABLE IF NOT EXISTS market_data.system_metrics ( - metric_name VARCHAR(50) NOT NULL, - timestamp TIMESTAMPTZ NOT NULL, - value DECIMAL(20,8) NOT NULL, - labels JSONB, - created_at TIMESTAMPTZ DEFAULT NOW(), - PRIMARY KEY (timestamp, metric_name) - ); """ ] + + return await self.apply_migration( + "1.0.0", + "Create base schema with core tables", + migration_commands + ) - @staticmethod - def get_hypertable_creation_queries() -> List[str]: - """Get queries to create hypertables""" - 
return [ - "SELECT create_hypertable('market_data.order_book_snapshots', 'timestamp', if_not_exists => TRUE);", - "SELECT create_hypertable('market_data.trade_events', 'timestamp', if_not_exists => TRUE);", - "SELECT create_hypertable('market_data.heatmap_data', 'timestamp', if_not_exists => TRUE);", - "SELECT create_hypertable('market_data.ohlcv_data', 'timestamp', if_not_exists => TRUE);", - "SELECT create_hypertable('market_data.exchange_status', 'timestamp', if_not_exists => TRUE);", - "SELECT create_hypertable('market_data.system_metrics', 'timestamp', if_not_exists => TRUE);" + async def create_hypertables(self) -> bool: + """Convert tables to TimescaleDB hypertables.""" + hypertable_commands = [ + "SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('heatmap_data', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);" ] + + return await self.apply_migration( + "1.0.1", + "Convert tables to hypertables", + hypertable_commands + ) - @staticmethod - def get_index_creation_queries() -> List[str]: - """Get queries to create indexes""" - return [ - # Order book indexes - "CREATE INDEX IF NOT EXISTS idx_order_book_symbol_exchange ON market_data.order_book_snapshots (symbol, exchange, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_order_book_timestamp ON market_data.order_book_snapshots (timestamp DESC);", + async def create_indexes(self) -> bool: + """Create performance indexes.""" + index_commands = [ + # Order book snapshots indexes + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_time ON order_book_snapshots (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_exchange_time ON order_book_snapshots (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange ON order_book_snapshots (symbol, exchange, timestamp DESC);", # Trade events indexes - "CREATE INDEX IF NOT EXISTS idx_trade_events_symbol_exchange ON market_data.trade_events (symbol, exchange, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_trade_events_timestamp ON market_data.trade_events (timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_trade_events_price ON market_data.trade_events (symbol, price, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_symbol_time ON trade_events (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_exchange_time ON trade_events (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_price ON trade_events (symbol, price, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_side ON trade_events (symbol, side, timestamp DESC);", # Heatmap data indexes - "CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_bucket ON market_data.heatmap_data (symbol, bucket_size, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_heatmap_timestamp ON market_data.heatmap_data (timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_time ON heatmap_data (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_heatmap_bucket ON heatmap_data (symbol, bucket_size, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_heatmap_side ON heatmap_data (symbol, side, timestamp DESC);", - # OHLCV data indexes - "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON market_data.ohlcv_data (symbol, timeframe, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_ohlcv_timestamp ON market_data.ohlcv_data (timestamp 
DESC);", - - # Exchange status indexes - "CREATE INDEX IF NOT EXISTS idx_exchange_status_exchange ON market_data.exchange_status (exchange, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_exchange_status_timestamp ON market_data.exchange_status (timestamp DESC);", - - # System metrics indexes - "CREATE INDEX IF NOT EXISTS idx_system_metrics_name ON market_data.system_metrics (metric_name, timestamp DESC);", - "CREATE INDEX IF NOT EXISTS idx_system_metrics_timestamp ON market_data.system_metrics (timestamp DESC);" + # OHLCV indexes + "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON ohlcv_data (symbol, timeframe, timestamp DESC);" ] + + return await self.apply_migration( + "1.0.2", + "Create performance indexes", + index_commands + ) - @staticmethod - def get_retention_policy_queries() -> List[str]: - """Get queries to create retention policies""" - return [ - "SELECT add_retention_policy('market_data.order_book_snapshots', INTERVAL '90 days', if_not_exists => TRUE);", - "SELECT add_retention_policy('market_data.trade_events', INTERVAL '90 days', if_not_exists => TRUE);", - "SELECT add_retention_policy('market_data.heatmap_data', INTERVAL '90 days', if_not_exists => TRUE);", - "SELECT add_retention_policy('market_data.ohlcv_data', INTERVAL '365 days', if_not_exists => TRUE);", - "SELECT add_retention_policy('market_data.exchange_status', INTERVAL '30 days', if_not_exists => TRUE);", - "SELECT add_retention_policy('market_data.system_metrics', INTERVAL '30 days', if_not_exists => TRUE);" + async def setup_retention_policies(self) -> bool: + """Set up data retention policies.""" + retention_commands = [ + "SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);", + "SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);", + "SELECT add_retention_policy('heatmap_data', INTERVAL '60 days', if_not_exists => TRUE);", + "SELECT add_retention_policy('ohlcv_data', INTERVAL '1 year', if_not_exists => TRUE);" ] + + return await self.apply_migration( + "1.0.3", + "Setup data retention policies", + retention_commands + ) - @staticmethod - def get_continuous_aggregate_queries() -> List[str]: - """Get queries to create continuous aggregates""" - return [ - # Hourly OHLCV aggregate + async def create_continuous_aggregates(self) -> bool: + """Create continuous aggregates for better query performance.""" + aggregate_commands = [ + # 1-minute OHLCV aggregates from trades """ - CREATE MATERIALIZED VIEW IF NOT EXISTS market_data.hourly_ohlcv + CREATE MATERIALIZED VIEW IF NOT EXISTS trades_1m WITH (timescaledb.continuous) AS SELECT + time_bucket('1 minute', timestamp) AS bucket, symbol, exchange, - time_bucket('1 hour', timestamp) AS hour, first(price, timestamp) AS open_price, max(price) AS high_price, min(price) AS low_price, last(price, timestamp) AS close_price, sum(size) AS volume, - count(*) AS trade_count, - avg(price) AS vwap - FROM market_data.trade_events - GROUP BY symbol, exchange, hour - WITH NO DATA; + count(*) AS trade_count + FROM trade_events + GROUP BY bucket, symbol, exchange; """, - # Add refresh policy for continuous aggregate + # 5-minute order book statistics """ - SELECT add_continuous_aggregate_policy('market_data.hourly_ohlcv', - start_offset => INTERVAL '3 hours', - end_offset => INTERVAL '1 hour', - schedule_interval => INTERVAL '1 hour', - if_not_exists => TRUE); - """ - ] - - @staticmethod - def get_view_creation_queries() -> List[str]: - """Get queries to create views""" - return [ - # Latest 
order books view - """ - CREATE OR REPLACE VIEW market_data.latest_order_books AS - SELECT DISTINCT ON (symbol, exchange) + CREATE MATERIALIZED VIEW IF NOT EXISTS orderbook_stats_5m + WITH (timescaledb.continuous) AS + SELECT + time_bucket('5 minutes', timestamp) AS bucket, symbol, exchange, - timestamp, - bids, - asks, - mid_price, - spread, - bid_volume, - ask_volume - FROM market_data.order_book_snapshots - ORDER BY symbol, exchange, timestamp DESC; - """, - - # Latest heatmaps view - """ - CREATE OR REPLACE VIEW market_data.latest_heatmaps AS - SELECT DISTINCT ON (symbol, bucket_size, price_bucket, side) - symbol, - bucket_size, - price_bucket, - side, - timestamp, - volume, - exchange_count, - exchanges - FROM market_data.heatmap_data - ORDER BY symbol, bucket_size, price_bucket, side, timestamp DESC; + avg(mid_price) AS avg_mid_price, + avg(spread) AS avg_spread, + avg(bid_volume) AS avg_bid_volume, + avg(ask_volume) AS avg_ask_volume, + count(*) AS snapshot_count + FROM order_book_snapshots + WHERE mid_price IS NOT NULL + GROUP BY bucket, symbol, exchange; """ ] + + return await self.apply_migration( + "1.0.4", + "Create continuous aggregates", + aggregate_commands + ) - @staticmethod - def get_all_creation_queries() -> List[str]: - """Get all schema creation queries in order""" - queries = [] - queries.extend(DatabaseSchema.get_schema_creation_queries()) - queries.extend(DatabaseSchema.get_hypertable_creation_queries()) - queries.extend(DatabaseSchema.get_index_creation_queries()) - queries.extend(DatabaseSchema.get_retention_policy_queries()) - queries.extend(DatabaseSchema.get_continuous_aggregate_queries()) - queries.extend(DatabaseSchema.get_view_creation_queries()) - return queries \ No newline at end of file + async def setup_complete_schema(self) -> bool: + """Set up the complete database schema with all components.""" + try: + # Initialize schema tracking + await self.initialize_schema_tracking() + + # Apply all migrations in order + migrations = [ + self.create_base_schema, + self.create_hypertables, + self.create_indexes, + self.setup_retention_policies, + self.create_continuous_aggregates + ] + + for migration in migrations: + success = await migration() + if not success: + logger.error(f"Failed to apply migration: {migration.__name__}") + return False + + logger.info("Complete database schema setup successful") + return True + + except Exception as e: + logger.error(f"Failed to setup complete schema: {e}") + return False + + async def get_schema_info(self) -> Dict: + """Get information about the current schema state.""" + try: + async with self.pool.acquire() as conn: + # Get applied migrations + migrations = await conn.fetch(""" + SELECT version, applied_at, description + FROM schema_migrations + ORDER BY applied_at + """) + + # Get table information + tables = await conn.fetch(""" + SELECT + schemaname, + tablename, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ('order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data') + """) + + # Get hypertable information + hypertables = await conn.fetch(""" + SELECT hypertable_name, num_chunks, compression_enabled + FROM timescaledb_information.hypertables + WHERE hypertable_schema = 'public' + """) + + return { + "migrations": [dict(m) for m in migrations], + "tables": [dict(t) for t in tables], + "hypertables": [dict(h) for h in hypertables] + } + + except Exception as e: + logger.error(f"Failed to get schema info: {e}") 
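+                # Schema info is diagnostic only; log the failure and fall back to an empty dict rather than raising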
+ return {} \ No newline at end of file diff --git a/COBY/storage/storage_manager.py b/COBY/storage/storage_manager.py new file mode 100644 index 0000000..ef7f70c --- /dev/null +++ b/COBY/storage/storage_manager.py @@ -0,0 +1,270 @@ +""" +Comprehensive storage manager that integrates TimescaleDB, connection pooling, and schema management. +""" + +import asyncio +import logging +from typing import Dict, List, Optional, Any +from datetime import datetime, timedelta + +from .timescale_manager import TimescaleManager +from .connection_pool import ConnectionPoolManager +from .schema import SchemaManager +from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData +from ..config import Config +from ..utils.exceptions import DatabaseError, ConnectionError + +logger = logging.getLogger(__name__) + + +class StorageManager: + """Unified storage manager for all database operations.""" + + def __init__(self, config: Config): + self.config = config + self.connection_pool = ConnectionPoolManager(config) + self.schema_manager = SchemaManager(self.connection_pool) + self.timescale_manager = TimescaleManager(config) + self._initialized = False + + async def initialize(self) -> None: + """Initialize all storage components.""" + try: + logger.info("Initializing storage manager...") + + # Initialize connection pool + await self.connection_pool.initialize() + + # Set up database schema + await self.schema_manager.setup_complete_schema() + + # Initialize TimescaleDB manager with existing pool + self.timescale_manager.pool = self.connection_pool.pool + + self._initialized = True + logger.info("Storage manager initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize storage manager: {e}") + raise ConnectionError(f"Storage initialization failed: {e}") + + async def close(self) -> None: + """Close all storage connections.""" + if self.timescale_manager: + await self.timescale_manager.close() + + if self.connection_pool: + await self.connection_pool.close() + + logger.info("Storage manager closed") + + def is_healthy(self) -> bool: + """Check if storage system is healthy.""" + return self._initialized and self.connection_pool.is_healthy() + + # Order book operations + async def store_orderbook(self, data: OrderBookSnapshot) -> bool: + """Store order book snapshot.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.store_orderbook(data) + + async def batch_store_orderbooks(self, data_list: List[OrderBookSnapshot]) -> bool: + """Store multiple order book snapshots.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.batch_store_orderbooks(data_list) + + async def get_latest_orderbook(self, symbol: str, exchange: Optional[str] = None) -> Optional[Dict]: + """Get latest order book snapshot.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.get_latest_orderbook(symbol, exchange) + + # Trade operations + async def store_trade(self, data: TradeEvent) -> bool: + """Store trade event.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.store_trade(data) + + async def batch_store_trades(self, data_list: List[TradeEvent]) -> bool: + """Store multiple trade events.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await 
self.timescale_manager.batch_store_trades(data_list) + + # Heatmap operations + async def store_heatmap_data(self, data: HeatmapData) -> bool: + """Store heatmap data.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.store_heatmap_data(data) + + async def get_heatmap_data(self, symbol: str, bucket_size: float, + start: Optional[datetime] = None) -> List[Dict]: + """Get heatmap data for visualization.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.get_heatmap_data(symbol, bucket_size, start) + + # Historical data operations + async def get_historical_data(self, symbol: str, start: datetime, end: datetime, + data_type: str = 'orderbook') -> List[Dict]: + """Get historical data for a symbol within time range.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + return await self.timescale_manager.get_historical_data(symbol, start, end, data_type) + + # System operations + async def get_system_stats(self) -> Dict[str, Any]: + """Get comprehensive system statistics.""" + if not self._initialized: + return {"status": "not_initialized"} + + try: + # Get database stats + db_stats = await self.timescale_manager.get_database_stats() + + # Get connection pool stats + pool_stats = self.connection_pool.get_pool_stats() + + # Get schema info + schema_info = await self.schema_manager.get_schema_info() + + return { + "status": "healthy" if self.is_healthy() else "unhealthy", + "database": db_stats, + "connection_pool": pool_stats, + "schema": schema_info, + "initialized": self._initialized + } + + except Exception as e: + logger.error(f"Failed to get system stats: {e}") + return {"status": "error", "error": str(e)} + + async def health_check(self) -> Dict[str, Any]: + """Perform comprehensive health check.""" + health_status = { + "healthy": True, + "components": {}, + "timestamp": datetime.utcnow().isoformat() + } + + try: + # Check connection pool + pool_healthy = self.connection_pool.is_healthy() + health_status["components"]["connection_pool"] = { + "healthy": pool_healthy, + "stats": self.connection_pool.get_pool_stats() + } + + # Test database connection + try: + async with self.connection_pool.acquire() as conn: + await conn.execute('SELECT 1') + health_status["components"]["database"] = {"healthy": True} + except Exception as e: + health_status["components"]["database"] = { + "healthy": False, + "error": str(e) + } + health_status["healthy"] = False + + # Check schema version + try: + current_version = await self.schema_manager.get_current_schema_version() + health_status["components"]["schema"] = { + "healthy": True, + "version": current_version + } + except Exception as e: + health_status["components"]["schema"] = { + "healthy": False, + "error": str(e) + } + health_status["healthy"] = False + + # Overall health + health_status["healthy"] = all( + comp.get("healthy", False) + for comp in health_status["components"].values() + ) + + except Exception as e: + logger.error(f"Health check failed: {e}") + health_status["healthy"] = False + health_status["error"] = str(e) + + return health_status + + # Maintenance operations + async def cleanup_old_data(self, days: int = 30) -> Dict[str, int]: + """Clean up old data beyond retention period.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + try: + cutoff_date = datetime.utcnow() - timedelta(days=days) + cleanup_stats 
= {} + + async with self.connection_pool.acquire() as conn: + # Clean up old order book snapshots + result = await conn.execute(""" + DELETE FROM order_book_snapshots + WHERE timestamp < $1 + """, cutoff_date) + cleanup_stats["order_book_snapshots"] = int(result.split()[-1]) + + # Clean up old trade events + result = await conn.execute(""" + DELETE FROM trade_events + WHERE timestamp < $1 + """, cutoff_date) + cleanup_stats["trade_events"] = int(result.split()[-1]) + + # Clean up old heatmap data + result = await conn.execute(""" + DELETE FROM heatmap_data + WHERE timestamp < $1 + """, cutoff_date) + cleanup_stats["heatmap_data"] = int(result.split()[-1]) + + logger.info(f"Cleaned up old data: {cleanup_stats}") + return cleanup_stats + + except Exception as e: + logger.error(f"Failed to cleanup old data: {e}") + raise DatabaseError(f"Data cleanup failed: {e}") + + async def optimize_database(self) -> bool: + """Run database optimization tasks.""" + if not self._initialized: + raise DatabaseError("Storage manager not initialized") + + try: + async with self.connection_pool.acquire() as conn: + # Analyze tables for better query planning + tables = ['order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data'] + for table in tables: + await conn.execute(f"ANALYZE {table}") + + # Vacuum tables to reclaim space + for table in tables: + await conn.execute(f"VACUUM {table}") + + logger.info("Database optimization completed") + return True + + except Exception as e: + logger.error(f"Database optimization failed: {e}") + return False \ No newline at end of file diff --git a/COBY/storage/timescale_manager.py b/COBY/storage/timescale_manager.py index 9098e2e..fd71bec 100644 --- a/COBY/storage/timescale_manager.py +++ b/COBY/storage/timescale_manager.py @@ -1,604 +1,540 @@ """ -TimescaleDB storage manager implementation. +TimescaleDB connection manager and database operations. +Provides connection pooling, schema management, and optimized time-series operations. 
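+Note: setup_database_schema() below creates the same tables, hypertables, indexes, and retention policies as SchemaManager's base migrations; keep the two in sync.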
""" -import json +import asyncio +import logging from datetime import datetime -from typing import List, Dict, Optional, Any -from ..interfaces.storage_manager import StorageManager -from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData, SystemMetrics, PriceLevel -from ..utils.logging import get_logger, set_correlation_id -from ..utils.exceptions import StorageError, ValidationError -from ..utils.timing import get_current_timestamp -from .connection_pool import db_pool -from .schema import DatabaseSchema +from typing import Dict, List, Optional, Any +from dataclasses import asdict +import asyncpg +from asyncpg import Pool, Connection +import json -logger = get_logger(__name__) +from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData, PriceBuckets +from ..utils.exceptions import DatabaseError, ConnectionError +from ..config import Config + +logger = logging.getLogger(__name__) -class TimescaleManager(StorageManager): - """TimescaleDB implementation of StorageManager interface""" +class TimescaleManager: + """Manages TimescaleDB connections and operations for time-series data.""" - def __init__(self): - self._schema_initialized = False + def __init__(self, config: Config): + self.config = config + self.pool: Optional[Pool] = None + self._connection_string = self._build_connection_string() + + def _build_connection_string(self) -> str: + """Build PostgreSQL connection string from config.""" + return ( + f"postgresql://{self.config.database.user}:{self.config.database.password}" + f"@{self.config.database.host}:{self.config.database.port}/{self.config.database.name}" + ) async def initialize(self) -> None: - """Initialize the storage manager""" - await db_pool.initialize() - await self.setup_database_schema() - logger.info("TimescaleDB storage manager initialized") + """Initialize connection pool and database schema.""" + try: + logger.info("Initializing TimescaleDB connection pool...") + self.pool = await asyncpg.create_pool( + self._connection_string, + min_size=5, + max_size=self.config.database.pool_size, + command_timeout=60, + server_settings={ + 'jit': 'off', # Disable JIT for better performance with time-series + 'timezone': 'UTC' + } + ) + + # Test connection + async with self.pool.acquire() as conn: + await conn.execute('SELECT 1') + + logger.info("TimescaleDB connection pool initialized successfully") + + # Initialize database schema + await self.setup_database_schema() + + except Exception as e: + logger.error(f"Failed to initialize TimescaleDB: {e}") + raise ConnectionError(f"TimescaleDB initialization failed: {e}") async def close(self) -> None: - """Close the storage manager""" - await db_pool.close() - logger.info("TimescaleDB storage manager closed") + """Close connection pool.""" + if self.pool: + await self.pool.close() + logger.info("TimescaleDB connection pool closed") - def setup_database_schema(self) -> None: - """Set up database schema and tables""" - async def _setup(): - if self._schema_initialized: - return - - try: - queries = DatabaseSchema.get_all_creation_queries() - - for query in queries: - try: - await db_pool.execute_command(query) - logger.debug(f"Executed schema query: {query[:50]}...") - except Exception as e: - # Log but continue - some queries might fail if already exists - logger.warning(f"Schema query failed (continuing): {e}") - - self._schema_initialized = True - logger.info("Database schema setup completed") - - except Exception as e: - logger.error(f"Failed to setup database schema: {e}") - raise StorageError(f"Schema 
setup failed: {e}", "SCHEMA_SETUP_ERROR") - - # Run async setup - import asyncio - if asyncio.get_event_loop().is_running(): - asyncio.create_task(_setup()) - else: - asyncio.run(_setup()) - - async def store_orderbook(self, data: OrderBookSnapshot) -> bool: - """Store order book snapshot to database""" + async def setup_database_schema(self) -> None: + """Create database schema with hypertables and indexes.""" try: - set_correlation_id() + async with self.pool.acquire() as conn: + # Enable TimescaleDB extension + await conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;") + + # Create order book snapshots table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS order_book_snapshots ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bids JSONB NOT NULL, + asks JSONB NOT NULL, + sequence_id BIGINT, + mid_price DECIMAL(20,8), + spread DECIMAL(20,8), + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + PRIMARY KEY (timestamp, symbol, exchange) + ); + """) + + # Convert to hypertable if not already + try: + await conn.execute(""" + SELECT create_hypertable('order_book_snapshots', 'timestamp', + if_not_exists => TRUE); + """) + except Exception as e: + if "already a hypertable" not in str(e): + logger.warning(f"Could not create hypertable for order_book_snapshots: {e}") + + # Create trade events table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS trade_events ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + price DECIMAL(20,8) NOT NULL, + size DECIMAL(30,8) NOT NULL, + side VARCHAR(4) NOT NULL, + trade_id VARCHAR(100) NOT NULL, + PRIMARY KEY (timestamp, symbol, exchange, trade_id) + ); + """) + + # Convert to hypertable if not already + try: + await conn.execute(""" + SELECT create_hypertable('trade_events', 'timestamp', + if_not_exists => TRUE); + """) + except Exception as e: + if "already a hypertable" not in str(e): + logger.warning(f"Could not create hypertable for trade_events: {e}") + + # Create heatmap data table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS heatmap_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bucket_size DECIMAL(10,2) NOT NULL, + price_bucket DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + side VARCHAR(3) NOT NULL, + exchange_count INTEGER NOT NULL, + PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side) + ); + """) + + # Convert to hypertable if not already + try: + await conn.execute(""" + SELECT create_hypertable('heatmap_data', 'timestamp', + if_not_exists => TRUE); + """) + except Exception as e: + if "already a hypertable" not in str(e): + logger.warning(f"Could not create hypertable for heatmap_data: {e}") + + # Create OHLCV data table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS ohlcv_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open_price DECIMAL(20,8) NOT NULL, + high_price DECIMAL(20,8) NOT NULL, + low_price DECIMAL(20,8) NOT NULL, + close_price DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + trade_count INTEGER, + PRIMARY KEY (timestamp, symbol, timeframe) + ); + """) + + # Convert to hypertable if not already + try: + await conn.execute(""" + SELECT create_hypertable('ohlcv_data', 'timestamp', + if_not_exists => TRUE); + """) + except Exception as e: + if "already a hypertable" not in str(e): + logger.warning(f"Could not create hypertable for ohlcv_data: 
{e}") + + # Create indexes for better query performance + await self._create_indexes(conn) + + # Set up data retention policies + await self._setup_retention_policies(conn) + + logger.info("Database schema setup completed successfully") + + except Exception as e: + logger.error(f"Failed to setup database schema: {e}") + raise DatabaseError(f"Schema setup failed: {e}") + + async def _create_indexes(self, conn: Connection) -> None: + """Create indexes for optimized queries.""" + indexes = [ + # Order book snapshots indexes + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_time ON order_book_snapshots (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_exchange_time ON order_book_snapshots (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange ON order_book_snapshots (symbol, exchange, timestamp DESC);", - # Convert price levels to JSON - bids_json = json.dumps([ - {"price": float(level.price), "size": float(level.size), "count": level.count} - for level in data.bids - ]) - asks_json = json.dumps([ - {"price": float(level.price), "size": float(level.size), "count": level.count} - for level in data.asks - ]) + # Trade events indexes + "CREATE INDEX IF NOT EXISTS idx_trades_symbol_time ON trade_events (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_exchange_time ON trade_events (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_price ON trade_events (symbol, price, timestamp DESC);", - query = """ - INSERT INTO market_data.order_book_snapshots - (symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread, bid_volume, ask_volume) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - """ + # Heatmap data indexes + "CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_time ON heatmap_data (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_heatmap_bucket ON heatmap_data (symbol, bucket_size, timestamp DESC);", - await db_pool.execute_command( - query, - data.symbol, - data.exchange, - data.timestamp, - bids_json, - asks_json, - data.sequence_id, - float(data.mid_price) if data.mid_price else None, - float(data.spread) if data.spread else None, - float(data.bid_volume), - float(data.ask_volume) - ) + # OHLCV indexes + "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON ohlcv_data (symbol, timeframe, timestamp DESC);" + ] + + for index_sql in indexes: + try: + await conn.execute(index_sql) + except Exception as e: + logger.warning(f"Could not create index: {e}") + + async def _setup_retention_policies(self, conn: Connection) -> None: + """Set up data retention policies for automatic cleanup.""" + try: + # Keep raw order book data for 30 days + await conn.execute(""" + SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', + if_not_exists => TRUE); + """) - logger.debug(f"Stored order book: {data.symbol}@{data.exchange}") - return True + # Keep trade events for 90 days + await conn.execute(""" + SELECT add_retention_policy('trade_events', INTERVAL '90 days', + if_not_exists => TRUE); + """) + + # Keep heatmap data for 60 days + await conn.execute(""" + SELECT add_retention_policy('heatmap_data', INTERVAL '60 days', + if_not_exists => TRUE); + """) + + # Keep OHLCV data for 1 year + await conn.execute(""" + SELECT add_retention_policy('ohlcv_data', INTERVAL '1 year', + if_not_exists => TRUE); + """) + + logger.info("Data retention policies configured") except Exception as e: - logger.error(f"Failed to store order book: {e}") - return False + logger.warning(f"Could not set up retention 
policies: {e}") + + async def store_orderbook(self, data: OrderBookSnapshot) -> bool: + """Store order book snapshot in database.""" + try: + async with self.pool.acquire() as conn: + # Calculate derived metrics + mid_price = None + spread = None + bid_volume = sum(level.size for level in data.bids) + ask_volume = sum(level.size for level in data.asks) + + if data.bids and data.asks: + best_bid = max(data.bids, key=lambda x: x.price).price + best_ask = min(data.asks, key=lambda x: x.price).price + mid_price = (best_bid + best_ask) / 2 + spread = best_ask - best_bid + + # Convert price levels to JSON + bids_json = json.dumps([asdict(level) for level in data.bids]) + asks_json = json.dumps([asdict(level) for level in data.asks]) + + await conn.execute(""" + INSERT INTO order_book_snapshots + (symbol, exchange, timestamp, bids, asks, sequence_id, + mid_price, spread, bid_volume, ask_volume) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (timestamp, symbol, exchange) DO UPDATE SET + bids = EXCLUDED.bids, + asks = EXCLUDED.asks, + sequence_id = EXCLUDED.sequence_id, + mid_price = EXCLUDED.mid_price, + spread = EXCLUDED.spread, + bid_volume = EXCLUDED.bid_volume, + ask_volume = EXCLUDED.ask_volume + """, data.symbol, data.exchange, data.timestamp, bids_json, asks_json, + data.sequence_id, mid_price, spread, bid_volume, ask_volume) + + return True + + except Exception as e: + logger.error(f"Failed to store order book data: {e}") + raise DatabaseError(f"Order book storage failed: {e}") async def store_trade(self, data: TradeEvent) -> bool: - """Store trade event to database""" + """Store trade event in database.""" try: - set_correlation_id() - - query = """ - INSERT INTO market_data.trade_events - (symbol, exchange, timestamp, price, size, side, trade_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) - """ - - await db_pool.execute_command( - query, - data.symbol, - data.exchange, - data.timestamp, - float(data.price), - float(data.size), - data.side, - data.trade_id - ) - - logger.debug(f"Stored trade: {data.symbol}@{data.exchange} - {data.trade_id}") - return True - - except Exception as e: - logger.error(f"Failed to store trade: {e}") - return False - - async def store_heatmap(self, data: HeatmapData) -> bool: - """Store heatmap data to database""" - try: - set_correlation_id() - - # Store each heatmap point - for point in data.data: - query = """ - INSERT INTO market_data.heatmap_data - (symbol, timestamp, bucket_size, price_bucket, volume, side, exchange_count, exchanges) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (timestamp, symbol, bucket_size, price_bucket, side) - DO UPDATE SET - volume = EXCLUDED.volume, - exchange_count = EXCLUDED.exchange_count, - exchanges = EXCLUDED.exchanges - """ + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO trade_events + (symbol, exchange, timestamp, price, size, side, trade_id) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING + """, data.symbol, data.exchange, data.timestamp, data.price, + data.size, data.side, data.trade_id) - await db_pool.execute_command( - query, - data.symbol, - data.timestamp, - float(data.bucket_size), - float(point.price), - float(point.volume), - point.side, - 1, # exchange_count - will be updated by aggregation - json.dumps([]) # exchanges - will be updated by aggregation - ) - - logger.debug(f"Stored heatmap: {data.symbol} with {len(data.data)} points") - return True - - except Exception as e: - logger.error(f"Failed to 
store heatmap: {e}") - return False - - async def store_metrics(self, data: SystemMetrics) -> bool: - """Store system metrics to database""" - try: - set_correlation_id() - - # Store multiple metrics - metrics = [ - ('cpu_usage', data.cpu_usage), - ('memory_usage', data.memory_usage), - ('disk_usage', data.disk_usage), - ('database_connections', data.database_connections), - ('redis_connections', data.redis_connections), - ('active_websockets', data.active_websockets), - ('messages_per_second', data.messages_per_second), - ('processing_latency', data.processing_latency) - ] - - query = """ - INSERT INTO market_data.system_metrics - (metric_name, timestamp, value, labels) - VALUES ($1, $2, $3, $4) - """ - - for metric_name, value in metrics: - await db_pool.execute_command( - query, - metric_name, - data.timestamp, - float(value), - json.dumps(data.network_io) - ) - - logger.debug("Stored system metrics") - return True - - except Exception as e: - logger.error(f"Failed to store metrics: {e}") - return False - - async def get_historical_orderbooks(self, symbol: str, exchange: str, - start: datetime, end: datetime, - limit: Optional[int] = None) -> List[OrderBookSnapshot]: - """Retrieve historical order book data""" - try: - query = """ - SELECT symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread - FROM market_data.order_book_snapshots - WHERE symbol = $1 AND exchange = $2 AND timestamp >= $3 AND timestamp <= $4 - ORDER BY timestamp DESC - """ - - if limit: - query += f" LIMIT {limit}" - - rows = await db_pool.execute_query(query, symbol, exchange, start, end) - - orderbooks = [] - for row in rows: - # Parse JSON bid/ask data - bids_data = json.loads(row['bids']) - asks_data = json.loads(row['asks']) + return True - bids = [PriceLevel(price=b['price'], size=b['size'], count=b.get('count')) - for b in bids_data] - asks = [PriceLevel(price=a['price'], size=a['size'], count=a.get('count')) - for a in asks_data] + except Exception as e: + logger.error(f"Failed to store trade data: {e}") + raise DatabaseError(f"Trade storage failed: {e}") + + async def store_heatmap_data(self, data: HeatmapData) -> bool: + """Store heatmap data in database.""" + try: + async with self.pool.acquire() as conn: + # Prepare batch insert data + insert_data = [] + for point in data.data: + insert_data.append(( + data.symbol, data.timestamp, data.bucket_size, + point.price, point.volume, point.side, 1 # exchange_count + )) + + if insert_data: + await conn.executemany(""" + INSERT INTO heatmap_data + (symbol, timestamp, bucket_size, price_bucket, volume, side, exchange_count) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (timestamp, symbol, bucket_size, price_bucket, side) + DO UPDATE SET + volume = heatmap_data.volume + EXCLUDED.volume, + exchange_count = heatmap_data.exchange_count + EXCLUDED.exchange_count + """, insert_data) + + return True - orderbook = OrderBookSnapshot( - symbol=row['symbol'], - exchange=row['exchange'], - timestamp=row['timestamp'], - bids=bids, - asks=asks, - sequence_id=row['sequence_id'] - ) - orderbooks.append(orderbook) - - logger.debug(f"Retrieved {len(orderbooks)} historical order books") - return orderbooks - except Exception as e: - logger.error(f"Failed to get historical order books: {e}") - return [] + logger.error(f"Failed to store heatmap data: {e}") + raise DatabaseError(f"Heatmap storage failed: {e}") - async def get_historical_trades(self, symbol: str, exchange: str, - start: datetime, end: datetime, - limit: Optional[int] = None) -> List[TradeEvent]: - 
"""Retrieve historical trade data""" + async def get_latest_orderbook(self, symbol: str, exchange: Optional[str] = None) -> Optional[Dict]: + """Get latest order book snapshot for a symbol.""" try: - query = """ - SELECT symbol, exchange, timestamp, price, size, side, trade_id - FROM market_data.trade_events - WHERE symbol = $1 AND exchange = $2 AND timestamp >= $3 AND timestamp <= $4 - ORDER BY timestamp DESC - """ - - if limit: - query += f" LIMIT {limit}" - - rows = await db_pool.execute_query(query, symbol, exchange, start, end) - - trades = [] - for row in rows: - trade = TradeEvent( - symbol=row['symbol'], - exchange=row['exchange'], - timestamp=row['timestamp'], - price=float(row['price']), - size=float(row['size']), - side=row['side'], - trade_id=row['trade_id'] - ) - trades.append(trade) - - logger.debug(f"Retrieved {len(trades)} historical trades") - return trades - - except Exception as e: - logger.error(f"Failed to get historical trades: {e}") - return [] - - async def get_latest_orderbook(self, symbol: str, exchange: str) -> Optional[OrderBookSnapshot]: - """Get latest order book snapshot""" - try: - query = """ - SELECT symbol, exchange, timestamp, bids, asks, sequence_id - FROM market_data.order_book_snapshots - WHERE symbol = $1 AND exchange = $2 - ORDER BY timestamp DESC - LIMIT 1 - """ - - rows = await db_pool.execute_query(query, symbol, exchange) - - if not rows: + async with self.pool.acquire() as conn: + if exchange: + query = """ + SELECT * FROM order_book_snapshots + WHERE symbol = $1 AND exchange = $2 + ORDER BY timestamp DESC LIMIT 1 + """ + row = await conn.fetchrow(query, symbol, exchange) + else: + query = """ + SELECT * FROM order_book_snapshots + WHERE symbol = $1 + ORDER BY timestamp DESC LIMIT 1 + """ + row = await conn.fetchrow(query, symbol) + + if row: + return dict(row) return None - - row = rows[0] - bids_data = json.loads(row['bids']) - asks_data = json.loads(row['asks']) - - bids = [PriceLevel(price=b['price'], size=b['size'], count=b.get('count')) - for b in bids_data] - asks = [PriceLevel(price=a['price'], size=a['size'], count=a.get('count')) - for a in asks_data] - - return OrderBookSnapshot( - symbol=row['symbol'], - exchange=row['exchange'], - timestamp=row['timestamp'], - bids=bids, - asks=asks, - sequence_id=row['sequence_id'] - ) - + except Exception as e: logger.error(f"Failed to get latest order book: {e}") - return None + raise DatabaseError(f"Order book retrieval failed: {e}") - async def get_latest_heatmap(self, symbol: str, bucket_size: float) -> Optional[HeatmapData]: - """Get latest heatmap data""" + async def get_historical_data(self, symbol: str, start: datetime, end: datetime, + data_type: str = 'orderbook') -> List[Dict]: + """Get historical data for a symbol within time range.""" try: - query = """ - SELECT price_bucket, volume, side, timestamp - FROM market_data.heatmap_data - WHERE symbol = $1 AND bucket_size = $2 - AND timestamp = ( - SELECT MAX(timestamp) - FROM market_data.heatmap_data - WHERE symbol = $1 AND bucket_size = $2 - ) - ORDER BY price_bucket - """ - - rows = await db_pool.execute_query(query, symbol, bucket_size) - - if not rows: - return None - - from ..models.core import HeatmapPoint - heatmap = HeatmapData( - symbol=symbol, - timestamp=rows[0]['timestamp'], - bucket_size=bucket_size - ) - - # Calculate max volume for intensity - max_volume = max(float(row['volume']) for row in rows) - - for row in rows: - volume = float(row['volume']) - intensity = volume / max_volume if max_volume > 0 else 0.0 + async with 
self.pool.acquire() as conn: + if data_type == 'orderbook': + query = """ + SELECT * FROM order_book_snapshots + WHERE symbol = $1 AND timestamp >= $2 AND timestamp <= $3 + ORDER BY timestamp ASC + """ + elif data_type == 'trades': + query = """ + SELECT * FROM trade_events + WHERE symbol = $1 AND timestamp >= $2 AND timestamp <= $3 + ORDER BY timestamp ASC + """ + elif data_type == 'heatmap': + query = """ + SELECT * FROM heatmap_data + WHERE symbol = $1 AND timestamp >= $2 AND timestamp <= $3 + ORDER BY timestamp ASC + """ + else: + raise ValueError(f"Unknown data type: {data_type}") - point = HeatmapPoint( - price=float(row['price_bucket']), - volume=volume, - intensity=intensity, - side=row['side'] - ) - heatmap.data.append(point) - - return heatmap - - except Exception as e: - logger.error(f"Failed to get latest heatmap: {e}") - return None - - async def get_ohlcv_data(self, symbol: str, exchange: str, timeframe: str, - start: datetime, end: datetime) -> List[Dict[str, Any]]: - """Get OHLCV candlestick data""" - try: - query = """ - SELECT timestamp, open_price, high_price, low_price, close_price, volume, trade_count, vwap - FROM market_data.ohlcv_data - WHERE symbol = $1 AND exchange = $2 AND timeframe = $3 - AND timestamp >= $4 AND timestamp <= $5 - ORDER BY timestamp - """ - - rows = await db_pool.execute_query(query, symbol, exchange, timeframe, start, end) - - ohlcv_data = [] - for row in rows: - ohlcv_data.append({ - 'timestamp': row['timestamp'], - 'open': float(row['open_price']), - 'high': float(row['high_price']), - 'low': float(row['low_price']), - 'close': float(row['close_price']), - 'volume': float(row['volume']), - 'trade_count': row['trade_count'], - 'vwap': float(row['vwap']) if row['vwap'] else None - }) - - logger.debug(f"Retrieved {len(ohlcv_data)} OHLCV records") - return ohlcv_data - - except Exception as e: - logger.error(f"Failed to get OHLCV data: {e}") - return [] - - async def batch_store_orderbooks(self, data: List[OrderBookSnapshot]) -> int: - """Store multiple order book snapshots in batch""" - if not data: - return 0 - - try: - set_correlation_id() - - # Prepare batch data - batch_data = [] - for orderbook in data: - bids_json = json.dumps([ - {"price": float(level.price), "size": float(level.size), "count": level.count} - for level in orderbook.bids - ]) - asks_json = json.dumps([ - {"price": float(level.price), "size": float(level.size), "count": level.count} - for level in orderbook.asks - ]) + rows = await conn.fetch(query, symbol, start, end) + return [dict(row) for row in rows] - batch_data.append(( - orderbook.symbol, - orderbook.exchange, - orderbook.timestamp, - bids_json, - asks_json, - orderbook.sequence_id, - float(orderbook.mid_price) if orderbook.mid_price else None, - float(orderbook.spread) if orderbook.spread else None, - float(orderbook.bid_volume), - float(orderbook.ask_volume) - )) - - query = """ - INSERT INTO market_data.order_book_snapshots - (symbol, exchange, timestamp, bids, asks, sequence_id, mid_price, spread, bid_volume, ask_volume) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - """ - - await db_pool.execute_many(query, batch_data) - - logger.debug(f"Batch stored {len(data)} order books") - return len(data) - except Exception as e: - logger.error(f"Failed to batch store order books: {e}") - return 0 + logger.error(f"Failed to get historical data: {e}") + raise DatabaseError(f"Historical data retrieval failed: {e}") - async def batch_store_trades(self, data: List[TradeEvent]) -> int: - """Store multiple trade events in 
batch""" - if not data: - return 0 - + async def get_heatmap_data(self, symbol: str, bucket_size: float, + start: Optional[datetime] = None) -> List[Dict]: + """Get heatmap data for visualization.""" try: - set_correlation_id() - - # Prepare batch data - batch_data = [ - (trade.symbol, trade.exchange, trade.timestamp, float(trade.price), - float(trade.size), trade.side, trade.trade_id) - for trade in data - ] - - query = """ - INSERT INTO market_data.trade_events - (symbol, exchange, timestamp, price, size, side, trade_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) - """ - - await db_pool.execute_many(query, batch_data) - - logger.debug(f"Batch stored {len(data)} trades") - return len(data) - - except Exception as e: - logger.error(f"Failed to batch store trades: {e}") - return 0 - - async def cleanup_old_data(self, retention_days: int) -> int: - """Clean up old data based on retention policy""" - try: - cutoff_time = get_current_timestamp().replace( - day=get_current_timestamp().day - retention_days - ) - - tables = [ - 'order_book_snapshots', - 'trade_events', - 'heatmap_data', - 'exchange_status', - 'system_metrics' - ] - - total_deleted = 0 - for table in tables: - query = f""" - DELETE FROM market_data.{table} - WHERE timestamp < $1 - """ + async with self.pool.acquire() as conn: + if start: + query = """ + SELECT price_bucket, volume, side, timestamp + FROM heatmap_data + WHERE symbol = $1 AND bucket_size = $2 AND timestamp >= $3 + ORDER BY timestamp DESC, price_bucket ASC + """ + rows = await conn.fetch(query, symbol, bucket_size, start) + else: + # Get latest heatmap data + query = """ + SELECT price_bucket, volume, side, timestamp + FROM heatmap_data + WHERE symbol = $1 AND bucket_size = $2 + AND timestamp = ( + SELECT MAX(timestamp) FROM heatmap_data + WHERE symbol = $1 AND bucket_size = $2 + ) + ORDER BY price_bucket ASC + """ + rows = await conn.fetch(query, symbol, bucket_size) - result = await db_pool.execute_command(query, cutoff_time) - # Extract number from result like "DELETE 1234" - deleted = int(result.split()[-1]) if result.split()[-1].isdigit() else 0 - total_deleted += deleted + return [dict(row) for row in rows] - logger.debug(f"Cleaned up {deleted} records from {table}") - - logger.info(f"Cleaned up {total_deleted} total records older than {retention_days} days") - return total_deleted - except Exception as e: - logger.error(f"Failed to cleanup old data: {e}") - return 0 + logger.error(f"Failed to get heatmap data: {e}") + raise DatabaseError(f"Heatmap data retrieval failed: {e}") - async def get_storage_stats(self) -> Dict[str, Any]: - """Get storage statistics""" - try: - stats = {} - - # Table sizes - size_query = """ - SELECT - schemaname, - tablename, - pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size, - pg_total_relation_size(schemaname||'.'||tablename) as size_bytes - FROM pg_tables - WHERE schemaname = 'market_data' - ORDER BY size_bytes DESC - """ - - size_rows = await db_pool.execute_query(size_query) - stats['table_sizes'] = [ - { - 'table': row['tablename'], - 'size': row['size'], - 'size_bytes': row['size_bytes'] - } - for row in size_rows - ] - - # Record counts - tables = ['order_book_snapshots', 'trade_events', 'heatmap_data', - 'ohlcv_data', 'exchange_status', 'system_metrics'] - - record_counts = {} - for table in tables: - count_query = f"SELECT COUNT(*) as count FROM market_data.{table}" - count_rows = await db_pool.execute_query(count_query) - record_counts[table] = count_rows[0]['count'] if count_rows else 0 - - 
stats['record_counts'] = record_counts - - # Connection pool stats - stats['connection_pool'] = await db_pool.get_pool_stats() - - return stats - - except Exception as e: - logger.error(f"Failed to get storage stats: {e}") - return {} - - async def health_check(self) -> bool: - """Check storage system health""" - try: - # Check database connection - if not await db_pool.health_check(): - return False - - # Check if tables exist - query = """ - SELECT COUNT(*) as count - FROM information_schema.tables - WHERE table_schema = 'market_data' - """ - - rows = await db_pool.execute_query(query) - table_count = rows[0]['count'] if rows else 0 - - if table_count < 6: # We expect 6 main tables - logger.warning(f"Expected 6 tables, found {table_count}") - return False - - logger.debug("Storage health check passed") + async def batch_store_orderbooks(self, data_list: List[OrderBookSnapshot]) -> bool: + """Store multiple order book snapshots in a single transaction.""" + if not data_list: return True + try: + async with self.pool.acquire() as conn: + async with conn.transaction(): + insert_data = [] + for data in data_list: + # Calculate derived metrics + mid_price = None + spread = None + bid_volume = sum(level.size for level in data.bids) + ask_volume = sum(level.size for level in data.asks) + + if data.bids and data.asks: + best_bid = max(data.bids, key=lambda x: x.price).price + best_ask = min(data.asks, key=lambda x: x.price).price + mid_price = (best_bid + best_ask) / 2 + spread = best_ask - best_bid + + # Convert price levels to JSON + bids_json = json.dumps([asdict(level) for level in data.bids]) + asks_json = json.dumps([asdict(level) for level in data.asks]) + + insert_data.append(( + data.symbol, data.exchange, data.timestamp, bids_json, asks_json, + data.sequence_id, mid_price, spread, bid_volume, ask_volume + )) + + await conn.executemany(""" + INSERT INTO order_book_snapshots + (symbol, exchange, timestamp, bids, asks, sequence_id, + mid_price, spread, bid_volume, ask_volume) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (timestamp, symbol, exchange) DO UPDATE SET + bids = EXCLUDED.bids, + asks = EXCLUDED.asks, + sequence_id = EXCLUDED.sequence_id, + mid_price = EXCLUDED.mid_price, + spread = EXCLUDED.spread, + bid_volume = EXCLUDED.bid_volume, + ask_volume = EXCLUDED.ask_volume + """, insert_data) + + return True + except Exception as e: - logger.error(f"Storage health check failed: {e}") - return False \ No newline at end of file + logger.error(f"Failed to batch store order books: {e}") + raise DatabaseError(f"Batch order book storage failed: {e}") + + async def batch_store_trades(self, data_list: List[TradeEvent]) -> bool: + """Store multiple trade events in a single transaction.""" + if not data_list: + return True + + try: + async with self.pool.acquire() as conn: + async with conn.transaction(): + insert_data = [( + data.symbol, data.exchange, data.timestamp, data.price, + data.size, data.side, data.trade_id + ) for data in data_list] + + await conn.executemany(""" + INSERT INTO trade_events + (symbol, exchange, timestamp, price, size, side, trade_id) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (timestamp, symbol, exchange, trade_id) DO NOTHING + """, insert_data) + + return True + + except Exception as e: + logger.error(f"Failed to batch store trades: {e}") + raise DatabaseError(f"Batch trade storage failed: {e}") + + async def get_database_stats(self) -> Dict[str, Any]: + """Get database statistics and health information.""" + try: + async with 
self.pool.acquire() as conn: + stats = {} + + # Get table sizes + tables = ['order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data'] + for table in tables: + row = await conn.fetchrow(f""" + SELECT + COUNT(*) as row_count, + pg_size_pretty(pg_total_relation_size('{table}')) as size + FROM {table} + """) + stats[table] = dict(row) + + # Get connection pool stats + stats['connection_pool'] = { + 'size': self.pool.get_size(), + 'max_size': self.pool.get_max_size(), + 'min_size': self.pool.get_min_size() + } + + return stats + + except Exception as e: + logger.error(f"Failed to get database stats: {e}") + return {} \ No newline at end of file diff --git a/COBY/tests/__init__.py b/COBY/tests/__init__.py index 22555c0..296cf12 100644 --- a/COBY/tests/__init__.py +++ b/COBY/tests/__init__.py @@ -1,3 +1,3 @@ """ -Test suite for the COBY system. +Test modules for the COBY multi-exchange data aggregation system. """ \ No newline at end of file diff --git a/COBY/tests/test_timescale_integration.py b/COBY/tests/test_timescale_integration.py new file mode 100644 index 0000000..cf65ede --- /dev/null +++ b/COBY/tests/test_timescale_integration.py @@ -0,0 +1,101 @@ +""" +Test script for TimescaleDB integration. +Tests database connection, schema creation, and basic operations. +""" + +import asyncio +import logging +from datetime import datetime +from decimal import Decimal + +from ..config import Config +from ..storage.storage_manager import StorageManager +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def test_timescale_integration(): + """Test TimescaleDB integration with basic operations.""" + + # Initialize configuration + config = Config() + + # Create storage manager + storage = StorageManager(config) + + try: + # Initialize storage + logger.info("Initializing storage manager...") + await storage.initialize() + + # Test health check + logger.info("Running health check...") + health = await storage.health_check() + logger.info(f"Health status: {health}") + + # Test schema info + logger.info("Getting schema information...") + schema_info = await storage.get_system_stats() + logger.info(f"Schema info: {schema_info}") + + # Test order book storage + logger.info("Testing order book storage...") + test_orderbook = OrderBookSnapshot( + symbol="BTCUSDT", + exchange="binance", + timestamp=datetime.utcnow(), + bids=[ + PriceLevel(price=50000.0, size=1.5), + PriceLevel(price=49999.0, size=2.0), + ], + asks=[ + PriceLevel(price=50001.0, size=1.2), + PriceLevel(price=50002.0, size=0.8), + ], + sequence_id=12345 + ) + + success = await storage.store_orderbook(test_orderbook) + logger.info(f"Order book storage result: {success}") + + # Test trade storage + logger.info("Testing trade storage...") + test_trade = TradeEvent( + symbol="BTCUSDT", + exchange="binance", + timestamp=datetime.utcnow(), + price=50000.5, + size=0.1, + side="buy", + trade_id="test_trade_123" + ) + + success = await storage.store_trade(test_trade) + logger.info(f"Trade storage result: {success}") + + # Test data retrieval + logger.info("Testing data retrieval...") + latest_orderbook = await storage.get_latest_orderbook("BTCUSDT", "binance") + logger.info(f"Latest order book: {latest_orderbook is not None}") + + # Test system stats + logger.info("Getting system statistics...") + stats = await storage.get_system_stats() + logger.info(f"System stats: {stats}") + + logger.info("All tests completed 
successfully!") + + except Exception as e: + logger.error(f"Test failed: {e}") + raise + + finally: + # Clean up + await storage.close() + + +if __name__ == "__main__": + asyncio.run(test_timescale_integration()) \ No newline at end of file
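
Note: the batch coroutines introduced above (batch_store_orderbooks / batch_store_trades) are not exercised by the new test module. A minimal sketch of how they could be driven is below; `manager` is assumed to be whatever object exposes these coroutines (the class edited in this patch), and the absolute `COBY.*` import paths assume the package is importable from the repository root — neither is shown by the diff itself.

    # Hedged sketch, not part of this patch: exercise the batch insert paths.
    from datetime import datetime, timezone
    from COBY.models.core import OrderBookSnapshot, TradeEvent, PriceLevel

    async def exercise_batch_paths(manager) -> None:
        # One snapshot and one trade are enough to hit the upsert / conflict clauses.
        snapshots = [
            OrderBookSnapshot(
                symbol="BTCUSDT",
                exchange="binance",
                timestamp=datetime.now(timezone.utc),
                bids=[PriceLevel(price=50000.0, size=1.0)],
                asks=[PriceLevel(price=50001.0, size=1.0)],
                sequence_id=1,
            )
        ]
        trades = [
            TradeEvent(
                symbol="BTCUSDT",
                exchange="binance",
                timestamp=datetime.now(timezone.utc),
                price=50000.5,
                size=0.25,
                side="sell",
                trade_id="batch_demo_1",
            )
        ]
        # Both coroutines return True on success and raise DatabaseError on failure.
        assert await manager.batch_store_orderbooks(snapshots)
        assert await manager.batch_store_trades(trades)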
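
The new test module can be run directly via its __main__ guard (for example `python -m COBY.tests.test_timescale_integration` from the repository root, against a reachable TimescaleDB instance configured through Config). If it is later folded into a pytest run, an async wrapper along these lines would be one option; pytest-asyncio and the wrapper name are assumptions, not part of this patch.

    # Hedged sketch: pytest adapter for the integration test (requires pytest-asyncio).
    import pytest
    from COBY.tests.test_timescale_integration import test_timescale_integration

    @pytest.mark.asyncio  # assumed plugin; the test still needs a live TimescaleDB
    async def test_timescale_integration_roundtrip():
        await test_timescale_integration()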