From 1d09b3778eb721c163ebc467c047cc7b6e98ff28 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Tue, 24 Jun 2025 20:25:46 +0300
Subject: [PATCH] Add BOM (Book of Market) matrix integration to CNN features

---
 core/enhanced_orchestrator.py | 493 +++++++++++++++++++++++++++++++++-
 1 file changed, 491 insertions(+), 2 deletions(-)

diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py
index 2d1c0ce..0a78c72 100644
--- a/core/enhanced_orchestrator.py
+++ b/core/enhanced_orchestrator.py
@@ -704,7 +704,7 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
         return df
 
     def _get_cnn_features_for_rl(self, symbol: str) -> Tuple[Optional[Dict[str, np.ndarray]], Optional[Dict[str, np.ndarray]]]:
-        """Get CNN hidden features and predictions for RL state building"""
+        """Get CNN hidden features and predictions for RL state building with BOM matrix integration"""
         try:
             # Try to get CNN features from model registry
             if hasattr(self, 'model_registry') and self.model_registry:
@@ -723,9 +723,24 @@
                         window_size=50
                     )
 
+                # Get BOM (Book of Market) matrix data
+                bom_matrix = self._get_bom_matrix_for_cnn(symbol)
+
                 if feature_matrix is not None:
+                    # Enhance the feature matrix with BOM data if available
+                    if bom_matrix is not None:
+                        enhanced_matrix = self._combine_market_and_bom_features(
+                            feature_matrix, bom_matrix, symbol
+                        )
+                        logger.debug(f"Enhanced CNN features with BOM matrix for {symbol}: "
+                                     f"market_shape={feature_matrix.shape}, bom_shape={bom_matrix.shape}, "
+                                     f"combined_shape={enhanced_matrix.shape}")
+                    else:
+                        enhanced_matrix = feature_matrix
+                        logger.debug(f"Using market features only for CNN {symbol}: shape={feature_matrix.shape}")
+
                     # Extract hidden features and predictions
-                    model_hidden, model_pred = self._extract_cnn_features(model, feature_matrix)
+                    model_hidden, model_pred = self._extract_cnn_features(model, enhanced_matrix)
                     if model_hidden is not None:
                         hidden_features[model_name] = model_hidden
                     if model_pred is not None:
@@ -741,6 +756,480 @@
         except Exception as e:
             logger.warning(f"Error getting CNN features for {symbol}: {e}")
             return None, None
+
+    def _get_bom_matrix_for_cnn(self, symbol: str) -> Optional[np.ndarray]:
+        """
+        Generate a BOM (Book of Market) matrix for CNN input.
+
+        The BOM matrix contains:
+        - Order book depth (20 levels bid/ask)
+        - Volume profile distribution
+        - Order flow intensity patterns
+        - Market microstructure signals
+        - Exchange-specific liquidity data
+
+        Returns:
+            np.ndarray: BOM matrix of shape (sequence_length, bom_features),
+                        where bom_features is padded or trimmed to exactly 120
+        """
+        try:
+            sequence_length = 50  # Match the standard CNN sequence length
+            bom_features = []
+
+            # === 1. CONSOLIDATED ORDER BOOK DATA ===
+            cob_features = self._get_cob_bom_features(symbol)
+            if cob_features:
+                bom_features.extend(cob_features)  # 40 features
+            else:
+                bom_features.extend([0.0] * 40)
+
+            # === 2. VOLUME PROFILE FEATURES ===
+            volume_profile_features = self._get_volume_profile_bom_features(symbol)
+            if volume_profile_features:
+                bom_features.extend(volume_profile_features)  # 30 features
+            else:
+                bom_features.extend([0.0] * 30)
+
+            # === 3. ORDER FLOW INTENSITY ===
+            flow_intensity_features = self._get_flow_intensity_bom_features(symbol)
+            if flow_intensity_features:
+                bom_features.extend(flow_intensity_features)  # 25 features
+            else:
+                bom_features.extend([0.0] * 25)
+
+            # === 4. MARKET MICROSTRUCTURE SIGNALS ===
+            microstructure_features = self._get_microstructure_bom_features(symbol)
+            if microstructure_features:
+                bom_features.extend(microstructure_features)  # 25 features
+            else:
+                bom_features.extend([0.0] * 25)
+
+            # Pad or trim to exactly 120 features
+            if len(bom_features) > 120:
+                bom_features = bom_features[:120]
+            elif len(bom_features) < 120:
+                bom_features.extend([0.0] * (120 - len(bom_features)))
+
+            # Build a time-series matrix by repeating the feature vector across the
+            # sequence. In a real implementation, this would use historical BOM snapshots.
+            bom_matrix = np.tile(bom_features, (sequence_length, 1))
+
+            # Add temporal dynamics (simulate order book changes over time)
+            bom_matrix = self._add_temporal_dynamics_to_bom(bom_matrix, symbol)
+
+            logger.debug(f"Generated BOM matrix for {symbol}: shape={bom_matrix.shape}")
+            return bom_matrix.astype(np.float32)
+
+        except Exception as e:
+            logger.warning(f"Error generating BOM matrix for {symbol}: {e}")
+            return None
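The four extractors above each return a fixed-width block (40/30/25/25), so the final pad-or-trim step acts as a safety net rather than the normal path. A minimal standalone sketch of this assembly pattern; the stub blocks and the assemble_bom_vector name are illustrative, not part of the patch:

from typing import List, Optional
import numpy as np

TOTAL_WIDTH = 120
BLOCK_WIDTHS = [40, 30, 25, 25]  # COB, volume profile, flow intensity, microstructure

def assemble_bom_vector(blocks: List[Optional[List[float]]]) -> np.ndarray:
    """Concatenate feature blocks, substituting zeros for missing blocks,
    then pad or trim to the fixed total width."""
    features: List[float] = []
    for block, width in zip(blocks, BLOCK_WIDTHS):
        features.extend(block if block else [0.0] * width)
    return np.asarray((features + [0.0] * TOTAL_WIDTH)[:TOTAL_WIDTH], dtype=np.float32)

# Only the COB block is available here; the other three fall back to zeros.
vec = assemble_bom_vector([[0.1] * 40, None, None, None])
assert vec.shape == (120,)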
+    def _get_cob_bom_features(self, symbol: str) -> Optional[List[float]]:
+        """Extract COB features for the BOM matrix (40 features)"""
+        try:
+            features = []
+
+            if hasattr(self, 'cob_integration') and self.cob_integration:
+                cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
+                if cob_snapshot:
+                    # Top 10 bid levels (price offset + volume)
+                    for i in range(10):
+                        if i < len(cob_snapshot.consolidated_bids):
+                            level = cob_snapshot.consolidated_bids[i]
+                            price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
+                            volume_normalized = level.total_volume_usd / 1000000  # Normalize to millions of USD
+                            features.extend([price_offset, volume_normalized])
+                        else:
+                            features.extend([0.0, 0.0])
+
+                    # Top 10 ask levels (price offset + volume)
+                    for i in range(10):
+                        if i < len(cob_snapshot.consolidated_asks):
+                            level = cob_snapshot.consolidated_asks[i]
+                            price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
+                            volume_normalized = level.total_volume_usd / 1000000
+                            features.extend([price_offset, volume_normalized])
+                        else:
+                            features.extend([0.0, 0.0])
+
+                    return features[:40]  # Ensure exactly 40 features
+
+            return None
+        except Exception as e:
+            logger.warning(f"Error getting COB BOM features for {symbol}: {e}")
+            return None
+
+    def _get_volume_profile_bom_features(self, symbol: str) -> Optional[List[float]]:
+        """Extract volume profile features for the BOM matrix (30 features)"""
+        try:
+            features = []
+
+            if hasattr(self, 'cob_integration') and self.cob_integration:
+                # Get the session volume profile
+                volume_profile = self.cob_integration.get_session_volume_profile(symbol)
+                if volume_profile and 'data' in volume_profile:
+                    svp_data = volume_profile['data']
+
+                    # Sort by volume and keep the top 10 levels
+                    top_levels = sorted(svp_data, key=lambda x: x['total_volume'], reverse=True)[:10]
+
+                    for level in top_levels:
+                        features.extend([
+                            level.get('buy_percent', 50.0) / 100.0,   # Normalize to 0-1
+                            level.get('sell_percent', 50.0) / 100.0,
+                            level.get('total_volume', 0.0) / 1000000  # Normalize to millions of USD
+                        ])
+
+                    # Pad to 30 features (10 levels * 3 features)
+                    while len(features) < 30:
+                        features.extend([0.5, 0.5, 0.0])  # Neutral buy/sell split, zero volume
+
+                    return features[:30]
+
+            return None
+        except Exception as e:
+            logger.warning(f"Error getting volume profile BOM features for {symbol}: {e}")
+            return None
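For intuition on the per-level encoding above: each book level contributes a signed price offset relative to the volume-weighted mid plus a volume expressed in millions of USD. A worked example with made-up numbers:

mid = 50000.0                              # volume-weighted mid price
bid_price, bid_usd = 49950.0, 2_500_000.0  # one synthetic bid level

price_offset = (bid_price - mid) / mid     # -0.001, i.e. 10 bps below mid
volume_normalized = bid_usd / 1_000_000    # 2.5 (millions of USD)
print(price_offset, volume_normalized)     # -0.001 2.5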
+    def _get_flow_intensity_bom_features(self, symbol: str) -> Optional[List[float]]:
+        """Extract order flow intensity features for the BOM matrix (25 features)"""
+        try:
+            # Get recent trade flow data for analysis (last 300 seconds)
+            trade_flow_data = self._get_recent_trade_data_for_flow_analysis(symbol, 300)
+
+            if not trade_flow_data:
+                return [0.0] * 25
+
+            features = []
+            total_trades = len(trade_flow_data)
+            total_volume = sum(t.get('volume', 0) for t in trade_flow_data)
+
+            # === AGGRESSIVE ORDER FLOW ANALYSIS ===
+            aggressive_buys = [t for t in trade_flow_data if t.get('aggressive_side') == 'buy']
+            aggressive_sells = [t for t in trade_flow_data if t.get('aggressive_side') == 'sell']
+
+            features.extend([
+                len(aggressive_buys) / total_trades,   # Aggressive buy ratio
+                len(aggressive_sells) / total_trades,  # Aggressive sell ratio
+                sum(t.get('volume', 0) for t in aggressive_buys) / max(total_volume, 1),
+                sum(t.get('volume', 0) for t in aggressive_sells) / max(total_volume, 1),
+                np.mean([t.get('size', 0) for t in aggressive_buys]) if aggressive_buys else 0.0,
+                np.mean([t.get('size', 0) for t in aggressive_sells]) if aggressive_sells else 0.0
+            ])
+
+            # === BLOCK TRADE DETECTION ===
+            large_trades = [t for t in trade_flow_data if t.get('volume', 0) > 10000]  # >$10k trades
+            features.extend([
+                len(large_trades) / total_trades,
+                sum(t.get('volume', 0) for t in large_trades) / max(total_volume, 1),
+                np.mean([t.get('volume', 0) for t in large_trades]) if large_trades else 0.0
+            ])
+
+            # === FLOW VELOCITY METRICS ===
+            if total_trades > 1:
+                time_deltas = []
+                for i in range(1, total_trades):
+                    time_delta = (trade_flow_data[i]['timestamp'] - trade_flow_data[i-1]['timestamp']).total_seconds()
+                    time_deltas.append(time_delta)
+
+                features.extend([
+                    np.mean(time_deltas),                                  # Average time between trades
+                    np.std(time_deltas) if len(time_deltas) > 1 else 0.0,  # Inter-trade time volatility
+                    min(time_deltas),                                      # Fastest execution gap
+                    total_trades / 300.0                                   # Trades per second over the window
+                ])
+            else:
+                features.extend([1.0, 0.0, 1.0, 0.0])
+
+            # === PRICE IMPACT ANALYSIS ===
+            price_changes = []
+            for trade in trade_flow_data:
+                if 'price_before' in trade and 'price_after' in trade:
+                    price_impact = abs(trade['price_after'] - trade['price_before']) / trade['price_before']
+                    price_changes.append(price_impact)
+
+            if price_changes:
+                features.extend([
+                    np.mean(price_changes),
+                    np.max(price_changes),
+                    np.std(price_changes)
+                ])
+            else:
+                features.extend([0.0, 0.0, 0.0])
+
+            # === MOMENTUM INDICATORS ===
+            if total_trades >= 10:
+                recent_volume = sum(t.get('volume', 0) for t in trade_flow_data[-10:])
+                earlier_volume = sum(t.get('volume', 0) for t in trade_flow_data[:-10])
+                momentum = recent_volume / earlier_volume if earlier_volume > 0 else 1.0
+
+                recent_aggressive_ratio = len([t for t in trade_flow_data[-10:] if t.get('aggressive_side') == 'buy']) / 10
+                earlier_aggressive_ratio = len([t for t in trade_flow_data[:-10] if t.get('aggressive_side') == 'buy']) / max(total_trades - 10, 1)
+
+                features.extend([
+                    momentum,
+                    recent_aggressive_ratio - earlier_aggressive_ratio,
+                    recent_aggressive_ratio
+                ])
+            else:
+                features.extend([1.0, 0.0, 0.5])
+
+            # === INSTITUTIONAL ACTIVITY INDICATORS ===
+            # Detect volume spikes (possible icebergs) and uniform algo-sized trades
+            mean_volume = np.mean([t.get('volume', 0) for t in trade_flow_data])
+            volume_spikes = [t for t in trade_flow_data if t.get('volume', 0) > mean_volume * 3]
+            uniform_sizes = len([t for t in trade_flow_data if t.get('size', 0) in [0.1, 0.01, 1.0, 10.0]])  # Common algo sizes
+
+            features.extend([
+                len(volume_spikes) / total_trades,
+                uniform_sizes / total_trades,
+                np.std([t.get('size', 0) for t in trade_flow_data])
+            ])
+
+            # Ensure exactly 25 features
+            while len(features) < 25:
+                features.append(0.0)
+
+            return features[:25]
+
+        except Exception as e:
+            logger.warning(f"Error getting flow intensity BOM features for {symbol}: {e}")
+            return [0.0] * 25
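The aggressive-flow ratios above reduce to simple counting and volume shares. A worked example on synthetic trades; the dict fields mirror the ones assumed by the method ('aggressive_side', 'volume'), and the values are invented:

trades = [
    {'aggressive_side': 'buy',  'volume': 12000.0},
    {'aggressive_side': 'buy',  'volume': 500.0},
    {'aggressive_side': 'sell', 'volume': 800.0},
    {'aggressive_side': 'sell', 'volume': 700.0},
]
total = len(trades)
total_vol = sum(t['volume'] for t in trades)

buy_ratio = sum(1 for t in trades if t['aggressive_side'] == 'buy') / total   # 0.5
buy_vol_share = sum(t['volume'] for t in trades
                    if t['aggressive_side'] == 'buy') / total_vol             # ~0.893
large_share = sum(1 for t in trades if t['volume'] > 10000) / total           # 0.25 block-trade share
print(buy_ratio, round(buy_vol_share, 3), large_share)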
+    def _get_microstructure_bom_features(self, symbol: str) -> Optional[List[float]]:
+        """Extract market microstructure features for the BOM matrix (25 features)"""
+        try:
+            features = []
+
+            # === SPREAD DYNAMICS ===
+            if hasattr(self, 'cob_integration') and self.cob_integration:
+                cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
+                if cob_snapshot:
+                    features.extend([
+                        cob_snapshot.spread_bps / 100.0,               # Normalize spread
+                        cob_snapshot.liquidity_imbalance,              # Already normalized to [-1, 1]
+                        len(cob_snapshot.exchanges_active) / 5.0,      # Normalize against a max of 5 exchanges
+                        cob_snapshot.total_bid_liquidity / 1000000.0,  # Normalize to millions of USD
+                        cob_snapshot.total_ask_liquidity / 1000000.0
+                    ])
+                else:
+                    features.extend([0.0] * 5)
+            else:
+                features.extend([0.0] * 5)
+
+            # === MARKET DEPTH ANALYSIS ===
+            recent_trades = self._get_recent_trade_data_for_flow_analysis(symbol, 60)  # Last minute
+            if recent_trades:
+                trade_sizes = [t.get('size', 0) for t in recent_trades]
+                trade_volumes = [t.get('volume', 0) for t in recent_trades]
+
+                features.extend([
+                    np.mean(trade_sizes),
+                    np.median(trade_sizes),
+                    np.std(trade_sizes) if len(trade_sizes) > 1 else 0.0,
+                    np.mean(trade_volumes) / 1000.0,  # Normalize to thousands of USD
+                    len(recent_trades) / 60.0         # Trades per second
+                ])
+            else:
+                features.extend([0.0] * 5)
+
+            # === LIQUIDITY CONSUMPTION PATTERNS ===
+            if recent_trades:
+                # Distinguish trades consuming top-of-book liquidity from deeper levels
+                top_book_trades = len([t for t in recent_trades if t.get('level', 1) == 1])
+                deep_book_trades = len([t for t in recent_trades if t.get('level', 1) > 3])
+
+                features.extend([
+                    top_book_trades / len(recent_trades),
+                    deep_book_trades / len(recent_trades),
+                    np.mean([t.get('level', 1) for t in recent_trades])
+                ])
+            else:
+                features.extend([0.0, 0.0, 1.0])
+
+            # === ORDER BOOK PRESSURE ===
+            pressure_features = self._calculate_order_book_pressure(symbol)
+            if pressure_features:
+                features.extend(pressure_features)  # 7 features
+            else:
+                features.extend([0.0] * 7)
+
+            # === TIME-OF-DAY EFFECTS ===
+            current_time = datetime.now()
+            features.extend([
+                current_time.hour / 24.0,                      # Hour of day, normalized
+                current_time.minute / 60.0,                    # Minute of hour, normalized
+                current_time.weekday() / 7.0,                  # Day of week, normalized
+                1.0 if 9 <= current_time.hour <= 16 else 0.0,  # Traditional market-hours indicator
+                1.0 if current_time.weekday() < 5 else 0.0     # Weekday indicator
+            ])
+
+            # Ensure exactly 25 features
+            while len(features) < 25:
+                features.append(0.0)
+
+            return features[:25]
+
+        except Exception as e:
+            logger.warning(f"Error getting microstructure BOM features for {symbol}: {e}")
+            return [0.0] * 25
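One caveat with the linear time-of-day features above: hour / 24.0 places 23:00 and 00:00 at opposite ends of the scale even though they are adjacent in time. If that matters for the model, a sin/cos pair is a common alternative; a sketch of that possible refinement, not part of this patch:

import math
from datetime import datetime

now = datetime(2025, 6, 24, 23, 30)  # fixed timestamp for reproducibility
hour_frac = (now.hour + now.minute / 60.0) / 24.0

# Cyclical encoding: 23:30 and 00:30 land on nearby points of the unit circle
hour_sin = math.sin(2 * math.pi * hour_frac)
hour_cos = math.cos(2 * math.pi * hour_frac)
print(round(hour_sin, 3), round(hour_cos, 3))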
+    def _calculate_order_book_pressure(self, symbol: str) -> Optional[List[float]]:
+        """Calculate order book pressure indicators (7 features)"""
+        try:
+            if not hasattr(self, 'cob_integration') or not self.cob_integration:
+                return [0.0] * 7
+
+            cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
+            if not cob_snapshot:
+                return [0.0] * 7
+
+            features = []
+            max_exchanges = 5.0  # Assumed maximum number of aggregated exchanges
+
+            # 1. Bid-ask volume ratios at different depths
+            if cob_snapshot.consolidated_bids and cob_snapshot.consolidated_asks:
+                level_1_bid = cob_snapshot.consolidated_bids[0].total_volume_usd
+                level_1_ask = cob_snapshot.consolidated_asks[0].total_volume_usd
+                ratio_1 = level_1_bid / (level_1_bid + level_1_ask) if (level_1_bid + level_1_ask) > 0 else 0.5
+
+                # Top 5 levels ratio
+                top_5_bid = sum(level.total_volume_usd for level in cob_snapshot.consolidated_bids[:5])
+                top_5_ask = sum(level.total_volume_usd for level in cob_snapshot.consolidated_asks[:5])
+                ratio_5 = top_5_bid / (top_5_bid + top_5_ask) if (top_5_bid + top_5_ask) > 0 else 0.5
+
+                features.extend([ratio_1, ratio_5])
+            else:
+                features.extend([0.5, 0.5])
+
+            # 2. Depth asymmetry
+            bid_depth = len(cob_snapshot.consolidated_bids)
+            ask_depth = len(cob_snapshot.consolidated_asks)
+            depth_asymmetry = (bid_depth - ask_depth) / (bid_depth + ask_depth) if (bid_depth + ask_depth) > 0 else 0.0
+            features.append(depth_asymmetry)
+
+            # 3. Volume concentration (Gini coefficient of level volumes)
+            if cob_snapshot.consolidated_bids:
+                bid_volumes = [level.total_volume_usd for level in cob_snapshot.consolidated_bids[:10]]
+                bid_concentration = self._calculate_concentration_index(bid_volumes)
+            else:
+                bid_concentration = 0.0
+
+            if cob_snapshot.consolidated_asks:
+                ask_volumes = [level.total_volume_usd for level in cob_snapshot.consolidated_asks[:10]]
+                ask_concentration = self._calculate_concentration_index(ask_volumes)
+            else:
+                ask_concentration = 0.0
+
+            features.extend([bid_concentration, ask_concentration])
+
+            # 4. Exchange diversity per side
+            if cob_snapshot.consolidated_bids:
+                avg_exchanges_per_level = np.mean([len(level.exchange_breakdown) for level in cob_snapshot.consolidated_bids[:5]])
+                exchange_diversity_bid = avg_exchanges_per_level / max_exchanges
+            else:
+                exchange_diversity_bid = 0.0
+
+            if cob_snapshot.consolidated_asks:
+                avg_exchanges_per_level = np.mean([len(level.exchange_breakdown) for level in cob_snapshot.consolidated_asks[:5]])
+                exchange_diversity_ask = avg_exchanges_per_level / max_exchanges
+            else:
+                exchange_diversity_ask = 0.0
+
+            features.extend([exchange_diversity_bid, exchange_diversity_ask])
+
+            return features[:7]
+
+        except Exception as e:
+            logger.warning(f"Error calculating order book pressure for {symbol}: {e}")
+            return [0.0] * 7
+
+    def _calculate_concentration_index(self, volumes: List[float]) -> float:
+        """Calculate a volume concentration index (Gini coefficient)"""
+        try:
+            if not volumes or len(volumes) < 2:
+                return 0.0
+
+            total_volume = sum(volumes)
+            if total_volume == 0:
+                return 0.0
+
+            # Sort volumes in ascending order
+            sorted_volumes = sorted(volumes)
+            n = len(sorted_volumes)
+
+            # Gini coefficient: G = (2 * sum_i(i * x_i)) / (n * sum(x)) - (n + 1) / n
+            sum_product = sum((i + 1) * vol for i, vol in enumerate(sorted_volumes))
+            gini = (2 * sum_product) / (n * total_volume) - (n + 1) / n
+
+            return gini
+
+        except Exception as e:
+            logger.warning(f"Error calculating concentration index: {e}")
+            return 0.0
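A quick sanity check of the concentration index at its two extremes, using a standalone copy of the formula: a perfectly even book scores 0, and a book with all volume at one level scores (n - 1) / n:

def gini(volumes):
    """Standalone version of _calculate_concentration_index's formula."""
    sorted_volumes = sorted(volumes)
    n, total = len(sorted_volumes), sum(sorted_volumes)
    sum_product = sum((i + 1) * v for i, v in enumerate(sorted_volumes))
    return (2 * sum_product) / (n * total) - (n + 1) / n

assert gini([100.0, 100.0, 100.0, 100.0]) == 0.0  # even book
assert gini([400.0, 0.0, 0.0, 0.0]) == 0.75       # one level holds everything: (n-1)/n for n=4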
+    def _add_temporal_dynamics_to_bom(self, bom_matrix: np.ndarray, symbol: str) -> np.ndarray:
+        """Add temporal dynamics to the BOM matrix to simulate order book changes over time"""
+        try:
+            sequence_length, features = bom_matrix.shape
+
+            # Add small correlated variations to simulate order book dynamics.
+            # In a real implementation, these rows would be historical order book snapshots.
+            noise_scale = 0.05   # 5% noise
+            correlation = 0.95   # High correlation between adjacent timesteps
+
+            for t in range(1, sequence_length):
+                # Blend each timestep with the previous one plus a small random change
+                random_change = np.random.normal(0, noise_scale, features)
+                bom_matrix[t] = bom_matrix[t-1] * correlation + bom_matrix[t] * (1 - correlation) + random_change
+
+            # Keep values within reasonable bounds
+            bom_matrix = np.clip(bom_matrix, -5.0, 5.0)
+
+            return bom_matrix
+
+        except Exception as e:
+            logger.warning(f"Error adding temporal dynamics to BOM matrix: {e}")
+            return bom_matrix
+
+    def _combine_market_and_bom_features(self, market_matrix: np.ndarray, bom_matrix: np.ndarray, symbol: str) -> np.ndarray:
+        """
+        Combine traditional market features with BOM matrix features.
+
+        Args:
+            market_matrix: Traditional market data features (sequence_length, market_features)
+            bom_matrix: BOM matrix features (sequence_length, bom_features)
+            symbol: Trading symbol
+
+        Returns:
+            Combined feature matrix (sequence_length, market_features + bom_features)
+        """
+        try:
+            # Align both matrices to the same sequence length
+            min_length = min(market_matrix.shape[0], bom_matrix.shape[0])
+
+            market_trimmed = market_matrix[:min_length]
+            bom_trimmed = bom_matrix[:min_length]
+
+            # Concatenate along the feature axis
+            combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1)
+
+            logger.debug(f"Combined market and BOM features for {symbol}: "
+                         f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, "
+                         f"combined={combined_matrix.shape}")
+
+            return combined_matrix.astype(np.float32)
+
+        except Exception as e:
+            logger.error(f"Error combining market and BOM features for {symbol}: {e}")
+            # Fall back to market features only
+            return market_matrix
 
     def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]:
         """Get latest price for symbol and timeframe from universal data stream"""
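One consequence of the horizontal concatenation above: it changes the CNN's input width, so a model built for market features alone cannot consume the enhanced matrix. A shape-contract sketch; the market feature width of 26 is a made-up value, while the BOM width of 120 and the sequence length of 50 come from this patch:

import numpy as np

seq_len, market_width, bom_width = 50, 26, 120
market = np.random.rand(seq_len, market_width).astype(np.float32)
bom = np.random.rand(seq_len, bom_width).astype(np.float32)

combined = np.concatenate([market, bom], axis=1)
assert combined.shape == (seq_len, market_width + bom_width)  # (50, 146)
# Any CNN consuming this matrix must be built (or rebuilt) with an input
# width of market_width + 120, not market_width alone.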