From 6a4a73ff0b918adb2a6dedf5b9b2e1a55b33886c Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 24 Jun 2025 20:38:59 +0300 Subject: [PATCH] added 5 min bom data to CNN. respecting port --- core/data_provider.py | 371 +++++++++++++++++++++++++++++++++- core/enhanced_orchestrator.py | 205 +++++++++++-------- main.py | 17 +- 3 files changed, 507 insertions(+), 86 deletions(-) diff --git a/core/data_provider.py b/core/data_provider.py index 01f80f2..f9d77a9 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -142,6 +142,16 @@ class DataProvider: binance_symbol = symbol.replace('/', '').upper() self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size) + # BOM (Book of Market) data caching - 1s resolution for last 5 minutes + self.bom_cache_duration = 300 # 5 minutes in seconds + self.bom_feature_count = 120 # Number of BOM features per timestamp + self.bom_data_cache: Dict[str, deque] = {} # {symbol: deque of (timestamp, bom_features)} + + # Initialize BOM cache for each symbol + for symbol in self.symbols: + # Store 300 seconds worth of 1s BOM data + self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration) + # Initialize tick aggregator for raw tick processing binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols] self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols) @@ -2083,4 +2093,363 @@ class DataProvider: 'distribution_stats': self.distribution_stats.copy(), 'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()}, 'tick_aggregator': aggregator_stats - } \ No newline at end of file + } + + def update_bom_cache(self, symbol: str, bom_features: List[float], cob_integration=None): + """ + Update BOM cache with latest features for a symbol + + Args: + symbol: Trading symbol (e.g., 'ETH/USDT') + bom_features: List of BOM features (should be 120 features) + cob_integration: Optional COB integration instance for real BOM data + """ + try: + current_time = datetime.now() + + # Ensure we have exactly 120 features + if len(bom_features) != self.bom_feature_count: + if len(bom_features) > self.bom_feature_count: + bom_features = bom_features[:self.bom_feature_count] + else: + bom_features.extend([0.0] * (self.bom_feature_count - len(bom_features))) + + # Convert to numpy array for efficient storage + bom_array = np.array(bom_features, dtype=np.float32) + + # Add timestamp and features to cache + with self.data_lock: + self.bom_data_cache[symbol].append((current_time, bom_array)) + + logger.debug(f"Updated BOM cache for {symbol}: {len(self.bom_data_cache[symbol])} timestamps cached") + + except Exception as e: + logger.error(f"Error updating BOM cache for {symbol}: {e}") + + def get_bom_matrix_for_cnn(self, symbol: str, sequence_length: int = 50) -> Optional[np.ndarray]: + """ + Get BOM matrix for CNN input from cached 1s data + + Args: + symbol: Trading symbol (e.g., 'ETH/USDT') + sequence_length: Required sequence length (default 50) + + Returns: + np.ndarray: BOM matrix of shape (sequence_length, 120) or None if insufficient data + """ + try: + with self.data_lock: + if symbol not in self.bom_data_cache or len(self.bom_data_cache[symbol]) == 0: + logger.warning(f"No BOM data cached for {symbol}") + return None + + # Get recent data + cached_data = list(self.bom_data_cache[symbol]) + + if len(cached_data) < sequence_length: + logger.warning(f"Insufficient BOM data for {symbol}: {len(cached_data)} < {sequence_length}") + # Pad with zeros if we don't have enough data + bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32) + + # Fill available data at the end + for i, (timestamp, features) in enumerate(cached_data): + if i < sequence_length: + bom_matrix[sequence_length - len(cached_data) + i] = features + + return bom_matrix + + # Take the most recent sequence_length samples + recent_data = cached_data[-sequence_length:] + + # Create matrix + bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32) + for i, (timestamp, features) in enumerate(recent_data): + bom_matrix[i] = features + + logger.debug(f"Retrieved BOM matrix for {symbol}: shape={bom_matrix.shape}") + return bom_matrix + + except Exception as e: + logger.error(f"Error getting BOM matrix for {symbol}: {e}") + return None + + def generate_synthetic_bom_features(self, symbol: str) -> List[float]: + """ + Generate synthetic BOM features when real COB data is not available + + This creates realistic-looking order book features based on current market data + """ + try: + features = [] + + # Get current price for context + current_price = self.get_current_price(symbol) + if current_price is None: + current_price = 3000.0 # Fallback price + + # === 1. CONSOLIDATED ORDER BOOK DATA (40 features) === + # Top 10 bid levels (price offset + volume) + for i in range(10): + price_offset = -0.001 * (i + 1) * (1 + np.random.normal(0, 0.1)) # Negative for bids + volume_normalized = np.random.exponential(0.5) * (1.0 - i * 0.1) # Decreasing with depth + features.extend([price_offset, volume_normalized]) + + # Top 10 ask levels (price offset + volume) + for i in range(10): + price_offset = 0.001 * (i + 1) * (1 + np.random.normal(0, 0.1)) # Positive for asks + volume_normalized = np.random.exponential(0.5) * (1.0 - i * 0.1) # Decreasing with depth + features.extend([price_offset, volume_normalized]) + + # === 2. VOLUME PROFILE FEATURES (30 features) === + # Top 10 volume levels (buy%, sell%, total volume) + for i in range(10): + buy_percent = 0.3 + np.random.normal(0, 0.2) # Around 30-70% buy + buy_percent = max(0.0, min(1.0, buy_percent)) + sell_percent = 1.0 - buy_percent + total_volume = np.random.exponential(1.0) * (1.0 - i * 0.05) + features.extend([buy_percent, sell_percent, total_volume]) + + # === 3. ORDER FLOW INTENSITY (25 features) === + # Aggressive order flow + features.extend([ + 0.5 + np.random.normal(0, 0.1), # Aggressive buy ratio + 0.5 + np.random.normal(0, 0.1), # Aggressive sell ratio + 0.4 + np.random.normal(0, 0.1), # Buy volume ratio + 0.4 + np.random.normal(0, 0.1), # Sell volume ratio + np.random.exponential(100), # Avg aggressive buy size + np.random.exponential(100), # Avg aggressive sell size + ]) + + # Block trade detection + features.extend([ + 0.1 + np.random.exponential(0.05), # Large trade ratio + 0.2 + np.random.exponential(0.1), # Large trade volume ratio + np.random.exponential(1000), # Avg large trade size + ]) + + # Flow velocity metrics + features.extend([ + 1.0 + np.random.normal(0, 0.2), # Avg time delta + 0.1 + np.random.exponential(0.05), # Time velocity variance + 0.5 + np.random.normal(0, 0.1), # Trade clustering + ]) + + # Institutional activity indicators + features.extend([ + 0.05 + np.random.exponential(0.02), # Iceberg detection + 0.3 + np.random.normal(0, 0.1), # Hidden order ratio + 0.2 + np.random.normal(0, 0.05), # Smart money flow + 0.1 + np.random.exponential(0.03), # Algorithmic activity + ]) + + # Market maker behavior + features.extend([ + 0.6 + np.random.normal(0, 0.1), # MM provision ratio + 0.4 + np.random.normal(0, 0.1), # MM take ratio + 0.02 + np.random.normal(0, 0.005), # Spread tightening + 1.0 + np.random.normal(0, 0.2), # Quote update frequency + 0.8 + np.random.normal(0, 0.1), # Quote stability + ]) + + # === 4. MARKET MICROSTRUCTURE SIGNALS (25 features) === + # Order book pressure + features.extend([ + 0.5 + np.random.normal(0, 0.1), # Bid pressure + 0.5 + np.random.normal(0, 0.1), # Ask pressure + 0.0 + np.random.normal(0, 0.05), # Pressure imbalance + 1.0 + np.random.normal(0, 0.2), # Pressure intensity + 0.5 + np.random.normal(0, 0.1), # Depth stability + ]) + + # Price level concentration + features.extend([ + 0.3 + np.random.normal(0, 0.1), # Bid concentration + 0.3 + np.random.normal(0, 0.1), # Ask concentration + 0.8 + np.random.normal(0, 0.1), # Top level dominance + 0.2 + np.random.normal(0, 0.05), # Fragmentation index + 0.6 + np.random.normal(0, 0.1), # Liquidity clustering + ]) + + # Temporal dynamics + features.extend([ + 0.1 + np.random.normal(0, 0.02), # Volatility factor + 1.0 + np.random.normal(0, 0.1), # Momentum factor + 0.0 + np.random.normal(0, 0.05), # Mean reversion + 0.5 + np.random.normal(0, 0.1), # Trend alignment + 0.8 + np.random.normal(0, 0.1), # Pattern consistency + ]) + + # Exchange-specific patterns + features.extend([ + 0.4 + np.random.normal(0, 0.1), # Cross-exchange correlation + 0.3 + np.random.normal(0, 0.1), # Exchange arbitrage + 0.2 + np.random.normal(0, 0.05), # Latency patterns + 0.8 + np.random.normal(0, 0.1), # Sync quality + 0.6 + np.random.normal(0, 0.1), # Data freshness + ]) + + # Ensure exactly 120 features + if len(features) > 120: + features = features[:120] + elif len(features) < 120: + features.extend([0.0] * (120 - len(features))) + + # Clamp all values to reasonable ranges + features = [max(-5.0, min(5.0, f)) for f in features] + + return features + + except Exception as e: + logger.error(f"Error generating synthetic BOM features for {symbol}: {e}") + return [0.0] * 120 + + def start_bom_cache_updates(self, cob_integration=None): + """ + Start background updates of BOM cache every second + + Args: + cob_integration: Optional COB integration instance for real data + """ + try: + def update_loop(): + while self.is_streaming: + try: + for symbol in self.symbols: + if cob_integration: + # Try to get real BOM features from COB integration + try: + bom_features = self._extract_real_bom_features(symbol, cob_integration) + if bom_features: + self.update_bom_cache(symbol, bom_features, cob_integration) + else: + # Fallback to synthetic + synthetic_features = self.generate_synthetic_bom_features(symbol) + self.update_bom_cache(symbol, synthetic_features) + except Exception as e: + logger.warning(f"Error getting real BOM features for {symbol}: {e}") + synthetic_features = self.generate_synthetic_bom_features(symbol) + self.update_bom_cache(symbol, synthetic_features) + else: + # Generate synthetic BOM features + synthetic_features = self.generate_synthetic_bom_features(symbol) + self.update_bom_cache(symbol, synthetic_features) + + time.sleep(1.0) # Update every second + + except Exception as e: + logger.error(f"Error in BOM cache update loop: {e}") + time.sleep(5.0) # Wait longer on error + + # Start background thread + bom_thread = Thread(target=update_loop, daemon=True) + bom_thread.start() + + logger.info("Started BOM cache updates (1s resolution)") + + except Exception as e: + logger.error(f"Error starting BOM cache updates: {e}") + + def _extract_real_bom_features(self, symbol: str, cob_integration) -> Optional[List[float]]: + """Extract real BOM features from COB integration""" + try: + features = [] + + # Get consolidated order book + if hasattr(cob_integration, 'get_consolidated_orderbook'): + cob_snapshot = cob_integration.get_consolidated_orderbook(symbol) + if cob_snapshot: + # Extract order book features (40 features) + features.extend(self._extract_orderbook_features(cob_snapshot)) + else: + features.extend([0.0] * 40) + else: + features.extend([0.0] * 40) + + # Get volume profile features (30 features) + if hasattr(cob_integration, 'get_session_volume_profile'): + volume_profile = cob_integration.get_session_volume_profile(symbol) + if volume_profile: + features.extend(self._extract_volume_profile_features(volume_profile)) + else: + features.extend([0.0] * 30) + else: + features.extend([0.0] * 30) + + # Add flow and microstructure features (50 features) + features.extend(self._extract_flow_microstructure_features(symbol, cob_integration)) + + # Ensure exactly 120 features + if len(features) > 120: + features = features[:120] + elif len(features) < 120: + features.extend([0.0] * (120 - len(features))) + + return features + + except Exception as e: + logger.warning(f"Error extracting real BOM features for {symbol}: {e}") + return None + + def _extract_orderbook_features(self, cob_snapshot) -> List[float]: + """Extract order book features from COB snapshot""" + features = [] + + try: + # Top 10 bid levels + for i in range(10): + if i < len(cob_snapshot.consolidated_bids): + level = cob_snapshot.consolidated_bids[i] + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + volume_normalized = level.total_volume_usd / 1000000 + features.extend([price_offset, volume_normalized]) + else: + features.extend([0.0, 0.0]) + + # Top 10 ask levels + for i in range(10): + if i < len(cob_snapshot.consolidated_asks): + level = cob_snapshot.consolidated_asks[i] + price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid + volume_normalized = level.total_volume_usd / 1000000 + features.extend([price_offset, volume_normalized]) + else: + features.extend([0.0, 0.0]) + + except Exception as e: + logger.warning(f"Error extracting order book features: {e}") + features = [0.0] * 40 + + return features[:40] + + def _extract_volume_profile_features(self, volume_profile) -> List[float]: + """Extract volume profile features""" + features = [] + + try: + if 'data' in volume_profile: + svp_data = volume_profile['data'] + top_levels = sorted(svp_data, key=lambda x: x.get('total_volume', 0), reverse=True)[:10] + + for level in top_levels: + buy_percent = level.get('buy_percent', 50.0) / 100.0 + sell_percent = level.get('sell_percent', 50.0) / 100.0 + total_volume = level.get('total_volume', 0.0) / 1000000 + features.extend([buy_percent, sell_percent, total_volume]) + + # Pad to 30 features + while len(features) < 30: + features.extend([0.5, 0.5, 0.0]) + + except Exception as e: + logger.warning(f"Error extracting volume profile features: {e}") + features = [0.0] * 30 + + return features[:30] + + def _extract_flow_microstructure_features(self, symbol: str, cob_integration) -> List[float]: + """Extract flow and microstructure features""" + try: + # For now, return synthetic features since full implementation would be complex + return self.generate_synthetic_bom_features(symbol)[70:] # Last 50 features + except: + return [0.0] * 50 \ No newline at end of file diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py index 0a78c72..ba718a8 100644 --- a/core/enhanced_orchestrator.py +++ b/core/enhanced_orchestrator.py @@ -197,6 +197,14 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): self.latest_cob_state: Dict[str, np.ndarray] = {} self.cob_feature_history: Dict[str, deque] = {symbol: deque(maxlen=100) for symbol in self.symbols} + # Start BOM cache updates in data provider + if hasattr(self.data_provider, 'start_bom_cache_updates'): + try: + self.data_provider.start_bom_cache_updates(self.cob_integration) + logger.info("Started BOM cache updates in data provider") + except Exception as e: + logger.warning(f"Failed to start BOM cache updates: {e}") + logger.info("COB Integration: Deferred initialization to prevent sync/async conflicts") # Williams integration @@ -759,70 +767,56 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): def _get_bom_matrix_for_cnn(self, symbol: str) -> Optional[np.ndarray]: """ - Generate BOM (Book of Market) matrix for CNN input + Get cached BOM (Book of Market) matrix for CNN input from data provider - BOM Matrix contains: - - Order book depth (20 levels bid/ask) - - Volume profile distribution - - Order flow intensity patterns - - Market microstructure signals - - Exchange-specific liquidity data + Uses 1s cached BOM data from the data provider for proper temporal analysis Returns: - np.ndarray: BOM matrix of shape (sequence_length, bom_features) - where bom_features typically = 120 features + np.ndarray: BOM matrix of shape (sequence_length, 120) from cached 1s data """ try: sequence_length = 50 # Match standard CNN sequence length - bom_features = [] - # === 1. CONSOLIDATED ORDER BOOK DATA === - cob_features = self._get_cob_bom_features(symbol) - if cob_features: - bom_features.extend(cob_features) # ~40 features - else: - bom_features.extend([0.0] * 40) + # Get cached BOM matrix from data provider + if hasattr(self.data_provider, 'get_bom_matrix_for_cnn'): + bom_matrix = self.data_provider.get_bom_matrix_for_cnn(symbol, sequence_length) + if bom_matrix is not None: + logger.debug(f"Retrieved cached BOM matrix for {symbol}: shape={bom_matrix.shape}") + return bom_matrix - # === 2. VOLUME PROFILE FEATURES === - volume_profile_features = self._get_volume_profile_bom_features(symbol) - if volume_profile_features: - bom_features.extend(volume_profile_features) # ~30 features - else: - bom_features.extend([0.0] * 30) - - # === 3. ORDER FLOW INTENSITY === - flow_intensity_features = self._get_flow_intensity_bom_features(symbol) - if flow_intensity_features: - bom_features.extend(flow_intensity_features) # ~25 features - else: - bom_features.extend([0.0] * 25) - - # === 4. MARKET MICROSTRUCTURE SIGNALS === - microstructure_features = self._get_microstructure_bom_features(symbol) - if microstructure_features: - bom_features.extend(microstructure_features) # ~25 features - else: - bom_features.extend([0.0] * 25) - - # Pad or trim to exactly 120 features - if len(bom_features) > 120: - bom_features = bom_features[:120] - elif len(bom_features) < 120: - bom_features.extend([0.0] * (120 - len(bom_features))) - - # Create time series matrix by repeating features across sequence - # In real implementation, you might want historical BOM data - bom_matrix = np.tile(bom_features, (sequence_length, 1)) - - # Add temporal dynamics (simulate order book changes over time) - bom_matrix = self._add_temporal_dynamics_to_bom(bom_matrix, symbol) - - logger.debug(f"Generated BOM matrix for {symbol}: shape={bom_matrix.shape}") - return bom_matrix.astype(np.float32) + # Fallback to generating synthetic BOM matrix if no cache available + logger.warning(f"No cached BOM data available for {symbol}, generating synthetic") + return self._generate_fallback_bom_matrix(symbol, sequence_length) except Exception as e: - logger.warning(f"Error generating BOM matrix for {symbol}: {e}") - return None + logger.warning(f"Error getting BOM matrix for {symbol}: {e}") + return self._generate_fallback_bom_matrix(symbol, sequence_length) + + def _generate_fallback_bom_matrix(self, symbol: str, sequence_length: int) -> np.ndarray: + """Generate fallback BOM matrix when cache is not available""" + try: + # Generate synthetic BOM features for current timestamp + if hasattr(self.data_provider, 'generate_synthetic_bom_features'): + current_features = self.data_provider.generate_synthetic_bom_features(symbol) + else: + current_features = [0.0] * 120 + + # Create temporal variations for the sequence + bom_matrix = np.zeros((sequence_length, 120), dtype=np.float32) + + for i in range(sequence_length): + # Add small random variations to simulate temporal changes + variation_factor = 0.95 + 0.1 * np.random.random() # 5% variation + varied_features = [f * variation_factor for f in current_features] + bom_matrix[i] = np.array(varied_features, dtype=np.float32) + + logger.debug(f"Generated fallback BOM matrix for {symbol}: shape={bom_matrix.shape}") + return bom_matrix + + except Exception as e: + logger.error(f"Error generating fallback BOM matrix for {symbol}: {e}") + # Return zeros as absolute fallback + return np.zeros((sequence_length, 120), dtype=np.float32) def _get_cob_bom_features(self, symbol: str) -> Optional[List[float]]: """Extract COB features for BOM matrix (40 features)""" @@ -1203,33 +1197,77 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): Combine traditional market features with BOM matrix features Args: - market_matrix: Traditional market data features (sequence_length, market_features) - bom_matrix: BOM matrix features (sequence_length, bom_features) + market_matrix: Traditional market data features (timeframes, sequence_length, market_features) - 3D + bom_matrix: BOM matrix features (sequence_length, bom_features) - 2D symbol: Trading symbol Returns: - Combined feature matrix (sequence_length, market_features + bom_features) + Combined feature matrix reshaped for CNN input """ try: - # Ensure both matrices have the same sequence length - min_length = min(market_matrix.shape[0], bom_matrix.shape[0]) + logger.debug(f"Combining features for {symbol}: market={market_matrix.shape}, bom={bom_matrix.shape}") - market_trimmed = market_matrix[:min_length] - bom_trimmed = bom_matrix[:min_length] - - # Combine horizontally - combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1) - - logger.debug(f"Combined market and BOM features for {symbol}: " - f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, " - f"combined={combined_matrix.shape}") - - return combined_matrix.astype(np.float32) + # Handle dimensional mismatch + if market_matrix.ndim == 3 and bom_matrix.ndim == 2: + # Market matrix is (timeframes, sequence_length, features) + # BOM matrix is (sequence_length, bom_features) + + # Reshape market matrix to 2D by flattening timeframes dimension + timeframes, sequence_length, market_features = market_matrix.shape + + # Option 1: Take the last timeframe (most recent data) + market_2d = market_matrix[-1] # Shape: (sequence_length, market_features) + + # Ensure sequence lengths match + min_length = min(market_2d.shape[0], bom_matrix.shape[0]) + market_trimmed = market_2d[:min_length] + bom_trimmed = bom_matrix[:min_length] + + # Combine horizontally + combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1) + + logger.debug(f"Combined features for {symbol}: " + f"market_2d={market_trimmed.shape}, bom={bom_trimmed.shape}, " + f"combined={combined_matrix.shape}") + + return combined_matrix.astype(np.float32) + + elif market_matrix.ndim == 2 and bom_matrix.ndim == 2: + # Both are 2D - can combine directly + min_length = min(market_matrix.shape[0], bom_matrix.shape[0]) + market_trimmed = market_matrix[:min_length] + bom_trimmed = bom_matrix[:min_length] + + combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1) + + logger.debug(f"Combined 2D features for {symbol}: " + f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, " + f"combined={combined_matrix.shape}") + + return combined_matrix.astype(np.float32) + + else: + logger.warning(f"Unsupported matrix dimensions for {symbol}: " + f"market={market_matrix.shape}, bom={bom_matrix.shape}") + # Fallback: reshape market matrix to 2D if needed + if market_matrix.ndim == 3: + market_2d = market_matrix.reshape(-1, market_matrix.shape[-1]) + else: + market_2d = market_matrix + + return market_2d.astype(np.float32) except Exception as e: logger.error(f"Error combining market and BOM features for {symbol}: {e}") - # Fallback to market features only - return market_matrix + # Fallback to reshaped market features only + try: + if market_matrix.ndim == 3: + return market_matrix[-1].astype(np.float32) # Last timeframe + else: + return market_matrix.astype(np.float32) + except: + logger.error(f"Fallback failed for {symbol}, returning zeros") + return np.zeros((50, 5), dtype=np.float32) # Basic fallback def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]: """Get latest price for symbol and timeframe from universal data stream""" @@ -1353,18 +1391,25 @@ class EnhancedTradingOrchestrator(TradingOrchestrator): # Extract predictions (action probabilities) if isinstance(prediction_result, dict): - predictions = prediction_result.get('probabilities', np.array([0.33, 0.33, 0.34])) + predictions = prediction_result.get('probabilities', [0.33, 0.33, 0.34]) confidence = prediction_result.get('confidence', 0.7) - # Ensure predictions is a flat numpy array - if isinstance(predictions, (list, tuple)): - predictions = np.array(predictions, dtype=np.float32) - predictions = np.append(predictions.flatten(), confidence) + # Ensure predictions is a flat list first + if isinstance(predictions, np.ndarray): + predictions = predictions.flatten().tolist() + elif not isinstance(predictions, list): + predictions = [float(predictions)] + # Add confidence as a single float + predictions.append(float(confidence)) + # Convert to flat numpy array + predictions = np.array(predictions, dtype=np.float32) else: - # Ensure prediction_result is a flat numpy array - if isinstance(prediction_result, (list, tuple)): + # Handle direct prediction result + if isinstance(prediction_result, np.ndarray): + predictions = prediction_result.flatten() + elif isinstance(prediction_result, (list, tuple)): predictions = np.array(prediction_result, dtype=np.float32).flatten() else: - predictions = np.array(prediction_result).flatten() + predictions = np.array([float(prediction_result)], dtype=np.float32) # Extract hidden features if model supports it hidden_features = None diff --git a/main.py b/main.py index 2674716..3ec9de2 100644 --- a/main.py +++ b/main.py @@ -53,6 +53,13 @@ async def run_web_dashboard(): # Create data provider data_provider = DataProvider() + # Start real-time streaming for BOM caching + try: + await data_provider.start_real_time_streaming() + logger.info("[SUCCESS] Real-time data streaming started for BOM caching") + except Exception as e: + logger.warning(f"[WARNING] Real-time streaming failed: {e}") + # Verify data connection logger.info("[DATA] Verifying live data connection...") symbol = config.get('symbols', ['ETH/USDT'])[0] @@ -116,12 +123,12 @@ async def run_web_dashboard(): import traceback logger.error(traceback.format_exc()) -def start_web_ui(): +def start_web_ui(port=8051): """Start the main TradingDashboard UI in a separate thread""" try: logger.info("=" * 50) logger.info("Starting Main Trading Dashboard UI...") - logger.info("Trading Dashboard: http://127.0.0.1:8051") + logger.info(f"Trading Dashboard: http://127.0.0.1:{port}") logger.info("COB Integration: ENABLED (Real-time order book visualization)") logger.info("=" * 50) @@ -163,7 +170,7 @@ def start_web_ui(): logger.info("Features: Live trading, COB visualization, RL training monitoring, Position management") # Run the dashboard server (COB integration will start automatically) - dashboard.app.run(host='127.0.0.1', port=8051, debug=False, use_reloader=False) + dashboard.app.run(host='127.0.0.1', port=port, debug=False, use_reloader=False) except Exception as e: logger.error(f"Error starting main trading dashboard UI: {e}") @@ -246,7 +253,7 @@ async def main(): logger.info("STREAMLINED TRADING SYSTEM - TRAINING + MAIN DASHBOARD") logger.info(f"Primary Symbol: {args.symbol}") logger.info(f"Training Port: {args.port}") - logger.info(f"Main Trading Dashboard: http://127.0.0.1:8051") + logger.info(f"Main Trading Dashboard: http://127.0.0.1:{args.port}") logger.info("2-Action System: BUY/SELL with intelligent position management") logger.info("Always Invested: Learning to spot high risk/reward setups") logger.info("Flow: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution") @@ -254,7 +261,7 @@ async def main(): logger.info("=" * 70) # Start main trading dashboard UI in a separate thread - web_thread = Thread(target=start_web_ui, daemon=True) + web_thread = Thread(target=lambda: start_web_ui(args.port), daemon=True) web_thread.start() logger.info("Main trading dashboard UI thread started")