From 87193f3d6f3d7240efacdc724f0df52262fde305 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 9 Aug 2025 00:58:36 +0300 Subject: [PATCH] COB info restored, better COB heatmap, restore kill processes --- core/data_provider.py | 4 +- core/enhanced_cob_websocket.py | 10 +-- core/standardized_data_provider.py | 42 ++-------- scripts/kill_stale_processes.ps1 | 38 +++++++++ web/clean_dashboard.py | 20 +++-- web/component_manager.py | 127 +++++++++++++++++++++-------- 6 files changed, 152 insertions(+), 89 deletions(-) create mode 100644 scripts/kill_stale_processes.ps1 diff --git a/core/data_provider.py b/core/data_provider.py index 78991a5..830e1fd 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -4198,9 +4198,9 @@ class DataProvider: if symbol not in self.cob_data_cache: self.cob_data_cache[symbol] = [] - # Add to cache with max size limit + # Add to cache with max size limit (30 minutes of 1s data) self.cob_data_cache[symbol].append(cob_snapshot) - if len(self.cob_data_cache[symbol]) > 300: # Keep 5 minutes of 1s data + if len(self.cob_data_cache[symbol]) > 1800: self.cob_data_cache[symbol].pop(0) # Notify subscribers diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index 26f7d86..4639997 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -430,17 +430,9 @@ class EnhancedCOBWebSocket: # 3. ticker - 24hr ticker statistics (includes volume) # 4. aggTrade - Aggregated trade data for volume analysis - # Configure kline stream with timezone offset if specified - if self.timezone_offset: - kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}" - logger.info(f"Using timezone offset {self.timezone_offset} for kline stream") - else: - kline_stream = f"{ws_symbol}@kline_1s" - logger.info("Using UTC timezone for kline stream") - + # Build only supported, critical streams for stability (exclude kline_1s which may be unsupported) streams = [ f"{ws_symbol}@depth@{self.update_speed}", # Order book diff depth - kline_stream, # 1-second candlesticks (with timezone) f"{ws_symbol}@ticker", # 24hr ticker with volume f"{ws_symbol}@aggTrade" # Aggregated trades ] diff --git a/core/standardized_data_provider.py b/core/standardized_data_provider.py index 58bac27..cad82f5 100644 --- a/core/standardized_data_provider.py +++ b/core/standardized_data_provider.py @@ -74,35 +74,11 @@ class StandardizedDataProvider(DataProvider): logger.info("StandardizedDataProvider initialized with BaseDataInput support") def _initialize_cob_provider(self): - """Initialize COB provider for order book data""" + """Unify on parent EnhancedCOBWebSocket. Disable separate COB provider.""" try: - from .multi_exchange_cob_provider import MultiExchangeCOBProvider, ExchangeConfig, ExchangeType - - # Configure exchanges (focusing on Binance for now) - exchange_configs = { - 'binance': ExchangeConfig( - exchange_type=ExchangeType.BINANCE, - weight=1.0, - enabled=True, - websocket_url="wss://stream.binance.com:9443/ws/", - symbols_mapping={symbol: symbol.replace('/', '').lower() for symbol in self.symbols} - ), - # CoinAPI REST for supplemental depth snapshots (merged with WS streams) - 'coinapi': ExchangeConfig( - exchange_type=ExchangeType.COINAPI, - weight=0.6, - enabled=True, - rest_api_url="https://rest.coinapi.io/v1/", - symbols_mapping={symbol: symbol.replace('/', '_').replace('USDT', 'USD') for symbol in self.symbols}, - rate_limits={"min_interval_ms": 500} - ) - } - - self.cob_provider = MultiExchangeCOBProvider(self.symbols, exchange_configs) - logger.info("COB provider initialized successfully") - - except Exception as e: - logger.warning(f"Failed to initialize COB provider: {e}") + self.cob_provider = None + logger.info("Using unified EnhancedCOBWebSocket from DataProvider; external COB provider disabled") + except Exception: self.cob_provider = None def get_base_data_input(self, symbol: str, timestamp: Optional[datetime] = None) -> Optional[BaseDataInput]: @@ -445,10 +421,7 @@ class StandardizedDataProvider(DataProvider): if hasattr(super(), 'start_real_time_processing'): super().start_real_time_processing() - # Start COB provider if available - if self.cob_provider: - import asyncio - asyncio.create_task(self.cob_provider.start_streaming()) + # Unified: COB streaming handled by parent DataProvider.start_cob_collection() logger.info("Started real-time processing for standardized data") @@ -458,10 +431,7 @@ class StandardizedDataProvider(DataProvider): def stop_real_time_processing(self): """Stop real-time processing""" try: - # Stop COB provider if available - if self.cob_provider: - import asyncio - asyncio.create_task(self.cob_provider.stop_streaming()) + # Unified: No separate COB provider to stop # Stop parent class processing if hasattr(super(), 'stop_real_time_processing'): diff --git a/scripts/kill_stale_processes.ps1 b/scripts/kill_stale_processes.ps1 new file mode 100644 index 0000000..80497f7 --- /dev/null +++ b/scripts/kill_stale_processes.ps1 @@ -0,0 +1,38 @@ +# Kill stale Python dashboard processes +# Enhanced version with better error handling and logging + +Write-Host "Checking for stale Python dashboard processes..." + +try { + # Get all Python processes + $pythonProcesses = Get-Process python -ErrorAction SilentlyContinue + + if ($pythonProcesses) { + # Filter for dashboard processes + $dashboardProcesses = $pythonProcesses | Where-Object { + $_.ProcessName -eq 'python' -and + $_.MainWindowTitle -like '*dashboard*' + } + + if ($dashboardProcesses) { + Write-Host "Found $($dashboardProcesses.Count) dashboard process(es) to kill:" + foreach ($process in $dashboardProcesses) { + Write-Host " - PID: $($process.Id), Title: $($process.MainWindowTitle)" + } + + # Kill the processes + $dashboardProcesses | Stop-Process -Force -ErrorAction SilentlyContinue + Write-Host "Successfully killed $($dashboardProcesses.Count) dashboard process(es)" + } else { + Write-Host "No dashboard processes found to kill" + } + } else { + Write-Host "No Python processes found" + } +} catch { + Write-Host "Error checking for processes: $($_.Exception.Message)" +} + +# Wait a moment for processes to fully terminate +Start-Sleep -Seconds 1 +Write-Host "Process cleanup completed" diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 6170a5b..06d6158 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -1370,6 +1370,7 @@ class CleanTradingDashboard: eth_rate = _calc_update_rate('ETH/USDT') btc_rate = _calc_update_rate('BTC/USDT') + # Unified COB timeseries source: provider's 1s aggregation eth_agg_1s = self.data_provider.get_cob_1s_aggregated('ETH/USDT') if hasattr(self.data_provider, 'get_cob_1s_aggregated') else [] btc_agg_1s = self.data_provider.get_cob_1s_aggregated('BTC/USDT') if hasattr(self.data_provider, 'get_cob_1s_aggregated') else [] eth_recent = _recent_ticks('ETH/USDT') @@ -1425,11 +1426,10 @@ class CleanTradingDashboard: def update_cob_heatmap_eth(n): """Render ETH COB 1s heatmap (±10 buckets, last 5 minutes).""" try: + # Unified heatmap source from provider times, prices, matrix = [], [], [] if hasattr(self.data_provider, 'get_cob_heatmap_matrix'): - times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix( - 'ETH/USDT', seconds=300, bucket_radius=10, metric='liquidity' - ) + times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix('ETH/USDT', seconds=300, bucket_radius=10, metric='liquidity') if not times or not prices or not matrix: fig = go.Figure() fig.add_annotation(text="No COB heatmap data", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) @@ -1493,11 +1493,10 @@ class CleanTradingDashboard: ) def update_cob_heatmap_btc(n): try: + # Unified heatmap source from provider times, prices, matrix = [], [], [] if hasattr(self.data_provider, 'get_cob_heatmap_matrix'): - times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix( - 'BTC/USDT', seconds=300, bucket_radius=10, metric='liquidity' - ) + times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix('BTC/USDT', seconds=300, bucket_radius=10, metric='liquidity') if not times or not prices or not matrix: fig = go.Figure() fig.add_annotation(text="No COB heatmap data", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) @@ -1570,8 +1569,13 @@ class CleanTradingDashboard: # Create panel instance with orchestrator panel = ModelsTrainingPanel(orchestrator=self.orchestrator) - # Render the panel - panel_content = panel.render() + # Prefer create_panel if available; fallback to render + if hasattr(panel, 'create_panel'): + panel_content = panel.create_panel() + elif hasattr(panel, 'render'): + panel_content = panel.render() + else: + panel_content = html.Div([html.Div("Training panel not available", className="text-muted small")]) logger.info("Successfully created training metrics panel") return panel_content diff --git a/web/component_manager.py b/web/component_manager.py index 506fb59..d7f1e68 100644 --- a/web/component_manager.py +++ b/web/component_manager.py @@ -331,22 +331,36 @@ class DashboardComponentManager: html.P(f"Mode: {cob_mode}", className="text-muted small") ]) - # Handle both old format (with stats attribute) and new format (direct attributes) - if hasattr(cob_snapshot, 'stats'): + # Normalize snapshot to support dict-based COB (bids/asks arrays) and legacy object formats + stats = {} + bids = [] + asks = [] + mid_price = 0 + spread_bps = 0 + imbalance = 0 + + if isinstance(cob_snapshot, dict): + stats = cob_snapshot.get('stats', {}) if isinstance(cob_snapshot.get('stats', {}), dict) else {} + mid_price = float(stats.get('mid_price', 0) or 0) + spread_bps = float(stats.get('spread_bps', 0) or 0) + imbalance = float(stats.get('imbalance', 0) or 0) + bids = cob_snapshot.get('bids', []) or [] + asks = cob_snapshot.get('asks', []) or [] + elif hasattr(cob_snapshot, 'stats'): # Old format with stats attribute - stats = cob_snapshot.stats - mid_price = stats.get('mid_price', 0) - spread_bps = stats.get('spread_bps', 0) - imbalance = stats.get('imbalance', 0) - bids = getattr(cob_snapshot, 'consolidated_bids', []) - asks = getattr(cob_snapshot, 'consolidated_asks', []) + stats = cob_snapshot.stats if isinstance(cob_snapshot.stats, dict) else {} + mid_price = float((stats or {}).get('mid_price', 0) or 0) + spread_bps = float((stats or {}).get('spread_bps', 0) or 0) + imbalance = float((stats or {}).get('imbalance', 0) or 0) + bids = getattr(cob_snapshot, 'consolidated_bids', []) or [] + asks = getattr(cob_snapshot, 'consolidated_asks', []) or [] else: - # New COBSnapshot format with direct attributes - mid_price = getattr(cob_snapshot, 'volume_weighted_mid', 0) - spread_bps = getattr(cob_snapshot, 'spread_bps', 0) - imbalance = getattr(cob_snapshot, 'liquidity_imbalance', 0) - bids = getattr(cob_snapshot, 'consolidated_bids', []) - asks = getattr(cob_snapshot, 'consolidated_asks', []) + # New object-like snapshot with direct attributes + mid_price = float(getattr(cob_snapshot, 'volume_weighted_mid', 0) or 0) + spread_bps = float(getattr(cob_snapshot, 'spread_bps', 0) or 0) + imbalance = float(getattr(cob_snapshot, 'liquidity_imbalance', 0) or 0) + bids = getattr(cob_snapshot, 'consolidated_bids', []) or [] + asks = getattr(cob_snapshot, 'consolidated_asks', []) or [] if mid_price == 0 or not bids or not asks: return html.Div([ @@ -401,14 +415,26 @@ class DashboardComponentManager: ]) except Exception: pass - # Append small extras line from aggregated_1s and recent_ticks + # Append small extras line from aggregated_1s and recent_ticks (robust to numeric fields) extras = [] if update_info: agg = (update_info.get('aggregated_1s') or []) if agg and isinstance(agg[-1], dict): last = agg[-1] - avg_spread = last.get('spread', {}).get('average_bps', 0) - avg_imb = last.get('imbalance', {}).get('average', 0) + spread_field = last.get('spread', {}) + if isinstance(spread_field, dict): + avg_spread = spread_field.get('average_bps', 0) + elif isinstance(spread_field, (int, float)): + avg_spread = spread_field + else: + avg_spread = 0 + imb_field = last.get('imbalance', {}) + if isinstance(imb_field, dict): + avg_imb = imb_field.get('average', 0) + elif isinstance(imb_field, (int, float)): + avg_imb = imb_field + else: + avg_imb = 0 tick_count = last.get('tick_count', 0) extras.append(html.Small(f"1s agg: {tick_count} ticks, spread {avg_spread:.1f} bps, imb {avg_imb:.2f}", className="text-muted")) recent = (update_info.get('recent_ticks') or []) @@ -416,8 +442,21 @@ class DashboardComponentManager: extras.append(html.Small(f"Recent ticks: {len(recent)}", className="text-muted ms-2")) extras_div = html.Div(extras, className="mb-1") if extras else None + # Insert mini heatmap inside the COB panel (right side) + heatmap_graph = None + try: + # The dashboard's data provider is accessible through a global reference on the dashboard instance. + # We embed a placeholder Graph here; actual figure is provided by the dashboard callback tied to this id. + graph_id = 'cob-heatmap-eth' if 'ETH' in symbol else 'cob-heatmap-btc' + heatmap_graph = dcc.Graph(id=graph_id, config={'displayModeBar': False}, style={"height": "220px"}) + except Exception: + heatmap_graph = None + children = [dbc.Col(overview_panel, width=5, className="pe-1")] - right_children = [ladder_panel] + right_children = [] + if heatmap_graph: + right_children.append(heatmap_graph) + right_children.append(ladder_panel) if extras_div: right_children.insert(0, extras_div) children.append(dbc.Col(html.Div(right_children), width=7, className="ps-1")) @@ -443,6 +482,16 @@ class DashboardComponentManager: mode_color = "text-success" if cob_mode == "WS" else "text-warning" if cob_mode == "REST" else "text-muted" mode_icon = "fas fa-wifi" if cob_mode == "WS" else "fas fa-globe" if cob_mode == "REST" else "fas fa-question" + def _safe_imb(stats_val, key, fallback): + try: + if isinstance(stats_val, dict): + return float(stats_val.get(key, fallback)) + if isinstance(stats_val, (int, float)): + return float(stats_val) + except Exception: + pass + return float(fallback) + return html.Div([ html.H6(f"{symbol} - COB Overview", className="mb-2"), html.Div([ @@ -467,10 +516,10 @@ class DashboardComponentManager: ]), html.Div([ - self._create_timeframe_imbalance("1s", cumulative_imbalance_stats.get('1s', imbalance) if cumulative_imbalance_stats else imbalance), - self._create_timeframe_imbalance("5s", cumulative_imbalance_stats.get('5s', imbalance) if cumulative_imbalance_stats else imbalance), - self._create_timeframe_imbalance("15s", cumulative_imbalance_stats.get('15s', imbalance) if cumulative_imbalance_stats else imbalance), - self._create_timeframe_imbalance("60s", cumulative_imbalance_stats.get('60s', imbalance) if cumulative_imbalance_stats else imbalance), + self._create_timeframe_imbalance("1s", _safe_imb(cumulative_imbalance_stats, '1s', imbalance)), + self._create_timeframe_imbalance("5s", _safe_imb(cumulative_imbalance_stats, '5s', imbalance)), + self._create_timeframe_imbalance("15s", _safe_imb(cumulative_imbalance_stats, '15s', imbalance)), + self._create_timeframe_imbalance("60s", _safe_imb(cumulative_imbalance_stats, '60s', imbalance)), ], className="d-flex justify-content-between mb-2"), html.Hr(className="my-2"), @@ -529,18 +578,28 @@ class DashboardComponentManager: def aggregate_buckets(orders): buckets = {} for order in orders: - # Handle both dictionary format and ConsolidatedOrderBookLevel objects - if hasattr(order, 'price'): - # ConsolidatedOrderBookLevel object - price = order.price - size = order.total_size - volume_usd = order.total_volume_usd - else: - # Dictionary format (legacy) - price = order.get('price', 0) - # Handle both old format (size) and new format (total_size) - size = order.get('total_size', order.get('size', 0)) - volume_usd = order.get('total_volume_usd', size * price) + # Handle multiple formats: object, dict, or [price, size] + price = 0.0 + size = 0.0 + volume_usd = 0.0 + try: + if hasattr(order, 'price'): + # ConsolidatedOrderBookLevel object + price = float(getattr(order, 'price', 0) or 0) + size = float(getattr(order, 'total_size', getattr(order, 'size', 0)) or 0) + volume_usd = float(getattr(order, 'total_volume_usd', price * size) or (price * size)) + elif isinstance(order, dict): + price = float(order.get('price', 0) or 0) + size = float(order.get('total_size', order.get('size', 0)) or 0) + volume_usd = float(order.get('total_volume_usd', price * size) or (price * size)) + elif isinstance(order, (list, tuple)) and len(order) >= 2: + price = float(order[0] or 0) + size = float(order[1] or 0) + volume_usd = price * size + else: + continue + except Exception: + continue if price > 0: bucket_key = round(price / bucket_size) * bucket_size