11 KiB
11 KiB
Unified Storage System Integration Guide
Overview
The unified storage system has been integrated into the existing DataProvider class, providing a single endpoint for both real-time and historical data access.
Key Features
✅ Single Endpoint: One method for all data access
✅ Automatic Routing: Cache for real-time, database for historical
✅ Backward Compatible: All existing methods still work
✅ Opt-In: Only enabled when explicitly initialized
✅ Fast: <10ms cache reads, <100ms database queries
Quick Start
1. Enable Unified Storage
from core.data_provider import DataProvider
import asyncio
# Create DataProvider (existing code works as before)
data_provider = DataProvider()
# Enable unified storage system
async def setup():
    success = await data_provider.enable_unified_storage()
    if success:
        print("✅ Unified storage enabled!")
    else:
        print("❌ Failed to enable unified storage")
asyncio.run(setup())
2. Get Real-Time Data (from cache)
async def get_realtime_data():
    # Get latest real-time data (timestamp=None)
    inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
    print(f"Symbol: {inference_data.symbol}")
    print(f"Timestamp: {inference_data.timestamp}")
    print(f"Latest price: {inference_data.get_latest_price()}")
    print(f"Data source: {inference_data.data_source}") # 'cache'
    print(f"Query latency: {inference_data.query_latency_ms}ms") # <10ms
    # Check data completeness
    if inference_data.has_complete_data():
        print("✓ All required data present")
    # Get data summary
    summary = inference_data.get_data_summary()
    print(f"OHLCV 1m rows: {summary['ohlcv_1m_rows']}")
    print(f"Has orderbook: {summary['has_orderbook']}")
    print(f"Imbalances rows: {summary['imbalances_rows']}")
asyncio.run(get_realtime_data())
3. Get Historical Data (from database)
from datetime import datetime, timedelta
async def get_historical_data():
    # Get historical data at specific timestamp
    target_time = datetime.now() - timedelta(hours=1)
    inference_data = await data_provider.get_inference_data_unified(
        symbol='ETH/USDT',
        timestamp=target_time,
        context_window_minutes=5 # ±5 minutes of context
    )
    print(f"Data source: {inference_data.data_source}") # 'database'
    print(f"Query latency: {inference_data.query_latency_ms}ms") # <100ms
    # Access multi-timeframe data
    print(f"1s candles: {len(inference_data.ohlcv_1s)}")
    print(f"1m candles: {len(inference_data.ohlcv_1m)}")
    print(f"1h candles: {len(inference_data.ohlcv_1h)}")
    # Access technical indicators
    print(f"RSI: {inference_data.indicators.get('rsi_14')}")
    print(f"MACD: {inference_data.indicators.get('macd')}")
    # Access context data
    if inference_data.context_data is not None:
        print(f"Context data: {len(inference_data.context_data)} rows")
asyncio.run(get_historical_data())
4. Get Multi-Timeframe Data
async def get_multi_timeframe():
    # Get multiple timeframes at once
    multi_tf = await data_provider.get_multi_timeframe_data_unified(
        symbol='ETH/USDT',
        timeframes=['1m', '5m', '1h'],
        limit=100
    )
    for timeframe, df in multi_tf.items():
        print(f"{timeframe}: {len(df)} candles")
        if not df.empty:
            print(f" Latest close: {df.iloc[-1]['close_price']}")
asyncio.run(get_multi_timeframe())
5. Get Order Book Data
async def get_orderbook():
    # Get order book with imbalances
    orderbook = await data_provider.get_order_book_data_unified('ETH/USDT')
    print(f"Mid price: {orderbook.mid_price}")
    print(f"Spread: {orderbook.spread}")
    print(f"Spread (bps): {orderbook.get_spread_bps()}")
    # Get best bid/ask
    best_bid = orderbook.get_best_bid()
    best_ask = orderbook.get_best_ask()
    print(f"Best bid: {best_bid}")
    print(f"Best ask: {best_ask}")
    # Get imbalance summary
    imbalances = orderbook.get_imbalance_summary()
    print(f"Imbalances: {imbalances}")
asyncio.run(get_orderbook())
6. Get Statistics
# Get unified storage statistics
stats = data_provider.get_unified_storage_stats()
print("=== Cache Statistics ===")
print(f"Hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"Total entries: {stats['cache']['total_entries']}")
print("\n=== Database Statistics ===")
print(f"Total queries: {stats['database']['total_queries']}")
print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
print("\n=== Ingestion Statistics ===")
print(f"Total ingested: {stats['ingestion']['total_ingested']}")
print(f"Validation failures: {stats['ingestion']['validation_failures']}")
Integration with Existing Code
Backward Compatibility
All existing DataProvider methods continue to work:
# Existing methods still work
df = data_provider.get_historical_data('ETH/USDT', '1m', limit=100)
price = data_provider.get_current_price('ETH/USDT')
features = data_provider.get_feature_matrix('ETH/USDT')
# New unified methods available alongside (must be awaited inside an async function)
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
Gradual Migration
You can migrate to unified storage gradually:
# Option 1: Use existing methods (no changes needed)
df = data_provider.get_historical_data('ETH/USDT', '1m')
# Option 2: Use unified storage for new features (must be awaited inside an async function)
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
Use Cases
1. Real-Time Trading
async def realtime_trading_loop():
    while True:
        # Get latest market data (fast!)
        data = await data_provider.get_inference_data_unified('ETH/USDT')
        # Make trading decision
        if data.has_complete_data():
            price = data.get_latest_price()
            rsi = data.indicators.get('rsi_14', 50)
            if rsi < 30:
                print(f"Buy signal at {price}")
            elif rsi > 70:
                print(f"Sell signal at {price}")
        await asyncio.sleep(1)
2. Backtesting
async def backtest_strategy(start_time, end_time):
    current_time = start_time
    while current_time < end_time:
        # Get historical data at specific time
        data = await data_provider.get_inference_data_unified(
            'ETH/USDT',
            timestamp=current_time,
            context_window_minutes=60
        )
        # Run strategy
        if data.has_complete_data():
            # Your strategy logic here
            pass
        # Move to next timestamp
        current_time += timedelta(minutes=1)
3. Data Annotation
async def annotate_data(timestamps):
    annotations = []
    for timestamp in timestamps:
        # Get data at specific timestamp
        data = await data_provider.get_inference_data_unified(
            'ETH/USDT',
            timestamp=timestamp,
            context_window_minutes=5
        )
        # Display to user for annotation
        # User marks buy/sell signals
        annotation = {
            'timestamp': timestamp,
            'price': data.get_latest_price(),
            'signal': 'buy', # User input
            'data': data.to_dict()
        }
        annotations.append(annotation)
    return annotations
4. Model Training
async def prepare_training_data(symbol, start_time, end_time):
    training_samples = []
    current_time = start_time
    while current_time < end_time:
        # Get complete inference data
        data = await data_provider.get_inference_data_unified(
            symbol,
            timestamp=current_time,
            context_window_minutes=10
        )
        if data.has_complete_data():
            # Extract features
            features = {
                'ohlcv_1m': data.ohlcv_1m.to_numpy(),
                'indicators': data.indicators,
                'imbalances': data.imbalances.to_numpy(),
                'orderbook': data.orderbook_snapshot
            }
            training_samples.append(features)
        current_time += timedelta(minutes=1)
    return training_samples
Configuration
Database Configuration
Update config.yaml:
database:
  host: localhost
  port: 5432
  name: trading_data
  user: postgres
  password: postgres
  pool_size: 20
Setup Database
# Run setup script
python scripts/setup_unified_storage.py
Performance Tips
- Use Real-Time Endpoint for Latest Data
    # Fast (cache)
    data = await data_provider.get_inference_data_unified('ETH/USDT')
    # Slower (database)
    data = await data_provider.get_inference_data_unified('ETH/USDT', datetime.now())
- Batch Historical Queries
    # Get multiple timeframes at once
    multi_tf = await data_provider.get_multi_timeframe_data_unified(
        'ETH/USDT',
        ['1m', '5m', '1h'],
        limit=100
    )
- Monitor Performance
    stats = data_provider.get_unified_storage_stats()
    print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%")
    print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
Troubleshooting
Unified Storage Not Available
if not data_provider.is_unified_storage_enabled():
    success = await data_provider.enable_unified_storage()
    if not success:
        print("Check database connection and configuration")
Slow Queries
# Check query latency
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if data.query_latency_ms > 100:
    print(f"Slow query: {data.query_latency_ms}ms")
    # Check database stats
    stats = data_provider.get_unified_storage_stats()
    print(stats['database'])
Missing Data
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if not data.has_complete_data():
    summary = data.get_data_summary()
    print(f"Missing data: {summary}")
API Reference
Main Methods
- enable_unified_storage() - Enable unified storage system
- disable_unified_storage() - Disable unified storage system
- get_inference_data_unified() - Get complete inference data
- get_multi_timeframe_data_unified() - Get multi-timeframe data
- get_order_book_data_unified() - Get order book with imbalances
- get_unified_storage_stats() - Get statistics
- is_unified_storage_enabled() - Check if enabled
Data Models
- InferenceDataFrame - Complete inference data structure
- OrderBookDataFrame - Order book with imbalances
- OHLCVCandle - Single candlestick
- TradeEvent - Individual trade
Support
For issues or questions:
- Check database connection: python scripts/setup_unified_storage.py
- Review logs for errors
- Check statistics: data_provider.get_unified_storage_stats()