Files
gogo2/core/timescale_storage.py
Dobromir Popov 0225f4df58 wip wip wip
2025-10-23 18:57:07 +03:00

372 lines
13 KiB
Python

"""
TimescaleDB Storage for OHLCV Candle Data
Provides long-term storage for all candle data without limits.
Replaces capped deques with unlimited database storage.
CRITICAL POLICY: NO SYNTHETIC DATA ALLOWED
This module MUST ONLY store real market data from exchanges.
"""
import logging
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, List
import psycopg2
from psycopg2.extras import execute_values
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class TimescaleDBStorage:
    """
    TimescaleDB storage for OHLCV candle data

    Features:
    - Unlimited storage (no caps)
    - Fast time-range queries
    - Automatic compression
    - Multi-symbol, multi-timeframe support

    Every operation opens its own psycopg2 connection through
    ``get_connection`` and closes it again when the operation finishes.
    """

    def __init__(self, connection_string: Optional[str] = None):
        """
        Initialize TimescaleDB storage and verify that the server is reachable

        Args:
            connection_string: PostgreSQL connection string
                Default: postgresql://postgres:password@localhost:5432/trading_data

        Raises:
            Exception: Whatever the connection attempt or the version query
                raised; it is logged first and then re-raised unchanged.
        """
        # NOTE(review): the fallback DSN embeds credentials in source code;
        # callers should pass an explicit connection string.
        self.connection_string = connection_string or \
            "postgresql://postgres:password@localhost:5432/trading_data"

        # Test connection
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT version();")
                    version = cur.fetchone()
                    logger.info(f"Connected to TimescaleDB: {version[0]}")
        except Exception as e:
            logger.error(f"Failed to connect to TimescaleDB: {e}")
            logger.warning("TimescaleDB storage will not be available")
            raise

    @contextmanager
    def get_connection(self):
        """
        Get database connection with automatic cleanup

        Yields a new psycopg2 connection. The transaction is committed when
        the ``with`` body finishes normally and rolled back when it raises;
        the connection is closed in both cases.
        """
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            conn.close()

    @staticmethod
    def _run_optional_step(cur, sql: str, success_message: str,
                           failure_prefix: str) -> bool:
        """
        Run one best-effort statement without poisoning the transaction

        PostgreSQL aborts the whole transaction after a failed statement, so
        simply catching the Python exception is not enough: every later
        statement would be rejected and the final commit would turn into a
        rollback. Wrapping the statement in a savepoint lets a failure be
        undone on its own.

        Args:
            cur: Open cursor on the connection that owns the transaction
            sql: Statement to execute
            success_message: Logged at INFO level when the statement succeeds
            failure_prefix: Prefix of the DEBUG message logged on failure

        Returns:
            bool: True if the statement succeeded, False if it was rolled back
        """
        cur.execute("SAVEPOINT optional_step")
        try:
            cur.execute(sql)
        except Exception as e:
            # Undo only this statement; earlier work in the transaction stays.
            cur.execute("ROLLBACK TO SAVEPOINT optional_step")
            cur.execute("RELEASE SAVEPOINT optional_step")
            logger.debug(f"{failure_prefix}: {e}")
            return False
        cur.execute("RELEASE SAVEPOINT optional_step")
        logger.info(success_message)
        return True

    def create_tables(self):
        """
        Create TimescaleDB tables and hypertables

        The extension, the ``ohlcv_candles`` table and its lookup index are
        mandatory: a failure there propagates to the caller. Hypertable
        conversion, compression and the compression policy are best-effort:
        a failure is logged at DEBUG level and the remaining steps still run.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                # Create extension if not exists
                cur.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")

                # Create ohlcv_candles table
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS ohlcv_candles (
                        time TIMESTAMPTZ NOT NULL,
                        symbol TEXT NOT NULL,
                        timeframe TEXT NOT NULL,
                        open DOUBLE PRECISION NOT NULL,
                        high DOUBLE PRECISION NOT NULL,
                        low DOUBLE PRECISION NOT NULL,
                        close DOUBLE PRECISION NOT NULL,
                        volume DOUBLE PRECISION NOT NULL,
                        PRIMARY KEY (time, symbol, timeframe)
                    );
                """)

                # Convert to hypertable (if not already)
                self._run_optional_step(
                    cur,
                    """
                    SELECT create_hypertable('ohlcv_candles', 'time',
                        if_not_exists => TRUE);
                    """,
                    "Created hypertable: ohlcv_candles",
                    "Hypertable may already exist",
                )

                # Create indexes for fast queries
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_symbol_timeframe_time
                    ON ohlcv_candles (symbol, timeframe, time DESC);
                """)

                # Enable compression (saves 10-20x space)
                self._run_optional_step(
                    cur,
                    """
                    ALTER TABLE ohlcv_candles SET (
                        timescaledb.compress,
                        timescaledb.compress_segmentby = 'symbol,timeframe'
                    );
                    """,
                    "Enabled compression on ohlcv_candles",
                    "Compression may already be enabled",
                )

                # Add compression policy (compress data older than 7 days)
                self._run_optional_step(
                    cur,
                    """
                    SELECT add_compression_policy('ohlcv_candles', INTERVAL '7 days');
                    """,
                    "Added compression policy (7 days)",
                    "Compression policy may already exist",
                )

        # Reached only after get_connection() has committed the transaction.
        logger.info("TimescaleDB tables created successfully")

    def store_candles(self, symbol: str, timeframe: str, df: pd.DataFrame) -> int:
        """
        Store OHLCV candles in TimescaleDB

        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timeframe: Timeframe (e.g., '1s', '1m', '1h', '1d')
            df: DataFrame with columns: open, high, low, close, volume
                Index must be DatetimeIndex (timestamps)

        Returns:
            int: Number of candles submitted for insertion. Rows that already
                exist are skipped by ``ON CONFLICT DO NOTHING`` but are still
                counted here. 0 is returned for empty input and on any error
                (the error is logged, not raised).
        """
        if df is None or df.empty:
            logger.warning(f"No data to store for {symbol} {timeframe}")
            return 0

        try:
            # Prepare data for insertion. Walking the columns in lockstep
            # avoids building a Series per row as iterrows() does.
            data = [
                (
                    timestamp,
                    symbol,
                    timeframe,
                    float(open_),
                    float(high),
                    float(low),
                    float(close),
                    float(volume),
                )
                for timestamp, open_, high, low, close, volume in zip(
                    df.index,
                    df['open'],
                    df['high'],
                    df['low'],
                    df['close'],
                    df['volume'],
                )
            ]

            # Insert data (ON CONFLICT DO NOTHING to avoid duplicates)
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    execute_values(
                        cur,
                        """
                        INSERT INTO ohlcv_candles
                        (time, symbol, timeframe, open, high, low, close, volume)
                        VALUES %s
                        ON CONFLICT (time, symbol, timeframe) DO NOTHING
                        """,
                        data
                    )

            logger.info(f"Stored {len(data)} candles for {symbol} {timeframe}")
            return len(data)
        except Exception as e:
            logger.error(f"Error storing candles for {symbol} {timeframe}: {e}")
            return 0

    def get_candles(self, symbol: str, timeframe: str,
                    start_time: Optional[datetime] = None,
                    end_time: Optional[datetime] = None,
                    limit: Optional[int] = None) -> Optional[pd.DataFrame]:
        """
        Retrieve OHLCV candles from TimescaleDB

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            start_time: Start of time range, inclusive (optional)
            end_time: End of time range, inclusive (optional)
            limit: Maximum number of candles to return (optional).
                None or 0 means no limit.

        Returns:
            DataFrame with OHLCV data, indexed by timestamp and sorted oldest
            first, or None if the query failed (the error is logged).
        """
        try:
            # Build query
            query = """
                SELECT time, open, high, low, close, volume
                FROM ohlcv_candles
                WHERE symbol = %s AND timeframe = %s
            """
            params = [symbol, timeframe]

            # Add time range filter
            if start_time:
                query += " AND time >= %s"
                params.append(start_time)
            if end_time:
                query += " AND time <= %s"
                params.append(end_time)

            # Newest first, so that LIMIT keeps the most recent candles
            query += " ORDER BY time DESC"

            # Add limit
            if limit:
                query += " LIMIT %s"
                params.append(limit)

            # Execute query
            with self.get_connection() as conn:
                df = pd.read_sql(query, conn, params=params, index_col='time')

            # Sort by time ascending (oldest first)
            if not df.empty:
                df = df.sort_index()

            logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe}")
            return df
        except Exception as e:
            logger.error(f"Error retrieving candles for {symbol} {timeframe}: {e}")
            return None

    def get_recent_candles(self, symbol: str, timeframe: str,
                           limit: int = 1000) -> Optional[pd.DataFrame]:
        """
        Get most recent candles

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            limit: Number of recent candles to retrieve

        Returns:
            DataFrame with recent OHLCV data (oldest first), or None if the
            underlying ``get_candles`` call failed
        """
        return self.get_candles(symbol, timeframe, limit=limit)

    def get_candles_count(self, symbol: Optional[str] = None,
                          timeframe: Optional[str] = None) -> int:
        """
        Get count of stored candles

        Args:
            symbol: Optional symbol filter
            timeframe: Optional timeframe filter

        Returns:
            Number of candles stored, or 0 if the query failed (the error is
            logged)
        """
        try:
            # "WHERE 1=1" lets every optional filter be appended as " AND ..."
            query = "SELECT COUNT(*) FROM ohlcv_candles WHERE 1=1"
            params = []
            if symbol:
                query += " AND symbol = %s"
                params.append(symbol)
            if timeframe:
                query += " AND timeframe = %s"
                params.append(timeframe)

            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(query, params)
                    count = cur.fetchone()[0]
            return count
        except Exception as e:
            logger.error(f"Error getting candles count: {e}")
            return 0

    def get_storage_stats(self) -> dict:
        """
        Get storage statistics

        Returns:
            Dictionary with the keys total_candles, by_symbol, by_timeframe,
            oldest_candle, newest_candle and table_size, or an empty dict if
            any of the queries failed (the error is logged)
        """
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    # Total candles
                    cur.execute("SELECT COUNT(*) FROM ohlcv_candles")
                    total_candles = cur.fetchone()[0]

                    # Candles by symbol
                    cur.execute("""
                        SELECT symbol, COUNT(*) as count
                        FROM ohlcv_candles
                        GROUP BY symbol
                        ORDER BY count DESC
                    """)
                    by_symbol = dict(cur.fetchall())

                    # Candles by timeframe
                    cur.execute("""
                        SELECT timeframe, COUNT(*) as count
                        FROM ohlcv_candles
                        GROUP BY timeframe
                        ORDER BY count DESC
                    """)
                    by_timeframe = dict(cur.fetchall())

                    # Time range
                    cur.execute("""
                        SELECT MIN(time) as oldest, MAX(time) as newest
                        FROM ohlcv_candles
                    """)
                    oldest, newest = cur.fetchone()

                    # Table size
                    cur.execute("""
                        SELECT pg_size_pretty(pg_total_relation_size('ohlcv_candles'))
                    """)
                    table_size = cur.fetchone()[0]

            return {
                'total_candles': total_candles,
                'by_symbol': by_symbol,
                'by_timeframe': by_timeframe,
                'oldest_candle': oldest,
                'newest_candle': newest,
                'table_size': table_size
            }
        except Exception as e:
            logger.error(f"Error getting storage stats: {e}")
            return {}
# Process-wide storage singleton; stays None until the first successful
# initialization in get_timescale_storage().
_timescale_storage = None


def get_timescale_storage(connection_string: str = None) -> Optional[TimescaleDBStorage]:
    """
    Return the shared TimescaleDB storage, creating it on first use

    The first call that succeeds connects, creates the tables and caches the
    instance; later calls return that cached instance and ignore
    ``connection_string``. While initialization keeps failing nothing is
    cached, so each call tries again.

    Args:
        connection_string: PostgreSQL connection string (optional)

    Returns:
        TimescaleDBStorage instance or None if unavailable
    """
    global _timescale_storage

    # Already initialized: hand back the cached instance.
    if _timescale_storage is not None:
        return _timescale_storage

    try:
        _timescale_storage = TimescaleDBStorage(connection_string)
        _timescale_storage.create_tables()
        logger.info("TimescaleDB storage initialized successfully")
    except Exception as exc:
        logger.warning(f"TimescaleDB storage not available: {exc}")
        # Drop a half-initialized instance so callers never receive it.
        _timescale_storage = None

    return _timescale_storage