# Event-Driven Inference Training System

## Overview

This system provides a flexible, efficient, and robust training pipeline that:

1. **Stores inference frames by reference** (instead of copying 600 candles every second)
2. **Uses DuckDB** for efficient data storage and retrieval
3. **Subscribes to events** (candle completion, pivot points) that trigger training
4. **Supports multiple training methods** (backpropagation for the Transformer; other methods for other model types)
## Architecture

### Components

1. **InferenceTrainingCoordinator** (`inference_training_system.py`)
   - Manages inference frame references
   - Matches inference frames to actual results
   - Distributes training events to subscribers

2. **TrainingEventSubscriber** (interface)
   - Implemented by training adapters
   - Receives callbacks for candle completion and pivot events

3. **DataProvider Extensions**
   - `subscribe_candle_completion()` - Subscribe to candle completion events
   - `subscribe_pivot_events()` - Subscribe to pivot events (L2L, L2H, etc.)

4. **DuckDB Storage**
   - Stores OHLCV data, MA indicators, and pivot points
   - Supports efficient queries by timestamp range
   - No data copying - only references
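The subscriber interface can be sketched as an abstract base class. This is a minimal sketch: the method names follow the callbacks used in Step 2 below, and the event/reference types are assumed to be defined in `inference_training_system.py` (forward-reference strings keep the sketch self-contained).

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class TrainingEventSubscriber(ABC):
    """Implemented by training adapters to receive training triggers."""

    @abstractmethod
    def on_candle_completion(self, event: 'CandleCompletionEvent',
                             inference_ref: Optional['InferenceFrameReference']) -> None:
        """Called when a subscribed candle closes; inference_ref may be None."""

    @abstractmethod
    def on_pivot_event(self, event: 'PivotEvent',
                       inference_refs: List['InferenceFrameReference']) -> None:
        """Called when a subscribed pivot (L2L, L2H, etc.) is detected."""
```

Any adapter that implements both callbacks can be registered with the coordinator; the ABC makes forgetting one a construction-time error rather than a silent no-op.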
## Data Flow

### 1. Inference Phase

```
Model Inference
  ↓
Create InferenceFrameReference
  ↓
Store reference (timestamp range, norm_params, prediction metadata)
  ↓
Register with InferenceTrainingCoordinator
```

**No copying** - just store:

- `data_range_start` / `data_range_end` (timestamp range covering the 600 candles)
- `norm_params` (a small dict)
- `predicted_action`, `predicted_candle`, `confidence`
- `target_timestamp` (for candles - when the actual result will be available)
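The fields above suggest a small dataclass along these lines. This is a sketch assuming only the field names listed; the real definition lives in `inference_training_system.py` and may carry additional fields.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional


@dataclass
class InferenceFrameReference:
    """Lightweight reference to an inference frame - timestamps, not candle copies."""
    inference_id: str
    symbol: str
    timeframe: str
    prediction_timestamp: datetime
    data_range_start: datetime   # start of the 600-candle window
    data_range_end: datetime     # end of the window
    norm_params: Dict[str, float] = field(default_factory=dict)
    predicted_action: Optional[str] = None
    predicted_candle: Optional[dict] = None
    confidence: float = 0.0
    target_timestamp: Optional[datetime] = None  # when the actual candle result lands
```

The whole object is a handful of timestamps, strings, and floats, which is why registering one per second is cheap compared to copying 600 candles.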
### 2. Training Trigger Phase

#### Time-Based (Candle Completion)

```
Candle Closes
  ↓
DataProvider emits CandleCompletionEvent
  ↓
InferenceTrainingCoordinator matches inference frames
  ↓
Calls subscriber.on_candle_completion(event, inference_ref)
  ↓
Training adapter retrieves data from DuckDB using the reference
  ↓
Train model on the actual candle result
```
#### Event-Based (Pivot Points)

```
Pivot Detected (L2L, L2H, etc.)
  ↓
DataProvider emits PivotEvent
  ↓
InferenceTrainingCoordinator finds matching inference frames
  ↓
Calls subscriber.on_pivot_event(event, inference_refs)
  ↓
Training adapter retrieves data from DuckDB
  ↓
Train model on the pivot result
```
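The "finds matching inference frames" step above can be sketched with one plausible criterion - all pending frames for the same symbol/timeframe whose prediction preceded the pivot. The criterion, the `PendingFrame` stand-in, and `frames_for_pivot` are assumptions for illustration, not the coordinator's actual matching rule.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class PendingFrame:
    """Illustrative stand-in for the fields a pivot match would consult."""
    inference_id: str
    symbol: str
    timeframe: str
    prediction_timestamp: datetime


def frames_for_pivot(pending: List[PendingFrame], symbol: str, timeframe: str,
                     pivot_timestamp: datetime) -> List[PendingFrame]:
    """All pending frames whose prediction preceded the pivot (assumed criterion)."""
    return [f for f in pending
            if f.symbol == symbol and f.timeframe == timeframe
            and f.prediction_timestamp <= pivot_timestamp]
```

Because a pivot can validate several earlier predictions at once, the callback receives a list (`inference_refs`), unlike the single-frame candle case.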
## Implementation Steps

### Step 1: Extend DataProvider

Add subscription methods to `core/data_provider.py` (the registry attributes shown are illustrative; any per-key callback store works):

```python
def subscribe_candle_completion(self, callback: Callable, symbol: str, timeframe: str) -> None:
    """Subscribe to candle completion events."""
    # Register the callback; emit an event to it when a candle closes
    self._candle_subscribers.setdefault((symbol, timeframe), []).append(callback)

def subscribe_pivot_events(self, callback: Callable, symbol: str, timeframe: str,
                           pivot_types: List[str]) -> None:
    """Subscribe to pivot events (L2L, L2H, etc.)."""
    # Register the callback; emit an event to it when a matching pivot is detected
    self._pivot_subscribers.setdefault((symbol, timeframe), []).append((callback, set(pivot_types)))
```
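The emitting side of these subscriptions can be sketched with a minimal, self-contained publish/subscribe registry. The `CandleEventEmitter` class is an assumption for illustration, not the actual DataProvider internals; the `CandleCompletionEvent` shape follows its use in Step 2 (`event.ohlcv`).

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Dict, List, Tuple


@dataclass
class CandleCompletionEvent:
    """Emitted when a candle closes; carries the finished OHLCV bar."""
    symbol: str
    timeframe: str
    timestamp: datetime
    ohlcv: dict  # e.g. {'open': ..., 'high': ..., 'low': ..., 'close': ..., 'volume': ...}


class CandleEventEmitter:
    """Minimal registry: subscribe per (symbol, timeframe), emit on candle close."""

    def __init__(self) -> None:
        self._subscribers: Dict[Tuple[str, str], List[Callable]] = {}

    def subscribe(self, callback: Callable, symbol: str, timeframe: str) -> None:
        self._subscribers.setdefault((symbol, timeframe), []).append(callback)

    def emit_candle_close(self, event: CandleCompletionEvent) -> None:
        # Isolate subscriber errors so one bad callback cannot stall the feed
        for callback in self._subscribers.get((event.symbol, event.timeframe), []):
            try:
                callback(event)
            except Exception as exc:
                print(f"subscriber error: {exc}")
```

Swallowing subscriber exceptions at the emit site is the usual trade-off here: a broken training adapter should log, not take down the data feed.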
### Step 2: Update RealTrainingAdapter

Make `RealTrainingAdapter` implement `TrainingEventSubscriber`:

```python
class RealTrainingAdapter(TrainingEventSubscriber):
    def __init__(self, ...):
        # Initialize the InferenceTrainingCoordinator
        self.training_coordinator = InferenceTrainingCoordinator(
            data_provider=self.data_provider,
            duckdb_storage=self.data_provider.duckdb_storage
        )

        # Subscribe to events
        self.training_coordinator.subscribe_to_candle_completion(
            self, symbol='ETH/USDT', timeframe='1m'
        )
        self.training_coordinator.subscribe_to_pivot_events(
            self, symbol='ETH/USDT', timeframe='1m',
            pivot_types=['L2L', 'L2H', 'L3L', 'L3H']
        )

    def on_candle_completion(self, event: CandleCompletionEvent,
                             inference_ref: Optional[InferenceFrameReference]):
        """Called when a candle completes"""
        if not inference_ref:
            return  # No matching inference frame

        # Retrieve inference data from DuckDB by reference
        model_inputs = self.training_coordinator.get_inference_data(inference_ref)
        if not model_inputs:
            return

        # Create a training batch with the actual candle
        batch = self._create_training_batch(model_inputs, event.ohlcv, inference_ref)

        # Train the model (backprop for the Transformer, other methods for other models)
        self._train_on_batch(batch, inference_ref)

    def on_pivot_event(self, event: PivotEvent,
                       inference_refs: List[InferenceFrameReference]):
        """Called when a pivot is detected"""
        for inference_ref in inference_refs:
            # Retrieve inference data
            model_inputs = self.training_coordinator.get_inference_data(inference_ref)
            if not model_inputs:
                continue

            # Create a training batch with the pivot result
            batch = self._create_pivot_training_batch(model_inputs, event, inference_ref)

            # Train the model
            self._train_on_batch(batch, inference_ref)
```
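For the candle path, the coordinator's matching step reduces to looking up the pending frame whose `target_timestamp` equals the closed candle's timestamp. A minimal sketch (the `FrameMatcher` name and pop-on-match behavior are assumptions; the real coordinator may keep frames around for pivot matching too):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Tuple


@dataclass
class PendingFrame:
    """Illustrative stand-in for the InferenceFrameReference fields used in matching."""
    inference_id: str
    symbol: str
    timeframe: str
    target_timestamp: datetime


class FrameMatcher:
    """Index pending inference frames by (symbol, timeframe, target_timestamp)."""

    def __init__(self) -> None:
        self._pending: Dict[Tuple[str, str, datetime], PendingFrame] = {}

    def register(self, ref: PendingFrame) -> None:
        self._pending[(ref.symbol, ref.timeframe, ref.target_timestamp)] = ref

    def match_candle(self, symbol: str, timeframe: str,
                     timestamp: datetime) -> Optional[PendingFrame]:
        # Pop so each frame triggers at most one candle-completion training pass
        return self._pending.pop((symbol, timeframe, timestamp), None)
```

A dict keyed by the exact target timestamp makes each candle-close lookup O(1), which matters when inference registers a new frame every second.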
### Step 3: Update Inference Loop

In `_realtime_inference_loop()`, register inference frames:

```python
# After making a prediction
prediction = self._make_realtime_prediction(...)

# Create an inference frame reference
inference_ref = InferenceFrameReference(
    inference_id=str(uuid.uuid4()),
    symbol=symbol,
    timeframe=timeframe,
    prediction_timestamp=datetime.now(timezone.utc),
    target_timestamp=next_candle_time,  # For candles
    data_range_start=start_time,        # 600 candles before
    data_range_end=current_time,
    norm_params=norm_params,
    predicted_action=prediction['action'],
    predicted_candle=prediction['predicted_candle'],
    confidence=prediction['confidence']
)

# Register with the coordinator (no copying!)
self.training_coordinator.register_inference_frame(inference_ref)
```
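The `next_candle_time` used above must land exactly on a candle boundary or the coordinator's timestamp match will never fire. One way to compute it, sketched here as a hypothetical helper (`next_candle_close` and the timeframe table are not part of the existing codebase):

```python
from datetime import datetime, timezone

# Assumed timeframe-to-seconds mapping for the timeframes this system uses
_TIMEFRAME_SECONDS = {'1s': 1, '1m': 60, '5m': 300, '15m': 900, '1h': 3600, '1d': 86400}


def next_candle_close(now: datetime, timeframe: str) -> datetime:
    """First candle-boundary timestamp strictly after `now` for the given timeframe."""
    step = _TIMEFRAME_SECONDS[timeframe]
    epoch = int(now.timestamp())
    # Round down to the current boundary, then advance one step
    return datetime.fromtimestamp((epoch // step + 1) * step, tz=timezone.utc)
```

Aligning to epoch multiples (rather than adding `step` to an arbitrary `now`) keeps every producer and consumer agreeing on the same boundary timestamps.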
## Benefits

1. **Memory efficient**: No copying of 600 candles every second
2. **Flexible**: Supports both time-based (candle) and event-based (pivot) training triggers
3. **Robust**: Event-driven architecture with error handling around subscriber callbacks
4. **Simple**: Clear separation of concerns
5. **Scalable**: DuckDB serves the referenced data through efficient range queries
6. **Extensible**: Easy to add new training methods or event types
## DuckDB Schema Extensions

Ensure DuckDB stores:

- OHLCV data (already exists)
- MA indicators (add columns to `ohlcv_data` or a separate table)
- Pivot points (add a `pivot_points` table)

```sql
-- Add technical indicators to ohlcv_data
ALTER TABLE ohlcv_data ADD COLUMN sma_10 DOUBLE;
ALTER TABLE ohlcv_data ADD COLUMN sma_20 DOUBLE;
ALTER TABLE ohlcv_data ADD COLUMN ema_12 DOUBLE;
-- ... etc

-- Create pivot points table
CREATE TABLE IF NOT EXISTS pivot_points (
    id INTEGER PRIMARY KEY,
    symbol VARCHAR NOT NULL,
    timeframe VARCHAR NOT NULL,
    timestamp BIGINT NOT NULL,
    price DOUBLE NOT NULL,
    pivot_type VARCHAR NOT NULL,  -- 'L2L', 'L2H', etc.
    level INTEGER NOT NULL,
    strength DOUBLE NOT NULL,
    UNIQUE(symbol, timeframe, timestamp, pivot_type)
);
```
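With this schema, resolving an inference frame reference is a plain timestamp-range query. The sketch below creates the `pivot_points` table and runs such a lookup; it uses Python's stdlib `sqlite3` so it is self-contained and runnable anywhere, but the SQL is the same shape DuckDB accepts.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.execute("""
    CREATE TABLE IF NOT EXISTS pivot_points (
        id INTEGER PRIMARY KEY,
        symbol VARCHAR NOT NULL,
        timeframe VARCHAR NOT NULL,
        timestamp BIGINT NOT NULL,
        price DOUBLE NOT NULL,
        pivot_type VARCHAR NOT NULL,
        level INTEGER NOT NULL,
        strength DOUBLE NOT NULL,
        UNIQUE(symbol, timeframe, timestamp, pivot_type)
    )
""")
rows = [
    ('ETH/USDT', '1m', 1700000060, 2501.5, 'L2H', 2, 0.8),
    ('ETH/USDT', '1m', 1700000120, 2493.0, 'L2L', 2, 0.7),
]
con.executemany(
    "INSERT INTO pivot_points (symbol, timeframe, timestamp, price, pivot_type, level, strength) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

# Range lookup for a frame's data window - no candle copies, just a query by reference
pivots = con.execute(
    "SELECT timestamp, price, pivot_type FROM pivot_points "
    "WHERE symbol = ? AND timeframe = ? AND timestamp BETWEEN ? AND ? "
    "ORDER BY timestamp",
    ('ETH/USDT', '1m', 1700000000, 1700000100)).fetchall()
# Only the first pivot falls inside the window
```

The `UNIQUE(symbol, timeframe, timestamp, pivot_type)` constraint also gives ingestion a natural idempotency key when the same pivot is detected twice.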
## Next Steps

1. Implement the DataProvider subscription methods
2. Update RealTrainingAdapter to use InferenceTrainingCoordinator
3. Extend the DuckDB schema for indicators and pivots
4. Test with live inference
5. Add support for other model types (not just the Transformer)