From 1cc8509e879eeef6b7bd79c1415ef4cd2aa36780 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Sun, 10 Aug 2025 02:51:06 +0300
Subject: [PATCH] COB heatmap WIP

---
 core/data_models.py    |  32 ++++++++
 core/data_provider.py  | 167 ++++++++++++++++++++++++++++++-----------
 web/clean_dashboard.py |  88 ++++++++--------------
 3 files changed, 186 insertions(+), 101 deletions(-)

diff --git a/core/data_models.py b/core/data_models.py
index f21aafc..6edb7f2 100644
--- a/core/data_models.py
+++ b/core/data_models.py
@@ -173,6 +173,38 @@ class BaseDataInput:
                     break
             cob_features.extend(ma_features)
 
+        # Add REAL aggregated COB heatmap features to fill remaining COB slots (no synthetic data).
+        # We compute per-bucket means over the most recent window (up to 300s) and a few global stats.
+        try:
+            if self.cob_heatmap_values and self.cob_heatmap_prices:
+                z = np.array(self.cob_heatmap_values, dtype=float)
+                if z.ndim == 2 and z.size > 0:
+                    # Use up to the last 300 seconds (or whatever is available)
+                    window_rows = z[-300:] if z.shape[0] >= 300 else z
+                    # Replace NaNs with 0.0 to respect the no-synthetic rule but avoid NaN propagation
+                    window_rows = np.nan_to_num(window_rows, nan=0.0, posinf=0.0, neginf=0.0)
+
+                    # Per-bucket mean imbalance/liquidity across time
+                    per_bucket_mean = window_rows.mean(axis=0).tolist()
+                    space_left = 200 - len(cob_features)
+                    if space_left > 0 and len(per_bucket_mean) > 0:
+                        cob_features.extend(per_bucket_mean[:space_left])
+
+                    # If there is still space, add compact global stats over the window
+                    space_left = 200 - len(cob_features)
+                    if space_left > 0:
+                        flat = window_rows.reshape(-1)
+                        if flat.size > 0:
+                            global_mean = float(np.mean(flat))
+                            global_std = float(np.std(flat))
+                            global_max = float(np.max(flat))
+                            global_min = float(np.min(flat))
+                            global_stats = [global_mean, global_std, global_max, global_min]
+                            cob_features.extend(global_stats[:space_left])
+        except Exception:
+            # On any error, skip heatmap-derived features (remaining space will be zero-padded below)
+            pass
+
         # Pad COB features to exactly 200
         cob_features.extend([0.0] * (200 - len(cob_features)))
         features.extend(cob_features[:200])  # Ensure exactly 200 COB features
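Note (outside the patch): the hunk above fills whatever remains of the 200-slot COB feature block with real heatmap aggregates before zero-padding. A minimal standalone sketch of that aggregation, assuming the heatmap arrives as a [time x buckets] matrix; the function name and signature are illustrative, not part of the patch:

    import numpy as np

    def heatmap_cob_features(heatmap_values, existing_features, budget=200, window=300):
        """Append per-bucket means plus global stats, then zero-pad to the budget."""
        feats = list(existing_features)
        z = np.nan_to_num(np.asarray(heatmap_values, dtype=float)[-window:],
                          nan=0.0, posinf=0.0, neginf=0.0)
        space = budget - len(feats)
        if space > 0 and z.ndim == 2 and z.size > 0:
            feats.extend(z.mean(axis=0).tolist()[:space])  # per-bucket mean over time
            space = budget - len(feats)
            if space > 0:
                flat = z.reshape(-1)
                feats.extend([float(np.mean(flat)), float(np.std(flat)),
                              float(np.max(flat)), float(np.min(flat))][:space])
        feats.extend([0.0] * max(0, budget - len(feats)))  # zero-pad the remainder
        return feats[:budget]

    print(len(heatmap_cob_features([[0.5, 1.0], [1.5, np.nan]], [0.1] * 150)))  # 200

Averaging over the window keeps the features stable even when only a few seconds of heatmap history exist yet.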
diff --git a/core/data_provider.py b/core/data_provider.py
index 30988c9..0c96e11 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -248,16 +248,16 @@ class DataProvider:
         # COB (Consolidated Order Book) data system using WebSocket
         self.cob_integration: Optional[COBIntegration] = None
 
-        # COB data storage - 15 minutes of raw ticks and 1s aggregated data
-        self.cob_raw_ticks: Dict[str, deque] = {}      # Raw COB ticks (15 min)
-        self.cob_1s_aggregated: Dict[str, deque] = {}  # 1s aggregated COB data with $1 buckets
+        # COB data storage - 30 minutes of raw ticks and 1s aggregated data
+        self.cob_raw_ticks: Dict[str, deque] = {}      # Raw COB ticks (30 min)
+        self.cob_1s_aggregated: Dict[str, deque] = {}  # 1s aggregated COB data with $1 buckets (30 min)
 
         # Initialize COB data structures
         for symbol in self.symbols:
-            # Raw ticks: 15 minutes at ~100 ticks/second = ~90,000 ticks
-            self.cob_raw_ticks[symbol] = deque(maxlen=90000)
-            # 1s aggregated: 15 minutes = 900 seconds
-            self.cob_1s_aggregated[symbol] = deque(maxlen=900)
+            # Raw ticks: ~100 ticks/second * 30 minutes = ~180,000 ticks
+            self.cob_raw_ticks[symbol] = deque(maxlen=180000)
+            # 1s aggregated: 30 minutes = 1,800 seconds
+            self.cob_1s_aggregated[symbol] = deque(maxlen=1800)
 
         # COB callbacks for data distribution
         self.cob_data_callbacks: List[Callable] = []
@@ -4317,7 +4317,10 @@ class DataProvider:
                 loop = asyncio.new_event_loop()
                 asyncio.set_event_loop(loop)
                 ws = loop.run_until_complete(get_huobi_cob_websocket(self.symbols))
-                ws.add_cob_callback(lambda symbol, data: asyncio.run_coroutine_threadsafe(self._merge_huobi_tick(symbol, data), loop))
+                # Register an async callback that runs in the same event loop
+                async def on_huobi(symbol, data):
+                    await self._merge_huobi_tick(symbol, data)
+                ws.add_cob_callback(on_huobi)
                 loop.run_forever()
             except Exception as he:
                 logger.error(f"Error in Huobi WS thread: {he}")
@@ -4340,32 +4343,50 @@ class DataProvider:
             latest = {}
             if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache and self.cob_data_cache[symbol]:
                 latest = dict(self.cob_data_cache[symbol][-1])
-            # Construct merged bids/asks limited depth
-            hb_bids = huobi_data.get('bids', []) or []
-            hb_asks = huobi_data.get('asks', []) or []
-            bn_bids = latest.get('bids', []) if isinstance(latest.get('bids'), list) else []
-            bn_asks = latest.get('asks', []) if isinstance(latest.get('asks'), list) else []
+            # Normalize levels to [price, size] lists
+            def to_pairs(levels):
+                pairs = []
+                for lvl in levels or []:
+                    try:
+                        if isinstance(lvl, dict):
+                            p = float(lvl.get('price', 0))
+                            s = float(lvl.get('size', 0))
+                        else:
+                            # Assume [price, size]
+                            p = float(lvl[0])
+                            s = float(lvl[1])
+                        if s > 0:
+                            pairs.append([p, s])
+                    except Exception:
+                        continue
+                return pairs
+
+            hb_bids = to_pairs(huobi_data.get('bids'))
+            hb_asks = to_pairs(huobi_data.get('asks'))
+            bn_bids = to_pairs(latest.get('bids'))
+            bn_asks = to_pairs(latest.get('asks'))
+
             # Concatenate and re-sort with depth limit
-            merged_bids = (bn_bids + hb_bids)[:2000]
-            merged_asks = (bn_asks + hb_asks)[:2000]
+            merged_bids = bn_bids + hb_bids
+            merged_asks = bn_asks + hb_asks
             if merged_bids:
-                merged_bids.sort(key=lambda x: x['price'], reverse=True)
+                merged_bids.sort(key=lambda x: x[0], reverse=True)
             if merged_asks:
-                merged_asks.sort(key=lambda x: x['price'])
+                merged_asks.sort(key=lambda x: x[0])
             merged_bids = merged_bids[:1000]
             merged_asks = merged_asks[:1000]
             # Stats from merged
             if merged_bids and merged_asks:
-                best_bid = merged_bids[0]['price']
-                best_ask = merged_asks[0]['price']
+                best_bid = merged_bids[0][0]
+                best_ask = merged_asks[0][0]
                 mid = (best_bid + best_ask) / 2.0
                 spread = best_ask - best_bid
                 spread_bps = (spread / mid) * 10000 if mid > 0 else 0
                 top_bids = merged_bids[:20]
                 top_asks = merged_asks[:20]
-                bid_vol = sum(x['price'] * x['size'] for x in top_bids)
-                ask_vol = sum(x['price'] * x['size'] for x in top_asks)
+                bid_vol = sum(x[0] * x[1] for x in top_bids)
+                ask_vol = sum(x[0] * x[1] for x in top_asks)
                 total = bid_vol + ask_vol
                 merged_stats = {
                     'best_bid': best_bid,
@@ -4383,9 +4404,9 @@ class DataProvider:
             # Create merged snapshot (preserve original + annotate source)
             merged = {
                 'symbol': symbol,
-                'timestamp': time.time(),
-                'bids': merged_bids,
-                'asks': merged_asks,
+                'timestamp': int(time.time() * 1000),  # milliseconds to match rest of cache
+                'bids': merged_bids[:200],  # keep reasonable depth
+                'asks': merged_asks[:200],
                 'stats': merged_stats,
                 'source': 'merged_ws',
                 'exchanges': ['binance', 'huobi']
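Note (outside the patch): the to_pairs normalization above lets Binance cache entries ([price, size] lists) and Huobi ticks (dict levels, as the pre-patch 'price'/'size' sort keys suggest) share one merge path. A quick self-contained usage sketch with made-up levels, reproducing the helper so the snippet runs on its own:

    def to_pairs(levels):
        """Normalize order-book levels to [price, size], accepting dicts or pairs."""
        pairs = []
        for lvl in levels or []:
            try:
                if isinstance(lvl, dict):
                    p, s = float(lvl.get('price', 0)), float(lvl.get('size', 0))
                else:
                    p, s = float(lvl[0]), float(lvl[1])
                if s > 0:
                    pairs.append([p, s])
            except Exception:
                continue
        return pairs

    # Dict-style levels and [price, size] pairs merge into one sorted book
    hb_bids = to_pairs([{'price': 4301.5, 'size': 2.0}, {'price': 4301.0, 'size': 0.0}])
    bn_bids = to_pairs([[4301.8, 1.2], [4300.9, 3.4]])
    merged = sorted(bn_bids + hb_bids, key=lambda x: x[0], reverse=True)[:1000]
    print(merged)  # [[4301.8, 1.2], [4301.5, 2.0], [4300.9, 3.4]] - zero-size level dropped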
@@ -4897,26 +4918,74 @@ class DataProvider:
         values: List[List[float]] = []
         mids: List[float] = []
 
-        latest = self.get_latest_cob_data(symbol)
-        if not latest or 'stats' not in latest:
-            return times, prices, values, mids
-
-        mid = float(latest['stats'].get('mid_price', 0) or 0)
-        if mid <= 0:
-            return times, prices, values, mids
-
-        bucket_size = 1.0 if 'ETH' in symbol else 10.0
-        center = round(mid / bucket_size) * bucket_size
-        prices = [center + i * bucket_size for i in range(-bucket_radius, bucket_radius + 1)]
-
+        # Build exactly one snapshot per second (the most recent 'seconds' seconds), using the cache
         with self.subscriber_lock:
-            cache_for_symbol = getattr(self, 'cob_data_cache', {}).get(symbol, [])
-            snapshots = list(cache_for_symbol[-seconds:]) if cache_for_symbol else []
+            cache_map = getattr(self, 'cob_data_cache', {})
+            cache_for_symbol = cache_map.get(symbol, [])
+            # Fallback: try the alternate key without a slash (e.g., ETHUSDT)
+            if not cache_for_symbol:
+                alt_key = symbol.replace('/', '').upper()
+                cache_for_symbol = cache_map.get(alt_key, [])
+            snapshots: List[dict] = []
+            if cache_for_symbol:
+                # Walk backwards and pick the last snapshot per unique second
+                selected_by_sec: Dict[int, dict] = {}
+                for snap in reversed(cache_for_symbol):
+                    ts = snap.get('timestamp')
+                    if isinstance(ts, (int, float)):
+                        sec = int(ts / 1000) if ts > 1e10 else int(ts)
+                        if sec not in selected_by_sec:
+                            selected_by_sec[sec] = snap
+                            if len(selected_by_sec) >= seconds:
+                                break
+                # Order by time ascending
+                for sec in sorted(selected_by_sec.keys()):
+                    snapshots.append(selected_by_sec[sec])
+                # If dedup by second produced nothing (unexpected), fall back to the last N snapshots
+                if not snapshots:
+                    snapshots = list(cache_for_symbol[-seconds:])
+
+        # If there are no snapshots, there is nothing to render
+        if not snapshots:
+            return times, prices, values, mids
+
+        # Determine the center price from the most recent valid snapshot in our selection
+        bucket_size = 1.0 if 'ETH' in symbol else 10.0
+        center = 0.0
+        for snap in reversed(snapshots):
+            try:
+                stats = snap.get('stats') or {}
+                center = float(stats.get('mid_price', 0) or 0)
+                if center <= 0:
+                    # Derive from best bid/ask
+                    def first_price(level):
+                        try:
+                            return float(level.get('price')) if isinstance(level, dict) else float(level[0])
+                        except Exception:
+                            return 0.0
+                    bids = snap.get('bids') or []
+                    asks = snap.get('asks') or []
+                    best_bid = max((first_price(b) for b in bids), default=0.0)
+                    best_ask = min((first_price(a) for a in asks), default=0.0)
+                    if best_bid > 0 and best_ask > 0:
+                        center = (best_bid + best_ask) / 2.0
+                if center > 0:
+                    break
+            except Exception:
+                continue
+
+        if center <= 0:
+            return times, prices, values, mids
+
+        center = round(center / bucket_size) * bucket_size
+        prices = [center + i * bucket_size for i in range(-bucket_radius, bucket_radius + 1)]
 
         for snap in snapshots:
             ts_ms = snap.get('timestamp')
             if isinstance(ts_ms, (int, float)):
-                times.append(datetime.fromtimestamp(ts_ms / 1000.0))
+                # Detect whether the timestamp is in milliseconds or seconds
+                ts_s = ts_ms / 1000.0 if ts_ms > 1e10 else float(ts_ms)
+                times.append(datetime.fromtimestamp(ts_s))
             else:
                 times.append(datetime.utcnow())
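Note (outside the patch): the per-second dedup above walks the cache backwards so the newest snapshot wins within each second. A condensed sketch, assuming snapshots carry a numeric 'timestamp' in either seconds or milliseconds (the same > 1e10 heuristic as the patch):

    def last_snapshot_per_second(snapshots, seconds=300):
        """Walk backwards, keep the newest snapshot for each unique second."""
        by_sec = {}
        for snap in reversed(snapshots):
            ts = snap.get('timestamp')
            if not isinstance(ts, (int, float)):
                continue
            sec = int(ts / 1000) if ts > 1e10 else int(ts)  # ms vs s heuristic
            if sec not in by_sec:
                by_sec[sec] = snap
                if len(by_sec) >= seconds:
                    break
        return [by_sec[s] for s in sorted(by_sec)]

    snaps = [{'timestamp': 1754783466000 + i * 250} for i in range(8)]  # 4 ticks per second
    print(len(last_snapshot_per_second(snaps)))  # 2 - one snapshot per unique second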
@@ -4926,7 +4995,11 @@ class DataProvider:
             bucket_map: Dict[float, Dict[str, float]] = {}
             for level in bids[:200]:
                 try:
-                    price, size = float(level[0]), float(level[1])
+                    # Level can be [price, size] or {'price': p, 'size': s}
+                    if isinstance(level, dict):
+                        price = float(level.get('price', 0.0))
+                        size = float(level.get('size', 0.0))
+                    else:
+                        price, size = float(level[0]), float(level[1])
                     bp = round(price / bucket_size) * bucket_size
                     if bp not in bucket_map:
                         bucket_map[bp] = {'bid': 0.0, 'ask': 0.0}
@@ -4935,7 +5008,10 @@ class DataProvider:
                     continue
             for level in asks[:200]:
                 try:
-                    price, size = float(level[0]), float(level[1])
+                    if isinstance(level, dict):
+                        price = float(level.get('price', 0.0))
+                        size = float(level.get('size', 0.0))
+                    else:
+                        price, size = float(level[0]), float(level[1])
                     bp = round(price / bucket_size) * bucket_size
                     if bp not in bucket_map:
                         bucket_map[bp] = {'bid': 0.0, 'ask': 0.0}
@@ -4945,8 +5021,13 @@ class DataProvider:
             # Compute mid price for this snapshot
             try:
-                best_bid = max((float(b[0]) for b in bids), default=0.0)
-                best_ask = min((float(a[0]) for a in asks), default=0.0)
+                def first_price(level):
+                    try:
+                        return float(level.get('price')) if isinstance(level, dict) else float(level[0])
+                    except Exception:
+                        return 0.0
+                best_bid = max((first_price(b) for b in bids), default=0.0)
+                best_ask = min((first_price(a) for a in asks), default=0.0)
                 if best_bid > 0 and best_ask > 0:
                     mids.append((best_bid + best_ask) / 2.0)
                 else:
diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py
index 121c311..7996298 100644
--- a/web/clean_dashboard.py
+++ b/web/clean_dashboard.py
@@ -1473,7 +1473,7 @@ class CleanTradingDashboard:
         def update_cob_heatmap_eth(n):
             """Render ETH COB 1s heatmap (±10 buckets, last 5 minutes)."""
             try:
-                # Unified heatmap source from provider
+                # Unified heatmap source from provider (show last 300s)
                 times, prices, matrix = [], [], []
                 if hasattr(self.data_provider, 'get_cob_heatmap_matrix'):
                     times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix('ETH/USDT', seconds=300, bucket_radius=10, metric='liquidity')
@@ -1483,13 +1483,13 @@ class CleanTradingDashboard:
                     fig.update_layout(margin=dict(l=10, r=10, t=20, b=10))
                     return fig
                 z = np.array(matrix, dtype=float)
-                # Normalize per-column for visualization stability (visual only)
+                # Normalize each price bucket across time (visual only) and transpose to [buckets x time]
                 col_max = np.maximum(z.max(axis=0), 1e-9)
-                zn = z / col_max
+                zn = (z / col_max).T
                 fig = go.Figure(data=go.Heatmap(
-                    z=zn.T,
-                    x=[t.strftime('%H:%M:%S') for t in times],
-                    y=[f"{p:.2f}" for p in prices],
+                    z=zn,
+                    x=times,
+                    y=prices,
                     colorscale='Turbo',
                     colorbar=dict(title='Norm'),
                     zmin=0.0,
@@ -1523,7 +1523,8 @@ class CleanTradingDashboard:
                 fig.update_layout(
                     title="ETH COB Heatmap (liquidity, per-bucket normalized)",
                     xaxis_title="Time",
-                    yaxis_title="Price",
+                    yaxis_title="Price (USD)",
+                    xaxis_type='date',
                     margin=dict(l=10, r=10, t=30, b=10)
                 )
                 return fig
@@ -1540,7 +1541,7 @@ class CleanTradingDashboard:
         )
         def update_cob_heatmap_btc(n):
             try:
-                # Unified heatmap source from provider
+                # Unified heatmap source from provider (show last 300s)
                 times, prices, matrix = [], [], []
                 if hasattr(self.data_provider, 'get_cob_heatmap_matrix'):
                     times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix('BTC/USDT', seconds=300, bucket_radius=10, metric='liquidity')
@@ -1551,11 +1552,11 @@ class CleanTradingDashboard:
                     return fig
                 z = np.array(matrix, dtype=float)
                 col_max = np.maximum(z.max(axis=0), 1e-9)
-                zn = z / col_max
+                zn = (z / col_max).T
                 fig = go.Figure(data=go.Heatmap(
-                    z=zn.T,
-                    x=[t.strftime('%H:%M:%S') for t in times],
-                    y=[f"{p:.2f}" for p in prices],
+                    z=zn,
+                    x=times,
+                    y=prices,
                     colorscale='Turbo',
                     colorbar=dict(title='Norm'),
                     zmin=0.0,
@@ -1586,7 +1587,8 @@ class CleanTradingDashboard:
                 fig.update_layout(
                     title="BTC COB Heatmap (liquidity, per-bucket normalized)",
                     xaxis_title="Time",
-                    yaxis_title="Price",
+                    yaxis_title="Price (USD)",
+                    xaxis_type='date',
                     margin=dict(l=10, r=10, t=30, b=10)
                 )
                 return fig
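Note (outside the patch): on the dashboard side, the provider returns z as [time x buckets] while go.Heatmap expects z indexed [y][x], i.e. [buckets x time]; hence the per-bucket normalization followed by a transpose. A toy numpy sketch of exactly that step:

    import numpy as np

    z = np.array([[1.0, 10.0],
                  [2.0, 30.0],
                  [4.0, 20.0]])              # 3 time steps x 2 price buckets
    col_max = np.maximum(z.max(axis=0), 1e-9)  # per-bucket max over time (avoids divide-by-zero)
    zn = (z / col_max).T                     # each bucket scaled to [0, 1], now [buckets x time]
    print(zn.shape)                          # (2, 3): rows = price buckets (y), cols = time (x)
    print(zn[0])                             # [0.25 0.5  1.  ]

Passing datetime objects as x together with xaxis_type='date' lets Plotly render a real time axis instead of categorical '%H:%M:%S' strings.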
@@ -2189,7 +2191,20 @@ class CleanTradingDashboard:
         if symbol in self.current_prices and self.current_prices[symbol] > 0:
             return self.current_prices[symbol]
 
-        # Return None instead of hardcoded fallbacks - let the UI handle missing data
+        # FINAL FALLBACK: use the latest COB mid_price (real data) if available
+        try:
+            if hasattr(self.data_provider, 'get_latest_cob_data'):
+                snap = self.data_provider.get_latest_cob_data(symbol)
+                if snap and isinstance(snap.get('stats'), dict):
+                    mid = float(snap['stats'].get('mid_price', 0) or 0)
+                    if mid > 0:
+                        self.current_prices[symbol] = mid
+                        logger.info(f"Using COB mid_price as current price for {symbol}: ${mid:.2f}")
+                        return mid
+        except Exception:
+            pass
+
+        # Return None if absolutely nothing is available
         return None
 
     def _create_price_chart(self, symbol: str) -> go.Figure:
@@ -2422,50 +2437,7 @@ class CleanTradingDashboard:
 
         # Mini 1-second chart (if available)
         if has_mini_chart and ws_data_1s is not None:
-            # Overlay COB heatmap (up to 5 minutes) behind the 1s price line
-            try:
-                hm_seconds = 300  # 5 minutes
-                bucket_radius = 10  # ±10 buckets
-                # Prefer raw liquidity for visual; models can still use normalized features internally
-                times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix(
-                    symbol=symbol,
-                    seconds=hm_seconds,
-                    bucket_radius=bucket_radius,
-                    metric='liquidity'
-                )
-                if times and prices and matrix:
-                    # Align times to local tz-naive to match chart
-                    try:
-                        _local_tz = datetime.now().astimezone().tzinfo
-                    except Exception:
-                        _local_tz = None
-                    x_vals = []
-                    for t in times:
-                        try:
-                            if hasattr(t, 'tzinfo') and t.tzinfo is not None:
-                                tt = t.astimezone(_local_tz) if _local_tz else t
-                                tt = tt.replace(tzinfo=None)
-                            else:
-                                tt = t
-                            x_vals.append(tt)
-                        except Exception:
-                            x_vals.append(t)
-                    # Plot heatmap beneath the 1s price line
-                    fig.add_trace(
-                        go.Heatmap(
-                            x=x_vals,
-                            y=prices,  # numeric prices to share axis with 1s line
-                            z=matrix,
-                            colorscale='Turbo',
-                            showscale=False,
-                            zsmooth='best',
-                            opacity=0.8,
-                            name='COB Liquidity'
-                        ),
-                        row=2, col=1
-                    )
-            except Exception as e:
-                logger.debug(f"COB heatmap overlay skipped: {e}")
+            # Removed COB heatmap overlay on the mini 1s chart per request
             # Align mini chart to local tz-naive
             try:
                 if hasattr(ws_data_1s.index, 'tz') and ws_data_1s.index.tz is not None:
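Note (outside the patch): the final price fallback above trusts stats['mid_price'] first and only then the top of book. A hedged sketch of the same idea as a pure function, assuming the snapshot shape this patch writes ('stats' dict plus [price, size] level lists); the helper name is illustrative:

    def cob_mid_price(snap):
        """Return a usable mid price from a COB snapshot, or None."""
        if not snap:
            return None
        stats = snap.get('stats') or {}
        try:
            mid = float(stats.get('mid_price', 0) or 0)
        except (TypeError, ValueError):
            mid = 0.0
        if mid > 0:
            return mid
        # Derive from the top of book when stats are missing
        bids, asks = snap.get('bids') or [], snap.get('asks') or []
        try:
            best_bid = max(float(b[0]) for b in bids)
            best_ask = min(float(a[0]) for a in asks)
        except ValueError:  # empty book on either side
            return None
        return (best_bid + best_ask) / 2.0 if 0 < best_bid < best_ask else None

    print(cob_mid_price({'stats': {}, 'bids': [[4300.9, 1.0]], 'asks': [[4301.1, 2.0]]}))  # 4301.0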