COB integration and refactoring

This commit is contained in:
Dobromir Popov
2025-06-25 02:48:00 +03:00
parent afefcea308
commit e57c6df7e1
6 changed files with 564 additions and 128 deletions

View File

@ -17,6 +17,13 @@ import time
import threading
from typing import Dict, List, Optional, Any
import os
import asyncio
import dash_bootstrap_components as dbc
from dash.exceptions import PreventUpdate
from collections import deque
from threading import Lock
import warnings
from dataclasses import asdict
# Setup logger
logger = logging.getLogger(__name__)
@ -47,6 +54,9 @@ except ImportError:
COB_INTEGRATION_AVAILABLE = False
logger.warning("COB integration not available")
# Import RL COB trader for 1B parameter model integration
from core.realtime_rl_cob_trader import RealtimeRLCOBTrader, PredictionResult
class CleanTradingDashboard:
"""Clean, modular trading dashboard implementation"""
@ -104,7 +114,17 @@ class CleanTradingDashboard:
# Connect to orchestrator for real trading signals
self._connect_to_orchestrator()
logger.info("Clean Trading Dashboard initialized")
# Initialize COB RL Trader (1B parameter model)
self.cob_rl_trader = None
self.cob_predictions = {'ETH/USDT': deque(maxlen=100), 'BTC/USDT': deque(maxlen=100)}
self.cob_data_cache_1d = {'ETH/USDT': deque(maxlen=86400), 'BTC/USDT': deque(maxlen=86400)} # 1d with 1s buckets
self.cob_raw_ticks = {'ETH/USDT': deque(maxlen=150), 'BTC/USDT': deque(maxlen=150)} # 15 seconds of raw ticks
self.cob_lock = Lock()
# Initialize COB integration
self._initialize_cob_integration()
logger.info("Clean Trading Dashboard initialized with COB RL integration")
def _get_initial_balance(self) -> float:
"""Get initial balance from trading executor or default"""
@ -562,10 +582,27 @@ class CleanTradingDashboard:
'simulation_mode': bool(self.trading_executor and getattr(self.trading_executor, 'simulation_mode', True)),
'data_provider_status': 'Active',
'websocket_status': 'Connected' if self.is_streaming else 'Disconnected',
'cob_status': 'Active' if COB_INTEGRATION_AVAILABLE else 'Inactive'
'cob_status': 'Active' if COB_INTEGRATION_AVAILABLE else 'Inactive',
'rl_model_status': 'Inactive',
'predictions_count': 0,
'cache_size': 0
}
if self.orchestrator and hasattr(self.orchestrator, 'cob_integration'):
# Check COB RL trader status
if self.cob_rl_trader:
status['cob_status'] = 'Active'
status['rl_model_status'] = 'Active (1B Parameters)'
# Count predictions
total_predictions = sum(len(pred_list) for pred_list in self.cob_predictions.values())
status['predictions_count'] = total_predictions
# Cache size
total_cache = sum(len(cache) for cache in self.cob_data_cache_1d.values())
status['cache_size'] = total_cache
# Fallback to orchestrator COB integration
elif self.orchestrator and hasattr(self.orchestrator, 'cob_integration'):
cob_integration = self.orchestrator.cob_integration
if cob_integration and hasattr(cob_integration, 'is_active'):
status['cob_status'] = 'Active' if cob_integration.is_active else 'Inactive'
@ -676,6 +713,107 @@ class CleanTradingDashboard:
except Exception as e:
logger.error(f"Error clearing session: {e}")
def _initialize_cob_integration(self):
"""Initialize COB RL trader and data subscription"""
try:
logger.info("Initializing COB RL integration...")
# Initialize trading executor if not provided
if not self.trading_executor:
from core.trading_executor import TradingExecutor
self.trading_executor = TradingExecutor()
# Initialize COB RL trader with 1B parameter model
self.cob_rl_trader = RealtimeRLCOBTrader(
symbols=['ETH/USDT', 'BTC/USDT'],
trading_executor=self.trading_executor,
model_checkpoint_dir="models/realtime_rl_cob",
inference_interval_ms=200, # 200ms inference
min_confidence_threshold=0.7,
required_confident_predictions=3
)
# Subscribe to COB predictions
self.cob_rl_trader.add_prediction_subscriber(self._on_cob_prediction)
# Start COB data subscription in background
import threading
threading.Thread(target=self._start_cob_data_subscription, daemon=True).start()
logger.info("✅ COB RL integration initialized successfully")
logger.info("🧠 1B parameter model ready for inference")
logger.info("📊 COB data subscription started")
except Exception as e:
logger.error(f"Failed to initialize COB integration: {e}")
self.cob_rl_trader = None
def _start_cob_data_subscription(self):
"""Start COB data subscription with proper caching"""
try:
# Start the COB RL trader asynchronously
import asyncio
def start_cob_trader():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.cob_rl_trader.start())
logger.info("COB RL trader started successfully")
except Exception as e:
logger.error(f"Error in COB trader loop: {e}")
finally:
loop.close()
# Start in separate thread to avoid blocking
import threading
cob_thread = threading.Thread(target=start_cob_trader, daemon=True)
cob_thread.start()
except Exception as e:
logger.error(f"Error starting COB data subscription: {e}")
def _on_cob_prediction(self, prediction: PredictionResult):
"""Handle COB RL predictions"""
try:
with self.cob_lock:
# Convert prediction to dashboard format
prediction_data = {
'timestamp': prediction.timestamp,
'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': prediction.confidence,
'predicted_change': prediction.predicted_change,
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction],
'color': ['red', 'gray', 'green'][prediction.predicted_direction]
}
# Add to predictions cache
self.cob_predictions[prediction.symbol].append(prediction_data)
# Cache COB data (1s buckets for 1 day max, 5 min retention)
current_time = datetime.now()
cob_data = {
'timestamp': current_time,
'prediction': prediction_data,
'features': prediction.features.tolist() if prediction.features is not None else []
}
# Add to 1d cache (1s buckets)
self.cob_data_cache_1d[prediction.symbol].append(cob_data)
# Add to raw ticks cache (15 seconds max, 10+ updates/sec)
self.cob_raw_ticks[prediction.symbol].append({
'timestamp': current_time,
'prediction': prediction_data,
'raw_features': prediction.features.tolist() if prediction.features is not None else []
})
logger.debug(f"COB prediction cached for {prediction.symbol}: "
f"{prediction_data['direction_text']} (confidence: {prediction.confidence:.3f})")
except Exception as e:
logger.error(f"Error handling COB prediction: {e}")
def _connect_to_orchestrator(self):
"""Connect to orchestrator for real trading signals"""
try: