243 lines
7.3 KiB
Markdown
243 lines
7.3 KiB
Markdown
# Real-Time Prediction Loop - Integration Guide
|
|
|
|
## Problem Identified
|
|
|
|
**Your models are NEVER called for real-time predictions!**
|
|
|
|
- `model.predict()` only called ONCE at startup in `chain_inference()`
|
|
- NO continuous prediction loop exists
|
|
- ALL signals come from COB imbalance only
|
|
- CNN/DQN models sit idle despite being loaded
|
|
|
|
## Solution Created
|
|
|
|
**`core/realtime_prediction_loop.py`** - Continuous prediction system
|
|
|
|
### How It Works:
|
|
|
|
1. **Monitors Market Data**
|
|
- Detects new 1s candles → triggers prediction
|
|
- Detects new 1m candles → triggers prediction
|
|
- Detects pivot points → triggers prediction
|
|
- Runs periodic predictions every 5s
|
|
|
|
2. **Calls ALL Models**
|
|
```python
|
|
# CNN Model
|
|
cnn_prediction = self.orchestrator.cnn_model.predict(cnn_features)
|
|
|
|
# DQN Agent
|
|
action = self.orchestrator.dqn_agent.act(dqn_state)
|
|
|
|
# COB RL
|
|
cob_prediction = self.orchestrator.cob_rl_model.predict(cob_features)
|
|
```
|
|
|
|
3. **Combines Predictions**
|
|
- Voting system: each model contributes
|
|
- Confidence-weighted decisions
|
|
- Generates trading signals
|
|
|
|
---
|
|
|
|
## Integration Steps
|
|
|
|
### Step 1: Add to Orchestrator
|
|
|
|
```python
|
|
# File: core/orchestrator.py
|
|
|
|
from .realtime_prediction_loop import RealtimePredictionLoop
|
|
|
|
class TradingOrchestrator:
|
|
def __init__(self, ...):
|
|
# ... existing init ...
|
|
|
|
# Initialize prediction loop
|
|
self.prediction_loop = RealtimePredictionLoop(
|
|
orchestrator=self,
|
|
data_provider=self.data_provider
|
|
)
|
|
logger.info("Real-time prediction loop initialized")
|
|
|
|
async def start_prediction_loop(self):
|
|
"""Start continuous model predictions"""
|
|
if self.prediction_loop:
|
|
await self.prediction_loop.start()
|
|
|
|
def stop_prediction_loop(self):
|
|
"""Stop prediction loop"""
|
|
if self.prediction_loop:
|
|
self.prediction_loop.stop()
|
|
|
|
async def process_signal(self, signal: Dict):
|
|
"""Process trading signal from prediction loop"""
|
|
try:
|
|
logger.info(f"📥 Received prediction signal: {signal['action']} for {signal['symbol']} "
|
|
f"(conf: {signal['confidence']:.2f})")
|
|
|
|
# Execute through trading executor
|
|
if hasattr(self, 'trading_executor') and self.trading_executor:
|
|
result = self.trading_executor.execute_signal(
|
|
symbol=signal['symbol'],
|
|
action=signal['action'],
|
|
confidence=signal['confidence'],
|
|
current_price=signal['price']
|
|
)
|
|
|
|
if result:
|
|
logger.info(f" Signal executed successfully")
|
|
else:
|
|
logger.warning(f" Signal execution failed or blocked")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing prediction signal: {e}")
|
|
```
|
|
|
|
### Step 2: Add to Dashboard
|
|
|
|
```python
|
|
# File: web/clean_dashboard.py
|
|
|
|
class CleanTradingDashboard:
|
|
def __init__(self, ...):
|
|
# ... existing init ...
|
|
|
|
# Start prediction loop in background
|
|
if self.orchestrator:
|
|
import threading
|
|
def run_prediction_loop():
|
|
import asyncio
|
|
asyncio.run(self.orchestrator.start_prediction_loop())
|
|
|
|
self.prediction_thread = threading.Thread(
|
|
target=run_prediction_loop,
|
|
daemon=True,
|
|
name="PredictionLoop"
|
|
)
|
|
self.prediction_thread.start()
|
|
logger.info(" Real-time prediction loop started in background")
|
|
```
|
|
|
|
### Step 3: Update Data Provider (if needed)
|
|
|
|
```python
|
|
# File: core/data_provider.py
|
|
|
|
async def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100):
|
|
"""Get latest candles for a symbol and timeframe"""
|
|
try:
|
|
# Use existing cache or fetch from API
|
|
if hasattr(self, 'candle_cache'):
|
|
key = f"{symbol}_{timeframe}"
|
|
if key in self.candle_cache:
|
|
return self.candle_cache[key][-limit:]
|
|
|
|
# Fetch from Binance/exchange
|
|
candles = await self.fetch_candles(symbol, timeframe, limit)
|
|
return candles
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting latest candles: {e}")
|
|
return []
|
|
```
|
|
|
|
---
|
|
|
|
## Expected Behavior After Integration
|
|
|
|
### Before (Current):
|
|
```
|
|
INFO:web.clean_dashboard:🔗 Running initial chained inference...
|
|
INFO:web.clean_dashboard:🔗 Running chained inference for ETH/USDT with 10 steps
|
|
WARNING:core.orchestrator:No model predictions available for step 0
|
|
WARNING:web.clean_dashboard: Chained inference returned no predictions
|
|
|
|
# Then nothing... models never called again
|
|
```
|
|
|
|
### After (Expected):
|
|
```
|
|
INFO:core.realtime_prediction_loop: Starting Real-Time Prediction Loop
|
|
INFO:core.realtime_prediction_loop: Prediction loop started for ETH/USDT
|
|
INFO:core.realtime_prediction_loop: Prediction loop started for BTC/USDT
|
|
|
|
# Every 1-5 seconds:
|
|
INFO:core.realtime_prediction_loop:📊 New 1s candle detected for ETH/USDT - running predictions
|
|
INFO:core.realtime_prediction_loop: CNN prediction for ETH/USDT: {'action': 'BUY', 'confidence': 0.78}
|
|
INFO:core.realtime_prediction_loop: DQN prediction for ETH/USDT: BUY
|
|
INFO:core.realtime_prediction_loop: COB RL prediction for ETH/USDT: {'action': 'BUY', 'confidence': 0.82}
|
|
INFO:core.realtime_prediction_loop:📤 Trading signal sent for ETH/USDT: BUY (confidence: 0.80, trigger: new_1s_candle)
|
|
|
|
INFO:core.orchestrator:📥 Received prediction signal: BUY for ETH/USDT (conf: 0.80)
|
|
INFO:core.trading_executor:Executing BUY: 0.050000 ETH/USDT at $4191.25
|
|
```
|
|
|
|
---
|
|
|
|
## Monitoring & Validation
|
|
|
|
### Check if Prediction Loop is Running:
|
|
```python
|
|
# In dashboard or orchestrator logs, look for:
|
|
- " Real-time prediction loop started"
|
|
- "📊 New 1s candle detected" (every second)
|
|
- " CNN prediction" (model being called!)
|
|
- " DQN prediction" (model being called!)
|
|
- "📤 Trading signal sent"
|
|
```
|
|
|
|
### Your Breakpoint Should Now Trigger:
|
|
- **Before**: `predict()` called once at startup
|
|
- **After**: `predict()` called every 1-5 seconds!
|
|
|
|
### Model Usage Stats:
|
|
```python
|
|
# Track how often models are called
|
|
cnn_predictions_count = 0
|
|
dqn_predictions_count = 0
|
|
cob_predictions_count = 0
|
|
|
|
# Should see:
|
|
# CNN: ~100-200 predictions/minute
|
|
# DQN: ~100-200 predictions/minute
|
|
# COB: ~100-200 predictions/minute
|
|
```
|
|
|
|
---
|
|
|
|
## Files Modified
|
|
|
|
1. **core/realtime_prediction_loop.py** (NEW)
|
|
- Created prediction loop
|
|
|
|
2. ⏳ **core/orchestrator.py** (TO MODIFY)
|
|
- Add prediction_loop initialization
|
|
- Add start_prediction_loop() method
|
|
- Add process_signal() method
|
|
|
|
3. ⏳ **web/clean_dashboard.py** (TO MODIFY)
|
|
- Start prediction loop in background thread
|
|
|
|
4. ⏳ **core/data_provider.py** (MAY NEED)
|
|
- Ensure get_latest_candles() works properly
|
|
|
|
---
|
|
|
|
## Next Steps
|
|
|
|
1. Integrate prediction loop into orchestrator (5 min)
|
|
2. Start loop from dashboard (5 min)
|
|
3. Test and verify models being called (10 min)
|
|
4. Monitor logs for continuous predictions (ongoing)
|
|
|
|
---
|
|
|
|
## Summary
|
|
|
|
**Root Cause**: No continuous prediction loop - models loaded but never used
|
|
**Solution**: Created `RealtimePredictionLoop` to call `model.predict()` continuously
|
|
**Impact**: Models will now make 100-200 predictions per minute instead of 1 at startup
|
|
|
|
Your breakpoint on `predict()` should now trigger every 1-5 seconds! 🎯
|