471 lines
11 KiB
Markdown
471 lines
11 KiB
Markdown
# Data Provider Quick Reference Guide
|
|
|
|
## Overview
|
|
|
|
A quick reference for using the multi-layered data provider in the multi-modal trading system.
|
|
|
|
## Architecture Layers
|
|
|
|
```
|
|
COBY System → Core DataProvider → StandardizedDataProvider → Models
|
|
```
|
|
|
|
## Getting Started
|
|
|
|
### Basic Usage
|
|
|
|
```python
|
|
from core.standardized_data_provider import StandardizedDataProvider
|
|
|
|
# Initialize provider
|
|
provider = StandardizedDataProvider(
|
|
symbols=['ETH/USDT', 'BTC/USDT'],
|
|
timeframes=['1s', '1m', '1h', '1d']
|
|
)
|
|
|
|
# Start real-time processing
|
|
provider.start_real_time_processing()
|
|
|
|
# Get standardized input for models
|
|
base_input = provider.get_base_data_input('ETH/USDT')
|
|
|
|
# Validate data quality
|
|
if base_input and base_input.validate():
|
|
# Use data for model inference
|
|
pass
|
|
```
|
|
|
|
## BaseDataInput Structure
|
|
|
|
```python
|
|
@dataclass
|
|
class BaseDataInput:
|
|
symbol: str # 'ETH/USDT'
|
|
timestamp: datetime # Current time
|
|
|
|
# OHLCV Data (300 frames each)
|
|
ohlcv_1s: List[OHLCVBar] # 1-second bars
|
|
ohlcv_1m: List[OHLCVBar] # 1-minute bars
|
|
ohlcv_1h: List[OHLCVBar] # 1-hour bars
|
|
ohlcv_1d: List[OHLCVBar] # 1-day bars
|
|
btc_ohlcv_1s: List[OHLCVBar] # BTC reference
|
|
|
|
# COB Data
|
|
cob_data: Optional[COBData] # Order book data
|
|
|
|
# Technical Analysis
|
|
technical_indicators: Dict[str, float] # RSI, MACD, etc.
|
|
pivot_points: List[PivotPoint] # Williams pivots
|
|
|
|
# Cross-Model Feeding
|
|
last_predictions: Dict[str, ModelOutput] # Other model outputs
|
|
|
|
# Market Microstructure
|
|
market_microstructure: Dict[str, Any] # Order flow, etc.
|
|
```
|
|
|
|
## Common Operations
|
|
|
|
### Get Current Price
|
|
|
|
```python
|
|
# Get the current price (tries multiple fallback methods)
|
|
price = provider.get_current_price('ETH/USDT')
|
|
|
|
# Direct API call with cache
|
|
price = provider.get_live_price_from_api('ETH/USDT')
|
|
```
|
|
|
|
### Get Historical Data
|
|
|
|
```python
|
|
# Get OHLCV data
|
|
df = provider.get_historical_data(
|
|
symbol='ETH/USDT',
|
|
timeframe='1h',
|
|
limit=300
|
|
)
|
|
```
|
|
|
|
### Get COB Data
|
|
|
|
```python
|
|
# Get latest COB snapshot
|
|
cob_data = provider.get_latest_cob_data('ETH/USDT')
|
|
|
|
# Get COB imbalance metrics
|
|
imbalance = provider.get_current_cob_imbalance('ETH/USDT')
|
|
```
|
|
|
|
### Get Pivot Points
|
|
|
|
```python
|
|
# Get Williams Market Structure pivots
|
|
pivots = provider.calculate_williams_pivot_points('ETH/USDT')
|
|
```
|
|
|
|
### Store Model Output
|
|
|
|
```python
|
|
from core.data_models import ModelOutput
|
|
|
|
# Create model output
|
|
output = ModelOutput(
|
|
model_type='cnn',
|
|
model_name='williams_cnn_v2',
|
|
symbol='ETH/USDT',
|
|
timestamp=datetime.now(),
|
|
confidence=0.85,
|
|
predictions={
|
|
'action': 'BUY',
|
|
'action_confidence': 0.85,
|
|
'direction_vector': 0.7
|
|
},
|
|
hidden_states={'conv_features': tensor(...)},
|
|
metadata={'version': '2.1'}
|
|
)
|
|
|
|
# Store for cross-model feeding
|
|
provider.store_model_output(output)
|
|
```
|
|
|
|
### Get Model Outputs
|
|
|
|
```python
|
|
# Get all model outputs for a symbol
|
|
outputs = provider.get_model_outputs('ETH/USDT')
|
|
|
|
# Access specific model output
|
|
cnn_output = outputs.get('williams_cnn_v2')
|
|
```
|
|
|
|
## Data Validation
|
|
|
|
### Validate BaseDataInput
|
|
|
|
```python
|
|
base_input = provider.get_base_data_input('ETH/USDT')
|
|
|
|
if base_input:
|
|
# Check validation
|
|
is_valid = base_input.validate()
|
|
|
|
# Check data completeness
|
|
if len(base_input.ohlcv_1s) >= 100:
|
|
# Sufficient data for inference
|
|
pass
|
|
```
|
|
|
|
### Check Data Quality
|
|
|
|
```python
|
|
# Get data completeness metrics
|
|
if base_input:
|
|
ohlcv_complete = all([
|
|
len(base_input.ohlcv_1s) >= 100,
|
|
len(base_input.ohlcv_1m) >= 100,
|
|
len(base_input.ohlcv_1h) >= 100,
|
|
len(base_input.ohlcv_1d) >= 100
|
|
])
|
|
|
|
cob_complete = base_input.cob_data is not None
|
|
|
|
# Overall quality score (not yet available; to be implemented in Task 2.3)
|
|
# quality_score = base_input.data_quality_score()
|
|
```
|
|
|
|
## COB Data Access
|
|
|
|
### COB Data Structure
|
|
|
|
```python
|
|
@dataclass
|
|
class COBData:
|
|
symbol: str
|
|
timestamp: datetime
|
|
current_price: float
|
|
bucket_size: float # $1 ETH, $10 BTC
|
|
|
|
# Price Buckets (±20 buckets around current price)
|
|
price_buckets: Dict[float, Dict[str, float]] # {price: {bid_vol, ask_vol}}
|
|
bid_ask_imbalance: Dict[float, float] # {price: imbalance}
|
|
|
|
# Moving Averages (±5 buckets)
|
|
ma_1s_imbalance: Dict[float, float]
|
|
ma_5s_imbalance: Dict[float, float]
|
|
ma_15s_imbalance: Dict[float, float]
|
|
ma_60s_imbalance: Dict[float, float]
|
|
|
|
# Order Flow
|
|
order_flow_metrics: Dict[str, float]
|
|
```
|
|
|
|
### Access COB Buckets
|
|
|
|
```python
|
|
if base_input.cob_data:
|
|
cob = base_input.cob_data
|
|
|
|
# Get current price
|
|
current_price = cob.current_price
|
|
|
|
# Get bid/ask volumes for specific price
|
|
price_level = current_price + cob.bucket_size # One bucket up
|
|
if price_level in cob.price_buckets:
|
|
bucket = cob.price_buckets[price_level]
|
|
bid_volume = bucket.get('bid_volume', 0)
|
|
ask_volume = bucket.get('ask_volume', 0)
|
|
|
|
# Get imbalance for price level
|
|
imbalance = cob.bid_ask_imbalance.get(price_level, 0)
|
|
|
|
# Get moving averages
|
|
ma_1s = cob.ma_1s_imbalance.get(price_level, 0)
|
|
ma_5s = cob.ma_5s_imbalance.get(price_level, 0)
|
|
```
|
|
|
|
## Subscriber Pattern
|
|
|
|
### Subscribe to Data Updates
|
|
|
|
```python
|
|
def my_data_callback(tick):
|
|
"""Handle real-time tick data"""
|
|
print(f"Received tick: {tick.symbol} @ {tick.price}")
|
|
|
|
# Subscribe to data updates
|
|
subscriber_id = provider.subscribe_to_data(
|
|
callback=my_data_callback,
|
|
symbols=['ETH/USDT'],
|
|
subscriber_name='my_model'
|
|
)
|
|
|
|
# Unsubscribe when done
|
|
provider.unsubscribe_from_data(subscriber_id)
|
|
```
|
|
|
|
## Configuration
|
|
|
|
### Key Configuration Options
|
|
|
|
```yaml
|
|
# config.yaml
|
|
data_provider:
|
|
symbols:
|
|
- ETH/USDT
|
|
- BTC/USDT
|
|
|
|
timeframes:
|
|
- 1s
|
|
- 1m
|
|
- 1h
|
|
- 1d
|
|
|
|
cache:
|
|
enabled: true
|
|
candles_per_timeframe: 1500
|
|
|
|
cob:
|
|
enabled: true
|
|
bucket_sizes:
|
|
ETH/USDT: 1.0 # $1 buckets
|
|
BTC/USDT: 10.0 # $10 buckets
|
|
price_ranges:
|
|
ETH/USDT: 5.0 # ±$5 for imbalance
|
|
BTC/USDT: 50.0 # ±$50 for imbalance
|
|
|
|
websocket:
|
|
update_speed: 100ms
|
|
max_depth: 1000
|
|
reconnect_delay: 1.0
|
|
max_reconnect_delay: 60.0
|
|
```
|
|
|
|
## Performance Tips
|
|
|
|
### Optimize Data Access
|
|
|
|
```python
|
|
# Cache BaseDataInput for multiple models
|
|
base_input = provider.get_base_data_input('ETH/USDT')
|
|
|
|
# Use cached data for all models
|
|
cnn_input = base_input # CNN uses full data
|
|
rl_input = base_input # RL uses full data + CNN outputs
|
|
|
|
# Avoid repeated calls
|
|
# BAD: base_input = provider.get_base_data_input('ETH/USDT') # Called multiple times
|
|
# GOOD: Cache and reuse
|
|
```
|
|
|
|
### Monitor Performance
|
|
|
|
```python
|
|
# Check tick distribution statistics
|
|
stats = provider.distribution_stats
|
|
|
|
print(f"Total ticks received: {stats['total_ticks_received']}")
|
|
print(f"Total ticks distributed: {stats['total_ticks_distributed']}")
|
|
print(f"Distribution errors: {stats['distribution_errors']}")
|
|
```
|
|
|
|
## Troubleshooting
|
|
|
|
### Common Issues
|
|
|
|
#### 1. No Data Available
|
|
|
|
```python
|
|
base_input = provider.get_base_data_input('ETH/USDT')
|
|
|
|
if base_input is None:
|
|
# Check whether automatic data maintenance is running
|
|
if not provider.data_maintenance_active:
|
|
provider.start_automatic_data_maintenance()
|
|
|
|
# Check if COB collection is started
|
|
if not provider.cob_collection_active:
|
|
provider.start_cob_collection()
|
|
```
|
|
|
|
#### 2. Incomplete Data
|
|
|
|
```python
|
|
if base_input:
|
|
# Check frame counts
|
|
print(f"1s frames: {len(base_input.ohlcv_1s)}")
|
|
print(f"1m frames: {len(base_input.ohlcv_1m)}")
|
|
print(f"1h frames: {len(base_input.ohlcv_1h)}")
|
|
print(f"1d frames: {len(base_input.ohlcv_1d)}")
|
|
|
|
# Wait for data to accumulate
|
|
if len(base_input.ohlcv_1s) < 100:
|
|
print("Waiting for more data...")
|
|
time.sleep(60) # Wait 1 minute
|
|
```
|
|
|
|
#### 3. COB Data Missing
|
|
|
|
```python
|
|
if base_input and base_input.cob_data is None:
|
|
# Check COB collection status
|
|
if not provider.cob_collection_active:
|
|
provider.start_cob_collection()
|
|
|
|
# Check WebSocket status
|
|
if hasattr(provider, 'enhanced_cob_websocket'):
|
|
ws = provider.enhanced_cob_websocket
|
|
status = ws.status.get('ETH/USDT')
|
|
print(f"WebSocket connected: {status.connected}")
|
|
print(f"Last message: {status.last_message_time}")
|
|
```
|
|
|
|
#### 4. Price Data Stale
|
|
|
|
```python
|
|
# Force refresh price
|
|
price = provider.get_live_price_from_api('ETH/USDT')
|
|
|
|
# Check cache freshness
|
|
if 'ETH/USDT' in provider.live_price_cache:
|
|
cached_price, timestamp = provider.live_price_cache['ETH/USDT']
|
|
age = datetime.now() - timestamp
|
|
print(f"Price cache age: {age.total_seconds()}s")
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### 1. Always Validate Data
|
|
|
|
```python
|
|
base_input = provider.get_base_data_input('ETH/USDT')
|
|
|
|
if base_input and base_input.validate():
|
|
# Safe to use for inference
|
|
model_output = model.predict(base_input)
|
|
else:
|
|
# Log and skip inference
|
|
logger.warning("Invalid or incomplete data, skipping inference")
|
|
```
|
|
|
|
### 2. Handle Missing Data Gracefully
|
|
|
|
```python
|
|
# Never substitute synthetic data when real data is missing
|
|
if base_input is None:
|
|
logger.error("No data available")
|
|
return None # Don't proceed with inference
|
|
|
|
# Check specific components
|
|
if base_input.cob_data is None:
|
|
logger.warning("COB data unavailable, using OHLCV only")
|
|
# Proceed with reduced features or skip
|
|
```
|
|
|
|
### 3. Store Model Outputs
|
|
|
|
```python
|
|
# Always store outputs for cross-model feeding
|
|
output = model.predict(base_input)
|
|
provider.store_model_output(output)
|
|
|
|
# Other models can now access this output
|
|
```
|
|
|
|
### 4. Monitor Data Quality
|
|
|
|
```python
|
|
# Implement quality checks
|
|
def check_data_quality(base_input):
|
|
if not base_input:
|
|
return 0.0
|
|
|
|
score = 0.0
|
|
|
|
# OHLCV completeness (40%) - measured on 1s bars against the 300-frame target
|
|
ohlcv_score = min(1.0, len(base_input.ohlcv_1s) / 300) * 0.4
|
|
score += ohlcv_score
|
|
|
|
# COB availability (30%)
|
|
cob_score = 0.3 if base_input.cob_data else 0.0
|
|
score += cob_score
|
|
|
|
# Pivot points (20%)
|
|
pivot_score = 0.2 if base_input.pivot_points else 0.0
|
|
score += pivot_score
|
|
|
|
# Freshness (10%)
|
|
age = (datetime.now() - base_input.timestamp).total_seconds()
|
|
freshness_score = max(0, 1.0 - age / 60) * 0.1 # Decay over 1 minute
|
|
score += freshness_score
|
|
|
|
return score
|
|
|
|
# Use quality score
|
|
quality = check_data_quality(base_input)
|
|
if quality < 0.8:
|
|
logger.warning(f"Low data quality: {quality:.2f}")
|
|
```
|
|
|
|
## File Locations
|
|
|
|
- **Core DataProvider**: `core/data_provider.py`
|
|
- **Standardized Provider**: `core/standardized_data_provider.py`
|
|
- **Enhanced COB WebSocket**: `core/enhanced_cob_websocket.py`
|
|
- **Williams Market Structure**: `core/williams_market_structure.py`
|
|
- **Data Models**: `core/data_models.py`
|
|
- **Model Output Manager**: `core/model_output_manager.py`
|
|
- **COBY System**: `COBY/` directory
|
|
|
|
## Additional Resources
|
|
|
|
- **Requirements**: `.kiro/specs/1.multi-modal-trading-system/requirements.md`
|
|
- **Design**: `.kiro/specs/1.multi-modal-trading-system/design.md`
|
|
- **Tasks**: `.kiro/specs/1.multi-modal-trading-system/tasks.md`
|
|
- **Audit Summary**: `.kiro/specs/1.multi-modal-trading-system/AUDIT_SUMMARY.md`
|
|
|
|
---
|
|
|
|
**Last Updated**: January 9, 2025
|
|
**Version**: 1.0
|