"""
|
|
Unified Queryable Storage Manager
|
|
|
|
Provides a unified interface for queryable data storage with automatic fallback:
|
|
1. TimescaleDB (preferred) - for production with time-series optimization
|
|
2. SQLite (fallback) - for development/testing without TimescaleDB
|
|
|
|
This avoids data duplication with parquet/cache by providing a single queryable layer
|
|
that can be reused across multiple training setups.
|
|
|
|
Key Features:
|
|
- Automatic detection and fallback
|
|
- Unified query interface
|
|
- Time-series optimized queries
|
|
- Efficient storage for training data
|
|
- No duplication with existing cache implementations
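
Example (illustrative; get_unified_storage() is defined at the bottom of this
module, and ohlcv_df stands in for any OHLCV DataFrame):

    storage = get_unified_storage()  # falls back to SQLite without TimescaleDB
    storage.store_candles('ETH/USDT', '1m', ohlcv_df)
    recent = storage.get_candles('ETH/USDT', '1m', limit=100)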
"""

import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

logger = logging.getLogger(__name__)


class UnifiedQueryableStorage:
    """
    Unified storage manager with TimescaleDB/SQLite fallback.

    Provides queryable storage for:
    - OHLCV candle data
    - Prediction records
    - Training data
    - Model metrics

    Automatically uses TimescaleDB when available, falls back to SQLite otherwise.
    """

    def __init__(self,
                 timescale_connection_string: Optional[str] = None,
                 sqlite_path: str = "data/queryable_storage.db"):
        """
        Initialize unified storage with automatic fallback.

        Args:
            timescale_connection_string: PostgreSQL/TimescaleDB connection string
            sqlite_path: Path to SQLite database file (fallback)
        """
        self.backend = None
        self.backend_type = None

        # Try TimescaleDB first
        if timescale_connection_string:
            try:
                from core.timescale_storage import get_timescale_storage
                self.backend = get_timescale_storage(timescale_connection_string)
                if self.backend:
                    self.backend_type = "timescale"
                    logger.info("✅ Using TimescaleDB for queryable storage")
            except Exception as e:
                logger.warning(f"TimescaleDB not available: {e}")

        # Fall back to SQLite
        if self.backend is None:
            try:
                self.backend = SQLiteQueryableStorage(sqlite_path)
                self.backend_type = "sqlite"
                logger.info("✅ Using SQLite for queryable storage (TimescaleDB fallback)")
            except Exception as e:
                logger.error(f"Failed to initialize SQLite storage: {e}")
                raise RuntimeError("No queryable storage backend available") from e

    def store_candles(self, symbol: str, timeframe: str, df: pd.DataFrame) -> bool:
        """
        Store OHLCV candles.

        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timeframe: Timeframe (e.g., '1m', '1h', '1d')
            df: DataFrame with OHLCV data

        Returns:
            True if successful
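
        Example (illustrative; `df` carries open/high/low/close/volume columns
        with a datetime or epoch index, which is what the SQLite backend expects):

            df = pd.DataFrame(
                {'open': [100.0], 'high': [101.0], 'low': [99.5],
                 'close': [100.5], 'volume': [12.3]},
                index=pd.to_datetime(['2024-01-01 00:00']))
            storage.store_candles('ETH/USDT', '1m', df)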
        """
        try:
            # Both backends expose the same store_candles() signature,
            # so no per-backend branching is needed here
            self.backend.store_candles(symbol, timeframe, df)
            return True
        except Exception as e:
            logger.error(f"Error storing candles: {e}")
            return False

    def get_candles(self,
                    symbol: str,
                    timeframe: str,
                    start_time: Optional[datetime] = None,
                    end_time: Optional[datetime] = None,
                    limit: Optional[int] = None) -> Optional[pd.DataFrame]:
        """
        Retrieve OHLCV candles with time range filtering.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            start_time: Start of time range (optional)
            end_time: End of time range (optional)
            limit: Maximum number of candles (optional)

        Returns:
            DataFrame with OHLCV data or None
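
        Example (illustrative; fetches one day of hourly candles):

            df = storage.get_candles('ETH/USDT', '1h',
                                     start_time=datetime(2024, 1, 1),
                                     end_time=datetime(2024, 1, 2))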
        """
        try:
            # Both backends share the same get_candles() signature
            return self.backend.get_candles(symbol, timeframe, start_time, end_time, limit)
        except Exception as e:
            logger.error(f"Error retrieving candles: {e}")
            return None

    def store_prediction(self, prediction_data: Dict[str, Any]) -> bool:
        """
        Store a prediction record for training.

        Args:
            prediction_data: Dictionary with prediction information

        Returns:
            True if successful
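
        Example (illustrative; the keys mirror the columns of the SQLite
        predictions table, and any extra keys are stored as JSON metadata):

            storage.store_prediction({
                'prediction_id': 'abc123',
                'symbol': 'ETH/USDT',
                'model_name': 'cnn_v1',
                'timestamp': datetime.now(),
                'predicted_price': 3050.0,
                'current_price': 3000.0,
                'predicted_direction': 1,
                'confidence': 0.72,
                'timeframe': '1m',
            })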
        """
        try:
            return self.backend.store_prediction(prediction_data)
        except Exception as e:
            logger.error(f"Error storing prediction: {e}")
            return False

    def get_predictions(self,
                        symbol: Optional[str] = None,
                        model_name: Optional[str] = None,
                        start_time: Optional[datetime] = None,
                        end_time: Optional[datetime] = None,
                        limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Query predictions with filtering.

        Args:
            symbol: Filter by symbol (optional)
            model_name: Filter by model (optional)
            start_time: Start of time range (optional)
            end_time: End of time range (optional)
            limit: Maximum number of records (optional)

        Returns:
            List of prediction records
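
        Example (illustrative; fetches the 50 most recent records for one model):

            recent = storage.get_predictions(symbol='ETH/USDT',
                                             model_name='cnn_v1', limit=50)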
        """
        try:
            return self.backend.get_predictions(symbol, model_name, start_time, end_time, limit)
        except Exception as e:
            logger.error(f"Error retrieving predictions: {e}")
            return []

    def get_storage_stats(self) -> Dict[str, Any]:
        """
        Get storage statistics.

        Returns:
            Dictionary with storage stats
        """
        try:
            stats = self.backend.get_storage_stats()
            stats['backend_type'] = self.backend_type
            return stats
        except Exception as e:
            logger.error(f"Error getting storage stats: {e}")
            return {'backend_type': self.backend_type, 'error': str(e)}


class SQLiteQueryableStorage:
    """
    SQLite-based queryable storage (fallback when TimescaleDB is unavailable).

    Provides similar functionality to TimescaleDB but using SQLite,
    optimized for time-series queries with proper indexing.
    """

    def __init__(self, db_path: str = "data/queryable_storage.db"):
        """
        Initialize SQLite storage.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        # Initialize database schema
        self._create_tables()
        logger.info(f"SQLite queryable storage initialized: {self.db_path}")

    def _create_tables(self):
        """Create SQLite tables with proper indexing"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # OHLCV candles table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS ohlcv_candles (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    timeframe TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    open REAL NOT NULL,
                    high REAL NOT NULL,
                    low REAL NOT NULL,
                    close REAL NOT NULL,
                    volume REAL NOT NULL,
                    created_at INTEGER DEFAULT (strftime('%s', 'now')),
                    UNIQUE(symbol, timeframe, timestamp)
                )
            """)

            # Indexes for efficient time-series queries
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe_timestamp
                ON ohlcv_candles(symbol, timeframe, timestamp DESC)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_ohlcv_timestamp
                ON ohlcv_candles(timestamp DESC)
            """)

            # Predictions table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS predictions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    prediction_id TEXT UNIQUE NOT NULL,
                    symbol TEXT NOT NULL,
                    model_name TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    predicted_price REAL,
                    current_price REAL,
                    predicted_direction INTEGER,
                    confidence REAL,
                    timeframe TEXT,
                    outcome_price REAL,
                    outcome_timestamp INTEGER,
                    reward REAL,
                    metadata TEXT,
                    created_at INTEGER DEFAULT (strftime('%s', 'now'))
                )
            """)

            # Indexes for prediction queries
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_predictions_symbol_timestamp
                ON predictions(symbol, timestamp DESC)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_predictions_model_timestamp
                ON predictions(model_name, timestamp DESC)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_predictions_timestamp
                ON predictions(timestamp DESC)
            """)

            conn.commit()
            logger.debug("SQLite tables created successfully")

    def store_candles(self, symbol: str, timeframe: str, df: pd.DataFrame):
        """
        Store OHLCV candles in SQLite.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            df: DataFrame with OHLCV data
        """
        if df is None or df.empty:
            return

        df_copy = df.copy()

        # Convert the index to Unix timestamps if it is datetime-based
        if pd.api.types.is_datetime64_any_dtype(df_copy.index):
            df_copy['timestamp'] = df_copy.index.astype('int64') // 10**9
        else:
            df_copy['timestamp'] = df_copy.index

        # Build rows with native Python types (sqlite3 cannot bind numpy scalars)
        rows = [
            (symbol, timeframe, int(ts), float(o), float(h), float(l), float(c), float(v))
            for ts, o, h, l, c, v in zip(df_copy['timestamp'], df_copy['open'],
                                         df_copy['high'], df_copy['low'],
                                         df_copy['close'], df_copy['volume'])
        ]

        with sqlite3.connect(self.db_path) as conn:
            # INSERT OR REPLACE honors the UNIQUE(symbol, timeframe, timestamp)
            # constraint, so re-storing an overlapping range updates in place
            # instead of raising an IntegrityError
            conn.executemany("""
                INSERT OR REPLACE INTO ohlcv_candles
                (symbol, timeframe, timestamp, open, high, low, close, volume)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)
            conn.commit()

        logger.debug(f"Stored {len(rows)} candles for {symbol} {timeframe}")

    def get_candles(self,
                    symbol: str,
                    timeframe: str,
                    start_time: Optional[datetime] = None,
                    end_time: Optional[datetime] = None,
                    limit: Optional[int] = None) -> Optional[pd.DataFrame]:
        """
        Retrieve OHLCV candles from SQLite.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            start_time: Start of time range
            end_time: End of time range
            limit: Maximum number of candles

        Returns:
            DataFrame with OHLCV data, or None if no rows match
        """
        with sqlite3.connect(self.db_path) as conn:
            # Build query
            query = """
                SELECT timestamp, open, high, low, close, volume
                FROM ohlcv_candles
                WHERE symbol = ? AND timeframe = ?
            """
            params = [symbol, timeframe]

            # Add time range filters
            if start_time:
                query += " AND timestamp >= ?"
                params.append(int(start_time.timestamp()))

            if end_time:
                query += " AND timestamp <= ?"
                params.append(int(end_time.timestamp()))

            # Order newest-first so LIMIT returns the most recent candles
            query += " ORDER BY timestamp DESC"

            if limit:
                query += " LIMIT ?"
                params.append(limit)

            df = pd.read_sql_query(query, conn, params=params)

            if df.empty:
                return None

            # Convert timestamp to a datetime index and restore ascending order
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
            df.set_index('timestamp', inplace=True)
            df.sort_index(inplace=True)

            return df

    def store_prediction(self, prediction_data: Dict[str, Any]) -> bool:
        """
        Store a prediction record.

        Args:
            prediction_data: Dictionary with prediction information

        Returns:
            True if successful
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Extract core fields
                prediction_id = prediction_data.get('prediction_id')
                symbol = prediction_data.get('symbol')
                model_name = prediction_data.get('model_name')
                timestamp = prediction_data.get('timestamp')
                outcome_timestamp = prediction_data.get('outcome_timestamp')

                # Convert datetimes to Unix timestamps
                if isinstance(timestamp, datetime):
                    timestamp = int(timestamp.timestamp())
                if isinstance(outcome_timestamp, datetime):
                    outcome_timestamp = int(outcome_timestamp.timestamp())

                # Everything not mapped to a column goes into JSON metadata
                core_fields = {'prediction_id', 'symbol', 'model_name', 'timestamp',
                               'predicted_price', 'current_price', 'predicted_direction',
                               'confidence', 'timeframe', 'outcome_price',
                               'outcome_timestamp', 'reward'}
                metadata = {k: v for k, v in prediction_data.items() if k not in core_fields}

                # Insert prediction (REPLACE keeps prediction_id unique)
                cursor.execute("""
                    INSERT OR REPLACE INTO predictions
                    (prediction_id, symbol, model_name, timestamp, predicted_price,
                     current_price, predicted_direction, confidence, timeframe,
                     outcome_price, outcome_timestamp, reward, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    prediction_id,
                    symbol,
                    model_name,
                    timestamp,
                    prediction_data.get('predicted_price'),
                    prediction_data.get('current_price'),
                    prediction_data.get('predicted_direction'),
                    prediction_data.get('confidence'),
                    prediction_data.get('timeframe'),
                    prediction_data.get('outcome_price'),
                    outcome_timestamp,
                    prediction_data.get('reward'),
                    # default=str keeps non-JSON-serializable values (e.g. datetimes)
                    # from aborting the insert
                    json.dumps(metadata, default=str)
                ))

                conn.commit()
                return True

        except Exception as e:
            logger.error(f"Error storing prediction: {e}")
            return False

    def get_predictions(self,
                        symbol: Optional[str] = None,
                        model_name: Optional[str] = None,
                        start_time: Optional[datetime] = None,
                        end_time: Optional[datetime] = None,
                        limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Query predictions with filtering.

        Args:
            symbol: Filter by symbol
            model_name: Filter by model
            start_time: Start of time range
            end_time: End of time range
            limit: Maximum number of records

        Returns:
            List of prediction records
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # Build query incrementally from the supplied filters
                query = "SELECT * FROM predictions WHERE 1=1"
                params = []

                if symbol:
                    query += " AND symbol = ?"
                    params.append(symbol)

                if model_name:
                    query += " AND model_name = ?"
                    params.append(model_name)

                if start_time:
                    query += " AND timestamp >= ?"
                    params.append(int(start_time.timestamp()))

                if end_time:
                    query += " AND timestamp <= ?"
                    params.append(int(end_time.timestamp()))

                query += " ORDER BY timestamp DESC"

                if limit:
                    query += " LIMIT ?"
                    params.append(limit)

                cursor.execute(query, params)
                rows = cursor.fetchall()

                # Convert to list of dicts, parsing metadata JSON where valid
                predictions = []
                for row in rows:
                    pred = dict(row)
                    if pred.get('metadata'):
                        try:
                            pred['metadata'] = json.loads(pred['metadata'])
                        except json.JSONDecodeError:
                            pass  # leave metadata as the raw string
                    predictions.append(pred)

                return predictions

        except Exception as e:
            logger.error(f"Error querying predictions: {e}")
            return []

    def get_storage_stats(self) -> Dict[str, Any]:
        """
        Get storage statistics.

        Returns:
            Dictionary with storage stats
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Row counts per table
                cursor.execute("SELECT COUNT(*) FROM ohlcv_candles")
                candles_count = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM predictions")
                predictions_count = cursor.fetchone()[0]

            # Database file size on disk
            db_size = self.db_path.stat().st_size if self.db_path.exists() else 0

            return {
                'candles_count': candles_count,
                'predictions_count': predictions_count,
                'database_size_bytes': db_size,
                'database_size_mb': db_size / (1024 * 1024),
                'database_path': str(self.db_path)
            }

        except Exception as e:
            logger.error(f"Error getting storage stats: {e}")
            return {'error': str(e)}


# Global instance
_unified_storage = None


def get_unified_storage(timescale_connection_string: Optional[str] = None,
                        sqlite_path: str = "data/queryable_storage.db") -> UnifiedQueryableStorage:
    """
    Get the global unified storage instance.

    The backend is chosen on the first call; subsequent calls return the same
    instance and ignore the arguments.

    Args:
        timescale_connection_string: PostgreSQL/TimescaleDB connection string
        sqlite_path: Path to SQLite database file (fallback)

    Returns:
        UnifiedQueryableStorage instance
    """
    global _unified_storage

    if _unified_storage is None:
        _unified_storage = UnifiedQueryableStorage(timescale_connection_string, sqlite_path)
        logger.info(f"Unified queryable storage initialized: {_unified_storage.backend_type}")

    return _unified_storage
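

# A minimal smoke-test sketch (illustrative, not part of the library API):
# running this module directly exercises the SQLite fallback end to end with
# a tiny synthetic DataFrame. The 'data/smoke_test.db' path is an assumption
# for local runs.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    storage = UnifiedQueryableStorage(sqlite_path="data/smoke_test.db")

    # Two synthetic 1m candles
    demo_df = pd.DataFrame(
        {'open': [100.0, 101.0], 'high': [102.0, 103.0],
         'low': [99.0, 100.5], 'close': [101.0, 102.5],
         'volume': [10.0, 12.0]},
        index=pd.to_datetime(['2024-01-01 00:00', '2024-01-01 00:01']))
    storage.store_candles('ETH/USDT', '1m', demo_df)

    print(storage.get_candles('ETH/USDT', '1m', limit=10))
    print(storage.get_storage_stats())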