better cob integration

This commit is contained in:
Dobromir Popov
2025-06-27 02:38:05 +03:00
parent 97ea27ea84
commit d791ab8b14
4 changed files with 678 additions and 450 deletions

View File

@ -13,10 +13,12 @@ This is the core orchestrator that:
import asyncio
import logging
import time
import threading
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from collections import deque
from .config import get_config
from .data_provider import DataProvider
@ -258,41 +260,322 @@ class TradingOrchestrator:
logger.error(f"Error initializing ML models: {e}")
def _initialize_cob_integration(self):
"""Initialize real-time COB integration for market microstructure data"""
"""Initialize real-time COB integration for market microstructure data with 5-minute data matrix"""
try:
if COB_INTEGRATION_AVAILABLE:
# Initialize COB integration with our symbols
self.cob_integration = COBIntegration(data_provider=self.data_provider, symbols=self.symbols )
logger.info("Initializing COB integration with 5-minute data matrix for all models")
# Import COB integration directly (same as working dashboard)
from core.cob_integration import COBIntegration
# Initialize COB integration with our symbols
self.cob_integration = COBIntegration(symbols=self.symbols)
# Register callbacks to receive real-time COB data
self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
# Initialize 5-minute COB data matrix system
self.cob_matrix_duration = 300 # 5 minutes in seconds
self.cob_matrix_resolution = 1 # 1 second resolution
self.cob_matrix_size = self.cob_matrix_duration // self.cob_matrix_resolution # 300 samples
# COB data matrix storage - 5 minutes of 1-second snapshots
self.cob_data_matrix: Dict[str, deque] = {}
self.cob_feature_matrix: Dict[str, deque] = {}
self.cob_state_matrix: Dict[str, deque] = {}
# Initialize matrix storage for each symbol
for symbol in self.symbols:
# Raw COB snapshots (300 x COBSnapshot objects)
self.cob_data_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
# Register callbacks to receive real-time COB data
self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
logger.info("COB Integration initialized - real-time market microstructure data available")
logger.info(f"COB symbols: {self.symbols}")
# COB integration will be started manually via start_cob_integration()
else:
logger.warning("COB Integration not available - models will use basic price data only")
# CNN feature matrix (300 x 400 features)
self.cob_feature_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
# DQN state matrix (300 x 200 state features)
self.cob_state_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
# Initialize COB data storage (legacy support)
self.latest_cob_snapshots = {}
self.cob_feature_cache = {}
self.cob_state_cache = {}
# COB matrix update tracking
self.last_cob_matrix_update = {}
self.cob_matrix_update_interval = 1.0 # Update every 1 second
# COB matrix statistics
self.cob_matrix_stats = {
'total_updates': 0,
'matrix_fills': {symbol: 0 for symbol in self.symbols},
'feature_generations': 0,
'model_feeds': 0
}
logger.info("COB integration initialized successfully with 5-minute data matrix")
logger.info(f"Matrix configuration: {self.cob_matrix_size} samples x 1s resolution")
logger.info("Real-time order book data matrix will be available for all models")
logger.info("COB provides: Multi-exchange consolidated order book with temporal context")
except Exception as e:
logger.error(f"Error initializing COB integration: {e}")
logger.info("COB integration will be disabled - dashboard will run with basic price data")
self.cob_integration = None
async def _start_cob_integration(self):
"""Start COB integration in background"""
logger.info("COB integration will be disabled - models will use basic price data")
async def start_cob_integration(self):
"""Start COB integration with matrix data collection"""
try:
if self.cob_integration:
await self.cob_integration.start()
logger.info("COB Integration started - real-time order book data streaming")
logger.info("Starting COB integration with 5-minute matrix collection...")
# Start COB integration in background thread
def start_cob_in_thread():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def cob_main():
await self.cob_integration.start()
# Keep running until stopped
while True:
await asyncio.sleep(1)
loop.run_until_complete(cob_main())
except Exception as e:
logger.error(f"Error in COB thread: {e}")
finally:
try:
loop.close()
except:
pass
import threading
self.cob_thread = threading.Thread(target=start_cob_in_thread, daemon=True)
self.cob_thread.start()
# Start matrix update worker
self._start_cob_matrix_worker()
logger.info("COB Integration started - 5-minute data matrix streaming active")
else:
logger.warning("COB integration is None - cannot start")
except Exception as e:
logger.error(f"Error starting COB integration: {e}")
self.cob_integration = None
def _start_cob_matrix_worker(self):
    """Launch the daemon thread that periodically refreshes the COB matrices.

    The worker loops forever. On each pass it calls
    ``_update_cob_matrix_for_symbol`` for every symbol whose last recorded
    update is at least ``cob_matrix_update_interval`` seconds old, then
    sleeps 0.5 s. An error during a pass is logged and followed by a 5 s
    back-off before the loop resumes.
    """

    def refresh_due_symbols(now):
        # Refresh only the symbols whose update interval has elapsed.
        for sym in self.symbols:
            previous = self.last_cob_matrix_update.get(sym, 0)
            if now - previous >= self.cob_matrix_update_interval:
                self._update_cob_matrix_for_symbol(sym)
                self.last_cob_matrix_update[sym] = now

    def matrix_worker():
        try:
            while True:
                try:
                    refresh_due_symbols(time.time())
                    # 500ms update cycle
                    time.sleep(0.5)
                except Exception as e:
                    logger.warning(f"Error in COB matrix worker: {e}")
                    time.sleep(5)
        except Exception as e:
            # Only reachable if the error handling above itself fails.
            logger.error(f"COB matrix worker error: {e}")

    # Daemon thread: it must not keep the process alive on shutdown.
    threading.Thread(target=matrix_worker, daemon=True).start()

    logger.info("COB matrix worker started - updating every 1 second")
def _update_cob_matrix_for_symbol(self, symbol: str):
    """Append the newest COB snapshot and its derived features for ``symbol``.

    Does nothing when ``self.cob_integration`` is unset or returns no
    snapshot. Otherwise the snapshot is appended to ``cob_data_matrix``,
    the CNN and DQN vectors (when not ``None``) are appended to
    ``cob_feature_matrix`` / ``cob_state_matrix``, and the update counters
    in ``cob_matrix_stats`` are incremented. Any exception is logged as a
    warning and swallowed.
    """
    try:
        if not self.cob_integration:
            return

        snapshot = self.cob_integration.get_cob_snapshot(symbol)
        if not snapshot:
            return

        # Raw snapshot history
        self.cob_data_matrix[symbol].append(snapshot)

        # CNN feature vector
        cnn_vector = self._generate_cob_cnn_features(symbol, snapshot)
        if cnn_vector is not None:
            self.cob_feature_matrix[symbol].append(cnn_vector)

        # DQN state vector
        dqn_vector = self._generate_cob_dqn_features(symbol, snapshot)
        if dqn_vector is not None:
            self.cob_state_matrix[symbol].append(dqn_vector)

        stats = self.cob_matrix_stats
        stats['total_updates'] += 1
        stats['matrix_fills'][symbol] += 1

        # Progress is reported once per 100 updates (counted across all symbols).
        if stats['total_updates'] % 100 != 0:
            return

        matrix_size = len(self.cob_data_matrix[symbol])
        feature_size = len(self.cob_feature_matrix[symbol])
        logger.info(f"COB Matrix Update #{stats['total_updates']}: "
                    f"{symbol} matrix={matrix_size}/300, features={feature_size}/300")

    except Exception as e:
        logger.warning(f"Error updating COB matrix for {symbol}: {e}")
def _generate_cob_cnn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]:
    """Generate the 400-element CNN feature vector for one COB snapshot.

    Layout of the returned ``float32`` vector:
      * 0-99    - top 20 bid levels, 5 values per level
      * 100-199 - top 20 ask levels, 5 values per level
      * 200-204 - snapshot-wide values (spread, imbalance, bid/ask
                  liquidity, active exchange count)
      * 205-399 - zero padding

    Each level contributes: price offset relative to
    ``volume_weighted_mid``, ``total_volume_usd`` / 1e6, ``total_size`` /
    1e3, number of entries in ``exchange_breakdown`` and
    ``liquidity_score`` (default 0.5). A level that is absent, has no
    ``price``, or belongs to a snapshot without ``volume_weighted_mid`` is
    encoded as five zeros.

    Args:
        symbol: Trading symbol; used only in the warning message.
        cob_snapshot: Snapshot object read via attribute access.

    Returns:
        A ``(400,)`` ``float32`` array. On any unexpected error a warning
        is logged and an all-zero vector is returned.
    """
    feature_count = 400
    max_levels = 20
    empty_level = [0.0, 0.0, 0.0, 0.0, 0.0]

    try:
        has_mid = hasattr(cob_snapshot, 'volume_weighted_mid')
        mid = cob_snapshot.volume_weighted_mid if has_mid else None

        def encode_side(levels):
            """Encode up to ``max_levels`` price levels as 5 values each."""
            encoded = []
            for i in range(max_levels):
                level = levels[i] if i < len(levels) else None
                if level is None or not has_mid or not hasattr(level, 'price'):
                    encoded.extend(empty_level)
                    continue
                # A zero mid price used to raise ZeroDivisionError and wipe the
                # whole vector; keep the remaining level data and use a
                # neutral offset instead.
                price_offset = (level.price - mid) / mid if mid else 0.0
                encoded.extend([
                    price_offset,
                    getattr(level, 'total_volume_usd', 0) / 1000000,  # Normalize to millions
                    getattr(level, 'total_size', 0) / 1000,  # Normalize to thousands
                    len(getattr(level, 'exchange_breakdown', {})),
                    getattr(level, 'liquidity_score', 0.5),
                ])
            return encoded

        features = []

        # Order book depth (200 values: 20 levels x 5 values x 2 sides)
        features.extend(encode_side(getattr(cob_snapshot, 'consolidated_bids', ())))
        features.extend(encode_side(getattr(cob_snapshot, 'consolidated_asks', ())))

        # Snapshot-wide market microstructure values (5 values)
        features.extend([
            getattr(cob_snapshot, 'spread_bps', 0) / 100,  # Normalized spread
            getattr(cob_snapshot, 'liquidity_imbalance', 0),
            getattr(cob_snapshot, 'total_bid_liquidity', 0) / 1000000,
            getattr(cob_snapshot, 'total_ask_liquidity', 0) / 1000000,
            len(getattr(cob_snapshot, 'exchanges_active', [])) / 10,  # Normalize to max 10 exchanges
        ])

        # Zero-pad, then clamp, so the vector is always exactly 400 long.
        features.extend([0.0] * (feature_count - len(features)))
        features = features[:feature_count]

        return np.array(features, dtype=np.float32)

    except Exception as e:
        logger.warning(f"Error generating COB CNN features for {symbol}: {e}")
        return np.zeros(feature_count, dtype=np.float32)
def _generate_cob_dqn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]:
    """Generate the 200-element DQN state vector for one COB snapshot.

    Layout of the returned ``float32`` vector:
      * 0-4     - snapshot-wide values (mid price / 1e5, spread,
                  imbalance, bid liquidity, ask liquidity)
      * 5-54    - top 10 bid levels, 5 values per level
      * 55-104  - top 10 ask levels, 5 values per level
      * 105-109 - active exchange count / 10 and presence flags for
                  'binance', 'coinbase', 'kraken', 'huobi'
      * 110-199 - zero padding

    Each level contributes: price offset relative to
    ``volume_weighted_mid``, ``total_volume_usd`` / 1e6, ``total_size`` /
    1e3, number of entries in ``exchange_breakdown`` and
    ``liquidity_score`` (default 0.5). A level that is absent, has no
    ``price``, or belongs to a snapshot without ``volume_weighted_mid`` is
    encoded as five zeros.

    Args:
        symbol: Trading symbol; used only in the warning message.
        cob_snapshot: Snapshot object read via attribute access.

    Returns:
        A ``(200,)`` ``float32`` array. On any unexpected error a warning
        is logged and an all-zero vector is returned.
    """
    feature_count = 200
    max_levels = 10
    empty_level = [0.0, 0.0, 0.0, 0.0, 0.0]

    try:
        has_mid = hasattr(cob_snapshot, 'volume_weighted_mid')
        mid = cob_snapshot.volume_weighted_mid if has_mid else None

        def encode_side(levels):
            """Encode up to ``max_levels`` price levels as 5 values each."""
            encoded = []
            for i in range(max_levels):
                level = levels[i] if i < len(levels) else None
                if level is None or not has_mid or not hasattr(level, 'price'):
                    encoded.extend(empty_level)
                    continue
                # A zero mid price used to raise ZeroDivisionError and wipe the
                # whole vector; keep the remaining level data and use a
                # neutral offset instead.
                price_offset = (level.price - mid) / mid if mid else 0.0
                encoded.extend([
                    price_offset,
                    getattr(level, 'total_volume_usd', 0) / 1000000,
                    getattr(level, 'total_size', 0) / 1000,
                    len(getattr(level, 'exchange_breakdown', {})),
                    getattr(level, 'liquidity_score', 0.5),
                ])
            return encoded

        features = []

        # Snapshot-wide market state (5 values)
        features.extend([
            getattr(cob_snapshot, 'volume_weighted_mid', 0) / 100000,  # Normalized price
            getattr(cob_snapshot, 'spread_bps', 0) / 100,
            getattr(cob_snapshot, 'liquidity_imbalance', 0),
            getattr(cob_snapshot, 'total_bid_liquidity', 0) / 1000000,
            getattr(cob_snapshot, 'total_ask_liquidity', 0) / 1000000,
        ])

        # Top 10 levels per side (2 x 50 values)
        features.extend(encode_side(getattr(cob_snapshot, 'consolidated_bids', ())))
        features.extend(encode_side(getattr(cob_snapshot, 'consolidated_asks', ())))

        # Exchange diversity (5 values)
        active_exchanges = getattr(cob_snapshot, 'exchanges_active', [])
        features.append(len(active_exchanges) / 10)  # Normalized exchange count
        features.extend(
            1.0 if name in active_exchanges else 0.0
            for name in ('binance', 'coinbase', 'kraken', 'huobi')
        )

        # Zero-pad, then clamp, so the vector is always exactly 200 long.
        features.extend([0.0] * (feature_count - len(features)))
        features = features[:feature_count]

        return np.array(features, dtype=np.float32)

    except Exception as e:
        logger.warning(f"Error generating COB DQN features for {symbol}: {e}")
        return np.zeros(feature_count, dtype=np.float32)
def _on_cob_cnn_features(self, symbol: str, cob_data: Dict):
"""Handle CNN features from COB integration"""
"""Handle CNN features from COB integration - enhanced with matrix support"""
try:
if 'features' in cob_data:
self.latest_cob_features[symbol] = cob_data['features']
@ -312,9 +595,9 @@ class TradingOrchestrator:
except Exception as e:
logger.warning(f"Error processing COB CNN features for {symbol}: {e}")
def _on_cob_dqn_features(self, symbol: str, cob_data: Dict):
"""Handle DQN state features from COB integration"""
"""Handle DQN state features from COB integration - enhanced with matrix support"""
try:
if 'state' in cob_data:
self.latest_cob_state[symbol] = cob_data['state']
@ -330,9 +613,9 @@ class TradingOrchestrator:
except Exception as e:
logger.warning(f"Error processing COB DQN features for {symbol}: {e}")
def _on_cob_dashboard_data(self, symbol: str, cob_data: Dict):
"""Handle dashboard data from COB integration"""
"""Handle dashboard data from COB integration - enhanced with matrix support"""
try:
# Store raw COB snapshot for dashboard display
if self.cob_integration:
@ -343,21 +626,138 @@ class TradingOrchestrator:
except Exception as e:
logger.warning(f"Error processing COB dashboard data for {symbol}: {e}")
# COB Data Access Methods for Models
# Enhanced COB Data Access Methods for Models with 5-minute matrix support
def get_cob_features(self, symbol: str) -> Optional[np.ndarray]:
    """Return the latest COB CNN features stored for ``symbol``, or ``None`` if there are none."""
    latest_by_symbol = self.latest_cob_features
    return latest_by_symbol.get(symbol)
def get_cob_state(self, symbol: str) -> Optional[np.ndarray]:
    """Return the latest COB DQN state features stored for ``symbol``, or ``None`` if there are none."""
    latest_by_symbol = self.latest_cob_state
    return latest_by_symbol.get(symbol)
def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]:
    """Return the latest COB snapshot stored for ``symbol``, or ``None`` if there is none."""
    latest_by_symbol = self.latest_cob_data
    return latest_by_symbol.get(symbol)
def get_cob_feature_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]:
    """
    Get COB feature matrix for CNN models (5-minute capped)

    At most the 300 most recent feature vectors are used. Rows are ordered
    oldest-first; when fewer than ``sequence_length`` vectors are available,
    zero rows are prepended so the result still has ``sequence_length`` rows.

    Args:
        symbol: Trading symbol
        sequence_length: Number of time steps to return (max 300 for 5 minutes)

    Returns:
        np.ndarray: Shape (sequence_length, 400) - CNN features over time.
        None when the symbol is unknown, no features are stored yet,
        ``sequence_length`` is not positive, or an error occurs.
    """
    try:
        if symbol not in self.cob_feature_matrix:
            return None

        history = self.cob_feature_matrix[symbol]

        # Limit sequence length to available data and maximum 5 minutes
        window = min(sequence_length, len(history), 300)

        # "<= 0" rather than "== 0": a negative request would otherwise turn
        # the slice below into history[k:] and return a wrongly sized matrix.
        if window <= 0:
            return None

        # Get the most recent features and stack them into a matrix
        recent_features = list(history)[-window:]
        feature_matrix = np.stack(recent_features, axis=0)

        # Pad at the front if necessary to reach the requested sequence length.
        # The row shape is taken from the stacked data instead of a constant.
        padding_size = sequence_length - len(recent_features)
        if padding_size > 0:
            padding = np.zeros((padding_size,) + feature_matrix.shape[1:], dtype=np.float32)
            feature_matrix = np.vstack([padding, feature_matrix])

        self.cob_matrix_stats['feature_generations'] += 1

        logger.debug(f"Generated COB feature matrix for {symbol}: {feature_matrix.shape}")
        return feature_matrix

    except Exception as e:
        logger.warning(f"Error getting COB feature matrix for {symbol}: {e}")
        return None
def get_cob_state_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]:
    """
    Get COB state matrix for RL models (5-minute capped)

    At most the 300 most recent state vectors are used. Rows are ordered
    oldest-first; when fewer than ``sequence_length`` vectors are available,
    zero rows are prepended so the result still has ``sequence_length`` rows.

    Args:
        symbol: Trading symbol
        sequence_length: Number of time steps to return (max 300 for 5 minutes)

    Returns:
        np.ndarray: Shape (sequence_length, 200) - DQN state features over time.
        None when the symbol is unknown, no states are stored yet,
        ``sequence_length`` is not positive, or an error occurs.
    """
    try:
        if symbol not in self.cob_state_matrix:
            return None

        history = self.cob_state_matrix[symbol]

        # Limit sequence length to available data and maximum 5 minutes
        window = min(sequence_length, len(history), 300)

        # "<= 0" rather than "== 0": a negative request would otherwise turn
        # the slice below into history[k:] and return a wrongly sized matrix.
        if window <= 0:
            return None

        # Get the most recent states and stack them into a matrix
        recent_states = list(history)[-window:]
        state_matrix = np.stack(recent_states, axis=0)

        # Pad at the front if necessary to reach the requested sequence length.
        # The row shape is taken from the stacked data instead of a constant.
        padding_size = sequence_length - len(recent_states)
        if padding_size > 0:
            padding = np.zeros((padding_size,) + state_matrix.shape[1:], dtype=np.float32)
            state_matrix = np.vstack([padding, state_matrix])

        self.cob_matrix_stats['model_feeds'] += 1

        logger.debug(f"Generated COB state matrix for {symbol}: {state_matrix.shape}")
        return state_matrix

    except Exception as e:
        logger.warning(f"Error getting COB state matrix for {symbol}: {e}")
        return None
def get_cob_matrix_stats(self) -> Dict[str, Any]:
    """Return a shallow copy of the COB matrix counters plus per-symbol fill levels.

    Two keys are added to the copy:
      * ``current_matrix_sizes``    - entry counts of the data, feature and
                                      state matrices for every symbol
      * ``matrix_fill_percentages`` - the same counts as a percentage of 300,
                                      formatted with one decimal and a ``%``

    Symbols without a matrix count as 0. On any error a warning is logged
    and an empty dict is returned.
    """
    try:
        stats = self.cob_matrix_stats.copy()

        # (reported key, attribute holding the per-symbol deques)
        matrices = (
            ('data_matrix', 'cob_data_matrix'),
            ('feature_matrix', 'cob_feature_matrix'),
            ('state_matrix', 'cob_state_matrix'),
        )

        def entry_count(attr_name, sym):
            return len(getattr(self, attr_name).get(sym, []))

        # Current matrix sizes
        stats['current_matrix_sizes'] = {
            sym: {label: entry_count(attr, sym) for label, attr in matrices}
            for sym in self.symbols
        }

        # Matrix fill percentages
        stats['matrix_fill_percentages'] = {
            sym: {
                label: f"{entry_count(attr, sym) / 300 * 100:.1f}%"
                for label, attr in matrices
            }
            for sym in self.symbols
        }

        return stats

    except Exception as e:
        logger.warning(f"Error getting COB matrix stats: {e}")
        return {}
def get_cob_statistics(self, symbol: str) -> Optional[Dict]:
"""Get COB statistics for a symbol"""
try:
@ -367,7 +767,7 @@ class TradingOrchestrator:
except Exception as e:
logger.warning(f"Error getting COB statistics for {symbol}: {e}")
return None
def get_market_depth_analysis(self, symbol: str) -> Optional[Dict]:
"""Get detailed market depth analysis from COB"""
try:
@ -377,7 +777,7 @@ class TradingOrchestrator:
except Exception as e:
logger.warning(f"Error getting market depth analysis for {symbol}: {e}")
return None
def get_price_buckets(self, symbol: str) -> Optional[Dict]:
"""Get fine-grain price buckets from COB"""
try:
@ -1435,17 +1835,6 @@ class TradingOrchestrator:
# Enhanced Orchestrator Methods
async def start_cob_integration(self):
    """Start COB integration manually.

    Awaits ``_start_cob_integration`` when ``self.cob_integration`` is set;
    otherwise only logs a warning. Any exception is logged as an error and
    swallowed.
    """
    try:
        if not self.cob_integration:
            logger.warning("COB Integration not available")
            return

        await self._start_cob_integration()
        logger.info("COB Integration started successfully")
    except Exception as e:
        logger.error(f"Error starting COB integration: {e}")
async def stop_cob_integration(self):
"""Stop COB integration"""
try: