Unified data storage

This commit is contained in:
Dobromir Popov
2025-10-20 09:48:59 +03:00
parent 002d0f7858
commit f464a412dc
12 changed files with 2905 additions and 181 deletions

@@ -0,0 +1,553 @@
"""
Unified Data Storage Schema Manager
Extends the COBY schema with additional tables for the unified data storage system.
"""
import logging
from typing import Dict, List, Optional
from datetime import datetime
import asyncpg
logger = logging.getLogger(__name__)
class UnifiedStorageSchemaManager:
"""
    Manages the unified data storage schema with TimescaleDB.

    Extends the existing COBY schema with additional tables for:
    - Enhanced OHLCV with pre-calculated technical indicators
    - Order book snapshots and 1s aggregations with price buckets
    - Multi-timeframe order book imbalances
    - Trade events
"""
def __init__(self, connection_pool):
self.pool = connection_pool
self.current_version = "2.0.0"
async def initialize_schema_tracking(self) -> None:
"""Initialize schema version tracking table."""
try:
async with self.pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(20) PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
description TEXT,
checksum VARCHAR(64)
);
""")
logger.info("Schema tracking initialized")
except Exception as e:
logger.error(f"Failed to initialize schema tracking: {e}")
raise
async def apply_migration(self, version: str, description: str, sql_commands: List[str]) -> bool:
"""Apply a database migration."""
try:
async with self.pool.acquire() as conn:
async with conn.transaction():
# Check if migration already applied
existing = await conn.fetchval("""
SELECT version FROM schema_migrations WHERE version = $1
""", version)
if existing:
logger.info(f"Migration {version} already applied")
return True
# Apply migration commands
for sql_command in sql_commands:
try:
await conn.execute(sql_command)
except Exception as cmd_error:
logger.error(f"Error executing command: {sql_command[:100]}... Error: {cmd_error}")
raise
# Record migration
await conn.execute("""
INSERT INTO schema_migrations (version, description)
VALUES ($1, $2)
""", version, description)
logger.info(f"Applied migration {version}: {description}")
return True
except Exception as e:
logger.error(f"Failed to apply migration {version}: {e}")
return False
async def create_enhanced_ohlcv_table(self) -> bool:
"""Create enhanced OHLCV table with technical indicators."""
migration_commands = [
"""
CREATE TABLE IF NOT EXISTS ohlcv_data (
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
timeframe VARCHAR(10) NOT NULL,
open_price DECIMAL(20,8) NOT NULL,
high_price DECIMAL(20,8) NOT NULL,
low_price DECIMAL(20,8) NOT NULL,
close_price DECIMAL(20,8) NOT NULL,
volume DECIMAL(30,8) NOT NULL,
trade_count INTEGER DEFAULT 0,
-- Technical Indicators (pre-calculated)
rsi_14 DECIMAL(10,4),
macd DECIMAL(20,8),
macd_signal DECIMAL(20,8),
macd_histogram DECIMAL(20,8),
bb_upper DECIMAL(20,8),
bb_middle DECIMAL(20,8),
bb_lower DECIMAL(20,8),
ema_12 DECIMAL(20,8),
ema_26 DECIMAL(20,8),
sma_20 DECIMAL(20,8),
PRIMARY KEY (timestamp, symbol, timeframe)
);
"""
]
return await self.apply_migration(
"2.0.0",
"Create enhanced OHLCV table with technical indicators",
migration_commands
)
async def create_order_book_tables(self) -> bool:
"""Create order book related tables."""
migration_commands = [
# Order book snapshots
"""
CREATE TABLE IF NOT EXISTS order_book_snapshots (
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(20) NOT NULL DEFAULT 'binance',
bids JSONB NOT NULL,
asks JSONB NOT NULL,
mid_price DECIMAL(20,8),
spread DECIMAL(20,8),
bid_volume DECIMAL(30,8),
ask_volume DECIMAL(30,8),
sequence_id BIGINT,
PRIMARY KEY (timestamp, symbol, exchange)
);
""",
# Order book 1s aggregations with price buckets
"""
CREATE TABLE IF NOT EXISTS order_book_1s_agg (
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
price_bucket DECIMAL(20,2) NOT NULL,
bid_volume DECIMAL(30,8) DEFAULT 0,
ask_volume DECIMAL(30,8) DEFAULT 0,
bid_count INTEGER DEFAULT 0,
ask_count INTEGER DEFAULT 0,
imbalance DECIMAL(10,6) DEFAULT 0,
PRIMARY KEY (timestamp, symbol, price_bucket)
);
""",
# Multi-timeframe order book imbalances
"""
CREATE TABLE IF NOT EXISTS order_book_imbalances (
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
imbalance_1s DECIMAL(10,6) DEFAULT 0,
imbalance_5s DECIMAL(10,6) DEFAULT 0,
imbalance_15s DECIMAL(10,6) DEFAULT 0,
imbalance_60s DECIMAL(10,6) DEFAULT 0,
volume_imbalance_1s DECIMAL(10,6) DEFAULT 0,
volume_imbalance_5s DECIMAL(10,6) DEFAULT 0,
volume_imbalance_15s DECIMAL(10,6) DEFAULT 0,
volume_imbalance_60s DECIMAL(10,6) DEFAULT 0,
price_range DECIMAL(10,2),
PRIMARY KEY (timestamp, symbol)
);
"""
]
return await self.apply_migration(
"2.0.1",
"Create order book tables (snapshots, 1s aggregations, imbalances)",
migration_commands
)
async def create_trade_events_table(self) -> bool:
"""Create trade events table."""
migration_commands = [
"""
CREATE TABLE IF NOT EXISTS trade_events (
timestamp TIMESTAMPTZ NOT NULL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(20) NOT NULL DEFAULT 'binance',
price DECIMAL(20,8) NOT NULL,
size DECIMAL(30,8) NOT NULL,
side VARCHAR(4) NOT NULL,
trade_id VARCHAR(100) NOT NULL,
is_buyer_maker BOOLEAN,
PRIMARY KEY (timestamp, symbol, exchange, trade_id)
);
"""
]
return await self.apply_migration(
"2.0.2",
"Create trade events table",
migration_commands
)
async def create_hypertables(self) -> bool:
"""Convert tables to TimescaleDB hypertables."""
hypertable_commands = [
"SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('order_book_1s_agg', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('order_book_imbalances', 'timestamp', if_not_exists => TRUE);",
"SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);"
]
return await self.apply_migration(
"2.0.3",
"Convert tables to TimescaleDB hypertables",
hypertable_commands
)
async def create_indexes(self) -> bool:
"""Create performance indexes."""
index_commands = [
# OHLCV indexes
"CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_tf_ts ON ohlcv_data (symbol, timeframe, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_ts ON ohlcv_data (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_ohlcv_tf_ts ON ohlcv_data (timeframe, timestamp DESC);",
# Order book snapshots indexes
"CREATE INDEX IF NOT EXISTS idx_obs_symbol_ts ON order_book_snapshots (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_obs_exchange_ts ON order_book_snapshots (exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange_ts ON order_book_snapshots (symbol, exchange, timestamp DESC);",
# Order book 1s aggregation indexes
"CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_ts ON order_book_1s_agg (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_bucket_ts ON order_book_1s_agg (symbol, price_bucket, timestamp DESC);",
# Order book imbalances indexes
"CREATE INDEX IF NOT EXISTS idx_obi_symbol_ts ON order_book_imbalances (symbol, timestamp DESC);",
# Trade events indexes
"CREATE INDEX IF NOT EXISTS idx_trades_symbol_ts ON trade_events (symbol, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_exchange_ts ON trade_events (exchange, timestamp DESC);",
"CREATE INDEX IF NOT EXISTS idx_trades_symbol_side_ts ON trade_events (symbol, side, timestamp DESC);"
]
return await self.apply_migration(
"2.0.4",
"Create performance indexes",
index_commands
)
async def create_continuous_aggregates(self) -> bool:
"""Create continuous aggregates for multi-timeframe data."""
aggregate_commands = [
# 1m OHLCV from 1s data
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1m_continuous
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', timestamp) AS timestamp,
symbol,
'1m' AS timeframe,
first(open_price, timestamp) AS open_price,
max(high_price) AS high_price,
min(low_price) AS low_price,
last(close_price, timestamp) AS close_price,
sum(volume) AS volume,
sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1s'
GROUP BY time_bucket('1 minute', timestamp), symbol
WITH NO DATA;
""",
# 5m OHLCV from 1m data
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_5m_continuous
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', timestamp) AS timestamp,
symbol,
'5m' AS timeframe,
first(open_price, timestamp) AS open_price,
max(high_price) AS high_price,
min(low_price) AS low_price,
last(close_price, timestamp) AS close_price,
sum(volume) AS volume,
sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1m'
GROUP BY time_bucket('5 minutes', timestamp), symbol
WITH NO DATA;
""",
# 15m OHLCV from 5m data
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_15m_continuous
WITH (timescaledb.continuous) AS
SELECT
time_bucket('15 minutes', timestamp) AS timestamp,
symbol,
'15m' AS timeframe,
first(open_price, timestamp) AS open_price,
max(high_price) AS high_price,
min(low_price) AS low_price,
last(close_price, timestamp) AS close_price,
sum(volume) AS volume,
sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '5m'
GROUP BY time_bucket('15 minutes', timestamp), symbol
WITH NO DATA;
""",
# 1h OHLCV from 15m data
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1h_continuous
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', timestamp) AS timestamp,
symbol,
'1h' AS timeframe,
first(open_price, timestamp) AS open_price,
max(high_price) AS high_price,
min(low_price) AS low_price,
last(close_price, timestamp) AS close_price,
sum(volume) AS volume,
sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '15m'
GROUP BY time_bucket('1 hour', timestamp), symbol
WITH NO DATA;
""",
# 1d OHLCV from 1h data
"""
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1d_continuous
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', timestamp) AS timestamp,
symbol,
'1d' AS timeframe,
first(open_price, timestamp) AS open_price,
max(high_price) AS high_price,
min(low_price) AS low_price,
last(close_price, timestamp) AS close_price,
sum(volume) AS volume,
sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1h'
GROUP BY time_bucket('1 day', timestamp), symbol
WITH NO DATA;
"""
]
return await self.apply_migration(
"2.0.5",
"Create continuous aggregates for multi-timeframe OHLCV",
aggregate_commands
)
async def setup_compression_policies(self) -> bool:
"""Set up compression policies for efficient storage."""
        compression_commands = [
            # Compression must be enabled on each hypertable before a policy
            # can be added; segment by symbol so per-symbol queries stay fast.
            "ALTER TABLE ohlcv_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_snapshots SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_1s_agg SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE order_book_imbalances SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            "ALTER TABLE trade_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');",
            # Compress OHLCV data older than 7 days
            "SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days', if_not_exists => TRUE);",
            # Compress order book snapshots older than 1 day
            "SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day', if_not_exists => TRUE);",
            # Compress order book 1s aggregations older than 2 days
            "SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days', if_not_exists => TRUE);",
            # Compress order book imbalances older than 2 days
            "SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days', if_not_exists => TRUE);",
            # Compress trade events older than 7 days
            "SELECT add_compression_policy('trade_events', INTERVAL '7 days', if_not_exists => TRUE);"
        ]
return await self.apply_migration(
"2.0.6",
"Setup compression policies",
compression_commands
)
async def setup_retention_policies(self) -> bool:
"""Set up data retention policies."""
retention_commands = [
# Retain OHLCV data for 2 years
"SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years', if_not_exists => TRUE);",
# Retain order book snapshots for 30 days
"SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);",
# Retain order book 1s aggregations for 60 days
"SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days', if_not_exists => TRUE);",
# Retain order book imbalances for 60 days
"SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days', if_not_exists => TRUE);",
# Retain trade events for 90 days
"SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);"
]
return await self.apply_migration(
"2.0.7",
"Setup retention policies",
retention_commands
)
async def setup_complete_schema(self) -> bool:
"""Set up the complete unified storage schema."""
try:
logger.info("Setting up unified storage schema...")
# Initialize schema tracking
await self.initialize_schema_tracking()
# Apply all migrations in order
migrations = [
("Enhanced OHLCV table", self.create_enhanced_ohlcv_table),
("Order book tables", self.create_order_book_tables),
("Trade events table", self.create_trade_events_table),
("Hypertables", self.create_hypertables),
("Indexes", self.create_indexes),
("Continuous aggregates", self.create_continuous_aggregates),
("Compression policies", self.setup_compression_policies),
("Retention policies", self.setup_retention_policies),
]
for name, migration_func in migrations:
logger.info(f"Applying migration: {name}")
success = await migration_func()
if not success:
logger.error(f"Failed to apply migration: {name}")
return False
logger.info(f"Successfully applied migration: {name}")
logger.info("Complete unified storage schema setup successful")
return True
except Exception as e:
logger.error(f"Failed to setup complete schema: {e}")
return False
async def get_schema_info(self) -> Dict:
"""Get information about the current schema state."""
try:
async with self.pool.acquire() as conn:
# Get applied migrations
migrations = await conn.fetch("""
SELECT version, applied_at, description
FROM schema_migrations
ORDER BY applied_at
""")
# Get table information
tables = await conn.fetch("""
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN (
'ohlcv_data',
'order_book_snapshots',
'order_book_1s_agg',
'order_book_imbalances',
'trade_events'
)
ORDER BY tablename
""")
                # Get hypertable information (TimescaleDB 2.x: sizes come from
                # hypertable_size(), not from the information view itself;
                # compressed sizes are available via hypertable_compression_stats())
                hypertables = await conn.fetch("""
                    SELECT
                        hypertable_name,
                        num_chunks,
                        compression_enabled,
                        pg_size_pretty(hypertable_size(format('%I.%I', hypertable_schema, hypertable_name)::regclass)) as total_size
                    FROM timescaledb_information.hypertables
                    WHERE hypertable_schema = 'public'
                    ORDER BY hypertable_name
                """)
                # Get continuous aggregates (sized via their materialization hypertable)
                continuous_aggs = await conn.fetch("""
                    SELECT
                        view_name,
                        materialization_hypertable_name,
                        pg_size_pretty(hypertable_size(format('%I.%I', materialization_hypertable_schema, materialization_hypertable_name)::regclass)) as size
                    FROM timescaledb_information.continuous_aggregates
                    WHERE view_schema = 'public'
                    ORDER BY view_name
                """)
return {
"migrations": [dict(m) for m in migrations],
"tables": [dict(t) for t in tables],
"hypertables": [dict(h) for h in hypertables],
"continuous_aggregates": [dict(c) for c in continuous_aggs]
}
except Exception as e:
logger.error(f"Failed to get schema info: {e}")
return {}
async def verify_schema(self) -> bool:
"""Verify that all required tables and indexes exist."""
try:
async with self.pool.acquire() as conn:
# Check required tables
required_tables = [
'ohlcv_data',
'order_book_snapshots',
'order_book_1s_agg',
'order_book_imbalances',
'trade_events'
]
for table in required_tables:
exists = await conn.fetchval("""
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public'
AND tablename = $1
)
""", table)
if not exists:
logger.error(f"Required table missing: {table}")
return False
# Check hypertables
for table in required_tables:
is_hypertable = await conn.fetchval("""
SELECT EXISTS (
SELECT FROM timescaledb_information.hypertables
WHERE hypertable_schema = 'public'
AND hypertable_name = $1
)
""", table)
if not is_hypertable:
logger.error(f"Table is not a hypertable: {table}")
return False
logger.info("Schema verification successful")
return True
except Exception as e:
logger.error(f"Schema verification failed: {e}")
return False
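

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): wires the
# schema manager to an asyncpg pool and runs the full migration chain.
# The DSN is a placeholder; adjust it to the target TimescaleDB instance.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        pool = await asyncpg.create_pool("postgresql://user:password@localhost:5432/market_data")
        try:
            manager = UnifiedStorageSchemaManager(pool)
            if await manager.setup_complete_schema() and await manager.verify_schema():
                info = await manager.get_schema_info()
                logger.info("Schema ready: %d hypertables", len(info.get("hypertables", [])))
        finally:
            await pool.close()

    asyncio.run(_demo())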