*gogo2/docs/UNIFIED_STORAGE_INTEGRATION.md — Dobromir Popov, 2025-10-21 (commit 68b91f37bd, "better pivots")*
# Unified Storage System Integration Guide
## Overview
The unified storage system has been integrated into the existing `DataProvider` class, providing a single endpoint for both real-time and historical data access.
## Key Features
- **Single Endpoint**: One method for all data access
- **Automatic Routing**: Cache for real-time reads, database for historical queries
- **Backward Compatible**: All existing methods still work
- **Opt-In**: Only enabled when explicitly initialized
- **Fast**: <10ms cache reads, <100ms database queries
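The routing rule above can be sketched as a simple dispatch on the `timestamp` argument (a minimal illustration of the idea, not the actual `DataProvider` internals — the function name is a hypothetical stand-in):

```python
from datetime import datetime
from typing import Optional

def route_inference_query(timestamp: Optional[datetime]) -> str:
    """Pick the backing store the way the unified endpoint does:
    no timestamp means "latest", served from the in-memory cache;
    an explicit timestamp means a historical lookup in the database."""
    return "cache" if timestamp is None else "database"
```

The single-endpoint design means callers never choose a store explicitly; passing or omitting `timestamp` is the whole contract.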
## Quick Start
### 1. Enable Unified Storage
```python
from core.data_provider import DataProvider
import asyncio

# Create DataProvider (existing code works as before)
data_provider = DataProvider()

# Enable unified storage system
async def setup():
    success = await data_provider.enable_unified_storage()
    if success:
        print("✅ Unified storage enabled!")
    else:
        print("❌ Failed to enable unified storage")

asyncio.run(setup())
```
### 2. Get Real-Time Data (from cache)
```python
async def get_realtime_data():
    # Get latest real-time data (timestamp=None)
    inference_data = await data_provider.get_inference_data_unified('ETH/USDT')

    print(f"Symbol: {inference_data.symbol}")
    print(f"Timestamp: {inference_data.timestamp}")
    print(f"Latest price: {inference_data.get_latest_price()}")
    print(f"Data source: {inference_data.data_source}")           # 'cache'
    print(f"Query latency: {inference_data.query_latency_ms}ms")  # <10ms

    # Check data completeness
    if inference_data.has_complete_data():
        print("✓ All required data present")

    # Get data summary
    summary = inference_data.get_data_summary()
    print(f"OHLCV 1m rows: {summary['ohlcv_1m_rows']}")
    print(f"Has orderbook: {summary['has_orderbook']}")
    print(f"Imbalances rows: {summary['imbalances_rows']}")

asyncio.run(get_realtime_data())
```
### 3. Get Historical Data (from database)
```python
from datetime import datetime, timedelta

async def get_historical_data():
    # Get historical data at a specific timestamp
    target_time = datetime.now() - timedelta(hours=1)

    inference_data = await data_provider.get_inference_data_unified(
        symbol='ETH/USDT',
        timestamp=target_time,
        context_window_minutes=5  # ±5 minutes of context
    )

    print(f"Data source: {inference_data.data_source}")           # 'database'
    print(f"Query latency: {inference_data.query_latency_ms}ms")  # <100ms

    # Access multi-timeframe data
    print(f"1s candles: {len(inference_data.ohlcv_1s)}")
    print(f"1m candles: {len(inference_data.ohlcv_1m)}")
    print(f"1h candles: {len(inference_data.ohlcv_1h)}")

    # Access technical indicators
    print(f"RSI: {inference_data.indicators.get('rsi_14')}")
    print(f"MACD: {inference_data.indicators.get('macd')}")

    # Access context data
    if inference_data.context_data is not None:
        print(f"Context data: {len(inference_data.context_data)} rows")

asyncio.run(get_historical_data())
```
### 4. Get Multi-Timeframe Data
```python
async def get_multi_timeframe():
    # Get multiple timeframes at once
    multi_tf = await data_provider.get_multi_timeframe_data_unified(
        symbol='ETH/USDT',
        timeframes=['1m', '5m', '1h'],
        limit=100
    )

    for timeframe, df in multi_tf.items():
        print(f"{timeframe}: {len(df)} candles")
        if not df.empty:
            print(f"  Latest close: {df.iloc[-1]['close_price']}")

asyncio.run(get_multi_timeframe())
```
### 5. Get Order Book Data
```python
async def get_orderbook():
    # Get order book with imbalances
    orderbook = await data_provider.get_order_book_data_unified('ETH/USDT')

    print(f"Mid price: {orderbook.mid_price}")
    print(f"Spread: {orderbook.spread}")
    print(f"Spread (bps): {orderbook.get_spread_bps()}")

    # Get best bid/ask
    best_bid = orderbook.get_best_bid()
    best_ask = orderbook.get_best_ask()
    print(f"Best bid: {best_bid}")
    print(f"Best ask: {best_ask}")

    # Get imbalance summary
    imbalances = orderbook.get_imbalance_summary()
    print(f"Imbalances: {imbalances}")

asyncio.run(get_orderbook())
```
### 6. Get Statistics
```python
# Get unified storage statistics
stats = data_provider.get_unified_storage_stats()
print("=== Cache Statistics ===")
print(f"Hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"Total entries: {stats['cache']['total_entries']}")
print("\n=== Database Statistics ===")
print(f"Total queries: {stats['database']['total_queries']}")
print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
print("\n=== Ingestion Statistics ===")
print(f"Total ingested: {stats['ingestion']['total_ingested']}")
print(f"Validation failures: {stats['ingestion']['validation_failures']}")
```
## Integration with Existing Code
### Backward Compatibility
All existing DataProvider methods continue to work:
```python
# Existing methods still work
df = data_provider.get_historical_data('ETH/USDT', '1m', limit=100)
price = data_provider.get_current_price('ETH/USDT')
features = data_provider.get_feature_matrix('ETH/USDT')
# New unified methods available alongside
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
```
### Gradual Migration
You can migrate to unified storage gradually:
```python
# Option 1: Use existing methods (no changes needed)
df = data_provider.get_historical_data('ETH/USDT', '1m')
# Option 2: Use unified storage for new features
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
```
## Use Cases
### 1. Real-Time Trading
```python
async def realtime_trading_loop():
    while True:
        # Get latest market data (fast!)
        data = await data_provider.get_inference_data_unified('ETH/USDT')

        # Make trading decision
        if data.has_complete_data():
            price = data.get_latest_price()
            rsi = data.indicators.get('rsi_14', 50)

            if rsi < 30:
                print(f"Buy signal at {price}")
            elif rsi > 70:
                print(f"Sell signal at {price}")

        await asyncio.sleep(1)
```
### 2. Backtesting
```python
from datetime import timedelta

async def backtest_strategy(start_time, end_time):
    current_time = start_time

    while current_time < end_time:
        # Get historical data at a specific time
        data = await data_provider.get_inference_data_unified(
            'ETH/USDT',
            timestamp=current_time,
            context_window_minutes=60
        )

        # Run strategy
        if data.has_complete_data():
            # Your strategy logic here
            pass

        # Move to the next timestamp
        current_time += timedelta(minutes=1)
```
### 3. Data Annotation
```python
async def annotate_data(timestamps):
    annotations = []

    for timestamp in timestamps:
        # Get data at a specific timestamp
        data = await data_provider.get_inference_data_unified(
            'ETH/USDT',
            timestamp=timestamp,
            context_window_minutes=5
        )

        # Display to the user for annotation;
        # the user marks buy/sell signals
        annotation = {
            'timestamp': timestamp,
            'price': data.get_latest_price(),
            'signal': 'buy',  # User input
            'data': data.to_dict()
        }
        annotations.append(annotation)

    return annotations
```
### 4. Model Training
```python
from datetime import timedelta

async def prepare_training_data(symbol, start_time, end_time):
    training_samples = []
    current_time = start_time

    while current_time < end_time:
        # Get complete inference data
        data = await data_provider.get_inference_data_unified(
            symbol,
            timestamp=current_time,
            context_window_minutes=10
        )

        if data.has_complete_data():
            # Extract features
            features = {
                'ohlcv_1m': data.ohlcv_1m.to_numpy(),
                'indicators': data.indicators,
                'imbalances': data.imbalances.to_numpy(),
                'orderbook': data.orderbook_snapshot
            }
            training_samples.append(features)

        current_time += timedelta(minutes=1)

    return training_samples
```
## Configuration
### Database Configuration
Update `config.yaml`:
```yaml
database:
  host: localhost
  port: 5432
  name: trading_data
  user: postgres
  password: postgres
  pool_size: 20
```
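One way to consume these keys is to assemble a standard PostgreSQL connection URI — a hedged sketch assuming the section is loaded into a plain dict; the helper name is hypothetical and the project may build its connection differently:

```python
def build_postgres_dsn(db_cfg: dict) -> str:
    """Assemble a PostgreSQL connection URI from the config.yaml
    'database' section (host, port, name, user, password)."""
    return (
        f"postgresql://{db_cfg['user']}:{db_cfg['password']}"
        f"@{db_cfg['host']}:{db_cfg['port']}/{db_cfg['name']}"
    )

# Mirror of the YAML above as a dict (pool_size is used by the pool, not the DSN)
cfg = {"host": "localhost", "port": 5432, "name": "trading_data",
       "user": "postgres", "password": "postgres", "pool_size": 20}
print(build_postgres_dsn(cfg))
# → postgresql://postgres:postgres@localhost:5432/trading_data
```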
### Setup Database
```bash
# Run setup script
python scripts/setup_unified_storage.py
```
## Performance Tips
1. **Use Real-Time Endpoint for Latest Data**
```python
# Fast (cache)
data = await data_provider.get_inference_data_unified('ETH/USDT')
# Slower (database)
data = await data_provider.get_inference_data_unified('ETH/USDT', datetime.now())
```
2. **Batch Historical Queries**
```python
# Get multiple timeframes at once
multi_tf = await data_provider.get_multi_timeframe_data_unified(
    'ETH/USDT',
    ['1m', '5m', '1h'],
    limit=100
)
```
3. **Monitor Performance**
```python
stats = data_provider.get_unified_storage_stats()
print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
```
## Troubleshooting
### Unified Storage Not Available
```python
if not data_provider.is_unified_storage_enabled():
    success = await data_provider.enable_unified_storage()
    if not success:
        print("Check database connection and configuration")
```
### Slow Queries
```python
# Check query latency
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if data.query_latency_ms > 100:
    print(f"Slow query: {data.query_latency_ms}ms")

# Check database stats
stats = data_provider.get_unified_storage_stats()
print(stats['database'])
```
### Missing Data
```python
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if not data.has_complete_data():
    summary = data.get_data_summary()
    print(f"Missing data: {summary}")
```
## API Reference
### Main Methods
- `enable_unified_storage()` - Enable unified storage system
- `disable_unified_storage()` - Disable unified storage system
- `get_inference_data_unified()` - Get complete inference data
- `get_multi_timeframe_data_unified()` - Get multi-timeframe data
- `get_order_book_data_unified()` - Get order book with imbalances
- `get_unified_storage_stats()` - Get statistics
- `is_unified_storage_enabled()` - Check if enabled
### Data Models
- `InferenceDataFrame` - Complete inference data structure
- `OrderBookDataFrame` - Order book with imbalances
- `OHLCVCandle` - Single candlestick
- `TradeEvent` - Individual trade
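Based purely on the attributes this guide reads from it, `InferenceDataFrame` can be pictured roughly as follows — a hypothetical sketch for orientation only; the field types and the completeness check are assumptions, not the project's actual definition:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

@dataclass
class InferenceDataFrameSketch:
    """Approximation of the fields the examples above access."""
    symbol: str
    timestamp: datetime
    data_source: str                    # 'cache' or 'database'
    query_latency_ms: float
    indicators: dict = field(default_factory=dict)  # e.g. {'rsi_14': ...}
    context_data: Optional[Any] = None  # DataFrame of surrounding rows

    def has_complete_data(self) -> bool:
        # Placeholder check; the real method also inspects the
        # OHLCV frames, order book snapshot, and imbalance data.
        return bool(self.indicators)
```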
## Support
For issues or questions:
1. Check database connection: `python scripts/setup_unified_storage.py`
2. Review logs for errors
3. Check statistics: `data_provider.get_unified_storage_stats()`