From d791ab8b14577c4373f163f9d599284427ff7e41 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 27 Jun 2025 02:38:05 +0300 Subject: [PATCH] better cob integration --- core/orchestrator.py | 481 +++++++++++++++++++++--- enhanced_realtime_training.py | 2 +- test_model_predictions_visualization.py | 309 --------------- web/clean_dashboard.py | 336 ++++++++++++----- 4 files changed, 678 insertions(+), 450 deletions(-) delete mode 100644 test_model_predictions_visualization.py diff --git a/core/orchestrator.py b/core/orchestrator.py index 8350381..4852853 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -13,10 +13,12 @@ This is the core orchestrator that: import asyncio import logging import time +import threading import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass +from collections import deque from .config import get_config from .data_provider import DataProvider @@ -258,41 +260,322 @@ class TradingOrchestrator: logger.error(f"Error initializing ML models: {e}") def _initialize_cob_integration(self): - """Initialize real-time COB integration for market microstructure data""" + """Initialize real-time COB integration for market microstructure data with 5-minute data matrix""" try: - if COB_INTEGRATION_AVAILABLE: - # Initialize COB integration with our symbols - self.cob_integration = COBIntegration(data_provider=self.data_provider, symbols=self.symbols ) + logger.info("Initializing COB integration with 5-minute data matrix for all models") + + # Import COB integration directly (same as working dashboard) + from core.cob_integration import COBIntegration + + # Initialize COB integration with our symbols + self.cob_integration = COBIntegration(symbols=self.symbols) + + # Register callbacks to receive real-time COB data + self.cob_integration.add_cnn_callback(self._on_cob_cnn_features) + self.cob_integration.add_dqn_callback(self._on_cob_dqn_features) + self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data) + + # Initialize 5-minute COB data matrix system + self.cob_matrix_duration = 300 # 5 minutes in seconds + self.cob_matrix_resolution = 1 # 1 second resolution + self.cob_matrix_size = self.cob_matrix_duration // self.cob_matrix_resolution # 300 samples + + # COB data matrix storage - 5 minutes of 1-second snapshots + self.cob_data_matrix: Dict[str, deque] = {} + self.cob_feature_matrix: Dict[str, deque] = {} + self.cob_state_matrix: Dict[str, deque] = {} + + # Initialize matrix storage for each symbol + for symbol in self.symbols: + # Raw COB snapshots (300 x COBSnapshot objects) + self.cob_data_matrix[symbol] = deque(maxlen=self.cob_matrix_size) - # Register callbacks to receive real-time COB data - self.cob_integration.add_cnn_callback(self._on_cob_cnn_features) - self.cob_integration.add_dqn_callback(self._on_cob_dqn_features) - self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data) - - logger.info("COB Integration initialized - real-time market microstructure data available") - logger.info(f"COB symbols: {self.symbols}") - - # COB integration will be started manually via start_cob_integration() - else: - logger.warning("COB Integration not available - models will use basic price data only") + # CNN feature matrix (300 x 400 features) + self.cob_feature_matrix[symbol] = deque(maxlen=self.cob_matrix_size) + # DQN state matrix (300 x 200 state features) + self.cob_state_matrix[symbol] = deque(maxlen=self.cob_matrix_size) + + # Initialize COB data storage (legacy support) + self.latest_cob_snapshots = {} + self.cob_feature_cache = {} + self.cob_state_cache = {} + + # COB matrix update tracking + self.last_cob_matrix_update = {} + self.cob_matrix_update_interval = 1.0 # Update every 1 second + + # COB matrix statistics + self.cob_matrix_stats = { + 'total_updates': 0, + 'matrix_fills': {symbol: 0 for symbol in self.symbols}, + 'feature_generations': 0, + 'model_feeds': 0 + } + + logger.info("COB integration initialized successfully with 5-minute data matrix") + logger.info(f"Matrix configuration: {self.cob_matrix_size} samples x 1s resolution") + logger.info("Real-time order book data matrix will be available for all models") + logger.info("COB provides: Multi-exchange consolidated order book with temporal context") + except Exception as e: logger.error(f"Error initializing COB integration: {e}") - logger.info("COB integration will be disabled - dashboard will run with basic price data") self.cob_integration = None - - async def _start_cob_integration(self): - """Start COB integration in background""" + logger.info("COB integration will be disabled - models will use basic price data") + + async def start_cob_integration(self): + """Start COB integration with matrix data collection""" try: if self.cob_integration: - await self.cob_integration.start() - logger.info("COB Integration started - real-time order book data streaming") + logger.info("Starting COB integration with 5-minute matrix collection...") + + # Start COB integration in background thread + def start_cob_in_thread(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def cob_main(): + await self.cob_integration.start() + # Keep running until stopped + while True: + await asyncio.sleep(1) + + loop.run_until_complete(cob_main()) + + except Exception as e: + logger.error(f"Error in COB thread: {e}") + finally: + try: + loop.close() + except: + pass + + import threading + self.cob_thread = threading.Thread(target=start_cob_in_thread, daemon=True) + self.cob_thread.start() + + # Start matrix update worker + self._start_cob_matrix_worker() + + logger.info("COB Integration started - 5-minute data matrix streaming active") + else: + logger.warning("COB integration is None - cannot start") + except Exception as e: logger.error(f"Error starting COB integration: {e}") self.cob_integration = None - + + def _start_cob_matrix_worker(self): + """Start background worker for COB matrix updates""" + def matrix_worker(): + try: + while True: + try: + current_time = time.time() + + # Update matrix for each symbol + for symbol in self.symbols: + # Check if it's time to update this symbol's matrix + last_update = self.last_cob_matrix_update.get(symbol, 0) + + if current_time - last_update >= self.cob_matrix_update_interval: + self._update_cob_matrix_for_symbol(symbol) + self.last_cob_matrix_update[symbol] = current_time + + # Sleep for a short interval + time.sleep(0.5) # 500ms update cycle + + except Exception as e: + logger.warning(f"Error in COB matrix worker: {e}") + time.sleep(5) + + except Exception as e: + logger.error(f"COB matrix worker error: {e}") + + # Start worker thread + matrix_thread = threading.Thread(target=matrix_worker, daemon=True) + matrix_thread.start() + logger.info("COB matrix worker started - updating every 1 second") + + def _update_cob_matrix_for_symbol(self, symbol: str): + """Update COB data matrix for a specific symbol""" + try: + if not self.cob_integration: + return + + # Get latest COB snapshot + cob_snapshot = self.cob_integration.get_cob_snapshot(symbol) + + if cob_snapshot: + # Add raw snapshot to matrix + self.cob_data_matrix[symbol].append(cob_snapshot) + + # Generate CNN features (400 features) + cnn_features = self._generate_cob_cnn_features(symbol, cob_snapshot) + if cnn_features is not None: + self.cob_feature_matrix[symbol].append(cnn_features) + + # Generate DQN state features (200 features) + dqn_features = self._generate_cob_dqn_features(symbol, cob_snapshot) + if dqn_features is not None: + self.cob_state_matrix[symbol].append(dqn_features) + + # Update statistics + self.cob_matrix_stats['total_updates'] += 1 + self.cob_matrix_stats['matrix_fills'][symbol] += 1 + + # Log progress every 100 updates + if self.cob_matrix_stats['total_updates'] % 100 == 0: + matrix_size = len(self.cob_data_matrix[symbol]) + feature_size = len(self.cob_feature_matrix[symbol]) + logger.info(f"COB Matrix Update #{self.cob_matrix_stats['total_updates']}: " + f"{symbol} matrix={matrix_size}/300, features={feature_size}/300") + + except Exception as e: + logger.warning(f"Error updating COB matrix for {symbol}: {e}") + + def _generate_cob_cnn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]: + """Generate CNN features from COB snapshot (400 features)""" + try: + features = [] + + # Order book depth features (200 features: 20 levels x 5 features x 2 sides) + max_levels = 20 + + # Process bids (100 features: 20 levels x 5 features) + for i in range(max_levels): + if hasattr(cob_snapshot, 'consolidated_bids') and i < len(cob_snapshot.consolidated_bids): + level = cob_snapshot.consolidated_bids[i] + if hasattr(level, 'price') and hasattr(cob_snapshot, 'volume_weighted_mid'): + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + features.extend([ + price_offset, + getattr(level, 'total_volume_usd', 0) / 1000000, # Normalize to millions + getattr(level, 'total_size', 0) / 1000, # Normalize to thousands + len(getattr(level, 'exchange_breakdown', {})), + getattr(level, 'liquidity_score', 0.5) + ]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + + # Process asks (100 features: 20 levels x 5 features) + for i in range(max_levels): + if hasattr(cob_snapshot, 'consolidated_asks') and i < len(cob_snapshot.consolidated_asks): + level = cob_snapshot.consolidated_asks[i] + if hasattr(level, 'price') and hasattr(cob_snapshot, 'volume_weighted_mid'): + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + features.extend([ + price_offset, + getattr(level, 'total_volume_usd', 0) / 1000000, + getattr(level, 'total_size', 0) / 1000, + len(getattr(level, 'exchange_breakdown', {})), + getattr(level, 'liquidity_score', 0.5) + ]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + + # Market microstructure features (100 features) + features.extend([ + getattr(cob_snapshot, 'spread_bps', 0) / 100, # Normalized spread + getattr(cob_snapshot, 'liquidity_imbalance', 0), + getattr(cob_snapshot, 'total_bid_liquidity', 0) / 1000000, + getattr(cob_snapshot, 'total_ask_liquidity', 0) / 1000000, + len(getattr(cob_snapshot, 'exchanges_active', [])) / 10, # Normalize to max 10 exchanges + ]) + + # Pad remaining features to reach 400 + while len(features) < 400: + features.append(0.0) + + # Ensure exactly 400 features + features = features[:400] + + return np.array(features, dtype=np.float32) + + except Exception as e: + logger.warning(f"Error generating COB CNN features for {symbol}: {e}") + return np.zeros(400, dtype=np.float32) + + def _generate_cob_dqn_features(self, symbol: str, cob_snapshot) -> Optional[np.ndarray]: + """Generate DQN state features from COB snapshot (200 features)""" + try: + features = [] + + # Market state features (50 features) + features.extend([ + getattr(cob_snapshot, 'volume_weighted_mid', 0) / 100000, # Normalized price + getattr(cob_snapshot, 'spread_bps', 0) / 100, + getattr(cob_snapshot, 'liquidity_imbalance', 0), + getattr(cob_snapshot, 'total_bid_liquidity', 0) / 1000000, + getattr(cob_snapshot, 'total_ask_liquidity', 0) / 1000000, + ]) + + # Top 10 bid levels (50 features: 10 levels x 5 features) + for i in range(10): + if hasattr(cob_snapshot, 'consolidated_bids') and i < len(cob_snapshot.consolidated_bids): + level = cob_snapshot.consolidated_bids[i] + if hasattr(level, 'price') and hasattr(cob_snapshot, 'volume_weighted_mid'): + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + features.extend([ + price_offset, + getattr(level, 'total_volume_usd', 0) / 1000000, + getattr(level, 'total_size', 0) / 1000, + len(getattr(level, 'exchange_breakdown', {})), + getattr(level, 'liquidity_score', 0.5) + ]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + + # Top 10 ask levels (50 features: 10 levels x 5 features) + for i in range(10): + if hasattr(cob_snapshot, 'consolidated_asks') and i < len(cob_snapshot.consolidated_asks): + level = cob_snapshot.consolidated_asks[i] + if hasattr(level, 'price') and hasattr(cob_snapshot, 'volume_weighted_mid'): + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + features.extend([ + price_offset, + getattr(level, 'total_volume_usd', 0) / 1000000, + getattr(level, 'total_size', 0) / 1000, + len(getattr(level, 'exchange_breakdown', {})), + getattr(level, 'liquidity_score', 0.5) + ]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + else: + features.extend([0.0, 0.0, 0.0, 0.0, 0.0]) + + # Exchange diversity and quality features (50 features) + active_exchanges = getattr(cob_snapshot, 'exchanges_active', []) + features.extend([ + len(active_exchanges) / 10, # Normalized exchange count + 1.0 if 'binance' in active_exchanges else 0.0, + 1.0 if 'coinbase' in active_exchanges else 0.0, + 1.0 if 'kraken' in active_exchanges else 0.0, + 1.0 if 'huobi' in active_exchanges else 0.0, + ]) + + # Pad remaining features to reach 200 + while len(features) < 200: + features.append(0.0) + + # Ensure exactly 200 features + features = features[:200] + + return np.array(features, dtype=np.float32) + + except Exception as e: + logger.warning(f"Error generating COB DQN features for {symbol}: {e}") + return np.zeros(200, dtype=np.float32) + def _on_cob_cnn_features(self, symbol: str, cob_data: Dict): - """Handle CNN features from COB integration""" + """Handle CNN features from COB integration - enhanced with matrix support""" try: if 'features' in cob_data: self.latest_cob_features[symbol] = cob_data['features'] @@ -312,9 +595,9 @@ class TradingOrchestrator: except Exception as e: logger.warning(f"Error processing COB CNN features for {symbol}: {e}") - + def _on_cob_dqn_features(self, symbol: str, cob_data: Dict): - """Handle DQN state features from COB integration""" + """Handle DQN state features from COB integration - enhanced with matrix support""" try: if 'state' in cob_data: self.latest_cob_state[symbol] = cob_data['state'] @@ -330,9 +613,9 @@ class TradingOrchestrator: except Exception as e: logger.warning(f"Error processing COB DQN features for {symbol}: {e}") - + def _on_cob_dashboard_data(self, symbol: str, cob_data: Dict): - """Handle dashboard data from COB integration""" + """Handle dashboard data from COB integration - enhanced with matrix support""" try: # Store raw COB snapshot for dashboard display if self.cob_integration: @@ -343,21 +626,138 @@ class TradingOrchestrator: except Exception as e: logger.warning(f"Error processing COB dashboard data for {symbol}: {e}") - - # COB Data Access Methods for Models - + + # Enhanced COB Data Access Methods for Models with 5-minute matrix support + def get_cob_features(self, symbol: str) -> Optional[np.ndarray]: """Get latest COB CNN features for a symbol""" return self.latest_cob_features.get(symbol) - + def get_cob_state(self, symbol: str) -> Optional[np.ndarray]: """Get latest COB DQN state features for a symbol""" return self.latest_cob_state.get(symbol) - + def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]: """Get latest COB snapshot for a symbol""" return self.latest_cob_data.get(symbol) - + + def get_cob_feature_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]: + """ + Get COB feature matrix for CNN models (5-minute capped) + + Args: + symbol: Trading symbol + sequence_length: Number of time steps to return (max 300 for 5 minutes) + + Returns: + np.ndarray: Shape (sequence_length, 400) - CNN features over time + """ + try: + if symbol not in self.cob_feature_matrix: + return None + + # Limit sequence length to available data and maximum 5 minutes + max_length = min(sequence_length, len(self.cob_feature_matrix[symbol]), 300) + + if max_length == 0: + return None + + # Get the most recent features + recent_features = list(self.cob_feature_matrix[symbol])[-max_length:] + + # Stack into matrix + feature_matrix = np.stack(recent_features, axis=0) + + # Pad if necessary to reach requested sequence length + if len(recent_features) < sequence_length: + padding_size = sequence_length - len(recent_features) + padding = np.zeros((padding_size, 400), dtype=np.float32) + feature_matrix = np.vstack([padding, feature_matrix]) + + self.cob_matrix_stats['feature_generations'] += 1 + + logger.debug(f"Generated COB feature matrix for {symbol}: {feature_matrix.shape}") + return feature_matrix + + except Exception as e: + logger.warning(f"Error getting COB feature matrix for {symbol}: {e}") + return None + + def get_cob_state_matrix(self, symbol: str, sequence_length: int = 60) -> Optional[np.ndarray]: + """ + Get COB state matrix for RL models (5-minute capped) + + Args: + symbol: Trading symbol + sequence_length: Number of time steps to return (max 300 for 5 minutes) + + Returns: + np.ndarray: Shape (sequence_length, 200) - DQN state features over time + """ + try: + if symbol not in self.cob_state_matrix: + return None + + # Limit sequence length to available data and maximum 5 minutes + max_length = min(sequence_length, len(self.cob_state_matrix[symbol]), 300) + + if max_length == 0: + return None + + # Get the most recent states + recent_states = list(self.cob_state_matrix[symbol])[-max_length:] + + # Stack into matrix + state_matrix = np.stack(recent_states, axis=0) + + # Pad if necessary to reach requested sequence length + if len(recent_states) < sequence_length: + padding_size = sequence_length - len(recent_states) + padding = np.zeros((padding_size, 200), dtype=np.float32) + state_matrix = np.vstack([padding, state_matrix]) + + self.cob_matrix_stats['model_feeds'] += 1 + + logger.debug(f"Generated COB state matrix for {symbol}: {state_matrix.shape}") + return state_matrix + + except Exception as e: + logger.warning(f"Error getting COB state matrix for {symbol}: {e}") + return None + + def get_cob_matrix_stats(self) -> Dict[str, Any]: + """Get COB matrix statistics""" + try: + stats = self.cob_matrix_stats.copy() + + # Add current matrix sizes + stats['current_matrix_sizes'] = {} + for symbol in self.symbols: + stats['current_matrix_sizes'][symbol] = { + 'data_matrix': len(self.cob_data_matrix.get(symbol, [])), + 'feature_matrix': len(self.cob_feature_matrix.get(symbol, [])), + 'state_matrix': len(self.cob_state_matrix.get(symbol, [])) + } + + # Add matrix fill percentages + stats['matrix_fill_percentages'] = {} + for symbol in self.symbols: + data_fill = len(self.cob_data_matrix.get(symbol, [])) / 300 * 100 + feature_fill = len(self.cob_feature_matrix.get(symbol, [])) / 300 * 100 + state_fill = len(self.cob_state_matrix.get(symbol, [])) / 300 * 100 + + stats['matrix_fill_percentages'][symbol] = { + 'data_matrix': f"{data_fill:.1f}%", + 'feature_matrix': f"{feature_fill:.1f}%", + 'state_matrix': f"{state_fill:.1f}%" + } + + return stats + + except Exception as e: + logger.warning(f"Error getting COB matrix stats: {e}") + return {} + def get_cob_statistics(self, symbol: str) -> Optional[Dict]: """Get COB statistics for a symbol""" try: @@ -367,7 +767,7 @@ class TradingOrchestrator: except Exception as e: logger.warning(f"Error getting COB statistics for {symbol}: {e}") return None - + def get_market_depth_analysis(self, symbol: str) -> Optional[Dict]: """Get detailed market depth analysis from COB""" try: @@ -377,7 +777,7 @@ class TradingOrchestrator: except Exception as e: logger.warning(f"Error getting market depth analysis for {symbol}: {e}") return None - + def get_price_buckets(self, symbol: str) -> Optional[Dict]: """Get fine-grain price buckets from COB""" try: @@ -1435,17 +1835,6 @@ class TradingOrchestrator: # Enhanced Orchestrator Methods - async def start_cob_integration(self): - """Start COB integration manually""" - try: - if self.cob_integration: - await self._start_cob_integration() - logger.info("COB Integration started successfully") - else: - logger.warning("COB Integration not available") - except Exception as e: - logger.error(f"Error starting COB integration: {e}") - async def stop_cob_integration(self): """Stop COB integration""" try: diff --git a/enhanced_realtime_training.py b/enhanced_realtime_training.py index 5cf281c..66973c0 100644 --- a/enhanced_realtime_training.py +++ b/enhanced_realtime_training.py @@ -1183,7 +1183,7 @@ class EnhancedRealtimeTrainingSystem: if symbol in self.recent_dqn_predictions: self.recent_dqn_predictions[symbol].append(display_prediction) - self.last_prediction_time[symbol] = current_time + self.last_prediction_time[symbol] = int(current_time) logger.info(f"Forward DQN prediction: {symbol} action={['BUY','SELL','HOLD'][action]} confidence={confidence:.2f} target={target_time.strftime('%H:%M:%S')}") diff --git a/test_model_predictions_visualization.py b/test_model_predictions_visualization.py deleted file mode 100644 index 40a9a3e..0000000 --- a/test_model_predictions_visualization.py +++ /dev/null @@ -1,309 +0,0 @@ -#!/usr/bin/env python3 -""" -Test Model Predictions Visualization - -This script demonstrates the enhanced model prediction visualization system -that shows DQN actions, CNN price predictions, and accuracy feedback on the price chart. - -Features tested: -- DQN action predictions (BUY/SELL/HOLD) as directional arrows with confidence-based sizing -- CNN price direction predictions as trend lines with target markers -- Prediction accuracy feedback with color-coded results -- Real-time prediction tracking and storage -- Mock prediction generation for demonstration -""" - -import asyncio -import logging -import time -import numpy as np -from datetime import datetime, timedelta -from typing import Dict, List, Optional - -from core.config import get_config -from core.data_provider import DataProvider -from core.orchestrator import TradingOrchestrator -from core.trading_executor import TradingExecutor -from web.clean_dashboard import create_clean_dashboard -from enhanced_realtime_training import EnhancedRealtimeTrainingSystem - -# Setup logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -class ModelPredictionTester: - """Test model prediction visualization capabilities""" - - def __init__(self): - self.config = get_config() - self.data_provider = DataProvider() - self.trading_executor = TradingExecutor() - self.orchestrator = TradingOrchestrator( - data_provider=self.data_provider, - enhanced_rl_training=True, - model_registry={} - ) - - # Initialize enhanced training system - self.training_system = EnhancedRealtimeTrainingSystem( - orchestrator=self.orchestrator, - data_provider=self.data_provider, - dashboard=None # Will be set after dashboard creation - ) - - # Create dashboard with enhanced prediction visualization - self.dashboard = create_clean_dashboard( - data_provider=self.data_provider, - orchestrator=self.orchestrator, - trading_executor=self.trading_executor - ) - - # Connect training system to dashboard - self.training_system.dashboard = self.dashboard - self.dashboard.training_system = self.training_system - - # Test data - self.test_symbols = ['ETH/USDT', 'BTC/USDT'] - self.prediction_count = 0 - - logger.info("Model Prediction Tester initialized") - - def generate_mock_dqn_predictions(self, symbol: str, count: int = 10): - """Generate mock DQN predictions for testing""" - try: - current_price = self.data_provider.get_current_price(symbol) or 2400.0 - - for i in range(count): - # Generate realistic state vector - state = np.random.random(100) # 100-dimensional state - - # Generate Q-values with some logic - q_values = [np.random.random(), np.random.random(), np.random.random()] - action = np.argmax(q_values) # Best action - confidence = max(q_values) / sum(q_values) # Confidence based on Q-value distribution - - # Add some price variation - pred_price = current_price + np.random.normal(0, 20) - - # Capture prediction - self.training_system.capture_dqn_prediction( - symbol=symbol, - state=state, - q_values=q_values, - action=action, - confidence=confidence, - price=pred_price - ) - - self.prediction_count += 1 - - logger.info(f"Generated DQN prediction {i+1}/{count}: {symbol} action={['BUY', 'SELL', 'HOLD'][action]} confidence={confidence:.2f}") - - # Small delay between predictions - time.sleep(0.1) - - except Exception as e: - logger.error(f"Error generating DQN predictions: {e}") - - def generate_mock_cnn_predictions(self, symbol: str, count: int = 8): - """Generate mock CNN predictions for testing""" - try: - current_price = self.data_provider.get_current_price(symbol) or 2400.0 - - for i in range(count): - # Generate direction with some logic - direction = np.random.choice([0, 1, 2], p=[0.3, 0.2, 0.5]) # Slightly bullish - confidence = 0.4 + np.random.random() * 0.5 # 0.4-0.9 confidence - - # Calculate predicted price based on direction - if direction == 2: # UP - price_change = np.random.uniform(5, 50) - elif direction == 0: # DOWN - price_change = -np.random.uniform(5, 50) - else: # SAME - price_change = np.random.uniform(-5, 5) - - predicted_price = current_price + price_change - - # Generate features - features = np.random.random((15, 20)).flatten() # Flattened CNN features - - # Capture prediction - self.training_system.capture_cnn_prediction( - symbol=symbol, - current_price=current_price, - predicted_price=predicted_price, - direction=direction, - confidence=confidence, - features=features - ) - - self.prediction_count += 1 - - logger.info(f"Generated CNN prediction {i+1}/{count}: {symbol} direction={['DOWN', 'SAME', 'UP'][direction]} confidence={confidence:.2f}") - - # Small delay between predictions - time.sleep(0.2) - - except Exception as e: - logger.error(f"Error generating CNN predictions: {e}") - - def generate_mock_accuracy_data(self, symbol: str, count: int = 15): - """Generate mock prediction accuracy data for testing""" - try: - current_price = self.data_provider.get_current_price(symbol) or 2400.0 - - for i in range(count): - # Randomly choose prediction type - prediction_type = np.random.choice(['DQN', 'CNN']) - predicted_action = np.random.choice([0, 1, 2]) - confidence = 0.3 + np.random.random() * 0.6 - - # Generate realistic price change - actual_price_change = np.random.normal(0, 0.01) # ±1% typical change - - # Validate accuracy - self.training_system.validate_prediction_accuracy( - symbol=symbol, - prediction_type=prediction_type, - predicted_action=predicted_action, - actual_price_change=actual_price_change, - confidence=confidence - ) - - logger.info(f"Generated accuracy data {i+1}/{count}: {symbol} {prediction_type} action={predicted_action}") - - # Small delay - time.sleep(0.1) - - except Exception as e: - logger.error(f"Error generating accuracy data: {e}") - - def run_prediction_generation_test(self): - """Run comprehensive prediction generation test""" - try: - logger.info("Starting Model Prediction Visualization Test") - logger.info("=" * 60) - - # Test for each symbol - for symbol in self.test_symbols: - logger.info(f"\nGenerating predictions for {symbol}...") - - # Generate DQN predictions - logger.info(f"Generating DQN predictions for {symbol}...") - self.generate_mock_dqn_predictions(symbol, count=12) - - # Generate CNN predictions - logger.info(f"Generating CNN predictions for {symbol}...") - self.generate_mock_cnn_predictions(symbol, count=8) - - # Generate accuracy data - logger.info(f"Generating accuracy data for {symbol}...") - self.generate_mock_accuracy_data(symbol, count=20) - - # Get prediction summary - summary = self.training_system.get_prediction_summary(symbol) - logger.info(f"Prediction summary for {symbol}: {summary}") - - # Log total statistics - training_stats = self.training_system.get_training_statistics() - logger.info("\nTraining System Statistics:") - logger.info(f"Total predictions generated: {self.prediction_count}") - logger.info(f"Prediction stats: {training_stats.get('prediction_stats', {})}") - - logger.info("\n" + "=" * 60) - logger.info("Prediction generation test completed successfully!") - logger.info("Dashboard should now show enhanced model predictions on the price chart:") - logger.info("- Green/Red arrows for DQN BUY/SELL predictions") - logger.info("- Gray circles for DQN HOLD predictions") - logger.info("- Colored trend lines for CNN price direction predictions") - logger.info("- Diamond markers for CNN prediction targets") - logger.info("- Green/Red X marks for correct/incorrect prediction feedback") - logger.info("- Hover tooltips showing confidence, Q-values, and accuracy scores") - - except Exception as e: - logger.error(f"Error in prediction generation test: {e}") - - def start_dashboard_with_predictions(self, host='127.0.0.1', port=8051): - """Start dashboard with enhanced prediction visualization""" - try: - logger.info(f"Starting dashboard with model predictions at http://{host}:{port}") - - # Run prediction generation in background - import threading - pred_thread = threading.Thread(target=self.run_prediction_generation_test, daemon=True) - pred_thread.start() - - # Start training system - self.training_system.start_training() - - # Start dashboard - self.dashboard.run_server(host=host, port=port, debug=False) - - except Exception as e: - logger.error(f"Error starting dashboard with predictions: {e}") - - def test_prediction_accuracy_validation(self): - """Test prediction accuracy validation logic""" - try: - logger.info("Testing prediction accuracy validation...") - - # Test DQN accuracy validation - test_cases = [ - ('DQN', 0, 0.01, 0.8, True), # BUY + price up = correct - ('DQN', 1, -0.01, 0.7, True), # SELL + price down = correct - ('DQN', 2, 0.0005, 0.6, True), # HOLD + no change = correct - ('DQN', 0, -0.01, 0.8, False), # BUY + price down = incorrect - ('CNN', 2, 0.01, 0.9, True), # UP + price up = correct - ('CNN', 0, -0.01, 0.8, True), # DOWN + price down = correct - ('CNN', 1, 0.0005, 0.7, True), # SAME + no change = correct - ('CNN', 2, -0.01, 0.9, False), # UP + price down = incorrect - ] - - for prediction_type, action, price_change, confidence, expected_correct in test_cases: - self.training_system.validate_prediction_accuracy( - symbol='ETH/USDT', - prediction_type=prediction_type, - predicted_action=action, - actual_price_change=price_change, - confidence=confidence - ) - - # Check if validation worked correctly - if self.training_system.prediction_accuracy_history['ETH/USDT']: - latest = list(self.training_system.prediction_accuracy_history['ETH/USDT'])[-1] - actual_correct = latest['correct'] - - status = "✓" if actual_correct == expected_correct else "✗" - logger.info(f"{status} {prediction_type} action={action} change={price_change:.4f} -> correct={actual_correct}") - - logger.info("Prediction accuracy validation test completed") - - except Exception as e: - logger.error(f"Error testing prediction accuracy validation: {e}") - -def main(): - """Main test function""" - try: - # Create tester - tester = ModelPredictionTester() - - # Run accuracy validation test first - tester.test_prediction_accuracy_validation() - - # Start dashboard with enhanced predictions - logger.info("\nStarting dashboard with enhanced model prediction visualization...") - logger.info("Visit http://127.0.0.1:8051 to see the enhanced price chart with model predictions") - - tester.start_dashboard_with_predictions() - - except KeyboardInterrupt: - logger.info("Test interrupted by user") - except Exception as e: - logger.error(f"Error in main test: {e}") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 808e81c..b619e93 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -421,29 +421,17 @@ class CleanTradingDashboard: [Input('interval-component', 'n_intervals')] ) def update_cob_data(n): - """Update COB data displays with price buckets""" + """Update COB data displays with real order book ladders""" try: - # ETH/USDT COB with $1 price buckets - eth_cob = self._get_cob_snapshot('ETH/USDT') - eth_buckets = self.get_cob_price_buckets('ETH/USDT') - eth_memory_stats = self.get_cob_memory_stats('ETH/USDT') - eth_components = self.component_manager.format_cob_data_with_buckets( - eth_cob, 'ETH/USDT', eth_buckets, eth_memory_stats, bucket_size=1.0 - ) - - # BTC/USDT COB with $10 price buckets - Reference data for ETH models - btc_cob = self._get_cob_snapshot('BTC/USDT') - btc_buckets = self.get_cob_price_buckets('BTC/USDT') - btc_memory_stats = self.get_cob_memory_stats('BTC/USDT') - btc_components = self.component_manager.format_cob_data_with_buckets( - btc_cob, 'BTC/USDT', btc_buckets, btc_memory_stats, bucket_size=10.0 - ) + # Get real COB data from the working integration + eth_components = self._create_cob_ladder_display('ETH/USDT') + btc_components = self._create_cob_ladder_display('BTC/USDT') return eth_components, btc_components except Exception as e: logger.error(f"Error updating COB data: {e}") - error_msg = html.P(f"Error: {str(e)}", className="text-danger") + error_msg = html.P(f"COB Error: {str(e)}", className="text-danger small") return error_msg, error_msg @self.app.callback( @@ -2947,47 +2935,243 @@ class CleanTradingDashboard: self.training_system = None def _initialize_cob_integration(self): - """Initialize COB integration with high-frequency data handling""" + """Initialize COB integration with high-frequency data handling - LAZY INITIALIZATION""" try: - if not COB_INTEGRATION_AVAILABLE: - logger.warning("COB integration not available - skipping") - return + logger.info("Setting up COB integration for lazy initialization (will start when dashboard runs)") - # Initialize COB integration with dashboard callback - self.cob_integration = COBIntegration( - data_provider=self.data_provider, - symbols=['ETH/USDT', 'BTC/USDT'] - ) + # Don't initialize COB here - just set up for lazy initialization + self.cob_integration = None + self.cob_integration_started = False + self.latest_cob_data = {} + self.cob_update_timestamps = {} - # Register dashboard callback for COB updates - self.cob_integration.add_dashboard_callback(self._on_high_frequency_cob_update) - - # Register CNN callback for COB features (for next price prediction) - self.cob_integration.add_cnn_callback(self._on_cob_cnn_features) - - # Register DQN callback for COB state features (for RL training) - self.cob_integration.add_dqn_callback(self._on_cob_dqn_features) - - # Start COB integration in background thread - import threading - def start_cob(): - import asyncio - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self.cob_integration.start()) - except Exception as e: - logger.error(f"Error starting COB integration: {e}") - finally: - loop.close() - - cob_thread = threading.Thread(target=start_cob, daemon=True) - cob_thread.start() - - logger.info("High-frequency COB integration initialized (50-100 Hz data handling)") + logger.info("COB integration setup complete - will initialize when event loop is available") except Exception as e: - logger.error(f"Error initializing COB integration: {e}") + logger.error(f"Error setting up COB integration: {e}") + self.cob_integration = None + + def _start_cob_integration_lazy(self): + """Start COB integration when dashboard is running (lazy initialization)""" + if self.cob_integration_started: + return + + try: + logger.info("Starting COB integration with lazy initialization pattern") + + # Import COB integration directly (same as working dashboard) + from core.cob_integration import COBIntegration + + # Start COB integration in a background thread with proper event loop + def start_cob_worker(): + """Start COB integration using the exact same pattern as working dashboard""" + try: + # Create new event loop for COB (same as working dashboard) + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def cob_main(): + """Main COB loop (same pattern as working dashboard)""" + try: + # Initialize COB integration with our symbols (same pattern as working dashboard) + self.cob_integration = COBIntegration(symbols=['ETH/USDT', 'BTC/USDT']) + + # Register callback to receive real-time COB data (same as working dashboard) + self.cob_integration.add_dashboard_callback(self._on_cob_update) + + # Start COB data streaming as background task (same as working dashboard) + await self.cob_integration.start() + + logger.info("COB integration started successfully with lazy initialization") + logger.info("High-frequency COB data streaming active") + + # Keep running (same as working dashboard) + while True: + await asyncio.sleep(1) + + except Exception as e: + logger.error(f"Error in COB main loop: {e}") + + # Run the COB integration (same as working dashboard) + loop.run_until_complete(cob_main()) + + except Exception as e: + logger.error(f"Error in COB worker thread: {e}") + finally: + try: + loop.close() + except: + pass + + # Start COB worker in background thread + import threading + self.cob_thread = threading.Thread(target=start_cob_worker, daemon=True) + self.cob_thread.start() + + self.cob_integration_started = True + logger.info("COB integration lazy initialization completed") + + except Exception as e: + logger.error(f"Error in lazy COB initialization: {e}") + self.cob_integration = None + + def _on_cob_update(self, symbol: str, data: Dict): + """Handle COB data updates (same callback pattern as working dashboard)""" + try: + # Store latest COB data + self.latest_cob_data[symbol] = data + self.cob_update_timestamps[symbol] = datetime.now() + + # Provide data to orchestrator models + if hasattr(self.orchestrator, '_on_cob_dashboard_data'): + self.orchestrator._on_cob_dashboard_data(symbol, data) + + # Provide data to enhanced training system + if hasattr(self, 'training_system') and self.training_system: + # Add COB snapshot to training system + if hasattr(self.training_system, 'real_time_data'): + cob_snapshot = { + 'timestamp': time.time(), + 'symbol': symbol, + 'stats': data.get('stats', {}), + 'levels': len(data.get('bids', [])) + len(data.get('asks', [])), + 'imbalance': data.get('stats', {}).get('imbalance', 0), + 'spread_bps': data.get('stats', {}).get('spread_bps', 0) + } + self.training_system.real_time_data['cob_snapshots'].append(cob_snapshot) + + logger.debug(f"COB update processed: {symbol} - {len(data.get('bids', []))} bids, {len(data.get('asks', []))} asks") + + except Exception as e: + logger.debug(f"Error processing COB update: {e}") + + def get_cob_data(self, symbol: str) -> Optional[Dict]: + """Get latest COB data for a symbol""" + try: + return self.latest_cob_data.get(symbol) + except Exception as e: + logger.debug(f"Error getting COB data: {e}") + return None + + def get_cob_statistics(self, symbol: str) -> Optional[Dict]: + """Get COB statistics for a symbol""" + try: + if symbol in self.latest_cob_data: + return self.latest_cob_data[symbol].get('stats', {}) + return None + except Exception as e: + logger.debug(f"Error getting COB statistics: {e}") + return None + + def _create_cob_ladder_display(self, symbol: str) -> List: + """Create real COB ladder display showing order book""" + try: + # Get COB data from the working integration + cob_data = self.get_cob_data(symbol) + + if not cob_data: + return [ + html.Div([ + html.H6(f"{symbol} - COB", className="text-muted mb-2"), + html.P("COB data not available", className="text-warning small"), + html.P("Initializing connections...", className="text-muted small") + ]) + ] + + components = [] + + # Header with symbol and status + components.append(html.Div([ + html.H6(f"{symbol} - Order Book", className="text-info mb-2"), + html.Small(f"Last update: {datetime.now().strftime('%H:%M:%S')}", className="text-muted") + ])) + + # Get order book data + bids = cob_data.get('bids', []) + asks = cob_data.get('asks', []) + stats = cob_data.get('stats', {}) + + # Display key statistics + if stats: + spread = stats.get('spread_bps', 0) + imbalance = stats.get('imbalance', 0) + + components.append(html.Div([ + html.P([ + html.Span("Spread: ", className="text-muted small"), + html.Span(f"{spread:.1f} bps", className="text-warning small fw-bold") + ], className="mb-1"), + html.P([ + html.Span("Imbalance: ", className="text-muted small"), + html.Span(f"{imbalance:.3f}", className="text-info small fw-bold") + ], className="mb-2") + ])) + + # Order book ladder - Asks (top, descending) + if asks: + components.append(html.Div([ + html.H6("ASKS", className="text-danger small mb-1"), + html.Div([ + html.Div([ + html.Span(f"${ask['price']:.2f}", className="text-danger small me-2"), + html.Span(f"{ask['size']:.4f}", className="text-muted small") + ], className="d-flex justify-content-between mb-1") + for ask in asks[:5] # Top 5 asks + ], className="border-start border-danger ps-2 mb-2") + ])) + + # Current price (mid) + if bids and asks: + mid_price = (bids[0]['price'] + asks[0]['price']) / 2 + components.append(html.Div([ + html.Hr(className="my-1"), + html.P([ + html.Strong(f"${mid_price:.2f}", className="text-primary") + ], className="text-center mb-1"), + html.Hr(className="my-1") + ])) + + # Order book ladder - Bids (bottom, descending) + if bids: + components.append(html.Div([ + html.H6("BIDS", className="text-success small mb-1"), + html.Div([ + html.Div([ + html.Span(f"${bid['price']:.2f}", className="text-success small me-2"), + html.Span(f"{bid['size']:.4f}", className="text-muted small") + ], className="d-flex justify-content-between mb-1") + for bid in bids[:5] # Top 5 bids + ], className="border-start border-success ps-2") + ])) + + # Summary stats + if bids and asks: + total_bid_volume = sum(bid['size'] * bid['price'] for bid in bids[:10]) + total_ask_volume = sum(ask['size'] * ask['price'] for ask in asks[:10]) + + components.append(html.Div([ + html.Hr(className="my-2"), + html.P([ + html.Span("Bid Vol: ", className="text-muted small"), + html.Span(f"${total_bid_volume:,.0f}", className="text-success small") + ], className="mb-1"), + html.P([ + html.Span("Ask Vol: ", className="text-muted small"), + html.Span(f"${total_ask_volume:,.0f}", className="text-danger small") + ], className="mb-1") + ])) + + return components + + except Exception as e: + logger.error(f"Error creating COB ladder for {symbol}: {e}") + return [ + html.Div([ + html.H6(f"{symbol} - COB", className="text-muted mb-2"), + html.P(f"Error: {str(e)}", className="text-danger small") + ]) + ] def _initialize_unified_orchestrator_features(self): """Initialize unified orchestrator features including COB integration""" @@ -3068,6 +3252,10 @@ class CleanTradingDashboard: logging.getLogger('werkzeug').setLevel(logging.ERROR) logger.info(f"Starting Clean Trading Dashboard at http://{host}:{port}") + + # Start lazy COB integration now that dashboard is running + self._start_cob_integration_lazy() + self.app.run(host=host, port=port, debug=debug, dev_tools_silence_routes_logging=True) def stop(self): @@ -3479,47 +3667,7 @@ class CleanTradingDashboard: except Exception as e: logger.error(f"Error broadcasting COB update to UI: {e}") - def get_cob_price_buckets(self, symbol: str) -> List[Dict]: - """Get price buckets for display in dashboard""" - try: - if symbol not in self.cob_price_buckets: - return [] - - buckets = self.cob_price_buckets[symbol] - - # Sort buckets by price and return as list - sorted_buckets = [] - for price_key in sorted(buckets.keys(), key=float): - bucket = buckets[price_key] - if bucket['total_volume'] > 0: # Only return buckets with volume - sorted_buckets.append(bucket) - - return sorted_buckets - - except Exception as e: - logger.error(f"Error getting COB price buckets for {symbol}: {e}") - return [] - - def get_cob_memory_stats(self, symbol: str) -> Dict: - """Get COB memory statistics for debugging""" - try: - if symbol not in self.cob_memory: - return {} - - memory = self.cob_memory[symbol] - buffer = self.cob_data_buffer[symbol] - - return { - 'memory_snapshots': len(memory), - 'buffer_updates': len(buffer), - 'total_updates': self.cob_update_count, - 'last_update': self.last_cob_broadcast.get(symbol, 0), - 'bucket_count': len(self.cob_price_buckets.get(symbol, {})) - } - - except Exception as e: - logger.error(f"Error getting COB memory stats: {e}") - return {} + # REMOVED: Complex COB bucket methods - using simplified real order book display instead def _on_cob_cnn_features(self, symbol: str, cob_features: Dict): """Handle COB features for CNN models (next price prediction)"""