storage manager

This commit is contained in:
Dobromir Popov
2025-08-04 21:50:11 +03:00
parent 42cf02cf3a
commit db61f3c3bf
8 changed files with 1306 additions and 836 deletions

View File

@@ -1,11 +1,11 @@
"""
Storage layer for the COBY system.
Storage layer for the multi-exchange data aggregation system.
Provides TimescaleDB integration, connection pooling, and schema management.
"""
from .timescale_manager import TimescaleManager
from .connection_pool import DatabaseConnectionPool
from .connection_pool import ConnectionPoolManager
from .schema import SchemaManager
from .storage_manager import StorageManager
__all__ = [
'TimescaleManager',
'DatabaseConnectionPool'
]
__all__ = ['TimescaleManager', 'ConnectionPoolManager', 'SchemaManager', 'StorageManager']
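With the expanded exports, downstream code can pull everything from the package root. A minimal usage sketch (the top-level package name `COBY` is an assumption; only the relative paths appear in this diff):

from COBY.config import Config
from COBY.storage import StorageManager

config = Config()
storage = StorageManager(config)  # unified entry point; see storage_manager.py below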

View File

@@ -1,140 +1,219 @@
"""
Database connection pool management for TimescaleDB.
Database connection pool management with health monitoring and automatic recovery.
"""
import asyncio
import asyncpg
import logging
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
from ..config import config
from ..utils.logging import get_logger
from ..utils.exceptions import StorageError
from datetime import datetime, timedelta
import asyncpg
from asyncpg import Pool
logger = get_logger(__name__)
from ..config import Config
from ..utils.exceptions import ConnectionError
logger = logging.getLogger(__name__)
class DatabaseConnectionPool:
"""Manages database connection pool for TimescaleDB"""
class ConnectionPoolManager:
"""Manages database connection pools with health monitoring and recovery."""
def __init__(self):
self._pool: Optional[asyncpg.Pool] = None
self._is_initialized = False
def __init__(self, config: Config):
self.config = config
self.pool: Optional[Pool] = None
self._connection_string = self._build_connection_string()
self._health_check_interval = 30 # seconds
self._health_check_task: Optional[asyncio.Task] = None
self._last_health_check = datetime.utcnow()
self._connection_failures = 0
self._max_failures = 5
def _build_connection_string(self) -> str:
"""Build PostgreSQL connection string from config."""
return (
f"postgresql://{self.config.database.user}:{self.config.database.password}"
f"@{self.config.database.host}:{self.config.database.port}/{self.config.database.name}"
)
async def initialize(self) -> None:
"""Initialize the connection pool"""
if self._is_initialized:
return
"""Initialize connection pool with health monitoring."""
try:
# Build connection string
dsn = (
f"postgresql://{config.database.user}:{config.database.password}"
f"@{config.database.host}:{config.database.port}/{config.database.name}"
)
logger.info("Creating database connection pool...")
# Create connection pool
self._pool = await asyncpg.create_pool(
dsn,
self.pool = await asyncpg.create_pool(
self._connection_string,
min_size=5,
max_size=config.database.pool_size,
max_queries=50000,
max_inactive_connection_lifetime=300,
command_timeout=config.database.pool_timeout,
max_size=self.config.database.pool_size,
command_timeout=60,
server_settings={
'search_path': config.database.schema,
'timezone': 'UTC'
}
'jit': 'off',
'timezone': 'UTC',
'statement_timeout': '30s',
'idle_in_transaction_session_timeout': '60s'
},
init=self._init_connection
)
self._is_initialized = True
logger.info(f"Database connection pool initialized with {config.database.pool_size} connections")
# Test initial connection
await self._test_connection()
# Test connection
await self.health_check()
# Start health monitoring
self._health_check_task = asyncio.create_task(self._health_monitor())
logger.info(f"Database connection pool initialized with {self.config.db_min_connections}-{self.config.db_max_connections} connections")
except Exception as e:
logger.error(f"Failed to initialize database connection pool: {e}")
raise StorageError(f"Database connection failed: {e}", "DB_INIT_ERROR")
logger.error(f"Failed to initialize connection pool: {e}")
raise ConnectionError(f"Connection pool initialization failed: {e}")
async def close(self) -> None:
"""Close the connection pool"""
if self._pool:
await self._pool.close()
self._pool = None
self._is_initialized = False
logger.info("Database connection pool closed")
@asynccontextmanager
async def get_connection(self):
"""Get a database connection from the pool"""
if not self._is_initialized:
await self.initialize()
if not self._pool:
raise StorageError("Connection pool not initialized", "POOL_NOT_READY")
async with self._pool.acquire() as connection:
try:
yield connection
except Exception as e:
logger.error(f"Database operation failed: {e}")
raise
@asynccontextmanager
async def get_transaction(self):
"""Get a database transaction"""
async with self.get_connection() as conn:
async with conn.transaction():
yield conn
async def execute_query(self, query: str, *args) -> Any:
"""Execute a query and return results"""
async with self.get_connection() as conn:
return await conn.fetch(query, *args)
async def execute_command(self, command: str, *args) -> str:
"""Execute a command and return status"""
async with self.get_connection() as conn:
return await conn.execute(command, *args)
async def execute_many(self, command: str, args_list) -> None:
"""Execute a command multiple times with different arguments"""
async with self.get_connection() as conn:
await conn.executemany(command, args_list)
async def health_check(self) -> bool:
"""Check database health"""
async def _init_connection(self, conn: asyncpg.Connection) -> None:
"""Initialize individual database connections."""
try:
async with self.get_connection() as conn:
result = await conn.fetchval("SELECT 1")
if result == 1:
logger.debug("Database health check passed")
return True
else:
logger.warning("Database health check returned unexpected result")
return False
# Set connection-specific settings
await conn.execute("SET timezone = 'UTC'")
await conn.execute("SET statement_timeout = '30s'")
# Test TimescaleDB extension
result = await conn.fetchval("SELECT extname FROM pg_extension WHERE extname = 'timescaledb'")
if not result:
logger.warning("TimescaleDB extension not found in database")
except Exception as e:
logger.error(f"Database health check failed: {e}")
logger.error(f"Failed to initialize connection: {e}")
raise
async def _test_connection(self) -> bool:
"""Test database connection health."""
try:
async with self.pool.acquire() as conn:
await conn.execute('SELECT 1')
self._connection_failures = 0
return True
except Exception as e:
self._connection_failures += 1
logger.error(f"Connection test failed (attempt {self._connection_failures}): {e}")
if self._connection_failures >= self._max_failures:
logger.critical("Maximum connection failures reached, attempting pool recreation")
await self._recreate_pool()
return False
async def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics"""
if not self._pool:
return {}
async def _recreate_pool(self) -> None:
"""Recreate connection pool after failures."""
try:
if self.pool:
await self.pool.close()
self.pool = None
# Wait before recreating
await asyncio.sleep(5)
self.pool = await asyncpg.create_pool(
self._connection_string,
min_size=5,
max_size=self.config.database.pool_size,
command_timeout=60,
server_settings={
'jit': 'off',
'timezone': 'UTC'
},
init=self._init_connection
)
self._connection_failures = 0
logger.info("Connection pool recreated successfully")
except Exception as e:
logger.error(f"Failed to recreate connection pool: {e}")
# Will retry on next health check
async def _health_monitor(self) -> None:
"""Background task to monitor connection pool health."""
while True:
try:
await asyncio.sleep(self._health_check_interval)
if self.pool:
await self._test_connection()
self._last_health_check = datetime.utcnow()
# Log pool statistics periodically
if datetime.utcnow().minute % 5 == 0: # roughly every 5 minutes (the 30s check interval may match a minute twice)
stats = self.get_pool_stats()
logger.debug(f"Connection pool stats: {stats}")
except asyncio.CancelledError:
logger.info("Health monitor task cancelled")
break
except Exception as e:
logger.error(f"Health monitor error: {e}")
async def close(self) -> None:
"""Close connection pool and stop monitoring."""
if self._health_check_task:
self._health_check_task.cancel()
try:
await self._health_check_task
except asyncio.CancelledError:
pass
if self.pool:
await self.pool.close()
logger.info("Database connection pool closed")
def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics."""
if not self.pool:
return {"status": "not_initialized"}
return {
'size': self._pool.get_size(),
'min_size': self._pool.get_min_size(),
'max_size': self._pool.get_max_size(),
'idle_size': self._pool.get_idle_size(),
'is_closing': self._pool.is_closing()
"status": "active",
"size": self.pool.get_size(),
"max_size": self.pool.get_max_size(),
"min_size": self.pool.get_min_size(),
"connection_failures": self._connection_failures,
"last_health_check": self._last_health_check.isoformat(),
"health_check_interval": self._health_check_interval
}
@property
def is_initialized(self) -> bool:
"""Check if pool is initialized"""
return self._is_initialized
# Global connection pool instance
db_pool = DatabaseConnectionPool()
def is_healthy(self) -> bool:
"""Check if connection pool is healthy."""
if not self.pool:
return False
# Check if health check is recent
time_since_check = datetime.utcnow() - self._last_health_check
if time_since_check > timedelta(seconds=self._health_check_interval * 2):
return False
# Check failure count
return self._connection_failures < self._max_failures
def acquire(self):
"""Acquire a connection from the pool (returns an async context manager)."""
# Note: not async - pool.acquire() already returns an async context manager usable with `async with`
if not self.pool:
raise ConnectionError("Connection pool not initialized")
return self.pool.acquire()
async def execute(self, query: str, *args) -> str:
"""Execute a query using a pooled connection and return the status string."""
async with self.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args) -> list:
"""Fetch multiple rows using a pooled connection."""
async with self.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
"""Fetch a single row using a pooled connection."""
async with self.acquire() as conn:
return await conn.fetchrow(query, *args)
async def fetchval(self, query: str, *args):
"""Fetch a single value using a pooled connection."""
async with self.acquire() as conn:
return await conn.fetchval(query, *args)
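A minimal sketch of driving the pool manager on its own, using only methods defined above (import paths assumed as in the previous example):

import asyncio
from COBY.config import Config
from COBY.storage.connection_pool import ConnectionPoolManager

async def main():
    pool = ConnectionPoolManager(Config())
    await pool.initialize()                     # creates the asyncpg pool and starts the health monitor
    try:
        print(await pool.fetchval("SELECT 1"))  # convenience wrapper around pool.acquire()
        print(pool.get_pool_stats())
    finally:
        await pool.close()                      # cancels the health-check task and closes the pool

asyncio.run(main())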

View File

@@ -1,29 +1,103 @@
"""
Database schema management for TimescaleDB.
Database schema management and migration system.
Handles schema versioning, migrations, and database structure updates.
"""
from typing import List
from ..utils.logging import get_logger
import logging
from typing import Dict, List, Optional
from datetime import datetime
import asyncpg
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
class DatabaseSchema:
"""Manages database schema creation and migrations"""
class SchemaManager:
"""Manages database schema versions and migrations."""
@staticmethod
def get_schema_creation_queries() -> List[str]:
"""Get list of queries to create the database schema"""
return [
# Create TimescaleDB extension
"CREATE EXTENSION IF NOT EXISTS timescaledb;",
# Create schema
"CREATE SCHEMA IF NOT EXISTS market_data;",
def __init__(self, connection_pool):
self.pool = connection_pool
self.current_version = "1.0.0"
async def initialize_schema_tracking(self) -> None:
"""Initialize schema version tracking table."""
try:
async with self.pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(20) PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
description TEXT,
checksum VARCHAR(64)
);
""")
# Record initial schema version
await conn.execute("""
INSERT INTO schema_migrations (version, description)
VALUES ($1, $2)
ON CONFLICT (version) DO NOTHING
""", self.current_version, "Initial schema setup")
logger.info("Schema tracking initialized")
except Exception as e:
logger.error(f"Failed to initialize schema tracking: {e}")
raise
async def get_current_schema_version(self) -> Optional[str]:
"""Get the current schema version from database."""
try:
async with self.pool.acquire() as conn:
version = await conn.fetchval("""
SELECT version FROM schema_migrations
ORDER BY applied_at DESC LIMIT 1
""")
return version
except Exception as e:
logger.error(f"Failed to get schema version: {e}")
return None
async def apply_migration(self, version: str, description: str, sql_commands: List[str]) -> bool:
"""Apply a database migration."""
try:
async with self.pool.acquire() as conn:
async with conn.transaction():
# Check if migration already applied
existing = await conn.fetchval("""
SELECT version FROM schema_migrations WHERE version = $1
""", version)
if existing:
logger.info(f"Migration {version} already applied")
return True
# Apply migration commands
for sql_command in sql_commands:
await conn.execute(sql_command)
# Record migration
await conn.execute("""
INSERT INTO schema_migrations (version, description)
VALUES ($1, $2)
""", version, description)
logger.info(f"Applied migration {version}: {description}")
return True
except Exception as e:
logger.error(f"Failed to apply migration {version}: {e}")
return False
async def create_base_schema(self) -> bool:
"""Create the base database schema with all tables and indexes."""
migration_commands = [
# Enable TimescaleDB extension
"CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;",
# Order book snapshots table
"""
CREATE TABLE IF NOT EXISTS market_data.order_book_snapshots (
CREATE TABLE IF NOT EXISTS order_book_snapshots (
id BIGSERIAL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(20) NOT NULL,
@@ -35,14 +109,13 @@ class DatabaseSchema:
spread DECIMAL(20,8),
bid_volume DECIMAL(30,8),
ask_volume DECIMAL(30,8),
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, symbol, exchange)
);
""",
# Trade events table
"""
CREATE TABLE IF NOT EXISTS market_data.trade_events (
CREATE TABLE IF NOT EXISTS trade_events (
id BIGSERIAL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(20) NOT NULL,
@@ -51,14 +124,13 @@ class DatabaseSchema:
size DECIMAL(30,8) NOT NULL,
side VARCHAR(4) NOT NULL,
trade_id VARCHAR(100) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, symbol, exchange, trade_id)
);
""",
# Aggregated heatmap data table
# Heatmap data table
"""
CREATE TABLE IF NOT EXISTS market_data.heatmap_data (
CREATE TABLE IF NOT EXISTS heatmap_data (
symbol VARCHAR(20) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
bucket_size DECIMAL(10,2) NOT NULL,
@@ -66,15 +138,13 @@ class DatabaseSchema:
volume DECIMAL(30,8) NOT NULL,
side VARCHAR(3) NOT NULL,
exchange_count INTEGER NOT NULL,
exchanges JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side)
);
""",
# OHLCV data table
"""
CREATE TABLE IF NOT EXISTS market_data.ohlcv_data (
CREATE TABLE IF NOT EXISTS ohlcv_data (
symbol VARCHAR(20) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
timeframe VARCHAR(10) NOT NULL,
@@ -84,173 +154,185 @@ class DatabaseSchema:
close_price DECIMAL(20,8) NOT NULL,
volume DECIMAL(30,8) NOT NULL,
trade_count INTEGER,
vwap DECIMAL(20,8),
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, symbol, timeframe)
);
""",
# Exchange status tracking table
"""
CREATE TABLE IF NOT EXISTS market_data.exchange_status (
exchange VARCHAR(20) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
status VARCHAR(20) NOT NULL,
last_message_time TIMESTAMPTZ,
error_message TEXT,
connection_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, exchange)
);
""",
# System metrics table
"""
CREATE TABLE IF NOT EXISTS market_data.system_metrics (
metric_name VARCHAR(50) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
value DECIMAL(20,8) NOT NULL,
labels JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (timestamp, metric_name)
);
"""
]
return await self.apply_migration(
"1.0.0",
"Create base schema with core tables",
migration_commands
)
@staticmethod
def get_hypertable_creation_queries() -> List[str]:
"""Get queries to create hypertables"""
return [
"SELECT create_hypertable('market_data.order_book_snapshots', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('market_data.trade_events', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('market_data.heatmap_data', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('market_data.ohlcv_data', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('market_data.exchange_status', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('market_data.system_metrics', 'timestamp', if_not_exists => TRUE);"
async def create_hypertables(self) -> bool:
"""Convert tables to TimescaleDB hypertables."""
hypertable_commands = [
"SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('heatmap_data', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);"
]
return await self.apply_migration(
"1.0.1",
"Convert tables to hypertables",
hypertable_commands
)
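create_hypertable defaults to 7-day chunks; for high-frequency order book snapshots a shorter chunk interval is common. A hedged variant of the same migration command, should chunk sizing become an issue (not part of this commit):

"SELECT create_hypertable('order_book_snapshots', 'timestamp', chunk_time_interval => INTERVAL '1 hour', if_not_exists => TRUE);"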
@staticmethod
def get_index_creation_queries() -> List[str]:
"""Get queries to create indexes"""
return [
# Order book indexes
"CREATE INDEX IF NOT EXISTS idx_order_book_symbol_exchange ON market_data.order_book_snapshots (symbol, exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_order_book_timestamp ON market_data.order_book_snapshots (timestamp DESC);",
async def create_indexes(self) -> bool:
"""Create performance indexes."""
index_commands = [
# Order book snapshots indexes
"CREATE INDEX IF NOT EXISTS idx_obs_symbol_time ON order_book_snapshots (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_obs_exchange_time ON order_book_snapshots (exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange ON order_book_snapshots (symbol, exchange, timestamp DESC);",
# Trade events indexes
"CREATE INDEX IF NOT EXISTS idx_trade_events_symbol_exchange ON market_data.trade_events (symbol, exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trade_events_timestamp ON market_data.trade_events (timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trade_events_price ON market_data.trade_events (symbol, price, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_symbol_time ON trade_events (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_exchange_time ON trade_events (exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_price ON trade_events (symbol, price, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_side ON trade_events (symbol, side, timestamp DESC);",
# Heatmap data indexes
"CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_bucket ON market_data.heatmap_data (symbol, bucket_size, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_heatmap_timestamp ON market_data.heatmap_data (timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_time ON heatmap_data (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_heatmap_bucket ON heatmap_data (symbol, bucket_size, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_heatmap_side ON heatmap_data (symbol, side, timestamp DESC);",
# OHLCV data indexes
"CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON market_data.ohlcv_data (symbol, timeframe, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_ohlcv_timestamp ON market_data.ohlcv_data (timestamp DESC);",
# Exchange status indexes
"CREATE INDEX IF NOT EXISTS idx_exchange_status_exchange ON market_data.exchange_status (exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_exchange_status_timestamp ON market_data.exchange_status (timestamp DESC);",
# System metrics indexes
"CREATE INDEX IF NOT EXISTS idx_system_metrics_name ON market_data.system_metrics (metric_name, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_system_metrics_timestamp ON market_data.system_metrics (timestamp DESC);"
# OHLCV indexes
"CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON ohlcv_data (symbol, timeframe, timestamp DESC);"
]
return await self.apply_migration(
"1.0.2",
"Create performance indexes",
index_commands
)
@staticmethod
def get_retention_policy_queries() -> List[str]:
"""Get queries to create retention policies"""
return [
"SELECT add_retention_policy('market_data.order_book_snapshots', INTERVAL '90 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('market_data.trade_events', INTERVAL '90 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('market_data.heatmap_data', INTERVAL '90 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('market_data.ohlcv_data', INTERVAL '365 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('market_data.exchange_status', INTERVAL '30 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('market_data.system_metrics', INTERVAL '30 days', if_not_exists => TRUE);"
async def setup_retention_policies(self) -> bool:
"""Set up data retention policies."""
retention_commands = [
"SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('heatmap_data', INTERVAL '60 days', if_not_exists => TRUE);",
"SELECT add_retention_policy('ohlcv_data', INTERVAL '1 year', if_not_exists => TRUE);"
]
return await self.apply_migration(
"1.0.3",
"Setup data retention policies",
retention_commands
)
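Retention policies register as background jobs. One way to verify them after this migration, assuming TimescaleDB 2.x where policies are listed in timescaledb_information.jobs (pool here is the ConnectionPoolManager defined earlier):

async def list_retention_jobs(pool) -> list:
    # Each add_retention_policy() call appears as a 'policy_retention' job
    return await pool.fetch("""
        SELECT hypertable_name, config
        FROM timescaledb_information.jobs
        WHERE proc_name = 'policy_retention'
    """)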
@staticmethod
def get_continuous_aggregate_queries() -> List[str]:
"""Get queries to create continuous aggregates"""
return [
# Hourly OHLCV aggregate
async def create_continuous_aggregates(self) -> bool:
"""Create continuous aggregates for better query performance."""
aggregate_commands = [
# 1-minute OHLCV aggregates from trades
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS market_data.hourly_ohlcv
CREATE MATERIALIZED VIEW IF NOT EXISTS trades_1m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', timestamp) AS bucket,
symbol,
exchange,
time_bucket('1 hour', timestamp) AS hour,
first(price, timestamp) AS open_price,
max(price) AS high_price,
min(price) AS low_price,
last(price, timestamp) AS close_price,
sum(size) AS volume,
count(*) AS trade_count,
avg(price) AS vwap
FROM market_data.trade_events
GROUP BY symbol, exchange, hour
WITH NO DATA;
count(*) AS trade_count
FROM trade_events
GROUP BY bucket, symbol, exchange;
""",
# Add refresh policy for continuous aggregate
# 5-minute order book statistics
"""
SELECT add_continuous_aggregate_policy('market_data.hourly_ohlcv',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour',
if_not_exists => TRUE);
"""
]
@staticmethod
def get_view_creation_queries() -> List[str]:
"""Get queries to create views"""
return [
# Latest order books view
"""
CREATE OR REPLACE VIEW market_data.latest_order_books AS
SELECT DISTINCT ON (symbol, exchange)
CREATE MATERIALIZED VIEW IF NOT EXISTS orderbook_stats_5m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', timestamp) AS bucket,
symbol,
exchange,
timestamp,
bids,
asks,
mid_price,
spread,
bid_volume,
ask_volume
FROM market_data.order_book_snapshots
ORDER BY symbol, exchange, timestamp DESC;
""",
# Latest heatmaps view
"""
CREATE OR REPLACE VIEW market_data.latest_heatmaps AS
SELECT DISTINCT ON (symbol, bucket_size, price_bucket, side)
symbol,
bucket_size,
price_bucket,
side,
timestamp,
volume,
exchange_count,
exchanges
FROM market_data.heatmap_data
ORDER BY symbol, bucket_size, price_bucket, side, timestamp DESC;
avg(mid_price) AS avg_mid_price,
avg(spread) AS avg_spread,
avg(bid_volume) AS avg_bid_volume,
avg(ask_volume) AS avg_ask_volume,
count(*) AS snapshot_count
FROM order_book_snapshots
WHERE mid_price IS NOT NULL
GROUP BY bucket, symbol, exchange;
"""
]
return await self.apply_migration(
"1.0.4",
"Create continuous aggregates",
aggregate_commands
)
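Two caveats worth flagging here, both hedged. First, TimescaleDB refuses to create a continuous aggregate inside a transaction block, so routing these commands through apply_migration's conn.transaction() wrapper is likely to fail; this step may need a non-transactional execution path. Second, the removed hourly_ohlcv view carried an add_continuous_aggregate_policy call, while the new trades_1m and orderbook_stats_5m views have none, so they will not refresh on their own. A sketch of the missing policies (intervals are illustrative, not from this commit):

refresh_policy_commands = [
    """
    SELECT add_continuous_aggregate_policy('trades_1m',
        start_offset => INTERVAL '10 minutes',
        end_offset => INTERVAL '1 minute',
        schedule_interval => INTERVAL '1 minute',
        if_not_exists => TRUE);
    """,
    """
    SELECT add_continuous_aggregate_policy('orderbook_stats_5m',
        start_offset => INTERVAL '30 minutes',
        end_offset => INTERVAL '5 minutes',
        schedule_interval => INTERVAL '5 minutes',
        if_not_exists => TRUE);
    """
]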
@staticmethod
def get_all_creation_queries() -> List[str]:
"""Get all schema creation queries in order"""
queries = []
queries.extend(DatabaseSchema.get_schema_creation_queries())
queries.extend(DatabaseSchema.get_hypertable_creation_queries())
queries.extend(DatabaseSchema.get_index_creation_queries())
queries.extend(DatabaseSchema.get_retention_policy_queries())
queries.extend(DatabaseSchema.get_continuous_aggregate_queries())
queries.extend(DatabaseSchema.get_view_creation_queries())
return queries
async def setup_complete_schema(self) -> bool:
"""Set up the complete database schema with all components."""
try:
# Initialize schema tracking
await self.initialize_schema_tracking()
# Apply all migrations in order
migrations = [
self.create_base_schema,
self.create_hypertables,
self.create_indexes,
self.setup_retention_policies,
self.create_continuous_aggregates
]
for migration in migrations:
success = await migration()
if not success:
logger.error(f"Failed to apply migration: {migration.__name__}")
return False
logger.info("Complete database schema setup successful")
return True
except Exception as e:
logger.error(f"Failed to setup complete schema: {e}")
return False
async def get_schema_info(self) -> Dict:
"""Get information about the current schema state."""
try:
async with self.pool.acquire() as conn:
# Get applied migrations
migrations = await conn.fetch("""
SELECT version, applied_at, description
FROM schema_migrations
ORDER BY applied_at
""")
# Get table information
tables = await conn.fetch("""
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN ('order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data')
""")
# Get hypertable information
hypertables = await conn.fetch("""
SELECT hypertable_name, num_chunks, compression_enabled
FROM timescaledb_information.hypertables
WHERE hypertable_schema = 'public'
""")
return {
"migrations": [dict(m) for m in migrations],
"tables": [dict(t) for t in tables],
"hypertables": [dict(h) for h in hypertables]
}
except Exception as e:
logger.error(f"Failed to get schema info: {e}")
return {}
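A minimal sketch of running the migration chain directly, given an already-initialized ConnectionPoolManager:

async def migrate(pool_manager) -> None:
    schema = SchemaManager(pool_manager)
    if await schema.setup_complete_schema():    # tracking table, base tables, hypertables, indexes, retention, aggregates
        info = await schema.get_schema_info()
        print([m["version"] for m in info.get("migrations", [])])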

View File

@@ -0,0 +1,270 @@
"""
Comprehensive storage manager that integrates TimescaleDB, connection pooling, and schema management.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from .timescale_manager import TimescaleManager
from .connection_pool import ConnectionPoolManager
from .schema import SchemaManager
from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData
from ..config import Config
from ..utils.exceptions import DatabaseError, ConnectionError
logger = logging.getLogger(__name__)
class StorageManager:
"""Unified storage manager for all database operations."""
def __init__(self, config: Config):
self.config = config
self.connection_pool = ConnectionPoolManager(config)
self.schema_manager = SchemaManager(self.connection_pool)
self.timescale_manager = TimescaleManager(config)
self._initialized = False
async def initialize(self) -> None:
"""Initialize all storage components."""
try:
logger.info("Initializing storage manager...")
# Initialize connection pool
await self.connection_pool.initialize()
# Set up database schema
await self.schema_manager.setup_complete_schema()
# Initialize TimescaleDB manager with existing pool
self.timescale_manager.pool = self.connection_pool.pool
self._initialized = True
logger.info("Storage manager initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize storage manager: {e}")
raise ConnectionError(f"Storage initialization failed: {e}")
async def close(self) -> None:
"""Close all storage connections."""
if self.timescale_manager:
await self.timescale_manager.close()
if self.connection_pool:
await self.connection_pool.close()
logger.info("Storage manager closed")
def is_healthy(self) -> bool:
"""Check if storage system is healthy."""
return self._initialized and self.connection_pool.is_healthy()
# Order book operations
async def store_orderbook(self, data: OrderBookSnapshot) -> bool:
"""Store order book snapshot."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.store_orderbook(data)
async def batch_store_orderbooks(self, data_list: List[OrderBookSnapshot]) -> bool:
"""Store multiple order book snapshots."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.batch_store_orderbooks(data_list)
async def get_latest_orderbook(self, symbol: str, exchange: Optional[str] = None) -> Optional[Dict]:
"""Get latest order book snapshot."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.get_latest_orderbook(symbol, exchange)
# Trade operations
async def store_trade(self, data: TradeEvent) -> bool:
"""Store trade event."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.store_trade(data)
async def batch_store_trades(self, data_list: List[TradeEvent]) -> bool:
"""Store multiple trade events."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.batch_store_trades(data_list)
# Heatmap operations
async def store_heatmap_data(self, data: HeatmapData) -> bool:
"""Store heatmap data."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.store_heatmap_data(data)
async def get_heatmap_data(self, symbol: str, bucket_size: float,
start: Optional[datetime] = None) -> List[Dict]:
"""Get heatmap data for visualization."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.get_heatmap_data(symbol, bucket_size, start)
# Historical data operations
async def get_historical_data(self, symbol: str, start: datetime, end: datetime,
data_type: str = 'orderbook') -> List[Dict]:
"""Get historical data for a symbol within time range."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
return await self.timescale_manager.get_historical_data(symbol, start, end, data_type)
# System operations
async def get_system_stats(self) -> Dict[str, Any]:
"""Get comprehensive system statistics."""
if not self._initialized:
return {"status": "not_initialized"}
try:
# Get database stats
db_stats = await self.timescale_manager.get_database_stats()
# Get connection pool stats
pool_stats = self.connection_pool.get_pool_stats()
# Get schema info
schema_info = await self.schema_manager.get_schema_info()
return {
"status": "healthy" if self.is_healthy() else "unhealthy",
"database": db_stats,
"connection_pool": pool_stats,
"schema": schema_info,
"initialized": self._initialized
}
except Exception as e:
logger.error(f"Failed to get system stats: {e}")
return {"status": "error", "error": str(e)}
async def health_check(self) -> Dict[str, Any]:
"""Perform comprehensive health check."""
health_status = {
"healthy": True,
"components": {},
"timestamp": datetime.utcnow().isoformat()
}
try:
# Check connection pool
pool_healthy = self.connection_pool.is_healthy()
health_status["components"]["connection_pool"] = {
"healthy": pool_healthy,
"stats": self.connection_pool.get_pool_stats()
}
# Test database connection
try:
async with self.connection_pool.acquire() as conn:
await conn.execute('SELECT 1')
health_status["components"]["database"] = {"healthy": True}
except Exception as e:
health_status["components"]["database"] = {
"healthy": False,
"error": str(e)
}
health_status["healthy"] = False
# Check schema version
try:
current_version = await self.schema_manager.get_current_schema_version()
health_status["components"]["schema"] = {
"healthy": True,
"version": current_version
}
except Exception as e:
health_status["components"]["schema"] = {
"healthy": False,
"error": str(e)
}
health_status["healthy"] = False
# Overall health
health_status["healthy"] = all(
comp.get("healthy", False)
for comp in health_status["components"].values()
)
except Exception as e:
logger.error(f"Health check failed: {e}")
health_status["healthy"] = False
health_status["error"] = str(e)
return health_status
# Maintenance operations
async def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
"""Clean up old data beyond retention period."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
try:
cutoff_date = datetime.utcnow() - timedelta(days=days)
cleanup_stats = {}
async with self.connection_pool.acquire() as conn:
# Clean up old order book snapshots
result = await conn.execute("""
DELETE FROM order_book_snapshots
WHERE timestamp < $1
""", cutoff_date)
cleanup_stats["order_book_snapshots"] = int(result.split()[-1])
# Clean up old trade events
result = await conn.execute("""
DELETE FROM trade_events
WHERE timestamp < $1
""", cutoff_date)
cleanup_stats["trade_events"] = int(result.split()[-1])
# Clean up old heatmap data
result = await conn.execute("""
DELETE FROM heatmap_data
WHERE timestamp < $1
""", cutoff_date)
cleanup_stats["heatmap_data"] = int(result.split()[-1])
logger.info(f"Cleaned up old data: {cleanup_stats}")
return cleanup_stats
except Exception as e:
logger.error(f"Failed to cleanup old data: {e}")
raise DatabaseError(f"Data cleanup failed: {e}")
async def optimize_database(self) -> bool:
"""Run database optimization tasks."""
if not self._initialized:
raise DatabaseError("Storage manager not initialized")
try:
async with self.connection_pool.acquire() as conn:
# Analyze tables for better query planning
tables = ['order_book_snapshots', 'trade_events', 'heatmap_data', 'ohlcv_data']
for table in tables:
await conn.execute(f"ANALYZE {table}")
# Vacuum tables to reclaim space
for table in tables:
await conn.execute(f"VACUUM {table}")
logger.info("Database optimization completed")
return True
except Exception as e:
logger.error(f"Database optimization failed: {e}")
return False
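An end-to-end sketch of the storage manager lifecycle, tying the pieces above together (Config construction as in the test module below):

import asyncio

async def main():
    storage = StorageManager(Config())
    await storage.initialize()                  # pool, schema migrations, TimescaleDB manager
    try:
        health = await storage.health_check()
        stats = await storage.get_system_stats()
        print(health["healthy"], stats["status"])
    finally:
        await storage.close()

asyncio.run(main())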

File diff suppressed because it is too large

View File

@@ -1,3 +1,3 @@
"""
Test suite for the COBY system.
Test modules for the COBY multi-exchange data aggregation system.
"""

View File

@@ -0,0 +1,101 @@
"""
Test script for TimescaleDB integration.
Tests database connection, schema creation, and basic operations.
"""
import asyncio
import logging
from datetime import datetime
from decimal import Decimal
from ..config import Config
from ..storage.storage_manager import StorageManager
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def test_timescale_integration():
"""Test TimescaleDB integration with basic operations."""
# Initialize configuration
config = Config()
# Create storage manager
storage = StorageManager(config)
try:
# Initialize storage
logger.info("Initializing storage manager...")
await storage.initialize()
# Test health check
logger.info("Running health check...")
health = await storage.health_check()
logger.info(f"Health status: {health}")
# Test schema info
logger.info("Getting schema information...")
schema_info = await storage.get_system_stats()
logger.info(f"Schema info: {schema_info}")
# Test order book storage
logger.info("Testing order book storage...")
test_orderbook = OrderBookSnapshot(
symbol="BTCUSDT",
exchange="binance",
timestamp=datetime.utcnow(),
bids=[
PriceLevel(price=50000.0, size=1.5),
PriceLevel(price=49999.0, size=2.0),
],
asks=[
PriceLevel(price=50001.0, size=1.2),
PriceLevel(price=50002.0, size=0.8),
],
sequence_id=12345
)
success = await storage.store_orderbook(test_orderbook)
logger.info(f"Order book storage result: {success}")
# Test trade storage
logger.info("Testing trade storage...")
test_trade = TradeEvent(
symbol="BTCUSDT",
exchange="binance",
timestamp=datetime.utcnow(),
price=50000.5,
size=0.1,
side="buy",
trade_id="test_trade_123"
)
success = await storage.store_trade(test_trade)
logger.info(f"Trade storage result: {success}")
# Test data retrieval
logger.info("Testing data retrieval...")
latest_orderbook = await storage.get_latest_orderbook("BTCUSDT", "binance")
logger.info(f"Latest order book: {latest_orderbook is not None}")
# Test system stats
logger.info("Getting system statistics...")
stats = await storage.get_system_stats()
logger.info(f"System stats: {stats}")
logger.info("All tests completed successfully!")
except Exception as e:
logger.error(f"Test failed: {e}")
raise
finally:
# Clean up
await storage.close()
if __name__ == "__main__":
asyncio.run(test_timescale_integration())