diff --git a/config.yaml b/config.yaml index d3de456..f251d11 100644 --- a/config.yaml +++ b/config.yaml @@ -81,9 +81,9 @@ orchestrator: # Model weights for decision combination cnn_weight: 0.7 # Weight for CNN predictions rl_weight: 0.3 # Weight for RL decisions - confidence_threshold: 0.05 # Very low threshold for training and simulation - confidence_threshold_close: 0.05 # Very low threshold for easier exits - decision_frequency: 30 # Seconds between decisions (faster) + confidence_threshold: 0.15 + confidence_threshold_close: 0.08 + decision_frequency: 30 # Multi-symbol coordination symbol_correlation_matrix: @@ -100,6 +100,11 @@ orchestrator: failure_penalty: 5 # Penalty for wrong predictions confidence_scaling: true # Scale rewards by confidence + # Entry aggressiveness: 0.0 = very conservative (fewer, higher quality trades), 1.0 = very aggressive (more trades) + entry_aggressiveness: 0.5 + # Exit aggressiveness: 0.0 = very conservative (let profits run), 1.0 = very aggressive (quick exits) + exit_aggressiveness: 0.5 + # Training Configuration training: learning_rate: 0.001 diff --git a/core/cob_integration.py b/core/cob_integration.py index f8db1a7..54e5129 100644 --- a/core/cob_integration.py +++ b/core/cob_integration.py @@ -85,8 +85,14 @@ class COBIntegration: self.cob_provider.subscribe_to_cob_updates(self._on_cob_update) self.cob_provider.subscribe_to_bucket_updates(self._on_bucket_update) - # Start COB provider - await self.cob_provider.start_streaming() + # Start COB provider streaming + try: + logger.info("Starting COB provider streaming...") + await self.cob_provider.start_streaming() + except Exception as e: + logger.error(f"Error starting COB provider streaming: {e}") + # Start a background task instead + asyncio.create_task(self._start_cob_provider_background()) # Start analysis threads asyncio.create_task(self._continuous_cob_analysis()) @@ -94,6 +100,14 @@ class COBIntegration: logger.info("COB Integration started successfully") + async def _start_cob_provider_background(self): + """Start COB provider in background task""" + try: + logger.info("Starting COB provider in background...") + await self.cob_provider.start_streaming() + except Exception as e: + logger.error(f"Error in background COB provider: {e}") + async def stop(self): """Stop COB integration""" logger.info("Stopping COB Integration") diff --git a/core/orchestrator.py b/core/orchestrator.py index 0cb2eb5..6960752 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -67,6 +67,10 @@ class TradingDecision: timestamp: datetime reasoning: Dict[str, Any] # Why this decision was made memory_usage: Dict[str, int] # Memory usage of models + # NEW: Aggressiveness parameters + entry_aggressiveness: float = 0.5 # 0.0 = conservative, 1.0 = very aggressive + exit_aggressiveness: float = 0.5 # 0.0 = conservative, 1.0 = very aggressive + current_position_pnl: float = 0.0 # Current open position P&L for RL feedback class TradingOrchestrator: """ @@ -90,6 +94,14 @@ class TradingOrchestrator: self.decision_frequency = self.config.orchestrator.get('decision_frequency', 30) self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols + # NEW: Aggressiveness parameters + self.entry_aggressiveness = self.config.orchestrator.get('entry_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive + self.exit_aggressiveness = self.config.orchestrator.get('exit_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive + + # Position tracking for P&L feedback + self.current_positions: Dict[str, Dict] = {} # {symbol: {side, size, entry_price, entry_time, pnl}} + self.trading_executor = None # Will be set by dashboard or external system + # Dynamic weights (will be adapted based on performance) self.model_weights: Dict[str, float] = {} # {model_name: weight} self._initialize_default_weights() @@ -1483,7 +1495,7 @@ class TradingOrchestrator: def _combine_predictions(self, symbol: str, price: float, predictions: List[Prediction], timestamp: datetime) -> TradingDecision: - """Combine all predictions into a final decision""" + """Combine all predictions into a final decision with aggressiveness and P&L feedback""" try: reasoning = { 'predictions': len(predictions), @@ -1491,6 +1503,9 @@ class TradingOrchestrator: 'models_used': [pred.model_name for pred in predictions] } + # Get current position P&L for feedback + current_position_pnl = self._get_current_position_pnl(symbol, price) + # Initialize action scores action_scores = {'BUY': 0.0, 'SELL': 0.0, 'HOLD': 0.0} total_weight = 0.0 @@ -1516,10 +1531,35 @@ class TradingOrchestrator: best_action = max(action_scores, key=action_scores.get) best_confidence = action_scores[best_action] - # Apply confidence threshold - if best_confidence < self.confidence_threshold: - best_action = 'HOLD' - reasoning['threshold_applied'] = True + # Calculate aggressiveness-adjusted thresholds + entry_threshold, exit_threshold = self._calculate_aggressiveness_thresholds( + current_position_pnl, symbol + ) + + # Apply aggressiveness-based confidence thresholds + if best_action in ['BUY', 'SELL']: + # For entry signals, use entry aggressiveness + if not self._has_open_position(symbol): + if best_confidence < entry_threshold: + best_action = 'HOLD' + reasoning['entry_threshold_applied'] = True + reasoning['entry_threshold'] = entry_threshold + # For exit signals, use exit aggressiveness + else: + if best_confidence < exit_threshold: + best_action = 'HOLD' + reasoning['exit_threshold_applied'] = True + reasoning['exit_threshold'] = exit_threshold + else: + # Standard threshold for HOLD + if best_confidence < self.confidence_threshold: + best_action = 'HOLD' + reasoning['threshold_applied'] = True + + # Add P&L-based decision adjustment + best_action, best_confidence = self._apply_pnl_feedback( + best_action, best_confidence, current_position_pnl, symbol, reasoning + ) # Get memory usage stats try: @@ -1527,6 +1567,10 @@ class TradingOrchestrator: except Exception: memory_usage = {} + # Calculate dynamic aggressiveness based on recent performance + entry_aggressiveness = self._calculate_dynamic_entry_aggressiveness(symbol) + exit_aggressiveness = self._calculate_dynamic_exit_aggressiveness(symbol, current_position_pnl) + # Create final decision decision = TradingDecision( action=best_action, @@ -1535,12 +1579,15 @@ class TradingOrchestrator: price=price, timestamp=timestamp, reasoning=reasoning, - memory_usage=memory_usage.get('models', {}) if memory_usage else {} + memory_usage=memory_usage.get('models', {}) if memory_usage else {}, + entry_aggressiveness=entry_aggressiveness, + exit_aggressiveness=exit_aggressiveness, + current_position_pnl=current_position_pnl ) - logger.info(f"Decision for {symbol}: {best_action} (confidence: {best_confidence:.3f})") - if memory_usage and 'total_used_mb' in memory_usage: - logger.debug(f"Memory usage: {memory_usage['total_used_mb']:.1f}MB / {memory_usage['total_limit_mb']:.1f}MB") + logger.info(f"Decision for {symbol}: {best_action} (confidence: {best_confidence:.3f}, " + f"entry_agg: {entry_aggressiveness:.2f}, exit_agg: {exit_aggressiveness:.2f}, " + f"pnl: ${current_position_pnl:.2f})") return decision @@ -1554,7 +1601,10 @@ class TradingOrchestrator: price=price, timestamp=timestamp, reasoning={'error': str(e)}, - memory_usage={} + memory_usage={}, + entry_aggressiveness=0.5, + exit_aggressiveness=0.5, + current_position_pnl=0.0 ) def _get_timeframe_weight(self, timeframe: str) -> float: @@ -1918,12 +1968,75 @@ class TradingOrchestrator: logger.warning(f"Microstructure features fallback: {e}") comprehensive_features.extend([0.0] * 100) - # Final validation - now includes COB features (13,400 + 400 = 13,800) + # === NEW: P&L FEEDBACK AND AGGRESSIVENESS FEATURES (50) === + try: + current_price = self._get_current_price(symbol) or 3500.0 + current_pnl = self._get_current_position_pnl(symbol, current_price) + + # P&L feedback features (25) + pnl_features = [ + current_pnl, # Current P&L + max(-1.0, min(1.0, current_pnl / 100.0)), # Normalized P&L (-1 to 1) + 1.0 if current_pnl > 0 else 0.0, # Is profitable + 1.0 if current_pnl < -10.0 else 0.0, # Is losing significantly + 1.0 if current_pnl > 20.0 else 0.0, # Is winning significantly + 1.0 if self._has_open_position(symbol) else 0.0, # Has open position + ] + + # Recent performance features (10) + recent_decisions = self.get_recent_decisions(symbol, limit=10) + if recent_decisions: + win_rate = sum(1 for d in recent_decisions if d.reasoning.get('was_profitable', False)) / len(recent_decisions) + avg_confidence = sum(d.confidence for d in recent_decisions) / len(recent_decisions) + recent_pnl_changes = [d.current_position_pnl for d in recent_decisions if hasattr(d, 'current_position_pnl')] + avg_recent_pnl = sum(recent_pnl_changes) / len(recent_pnl_changes) if recent_pnl_changes else 0.0 + else: + win_rate = 0.5 + avg_confidence = 0.5 + avg_recent_pnl = 0.0 + + pnl_features.extend([ + win_rate, + avg_confidence, + max(-1.0, min(1.0, avg_recent_pnl / 50.0)), # Normalized recent P&L + len(recent_decisions) / 10.0, # Decision frequency + ]) + + # Aggressiveness features (15) + entry_agg = getattr(self, 'entry_aggressiveness', 0.5) + exit_agg = getattr(self, 'exit_aggressiveness', 0.5) + + aggressiveness_features = [ + entry_agg, + exit_agg, + entry_agg * 2.0 - 1.0, # Scaled entry aggressiveness (-1 to 1) + exit_agg * 2.0 - 1.0, # Scaled exit aggressiveness (-1 to 1) + entry_agg * exit_agg, # Combined aggressiveness + abs(entry_agg - exit_agg), # Aggressiveness difference + 1.0 if entry_agg > 0.7 else 0.0, # Is very aggressive entry + 1.0 if exit_agg > 0.7 else 0.0, # Is very aggressive exit + 1.0 if entry_agg < 0.3 else 0.0, # Is very conservative entry + 1.0 if exit_agg < 0.3 else 0.0, # Is very conservative exit + ] + + # Pad to 50 features total + all_feedback_features = pnl_features + aggressiveness_features + while len(all_feedback_features) < 50: + all_feedback_features.append(0.0) + + comprehensive_features.extend(all_feedback_features[:50]) + logger.debug("P&L feedback and aggressiveness features: 50 added") + + except Exception as e: + logger.warning(f"P&L feedback features fallback: {e}") + comprehensive_features.extend([0.0] * 50) + + # Final validation - now includes P&L feedback (13,400 + 400 + 50 = 13,850) total_features = len(comprehensive_features) - expected_features = 13800 # Updated to include 400 COB features + expected_features = 13850 # Updated to include P&L feedback features if total_features >= expected_features - 100: # Allow small tolerance - # logger.info(f"TRAINING: Comprehensive RL state built successfully: {total_features} features (including COB)") + # logger.info(f"TRAINING: Comprehensive RL state built successfully: {total_features} features (including P&L feedback)") return comprehensive_features else: logger.warning(f"⚠️ Comprehensive RL state incomplete: {total_features} features (expected {expected_features}+)") @@ -2651,4 +2764,145 @@ class TradingOrchestrator: return None except Exception as e: logger.error(f"Error getting universal data for {model_type}: {e}") - return None \ No newline at end of file + return None + + def _get_current_position_pnl(self, symbol: str, current_price: float) -> float: + """Get current position P&L for the symbol""" + try: + if self.trading_executor and hasattr(self.trading_executor, 'get_current_position'): + position = self.trading_executor.get_current_position(symbol) + if position: + entry_price = position.get('price', 0) + size = position.get('size', 0) + side = position.get('side', 'LONG') + + if entry_price and size > 0: + if side.upper() == 'LONG': + pnl = (current_price - entry_price) * size + else: # SHORT + pnl = (entry_price - current_price) * size + return pnl + return 0.0 + except Exception as e: + logger.debug(f"Error getting position P&L for {symbol}: {e}") + return 0.0 + + def _has_open_position(self, symbol: str) -> bool: + """Check if there's an open position for the symbol""" + try: + if self.trading_executor and hasattr(self.trading_executor, 'get_current_position'): + position = self.trading_executor.get_current_position(symbol) + return position is not None and position.get('size', 0) > 0 + return False + except Exception: + return False + + def _calculate_aggressiveness_thresholds(self, current_pnl: float, symbol: str) -> tuple: + """Calculate confidence thresholds based on aggressiveness settings""" + # Base thresholds + base_entry_threshold = self.confidence_threshold + base_exit_threshold = self.confidence_threshold_close + + # Get aggressiveness settings (could be from config or adaptive) + entry_agg = getattr(self, 'entry_aggressiveness', 0.5) + exit_agg = getattr(self, 'exit_aggressiveness', 0.5) + + # Adjust thresholds based on aggressiveness + # More aggressive = lower threshold (more trades) + # Less aggressive = higher threshold (fewer, higher quality trades) + entry_threshold = base_entry_threshold * (1.5 - entry_agg) # 0.5 agg = 1.0x, 1.0 agg = 0.5x + exit_threshold = base_exit_threshold * (1.5 - exit_agg) + + # Ensure minimum thresholds + entry_threshold = max(0.05, entry_threshold) + exit_threshold = max(0.02, exit_threshold) + + return entry_threshold, exit_threshold + + def _apply_pnl_feedback(self, action: str, confidence: float, current_pnl: float, + symbol: str, reasoning: dict) -> tuple: + """Apply P&L-based feedback to decision making""" + try: + # If we have a losing position, be more aggressive about cutting losses + if current_pnl < -10.0: # Losing more than $10 + if action == 'SELL' and self._has_open_position(symbol): + # Boost confidence for exit signals when losing + confidence = min(1.0, confidence * 1.2) + reasoning['pnl_loss_cut_boost'] = True + elif action == 'BUY': + # Reduce confidence for new entries when losing + confidence *= 0.8 + reasoning['pnl_loss_entry_reduction'] = True + + # If we have a winning position, be more conservative about exits + elif current_pnl > 5.0: # Winning more than $5 + if action == 'SELL' and self._has_open_position(symbol): + # Reduce confidence for exit signals when winning (let profits run) + confidence *= 0.9 + reasoning['pnl_profit_hold'] = True + elif action == 'BUY': + # Slightly boost confidence for entries when on a winning streak + confidence = min(1.0, confidence * 1.05) + reasoning['pnl_winning_streak_boost'] = True + + reasoning['current_pnl'] = current_pnl + return action, confidence + + except Exception as e: + logger.debug(f"Error applying P&L feedback: {e}") + return action, confidence + + def _calculate_dynamic_entry_aggressiveness(self, symbol: str) -> float: + """Calculate dynamic entry aggressiveness based on recent performance""" + try: + # Start with base aggressiveness + base_agg = getattr(self, 'entry_aggressiveness', 0.5) + + # Get recent decisions for this symbol + recent_decisions = self.get_recent_decisions(symbol, limit=10) + if len(recent_decisions) < 3: + return base_agg + + # Calculate win rate + winning_decisions = sum(1 for d in recent_decisions + if d.reasoning.get('was_profitable', False)) + win_rate = winning_decisions / len(recent_decisions) + + # Adjust aggressiveness based on performance + if win_rate > 0.7: # High win rate - be more aggressive + return min(1.0, base_agg + 0.2) + elif win_rate < 0.3: # Low win rate - be more conservative + return max(0.1, base_agg - 0.2) + else: + return base_agg + + except Exception as e: + logger.debug(f"Error calculating dynamic entry aggressiveness: {e}") + return 0.5 + + def _calculate_dynamic_exit_aggressiveness(self, symbol: str, current_pnl: float) -> float: + """Calculate dynamic exit aggressiveness based on P&L and market conditions""" + try: + # Start with base aggressiveness + base_agg = getattr(self, 'exit_aggressiveness', 0.5) + + # Adjust based on current P&L + if current_pnl < -20.0: # Large loss - be very aggressive about cutting + return min(1.0, base_agg + 0.3) + elif current_pnl < -5.0: # Small loss - be more aggressive + return min(1.0, base_agg + 0.1) + elif current_pnl > 20.0: # Large profit - be less aggressive (let it run) + return max(0.1, base_agg - 0.2) + elif current_pnl > 5.0: # Small profit - slightly less aggressive + return max(0.2, base_agg - 0.1) + else: + return base_agg + + except Exception as e: + logger.debug(f"Error calculating dynamic exit aggressiveness: {e}") + return 0.5 + + def set_trading_executor(self, trading_executor): + """Set the trading executor for position tracking""" + self.trading_executor = trading_executor + logger.info("Trading executor set for position tracking and P&L feedback") \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index ba6b6cc..aa4b94a 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -88,6 +88,10 @@ class CleanTradingDashboard: def __init__(self, data_provider: Optional[DataProvider] = None, orchestrator: Optional[Any] = None, trading_executor: Optional[TradingExecutor] = None): self.config = get_config() + # Initialize update batch counter to reduce flickering + self.update_batch_counter = 0 + self.update_batch_interval = 3 # Update less critical elements every 3 intervals + # Initialize components self.data_provider = data_provider or DataProvider() self.trading_executor = trading_executor or TradingExecutor() @@ -432,6 +436,11 @@ class CleanTradingDashboard: def update_recent_decisions(n): """Update recent trading signals - FILTER OUT HOLD signals and highlight COB signals""" try: + # Update less frequently to reduce flickering + self.update_batch_counter += 1 + if self.update_batch_counter % self.update_batch_interval != 0: + raise PreventUpdate + # Filter out HOLD signals before displaying filtered_decisions = [] for decision in self.recent_decisions: @@ -445,6 +454,8 @@ class CleanTradingDashboard: logger.debug(f"COB signals active: {len(cob_signals)} recent COB signals") return self.component_manager.format_trading_signals(filtered_decisions) + except PreventUpdate: + raise except Exception as e: logger.error(f"Error updating decisions: {e}") return [html.P(f"Error: {str(e)}", className="text-danger")] @@ -493,6 +504,10 @@ class CleanTradingDashboard: def update_cob_data(n): """Update COB data displays with real order book ladders and cumulative stats""" try: + # Update less frequently to reduce flickering + if n % self.update_batch_interval != 0: + raise PreventUpdate + eth_snapshot = self._get_cob_snapshot('ETH/USDT') btc_snapshot = self._get_cob_snapshot('BTC/USDT') @@ -504,6 +519,8 @@ class CleanTradingDashboard: return eth_components, btc_components + except PreventUpdate: + raise except Exception as e: logger.error(f"Error updating COB data: {e}") error_msg = html.P(f"COB Error: {str(e)}", className="text-danger small") @@ -516,8 +533,14 @@ class CleanTradingDashboard: def update_training_metrics(n): """Update training metrics""" try: + # Update less frequently to reduce flickering + if n % self.update_batch_interval != 0: + raise PreventUpdate + metrics_data = self._get_training_metrics() return self.component_manager.format_training_metrics(metrics_data) + except PreventUpdate: + raise except Exception as e: logger.error(f"Error updating training metrics: {e}") return [html.P(f"Error: {str(e)}", className="text-danger")] @@ -1883,18 +1906,30 @@ class CleanTradingDashboard: # PERFORMANCE FIX: Use orchestrator's COB integration instead of separate dashboard integration # This eliminates redundant COB providers and improves performance if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration: + # First try to get snapshot from orchestrator's COB integration snapshot = self.orchestrator.cob_integration.get_cob_snapshot(symbol) if snapshot: logger.debug(f"COB snapshot available for {symbol} from orchestrator COB integration") return snapshot - else: - logger.debug(f"No COB snapshot available for {symbol} from orchestrator COB integration") - return None + + # If no snapshot, try to get from orchestrator's cached data + if hasattr(self.orchestrator, 'latest_cob_data') and symbol in self.orchestrator.latest_cob_data: + cob_data = self.orchestrator.latest_cob_data[symbol] + logger.debug(f"COB snapshot available for {symbol} from orchestrator cached data") + + # Create a simple snapshot object from the cached data + class COBSnapshot: + def __init__(self, data): + self.consolidated_bids = data.get('bids', []) + self.consolidated_asks = data.get('asks', []) + self.stats = data.get('stats', {}) + + return COBSnapshot(cob_data) # Fallback: Use cached COB data if orchestrator integration not available - elif symbol in self.latest_cob_data: + if symbol in self.latest_cob_data and self.latest_cob_data[symbol]: cob_data = self.latest_cob_data[symbol] - logger.debug(f"COB snapshot available for {symbol} from cached data (fallback)") + logger.debug(f"COB snapshot available for {symbol} from dashboard cached data (fallback)") # Create a simple snapshot object from the cached data class COBSnapshot: @@ -1902,11 +1937,18 @@ class CleanTradingDashboard: self.consolidated_bids = data.get('bids', []) self.consolidated_asks = data.get('asks', []) self.stats = data.get('stats', {}) + # Add direct attributes for new format compatibility + self.volume_weighted_mid = data['stats'].get('mid_price', 0) + self.spread_bps = data['stats'].get('spread_bps', 0) + self.liquidity_imbalance = data['stats'].get('imbalance', 0) + self.total_bid_liquidity = data['stats'].get('total_bid_liquidity', 0) + self.total_ask_liquidity = data['stats'].get('total_ask_liquidity', 0) + self.exchanges_active = data['stats'].get('exchanges_active', []) return COBSnapshot(cob_data) - else: - logger.debug(f"No COB snapshot available for {symbol} - no orchestrator integration or cached data") - return None + + logger.debug(f"No COB snapshot available for {symbol} - no orchestrator integration or cached data") + return None except Exception as e: logger.warning(f"Error getting COB snapshot for {symbol}: {e}") @@ -4154,11 +4196,11 @@ class CleanTradingDashboard: self.training_system = None def _initialize_cob_integration(self): - """Initialize simple COB integration that works without async event loops""" + """Initialize COB integration using orchestrator's COB system""" try: - logger.debug("Initializing simple COB integration for model feeding") + logger.info("Initializing COB integration via orchestrator") - # Initialize COB data storage + # Initialize COB data storage (for fallback) self.cob_data_history = { 'ETH/USDT': [], 'BTC/USDT': [] @@ -4171,15 +4213,45 @@ class CleanTradingDashboard: 'ETH/USDT': None, 'BTC/USDT': None } + self.latest_cob_data = { + 'ETH/USDT': None, + 'BTC/USDT': None + } - # Start simple COB data collection + # Check if orchestrator has COB integration + if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration: + logger.info("Using orchestrator's COB integration") + + # Start orchestrator's COB integration in background + def start_orchestrator_cob(): + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.orchestrator.start_cob_integration()) + except Exception as e: + logger.error(f"Error starting orchestrator COB integration: {e}") + + import threading + cob_thread = threading.Thread(target=start_orchestrator_cob, daemon=True) + cob_thread.start() + + logger.info("Orchestrator COB integration started successfully") + + else: + logger.warning("Orchestrator COB integration not available, using fallback simple collection") + # Fallback to simple collection + self._start_simple_cob_collection() + + # ALWAYS start simple collection as backup even if orchestrator COB exists + # This ensures we have data flowing while orchestrator COB integration starts up + logger.info("Starting simple COB collection as backup/fallback") self._start_simple_cob_collection() - - logger.debug("Simple COB integration initialized successfully") - + except Exception as e: logger.error(f"Error initializing COB integration: {e}") - self.cob_integration = None + # Fallback to simple collection + self._start_simple_cob_collection() def _start_simple_cob_collection(self): """Start simple COB data collection using REST APIs (no async required)"""