From 06fbbeb81e5f9ace61d6ba1f1a9019c4179c3d82 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 24 Jun 2025 20:07:44 +0300 Subject: [PATCH] fixes --- core/enhanced_orchestrator.py | 1325 ++++++++++++++++++++------- core/multi_exchange_cob_provider.py | 14 +- core/realtime_rl_cob_trader.py | 50 +- 3 files changed, 1054 insertions(+), 335 deletions(-) diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py index 9a084b5..2d1c0ce 100644 --- a/core/enhanced_orchestrator.py +++ b/core/enhanced_orchestrator.py @@ -465,7 +465,7 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): cnn_hidden_features, cnn_predictions = self._get_cnn_features_for_rl(symbol) # Calculate pivot points - pivot_points = self._calculate_pivot_points_for_rl(ohlcv_data) + pivot_points = self._calculate_pivot_points_for_rl(symbol) # Analyze market microstructure market_microstructure = self._analyze_market_microstructure(raw_ticks) @@ -857,328 +857,45 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): def _extract_cnn_features(self, model, feature_matrix: np.ndarray) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: """Extract hidden features and predictions from CNN model""" try: - # This would need to be implemented based on your specific CNN architecture - # For now, return placeholder values - - # Mock hidden features (would be extracted from model's hidden layers) - hidden_features = np.random.random(512).astype(np.float32) - - # Mock predictions (would be model's output) - predictions = np.array([0.33, 0.33, 0.34, 0.7]).astype(np.float32) # BUY, SELL, HOLD, confidence - - return hidden_features, predictions + # Use actual CNN model inference instead of placeholder values + if hasattr(model, 'predict') and callable(model.predict): + # Get model prediction + prediction_result = model.predict(feature_matrix) + + # Extract predictions (action probabilities) + if isinstance(prediction_result, dict): + predictions = prediction_result.get('probabilities', np.array([0.33, 0.33, 0.34])) + confidence = prediction_result.get('confidence', 0.7) + # Ensure predictions is a flat numpy array + if isinstance(predictions, (list, tuple)): + predictions = np.array(predictions, dtype=np.float32) + predictions = np.append(predictions.flatten(), confidence) + else: + # Ensure prediction_result is a flat numpy array + if isinstance(prediction_result, (list, tuple)): + predictions = np.array(prediction_result, dtype=np.float32).flatten() + else: + predictions = np.array(prediction_result).flatten() + + # Extract hidden features if model supports it + hidden_features = None + if hasattr(model, 'get_hidden_features'): + hidden_features = model.get_hidden_features(feature_matrix) + elif hasattr(model, 'extract_features'): + hidden_features = model.extract_features(feature_matrix) + else: + # Use final layer features as approximation + hidden_features = predictions[:512] if len(predictions) >= 512 else np.pad(predictions, (0, 512-len(predictions))) + + return hidden_features, predictions + else: + logger.warning("CNN model does not have predict method") + return None, None except Exception as e: logger.warning(f"Error extracting CNN features: {e}") return None, None - def _calculate_pivot_points_for_rl(self, ohlcv_data: Dict[str, List]) -> Optional[Dict[str, Any]]: - """Calculate Williams pivot points for RL state building""" - try: - if '1m' in ohlcv_data and len(ohlcv_data['1m']) >= 50: - # Use 1m data for pivot calculation - bars = ohlcv_data['1m'] - - # Convert to numpy array - ohlc_array = 
np.array([ - [bar['timestamp'].timestamp() if hasattr(bar['timestamp'], 'timestamp') else time.time(), - bar['open'], bar['high'], bar['low'], bar['close'], bar['volume']] - for bar in bars[-200:] # Last 200 bars - ]) - - # Calculate pivot points using Williams structure - # This would use the WilliamsMarketStructure implementation - pivot_data = { - 'swing_highs': [], - 'swing_lows': [], - 'trend_levels': [], - 'market_bias': 'neutral' - } - - return pivot_data - - return None - - except Exception as e: - logger.warning(f"Error calculating pivot points: {e}") - return None - - def _analyze_market_microstructure(self, raw_ticks: List[Dict[str, Any]]) -> Dict[str, Any]: - """Analyze market microstructure from tick data""" - try: - if not raw_ticks or len(raw_ticks) < 10: - return {} - - # Calculate microstructure metrics - prices = [tick['price'] for tick in raw_ticks] - volumes = [tick['volume'] for tick in raw_ticks] - - # Price momentum - price_momentum = (prices[-1] - prices[0]) / prices[0] if prices[0] != 0 else 0 - - # Volume pattern - avg_volume = sum(volumes) / len(volumes) - recent_volume = sum(volumes[-10:]) / 10 if len(volumes) >= 10 else avg_volume - volume_intensity = recent_volume / avg_volume if avg_volume != 0 else 1.0 - - # Tick frequency - if len(raw_ticks) >= 2: - time_diffs = [] - for i in range(1, len(raw_ticks)): - if hasattr(raw_ticks[i]['timestamp'], 'timestamp') and hasattr(raw_ticks[i-1]['timestamp'], 'timestamp'): - diff = raw_ticks[i]['timestamp'].timestamp() - raw_ticks[i-1]['timestamp'].timestamp() - time_diffs.append(diff) - - avg_tick_interval = sum(time_diffs) / len(time_diffs) if time_diffs else 1.0 - else: - avg_tick_interval = 1.0 - - return { - 'price_momentum': price_momentum, - 'volume_intensity': volume_intensity, - 'avg_tick_interval': avg_tick_interval, - 'tick_count': len(raw_ticks), - 'price_volatility': np.std(prices) if len(prices) > 1 else 0.0 - } - - except Exception as e: - logger.warning(f"Error analyzing market microstructure: {e}") - return {} - - async def _get_enhanced_predictions_universal(self, symbol: str, market_state: MarketState, - universal_stream: UniversalDataStream) -> List[EnhancedPrediction]: - """Get enhanced predictions using universal data format""" - predictions = [] - - for model_name, model in self.model_registry.models.items(): - try: - if isinstance(model, CNNModelInterface): - # Format universal data for CNN model - cnn_data = self.universal_adapter.format_for_model(universal_stream, 'cnn') - - # Get CNN predictions for each timeframe using universal data - timeframe_predictions = [] - - # ETH timeframes (primary trading pair) - if symbol == 'ETH/USDT': - timeframe_data_map = { - '1s': cnn_data.get('eth_ticks'), - '1m': cnn_data.get('eth_1m'), - '1h': cnn_data.get('eth_1h'), - '1d': cnn_data.get('eth_1d') - } - # BTC reference - elif symbol == 'BTC/USDT': - timeframe_data_map = { - '1s': cnn_data.get('btc_ticks') - } - else: - continue - - for timeframe, feature_matrix in timeframe_data_map.items(): - if feature_matrix is not None and len(feature_matrix) > 0: - # Get timeframe-specific prediction using universal data - action_probs, confidence = await self._get_timeframe_prediction_universal( - model, feature_matrix, timeframe, market_state, universal_stream - ) - - if action_probs is not None: - action_names = ['SELL', 'HOLD', 'BUY'] - best_action_idx = np.argmax(action_probs) - best_action = action_names[best_action_idx] - - # Create timeframe prediction - tf_prediction = TimeframePrediction( - timeframe=timeframe, 
- action=best_action, - confidence=float(confidence), - probabilities={name: float(prob) for name, prob in zip(action_names, action_probs)}, - timestamp=datetime.now(), - market_features={ - 'volatility': market_state.volatility, - 'volume': market_state.volume, - 'trend_strength': market_state.trend_strength, - 'data_quality': universal_stream.metadata['data_quality']['overall_score'] - } - ) - timeframe_predictions.append(tf_prediction) - - if timeframe_predictions: - # Combine timeframe predictions into overall prediction - overall_action, overall_confidence = self._combine_timeframe_predictions( - timeframe_predictions, symbol - ) - - enhanced_pred = EnhancedPrediction( - symbol=symbol, - timeframe_predictions=timeframe_predictions, - overall_action=overall_action, - overall_confidence=overall_confidence, - model_name=model.name, - timestamp=datetime.now(), - metadata={ - 'market_regime': market_state.market_regime, - 'symbol_correlation': self._get_symbol_correlation(symbol), - 'universal_data_quality': universal_stream.metadata['data_quality'], - 'data_freshness': universal_stream.metadata['data_freshness'] - } - ) - predictions.append(enhanced_pred) - - except Exception as e: - logger.error(f"Error getting enhanced predictions from {model_name}: {e}") - - return predictions - - async def _get_timeframe_prediction_universal(self, model: CNNModelInterface, feature_matrix: np.ndarray, - timeframe: str, market_state: MarketState, - universal_stream: UniversalDataStream) -> Tuple[Optional[np.ndarray], float]: - """Get prediction for specific timeframe using universal data format with CNN monitoring""" - try: - # Measure prediction timing - prediction_start_time = time.time() - - # Get current price for context - current_price = market_state.prices.get(timeframe) - - # Check if model supports timeframe-specific prediction or enhanced predict method - if hasattr(model, 'predict_timeframe'): - action_probs, confidence = model.predict_timeframe(feature_matrix, timeframe) - elif hasattr(model, 'predict') and hasattr(model.predict, '__call__'): - # Enhanced CNN model with detailed output - if hasattr(model, 'enhanced_predict'): - # Get detailed prediction results - prediction_result = model.enhanced_predict(feature_matrix) - action_probs = prediction_result.get('probabilities', []) - confidence = prediction_result.get('confidence', 0.0) - else: - # Standard prediction - prediction_result = model.predict(feature_matrix) - if isinstance(prediction_result, dict): - action_probs = prediction_result.get('probabilities', []) - confidence = prediction_result.get('confidence', 0.0) - else: - action_probs, confidence = prediction_result - else: - action_probs, confidence = model.predict(feature_matrix) - - # Calculate prediction latency - prediction_latency_ms = (time.time() - prediction_start_time) * 1000 - - if action_probs is not None and confidence is not None: - # Enhance confidence based on universal data quality and market conditions - enhanced_confidence = self._enhance_confidence_with_universal_context( - confidence, timeframe, market_state, universal_stream - ) - - # Log detailed CNN prediction for monitoring - try: - # Convert probabilities to list if needed - if hasattr(action_probs, 'tolist'): - prob_list = action_probs.tolist() - elif isinstance(action_probs, (list, tuple)): - prob_list = list(action_probs) - else: - prob_list = [float(action_probs)] - - # Determine action and action confidence - if len(prob_list) >= 2: - action_idx = np.argmax(prob_list) - action_name = ['SELL', 
'BUY'][action_idx] if len(prob_list) == 2 else ['SELL', 'HOLD', 'BUY'][action_idx] - action_confidence = prob_list[action_idx] - else: - action_idx = 0 - action_name = 'HOLD' - action_confidence = enhanced_confidence - - # Get model memory usage if available - model_memory_mb = None - if hasattr(model, 'get_memory_usage'): - try: - memory_info = model.get_memory_usage() - if isinstance(memory_info, dict): - model_memory_mb = memory_info.get('total_size_mb', 0.0) - else: - model_memory_mb = float(memory_info) - except: - pass - - # Create detailed prediction result for monitoring - detailed_prediction = { - 'action': action_idx, - 'action_name': action_name, - 'confidence': float(enhanced_confidence), - 'action_confidence': float(action_confidence), - 'probabilities': prob_list, - 'raw_logits': prob_list # Use probabilities as proxy for logits if not available - } - - # Add enhanced model outputs if available - if hasattr(model, 'enhanced_predict') and isinstance(prediction_result, dict): - detailed_prediction.update({ - 'regime_probabilities': prediction_result.get('regime_probabilities'), - 'volatility_prediction': prediction_result.get('volatility_prediction'), - 'extrema_prediction': prediction_result.get('extrema_prediction'), - 'risk_assessment': prediction_result.get('risk_assessment') - }) - - # Calculate price changes for context - price_change_1m = None - price_change_5m = None - volume_ratio = None - - if current_price and timeframe in market_state.prices: - # Try to get historical prices for context - try: - # Get 1m and 5m price changes if available - if '1m' in market_state.prices and market_state.prices['1m'] != current_price: - price_change_1m = (current_price - market_state.prices['1m']) / market_state.prices['1m'] - if '5m' in market_state.prices and market_state.prices['5m'] != current_price: - price_change_5m = (current_price - market_state.prices['5m']) / market_state.prices['5m'] - - # Volume ratio (current vs average) - volume_ratio = market_state.volume - except: - pass - - # Log the CNN prediction with full context - log_cnn_prediction( - model_name=getattr(model, 'name', model.__class__.__name__), - symbol=market_state.symbol, - prediction_result=detailed_prediction, - feature_matrix_shape=feature_matrix.shape, - current_price=current_price, - prediction_latency_ms=prediction_latency_ms, - model_memory_usage_mb=model_memory_mb - ) - - # Enhanced logging for detailed analysis - logger.info(f"CNN [{getattr(model, 'name', 'Unknown')}] {market_state.symbol} {timeframe}: " - f"{action_name} (conf: {enhanced_confidence:.3f}, " - f"action_conf: {action_confidence:.3f}, " - f"latency: {prediction_latency_ms:.1f}ms)") - - if detailed_prediction.get('regime_probabilities'): - regime_idx = np.argmax(detailed_prediction['regime_probabilities']) - regime_conf = detailed_prediction['regime_probabilities'][regime_idx] - logger.info(f" Regime: {regime_idx} (conf: {regime_conf:.3f})") - - if detailed_prediction.get('volatility_prediction') is not None: - logger.info(f" Volatility: {detailed_prediction['volatility_prediction']:.3f}") - - if price_change_1m is not None: - logger.info(f" Context: 1m_change: {price_change_1m:.4f}, volume_ratio: {volume_ratio:.2f}") - - except Exception as e: - logger.warning(f"Error logging CNN prediction details: {e}") - - return action_probs, enhanced_confidence - - except Exception as e: - logger.error(f"Error getting timeframe prediction for {timeframe}: {e}") - - return None, 0.0 - def _enhance_confidence_with_universal_context(self, 
base_confidence: float, timeframe: str, market_state: MarketState, universal_stream: UniversalDataStream) -> float: @@ -2969,10 +2686,10 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): logger.warning(f"Error calculating correlation between {symbol1} and {symbol2}: {e}") return 0.7 - def build_comprehensive_rl_state(self, symbol: str, market_state: Optional[object] = None) -> Optional[np.ndarray]: - """Build comprehensive RL state with 13,400+ features as identified in audit""" + def build_comprehensive_rl_state(self, symbol: str, market_state: Optional[object] = None, current_pnl: float = 0.0, position_info: Dict = None) -> Optional[np.ndarray]: + """Build comprehensive RL state with 13,500+ features including PnL-aware features for loss cutting optimization""" try: - logger.debug(f"Building comprehensive RL state for {symbol}") + logger.debug(f"Building PnL-aware comprehensive RL state for {symbol} (PnL: {current_pnl:.4f})") # Initialize comprehensive feature vector features = [] @@ -3026,16 +2743,23 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): else: features.extend([0.0] * 600) # Pad with zeros - # === TOTAL: 13,400 features === + # === 8. PnL-AWARE RISK MANAGEMENT FEATURES (100 features) === + pnl_features = self._get_pnl_aware_features_for_rl(symbol, current_pnl, position_info) + if pnl_features is not None and len(pnl_features) > 0: + features.extend(pnl_features[:100]) # Limit to 100 features + else: + features.extend([0.0] * 100) # Pad with zeros + + # === TOTAL: 13,500 features === # Ensure exact feature count - if len(features) > 13400: - features = features[:13400] - elif len(features) < 13400: - features.extend([0.0] * (13400 - len(features))) + if len(features) > 13500: + features = features[:13500] + elif len(features) < 13500: + features.extend([0.0] * (13500 - len(features))) state_vector = np.array(features, dtype=np.float32) - logger.info(f"[RL_STATE] Built comprehensive state for {symbol}: {len(state_vector)} features") + logger.info(f"[RL_STATE] Built PnL-aware state for {symbol}: {len(state_vector)} features (PnL: {current_pnl:.4f})") logger.debug(f"[RL_STATE] State stats: min={state_vector.min():.3f}, max={state_vector.max():.3f}, mean={state_vector.mean():.3f}") return state_vector @@ -3315,7 +3039,12 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): features.extend([0.0] * 400) # Trade flow features (400 features) - features.extend([0.0] * 400) # Placeholder for trade flow analysis + try: + trade_flow_features = self._get_trade_flow_features_for_rl(symbol) + features.extend(trade_flow_features[:400]) + except Exception as e: + logger.warning(f"Error getting trade flow features: {e}") + features.extend([0.0] * 400) return features[:800] @@ -3496,4 +3225,942 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): return base_size * confidence_multiplier except Exception as e: logger.error(f"Error calculating position size for {symbol}: {e}") - return 0.01 # Default small size \ No newline at end of file + return 0.01 # Default small size + + def _get_pnl_aware_features_for_rl(self, symbol: str, current_pnl: float, position_info: Dict = None) -> List[float]: + """ + Generate PnL-aware features for loss cutting optimization (100 features) + + These features help the RL model learn to: + 1. Cut losses early when predicting bigger drawdowns + 2. Optimize exit timing based on current PnL + 3. 
Avoid letting winners turn into losers + """ + try: + features = [] + + # Current position info + position_info = position_info or {} + current_price = self._get_current_price(symbol) or 0.0 + entry_price = position_info.get('entry_price', current_price) + position_side = position_info.get('side', 'FLAT') + position_duration = position_info.get('duration_seconds', 0) + + # === 1. CURRENT PnL ANALYSIS (20 features) === + + # Normalized current PnL (-1 to 1 range, clamped) + normalized_pnl = max(-1.0, min(1.0, current_pnl / 100.0)) # Assume max +/-100 for normalization + features.append(normalized_pnl) + + # PnL buckets (one-hot encoding for different PnL ranges) + pnl_buckets = [ + 1.0 if current_pnl < -50 else 0.0, # Heavy loss + 1.0 if -50 <= current_pnl < -20 else 0.0, # Moderate loss + 1.0 if -20 <= current_pnl < -5 else 0.0, # Small loss + 1.0 if -5 <= current_pnl < 5 else 0.0, # Break-even + 1.0 if 5 <= current_pnl < 20 else 0.0, # Small profit + 1.0 if 20 <= current_pnl < 50 else 0.0, # Moderate profit + 1.0 if current_pnl >= 50 else 0.0, # Large profit + ] + features.extend(pnl_buckets) + + # PnL velocity (rate of change) + pnl_velocity = self._calculate_pnl_velocity(symbol) + features.append(max(-1.0, min(1.0, pnl_velocity / 10.0))) # Normalized velocity + + # Time-weighted PnL (how long we've been in this PnL state) + time_weight = min(1.0, position_duration / 3600.0) # Hours normalized to 0-1 + time_weighted_pnl = normalized_pnl * time_weight + features.append(time_weighted_pnl) + + # PnL trend analysis (last 5 measurements) + pnl_trend = self._get_pnl_trend_features(symbol) + features.extend(pnl_trend[:10]) # 10 trend features + + # === 2. DRAWDOWN PREDICTION FEATURES (20 features) === + + # Current drawdown from peak + peak_pnl = self._get_peak_pnl_for_position(symbol) + current_drawdown = (peak_pnl - current_pnl) / max(1.0, abs(peak_pnl)) if peak_pnl != 0 else 0.0 + features.append(max(-1.0, min(1.0, current_drawdown))) + + # Predicted future drawdown based on market conditions + predicted_drawdown = self._predict_future_drawdown(symbol) + features.extend(predicted_drawdown[:10]) # 10 prediction features + + # Volatility-adjusted risk score + current_volatility = self._get_current_volatility(symbol) + risk_score = current_drawdown * current_volatility + features.append(max(-1.0, min(1.0, risk_score))) + + # Risk/reward ratio analysis + risk_reward_features = self._calculate_risk_reward_features(symbol, current_pnl) + features.extend(risk_reward_features[:8]) # 8 risk/reward features + + # === 3. 
POSITION DURATION FEATURES (15 features) === + + # Duration buckets (one-hot encoding) + duration_buckets = [ + 1.0 if position_duration < 60 else 0.0, # < 1 minute + 1.0 if 60 <= position_duration < 300 else 0.0, # 1-5 minutes + 1.0 if 300 <= position_duration < 900 else 0.0, # 5-15 minutes + 1.0 if 900 <= position_duration < 3600 else 0.0, # 15-60 minutes + 1.0 if 3600 <= position_duration < 14400 else 0.0, # 1-4 hours + 1.0 if position_duration >= 14400 else 0.0, # > 4 hours + ] + features.extend(duration_buckets) + + # Normalized duration + normalized_duration = min(1.0, position_duration / 14400.0) # Normalize to 4 hours + features.append(normalized_duration) + + # Duration vs PnL relationship + duration_pnl_ratio = current_pnl / max(1.0, position_duration / 60.0) # PnL per minute + features.append(max(-1.0, min(1.0, duration_pnl_ratio / 5.0))) # Normalized + + # Time decay factor (urgency to act) + time_decay = 1.0 - min(1.0, position_duration / 7200.0) # Decay over 2 hours + features.append(time_decay) + + # === 4. HISTORICAL PERFORMANCE FEATURES (20 features) === + + # Recent trade outcomes for this symbol + recent_trades = self._get_recent_trade_outcomes(symbol, limit=10) + win_rate = len([t for t in recent_trades if t > 0]) / max(1, len(recent_trades)) + avg_win = np.mean([t for t in recent_trades if t > 0]) if any(t > 0 for t in recent_trades) else 0.0 + avg_loss = np.mean([t for t in recent_trades if t < 0]) if any(t < 0 for t in recent_trades) else 0.0 + + features.extend([ + win_rate, + max(-1.0, min(1.0, avg_win / 50.0)), # Normalized average win + max(-1.0, min(1.0, avg_loss / 50.0)), # Normalized average loss + ]) + + # Profit factor and other performance metrics + profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else 1.0 + features.append(min(5.0, profit_factor) / 5.0) # Normalized profit factor + + # Historical cut-loss effectiveness + cut_loss_success = self._get_cut_loss_success_rate(symbol) + features.extend(cut_loss_success[:15]) # 15 cut-loss related features + + # === 5. 
MARKET REGIME FEATURES (25 features) === + + # Current market regime assessment + market_regime = self._assess_current_market_regime(symbol) + features.extend(market_regime[:25]) # 25 market regime features + + # Ensure we have exactly 100 features + if len(features) > 100: + features = features[:100] + elif len(features) < 100: + features.extend([0.0] * (100 - len(features))) + + logger.debug(f"[PnL_FEATURES] Generated {len(features)} PnL-aware features for {symbol} (PnL: {current_pnl:.4f})") + + return features + + except Exception as e: + logger.error(f"Error generating PnL-aware features for {symbol}: {e}") + return [0.0] * 100 # Return zeros on error + + def _calculate_pnl_velocity(self, symbol: str) -> float: + """Calculate PnL rate of change""" + try: + # Get recent PnL history if available + if not hasattr(self, 'pnl_history'): + self.pnl_history = {} + + if symbol not in self.pnl_history: + return 0.0 + + history = self.pnl_history[symbol] + if len(history) < 2: + return 0.0 + + # Calculate velocity as change per minute + time_diff = (history[-1]['timestamp'] - history[-2]['timestamp']).total_seconds() / 60.0 + pnl_diff = history[-1]['pnl'] - history[-2]['pnl'] + + return pnl_diff / max(1.0, time_diff) + + except Exception: + return 0.0 + + def _get_pnl_trend_features(self, symbol: str) -> List[float]: + """Get PnL trend features over recent history""" + try: + features = [] + + if not hasattr(self, 'pnl_history'): + return [0.0] * 10 + + history = self.pnl_history.get(symbol, []) + if len(history) < 5: + return [0.0] * 10 + + # Last 5 PnL values + recent_pnls = [h['pnl'] for h in history[-5:]] + + # Calculate trend features + features.append(recent_pnls[-1] - recent_pnls[0]) # Total change + features.append(np.mean(np.diff(recent_pnls))) # Average change + features.append(np.std(recent_pnls)) # Volatility + features.append(max(recent_pnls) - min(recent_pnls)) # Range + + # Trend direction indicators + increasing = sum(1 for i in range(1, len(recent_pnls)) if recent_pnls[i] > recent_pnls[i-1]) + features.append(increasing / max(1, len(recent_pnls) - 1)) + + # Remaining features as placeholders + features.extend([0.0] * (10 - len(features))) + + return features[:10] + + except Exception: + return [0.0] * 10 + + def _get_peak_pnl_for_position(self, symbol: str) -> float: + """Get peak PnL for current position""" + try: + if not hasattr(self, 'position_peak_pnl'): + self.position_peak_pnl = {} + + return self.position_peak_pnl.get(symbol, 0.0) + + except Exception: + return 0.0 + + def _predict_future_drawdown(self, symbol: str) -> List[float]: + """Predict potential future drawdown based on market conditions""" + try: + features = [] + + # Get current market volatility + volatility = self._get_current_volatility(symbol) + + # Simple heuristic predictions based on volatility + for i in range(1, 11): # 10 future periods + predicted_risk = volatility * i * 0.1 # Increasing risk over time + features.append(min(1.0, predicted_risk)) + + return features + + except Exception: + return [0.0] * 10 + + def _calculate_risk_reward_features(self, symbol: str, current_pnl: float) -> List[float]: + """Calculate risk/reward ratio features""" + try: + features = [] + + # Current risk level based on volatility + volatility = self._get_current_volatility(symbol) + risk_level = min(1.0, volatility / 0.05) # Normalize volatility + features.append(risk_level) + + # Reward potential based on current PnL + if current_pnl > 0: + reward_potential = max(0.0, 1.0 - (current_pnl / 100.0)) # Diminishing returns 
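+                # Worked example of the diminishing-returns curve above
+                # (values follow directly from the formula, for illustration):
+                #   current_pnl = 25  -> reward_potential = 0.75
+                #   current_pnl = 100 -> reward_potential = 0.0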
+ else: + reward_potential = 1.0 # High potential when in loss + features.append(reward_potential) + + # Risk/reward ratio + features.append(reward_potential / max(0.1, risk_level)) + + # Remaining features as placeholders + features.extend([0.0] * (8 - len(features))) + + return features[:8] + + except Exception: + return [0.0] * 8 + + def _get_recent_trade_outcomes(self, symbol: str, limit: int = 10) -> List[float]: + """Get recent trade outcomes for this symbol""" + try: + if not hasattr(self, 'trade_history'): + self.trade_history = {} + + history = self.trade_history.get(symbol, []) + return [trade.get('pnl', 0.0) for trade in history[-limit:]] + + except Exception: + return [] + + def _get_cut_loss_success_rate(self, symbol: str) -> List[float]: + """Get cut-loss success rate features""" + try: + features = [] + + # Get trades where we cut losses early + recent_trades = self._get_recent_trade_outcomes(symbol, 20) + cut_loss_trades = [t for t in recent_trades if -20 < t < -5] # Cut losses + + if len(cut_loss_trades) > 0: + # Success rate (how often cutting losses prevented bigger losses) + success_rate = len([t for t in cut_loss_trades if t > -10]) / len(cut_loss_trades) + features.append(success_rate) + + # Average cut-loss amount + avg_cut_loss = np.mean(cut_loss_trades) + features.append(max(-1.0, avg_cut_loss / 50.0)) # Normalized + else: + features.extend([0.0, 0.0]) + + # Remaining features as placeholders + features.extend([0.0] * (15 - len(features))) + + return features[:15] + + except Exception: + return [0.0] * 15 + + def _assess_current_market_regime(self, symbol: str) -> List[float]: + """Assess current market regime for PnL optimization""" + try: + features = [] + + # Get market data + try: + df = self.data_provider.get_historical_data(symbol, '1m', limit=100) + if df is None or df.empty: + return [0.0] * 25 + + # Trend strength + trend_strength = self._calculate_trend_strength(df) + features.append(trend_strength) + + # Volatility regime + volatility = df['close'].pct_change().std() + features.append(min(1.0, volatility / 0.05)) + + # Volume regime + volume_ratio = df['volume'].iloc[-1] / df['volume'].rolling(20).mean().iloc[-1] + features.append(min(2.0, volume_ratio) / 2.0) + + except Exception: + features.extend([0.0, 0.0, 0.0]) + + # Remaining features as placeholders + features.extend([0.0] * (25 - len(features))) + + return features[:25] + + except Exception: + return [0.0] * 25 + + def _calculate_trend_strength(self, df: pd.DataFrame) -> float: + """Calculate trend strength from price data""" + try: + if len(df) < 20: + return 0.0 + + # Calculate trend using moving averages + short_ma = df['close'].rolling(5).mean() + long_ma = df['close'].rolling(20).mean() + + # Trend strength based on MA separation + ma_diff = (short_ma.iloc[-1] - long_ma.iloc[-1]) / long_ma.iloc[-1] + return max(-1.0, min(1.0, ma_diff * 10)) # Normalized + + except Exception: + return 0.0 + + def _get_current_volatility(self, symbol: str) -> float: + """Get current market volatility""" + try: + df = self.data_provider.get_historical_data(symbol, '1m', limit=20) + if df is None or df.empty: + return 0.02 # Default volatility + + return df['close'].pct_change().std() + + except Exception: + return 0.02 + + def _get_current_price(self, symbol: str) -> Optional[float]: + """Get current price for the symbol""" + try: + # Try to get from data provider + latest_data = self.data_provider.get_latest_data(symbol) + if latest_data: + return latest_data.get('close') + + # Fallback to historical data + df 
= self.data_provider.get_historical_data(symbol, '1m', limit=1)
+            if df is not None and not df.empty:
+                return df['close'].iloc[-1]
+            
+            return None
+            
+        except Exception:
+            return None
+    
+    def _get_trade_flow_features_for_rl(self, symbol: str, window_seconds: int = 300) -> List[float]:
+        """
+        Generate comprehensive trade flow features for RL models (400 features)
+        
+        Analyzes actual trade execution patterns, order flow direction,
+        institutional activity, and market microstructure to help the RL model
+        understand market dynamics at tick level.
+        
+        Each section below pads to a fixed boundary so the feature layout is
+        stable: 50 + 80 + 70 + 100 + 100 = 400 features.
+        """
+        try:
+            features = []
+            
+            # Get recent trade data
+            recent_trades = self._get_recent_trade_data_for_flow_analysis(symbol, window_seconds)
+            
+            if not recent_trades:
+                return [0.0] * 400  # Return zeros if no trade data
+            
+            # === 1. ORDER FLOW DIRECTION ANALYSIS (50 features) ===
+            
+            # Aggressive buy/sell ratio in different time windows
+            windows = [10, 30, 60, 120, 300]  # seconds
+            for window in windows:
+                window_trades = [t for t in recent_trades if t['timestamp'] > (datetime.now() - timedelta(seconds=window))]
+                if window_trades:
+                    aggressive_buys = sum(1 for t in window_trades if t.get('side') == 'buy' and t.get('is_aggressive', True))
+                    aggressive_sells = sum(1 for t in window_trades if t.get('side') == 'sell' and t.get('is_aggressive', True))
+                    total_aggressive = aggressive_buys + aggressive_sells
+                    
+                    if total_aggressive > 0:
+                        buy_ratio = aggressive_buys / total_aggressive
+                        sell_ratio = aggressive_sells / total_aggressive
+                        features.extend([buy_ratio, sell_ratio])
+                    else:
+                        features.extend([0.5, 0.5])  # Neutral
+                else:
+                    features.extend([0.5, 0.5])  # Neutral
+            
+            # Pad order flow features to the 50-feature section boundary
+            features.extend([0.0] * (50 - len(features)))
+            
+            # === 2. VOLUME FLOW ANALYSIS (80 features) ===
+            
+            # Volume-weighted order flow in different time buckets
+            total_buy_volume = sum(t['volume_usd'] for t in recent_trades if t.get('side') == 'buy')
+            total_sell_volume = sum(t['volume_usd'] for t in recent_trades if t.get('side') == 'sell')
+            total_volume = total_buy_volume + total_sell_volume
+            
+            if total_volume > 0:
+                volume_buy_ratio = total_buy_volume / total_volume
+                volume_sell_ratio = total_sell_volume / total_volume
+                volume_imbalance = (total_buy_volume - total_sell_volume) / total_volume
+            else:
+                volume_buy_ratio = volume_sell_ratio = 0.5
+                volume_imbalance = 0.0
+            
+            features.extend([volume_buy_ratio, volume_sell_ratio, volume_imbalance])
+            
+            # Volume distribution by trade size buckets
+            volume_buckets = {'small': 0, 'medium': 0, 'large': 0, 'whale': 0}
+            for trade in recent_trades:
+                volume_usd = trade.get('volume_usd', 0)
+                if volume_usd < 1000:
+                    volume_buckets['small'] += volume_usd
+                elif volume_usd < 10000:
+                    volume_buckets['medium'] += volume_usd
+                elif volume_usd < 100000:
+                    volume_buckets['large'] += volume_usd
+                else:
+                    volume_buckets['whale'] += volume_usd
+            
+            # Normalize volume buckets
+            if total_volume > 0:
+                bucket_ratios = [volume_buckets[k] / total_volume for k in ['small', 'medium', 'large', 'whale']]
+            else:
+                bucket_ratios = [0.25, 0.25, 0.25, 0.25]
+            features.extend(bucket_ratios)
+            
+            # Volume acceleration (rate of change)
+            if len(recent_trades) >= 10:
+                first_half = recent_trades[:len(recent_trades)//2]
+                second_half = recent_trades[len(recent_trades)//2:]
+                
+                first_half_volume = sum(t['volume_usd'] for t in first_half)
+                second_half_volume = sum(t['volume_usd'] for t in second_half)
+                
+                if first_half_volume > 0:
+                    volume_acceleration = (second_half_volume - first_half_volume) / first_half_volume
+                else:
+                    volume_acceleration = 0.0
+                
+                features.append(max(-1.0, min(1.0, volume_acceleration)))
+            else:
+                features.append(0.0)
+            
+            # Pad volume flow features to the 130-feature boundary (50 + 80)
+            features.extend([0.0] * (130 - len(features)))
+            
+            # === 3. TRADE SIZE PATTERN ANALYSIS (70 features) ===
+            
+            # Average trade sizes by side
+            buy_trades = [t for t in recent_trades if t.get('side') == 'buy']
+            sell_trades = [t for t in recent_trades if t.get('side') == 'sell']
+            
+            avg_buy_size = np.mean([t['volume_usd'] for t in buy_trades]) if buy_trades else 0.0
+            avg_sell_size = np.mean([t['volume_usd'] for t in sell_trades]) if sell_trades else 0.0
+            
+            # Normalize trade sizes
+            max_size = max(avg_buy_size, avg_sell_size, 1.0)
+            features.extend([avg_buy_size / max_size, avg_sell_size / max_size])
+            
+            # Trade size distribution
+            trade_sizes = [t['volume_usd'] for t in recent_trades]
+            if trade_sizes:
+                size_percentiles = np.percentile(trade_sizes, [10, 25, 50, 75, 90])
+                normalized_percentiles = [p / max_size for p in size_percentiles]
+                features.extend(normalized_percentiles)
+            else:
+                features.extend([0.0] * 5)
+            
+            # Large trade detection (institutional activity)
+            large_trade_threshold = 50000  # $50k+ trades
+            large_trades = [t for t in recent_trades if t['volume_usd'] >= large_trade_threshold]
+            large_trade_ratio = len(large_trades) / max(1, len(recent_trades))
+            features.append(large_trade_ratio)
+            
+            # Large trade direction bias
+            if large_trades:
+                large_buy_trades = [t for t in large_trades if t.get('side') == 'buy']
+                large_trade_buy_ratio = len(large_buy_trades) / len(large_trades)
+            else:
+                large_trade_buy_ratio = 0.5
+            features.append(large_trade_buy_ratio)
+            
+            # Pad trade size features to the 200-feature boundary (130 + 70)
+            features.extend([0.0] * (200 - len(features)))
+            
+            # === 4. TIMING AND FREQUENCY ANALYSIS (100 features) ===
+            
+            # Trade frequency analysis
+            if len(recent_trades) >= 2:
+                timestamps = [t['timestamp'] for t in recent_trades]
+                time_diffs = [(timestamps[i] - timestamps[i-1]).total_seconds() 
+                             for i in range(1, len(timestamps))]
+                
+                if time_diffs:
+                    avg_time_between_trades = np.mean(time_diffs)
+                    trade_frequency = 1.0 / max(0.1, avg_time_between_trades)  # Trades per second
+                    trade_frequency_normalized = min(1.0, trade_frequency / 10.0)  # Normalize to 0-1
+                else:
+                    trade_frequency_normalized = 0.0
+            else:
+                trade_frequency_normalized = 0.0
+            
+            features.append(trade_frequency_normalized)
+            
+            # Trade clustering analysis (bursts of activity)
+            if len(recent_trades) >= 5:
+                trade_times = [(t['timestamp'] - recent_trades[0]['timestamp']).total_seconds() 
+                              for t in recent_trades]
+                
+                # Find clusters of trades within 5-second windows
+                clusters = []
+                current_cluster = []
+                
+                for i, time_diff in enumerate(trade_times):
+                    if not current_cluster or time_diff - trade_times[current_cluster[-1]] <= 5.0:
+                        current_cluster.append(i)
+                    else:
+                        if len(current_cluster) >= 3:  # Minimum cluster size
+                            clusters.append(current_cluster)
+                        current_cluster = [i]
+                
+                if len(current_cluster) >= 3:
+                    clusters.append(current_cluster)
+                
+                # Cluster features
+                cluster_count = len(clusters)
+                if clusters:
+                    avg_cluster_size = np.mean([len(c) for c in clusters])
+                    max_cluster_size = max([len(c) for c in clusters])
+                else:
+                    avg_cluster_size = max_cluster_size = 0.0
+                
+                features.extend([
+                    min(1.0, cluster_count / 10.0),      # Normalized cluster count
+                    min(1.0, avg_cluster_size / 20.0),   # Normalized average cluster size
+                    min(1.0, max_cluster_size / 50.0)    # Normalized max cluster size
+                ])
+            else:
+                features.extend([0.0, 0.0, 0.0])
+            
+            # Pad timing features to the 300-feature boundary (200 + 100)
+            features.extend([0.0] * (300 - len(features)))
+            
+            # === 
5. MARKET IMPACT AND SLIPPAGE ANALYSIS (100 features) === + + # Price impact analysis + price_impacts = [] + for i, trade in enumerate(recent_trades[1:], 1): + prev_trade = recent_trades[i-1] + if 'price' in trade and 'price' in prev_trade and prev_trade['price'] > 0: + price_change = (trade['price'] - prev_trade['price']) / prev_trade['price'] + + # Adjust impact by trade size and side + trade_side_multiplier = 1 if trade.get('side') == 'buy' else -1 + size_weight = min(1.0, trade['volume_usd'] / 10000.0) # Weight by size + + impact = price_change * trade_side_multiplier * size_weight + price_impacts.append(impact) + + if price_impacts: + avg_impact = np.mean(price_impacts) + max_impact = np.max(np.abs(price_impacts)) + impact_volatility = np.std(price_impacts) if len(price_impacts) > 1 else 0.0 + + features.extend([ + max(-1.0, min(1.0, avg_impact * 1000)), # Scaled average impact + min(1.0, max_impact * 1000), # Scaled max impact + min(1.0, impact_volatility * 1000) # Scaled impact volatility + ]) + else: + features.extend([0.0, 0.0, 0.0]) + + # Pad remaining market impact features + features.extend([0.0] * (100 - 3)) + + # Ensure we have exactly 400 features + while len(features) < 400: + features.append(0.0) + + return features[:400] + + except Exception as e: + logger.error(f"Error generating trade flow features for {symbol}: {e}") + return [0.0] * 400 + + def _get_recent_trade_data_for_flow_analysis(self, symbol: str, window_seconds: int = 300) -> List[Dict]: + """Get recent trade data for flow analysis""" + try: + # Try to get from data provider or COB integration + if hasattr(self.data_provider, 'get_recent_trades'): + return self.data_provider.get_recent_trades(symbol, window_seconds) + + # Fallback: try to get from COB integration + if hasattr(self, 'cob_integration') and self.cob_integration: + if hasattr(self.cob_integration, 'get_recent_trades'): + return self.cob_integration.get_recent_trades(symbol, window_seconds) + + # Last resort: generate synthetic trade data for testing + # This should be replaced with actual trade data in production + return self._generate_synthetic_trade_data(symbol, window_seconds) + + except Exception as e: + logger.error(f"Error getting recent trade data for {symbol}: {e}") + return [] + + def _generate_synthetic_trade_data(self, symbol: str, window_seconds: int) -> List[Dict]: + """Generate synthetic trade data for testing (should be replaced with real data)""" + try: + import random + + current_price = self._get_current_price(symbol) or 2500.0 + trades = [] + + # Generate some synthetic trades + base_time = datetime.now() - timedelta(seconds=window_seconds) + + for i in range(random.randint(50, 200)): # Random number of trades + timestamp = base_time + timedelta(seconds=i * (window_seconds / 100)) + + # Random price movement + price_change = random.uniform(-0.001, 0.001) # ±0.1% max + price = current_price * (1 + price_change) + + # Random trade details + side = random.choice(['buy', 'sell']) + volume = random.uniform(0.001, 1.0) # Random volume + volume_usd = price * volume + + trades.append({ + 'timestamp': timestamp, + 'price': price, + 'volume': volume, + 'volume_usd': volume_usd, + 'side': side, + 'is_aggressive': random.choice([True, False]) + }) + + return sorted(trades, key=lambda x: x['timestamp']) + + except Exception as e: + logger.error(f"Error generating synthetic trade data: {e}") + return [] + + def _analyze_market_microstructure(self, raw_ticks: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze market microstructure from 
raw tick data + + Returns comprehensive microstructure analysis including: + - Bid-ask spread patterns + - Order book pressure + - Trade clustering + - Volume profile analysis + """ + try: + if not raw_ticks: + return { + 'spread_analysis': {'avg_spread_bps': 0.0, 'spread_volatility': 0.0}, + 'order_book_pressure': {'bid_pressure': 0.0, 'ask_pressure': 0.0}, + 'trade_clustering': {'cluster_count': 0, 'avg_cluster_size': 0.0}, + 'volume_profile': {'total_volume': 0.0, 'volume_imbalance': 0.0}, + 'market_regime': 'unknown' + } + + # === SPREAD ANALYSIS === + spreads = [] + for tick in raw_ticks: + if 'bid' in tick and 'ask' in tick and tick['bid'] > 0 and tick['ask'] > 0: + spread_bps = ((tick['ask'] - tick['bid']) / tick['bid']) * 10000 + spreads.append(spread_bps) + + if spreads: + avg_spread_bps = np.mean(spreads) + spread_volatility = np.std(spreads) if len(spreads) > 1 else 0.0 + else: + avg_spread_bps = spread_volatility = 0.0 + + # === ORDER BOOK PRESSURE === + bid_volumes = [] + ask_volumes = [] + + for tick in raw_ticks: + if 'bid_volume' in tick: + bid_volumes.append(tick['bid_volume']) + if 'ask_volume' in tick: + ask_volumes.append(tick['ask_volume']) + + if bid_volumes and ask_volumes: + total_bid_volume = sum(bid_volumes) + total_ask_volume = sum(ask_volumes) + total_volume = total_bid_volume + total_ask_volume + + if total_volume > 0: + bid_pressure = total_bid_volume / total_volume + ask_pressure = total_ask_volume / total_volume + else: + bid_pressure = ask_pressure = 0.5 + else: + bid_pressure = ask_pressure = 0.5 + + # === TRADE CLUSTERING === + # Analyze clustering of price movements + price_changes = [] + for i in range(1, len(raw_ticks)): + if 'price' in raw_ticks[i] and 'price' in raw_ticks[i-1]: + if raw_ticks[i-1]['price'] > 0: + change = (raw_ticks[i]['price'] - raw_ticks[i-1]['price']) / raw_ticks[i-1]['price'] + price_changes.append(change) + + # Simple clustering based on consecutive movements in same direction + clusters = [] + current_cluster = [] + current_direction = None + + for change in price_changes: + direction = 'up' if change > 0 else 'down' if change < 0 else 'flat' + + if direction == current_direction or current_direction is None: + current_cluster.append(change) + current_direction = direction + else: + if len(current_cluster) >= 2: # Minimum cluster size + clusters.append(current_cluster) + current_cluster = [change] + current_direction = direction + + if len(current_cluster) >= 2: + clusters.append(current_cluster) + + cluster_count = len(clusters) + avg_cluster_size = np.mean([len(c) for c in clusters]) if clusters else 0.0 + + # === VOLUME PROFILE === + total_volume = sum(tick.get('volume', 0) for tick in raw_ticks) + + # Calculate volume imbalance (more detailed analysis) + buy_volume = sum(tick.get('volume', 0) for tick in raw_ticks + if tick.get('side') == 'buy' or tick.get('price', 0) > tick.get('prev_price', 0)) + sell_volume = total_volume - buy_volume + + if total_volume > 0: + volume_imbalance = (buy_volume - sell_volume) / total_volume + else: + volume_imbalance = 0.0 + + # === MARKET REGIME DETECTION === + if len(price_changes) > 10: + price_volatility = np.std(price_changes) + price_trend = np.mean(price_changes) + + if abs(price_trend) > 2 * price_volatility: + market_regime = 'trending' + elif price_volatility > 0.001: # High volatility threshold + market_regime = 'volatile' + else: + market_regime = 'ranging' + else: + market_regime = 'unknown' + + return { + 'spread_analysis': { + 'avg_spread_bps': avg_spread_bps, + 
'spread_volatility': spread_volatility + }, + 'order_book_pressure': { + 'bid_pressure': bid_pressure, + 'ask_pressure': ask_pressure + }, + 'trade_clustering': { + 'cluster_count': cluster_count, + 'avg_cluster_size': avg_cluster_size + }, + 'volume_profile': { + 'total_volume': total_volume, + 'volume_imbalance': volume_imbalance + }, + 'market_regime': market_regime + } + + except Exception as e: + logger.error(f"Error analyzing market microstructure: {e}") + return { + 'spread_analysis': {'avg_spread_bps': 0.0, 'spread_volatility': 0.0}, + 'order_book_pressure': {'bid_pressure': 0.0, 'ask_pressure': 0.0}, + 'trade_clustering': {'cluster_count': 0, 'avg_cluster_size': 0.0}, + 'volume_profile': {'total_volume': 0.0, 'volume_imbalance': 0.0}, + 'market_regime': 'unknown' + } + + def _calculate_pivot_points_for_rl(self, symbol: str) -> Optional[Dict[str, Any]]: + """ + Calculate pivot points for RL feature enhancement + + Returns pivot point analysis including support/resistance levels, + pivot strength, and market structure context for the RL model. + """ + try: + # Get recent price data for pivot calculation + if hasattr(self.data_provider, 'get_recent_ohlcv'): + recent_data = self.data_provider.get_recent_ohlcv(symbol, '1h', 50) + else: + # Fallback to basic price data + return self._get_basic_pivot_analysis(symbol) + + if not recent_data or len(recent_data) < 3: + return None + + # Convert to DataFrame for easier analysis + import pandas as pd + df = pd.DataFrame(recent_data) + + # Calculate standard pivot points (yesterday's H, L, C) + if len(df) >= 2: + prev_high = df['high'].iloc[-2] + prev_low = df['low'].iloc[-2] + prev_close = df['close'].iloc[-2] + + # Standard pivot calculations + pivot = (prev_high + prev_low + prev_close) / 3 + r1 = (2 * pivot) - prev_low # Resistance 1 + s1 = (2 * pivot) - prev_high # Support 1 + r2 = pivot + (prev_high - prev_low) # Resistance 2 + s2 = pivot - (prev_high - prev_low) # Support 2 + + # Current price context + current_price = df['close'].iloc[-1] + + # Calculate pivot strength and position + price_to_pivot_ratio = current_price / pivot if pivot > 0 else 1.0 + + # Determine current market structure + if current_price > r1: + market_bias = 'bullish' + nearest_level = r2 + level_type = 'resistance' + elif current_price < s1: + market_bias = 'bearish' + nearest_level = s2 + level_type = 'support' + else: + market_bias = 'neutral' + if current_price > pivot: + nearest_level = r1 + level_type = 'resistance' + else: + nearest_level = s1 + level_type = 'support' + + # Calculate distance to nearest level + distance_to_level = abs(current_price - nearest_level) / current_price if current_price > 0 else 0.0 + + # Volume-weighted pivot strength + volume_strength = 1.0 + if 'volume' in df.columns: + recent_volume = df['volume'].tail(5).mean() + historical_volume = df['volume'].mean() + volume_strength = min(2.0, recent_volume / max(1.0, historical_volume)) + + return { + 'pivot_point': pivot, + 'resistance_1': r1, + 'resistance_2': r2, + 'support_1': s1, + 'support_2': s2, + 'current_price': current_price, + 'market_bias': market_bias, + 'nearest_level': nearest_level, + 'level_type': level_type, + 'distance_to_level': distance_to_level, + 'price_to_pivot_ratio': price_to_pivot_ratio, + 'volume_strength': volume_strength, + 'pivot_strength': min(1.0, volume_strength * (1.0 - distance_to_level)) + } + + else: + return self._get_basic_pivot_analysis(symbol) + + except Exception as e: + logger.error(f"Error calculating pivot points for {symbol}: {e}") + 
return self._get_basic_pivot_analysis(symbol) + + def _get_basic_pivot_analysis(self, symbol: str) -> Dict[str, Any]: + """Fallback basic pivot analysis when detailed data is unavailable""" + try: + current_price = self._get_current_price(symbol) or 2500.0 + + # Create basic pivot structure + return { + 'pivot_point': current_price, + 'resistance_1': current_price * 1.01, + 'resistance_2': current_price * 1.02, + 'support_1': current_price * 0.99, + 'support_2': current_price * 0.98, + 'current_price': current_price, + 'market_bias': 'neutral', + 'nearest_level': current_price * 1.01, + 'level_type': 'resistance', + 'distance_to_level': 0.01, + 'price_to_pivot_ratio': 1.0, + 'volume_strength': 1.0, + 'pivot_strength': 0.5 + } + except Exception as e: + logger.error(f"Error in basic pivot analysis for {symbol}: {e}") + return { + 'pivot_point': 2500.0, + 'resistance_1': 2525.0, + 'resistance_2': 2550.0, + 'support_1': 2475.0, + 'support_2': 2450.0, + 'current_price': 2500.0, + 'market_bias': 'neutral', + 'nearest_level': 2525.0, + 'level_type': 'resistance', + 'distance_to_level': 0.01, + 'price_to_pivot_ratio': 1.0, + 'volume_strength': 1.0, + 'pivot_strength': 0.5 + } \ No newline at end of file diff --git a/core/multi_exchange_cob_provider.py b/core/multi_exchange_cob_provider.py index e30708a..623b230 100644 --- a/core/multi_exchange_cob_provider.py +++ b/core/multi_exchange_cob_provider.py @@ -23,7 +23,11 @@ import asyncio import json import logging import time -import websockets +try: + import websockets +except ImportError: + # Fallback for environments where websockets is not available + websockets = None import numpy as np import pandas as pd from datetime import datetime, timedelta @@ -106,7 +110,7 @@ class MultiExchangeCOBProvider: to create a consolidated view of market liquidity and pricing. 
""" - def __init__(self, symbols: List[str] = None, bucket_size_bps: float = 1.0): + def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0): """ Initialize Multi-Exchange COB Provider @@ -461,6 +465,8 @@ class MultiExchangeCOBProvider: ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms" logger.info(f"Connecting to Binance WebSocket: {ws_url}") + if websockets is None: + raise ImportError("websockets module not available") async with websockets.connect(ws_url) as websocket: self.exchange_order_books[symbol]['binance']['connected'] = True logger.info(f"Connected to Binance order book stream for {symbol}") @@ -537,7 +543,7 @@ class MultiExchangeCOBProvider: except Exception as e: logger.error(f"Error adding trade to SVP: {e}") - def get_session_volume_profile(self, symbol: str, bucket_size: float = None) -> Dict: + def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict: """Get session volume profile for a symbol""" try: if bucket_size is None: @@ -690,6 +696,8 @@ class MultiExchangeCOBProvider: ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade" logger.info(f"Connecting to Binance trade stream: {ws_url}") + if websockets is None: + raise ImportError("websockets module not available") async with websockets.connect(ws_url) as websocket: logger.info(f"Connected to Binance trade stream for {symbol}") diff --git a/core/realtime_rl_cob_trader.py b/core/realtime_rl_cob_trader.py index 913016e..a8b521a 100644 --- a/core/realtime_rl_cob_trader.py +++ b/core/realtime_rl_cob_trader.py @@ -301,6 +301,13 @@ class RealtimeRLCOBTrader: 'last_inference_time': None } + # PnL tracking for loss cutting optimization + self.pnl_history: Dict[str, deque] = { + symbol: deque(maxlen=1000) for symbol in self.symbols + } + self.position_peak_pnl: Dict[str, float] = {symbol: 0.0 for symbol in self.symbols} + self.trade_history: Dict[str, List] = {symbol: [] for symbol in self.symbols} + # Threading self.running = False self.inference_lock = Lock() @@ -961,8 +968,10 @@ class RealtimeRLCOBTrader: actual_direction: int, confidence: float, predicted_change: float, - actual_change: float) -> float: - """Calculate reward for a prediction""" + actual_change: float, + current_pnl: float = 0.0, + position_duration: float = 0.0) -> float: + """Calculate reward for a prediction with PnL-aware loss cutting optimization""" try: # Base reward for correct direction if predicted_direction == actual_direction: @@ -983,7 +992,42 @@ class RealtimeRLCOBTrader: if base_reward < 0 and confidence > 0.8: confidence_scaled_reward *= 1.5 # Increase penalty - return float(confidence_scaled_reward) + # === PnL-AWARE LOSS CUTTING REWARDS === + + pnl_reward = 0.0 + + # Reward cutting losses early (SIDEWAYS when losing) + if current_pnl < -10.0: # In significant loss + if predicted_direction == 1: # SIDEWAYS (exit signal) + # Reward cutting losses before they get worse + loss_cutting_bonus = min(1.0, abs(current_pnl) / 100.0) * confidence + pnl_reward += loss_cutting_bonus + elif predicted_direction != 1: # Continuing to trade while in loss + # Penalty for not cutting losses + pnl_reward -= 0.5 * confidence + + # Reward protecting profits (SIDEWAYS when in profit and market turning) + elif current_pnl > 10.0: # In profit + if predicted_direction == 1 and base_reward > 0: # Correct SIDEWAYS prediction + # Reward protecting profits from reversal + profit_protection_bonus = min(0.5, current_pnl / 200.0) * confidence + 
pnl_reward += profit_protection_bonus
+            
+            # Duration penalty for holding losing positions
+            if current_pnl < 0 and position_duration > 3600:  # Losing for > 1 hour
+                duration_penalty = min(1.0, position_duration / 7200.0) * 0.3  # Flat penalty, capped at 0.3 after two hours
+                confidence_scaled_reward -= duration_penalty
+            
+            # Severe penalty for letting small losses become big losses
+            if current_pnl < -50.0:  # Large loss
+                drawdown_penalty = min(2.0, abs(current_pnl) / 100.0) * confidence
+                confidence_scaled_reward -= drawdown_penalty
+            
+            # Total reward
+            total_reward = confidence_scaled_reward + pnl_reward
+            
+            # Clamp final reward
+            return max(-5.0, min(5.0, float(total_reward)))
         
         except Exception as e:
             logger.error(f"Error calculating reward: {e}")
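
Usage sketch for the extended reward signature above. This is a minimal
illustration, not part of the patch: the constructor arguments, the numeric
values, and the omission of any leading parameters not shown in the hunk are
all assumptions; only the new parameters, the SIDEWAYS == 1 convention, and
the [-5.0, 5.0] clamp come from the patch itself.

    from core.realtime_rl_cob_trader import RealtimeRLCOBTrader

    trader = RealtimeRLCOBTrader(symbols=['ETH/USDT'])  # assumed constructor args

    # A position losing 35 USD and held for 1.5 hours: a correct SIDEWAYS
    # (exit) prediction earns the loss-cutting bonus, and the duration
    # penalty for holding a loser past one hour applies on top.
    reward = trader._calculate_reward(
        predicted_direction=1,     # 1 = SIDEWAYS / exit signal per the patch
        actual_direction=1,
        confidence=0.9,
        predicted_change=0.0,
        actual_change=0.0,
        current_pnl=-35.0,
        position_duration=5400.0,  # seconds
    )
    assert -5.0 <= reward <= 5.0   # reward is always clamped to this range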