rl cob subscription model

This commit is contained in:
Dobromir Popov
2025-06-24 19:07:42 +03:00
parent 6702a490dd
commit 97f7f54c30
6 changed files with 405 additions and 43 deletions

29
.vscode/launch.json vendored
View File

@ -110,35 +110,8 @@
"COB_ETH_BUCKET_SIZE": "1"
},
"preLaunchTask": "Kill Stale Processes"
},
{
"name": "🎯 Optimized COB System (No Redundancy)",
"type": "python",
"request": "launch",
"program": "run_optimized_cob_system.py",
"console": "integratedTerminal",
"justMyCode": false,
"env": {
"PYTHONUNBUFFERED": "1",
"COB_BTC_BUCKET_SIZE": "10",
"COB_ETH_BUCKET_SIZE": "1"
},
"preLaunchTask": "Kill Stale Processes"
},
{
"name": "🌐 Simple COB Dashboard (Working)",
"type": "python",
"request": "launch",
"program": "run_simple_cob_dashboard.py",
"console": "integratedTerminal",
"justMyCode": false,
"env": {
"PYTHONUNBUFFERED": "1",
"COB_BTC_BUCKET_SIZE": "10",
"COB_ETH_BUCKET_SIZE": "1"
},
"preLaunchTask": "Kill Stale Processes"
}
],
"compounds": [
{

View File

@ -0,0 +1,211 @@
# COB Integration into Enhanced Dashboard - Implementation Summary
## **Overview**
Successfully integrated **COB (Consolidated Order Book) data visualization and training pipeline connectivity** into the main enhanced trading dashboard. The dashboard now displays real-time market microstructure data and shows how COB data flows into the training pipeline.
## **Problem Solved**
### **Initial Architecture Issue:**
- **Enhanced Training Loop** (background) - Used `EnhancedTradingOrchestrator` with **full COB integration**
- **Main Trading Dashboard** (port 8051) - Used basic `TradingOrchestrator` with **NO COB integration**
### **Solution Implemented:**
- **Unified Architecture** - Both systems now use `EnhancedTradingOrchestrator` with **full COB integration**
- **COB Data Visualization** - Dashboard displays real-time COB data from multiple exchanges
- **Training Pipeline Integration** - Shows COB data flow: Market → CNN Features → RL States
## **Key Changes Made**
### **1. Enhanced Orchestrator Integration (`main.py`)**
```python
# OLD: Basic orchestrator without COB
dashboard_orchestrator = TradingOrchestrator(data_provider=data_provider)
# NEW: Enhanced orchestrator WITH COB integration
dashboard_orchestrator = EnhancedTradingOrchestrator(
data_provider=data_provider,
symbols=config.get('symbols', ['ETH/USDT']),
enhanced_rl_training=True, # Enable RL training display
model_registry=model_registry
)
```
### **2. COB Imports Added (`web/dashboard.py`)**
```python
# Import COB integration components
from core.cob_integration import COBIntegration
from core.multi_exchange_cob_provider import MultiExchangeCOBProvider, COBSnapshot
```
### **3. COB Visualization Section Added**
```html
<!-- COB (Consolidated Order Book) Visualization Section -->
<div class="card mb-3">
<div class="card-body">
<h6><i class="fas fa-layer-group me-2"></i>
Consolidated Order Book (COB) - Real-time Market Microstructure
</h6>
<div id="cob-visualization-content" style="height: 400px; overflow-y: auto;"></div>
</div>
</div>
```
### **4. COB Dashboard Callback Integration**
- Added `Output('cob-visualization-content', 'children')` to dashboard callback
- Added `_create_cob_visualization_content()` method to generate COB display
- Added COB content to callback return tuple
## **COB Data Displayed**
### **Per Symbol (ETH/USDT, BTC/USDT):**
-**CNN Features Status** - Shape and ML training readiness
-**RL State Status** - Shape and DQN training readiness
-**Mid Price** - Volume-weighted consolidated price
-**Spread** - Bid-ask spread in basis points
-**Bid/Ask Liquidity** - Total USD liquidity on each side
-**Active Exchanges** - Which exchanges are providing data
-**Order Book Levels** - Number of bid/ask levels consolidated
### **Training Pipeline Integration:**
-**COB → RL Training Status** - Shows if COB data flows into training
-**Market Microstructure Pipeline** - Real-time data → CNN features → RL states
-**COB Updates Counter** - Number of symbols receiving live COB data
-**Performance Metrics** - COB integration health monitoring
## **Training Pipeline Flow**
```
Real-time Market Data
Multi-Exchange COB Provider (Binance, Coinbase, etc.)
COB Integration (Consolidation + Features)
CNN Features (Market microstructure patterns)
RL States (Trading decision inputs)
Enhanced Trading Orchestrator
Trading Decisions & Execution
```
## **Dashboard Access & Features**
### **Main Enhanced Dashboard:**
- **URL:** `http://127.0.0.1:8051`
- **Features:** Live trading, COB visualization, RL training monitoring, Position management
- **COB Section:** Real-time market microstructure display
- **Training Integration:** Shows how COB data feeds the ML pipeline
### **COB Data Display:**
- **Real-time Updates** - Refreshes automatically with dashboard
- **Multi-Symbol Support** - ETH/USDT and BTC/USDT
- **Training Status** - Shows COB integration with RL training
- **Exchange Breakdown** - Which exchanges provide liquidity
- **Error Handling** - Graceful fallbacks when COB data unavailable
## **Technical Implementation**
### **COB Detection Logic:**
```python
# Check for enhanced orchestrator with COB capabilities
if not hasattr(self.orchestrator, 'latest_cob_features') or not hasattr(self.orchestrator, 'cob_integration'):
content.append(html.P("COB integration not available - using basic orchestrator", className="text-warning"))
return content
```
### **COB Data Retrieval:**
```python
# Get real-time COB features and states
cob_features = getattr(self.orchestrator, 'latest_cob_features', {}).get(symbol)
cob_state = getattr(self.orchestrator, 'latest_cob_state', {}).get(symbol)
# Get consolidated order book snapshot
if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration:
cob_snapshot = self.orchestrator.cob_integration.get_cob_snapshot(symbol)
```
## **Error Handling & Fallbacks**
### **COB Integration Not Available:**
- Display warning message: "COB integration not available - using basic orchestrator"
- Continue dashboard operation without COB data
- Graceful degradation to basic functionality
### **COB Data Temporarily Unavailable:**
- Show "COB data loading..." or "COB snapshot not available"
- Continue refreshing until data becomes available
- Error logging without breaking dashboard
## **Performance Considerations**
### **Optimized COB Display:**
- **Cached COB Content** - Avoid regenerating expensive content every update
- **Error Isolation** - COB errors don't break main dashboard
- **Minimal Data Fetching** - Only fetch COB data when orchestrator supports it
- **Background Processing** - COB integration runs in parallel threads
## **Benefits Achieved**
### **✅ Unified Architecture:**
- Both training loop and dashboard use same enhanced orchestrator
- Eliminated redundant implementations
- Consistent COB data across all components
### **✅ Real-time Visibility:**
- Live COB data visualization on main dashboard
- Training pipeline integration status
- Market microstructure monitoring
### **✅ Enhanced Trading Intelligence:**
- COB data feeds CNN features for pattern recognition
- RL states incorporate order book dynamics
- Multi-exchange liquidity analysis
### **✅ Operational Monitoring:**
- COB integration health status
- Data flow monitoring (Market → CNN → RL)
- Exchange connectivity status
## **Launch Instructions**
### **Start Enhanced System:**
```bash
python main.py
```
### **Access Dashboard:**
- **Main Dashboard:** http://127.0.0.1:8051
- **COB Section:** Scroll down to "Consolidated Order Book (COB)" section
- **Training Status:** Check "COB → Training Pipeline Status"
## **Verification Checklist**
### **✅ COB Integration Active:**
1. Dashboard shows "COB data integrated into RL training pipeline"
2. CNN Features show valid shapes (e.g., Shape (64, 20) - Ready for ML training)
3. RL State shows valid shapes (e.g., Shape (128,) - Ready for DQN training)
4. Mid Price updates in real-time
5. Active Exchanges list shows "binance" and others
6. Order Book Levels show bid/ask counts
### **✅ Training Pipeline Connected:**
1. "COB → Training Pipeline Status" shows green checkmarks
2. "Real-time market microstructure → CNN features → RL states" displayed
3. "COB Updates: X symbols receiving data" shows > 0 symbols
4. No COB errors in logs
## **Future Enhancements**
### **Potential Additions:**
- **COB Chart Visualization** - Real-time order book depth charts
- **Exchange Arbitrage Detection** - Price differences across exchanges
- **Liquidity Heatmaps** - Visual representation of bid/ask density
- **COB-based Alerts** - Notifications for unusual market microstructure events
- **Historical COB Analysis** - Store and analyze past COB patterns
## **Status: ✅ IMPLEMENTATION COMPLETE**
The enhanced dashboard now successfully displays COB data and shows its integration with the training pipeline. Both the dashboard UI and the background training loop use the same enhanced orchestrator with full COB capabilities, eliminating redundancy and providing comprehensive market microstructure monitoring.

View File

@ -643,8 +643,8 @@ class MultiExchangeCOBProvider:
self.exchange_update_counts[exchange_name] += 1
# Log every 100th update
if self.exchange_update_counts[exchange_name] % 100 == 0:
# Log every 1000th update
if self.exchange_update_counts[exchange_name] % 1000 == 0:
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}")
except Exception as e:
@ -727,8 +727,8 @@ class MultiExchangeCOBProvider:
await self._add_trade_to_svp(symbol, trade)
# Log every 100th trade
if len(self.session_trades[symbol]) % 100 == 0:
# Log every 1000th trade
if len(self.session_trades[symbol]) % 1000 == 0:
logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}")
except Exception as e:

View File

@ -23,7 +23,7 @@ import torch.optim as optim
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable, Tuple
from collections import deque, defaultdict
from dataclasses import dataclass
from dataclasses import dataclass, asdict
import json
import time
import threading
@ -66,6 +66,30 @@ class SignalAccumulator:
if self.last_reset_time is None:
self.last_reset_time = datetime.now()
@dataclass
class TrainingUpdate:
"""Training update event data"""
timestamp: datetime
symbol: str
epoch: int
loss: float
batch_size: int
learning_rate: float
accuracy: float
avg_confidence: float
@dataclass
class TradeSignal:
"""Trade signal event data"""
timestamp: datetime
symbol: str
action: str # 'BUY', 'SELL', 'HOLD'
confidence: float
quantity: float
price: float
signals_count: int
reason: str
class MassiveRLNetwork(nn.Module):
"""
Massive 1B+ parameter RL network optimized for real-time COB trading
@ -193,7 +217,7 @@ class MassiveRLNetwork(nn.Module):
class RealtimeRLCOBTrader:
"""
Real-time RL trader using COB data
Real-time RL trader using COB data with comprehensive subscriber system
"""
def __init__(self,
@ -231,9 +255,17 @@ class RealtimeRLCOBTrader:
)
self.scalers[symbol] = torch.cuda.amp.GradScaler()
# Subscriber system for real-time events
self.prediction_subscribers: List[Callable[[PredictionResult], None]] = []
self.training_subscribers: List[Callable[[TrainingUpdate], None]] = []
self.signal_subscribers: List[Callable[[TradeSignal], None]] = []
self.async_prediction_subscribers: List[Callable[[PredictionResult], Any]] = []
self.async_training_subscribers: List[Callable[[TrainingUpdate], Any]] = []
self.async_signal_subscribers: List[Callable[[TradeSignal], Any]] = []
# COB integration
self.cob_integration = COBIntegration(symbols=self.symbols)
self.cob_integration.add_dqn_callback(self._on_cob_update)
self.cob_integration.add_dqn_callback(self._on_cob_update_sync)
# Data storage for real-time training
self.prediction_history: Dict[str, deque] = {}
@ -280,6 +312,111 @@ class RealtimeRLCOBTrader:
logger.info(f"RealtimeRLCOBTrader initialized for symbols: {self.symbols}")
logger.info(f"Inference interval: {self.inference_interval_ms}ms")
logger.info(f"Required confident predictions: {self.required_confident_predictions}")
# Subscriber system methods
def add_prediction_subscriber(self, callback: Callable[[PredictionResult], None]):
"""Add a subscriber for prediction events"""
self.prediction_subscribers.append(callback)
logger.info(f"Added prediction subscriber, total: {len(self.prediction_subscribers)}")
def add_training_subscriber(self, callback: Callable[[TrainingUpdate], None]):
"""Add a subscriber for training events"""
self.training_subscribers.append(callback)
logger.info(f"Added training subscriber, total: {len(self.training_subscribers)}")
def add_signal_subscriber(self, callback: Callable[[TradeSignal], None]):
"""Add a subscriber for trade signal events"""
self.signal_subscribers.append(callback)
logger.info(f"Added signal subscriber, total: {len(self.signal_subscribers)}")
def add_async_prediction_subscriber(self, callback: Callable[[PredictionResult], Any]):
"""Add an async subscriber for prediction events"""
self.async_prediction_subscribers.append(callback)
logger.info(f"Added async prediction subscriber, total: {len(self.async_prediction_subscribers)}")
def add_async_training_subscriber(self, callback: Callable[[TrainingUpdate], Any]):
"""Add an async subscriber for training events"""
self.async_training_subscribers.append(callback)
logger.info(f"Added async training subscriber, total: {len(self.async_training_subscribers)}")
def add_async_signal_subscriber(self, callback: Callable[[TradeSignal], Any]):
"""Add an async subscriber for trade signal events"""
self.async_signal_subscribers.append(callback)
logger.info(f"Added async signal subscriber, total: {len(self.async_signal_subscribers)}")
async def _emit_prediction(self, prediction: PredictionResult):
"""Emit prediction to all subscribers"""
try:
# Sync subscribers
for callback in self.prediction_subscribers:
try:
callback(prediction)
except Exception as e:
logger.warning(f"Error in prediction subscriber: {e}")
# Async subscribers
for callback in self.async_prediction_subscribers:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(prediction))
else:
callback(prediction)
except Exception as e:
logger.warning(f"Error in async prediction subscriber: {e}")
except Exception as e:
logger.error(f"Error emitting prediction: {e}")
async def _emit_training_update(self, update: TrainingUpdate):
"""Emit training update to all subscribers"""
try:
# Sync subscribers
for callback in self.training_subscribers:
try:
callback(update)
except Exception as e:
logger.warning(f"Error in training subscriber: {e}")
# Async subscribers
for callback in self.async_training_subscribers:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(update))
else:
callback(update)
except Exception as e:
logger.warning(f"Error in async training subscriber: {e}")
except Exception as e:
logger.error(f"Error emitting training update: {e}")
async def _emit_trade_signal(self, signal: TradeSignal):
"""Emit trade signal to all subscribers"""
try:
# Sync subscribers
for callback in self.signal_subscribers:
try:
callback(signal)
except Exception as e:
logger.warning(f"Error in signal subscriber: {e}")
# Async subscribers
for callback in self.async_signal_subscribers:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(signal))
else:
callback(signal)
except Exception as e:
logger.warning(f"Error in async signal subscriber: {e}")
except Exception as e:
logger.error(f"Error emitting trade signal: {e}")
def _on_cob_update_sync(self, symbol: str, data: Dict):
"""Sync wrapper for async COB update handler"""
try:
# Schedule the async method
asyncio.create_task(self._on_cob_update(symbol, data))
except Exception as e:
logger.error(f"Error scheduling COB update for {symbol}: {e}")
async def start(self):
"""Start the real-time RL trader"""
@ -484,6 +621,9 @@ class RealtimeRLCOBTrader:
# Store prediction for later training
self.prediction_history[symbol].append(result)
# Emit prediction to subscribers
await self._emit_prediction(result)
# Add to signal accumulator if confident enough
if prediction['confidence'] >= self.min_confidence_threshold:
self._add_signal(symbol, result)
@ -606,7 +746,7 @@ class RealtimeRLCOBTrader:
return # No action for sideways
# Execute trade signal
await self._execute_trade_signal(symbol, action, avg_confidence, recent_signals)
await self._execute_trade_signal(symbol, action, float(avg_confidence), recent_signals)
# Reset accumulator after trade signal
self._reset_accumulator(symbol)
@ -624,6 +764,21 @@ class RealtimeRLCOBTrader:
if self.price_history[symbol]:
current_price = self.price_history[symbol][-1]['price']
# Create trade signal for emission
trade_signal = TradeSignal(
timestamp=datetime.now(),
symbol=symbol,
action=action,
confidence=confidence,
quantity=1.0, # Default quantity
price=current_price,
signals_count=len(signals),
reason=f"Consensus of {len(signals)} predictions"
)
# Emit trade signal to subscribers
await self._emit_trade_signal(trade_signal)
# Execute through trading executor if available
if self.trading_executor and current_price > 0:
success = self.trading_executor.execute_signal(
@ -707,6 +862,25 @@ class RealtimeRLCOBTrader:
)
stats['last_training_time'] = datetime.now()
# Calculate accuracy and confidence
accuracy = stats['successful_predictions'] / max(1, stats['total_predictions']) * 100
avg_confidence = sum(p.confidence for p in batch_predictions) / len(batch_predictions)
# Create training update for emission
training_update = TrainingUpdate(
timestamp=datetime.now(),
symbol=symbol,
epoch=stats['total_training_steps'],
loss=loss,
batch_size=batch_size,
learning_rate=self.optimizers[symbol].param_groups[0]['lr'],
accuracy=accuracy,
avg_confidence=avg_confidence
)
# Emit training update to subscribers
await self._emit_training_update(training_update)
logger.debug(f"Training {symbol}: loss={loss:.6f}, batch_size={batch_size}")
except Exception as e:

View File

@ -18,7 +18,7 @@ import sys
import json
import os
from datetime import datetime
from typing import Dict, Any
from typing import Dict, Any, Optional
# Local imports
from core.realtime_rl_cob_trader import RealtimeRLCOBTrader
@ -44,9 +44,10 @@ class RealtimeRLCOBTraderLauncher:
def __init__(self, config_path: str = "config.yaml"):
"""Initialize launcher with configuration"""
self.config_path = config_path
self.config = load_config(config_path)
self.trader = None
self.trading_executor = None
self.trader: Optional[RealtimeRLCOBTrader] = None
self.trading_executor: Optional[TradingExecutor] = None
self.running = False
# Setup signal handlers for graceful shutdown
@ -108,10 +109,7 @@ class RealtimeRLCOBTraderLauncher:
simulation_mode = True
# Initialize trading executor
self.trading_executor = TradingExecutor(
simulation_mode=simulation_mode,
mexc_config=mexc_config
)
self.trading_executor = TradingExecutor(self.config_path)
logger.info(f"Trading Executor initialized in {'SIMULATION' if simulation_mode else 'LIVE'} mode")
@ -132,6 +130,9 @@ class RealtimeRLCOBTraderLauncher:
model_checkpoint_dir = rl_config.get('model_checkpoint_dir', 'models/realtime_rl_cob')
# Initialize RL trader
if self.trading_executor is None:
raise RuntimeError("Trading executor not initialized")
self.trader = RealtimeRLCOBTrader(
symbols=symbols,
trading_executor=self.trading_executor,
@ -151,6 +152,8 @@ class RealtimeRLCOBTraderLauncher:
logger.info("Starting Real-time RL COB Trading System...")
# Start RL trader (this will start COB integration internally)
if self.trader is None:
raise RuntimeError("RL trader not initialized")
await self.trader.start()
self.running = True

View File

@ -0,0 +1 @@