From 8023dae18fc61695274472732dea2fb8e9eb1917 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 15 Jul 2025 11:12:30 +0300 Subject: [PATCH] wip --- core/orchestrator.py | 90 ++++++++++++++++++++++++++++++++++++++---- web/clean_dashboard.py | 2 +- 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/core/orchestrator.py b/core/orchestrator.py index 06cb0ac..e3e2e55 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -136,6 +136,11 @@ class TradingOrchestrator: self.recent_decisions: Dict[str, List[TradingDecision]] = {} # {symbol: List[TradingDecision]} self.model_performance: Dict[str, Dict[str, Any]] = {} # {model_name: {'correct': int, 'total': int, 'accuracy': float}} + # Signal accumulation for trend confirmation + self.signal_accumulator: Dict[str, List[Dict]] = {} # {symbol: List[signal_data]} + self.required_confirmations = 3 # Number of consistent signals needed + self.signal_timeout_seconds = 30 # Signals expire after 30 seconds + # Model prediction tracking for dashboard visualization self.recent_dqn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent DQN predictions self.recent_cnn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent CNN predictions @@ -146,6 +151,7 @@ class TradingOrchestrator: self.recent_dqn_predictions[symbol] = deque(maxlen=100) self.recent_cnn_predictions[symbol] = deque(maxlen=50) self.prediction_accuracy_history[symbol] = deque(maxlen=200) + self.signal_accumulator[symbol] = [] # Decision callbacks self.decision_callbacks: List[Any] = [] @@ -1331,15 +1337,43 @@ class TradingOrchestrator: current_position_pnl, symbol ) - # EXECUTE EVERY SIGNAL: Remove confidence thresholds and signal accumulation - # The decision model has already aggregated all model outputs (CNN, DQN, transformer, etc.) - # So we trust its decision and execute every signal - reasoning['execute_every_signal'] = True + # SIGNAL CONFIRMATION: Only execute signals that meet confirmation criteria + # Apply confidence thresholds and signal accumulation for trend confirmation + reasoning['execute_every_signal'] = False reasoning['models_aggregated'] = [pred.model_name for pred in predictions] reasoning['aggregated_confidence'] = best_confidence - logger.info(f"EXECUTING EVERY SIGNAL: {best_action} (confidence: {best_confidence:.3f}) " - f"from aggregated models: {reasoning['models_aggregated']}") + # Apply confidence thresholds for signal confirmation + if best_action != 'HOLD': + if best_confidence < self.confidence_threshold: + logger.debug(f"Signal below confidence threshold: {best_action} {symbol} " + f"(confidence: {best_confidence:.3f} < {self.confidence_threshold})") + best_action = 'HOLD' + best_confidence = 0.0 + reasoning['rejected_reason'] = 'low_confidence' + else: + # Add signal to accumulator for trend confirmation + signal_data = { + 'action': best_action, + 'confidence': best_confidence, + 'timestamp': timestamp, + 'models': reasoning['models_aggregated'] + } + + # Check if we have enough confirmations + confirmed_action = self._check_signal_confirmation(symbol, signal_data) + if confirmed_action: + logger.info(f"SIGNAL CONFIRMED: {confirmed_action} (confidence: {best_confidence:.3f}) " + f"from aggregated models: {reasoning['models_aggregated']}") + best_action = confirmed_action + reasoning['signal_confirmed'] = True + reasoning['confirmations_received'] = len(self.signal_accumulator[symbol]) + else: + logger.debug(f"Signal accumulating: {best_action} {symbol} " + f"({len(self.signal_accumulator[symbol])}/{self.required_confirmations} confirmations)") + best_action = 'HOLD' + best_confidence = 0.0 + reasoning['rejected_reason'] = 'awaiting_confirmation' # Add P&L-based decision adjustment best_action, best_confidence = self._apply_pnl_feedback( @@ -1923,4 +1957,46 @@ class TradingOrchestrator: def set_trading_executor(self, trading_executor): """Set the trading executor for position tracking""" self.trading_executor = trading_executor - logger.info("Trading executor set for position tracking and P&L feedback") \ No newline at end of file + logger.info("Trading executor set for position tracking and P&L feedback") + + def _check_signal_confirmation(self, symbol: str, signal_data: Dict) -> Optional[str]: + """Check if we have enough signal confirmations for trend confirmation""" + try: + # Clean up expired signals + current_time = signal_data['timestamp'] + self.signal_accumulator[symbol] = [ + s for s in self.signal_accumulator[symbol] + if (current_time - s['timestamp']).total_seconds() < self.signal_timeout_seconds + ] + + # Add new signal + self.signal_accumulator[symbol].append(signal_data) + + # Check if we have enough confirmations + if len(self.signal_accumulator[symbol]) < self.required_confirmations: + return None + + # Check if recent signals are consistent + recent_signals = self.signal_accumulator[symbol][-self.required_confirmations:] + actions = [s['action'] for s in recent_signals] + + # Count action consensus + action_counts = {} + for action in actions: + action_counts[action] = action_counts.get(action, 0) + 1 + + # Find dominant action + dominant_action = max(action_counts, key=action_counts.get) + consensus_count = action_counts[dominant_action] + + # Require at least 2/3 consensus + if consensus_count >= max(2, self.required_confirmations * 0.67): + # Clear accumulator after confirmation + self.signal_accumulator[symbol] = [] + return dominant_action + + return None + + except Exception as e: + logger.error(f"Error checking signal confirmation for {symbol}: {e}") + return None \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 4e77e2a..731ae5b 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -5836,7 +5836,7 @@ class CleanTradingDashboard: logger.info(f"[ORCHESTRATOR SIGNAL] Received: {action} for {symbol} (confidence: {confidence:.3f})") # EXECUTE THE DECISION THROUGH TRADING EXECUTOR - if self.trading_executor: # Execute every signal + if self.trading_executor and confidence > 0.5: # Only execute confirmed signals try: logger.info(f"[ORCHESTRATOR EXECUTION] Attempting to execute {action} for {symbol} via trading executor...") success = self.trading_executor.execute_signal(