Files
gogo2/scripts/setup_unified_storage.py
Dobromir Popov f464a412dc uni data storage
2025-10-20 09:48:59 +03:00

247 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""
Setup script for unified data storage system.
Initializes TimescaleDB schema and verifies setup.
"""
import asyncio
import asyncpg
import logging
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.unified_storage_schema import UnifiedStorageSchemaManager
from core.config import get_config
# Configure root logging for this command-line script: every record carries
# a timestamp, the logger name and the level; INFO and above are emitted.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by all setup steps below.
logger = logging.getLogger(__name__)
async def create_connection_pool(config):
    """Create the asyncpg connection pool used by every setup step.

    Connection parameters come from the ``database`` section of *config*.
    Each missing key falls back to a local-development default:
    ``localhost:5432``, database ``trading_data``, user/password
    ``postgres``/``postgres``.

    Args:
        config: Configuration object supporting ``.get`` (presumably the
            dict returned by ``get_config()`` — confirm).

    Returns:
        An asyncpg pool with 2-10 connections and ``command_timeout=60``.

    Raises:
        Exception: Any error while reading the config or creating the pool
            is logged and re-raised unchanged.
    """
    try:
        # `or {}` also covers a section that is present but empty (e.g. a
        # bare `database:` key in YAML yields None). `.get(key, {})` alone
        # would return None there and crash on the next `.get`.
        db_config = config.get('database') or {}

        host = db_config.get('host', 'localhost')
        port = db_config.get('port', 5432)
        database = db_config.get('name', 'trading_data')
        user = db_config.get('user', 'postgres')
        password = db_config.get('password', 'postgres')

        # The password is deliberately not logged.
        logger.info(f"Connecting to database: {host}:{port}/{database}")

        pool = await asyncpg.create_pool(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            min_size=2,
            max_size=10,
            command_timeout=60,
        )
        logger.info("Database connection pool created")
        return pool
    except Exception as e:
        logger.error(f"Failed to create connection pool: {e}")
        raise
async def verify_timescaledb(pool):
    """Ensure the TimescaleDB extension is installed in the target database.

    The function first looks the extension up in ``pg_extension``. If it is
    missing, it runs ``CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE``
    and then reads the version again to confirm the result.

    Args:
        pool: asyncpg pool to borrow a connection from.

    Returns:
        True if the extension is present or was created.
        False on any error; the error is logged together with an install hint.
    """
    # One lookup answers both questions: the version string when installed,
    # None when the row is absent.
    version_query = "SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'"
    try:
        async with pool.acquire() as conn:
            version = await conn.fetchval(version_query)
            if version is not None:
                logger.info(f"TimescaleDB extension found (version {version})")
                return True

            logger.warning("TimescaleDB extension not found, attempting to create...")
            await conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
            # Re-read so the log reflects what is actually installed now.
            version = await conn.fetchval(version_query)
            logger.info(f"TimescaleDB extension created successfully (version {version})")
            return True
    except Exception as e:
        logger.error(f"Failed to verify TimescaleDB: {e}")
        logger.error("Please ensure TimescaleDB is installed: https://docs.timescale.com/install/latest/")
        return False
def _log_schema_info(info):
    """Log a human-readable summary of a schema-info dict.

    NOTE(review): *info* is assumed to be a dict with optional lists under
    ``migrations``, ``tables``, ``hypertables`` and ``continuous_aggregates``.
    The row keys read below (``tablename``, ``size``, ``hypertable_name``,
    ``num_chunks``, ``compression_enabled``, ``view_name``) are assumptions.
    Confirm them against ``UnifiedStorageSchemaManager.get_schema_info``.
    """
    # `or []` tolerates keys that are present but set to None.
    migrations = info.get('migrations') or []
    tables = info.get('tables') or []
    hypertables = info.get('hypertables') or []
    aggregates = info.get('continuous_aggregates') or []

    logger.info("\n=== Schema Information ===")
    logger.info(f"Migrations applied: {len(migrations)}")
    logger.info(f"Tables created: {len(tables)}")
    logger.info(f"Hypertables: {len(hypertables)}")
    logger.info(f"Continuous aggregates: {len(aggregates)}")

    logger.info("\n=== Table Sizes ===")
    for table in tables:
        logger.info(f" {table['tablename']}: {table['size']}")

    logger.info("\n=== Hypertables ===")
    for ht in hypertables:
        logger.info(f" {ht['hypertable_name']}: {ht['num_chunks']} chunks, "
                    f"compression={'enabled' if ht['compression_enabled'] else 'disabled'}")

    if aggregates:
        logger.info("\n=== Continuous Aggregates ===")
        for ca in aggregates:
            logger.info(f" {ca['view_name']}: {ca.get('size', 'N/A')}")


async def setup_schema(pool):
    """Create the unified storage schema, verify it and log a summary.

    Args:
        pool: asyncpg pool handed to ``UnifiedStorageSchemaManager``.

    Returns:
        True when both ``setup_complete_schema()`` and ``verify_schema()``
        report success. False otherwise, including on any unexpected
        exception, which is logged with its traceback.
    """
    try:
        schema_manager = UnifiedStorageSchemaManager(pool)

        logger.info("Starting schema setup...")
        if not await schema_manager.setup_complete_schema():
            logger.error("Schema setup failed")
            return False
        logger.info("Schema setup completed successfully")

        logger.info("Verifying schema...")
        if not await schema_manager.verify_schema():
            logger.error("Schema verification failed")
            return False
        logger.info("Schema verification passed")
    except Exception as e:
        logger.exception(f"Error during schema setup: {e}")
        return False

    # At this point the schema exists and has been verified. The summary is
    # informational only, so a problem fetching or rendering it is reported
    # but must not turn a successful setup into a failure.
    try:
        info = await schema_manager.get_schema_info()
        _log_schema_info(info)
    except Exception:
        logger.warning("Could not retrieve or display schema information", exc_info=True)
    return True
async def test_basic_operations(pool):
    """Smoke-test inserts and a read-back against the freshly created tables.

    The test writes one synthetic ETH/USDT row each to ``ohlcv_data``,
    ``order_book_snapshots`` and ``order_book_imbalances``, and reads the
    OHLCV row back. All statements run in a single transaction that is
    always rolled back, so the fake rows never persist alongside real
    market data.

    Args:
        pool: asyncpg pool to borrow a connection from.

    Returns:
        True if every statement succeeds and the OHLCV row can be read back.
        False otherwise; the error is logged with its traceback.
    """
    try:
        logger.info("\n=== Testing Basic Operations ===")
        async with pool.acquire() as conn:
            tr = conn.transaction()
            await tr.start()
            try:
                logger.info("Testing OHLCV insert...")
                await conn.execute("""
                    INSERT INTO ohlcv_data
                    (timestamp, symbol, timeframe, open_price, high_price, low_price, close_price, volume)
                    VALUES (NOW(), 'ETH/USDT', '1s', 2000.0, 2001.0, 1999.0, 2000.5, 100.0)
                    ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING
                """)
                logger.info("✓ OHLCV insert successful")

                logger.info("Testing OHLCV query...")
                result = await conn.fetchrow("""
                    SELECT * FROM ohlcv_data
                    WHERE symbol = 'ETH/USDT'
                    ORDER BY timestamp DESC
                    LIMIT 1
                """)
                # A missing row right after the insert means the table is not
                # usable, so treat it as a failure rather than passing silently.
                if result is None:
                    raise RuntimeError("OHLCV query returned no rows after insert")
                logger.info(f"✓ OHLCV query successful: {dict(result)}")

                logger.info("Testing order book insert...")
                await conn.execute("""
                    INSERT INTO order_book_snapshots
                    (timestamp, symbol, exchange, bids, asks, mid_price, spread)
                    VALUES (NOW(), 'ETH/USDT', 'binance', '[]'::jsonb, '[]'::jsonb, 2000.0, 0.1)
                    ON CONFLICT (timestamp, symbol, exchange) DO NOTHING
                """)
                logger.info("✓ Order book insert successful")

                logger.info("Testing imbalances insert...")
                await conn.execute("""
                    INSERT INTO order_book_imbalances
                    (timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s)
                    VALUES (NOW(), 'ETH/USDT', 0.5, 0.4, 0.3, 0.2)
                    ON CONFLICT (timestamp, symbol) DO NOTHING
                """)
                logger.info("✓ Imbalances insert successful")
            finally:
                # Always discard the synthetic rows, on success and on failure.
                await tr.rollback()

        logger.info("\n✓ All basic operations successful")
        return True
    except Exception as e:
        logger.exception(f"Basic operations test failed: {e}")
        return False
async def main():
    """Run the full setup sequence.

    The steps are: load config, open the pool, check TimescaleDB, build and
    verify the schema, then smoke-test basic operations. The pool is always
    closed on exit.

    Returns:
        Process exit code: 0 when every step succeeds, 1 otherwise.
    """
    logger.info("=== Unified Data Storage Setup ===\n")
    pool = None
    try:
        config = get_config()
        pool = await create_connection_pool(config)

        if not await verify_timescaledb(pool):
            logger.error("TimescaleDB verification failed")
            return 1

        if not await setup_schema(pool):
            logger.error("Schema setup failed")
            return 1

        if not await test_basic_operations(pool):
            logger.error("Basic operations test failed")
            return 1

        logger.info("\n=== Setup Complete ===")
        logger.info("Unified data storage system is ready to use!")
        return 0
    except Exception as e:
        # Top-level boundary: keep the traceback so setup failures are diagnosable.
        logger.exception(f"Setup failed: {e}")
        return 1
    finally:
        # Explicit None check: do not rely on the pool object's truthiness.
        if pool is not None:
            await pool.close()
            logger.info("Database connection pool closed")
if __name__ == "__main__":
    # Drive the async setup to completion and use its result as the exit status.
    sys.exit(asyncio.run(main()))