"""
TimescaleDB Storage for OHLCV Candle Data

Provides long-term storage for all candle data without limits.
Replaces capped deques with unlimited database storage.

CRITICAL POLICY: NO SYNTHETIC DATA ALLOWED
This module MUST ONLY store real market data from exchanges.
"""

import logging
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Optional, List

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

logger = logging.getLogger(__name__)


class TimescaleDBStorage:
    """
    TimescaleDB storage for OHLCV candle data.

    Features:
    - Unlimited storage (no caps)
    - Fast time-range queries
    - Automatic compression
    - Multi-symbol, multi-timeframe support

    Every operation opens its own short-lived connection through
    ``get_connection``; no connection is cached on the instance.
    """

    # Fallback DSN used when the caller supplies no connection string.
    # NOTE(review): this embeds placeholder credentials; production callers
    # should always pass an explicit connection string.
    DEFAULT_CONNECTION_STRING = \
        "postgresql://postgres:password@localhost:5432/trading_data"

    # Savepoint name used to isolate optional DDL steps in create_tables().
    _OPTIONAL_STEP_SAVEPOINT = "optional_step"

    def __init__(self, connection_string: Optional[str] = None):
        """
        Initialize TimescaleDB storage and verify the database is reachable.

        Args:
            connection_string: PostgreSQL connection string
                Default: postgresql://postgres:password@localhost:5432/trading_data

        Raises:
            Exception: Whatever the connection attempt raised, re-raised
                after being logged.
        """
        self.connection_string = \
            connection_string or self.DEFAULT_CONNECTION_STRING

        # Fail fast: a storage object that cannot reach the database is useless.
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT version();")
                    version = cur.fetchone()
                    logger.info(f"Connected to TimescaleDB: {version[0]}")
        except Exception as e:
            logger.error(f"Failed to connect to TimescaleDB: {e}")
            logger.warning("TimescaleDB storage will not be available")
            raise

    @contextmanager
    def get_connection(self):
        """
        Yield a new database connection with automatic cleanup.

        The transaction is committed when the ``with`` body finishes
        normally and rolled back when it raises; the connection is closed
        in both cases.
        """
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            conn.close()

    def _run_optional_step(self, cur, sql: str, success_msg: str,
                           failure_msg: str) -> bool:
        """
        Execute a best-effort statement without poisoning the transaction.

        PostgreSQL aborts the whole transaction when any statement fails, so
        merely catching the Python exception would make every later statement
        fail (or be rolled back at commit). The statement is therefore wrapped
        in a savepoint that is rolled back on failure.

        Args:
            cur: Open cursor on the transaction in progress
            sql: Statement to execute
            success_msg: Logged at INFO level when the statement succeeds
            failure_msg: Logged at DEBUG level (with the error) when it fails

        Returns:
            True if the statement succeeded, False if it was rolled back
        """
        savepoint = self._OPTIONAL_STEP_SAVEPOINT
        cur.execute(f"SAVEPOINT {savepoint}")
        try:
            cur.execute(sql)
        except Exception as e:
            # Undo only the failed statement, then drop the savepoint so the
            # surrounding transaction stays usable.
            cur.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
            cur.execute(f"RELEASE SAVEPOINT {savepoint}")
            logger.debug(f"{failure_msg}: {e}")
            return False
        cur.execute(f"RELEASE SAVEPOINT {savepoint}")
        logger.info(success_msg)
        return True

    def create_tables(self):
        """
        Create the ohlcv_candles table, hypertable, index and compression setup.

        Creating the extension, table and index is mandatory: a failure there
        propagates and rolls everything back. Converting to a hypertable,
        enabling compression and adding the compression policy are
        best-effort: each runs inside its own savepoint so a failure is
        logged and skipped without undoing the mandatory steps.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                # Create extension if not exists
                cur.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")

                # Create ohlcv_candles table
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS ohlcv_candles (
                        time TIMESTAMPTZ NOT NULL,
                        symbol TEXT NOT NULL,
                        timeframe TEXT NOT NULL,
                        open DOUBLE PRECISION NOT NULL,
                        high DOUBLE PRECISION NOT NULL,
                        low DOUBLE PRECISION NOT NULL,
                        close DOUBLE PRECISION NOT NULL,
                        volume DOUBLE PRECISION NOT NULL,
                        PRIMARY KEY (time, symbol, timeframe)
                    );
                """)

                # Convert to hypertable (if not already)
                self._run_optional_step(
                    cur,
                    """
                    SELECT create_hypertable('ohlcv_candles', 'time',
                        if_not_exists => TRUE);
                    """,
                    "Created hypertable: ohlcv_candles",
                    "Hypertable may already exist",
                )

                # Create indexes for fast queries
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_symbol_timeframe_time
                    ON ohlcv_candles (symbol, timeframe, time DESC);
                """)

                # Enable compression (saves 10-20x space)
                self._run_optional_step(
                    cur,
                    """
                    ALTER TABLE ohlcv_candles SET (
                        timescaledb.compress,
                        timescaledb.compress_segmentby = 'symbol,timeframe'
                    );
                    """,
                    "Enabled compression on ohlcv_candles",
                    "Compression may already be enabled",
                )

                # Add compression policy (compress data older than 7 days);
                # if_not_exists makes repeated start-ups a no-op, not an error.
                self._run_optional_step(
                    cur,
                    """
                    SELECT add_compression_policy('ohlcv_candles',
                        INTERVAL '7 days', if_not_exists => TRUE);
                    """,
                    "Added compression policy (7 days)",
                    "Compression policy may already exist",
                )

        # Logged only after the connection context has committed.
        logger.info("TimescaleDB tables created successfully")

    def store_candles(self, symbol: str, timeframe: str,
                      df: pd.DataFrame) -> int:
        """
        Store OHLCV candles in TimescaleDB.

        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timeframe: Timeframe (e.g., '1s', '1m', '1h', '1d')
            df: DataFrame with columns: open, high, low, close, volume
                Index must be DatetimeIndex (timestamps)

        Returns:
            int: Number of candles submitted for insertion. Rows skipped by
                ON CONFLICT (already stored) are still counted. 0 when ``df``
                is None/empty or when any error occurs (errors are logged,
                not raised).
        """
        if df is None or df.empty:
            logger.warning(f"No data to store for {symbol} {timeframe}")
            return 0

        try:
            # Zip the columns instead of iterrows(): same float() coercion,
            # without building a Series per row.
            data = [
                (timestamp, symbol, timeframe,
                 float(open_), float(high), float(low),
                 float(close), float(volume))
                for timestamp, open_, high, low, close, volume in zip(
                    df.index, df['open'], df['high'], df['low'],
                    df['close'], df['volume'])
            ]

            # Insert data (ON CONFLICT DO NOTHING to avoid duplicates)
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    execute_values(
                        cur,
                        """
                        INSERT INTO ohlcv_candles
                        (time, symbol, timeframe, open, high, low, close, volume)
                        VALUES %s
                        ON CONFLICT (time, symbol, timeframe) DO NOTHING
                        """,
                        data
                    )

            logger.info(f"Stored {len(data)} candles for {symbol} {timeframe}")
            return len(data)

        except Exception as e:
            logger.error(f"Error storing candles for {symbol} {timeframe}: {e}")
            return 0

    def get_candles(self, symbol: str, timeframe: str,
                    start_time: Optional[datetime] = None,
                    end_time: Optional[datetime] = None,
                    limit: Optional[int] = None) -> Optional[pd.DataFrame]:
        """
        Retrieve OHLCV candles from TimescaleDB.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            start_time: Start of time range, inclusive (optional)
            end_time: End of time range, inclusive (optional)
            limit: Maximum number of candles to return (optional). When set,
                the newest ``limit`` candles in the range are kept; 0 yields
                an empty DataFrame.

        Returns:
            DataFrame with OHLCV data, indexed by timestamp in ascending
            order, or None if the query failed (the error is logged).
        """
        try:
            # Build query
            query = """
                SELECT time, open, high, low, close, volume
                FROM ohlcv_candles
                WHERE symbol = %s AND timeframe = %s
            """
            params = [symbol, timeframe]

            # Add time range filter
            if start_time is not None:
                query += " AND time >= %s"
                params.append(start_time)
            if end_time is not None:
                query += " AND time <= %s"
                params.append(end_time)

            # Newest first, so LIMIT keeps the most recent candles
            query += " ORDER BY time DESC"

            # Explicit None check: a falsy limit of 0 must not mean "no limit"
            if limit is not None:
                query += " LIMIT %s"
                params.append(limit)

            # Execute query
            with self.get_connection() as conn:
                df = pd.read_sql(query, conn, params=params, index_col='time')

            # Sort by time ascending (oldest first)
            if not df.empty:
                df = df.sort_index()

            logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe}")
            return df

        except Exception as e:
            logger.error(f"Error retrieving candles for {symbol} {timeframe}: {e}")
            return None

    def get_recent_candles(self, symbol: str, timeframe: str,
                           limit: int = 1000) -> Optional[pd.DataFrame]:
        """
        Get most recent candles.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            limit: Number of recent candles to retrieve

        Returns:
            DataFrame with recent OHLCV data (oldest first), or None if the
            underlying query failed
        """
        return self.get_candles(symbol, timeframe, limit=limit)

    def get_candles_count(self, symbol: Optional[str] = None,
                          timeframe: Optional[str] = None) -> int:
        """
        Get count of stored candles.

        Args:
            symbol: Optional symbol filter
            timeframe: Optional timeframe filter

        Returns:
            Number of candles stored; 0 if the query failed (the error is
            logged)
        """
        try:
            # "WHERE 1=1" lets each optional filter be appended as "AND ..."
            query = "SELECT COUNT(*) FROM ohlcv_candles WHERE 1=1"
            params = []

            if symbol:
                query += " AND symbol = %s"
                params.append(symbol)
            if timeframe:
                query += " AND timeframe = %s"
                params.append(timeframe)

            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(query, params)
                    count = cur.fetchone()[0]

            return count

        except Exception as e:
            logger.error(f"Error getting candles count: {e}")
            return 0

    def get_storage_stats(self) -> dict:
        """
        Get storage statistics.

        Returns:
            Dictionary with keys ``total_candles``, ``by_symbol``,
            ``by_timeframe``, ``oldest_candle``, ``newest_candle`` and
            ``table_size``; an empty dict if any query failed (the error is
            logged).
        """
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    # Total candles
                    cur.execute("SELECT COUNT(*) FROM ohlcv_candles")
                    total_candles = cur.fetchone()[0]

                    # Candles by symbol
                    cur.execute("""
                        SELECT symbol, COUNT(*) as count
                        FROM ohlcv_candles
                        GROUP BY symbol
                        ORDER BY count DESC
                    """)
                    by_symbol = dict(cur.fetchall())

                    # Candles by timeframe
                    cur.execute("""
                        SELECT timeframe, COUNT(*) as count
                        FROM ohlcv_candles
                        GROUP BY timeframe
                        ORDER BY count DESC
                    """)
                    by_timeframe = dict(cur.fetchall())

                    # Time range
                    cur.execute("""
                        SELECT MIN(time) as oldest, MAX(time) as newest
                        FROM ohlcv_candles
                    """)
                    oldest, newest = cur.fetchone()

                    # Table size. A hypertable keeps its rows in chunk tables,
                    # so pg_total_relation_size on the parent alone misses
                    # them; hypertable_size sums the chunks. The COALESCE
                    # covers the case where the table is a plain table.
                    cur.execute("""
                        SELECT pg_size_pretty(COALESCE(
                            hypertable_size('ohlcv_candles'),
                            pg_total_relation_size('ohlcv_candles')))
                    """)
                    table_size = cur.fetchone()[0]

                    return {
                        'total_candles': total_candles,
                        'by_symbol': by_symbol,
                        'by_timeframe': by_timeframe,
                        'oldest_candle': oldest,
                        'newest_candle': newest,
                        'table_size': table_size
                    }

        except Exception as e:
            logger.error(f"Error getting storage stats: {e}")
            return {}


# Process-wide storage singleton, created lazily by get_timescale_storage()
_timescale_storage = None


def get_timescale_storage(connection_string: str = None) -> Optional[TimescaleDBStorage]:
    """
    Return the shared TimescaleDB storage, creating it on first use.

    The first call that succeeds constructs the storage, creates its tables
    and caches the instance; later calls return the cached instance and
    ignore ``connection_string``. A failed attempt caches nothing, so the
    next call tries again.

    Args:
        connection_string: PostgreSQL connection string (optional)

    Returns:
        TimescaleDBStorage instance or None if unavailable
    """
    global _timescale_storage

    # Fast path: already initialized
    if _timescale_storage is not None:
        return _timescale_storage

    try:
        storage = TimescaleDBStorage(connection_string)
        storage.create_tables()
    except Exception as e:
        logger.warning(f"TimescaleDB storage not available: {e}")
        _timescale_storage = None
        return None

    _timescale_storage = storage
    logger.info("TimescaleDB storage initialized successfully")
    return storage