BOM to CNN

Dobromir Popov
2025-06-24 20:25:46 +03:00
parent 06fbbeb81e
commit 1d09b3778e


@@ -704,7 +704,7 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
return df
def _get_cnn_features_for_rl(self, symbol: str) -> Tuple[Optional[Dict[str, np.ndarray]], Optional[Dict[str, np.ndarray]]]:
"""Get CNN hidden features and predictions for RL state building"""
"""Get CNN hidden features and predictions for RL state building with BOM matrix integration"""
try:
# Try to get CNN features from model registry
if hasattr(self, 'model_registry') and self.model_registry:
@@ -723,9 +723,24 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
window_size=50
)
# Get BOM (Book of Market) matrix data
bom_matrix = self._get_bom_matrix_for_cnn(symbol)
if feature_matrix is not None:
# Enhance feature matrix with BOM data if available
if bom_matrix is not None:
enhanced_matrix = self._combine_market_and_bom_features(
feature_matrix, bom_matrix, symbol
)
logger.debug(f"Enhanced CNN features with BOM matrix for {symbol}: "
f"market_shape={feature_matrix.shape}, bom_shape={bom_matrix.shape}, "
f"combined_shape={enhanced_matrix.shape}")
else:
enhanced_matrix = feature_matrix
logger.debug(f"Using market features only for CNN {symbol}: shape={feature_matrix.shape}")
# Extract hidden features and predictions
model_hidden, model_pred = self._extract_cnn_features(model, enhanced_matrix)
if model_hidden is not None:
hidden_features[model_name] = model_hidden
if model_pred is not None:
@@ -741,6 +756,480 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
except Exception as e:
logger.warning(f"Error getting CNN features for {symbol}: {e}")
return None, None
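For reference, a minimal shape sketch of the branch above, assuming a 50-step window, the 120-feature BOM matrix built below, and a hypothetical 130-feature market matrix (the real width depends on what get_feature_matrix returns):

import numpy as np

feature_matrix = np.zeros((50, 130), dtype=np.float32)  # stand-in for get_feature_matrix()
bom_matrix = np.zeros((50, 120), dtype=np.float32)      # stand-in for _get_bom_matrix_for_cnn()
enhanced_matrix = np.concatenate([feature_matrix, bom_matrix], axis=1)
assert enhanced_matrix.shape == (50, 250)  # what _extract_cnn_features receives
# without BOM data the fallback is enhanced_matrix = feature_matrix -> (50, 130)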
def _get_bom_matrix_for_cnn(self, symbol: str) -> Optional[np.ndarray]:
"""
Generate BOM (Book of Market) matrix for CNN input
BOM Matrix contains:
- Order book depth (20 levels bid/ask)
- Volume profile distribution
- Order flow intensity patterns
- Market microstructure signals
- Exchange-specific liquidity data
Returns:
np.ndarray: BOM matrix of shape (sequence_length, bom_features)
where bom_features = 120 (padded or trimmed to exactly 120)
"""
try:
sequence_length = 50 # Match standard CNN sequence length
bom_features = []
# === 1. CONSOLIDATED ORDER BOOK DATA ===
cob_features = self._get_cob_bom_features(symbol)
if cob_features:
bom_features.extend(cob_features) # ~40 features
else:
bom_features.extend([0.0] * 40)
# === 2. VOLUME PROFILE FEATURES ===
volume_profile_features = self._get_volume_profile_bom_features(symbol)
if volume_profile_features:
bom_features.extend(volume_profile_features) # ~30 features
else:
bom_features.extend([0.0] * 30)
# === 3. ORDER FLOW INTENSITY ===
flow_intensity_features = self._get_flow_intensity_bom_features(symbol)
if flow_intensity_features:
bom_features.extend(flow_intensity_features) # ~25 features
else:
bom_features.extend([0.0] * 25)
# === 4. MARKET MICROSTRUCTURE SIGNALS ===
microstructure_features = self._get_microstructure_bom_features(symbol)
if microstructure_features:
bom_features.extend(microstructure_features) # ~25 features
else:
bom_features.extend([0.0] * 25)
# Pad or trim to exactly 120 features
if len(bom_features) > 120:
bom_features = bom_features[:120]
elif len(bom_features) < 120:
bom_features.extend([0.0] * (120 - len(bom_features)))
# Create time series matrix by repeating features across sequence
# In a real implementation, this would use historical BOM snapshots
bom_matrix = np.tile(bom_features, (sequence_length, 1))
# Add temporal dynamics (simulate order book changes over time)
bom_matrix = self._add_temporal_dynamics_to_bom(bom_matrix, symbol)
logger.debug(f"Generated BOM matrix for {symbol}: shape={bom_matrix.shape}")
return bom_matrix.astype(np.float32)
except Exception as e:
logger.warning(f"Error generating BOM matrix for {symbol}: {e}")
return None
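The 120-feature budget decomposes as 40 (COB) + 30 (volume profile) + 25 (flow intensity) + 25 (microstructure). A standalone sketch of the same pad-or-trim guard, with hypothetical block contents:

from typing import List, Optional

def fit_block(block: Optional[List[float]], width: int) -> List[float]:
    """Zero-pad or trim so every block contributes exactly `width` values."""
    block = block or []
    return (block + [0.0] * width)[:width]

bom_features = (fit_block(None, 40)          # COB unavailable -> zeros
                + fit_block([0.5] * 33, 30)  # oversized block gets trimmed
                + fit_block([0.1] * 25, 25)
                + fit_block([], 25))
assert len(bom_features) == 120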
def _get_cob_bom_features(self, symbol: str) -> Optional[List[float]]:
"""Extract COB features for BOM matrix (40 features)"""
try:
features = []
if hasattr(self, 'cob_integration') and self.cob_integration:
cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
if cob_snapshot:
# Top 10 bid levels (price offset + volume)
for i in range(10):
if i < len(cob_snapshot.consolidated_bids):
level = cob_snapshot.consolidated_bids[i]
price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
volume_normalized = level.total_volume_usd / 1000000 # Normalize to millions
features.extend([price_offset, volume_normalized])
else:
features.extend([0.0, 0.0])
# Top 10 ask levels (price offset + volume)
for i in range(10):
if i < len(cob_snapshot.consolidated_asks):
level = cob_snapshot.consolidated_asks[i]
price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
volume_normalized = level.total_volume_usd / 1000000
features.extend([price_offset, volume_normalized])
else:
features.extend([0.0, 0.0])
return features[:40] # Ensure exactly 40 features
return None
except Exception as e:
logger.warning(f"Error getting COB BOM features for {symbol}: {e}")
return None
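A worked example of the per-level encoding, with illustrative numbers: a bid resting 10 bps below a 3000 mid with $2.5M of consolidated size encodes as (-0.001, 2.5):

mid = 3000.0                             # volume_weighted_mid
bid_price, bid_usd = 2997.0, 2.5e6
price_offset = (bid_price - mid) / mid   # -0.001 (10 bps below mid)
volume_normalized = bid_usd / 1_000_000  # 2.5 (millions of USD)
# 10 bid levels + 10 ask levels, 2 values each -> 40 features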
def _get_volume_profile_bom_features(self, symbol: str) -> Optional[List[float]]:
"""Extract volume profile features for BOM matrix (30 features)"""
try:
features = []
if hasattr(self, 'cob_integration') and self.cob_integration:
# Get session volume profile
volume_profile = self.cob_integration.get_session_volume_profile(symbol)
if volume_profile and 'data' in volume_profile:
svp_data = volume_profile['data']
# Sort by volume and get top 10 levels
top_levels = sorted(svp_data, key=lambda x: x['total_volume'], reverse=True)[:10]
for level in top_levels:
features.extend([
level.get('buy_percent', 50.0) / 100.0, # Normalize to 0-1
level.get('sell_percent', 50.0) / 100.0,
level.get('total_volume', 0.0) / 1000000 # Normalize to millions
])
# Pad to 30 features (10 levels * 3 features)
while len(features) < 30:
features.extend([0.5, 0.5, 0.0]) # Neutral buy/sell, zero volume
return features[:30]
return None
except Exception as e:
logger.warning(f"Error getting volume profile BOM features for {symbol}: {e}")
return None
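Each SVP level maps to three values, normalized as above; a missing level pads as the neutral triple [0.5, 0.5, 0.0]. A sketch with illustrative numbers:

level = {'buy_percent': 62.0, 'sell_percent': 38.0, 'total_volume': 1.8e6}
triple = [level['buy_percent'] / 100.0,       # 0.62 buy share
          level['sell_percent'] / 100.0,      # 0.38 sell share
          level['total_volume'] / 1_000_000]  # 1.8 (millions)
# 10 levels x 3 values = 30 features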
def _get_flow_intensity_bom_features(self, symbol: str) -> Optional[List[float]]:
"""Extract order flow intensity features for BOM matrix (25 features)"""
try:
# Get recent trade flow data for analysis
trade_flow_data = self._get_recent_trade_data_for_flow_analysis(symbol, 300)
if not trade_flow_data:
return [0.0] * 25
features = []
# === AGGRESSIVE ORDER FLOW ANALYSIS ===
aggressive_buys = [t for t in trade_flow_data if t.get('aggressive_side') == 'buy']
aggressive_sells = [t for t in trade_flow_data if t.get('aggressive_side') == 'sell']
total_trades = len(trade_flow_data)
if total_trades > 0:
features.extend([
len(aggressive_buys) / total_trades, # Aggressive buy ratio
len(aggressive_sells) / total_trades, # Aggressive sell ratio
sum(t.get('volume', 0) for t in aggressive_buys) / max(sum(t.get('volume', 0) for t in trade_flow_data), 1),
sum(t.get('volume', 0) for t in aggressive_sells) / max(sum(t.get('volume', 0) for t in trade_flow_data), 1),
np.mean([t.get('size', 0) for t in aggressive_buys]) if aggressive_buys else 0.0,
np.mean([t.get('size', 0) for t in aggressive_sells]) if aggressive_sells else 0.0
])
else:
features.extend([0.0] * 6)
# === BLOCK TRADE DETECTION ===
large_trades = [t for t in trade_flow_data if t.get('volume', 0) > 10000] # >$10k trades
if trade_flow_data:
features.extend([
len(large_trades) / len(trade_flow_data),
sum(t.get('volume', 0) for t in large_trades) / max(sum(t.get('volume', 0) for t in trade_flow_data), 1),
np.mean([t.get('volume', 0) for t in large_trades]) if large_trades else 0.0
])
else:
features.extend([0.0] * 3)
# === FLOW VELOCITY METRICS ===
if len(trade_flow_data) > 1:
time_deltas = []
for i in range(1, len(trade_flow_data)):
time_delta = (trade_flow_data[i]['timestamp'] - trade_flow_data[i-1]['timestamp']).total_seconds()
time_deltas.append(time_delta)
features.extend([
np.mean(time_deltas) if time_deltas else 1.0, # Average time between trades
np.std(time_deltas) if len(time_deltas) > 1 else 0.0, # Time volatility
min(time_deltas) if time_deltas else 1.0, # Fastest execution
len(trade_flow_data) / 300.0 # Trade rate per second
])
else:
features.extend([1.0, 0.0, 1.0, 0.0])
# === PRICE IMPACT ANALYSIS ===
price_changes = []
for trade in trade_flow_data:
if 'price_before' in trade and 'price_after' in trade:
price_impact = abs(trade['price_after'] - trade['price_before']) / trade['price_before']
price_changes.append(price_impact)
if price_changes:
features.extend([
np.mean(price_changes),
np.max(price_changes),
np.std(price_changes)
])
else:
features.extend([0.0, 0.0, 0.0])
# === MOMENTUM INDICATORS ===
if len(trade_flow_data) >= 10:
recent_volume = sum(t.get('volume', 0) for t in trade_flow_data[-10:])
earlier_volume = sum(t.get('volume', 0) for t in trade_flow_data[:-10])
momentum = recent_volume / max(earlier_volume, 1) if earlier_volume > 0 else 1.0
recent_aggressive_ratio = len([t for t in trade_flow_data[-10:] if t.get('aggressive_side') == 'buy']) / 10
earlier_aggressive_ratio = len([t for t in trade_flow_data[:-10] if t.get('aggressive_side') == 'buy']) / max(len(trade_flow_data) - 10, 1)
features.extend([
momentum,
recent_aggressive_ratio - earlier_aggressive_ratio,
recent_aggressive_ratio
])
else:
features.extend([1.0, 0.0, 0.5])
# === INSTITUTIONAL ACTIVITY INDICATORS ===
# Detect iceberg orders and large hidden liquidity
avg_volume = np.mean([x.get('volume', 0) for x in trade_flow_data])
volume_spikes = [t for t in trade_flow_data if t.get('volume', 0) > avg_volume * 3]
uniform_sizes = len([t for t in trade_flow_data if t.get('size', 0) in [0.1, 0.01, 1.0, 10.0]]) # Common algo sizes
features.extend([
len(volume_spikes) / max(len(trade_flow_data), 1),
uniform_sizes / max(len(trade_flow_data), 1),
np.std([t.get('size', 0) for t in trade_flow_data]) if trade_flow_data else 0.0
])
# Ensure exactly 25 features
while len(features) < 25:
features.append(0.0)
return features[:25]
except Exception as e:
logger.warning(f"Error getting flow intensity BOM features for {symbol}: {e}")
return [0.0] * 25
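A minimal sketch of the aggressive-flow ratios on synthetic trade records, using only the keys the method actually reads (aggressive_side, volume); values are illustrative:

trades = [
    {'aggressive_side': 'buy',  'volume': 12000.0},
    {'aggressive_side': 'buy',  'volume': 3000.0},
    {'aggressive_side': 'sell', 'volume': 5000.0},
]
total_volume = sum(t['volume'] for t in trades)
buy_count_ratio = sum(1 for t in trades if t['aggressive_side'] == 'buy') / len(trades)  # 0.667
buy_volume_ratio = sum(t['volume'] for t in trades
                       if t['aggressive_side'] == 'buy') / max(total_volume, 1)          # 0.75
block_trades = [t for t in trades if t['volume'] > 10000]  # the >$10k filter above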
def _get_microstructure_bom_features(self, symbol: str) -> Optional[List[float]]:
"""Extract market microstructure features for BOM matrix (25 features)"""
try:
features = []
# === SPREAD DYNAMICS ===
if hasattr(self, 'cob_integration') and self.cob_integration:
cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
if cob_snapshot:
features.extend([
cob_snapshot.spread_bps / 100.0, # Normalize spread
cob_snapshot.liquidity_imbalance, # Already normalized -1 to 1
len(cob_snapshot.exchanges_active) / 5.0, # Normalize to max 5 exchanges
cob_snapshot.total_bid_liquidity / 1000000.0, # Normalize to millions
cob_snapshot.total_ask_liquidity / 1000000.0
])
else:
features.extend([0.0] * 5)
else:
features.extend([0.0] * 5)
# === MARKET DEPTH ANALYSIS ===
recent_trades = self._get_recent_trade_data_for_flow_analysis(symbol, 60) # Last 1 minute
if recent_trades:
trade_sizes = [t.get('size', 0) for t in recent_trades]
trade_volumes = [t.get('volume', 0) for t in recent_trades]
features.extend([
np.mean(trade_sizes) if trade_sizes else 0.0,
np.median(trade_sizes) if trade_sizes else 0.0,
np.std(trade_sizes) if len(trade_sizes) > 1 else 0.0,
np.mean(trade_volumes) / 1000.0 if trade_volumes else 0.0, # Normalize to thousands
len(recent_trades) / 60.0 # Trades per second
])
else:
features.extend([0.0] * 5)
# === LIQUIDITY CONSUMPTION PATTERNS ===
if recent_trades:
# Analyze if trades are consuming top-of-book vs deeper levels
top_book_trades = len([t for t in recent_trades if t.get('level', 1) == 1])
deep_book_trades = len([t for t in recent_trades if t.get('level', 1) > 3])
features.extend([
top_book_trades / max(len(recent_trades), 1),
deep_book_trades / max(len(recent_trades), 1),
np.mean([t.get('level', 1) for t in recent_trades])
])
else:
features.extend([0.0, 0.0, 1.0])
# === ORDER BOOK PRESSURE ===
pressure_features = self._calculate_order_book_pressure(symbol)
if pressure_features:
features.extend(pressure_features) # Should be 7 features
else:
features.extend([0.0] * 7)
# === TIME-OF-DAY EFFECTS ===
current_time = datetime.now()
features.extend([
current_time.hour / 24.0, # Hour of day normalized
current_time.minute / 60.0, # Minute of hour normalized
current_time.weekday() / 7.0, # Day of week normalized
1.0 if 9 <= current_time.hour <= 16 else 0.0, # Market hours indicator
1.0 if current_time.weekday() < 5 else 0.0 # Weekday indicator
])
# Ensure exactly 25 features
while len(features) < 25:
features.append(0.0)
return features[:25]
except Exception as e:
logger.warning(f"Error getting microstructure BOM features for {symbol}: {e}")
return [0.0] * 25
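The 25-feature budget here is 5 (spread dynamics) + 5 (depth analysis) + 3 (liquidity consumption) + 7 (order book pressure) + 5 (time-of-day) = 25, so the trailing zero-padding loop is a safety net rather than the normal path.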
def _calculate_order_book_pressure(self, symbol: str) -> Optional[List[float]]:
"""Calculate order book pressure indicators (7 features)"""
try:
if not hasattr(self, 'cob_integration') or not self.cob_integration:
return [0.0] * 7
cob_snapshot = self.cob_integration.get_consolidated_orderbook(symbol)
if not cob_snapshot:
return [0.0] * 7
# Calculate various pressure metrics
features = []
# 1. Bid-Ask Volume Ratio (different levels)
if cob_snapshot.consolidated_bids and cob_snapshot.consolidated_asks:
level_1_bid = cob_snapshot.consolidated_bids[0].total_volume_usd
level_1_ask = cob_snapshot.consolidated_asks[0].total_volume_usd
ratio_1 = level_1_bid / (level_1_bid + level_1_ask) if (level_1_bid + level_1_ask) > 0 else 0.5
# Top 5 levels ratio
top_5_bid = sum(level.total_volume_usd for level in cob_snapshot.consolidated_bids[:5])
top_5_ask = sum(level.total_volume_usd for level in cob_snapshot.consolidated_asks[:5])
ratio_5 = top_5_bid / (top_5_bid + top_5_ask) if (top_5_bid + top_5_ask) > 0 else 0.5
features.extend([ratio_1, ratio_5])
else:
features.extend([0.5, 0.5])
# 2. Depth asymmetry
bid_depth = len(cob_snapshot.consolidated_bids)
ask_depth = len(cob_snapshot.consolidated_asks)
depth_asymmetry = (bid_depth - ask_depth) / (bid_depth + ask_depth) if (bid_depth + ask_depth) > 0 else 0.0
features.append(depth_asymmetry)
# 3. Volume concentration (Gini coefficient approximation)
if cob_snapshot.consolidated_bids:
bid_volumes = [level.total_volume_usd for level in cob_snapshot.consolidated_bids[:10]]
bid_concentration = self._calculate_concentration_index(bid_volumes)
else:
bid_concentration = 0.0
if cob_snapshot.consolidated_asks:
ask_volumes = [level.total_volume_usd for level in cob_snapshot.consolidated_asks[:10]]
ask_concentration = self._calculate_concentration_index(ask_volumes)
else:
ask_concentration = 0.0
features.extend([bid_concentration, ask_concentration])
# 4. Exchange diversity impact
max_exchanges = 5.0 # Assumed maximum number of aggregated exchanges
if cob_snapshot.consolidated_bids:
avg_exchanges_per_level = np.mean([len(level.exchange_breakdown) for level in cob_snapshot.consolidated_bids[:5]])
exchange_diversity_bid = avg_exchanges_per_level / max_exchanges
else:
exchange_diversity_bid = 0.0
if cob_snapshot.consolidated_asks:
avg_exchanges_per_level = np.mean([len(level.exchange_breakdown) for level in cob_snapshot.consolidated_asks[:5]])
exchange_diversity_ask = avg_exchanges_per_level / max_exchanges
else:
exchange_diversity_ask = 0.0
features.extend([exchange_diversity_bid, exchange_diversity_ask])
return features[:7]
except Exception as e:
logger.warning(f"Error calculating order book pressure for {symbol}: {e}")
return [0.0] * 7
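A worked example of the level-1 imbalance ratio with illustrative volumes; 0.5 means a balanced book, values toward 1.0 mean bid-heavy:

level_1_bid, level_1_ask = 800_000.0, 200_000.0      # USD at best bid/ask
ratio_1 = level_1_bid / (level_1_bid + level_1_ask)  # 0.8 -> bid-heavy
# the same formula over the top 5 levels gives ratio_5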
def _calculate_concentration_index(self, volumes: List[float]) -> float:
"""Calculate volume concentration index (simplified Gini coefficient)"""
try:
if not volumes or len(volumes) < 2:
return 0.0
total_volume = sum(volumes)
if total_volume == 0:
return 0.0
# Sort volumes in ascending order
sorted_volumes = sorted(volumes)
n = len(sorted_volumes)
# Calculate Gini coefficient
sum_product = sum((i + 1) * vol for i, vol in enumerate(sorted_volumes))
gini = (2 * sum_product) / (n * total_volume) - (n + 1) / n
return gini
except Exception as e:
logger.warning(f"Error calculating concentration index: {e}")
return 0.0
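Two quick checks of the formula: uniform volumes [5, 5] give (2*(1*5 + 2*5))/(2*10) - 3/2 = 1.5 - 1.5 = 0, while the fully concentrated [0, 0, 0, 10] gives (2*40)/(4*10) - 5/4 = 0.75, approaching (n-1)/n as concentration increases.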
def _add_temporal_dynamics_to_bom(self, bom_matrix: np.ndarray, symbol: str) -> np.ndarray:
"""Add temporal dynamics to BOM matrix to simulate order book changes over time"""
try:
sequence_length, features = bom_matrix.shape
# Add small random variations to simulate order book dynamics
# In a real implementation, these would be historical order book snapshots
noise_scale = 0.05 # 5% noise
for t in range(1, sequence_length):
# Add temporal correlation - each timestep slightly different from previous
correlation = 0.95 # High correlation between adjacent timesteps
random_change = np.random.normal(0, noise_scale, features)
bom_matrix[t] = bom_matrix[t-1] * correlation + bom_matrix[t] * (1 - correlation) + random_change
# Ensure values stay within reasonable bounds
bom_matrix = np.clip(bom_matrix, -5.0, 5.0)
return bom_matrix
except Exception as e:
logger.warning(f"Error adding temporal dynamics to BOM matrix: {e}")
return bom_matrix
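A standalone sketch of the AR(1)-style blend used above: each row is pulled toward the previous one (correlation 0.95) with small Gaussian noise, so the 50 tiled rows become a smooth, correlated series instead of identical copies:

import numpy as np

rho, noise_scale = 0.95, 0.05
bom = np.tile(np.random.rand(120).astype(np.float32), (50, 1))
for t in range(1, 50):
    bom[t] = rho * bom[t - 1] + (1 - rho) * bom[t] + np.random.normal(0, noise_scale, 120)
bom = np.clip(bom, -5.0, 5.0)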
def _combine_market_and_bom_features(self, market_matrix: np.ndarray, bom_matrix: np.ndarray, symbol: str) -> np.ndarray:
"""
Combine traditional market features with BOM matrix features
Args:
market_matrix: Traditional market data features (sequence_length, market_features)
bom_matrix: BOM matrix features (sequence_length, bom_features)
symbol: Trading symbol
Returns:
Combined feature matrix (sequence_length, market_features + bom_features)
"""
try:
# Ensure both matrices have the same sequence length
min_length = min(market_matrix.shape[0], bom_matrix.shape[0])
market_trimmed = market_matrix[:min_length]
bom_trimmed = bom_matrix[:min_length]
# Combine horizontally
combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1)
logger.debug(f"Combined market and BOM features for {symbol}: "
f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, "
f"combined={combined_matrix.shape}")
return combined_matrix.astype(np.float32)
except Exception as e:
logger.error(f"Error combining market and BOM features for {symbol}: {e}")
# Fallback to market features only
return market_matrix
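Note the trim-to-shorter behavior: with illustrative widths, a (50, 130) market matrix combined with a (48, 120) BOM matrix yields (48, 250), since both are cut to the shorter sequence before the axis-1 concatenation.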
def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]:
"""Get latest price for symbol and timeframe from universal data stream"""