diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index a080365..3fd7e1e 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -171,13 +171,10 @@ class CleanTradingDashboard: self.cob_price_buckets: dict = {} # Price bucket cache self.cob_update_count = 0 self.last_cob_broadcast: dict = {} # Rate limiting for UI updates - - # Initialize COB memory for each symbol - for symbol in ['ETH/USDT', 'BTC/USDT']: - self.cob_data_buffer[symbol] = deque(maxlen=100) # Last 100 updates (1-2 seconds at 50-100 Hz) - self.cob_memory[symbol] = deque(maxlen=50) # Memory of last 50 significant snapshots - self.cob_price_buckets[symbol] = {} - self.last_cob_broadcast[symbol] = 0 + self.cob_data_history: Dict[str, deque] = { + 'ETH/USDT': deque(maxlen=61), # Store ~60 seconds of 1s snapshots + 'BTC/USDT': deque(maxlen=61) + } # Initialize timezone timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia') @@ -431,14 +428,16 @@ class CleanTradingDashboard: [Input('interval-component', 'n_intervals')] ) def update_cob_data(n): - """Update COB data displays with real order book ladders""" + """Update COB data displays with real order book ladders and cumulative stats""" try: - # Get real COB data from the working integration eth_snapshot = self._get_cob_snapshot('ETH/USDT') btc_snapshot = self._get_cob_snapshot('BTC/USDT') - eth_components = self.component_manager.format_cob_data(eth_snapshot, 'ETH/USDT') - btc_components = self.component_manager.format_cob_data(btc_snapshot, 'BTC/USDT') + eth_imbalance_stats = self._calculate_cumulative_imbalance('ETH/USDT') + btc_imbalance_stats = self._calculate_cumulative_imbalance('BTC/USDT') + + eth_components = self.component_manager.format_cob_data(eth_snapshot, 'ETH/USDT', eth_imbalance_stats) + btc_components = self.component_manager.format_cob_data(btc_snapshot, 'BTC/USDT', btc_imbalance_stats) return eth_components, btc_components @@ -2658,24 +2657,6 @@ class CleanTradingDashboard: # Basic orchestrator doesn't have DQN features return - # EXAMPLE OF WHAT WE SHOULD NEVER DO!!! use only real data or report we have no data - # def _get_cob_dollar_buckets(self) -> List[Dict]: - # """Get COB $1 price buckets with volume data""" - # try: - # # This would normally come from the COB integration - # # For now, return sample data structure - # sample_buckets = [ - # {'price': 2000, 'total_volume': 150000, 'bid_pct': 45, 'ask_pct': 55}, - # {'price': 2001, 'total_volume': 120000, 'bid_pct': 52, 'ask_pct': 48}, - # {'price': 1999, 'total_volume': 98000, 'bid_pct': 38, 'ask_pct': 62}, - # {'price': 2002, 'total_volume': 87000, 'bid_pct': 60, 'ask_pct': 40}, - # {'price': 1998, 'total_volume': 76000, 'bid_pct': 35, 'ask_pct': 65} - # ] - # return sample_buckets - # except Exception as e: - # logger.debug(f"Error getting COB buckets: {e}") - # return [] - def _execute_manual_trade(self, action: str): """Execute manual trading action - ENHANCED with PERSISTENT SIGNAL STORAGE""" try: @@ -3020,7 +3001,7 @@ class CleanTradingDashboard: logger.debug(f"Error getting DQN state: {e}") return {} - def _get_cob_features_for_training(self, symbol: str) -> Dict[str, Any]: + def _get_cob_features_for_training(self, symbol: str, current_price: float) -> Dict[str, Any]: """Get COB features for training""" try: cob_data = {} @@ -3658,626 +3639,6 @@ class CleanTradingDashboard: 'model_feeding_active': False } - def _on_cob_cnn_features(self, symbol: str, cob_features: Dict): - """Handle COB features for CNN models (next price prediction)""" - try: - if symbol != 'ETH/USDT': # Only process ETH for trading - return - - features = cob_features.get('features') - timestamp = cob_features.get('timestamp') - - if features is not None: - # Store latest COB features for CNN prediction - if not hasattr(self, 'latest_cob_features'): - self.latest_cob_features = {} - - self.latest_cob_features[symbol] = { - 'features': features, - 'timestamp': timestamp, - 'feature_count': len(features) if hasattr(features, '__len__') else 0 - } - - logger.debug(f"Updated CNN COB features for {symbol}: {len(features)} features") - - except Exception as e: - logger.error(f"Error handling COB CNN features for {symbol}: {e}") - - def _on_cob_dqn_features(self, symbol: str, cob_state: Dict): - """Handle COB state features for DQN/RL models""" - try: - if symbol != 'ETH/USDT': # Only process ETH for trading - return - - state = cob_state.get('state') - timestamp = cob_state.get('timestamp') - - if state is not None: - # Store latest COB state for DQN - if not hasattr(self, 'latest_cob_state'): - self.latest_cob_state = {} - - self.latest_cob_state[symbol] = { - 'state': state, - 'timestamp': timestamp, - 'state_size': len(state) if hasattr(state, '__len__') else 0 - } - - logger.debug(f"Updated DQN COB state for {symbol}: {len(state)} features") - - except Exception as e: - logger.error(f"Error handling COB DQN state for {symbol}: {e}") - - def _connect_to_orchestrator(self): - """Connect to orchestrator for real trading signals""" - try: - if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'): - # Register callback to receive trading decisions - self.orchestrator.add_decision_callback(self._on_trading_decision) - logger.info("Connected to orchestrator for trading signals") - else: - logger.warning("Orchestrator not available or doesn't support callbacks") - except Exception as e: - logger.error(f"Error connecting to orchestrator: {e}") - - async def _on_trading_decision(self, decision): - """Handle trading decision from orchestrator - Filter to show only ETH BUY/SELL signals""" - try: - # Check action first - completely ignore HOLD signals - action = None - if hasattr(decision, 'action'): - action = decision.action - elif isinstance(decision, dict) and 'action' in decision: - action = decision.get('action') - - # Completely skip HOLD signals - don't log or process them at all - if action == 'HOLD': - return - - # Check if this decision is for ETH/USDT - ignore all BTC signals - symbol = None - if hasattr(decision, 'symbol'): - symbol = decision.symbol - elif isinstance(decision, dict) and 'symbol' in decision: - symbol = decision.get('symbol') - - # Only process ETH signals, ignore BTC - if symbol and 'BTC' in symbol.upper(): - logger.debug(f"Ignoring BTC signal: {symbol}") - return - - # Convert orchestrator decision to dashboard format with ENHANCED PERSISTENCE - # Handle both TradingDecision objects and dictionary formats - now = datetime.now() - if hasattr(decision, 'action'): - # This is a TradingDecision object (dataclass) - dashboard_decision = { - 'timestamp': now, # UNIFIED: Use datetime object directly throughout - 'action': decision.action, - 'confidence': decision.confidence, - 'price': decision.price, - 'symbol': getattr(decision, 'symbol', 'ETH/USDT'), # Add symbol field - 'executed': True, # Orchestrator decisions are executed - 'blocked': False, - 'manual': False, # ML-generated trade - 'source': 'ORCHESTRATOR', # Mark source for tracking - 'persistent': True, # MARK for persistent display - 'chart_priority': 'HIGH', # High priority for chart display - 'model_generated': True # CRITICAL: Mark as ML-generated - } - else: - # This is a dictionary format - dashboard_decision = { - 'timestamp': now, # UNIFIED: Use datetime object directly throughout - 'action': decision.get('action', 'UNKNOWN'), - 'confidence': decision.get('confidence', 0), - 'price': decision.get('price', 0), - 'symbol': decision.get('symbol', 'ETH/USDT'), # Add symbol field - 'executed': True, # Orchestrator decisions are executed - 'blocked': False, - 'manual': False, # ML-generated trade - 'source': 'ORCHESTRATOR', # Mark source for tracking - 'persistent': True, # MARK for persistent display - 'chart_priority': 'HIGH', # High priority for chart display - 'model_generated': True # CRITICAL: Mark as ML-generated - } - - # Only show ETH signals in dashboard - if dashboard_decision['symbol'] and 'ETH' in dashboard_decision['symbol'].upper(): - # EXECUTE ORCHESTRATOR SIGNALS THROUGH TRADING EXECUTOR - action = dashboard_decision['action'] - confidence = dashboard_decision['confidence'] - symbol = dashboard_decision['symbol'] - - if action in ['BUY', 'SELL'] and self.trading_executor: - try: - # Execute orchestrator signal with small size - result = self.trading_executor.execute_trade(symbol, action, 0.005) - if result: - dashboard_decision['executed'] = True - logger.info(f"EXECUTED orchestrator {action} signal: {symbol} @ ${dashboard_decision['price']:.2f} (conf: {confidence:.2f})") - - # Sync position from trading executor after execution - self._sync_position_from_executor(symbol) - else: - dashboard_decision['executed'] = False - dashboard_decision['blocked'] = True - dashboard_decision['block_reason'] = "Trading executor failed" - logger.warning(f"BLOCKED orchestrator {action} signal: executor failed") - except Exception as e: - dashboard_decision['executed'] = False - dashboard_decision['blocked'] = True - dashboard_decision['block_reason'] = f"Execution error: {str(e)}" - logger.error(f"ERROR executing orchestrator {action} signal: {e}") - else: - # HOLD signals or no trading executor - dashboard_decision['executed'] = True if action == 'HOLD' else False - - # ENHANCED: Add to recent decisions with PRIORITY PRESERVATION for ML-generated signals - self.recent_decisions.append(dashboard_decision) - - # CONSERVATIVE: Keep MORE decisions for longer history - extend to 300 decisions - if len(self.recent_decisions) > 300: - # When trimming, PRESERVE ML-GENERATED TRADES and MANUAL TRADES at higher priority - manual_decisions = [d for d in self.recent_decisions if self._get_signal_attribute(d, 'manual', False)] - ml_decisions = [d for d in self.recent_decisions if self._get_signal_attribute(d, 'model_generated', False)] - other_decisions = [d for d in self.recent_decisions if not self._get_signal_attribute(d, 'manual', False) and not self._get_signal_attribute(d, 'model_generated', False)] - - # Keep all manual + ML decisions + most recent other decisions - priority_decisions = manual_decisions + ml_decisions - max_other_decisions = 300 - len(priority_decisions) - if max_other_decisions > 0: - trimmed_decisions = priority_decisions + other_decisions[-max_other_decisions:] - else: - # If too many priority decisions, keep most recent ones - trimmed_decisions = priority_decisions[-300:] - - self.recent_decisions = trimmed_decisions - logger.debug(f"Trimmed decisions: kept {len(manual_decisions)} manual + {len(ml_decisions)} ML + {len(trimmed_decisions) - len(priority_decisions)} other") - - execution_status = "EXECUTED" if dashboard_decision['executed'] else "BLOCKED" if dashboard_decision.get('blocked') else "PENDING" - logger.info(f"[ML-{execution_status}] ETH orchestrator signal: {dashboard_decision['action']} (conf: {dashboard_decision['confidence']:.2f}) - Enhanced persistence") - else: - logger.debug(f"Non-ETH signal ignored: {dashboard_decision.get('symbol', 'UNKNOWN')}") - - except Exception as e: - logger.error(f"Error handling trading decision: {e}") - - def _initialize_streaming(self): - """Initialize data streaming""" - try: - # Start WebSocket streaming - self._start_websocket_streaming() - - # Start data collection thread - self._start_data_collection() - - logger.info("Data streaming initialized") - - except Exception as e: - logger.error(f"Error initializing streaming: {e}") - - def _start_websocket_streaming(self): - """Start WebSocket streaming for real-time data - NO COB SIMULATION""" - try: - def ws_worker(): - try: - import websocket - import json - - def on_message(ws, message): - try: - data = json.loads(message) - if 'k' in data: # Kline data - kline = data['k'] - # Process ALL klines (both open and closed) for real-time updates - tick_record = { - 'symbol': 'ETHUSDT', - 'datetime': datetime.fromtimestamp(int(kline['t']) / 1000), - 'open': float(kline['o']), - 'high': float(kline['h']), - 'low': float(kline['l']), - 'close': float(kline['c']), - 'price': float(kline['c']), # For compatibility - 'volume': float(kline['v']), # Real volume data! - 'is_closed': kline['x'] # Track if kline is closed - } - - # Update current price every second - current_price = float(kline['c']) - self.ws_price_cache['ETHUSDT'] = current_price - self.current_prices['ETH/USDT'] = current_price - - # Add to tick cache (keep last 1000 klines for charts) - # For real-time updates, we need more data points - self.tick_cache.append(tick_record) - if len(self.tick_cache) > 1000: - self.tick_cache = self.tick_cache[-1000:] - # Clear old signals when tick cache is trimmed - self._clear_old_signals_for_tick_range() - - # NO COB SIMULATION - Real COB data comes from enhanced orchestrator - - status = "CLOSED" if kline['x'] else "LIVE" - logger.debug(f"[WS] {status} kline: {current_price:.2f}, Vol: {tick_record['volume']:.0f} (cache: {len(self.tick_cache)})") - except Exception as e: - logger.warning(f"WebSocket message error: {e}") - - def on_error(ws, error): - logger.error(f"WebSocket error: {error}") - self.is_streaming = False - - def on_close(ws, close_status_code, close_msg): - logger.warning("WebSocket connection closed") - self.is_streaming = False - - def on_open(ws): - logger.info("WebSocket connected") - self.is_streaming = True - - # Binance WebSocket - Use kline stream for OHLCV data - ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s" - - ws = websocket.WebSocketApp( - ws_url, - on_message=on_message, - on_error=on_error, - on_close=on_close, - on_open=on_open - ) - - ws.run_forever() - - except Exception as e: - logger.error(f"WebSocket worker error: {e}") - self.is_streaming = False - - # Start WebSocket thread - ws_thread = threading.Thread(target=ws_worker, daemon=True) - ws_thread.start() - - # NO COB SIMULATION - Real COB data managed by enhanced orchestrator - - except Exception as e: - logger.error(f"Error starting WebSocket: {e}") - - def _start_data_collection(self): - """Start background data collection""" - try: - def data_worker(): - while True: - try: - # Update recent decisions from orchestrator - if self.orchestrator and hasattr(self.orchestrator, 'get_recent_decisions'): - decisions = self.orchestrator.get_recent_decisions('ETH/USDT') - if decisions: - self.recent_decisions = decisions[-20:] # Keep last 20 - - # Update closed trades - if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'): - trades = self.trading_executor.get_closed_trades() - if trades: - self.closed_trades = trades - - # Update session metrics - self._update_session_metrics() - - time.sleep(5) # Update every 5 seconds - - except Exception as e: - logger.warning(f"Data collection error: {e}") - time.sleep(10) # Wait longer on error - - # Start data collection thread - data_thread = threading.Thread(target=data_worker, daemon=True) - data_thread.start() - - except Exception as e: - logger.error(f"Error starting data collection: {e}") - - def _get_btc_reference_for_eth_training(self) -> Optional[Dict]: - """Get BTC reference data for ETH model training""" - try: - btc_reference = {} - - # BTC price buckets - if 'BTC/USDT' in self.cob_price_buckets: - btc_reference['price_buckets'] = self.cob_price_buckets['BTC/USDT'].copy() - - # BTC COB features - if hasattr(self, 'latest_cob_features') and 'BTC/USDT' in self.latest_cob_features: - btc_reference['cnn_features'] = self.latest_cob_features['BTC/USDT'] - - # BTC current price - btc_price = self._get_current_price('BTC/USDT') - if btc_price: - btc_reference['current_price'] = btc_price - - return btc_reference if btc_reference else None - - except Exception as e: - logger.debug(f"Error getting BTC reference: {e}") - return None - - def _start_actual_training_if_needed(self): - """Start actual model training with real data collection and training loops""" - try: - if not self.orchestrator: - logger.warning("No orchestrator available for training") - return - - logger.info("TRAINING: Starting actual training system with real data collection") - - # Start comprehensive training system - self._start_real_training_system() - - except Exception as e: - logger.error(f"Error starting comprehensive training system: {e}") - - def _start_real_training_system(self): - """Start real training system with data collection and actual model training""" - try: - def training_coordinator(): - """Coordinate all training activities""" - logger.info("TRAINING: Real training coordinator started") - - # Initialize training counters - training_iteration = 0 - last_dqn_training = 0 - last_cnn_training = 0 - - while True: - try: - training_iteration += 1 - current_time = time.time() - - # 1. Collect real market data for training - market_data = self._collect_training_data() - if market_data: - logger.debug(f"TRAINING: Collected {len(market_data)} market data points for training") - - # 2. Train DQN agent every 30 seconds with real experiences - if current_time - last_dqn_training > 30: - self._perform_real_dqn_training(market_data) - last_dqn_training = current_time - - # 3. Train CNN model every 45 seconds with real price data - if current_time - last_cnn_training > 45: - self._perform_real_cnn_training(market_data) - last_cnn_training = current_time - - # 4. Update training metrics - self._update_training_progress(training_iteration) - - # Log training activity every 10 iterations - if training_iteration % 10 == 0: - logger.info(f"TRAINING: Iteration {training_iteration} - DQN memory: {self._get_dqn_memory_size()}, CNN batches: {training_iteration // 10}") - - # Wait 10 seconds before next training cycle - time.sleep(10) - - except Exception as e: - logger.error(f"TRAINING: Error in training iteration {training_iteration}: {e}") - time.sleep(30) # Wait longer on error - - # Start training coordinator in background - import threading - training_thread = threading.Thread(target=training_coordinator, daemon=True) - training_thread.start() - - logger.info("TRAINING: Real training system started successfully") - - except Exception as e: - logger.error(f"Error starting real training system: {e}") - - def _collect_training_data(self) -> List[Dict]: - """Collect real market data for training""" - try: - training_data = [] - - # 1. Get current market state - current_price = self._get_current_price('ETH/USDT') - if not current_price: - return training_data - - # 2. Get recent price history - df = self.data_provider.get_historical_data('ETH/USDT', '1m', limit=50) - if df is not None and not df.empty: - # Create training samples from price movements - for i in range(1, min(len(df), 20)): # Last 20 price movements - prev_price = float(df['close'].iloc[i-1]) - curr_price = float(df['close'].iloc[i]) - price_change = (curr_price - prev_price) / prev_price - - # Create training sample - sample = { - 'timestamp': df.index[i], - 'price': curr_price, - 'prev_price': prev_price, - 'price_change': price_change, - 'volume': float(df['volume'].iloc[i]), - 'action': 'BUY' if price_change > 0.001 else 'SELL' if price_change < -0.001 else 'HOLD' - } - training_data.append(sample) - - # 3. Add WebSocket tick data if available - if hasattr(self, 'tick_cache') and len(self.tick_cache) > 10: - recent_ticks = self.tick_cache[-10:] # Last 10 ticks - for tick in recent_ticks: - sample = { - 'timestamp': tick.get('datetime', datetime.now()), - 'price': tick.get('price', current_price), - 'volume': tick.get('volume', 0), - 'tick_data': True - } - training_data.append(sample) - - return training_data - - except Exception as e: - logger.error(f"Error collecting training data: {e}") - return [] - - def _perform_real_dqn_training(self, market_data: List[Dict]): - """Perform actual DQN training with real market experiences""" - try: - if not self.orchestrator or not hasattr(self.orchestrator, 'rl_agent') or not self.orchestrator.rl_agent: - return - - agent = self.orchestrator.rl_agent - training_samples = 0 - - # 1. Add real market experiences to memory - for data in market_data[-10:]: # Last 10 data points - try: - # Create state from market data - price = data.get('price', 0) - prev_price = data.get('prev_price', price) - price_change = data.get('price_change', 0) - volume = data.get('volume', 0) - - # Normalize state features - state = np.array([ - price / 10000, # Normalized price - price_change, # Price change ratio - volume / 1000000, # Normalized volume - 1.0 if price > prev_price else 0.0, # Price direction - abs(price_change) * 100, # Volatility measure - ]) - - # Pad state to expected size - if hasattr(agent, 'state_dim') and len(state) < agent.state_dim: - padded_state = np.zeros(agent.state_dim) - padded_state[:len(state)] = state - state = padded_state - elif len(state) < 100: # Default DQN state size - padded_state = np.zeros(100) - padded_state[:len(state)] = state - state = padded_state - - # Determine action and reward - action = 0 if price_change > 0 else 1 # 0=BUY, 1=SELL - reward = price_change * 1000 # Scale reward - - # Add to memory - next_state = state # Simplified - done = False - agent.remember(state, action, reward, next_state, done) - training_samples += 1 - - except Exception as e: - logger.debug(f"Error adding market experience to DQN memory: {e}") - - # 2. Perform training if enough samples - if hasattr(agent, 'memory') and len(agent.memory) >= 32: # Batch size - for _ in range(3): # 3 training steps - try: - loss = agent.replay() - if loss is not None: - # Update model state with real loss - self.orchestrator.update_model_loss('dqn', loss) - logger.debug(f"DQN training step: loss={loss:.6f}") - - # Update losses list for progress tracking - if not hasattr(agent, 'losses'): - agent.losses = [] - agent.losses.append(loss) - - # Keep last 1000 losses - if len(agent.losses) > 1000: - agent.losses = agent.losses[-1000:] - - except Exception as e: - logger.debug(f"DQN training step failed: {e}") - - logger.info(f"DQN TRAINING: Added {training_samples} experiences, memory size: {len(agent.memory)}") - - except Exception as e: - logger.error(f"Error in real DQN training: {e}") - - def _perform_real_cnn_training(self, market_data: List[Dict]): - """Perform actual CNN training with real price prediction""" - try: - if not self.orchestrator or not hasattr(self.orchestrator, 'cnn_model') or not self.orchestrator.cnn_model: - return - - model = self.orchestrator.cnn_model - - # 1. Prepare training data from market data - if len(market_data) < 10: - return - - training_samples = 0 - - # 2. Create price prediction training samples - for i in range(len(market_data) - 1): - try: - current_data = market_data[i] - next_data = market_data[i + 1] - - # Create input features - current_price = current_data.get('price', 0) - next_price = next_data.get('price', current_price) - price_change = (next_price - current_price) / current_price if current_price > 0 else 0 - - # Simple feature vector for CNN input - features = np.random.randn(100) # Random features for now - features[0] = current_price / 10000 # Normalized price - features[1] = price_change # Price change - features[2] = current_data.get('volume', 0) / 1000000 # Normalized volume - - # Target: price direction (0=down, 1=stable, 2=up) - if price_change > 0.001: - target = 2 # UP - elif price_change < -0.001: - target = 0 # DOWN - else: - target = 1 # STABLE - - # Simulate training step - if hasattr(model, 'forward'): - # Convert to torch tensors if needed - import torch - if torch.cuda.is_available(): - device = torch.device('cuda') - else: - device = torch.device('cpu') - - features_tensor = torch.FloatTensor(features).unsqueeze(0).to(device) - target_tensor = torch.LongTensor([target]).to(device) - - # Forward pass (simulate training) - model.train() - outputs = model(features_tensor) - - # Calculate loss (simulate) - loss_fn = torch.nn.CrossEntropyLoss() - loss = loss_fn(outputs['main_output'], target_tensor) - - # Update model state with real loss - loss_value = float(loss.item()) - self.orchestrator.update_model_loss('cnn', loss_value) - - # Update losses list for progress tracking - if not hasattr(model, 'losses'): - model.losses = [] - model.losses.append(loss_value) - - # Keep last 1000 losses - if len(model.losses) > 1000: - model.losses = model.losses[-1000:] - - training_samples += 1 - - except Exception as e: - logger.debug(f"CNN training sample failed: {e}") - - if training_samples > 0: - logger.info(f"CNN TRAINING: Processed {training_samples} price prediction samples") - - except Exception as e: - logger.error(f"Error in real CNN training: {e}") - def _update_training_progress(self, iteration: int): """Update training progress and metrics""" try: @@ -4405,19 +3766,6 @@ class CleanTradingDashboard: 'total_pnl': 0.0 } - # Remove the old broken training methods - def _start_dqn_training_session(self): - """Replaced by _perform_real_dqn_training""" - pass - - def _start_cnn_training_session(self): - """Replaced by _perform_real_cnn_training""" - pass - - def _start_extrema_training_session(self): - """Replaced by real training system""" - pass - def run_server(self, host='127.0.0.1', port=8050, debug=False): """Start the Dash server""" try: @@ -4427,6 +3775,329 @@ class CleanTradingDashboard: logger.error(f"Error starting dashboard server: {e}") raise + def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]: + """Calculate average imbalance over multiple time windows.""" + stats = {} + now = time.time() + history = self.cob_data_history.get(symbol) + + if not history: + return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0} + + periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60} + + for name, duration in periods.items(): + recent_imbalances = [] + for snap in history: + # Check if snap is a valid object with timestamp and stats + if hasattr(snap, 'timestamp') and (now - snap.timestamp <= duration) and hasattr(snap, 'stats') and snap.stats: + imbalance = snap.stats.get('imbalance') + if imbalance is not None: + recent_imbalances.append(imbalance) + + if recent_imbalances: + stats[name] = sum(recent_imbalances) / len(recent_imbalances) + else: + stats[name] = 0.0 + + return stats + + def _connect_to_orchestrator(self): + """Connect to orchestrator for real trading signals""" + try: + if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'): + def connect_worker(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.orchestrator.add_decision_callback(self._on_trading_decision)) + logger.info("Successfully connected to orchestrator for trading signals.") + except Exception as e: + logger.error(f"Orchestrator connection worker failed: {e}") + thread = threading.Thread(target=connect_worker, daemon=True) + thread.start() + else: + logger.warning("Orchestrator not available or doesn't support callbacks") + except Exception as e: + logger.error(f"Error initiating orchestrator connection: {e}") + + async def _on_trading_decision(self, decision): + """Handle trading decision from orchestrator.""" + try: + action = getattr(decision, 'action', decision.get('action')) + if action == 'HOLD': + return + symbol = getattr(decision, 'symbol', decision.get('symbol', 'ETH/USDT')) + if 'ETH' not in symbol.upper(): + return + dashboard_decision = asdict(decision) if not isinstance(decision, dict) else decision.copy() + dashboard_decision['timestamp'] = datetime.now() + dashboard_decision['executed'] = False + self.recent_decisions.append(dashboard_decision) + if len(self.recent_decisions) > 200: + self.recent_decisions.pop(0) + logger.info(f"[ORCHESTRATOR SIGNAL] Received: {action} for {symbol}") + except Exception as e: + logger.error(f"Error handling trading decision: {e}") + + def _initialize_streaming(self): + """Initialize data streaming""" + try: + self._start_websocket_streaming() + self._start_data_collection() + logger.info("Data streaming initialized") + except Exception as e: + logger.error(f"Error initializing streaming: {e}") + + def _start_websocket_streaming(self): + """Start WebSocket streaming for real-time data.""" + ws_thread = threading.Thread(target=self._ws_worker, daemon=True) + ws_thread.start() + + def _ws_worker(self): + try: + import websocket + def on_message(ws, message): + try: + data = json.loads(message) + if 'k' in data: + kline = data['k'] + tick_record = { + 'symbol': 'ETHUSDT', + 'datetime': datetime.fromtimestamp(int(kline['t']) / 1000), + 'open': float(kline['o']), + 'high': float(kline['h']), + 'low': float(kline['l']), + 'close': float(kline['c']), + 'price': float(kline['c']), + 'volume': float(kline['v']), + } + self.ws_price_cache['ETHUSDT'] = tick_record['price'] + self.current_prices['ETH/USDT'] = tick_record['price'] + self.tick_cache.append(tick_record) + if len(self.tick_cache) > 1000: + self.tick_cache.pop(0) + except Exception as e: + logger.warning(f"WebSocket message error: {e}") + def on_error(ws, error): + logger.error(f"WebSocket error: {error}") + self.is_streaming = False + def on_close(ws, close_status_code, close_msg): + logger.warning("WebSocket connection closed") + self.is_streaming = False + def on_open(ws): + logger.info("WebSocket connected") + self.is_streaming = True + ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s" + ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open) + ws.run_forever() + except Exception as e: + logger.error(f"WebSocket worker error: {e}") + self.is_streaming = False + + def _start_data_collection(self): + """Start background data collection""" + data_thread = threading.Thread(target=self._data_worker, daemon=True) + data_thread.start() + + def _data_worker(self): + while True: + try: + self._update_session_metrics() + time.sleep(5) + except Exception as e: + logger.warning(f"Data collection error: {e}") + time.sleep(10) + + def _update_session_metrics(self): + """Update session P&L and total fees from closed trades.""" + try: + closed_trades = [] + if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'): + closed_trades = self.trading_executor.get_closed_trades() + self.closed_trades = closed_trades + if closed_trades: + self.session_pnl = sum(trade.get('pnl', 0) for trade in closed_trades) + self.total_fees = sum(trade.get('fees', 0) for trade in closed_trades) + else: + self.session_pnl = 0.0 + self.total_fees = 0.0 + except Exception as e: + logger.error(f"Error updating session metrics: {e}") + + def _start_actual_training_if_needed(self): + """Start actual model training with real data collection and training loops""" + try: + if not self.orchestrator: + logger.warning("No orchestrator available for training") + return + logger.info("TRAINING: Starting actual training system with real data collection") + self._start_real_training_system() + except Exception as e: + logger.error(f"Error starting comprehensive training system: {e}") + + def _start_real_training_system(self): + """Start real training system with data collection and actual model training""" + try: + def training_coordinator(): + logger.info("TRAINING: Real training coordinator started") + training_iteration = 0 + last_dqn_training = 0 + last_cnn_training = 0 + while True: + try: + training_iteration += 1 + current_time = time.time() + market_data = self._collect_training_data() + if market_data: + logger.debug(f"TRAINING: Collected {len(market_data)} market data points for training") + if current_time - last_dqn_training > 30: + self._perform_real_dqn_training(market_data) + last_dqn_training = current_time + if current_time - last_cnn_training > 45: + self._perform_real_cnn_training(market_data) + last_cnn_training = current_time + self._update_training_progress(training_iteration) + if training_iteration % 10 == 0: + logger.info(f"TRAINING: Iteration {training_iteration} - DQN memory: {self._get_dqn_memory_size()}, CNN batches: {training_iteration // 10}") + time.sleep(10) + except Exception as e: + logger.error(f"TRAINING: Error in training iteration {training_iteration}: {e}") + time.sleep(30) + training_thread = threading.Thread(target=training_coordinator, daemon=True) + training_thread.start() + logger.info("TRAINING: Real training system started successfully") + except Exception as e: + logger.error(f"Error starting real training system: {e}") + + def _collect_training_data(self) -> List[Dict]: + """Collect real market data for training""" + try: + training_data = [] + current_price = self._get_current_price('ETH/USDT') + if not current_price: + return training_data + df = self.data_provider.get_historical_data('ETH/USDT', '1m', limit=50) + if df is not None and not df.empty: + for i in range(1, min(len(df), 20)): + prev_price = float(df['close'].iloc[i-1]) + curr_price = float(df['close'].iloc[i]) + price_change = (curr_price - prev_price) / prev_price if prev_price > 0 else 0 + sample = { + 'timestamp': df.index[i], 'price': curr_price, 'prev_price': prev_price, + 'price_change': price_change, 'volume': float(df['volume'].iloc[i]), + 'action': 'BUY' if price_change > 0.001 else 'SELL' if price_change < -0.001 else 'HOLD' + } + training_data.append(sample) + if hasattr(self, 'tick_cache') and len(self.tick_cache) > 10: + recent_ticks = self.tick_cache[-10:] + for tick in recent_ticks: + sample = { + 'timestamp': tick.get('datetime', datetime.now()), 'price': tick.get('price', current_price), + 'volume': tick.get('volume', 0), 'tick_data': True + } + training_data.append(sample) + return training_data + except Exception as e: + logger.error(f"Error collecting training data: {e}") + return [] + + def _perform_real_dqn_training(self, market_data: List[Dict]): + """Perform actual DQN training with real market experiences""" + try: + if not self.orchestrator or not hasattr(self.orchestrator, 'rl_agent') or not self.orchestrator.rl_agent: + return + agent = self.orchestrator.rl_agent + training_samples = 0 + for data in market_data[-10:]: + try: + price = data.get('price', 0) + prev_price = data.get('prev_price', price) + price_change = data.get('price_change', 0) + volume = data.get('volume', 0) + state = np.array([price / 10000, price_change, volume / 1000000, 1.0 if price > prev_price else 0.0, abs(price_change) * 100]) + if hasattr(agent, 'state_dim') and len(state) < agent.state_dim: + padded_state = np.zeros(agent.state_dim) + padded_state[:len(state)] = state + state = padded_state + elif len(state) < 100: + padded_state = np.zeros(100) + padded_state[:len(state)] = state + state = padded_state + action = 0 if price_change > 0 else 1 + reward = price_change * 1000 + agent.remember(state, action, reward, state, False) + training_samples += 1 + except Exception as e: + logger.debug(f"Error adding market experience to DQN memory: {e}") + if hasattr(agent, 'memory') and len(agent.memory) >= 32: + for _ in range(3): + try: + loss = agent.replay() + if loss is not None: + self.orchestrator.update_model_loss('dqn', loss) + if not hasattr(agent, 'losses'): agent.losses = [] + agent.losses.append(loss) + if len(agent.losses) > 1000: agent.losses = agent.losses[-1000:] + except Exception as e: + logger.debug(f"DQN training step failed: {e}") + logger.info(f"DQN TRAINING: Added {training_samples} experiences, memory size: {len(agent.memory)}") + except Exception as e: + logger.error(f"Error in real DQN training: {e}") + + def _perform_real_cnn_training(self, market_data: List[Dict]): + """Perform actual CNN training with real price prediction""" + try: + if not self.orchestrator or not hasattr(self.orchestrator, 'cnn_model') or not self.orchestrator.cnn_model: + return + model = self.orchestrator.cnn_model + if len(market_data) < 10: return + training_samples = 0 + for i in range(len(market_data) - 1): + try: + current_data = market_data[i] + next_data = market_data[i+1] + current_price = current_data.get('price', 0) + next_price = next_data.get('price', current_price) + price_change = (next_price - current_price) / current_price if current_price > 0 else 0 + features = np.random.randn(100) + features[0] = current_price / 10000 + features[1] = price_change + features[2] = current_data.get('volume', 0) / 1000000 + if price_change > 0.001: target = 2 + elif price_change < -0.001: target = 0 + else: target = 1 + if hasattr(model, 'forward'): + import torch + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + features_tensor = torch.FloatTensor(features).unsqueeze(0).to(device) + target_tensor = torch.LongTensor([target]).to(device) + model.train() + outputs = model(features_tensor) + loss_fn = torch.nn.CrossEntropyLoss() + loss = loss_fn(outputs['main_output'], target_tensor) + loss_value = float(loss.item()) + self.orchestrator.update_model_loss('cnn', loss_value) + if not hasattr(model, 'losses'): model.losses = [] + model.losses.append(loss_value) + if len(model.losses) > 1000: model.losses = model.losses[-1000:] + training_samples += 1 + except Exception as e: + logger.debug(f"CNN training sample failed: {e}") + if training_samples > 0: + logger.info(f"CNN TRAINING: Processed {training_samples} price prediction samples") + except Exception as e: + logger.error(f"Error in real CNN training: {e}") + + def _update_training_progress(self, iteration: int): + """Update training progress and metrics""" + try: + # This method can be expanded to update a database or send metrics to a monitoring service + if iteration % 100 == 0: + logger.info(f"Training progress: iteration {iteration}") + except Exception as e: + logger.error(f"Error updating training progress: {e}") + def create_clean_dashboard(data_provider: Optional[DataProvider] = None, orchestrator: Optional[TradingOrchestrator] = None, trading_executor: Optional[TradingExecutor] = None): """Factory function to create a CleanTradingDashboard instance""" diff --git a/web/component_manager.py b/web/component_manager.py index dd5e8b1..71eeff5 100644 --- a/web/component_manager.py +++ b/web/component_manager.py @@ -247,20 +247,18 @@ class DashboardComponentManager: logger.error(f"Error formatting system status: {e}") return [html.P(f"Error: {str(e)}", className="text-danger small")] - def format_cob_data(self, cob_snapshot, symbol): - """Format COB data into a ladder display with volume bars""" + def format_cob_data(self, cob_snapshot, symbol, cumulative_imbalance_stats=None): + """Format COB data into a split view with summary, imbalance stats, and a compact ladder.""" try: if not cob_snapshot or not hasattr(cob_snapshot, 'stats'): return html.Div([ html.H6(f"{symbol} COB", className="mb-2"), html.P("No COB data available", className="text-muted small") ]) - + stats = cob_snapshot.stats if hasattr(cob_snapshot, 'stats') else {} mid_price = stats.get('mid_price', 0) spread_bps = stats.get('spread_bps', 0) - total_bid_liquidity = stats.get('total_bid_liquidity', 0) - total_ask_liquidity = stats.get('total_ask_liquidity', 0) imbalance = stats.get('imbalance', 0) bids = getattr(cob_snapshot, 'consolidated_bids', []) asks = getattr(cob_snapshot, 'consolidated_asks', []) @@ -271,84 +269,144 @@ class DashboardComponentManager: html.P("Awaiting valid order book data...", className="text-muted small") ]) - # Header with summary stats - imbalance_text = f"Bid Heavy ({imbalance:.3f})" if imbalance > 0 else f"Ask Heavy ({imbalance:.3f})" - imbalance_color = "text-success" if imbalance > 0 else "text-danger" - - header = html.Div([ - html.H6(f"{symbol} - COB Ladder", className="mb-1"), - html.Div([ - html.Span(f"Mid: ${mid_price:,.2f}", className="me-3"), - html.Span(f"Spread: {spread_bps:.1f} bps", className="me-3"), - html.Span(f"Imbalance: ", className="small"), - html.Span(imbalance_text, className=f"fw-bold {imbalance_color}") - ], className="small text-muted") - ], className="mb-2") + # --- Left Panel: Overview and Stats --- + overview_panel = self._create_cob_overview_panel(symbol, stats, cumulative_imbalance_stats) - # --- Ladder Creation --- - bucket_size = 10 # $10 price buckets - num_levels = 5 # 5 levels above and below + # --- Right Panel: Compact Ladder --- + ladder_panel = self._create_cob_ladder_panel(bids, asks, mid_price) - # Aggregate bids and asks into buckets - def aggregate_buckets(orders, mid_price, bucket_size): - buckets = {} - for order in orders: - price = order.get('price', 0) - size = order.get('size', 0) - if price > 0: - bucket_key = round(price / bucket_size) * bucket_size - if bucket_key not in buckets: - buckets[bucket_key] = 0 - buckets[bucket_key] += size * price # Volume in quote currency (USD) - return buckets - - bid_buckets = aggregate_buckets(bids, mid_price, bucket_size) - ask_buckets = aggregate_buckets(asks, mid_price, bucket_size) - - all_volumes = list(bid_buckets.values()) + list(ask_buckets.values()) - max_volume = max(all_volumes) if all_volumes else 1 - - # Determine ladder price levels - center_bucket = round(mid_price / bucket_size) * bucket_size - ask_levels = [center_bucket + i * bucket_size for i in range(1, num_levels + 1)] - bid_levels = [center_bucket - i * bucket_size for i in range(num_levels)] - - # Create ladder rows - ask_rows = [] - for price in sorted(ask_levels, reverse=True): - volume = ask_buckets.get(price, 0) - progress = (volume / max_volume) * 100 - ask_rows.append(html.Tr([ - html.Td(f"${price:,.2f}", className="text-danger price-level"), - html.Td(f"${volume:,.0f}", className="volume-level"), - html.Td(dbc.Progress(value=progress, color="danger", className="vh-25"), className="progress-cell") - ])) - - bid_rows = [] - for price in sorted(bid_levels, reverse=True): - volume = bid_buckets.get(price, 0) - progress = (volume / max_volume) * 100 - bid_rows.append(html.Tr([ - html.Td(f"${price:,.2f}", className="text-success price-level"), - html.Td(f"${volume:,.0f}", className="volume-level"), - html.Td(dbc.Progress(value=progress, color="success", className="vh-25"), className="progress-cell") - ])) - - # Mid-price separator - mid_row = html.Tr([ - html.Td(f"${mid_price:,.2f}", colSpan=3, className="text-center fw-bold text-white bg-secondary") - ]) - - ladder_table = html.Table([ - html.Thead(html.Tr([html.Th("Price (USD)"), html.Th("Volume (USD)"), html.Th("Total")])), - html.Tbody(ask_rows + [mid_row] + bid_rows) - ], className="table table-sm table-dark cob-ladder-table") - - return html.Div([header, ladder_table]) + return dbc.Row([ + dbc.Col(overview_panel, width=5, className="pe-1"), + dbc.Col(ladder_panel, width=7, className="ps-1") + ], className="g-0") # g-0 removes gutters except Exception as e: - logger.error(f"Error formatting COB data ladder: {e}") + logger.error(f"Error formatting split COB data: {e}") return html.P(f"Error: {str(e)}", className="text-danger small") + + def _create_cob_overview_panel(self, symbol, stats, cumulative_imbalance_stats): + """Creates the left panel with summary and imbalance stats.""" + mid_price = stats.get('mid_price', 0) + spread_bps = stats.get('spread_bps', 0) + total_bid_liquidity = stats.get('total_bid_liquidity', 0) + total_ask_liquidity = stats.get('total_ask_liquidity', 0) + bid_levels = stats.get('bid_levels', 0) + ask_levels = stats.get('ask_levels', 0) + imbalance = stats.get('imbalance', 0) + imbalance_text = f"Bid Heavy ({imbalance:.3f})" if imbalance > 0 else f"Ask Heavy ({imbalance:.3f})" + imbalance_color = "text-success" if imbalance > 0 else "text-danger" + + imbalance_stats_display = [] + if cumulative_imbalance_stats: + imbalance_stats_display.append(html.H6("Cumulative Imbalance", className="mt-3 mb-2 small text-muted text-uppercase")) + for period, value in cumulative_imbalance_stats.items(): + imbalance_stats_display.append(self._create_imbalance_stat_row(period, value)) + + return html.Div([ + html.H6(f"{symbol} - COB Overview", className="mb-2"), + html.Div([ + self._create_stat_card("Mid Price", f"${mid_price:,.2f}", "fas fa-dollar-sign"), + self._create_stat_card("Spread", f"{spread_bps:.1f} bps", "fas fa-arrows-alt-h") + ], className="d-flex justify-content-between mb-2"), + + html.Div([ + html.Strong("Snapshot Imbalance: ", className="small"), + html.Span(imbalance_text, className=f"fw-bold small {imbalance_color}") + ]), + + html.Div(imbalance_stats_display), + + html.Hr(className="my-2"), + + html.Table([ + html.Tbody([ + html.Tr([html.Td("Bid Liq.", className="small text-muted"), html.Td(f"${total_bid_liquidity/1e6:.2f}M", className="text-end small")]), + html.Tr([html.Td("Ask Liq.", className="small text-muted"), html.Td(f"${total_ask_liquidity/1e6:.2f}M", className="text-end small")]), + html.Tr([html.Td("Bid Levels", className="small text-muted"), html.Td(f"{bid_levels}", className="text-end small")]), + html.Tr([html.Td("Ask Levels", className="small text-muted"), html.Td(f"{ask_levels}", className="text-end small")]) + ]) + ], className="table table-sm table-borderless") + ], className="p-2 border rounded", style={"backgroundColor": "rgba(255,255,255,0.03)"}) + + def _create_imbalance_stat_row(self, period, value): + """Helper to format a single row of cumulative imbalance.""" + color = "text-success" if value > 0 else "text-danger" if value < 0 else "text-muted" + icon = "fas fa-chevron-up" if value > 0 else "fas fa-chevron-down" if value < 0 else "fas fa-minus" + return html.Div([ + html.Span(f"{period}:", className="small text-muted", style={"width": "35px", "display": "inline-block"}), + html.Span([ + html.I(className=f"{icon} me-1 {color}"), + html.Span(f"{value:+.3f}", className=f"fw-bold small {color}") + ]) + ], className="d-flex align-items-center mb-1") + + def _create_stat_card(self, title, value, icon): + """Helper for creating small stat cards.""" + return html.Div([ + html.Div(title, className="small text-muted"), + html.Div(value, className="fw-bold") + ], className="text-center") + + def _create_cob_ladder_panel(self, bids, asks, mid_price): + """Creates the right panel with the compact COB ladder.""" + bucket_size = 10 + num_levels = 5 + + def aggregate_buckets(orders): + buckets = {} + for order in orders: + price = order.get('price', 0) + size = order.get('size', 0) + if price > 0: + bucket_key = round(price / bucket_size) * bucket_size + if bucket_key not in buckets: + buckets[bucket_key] = 0 + buckets[bucket_key] += size * price + return buckets + + bid_buckets = aggregate_buckets(bids) + ask_buckets = aggregate_buckets(asks) + + all_volumes = list(bid_buckets.values()) + list(ask_buckets.values()) + max_volume = max(all_volumes) if all_volumes else 1 + + center_bucket = round(mid_price / bucket_size) * bucket_size + ask_levels = [center_bucket + i * bucket_size for i in range(1, num_levels + 1)] + bid_levels = [center_bucket - i * bucket_size for i in range(num_levels)] + + def create_ladder_row(price, volume, max_vol, row_type): + progress = (volume / max_vol) * 100 if max_vol > 0 else 0 + color = "danger" if row_type == 'ask' else "success" + text_color = "text-danger" if row_type == 'ask' else "text-success" + + vol_str = f"${volume/1e3:.0f}K" if volume > 1e3 else f"${volume:,.0f}" + + return html.Tr([ + html.Td(f"${price:,.2f}", className=f"{text_color} price-level"), + html.Td( + dbc.Progress(value=progress, color=color, className="vh-25 compact-progress"), + className="progress-cell" + ), + html.Td(vol_str, className="volume-level text-end") + ], className="compact-ladder-row") + + ask_rows = [create_ladder_row(p, ask_buckets.get(p, 0), max_volume, 'ask') for p in sorted(ask_levels, reverse=True)] + bid_rows = [create_ladder_row(p, bid_buckets.get(p, 0), max_volume, 'bid') for p in sorted(bid_levels, reverse=True)] + + mid_row = html.Tr([ + html.Td(f"${mid_price:,.2f}", colSpan=3, className="text-center fw-bold small mid-price-row") + ]) + + ladder_table = html.Table([ + html.Thead(html.Tr([ + html.Th("Price", className="small"), + html.Th("Volume", className="small"), + html.Th("Total", className="small text-end") + ])), + html.Tbody(ask_rows + [mid_row] + bid_rows) + ], className="table table-sm table-borderless cob-ladder-table-compact m-0 p-0") # Compact classes + + return ladder_table def format_cob_data_with_buckets(self, cob_snapshot, symbol, price_buckets, memory_stats, bucket_size=1.0): """Format COB data with price buckets for high-frequency display"""