From c6094160d738caf45a1b37652e450aaa58c086f1 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 25 Jun 2025 15:57:05 +0300 Subject: [PATCH] cleanup enhanced orchestrator --- web/clean_dashboard.py | 414 +++++++++++++---------------------------- 1 file changed, 125 insertions(+), 289 deletions(-) diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index b5d935c..52882fb 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -58,8 +58,6 @@ from core.trading_executor import TradingExecutor from web.layout_manager import DashboardLayoutManager from web.component_manager import DashboardComponentManager -# Enhanced RL components are no longer available - using Basic orchestrator only -ENHANCED_RL_AVAILABLE = False try: from core.cob_integration import COBIntegration @@ -143,6 +141,7 @@ class CleanTradingDashboard: 'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0} } self.latest_cob_data = {} # Cache for COB integration data + self.cob_predictions = {} # Cache for COB predictions (both ETH and BTC for display) # Initialize timezone timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia') @@ -603,7 +602,7 @@ class CleanTradingDashboard: executed_signals = [signal for signal in self.recent_decisions if signal.get('executed', False)] if executed_signals: - # Separate by prediction type + # Separate by prediction type buy_trades = [] sell_trades = [] @@ -637,51 +636,51 @@ class CleanTradingDashboard: # Add EXECUTED BUY trades (large green circles) if buy_trades: - fig.add_trace( - go.Scatter( + fig.add_trace( + go.Scatter( x=[t['x'] for t in buy_trades], y=[t['y'] for t in buy_trades], - mode='markers', - marker=dict( + mode='markers', + marker=dict( symbol='circle', size=15, color='rgba(0, 255, 100, 0.9)', line=dict(width=3, color='green') ), name='EXECUTED BUY', - showlegend=True, + showlegend=True, hovertemplate="EXECUTED BUY TRADE
" + - "Price: $%{y:.2f}
" + - "Time: %{x}
" + - "Confidence: %{customdata:.1%}", + "Price: $%{y:.2f}
" + + "Time: %{x}
" + + "Confidence: %{customdata:.1%}", customdata=[t['confidence'] for t in buy_trades] - ), - row=row, col=1 - ) - + ), + row=row, col=1 + ) + # Add EXECUTED SELL trades (large red circles) if sell_trades: - fig.add_trace( - go.Scatter( + fig.add_trace( + go.Scatter( x=[t['x'] for t in sell_trades], y=[t['y'] for t in sell_trades], - mode='markers', - marker=dict( + mode='markers', + marker=dict( symbol='circle', size=15, color='rgba(255, 100, 100, 0.9)', line=dict(width=3, color='red') ), name='EXECUTED SELL', - showlegend=True, + showlegend=True, hovertemplate="EXECUTED SELL TRADE
" + - "Price: $%{y:.2f}
" + - "Time: %{x}
" + - "Confidence: %{customdata:.1%}", + "Price: $%{y:.2f}
" + + "Time: %{x}
" + + "Confidence: %{customdata:.1%}", customdata=[t['confidence'] for t in sell_trades] - ), - row=row, col=1 - ) + ), + row=row, col=1 + ) except Exception as e: logger.warning(f"Error adding executed trades to main chart: {e}") @@ -742,36 +741,36 @@ class CleanTradingDashboard: # Executed buy signals (solid green triangles) if executed_buys: - fig.add_trace( - go.Scatter( + fig.add_trace( + go.Scatter( x=[s['x'] for s in executed_buys], y=[s['y'] for s in executed_buys], - mode='markers', - marker=dict( + mode='markers', + marker=dict( symbol='triangle-up', - size=10, + size=10, color='rgba(0, 255, 100, 1.0)', line=dict(width=2, color='green') ), name='BUY (Executed)', showlegend=False, hovertemplate="BUY EXECUTED
" + - "Price: $%{y:.2f}
" + - "Time: %{x}
" + + "Price: $%{y:.2f}
" + + "Time: %{x}
" + "Confidence: %{customdata:.1%}", customdata=[s['confidence'] for s in executed_buys] - ), - row=row, col=1 - ) - + ), + row=row, col=1 + ) + # Pending/non-executed buy signals (hollow green triangles) if pending_buys: - fig.add_trace( - go.Scatter( + fig.add_trace( + go.Scatter( x=[s['x'] for s in pending_buys], y=[s['y'] for s in pending_buys], - mode='markers', - marker=dict( + mode='markers', + marker=dict( symbol='triangle-up', size=8, color='rgba(0, 255, 100, 0.5)', @@ -803,20 +802,20 @@ class CleanTradingDashboard: mode='markers', marker=dict( symbol='triangle-down', - size=10, + size=10, color='rgba(255, 100, 100, 1.0)', line=dict(width=2, color='red') ), name='SELL (Executed)', showlegend=False, hovertemplate="SELL EXECUTED
" + - "Price: $%{y:.2f}
" + - "Time: %{x}
" + + "Price: $%{y:.2f}
" + + "Time: %{x}
" + "Confidence: %{customdata:.1%}", customdata=[s['confidence'] for s in executed_sells] - ), - row=row, col=1 - ) + ), + row=row, col=1 + ) # Pending/non-executed sell signals (hollow red triangles) if pending_sells: @@ -1031,48 +1030,9 @@ class CleanTradingDashboard: # Check COB integration in Enhanced orchestrator if hasattr(self.orchestrator, 'cob_integration'): cob_integration = getattr(self.orchestrator, 'cob_integration', None) - if cob_integration is not None: - # Get real COB integration statistics - try: - if hasattr(cob_integration, 'get_statistics'): - cob_stats = cob_integration.get_statistics() - if cob_stats: - active_symbols = cob_stats.get('active_symbols', []) - total_updates = cob_stats.get('total_updates', 0) - provider_status = cob_stats.get('provider_status', 'Unknown') - - if active_symbols: - status['cob_status'] = f'Enhanced COB Active ({len(active_symbols)} symbols)' - status['active_symbols'] = active_symbols - status['cache_size'] = total_updates - status['provider_status'] = provider_status - else: - status['cob_status'] = 'Enhanced COB Integration Loaded (No Data)' - else: - status['cob_status'] = 'Enhanced COB Integration (Stats Unavailable)' - else: - status['cob_status'] = 'Enhanced COB Integration (No Stats Method)' - - except Exception as e: - logger.debug(f"Error getting COB statistics: {e}") - status['cob_status'] = 'Enhanced COB Integration (Error Getting Stats)' - else: - status['cob_status'] = 'Enhanced Orchestrator (COB Integration Not Initialized)' - # Don't log warning here to avoid spam, just info level - logger.debug("Enhanced orchestrator has COB integration attribute but it's None") - else: - status['cob_status'] = 'Enhanced Orchestrator Missing COB Integration' - logger.debug("Enhanced orchestrator available but has no COB integration attribute") - else: - status['cob_status'] = 'Enhanced Orchestrator Missing COB Integration' - logger.debug("Enhanced orchestrator available but has no COB integration attribute") - else: - if not ENHANCED_ORCHESTRATOR_AVAILABLE: - status['cob_status'] = 'Enhanced Orchestrator Not Available' - status['orchestrator_type'] = 'Basic (Enhanced Unavailable)' - else: - status['cob_status'] = 'Basic Orchestrator (No COB Support)' - status['orchestrator_type'] = 'Basic (Enhanced Not Used)' + # Basic orchestrator only - no enhanced COB features + status['cob_status'] = 'Basic Orchestrator (No COB Support)' + status['orchestrator_type'] = 'Basic' return status @@ -1081,38 +1041,14 @@ class CleanTradingDashboard: return {'error': str(e), 'cob_status': 'Error Getting Status', 'orchestrator_type': 'Unknown'} def _get_cob_snapshot(self, symbol: str) -> Optional[Any]: - """Get COB snapshot for symbol using enhanced orchestrator approach""" + """Get COB snapshot for symbol - Basic orchestrator has no COB features""" try: - # Get from Enhanced Orchestrator's COB integration (proper way) - if (ENHANCED_ORCHESTRATOR_AVAILABLE and - hasattr(self.orchestrator, 'cob_integration') and - self.orchestrator.__class__.__name__ == 'EnhancedTradingOrchestrator'): - - cob_integration = getattr(self.orchestrator, 'cob_integration', None) - if cob_integration is not None: - # Get real COB snapshot using the proper method - if hasattr(cob_integration, 'get_cob_snapshot'): - snapshot = cob_integration.get_cob_snapshot(symbol) - if snapshot: - logger.debug(f"Retrieved Enhanced COB snapshot for {symbol}") - return snapshot - else: - logger.debug(f"No Enhanced COB data available for {symbol}") - elif hasattr(cob_integration, 'get_consolidated_orderbook'): - # Alternative method name - snapshot = cob_integration.get_consolidated_orderbook(symbol) - if snapshot: - logger.debug(f"Retrieved Enhanced COB orderbook for {symbol}") - return snapshot - else: - logger.warning("Enhanced COB integration has no recognized snapshot method") - else: - logger.debug(f"No Enhanced COB integration available for {symbol}") - + # Basic orchestrator has no COB integration + logger.debug(f"No COB integration available for {symbol} (Basic orchestrator)") return None except Exception as e: - logger.warning(f"Error getting Enhanced COB snapshot for {symbol}: {e}") + logger.warning(f"Error getting COB snapshot for {symbol}: {e}") return None def _get_training_metrics(self) -> Dict: @@ -1176,10 +1112,10 @@ class CleanTradingDashboard: }, 'loss_5ma': cnn_last_loss, 'model_type': 'CNN', - 'description': 'Williams Market Structure CNN' + (' (Enhanced Only)' if not is_enhanced else '') + 'description': 'Williams Market Structure CNN (Basic Orchestrator - Inactive)' } loaded_models['cnn'] = cnn_model_info - + # 3. COB RL Model Status - NOT AVAILABLE IN BASIC ORCHESTRATOR cob_active = False cob_last_loss = 0.012 # Default loss value @@ -1190,12 +1126,12 @@ class CleanTradingDashboard: 'parameters': 400000000, # 400M optimized (Enhanced COB integration) 'last_prediction': { 'timestamp': datetime.now().strftime('%H:%M:%S'), - 'action': 'ENHANCED_COB_INFERENCE' if cob_active else ('INACTIVE' if is_enhanced else 'NOT_AVAILABLE'), + 'action': 'INACTIVE', 'confidence': 0.0 }, 'loss_5ma': cob_last_loss, 'model_type': 'ENHANCED_COB_RL', - 'description': 'Enhanced COB Integration' + (' (Enhanced Only)' if not is_enhanced else ''), + 'description': 'Enhanced COB Integration (Basic Orchestrator - Inactive)', 'predictions_count': cob_predictions_count } loaded_models['cob_rl'] = cob_model_info @@ -1212,14 +1148,14 @@ class CleanTradingDashboard: 'last_update': datetime.now().strftime('%H:%M:%S'), 'models_loaded': len(loaded_models), 'total_parameters': sum(m['parameters'] for m in loaded_models.values() if m['active']), - 'orchestrator_type': 'Enhanced' if is_enhanced else 'Basic' + 'orchestrator_type': 'Basic' } # COB $1 Buckets (sample data for now) metrics['cob_buckets'] = self._get_cob_dollar_buckets() return metrics - + except Exception as e: logger.error(f"Error getting training metrics: {e}") return {'error': str(e), 'loaded_models': {}, 'training_status': {'active_sessions': 0}} @@ -1272,8 +1208,8 @@ class CleanTradingDashboard: while True: try: - # Generate signals for both symbols - for symbol in ['ETH/USDT', 'BTC/USDT']: + # Generate signals for ETH only (ignore BTC) + for symbol in ['ETH/USDT']: # Only ETH signals try: # Get current price current_price = self._get_current_price(symbol) @@ -1371,7 +1307,7 @@ class CleanTradingDashboard: self.recent_decisions = self.recent_decisions[-20:] # Log signal generation - logger.info(f"Generated {signal['action']} signal for {signal['symbol']} " + logger.info(f"Generated ETH {signal['action']} signal for {signal['symbol']} " f"(conf: {signal['confidence']:.2f}, model: {signal.get('model', 'UNKNOWN')})") # DQN training not available in Basic orchestrator @@ -1720,178 +1656,58 @@ class CleanTradingDashboard: logger.error(f"Error clearing session: {e}") def _initialize_cob_integration_proper(self): - """Initialize COB integration using Enhanced Orchestrator - PROPER APPROACH""" + """Initialize COB integration - Basic orchestrator has no COB features""" try: - logger.info("Connecting to COB integration from Enhanced Orchestrator...") - - # Check if we have Enhanced Orchestrator - if not ENHANCED_ORCHESTRATOR_AVAILABLE: - logger.error("Enhanced Orchestrator not available - COB integration requires Enhanced Orchestrator") - return - - # Check if Enhanced Orchestrator has COB integration - if not hasattr(self.orchestrator, 'cob_integration'): - logger.error("Enhanced Orchestrator has no cob_integration attribute") - return - - if self.orchestrator.cob_integration is None: - logger.warning("Enhanced Orchestrator COB integration is None - needs to be started") - - # Try to start the COB integration asynchronously - def start_cob_async(): - """Start COB integration in async context""" - import asyncio - async def _start_cob(): - try: - # Start the COB integration from enhanced orchestrator - await self.orchestrator.start_cob_integration() - logger.info("COB integration started successfully from Enhanced Orchestrator") - - # Register dashboard callback if possible - if hasattr(self.orchestrator.cob_integration, 'add_dashboard_callback'): - self.orchestrator.cob_integration.add_dashboard_callback(self._on_enhanced_cob_update) - logger.info("Registered dashboard callback with Enhanced COB integration") - - except Exception as e: - logger.error(f"Error starting COB integration from Enhanced Orchestrator: {e}") - - # Run in new event loop if needed - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # If loop is already running, schedule as task - asyncio.create_task(_start_cob()) - else: - # If no loop running, run directly - loop.run_until_complete(_start_cob()) - except RuntimeError: - # No event loop, create new one - asyncio.run(_start_cob()) - - # Start COB integration in background thread to avoid blocking dashboard - import threading - cob_start_thread = threading.Thread(target=start_cob_async, daemon=True) - cob_start_thread.start() - logger.info("Enhanced COB integration startup initiated in background") - - else: - # COB integration already exists, just register callback - cob_integration = self.orchestrator.cob_integration - logger.info(f"Enhanced COB integration found: {type(cob_integration)}") - - # Register callbacks if available - if hasattr(cob_integration, 'add_dashboard_callback'): - cob_integration.add_dashboard_callback(self._on_enhanced_cob_update) - logger.info("Registered dashboard callback with existing Enhanced COB integration") - - # Verify COB integration is active and working - if hasattr(cob_integration, 'get_statistics'): - try: - stats = cob_integration.get_statistics() - logger.info(f"Enhanced COB statistics: {stats}") - except Exception as e: - logger.debug(f"Could not get COB statistics: {e}") - - logger.info("Enhanced COB integration connection completed") - logger.info("NO SIMULATION - Using Enhanced Orchestrator real market data only") + logger.info("Basic orchestrator has no COB integration features") + logger.info("COB integration not available with Basic orchestrator") except Exception as e: - logger.error(f"CRITICAL: Failed to connect to Enhanced COB integration: {e}") - logger.error("Dashboard will operate without COB data") - + logger.error(f"Error in COB integration init: {e}") + def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict): - """Handle Enhanced COB data updates - NO SIMULATION""" + """Handle Enhanced COB data updates - Basic orchestrator has no COB features""" try: - # Process Enhanced COB data update - current_time = time.time() - - # Update cache with Enhanced COB data (same format as cob_realtime_dashboard.py) - if symbol not in self.cob_cache: - self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} - - self.cob_cache[symbol] = { - 'last_update': current_time, - 'data': cob_data, - 'updates_count': self.cob_cache[symbol].get('updates_count', 0) + 1 - } - - # Also update latest_cob_data for compatibility - self.latest_cob_data[symbol] = cob_data - - # Log Enhanced COB data updates - update_count = self.cob_cache[symbol]['updates_count'] - if update_count % 50 == 0: # Every 50 Enhanced updates - logger.info(f"[ENHANCED-COB] {symbol} - Enhanced update #{update_count}") - + logger.debug("Enhanced COB updates not available with Basic orchestrator") except Exception as e: - logger.error(f"Error handling Enhanced COB update for {symbol}: {e}") + logger.error(f"Error handling COB update for {symbol}: {e}") def _start_cob_data_subscription(self): - """Start COB data subscription with proper caching""" + """Start COB data subscription - Basic orchestrator has no COB features""" try: - # Start the COB RL trader asynchronously - import asyncio - - def start_cob_trader(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self.cob_rl_trader.start()) - logger.info("COB RL trader started successfully") - except Exception as e: - logger.error(f"Error in COB trader loop: {e}") - finally: - loop.close() - - # Start in separate thread to avoid blocking - import threading - cob_thread = threading.Thread(target=start_cob_trader, daemon=True) - cob_thread.start() - + logger.info("COB data subscription not available with Basic orchestrator") except Exception as e: - logger.error(f"Error starting COB data subscription: {e}") + logger.error(f"Error in COB subscription: {e}") def _on_cob_prediction(self, prediction: PredictionResult): - """Handle COB RL predictions""" + """Handle COB RL predictions - Display both ETH and BTC for reference""" try: - with self.cob_lock: - # Convert prediction to dashboard format - prediction_data = { - 'timestamp': prediction.timestamp, - 'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP - 'confidence': prediction.confidence, - 'predicted_change': prediction.predicted_change, - 'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction], - 'color': ['red', 'gray', 'green'][prediction.predicted_direction] - } + # Convert prediction to dashboard format + prediction_data = { + 'timestamp': prediction.timestamp, + 'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP + 'confidence': prediction.confidence, + 'predicted_change': prediction.predicted_change, + 'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction], + 'color': ['red', 'gray', 'green'][prediction.predicted_direction] + } + + # Add predictions to cache for both ETH and BTC (for reference/display) + if hasattr(prediction, 'symbol') and prediction.symbol: + symbol = prediction.symbol + # Store predictions for display (both ETH and BTC) + if symbol not in self.cob_predictions: + self.cob_predictions[symbol] = deque(maxlen=100) - # Add to predictions cache - self.cob_predictions[prediction.symbol].append(prediction_data) + self.cob_predictions[symbol].append(prediction_data) - # Cache COB data (1s buckets for 1 day max, 5 min retention) - current_time = datetime.now() - cob_data = { - 'timestamp': current_time, - 'prediction': prediction_data, - 'features': prediction.features.tolist() if prediction.features is not None else [] - } - - # Add to 1d cache (1s buckets) - self.cob_data_cache_1d[prediction.symbol].append(cob_data) - - # Add to raw ticks cache (15 seconds max, 10+ updates/sec) - self.cob_raw_ticks[prediction.symbol].append({ - 'timestamp': current_time, - 'prediction': prediction_data, - 'raw_features': prediction.features.tolist() if prediction.features is not None else [] - }) - - logger.debug(f"COB prediction cached for {prediction.symbol}: " + # Log all predictions but note that only ETH generates trading signals + signal_note = " (TRADING ENABLED)" if 'ETH' in symbol.upper() else " (REFERENCE ONLY)" + logger.debug(f"COB prediction cached for {symbol}{signal_note}: " f"{prediction_data['direction_text']} (confidence: {prediction.confidence:.3f})") except Exception as e: logger.error(f"Error handling COB prediction: {e}") - + def _connect_to_orchestrator(self): """Connect to orchestrator for real trading signals""" try: @@ -1905,8 +1721,20 @@ class CleanTradingDashboard: logger.error(f"Error connecting to orchestrator: {e}") def _on_trading_decision(self, decision): - """Handle trading decision from orchestrator""" + """Handle trading decision from orchestrator - Filter to show only ETH signals""" try: + # Check if this decision is for ETH/USDT - ignore all BTC signals + symbol = None + if hasattr(decision, 'symbol'): + symbol = decision.symbol + elif isinstance(decision, dict) and 'symbol' in decision: + symbol = decision.get('symbol') + + # Only process ETH signals, ignore BTC + if symbol and 'BTC' in symbol.upper(): + logger.debug(f"Ignoring BTC signal: {symbol}") + return + # Convert orchestrator decision to dashboard format # Handle both TradingDecision objects and dictionary formats if hasattr(decision, 'action'): @@ -1916,6 +1744,7 @@ class CleanTradingDashboard: 'action': decision.action, 'confidence': decision.confidence, 'price': decision.price, + 'symbol': getattr(decision, 'symbol', 'ETH/USDT'), # Add symbol field 'executed': True, # Orchestrator decisions are executed 'blocked': False, 'manual': False @@ -1927,17 +1756,24 @@ class CleanTradingDashboard: 'action': decision.get('action', 'UNKNOWN'), 'confidence': decision.get('confidence', 0), 'price': decision.get('price', 0), + 'symbol': decision.get('symbol', 'ETH/USDT'), # Add symbol field 'executed': True, # Orchestrator decisions are executed 'blocked': False, 'manual': False } - # Add to recent decisions - self.recent_decisions.append(dashboard_decision) - - # Keep only last 50 decisions - if len(self.recent_decisions) > 50: - self.recent_decisions = self.recent_decisions[-50:] + # Only show ETH signals in dashboard + if dashboard_decision['symbol'] and 'ETH' in dashboard_decision['symbol'].upper(): + # Add to recent decisions + self.recent_decisions.append(dashboard_decision) + + # Keep only last 50 decisions + if len(self.recent_decisions) > 50: + self.recent_decisions = self.recent_decisions[-50:] + + logger.info(f"ETH signal added to dashboard: {dashboard_decision['action']} (conf: {dashboard_decision['confidence']:.2f})") + else: + logger.debug(f"Non-ETH signal ignored: {dashboard_decision.get('symbol', 'UNKNOWN')}") except Exception as e: logger.error(f"Error handling trading decision: {e}") @@ -2130,9 +1966,9 @@ class CleanTradingDashboard: self.tick_cache = self.tick_cache[-1000:] if 'ohlcv' in data_packet: - # Update multi-timeframe data + # Update multi-timeframe data for both ETH and BTC (BTC for reference) multi_tf_data = data_packet.get('multi_timeframe', {}) - for symbol in ['ETH/USDT', 'BTC/USDT']: + for symbol in ['ETH/USDT', 'BTC/USDT']: # Process both ETH and BTC data if symbol in multi_tf_data: for timeframe in ['1s', '1m', '1h', '1d']: if timeframe in multi_tf_data[symbol]: @@ -2321,4 +2157,4 @@ def create_clean_dashboard(data_provider: Optional[DataProvider] = None, orchest data_provider=data_provider, orchestrator=orchestrator, trading_executor=trading_executor - ) \ No newline at end of file + ) \ No newline at end of file