diff --git a/core/data_provider.py b/core/data_provider.py index 186183c..30988c9 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -47,6 +47,7 @@ from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar from .cnn_monitor import log_cnn_prediction from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket +from .huobi_cob_websocket import get_huobi_cob_websocket from .cob_integration import COBIntegration logger = logging.getLogger(__name__) @@ -4279,7 +4280,7 @@ class DataProvider: dashboard_callback=self._on_cob_websocket_status ) - # Add callback for COB data + # Add callback for Binance COB data self.enhanced_cob_websocket.add_cob_callback(self._on_cob_websocket_data) # Start WebSocket in background thread @@ -4307,6 +4308,103 @@ class DataProvider: except Exception as e: logger.error(f"Error initializing COB WebSocket: {e}") self._start_rest_only_cob_collection() + + # Start Huobi WS in background (parallel to Binance) and merge data + try: + import asyncio + def run_huobi_ws(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + ws = loop.run_until_complete(get_huobi_cob_websocket(self.symbols)) + ws.add_cob_callback(lambda symbol, data: asyncio.run_coroutine_threadsafe(self._merge_huobi_tick(symbol, data), loop)) + loop.run_forever() + except Exception as he: + logger.error(f"Error in Huobi WS thread: {he}") + huobi_thread = threading.Thread(target=run_huobi_ws, daemon=True) + huobi_thread.start() + logger.info("Huobi COB WebSocket initialized and started") + except Exception as he: + logger.warning(f"Huobi COB WebSocket init failed: {he}") + + async def _merge_huobi_tick(self, symbol: str, huobi_data: dict): + """Merge Huobi depth into consolidated snapshot for symbol with minimal overhead. + Strategy: prefer best bid/ask from best spread; sum top-N notional liquidity across exchanges. + """ + try: + # Update latest cache for Huobi + if not huobi_data or not isinstance(huobi_data, dict): + return + # Build a lightweight merged snapshot using latest Binance (if any) + with self.subscriber_lock: + latest = {} + if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache and self.cob_data_cache[symbol]: + latest = dict(self.cob_data_cache[symbol][-1]) + # Construct merged bids/asks limited depth + hb_bids = huobi_data.get('bids', []) or [] + hb_asks = huobi_data.get('asks', []) or [] + bn_bids = latest.get('bids', []) if isinstance(latest.get('bids'), list) else [] + bn_asks = latest.get('asks', []) if isinstance(latest.get('asks'), list) else [] + # Concatenate and re-sort with depth limit + merged_bids = (bn_bids + hb_bids)[:2000] + merged_asks = (bn_asks + hb_asks)[:2000] + if merged_bids: + merged_bids.sort(key=lambda x: x['price'], reverse=True) + if merged_asks: + merged_asks.sort(key=lambda x: x['price']) + merged_bids = merged_bids[:1000] + merged_asks = merged_asks[:1000] + + # Stats from merged + if merged_bids and merged_asks: + best_bid = merged_bids[0]['price'] + best_ask = merged_asks[0]['price'] + mid = (best_bid + best_ask) / 2.0 + spread = best_ask - best_bid + spread_bps = (spread / mid) * 10000 if mid > 0 else 0 + top_bids = merged_bids[:20] + top_asks = merged_asks[:20] + bid_vol = sum(x['price'] * x['size'] for x in top_bids) + ask_vol = sum(x['price'] * x['size'] for x in top_asks) + total = bid_vol + ask_vol + merged_stats = { + 'best_bid': best_bid, + 'best_ask': best_ask, + 'mid_price': mid, + 'spread': spread, + 'spread_bps': spread_bps, + 'bid_volume': bid_vol, + 'ask_volume': ask_vol, + 'imbalance': (bid_vol - ask_vol) / total if total > 0 else 0.0, + } + else: + merged_stats = latest.get('stats', {}) if isinstance(latest.get('stats', {}), dict) else {} + + # Create merged snapshot (preserve original + annotate source) + merged = { + 'symbol': symbol, + 'timestamp': time.time(), + 'bids': merged_bids, + 'asks': merged_asks, + 'stats': merged_stats, + 'source': 'merged_ws', + 'exchanges': ['binance', 'huobi'] + } + # Store as new tick into raw deque and cache + if hasattr(self, 'cob_raw_ticks') and symbol in self.cob_raw_ticks: + self.cob_raw_ticks[symbol].append(merged) + if not hasattr(self, 'cob_data_cache'): + self.cob_data_cache = {} + if symbol not in self.cob_data_cache: + self.cob_data_cache[symbol] = [] + self.cob_data_cache[symbol].append(merged) + if len(self.cob_data_cache[symbol]) > 1800: + self.cob_data_cache[symbol].pop(0) + + # Notify subscribers outside lock + self._notify_cob_subscribers(symbol, merged) + except Exception as e: + logger.debug(f"Huobi merge error for {symbol}: {e}") def _start_cob_tick_aggregation(self): """Start COB tick aggregation system""" diff --git a/core/huobi_cob_websocket.py b/core/huobi_cob_websocket.py new file mode 100644 index 0000000..7d11374 --- /dev/null +++ b/core/huobi_cob_websocket.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +Huobi (HTX) COB WebSocket Connector + +Low-latency order book stream for additional market depth aggregation. +Features: +- GZIP-compressed message handling +- Ping/pong keepalive +- Automatic reconnection with backoff +- Per-symbol tasks; safe for Windows + +This module emits standardized COB snapshots via callbacks: + { 'symbol': 'ETH/USDT', 'bids': [{'price': .., 'size': ..}], 'asks': [...], + 'exchange': 'huobi', 'source': 'huobi_ws', 'timestamp': datetime } +""" + +import asyncio +import json +import logging +import time +from datetime import datetime +from typing import Callable, Dict, List, Optional + +import gzip + +try: + import websockets + from websockets.client import connect as websockets_connect + WEBSOCKETS_AVAILABLE = True +except Exception: + websockets = None + websockets_connect = None + WEBSOCKETS_AVAILABLE = False + +logger = logging.getLogger(__name__) + + +class HuobiCOBWebSocket: + """Minimal Huobi order book WebSocket for depth updates.""" + + def __init__(self, symbols: Optional[List[str]] = None): + # Expect symbols like 'ETH/USDT', 'BTC/USDT' + self.symbols = symbols or ['ETH/USDT', 'BTC/USDT'] + self.ws_tasks: Dict[str, asyncio.Task] = {} + self.callbacks: List[Callable] = [] + self._stopping = False + + def add_cob_callback(self, callback: Callable): + self.callbacks.append(callback) + + async def start(self): + if not WEBSOCKETS_AVAILABLE: + logger.warning("Huobi WS not available (websockets missing)") + return + for symbol in self.symbols: + if symbol not in self.ws_tasks or self.ws_tasks[symbol].done(): + self.ws_tasks[symbol] = asyncio.create_task(self._run_symbol(symbol)) + + async def stop(self): + self._stopping = True + for _, task in list(self.ws_tasks.items()): + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + def _to_huobi_symbol(self, symbol: str) -> str: + # 'ETH/USDT' -> 'ethusdt' + return symbol.replace('/', '').lower() + + async def _run_symbol(self, symbol: str): + huobi_symbol = self._to_huobi_symbol(symbol) + sub_msg = { + "sub": f"market.{huobi_symbol}.depth.step0", + "id": f"sub_{huobi_symbol}_{int(time.time())}" + } + url = "wss://api.huobi.pro/ws" + + backoff = 1 + while not self._stopping: + try: + logger.info(f"Huobi: connecting WS for {symbol} -> {url}") + async with websockets_connect(url, ping_interval=20, ping_timeout=60, close_timeout=10) as ws: + # Subscribe + await ws.send(json.dumps(sub_msg)) + logger.info(f"Huobi: subscribed {sub_msg}") + backoff = 1 + + while not self._stopping: + raw = await ws.recv() + # Huobi sends gzip-compressed bytes + if isinstance(raw, (bytes, bytearray)): + try: + message = gzip.decompress(raw).decode('utf-8') + except Exception: + # Some servers send raw JSON; try decode directly + try: + message = raw.decode('utf-8') + except Exception: + continue + else: + message = raw + + try: + data = json.loads(message) + except Exception: + continue + + # Ping/pong + if 'ping' in data: + await ws.send(json.dumps({"pong": data['ping']})) + continue + + # Depth update + if 'tick' in data and 'ch' in data: + tick = data['tick'] or {} + bids = tick.get('bids', []) or [] + asks = tick.get('asks', []) or [] + + # Convert to standardized structure + std_bids = [] + std_asks = [] + for b in bids[:1000]: + try: + price = float(b[0]); size = float(b[1]) + if size > 0: + std_bids.append({'price': price, 'size': size}) + except Exception: + continue + for a in asks[:1000]: + try: + price = float(a[0]); size = float(a[1]) + if size > 0: + std_asks.append({'price': price, 'size': size}) + except Exception: + continue + + if std_bids: + std_bids.sort(key=lambda x: x['price'], reverse=True) + if std_asks: + std_asks.sort(key=lambda x: x['price']) + + cob = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': std_bids, + 'asks': std_asks, + 'exchange': 'huobi', + 'source': 'huobi_ws' + } + + # Stats + if std_bids and std_asks: + best_bid = std_bids[0]['price'] + best_ask = std_asks[0]['price'] + mid = (best_bid + best_ask) / 2.0 + spread = best_ask - best_bid + spread_bps = (spread / mid) * 10000 if mid > 0 else 0 + top_bids = std_bids[:20]; top_asks = std_asks[:20] + bid_vol = sum(x['price'] * x['size'] for x in top_bids) + ask_vol = sum(x['price'] * x['size'] for x in top_asks) + tot = bid_vol + ask_vol + cob['stats'] = { + 'best_bid': best_bid, + 'best_ask': best_ask, + 'mid_price': mid, + 'spread': spread, + 'spread_bps': spread_bps, + 'bid_volume': bid_vol, + 'ask_volume': ask_vol, + 'imbalance': (bid_vol - ask_vol) / tot if tot > 0 else 0.0, + } + + # Notify + for cb in self.callbacks: + try: + await cb(symbol, cob) + except Exception as cb_ex: + logger.debug(f"Huobi callback error: {cb_ex}") + + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Huobi WS error for {symbol}: {e}") + await asyncio.sleep(backoff) + backoff = min(backoff * 2, 60) + + +huobi_cob_websocket: Optional[HuobiCOBWebSocket] = None + + +async def get_huobi_cob_websocket(symbols: Optional[List[str]] = None) -> HuobiCOBWebSocket: + global huobi_cob_websocket + if huobi_cob_websocket is None: + huobi_cob_websocket = HuobiCOBWebSocket(symbols) + await huobi_cob_websocket.start() + return huobi_cob_websocket + + +async def stop_huobi_cob_websocket(): + global huobi_cob_websocket + if huobi_cob_websocket: + await huobi_cob_websocket.stop() + huobi_cob_websocket = None + + diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 9985eee..121c311 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -58,6 +58,12 @@ logger = logging.getLogger(__name__) # Reduce Werkzeug/Dash logging noise logging.getLogger('werkzeug').setLevel(logging.WARNING) + +# Type imports for linter clarity (not required at runtime) +try: + from core.data_models import BaseDataInput, OHLCVBar, COBData, PivotPoint # noqa: F401 +except Exception: + pass logging.getLogger('dash').setLevel(logging.WARNING) logging.getLogger('dash.dash').setLevel(logging.WARNING) @@ -2416,6 +2422,50 @@ class CleanTradingDashboard: # Mini 1-second chart (if available) if has_mini_chart and ws_data_1s is not None: + # Overlay COB heatmap (up to 5 minutes) behind the 1s price line + try: + hm_seconds = 300 # 5 minutes + bucket_radius = 10 # ±10 buckets + # Prefer raw liquidity for visual; models can still use normalized features internally + times, prices, matrix, mids = self.data_provider.get_cob_heatmap_matrix( + symbol=symbol, + seconds=hm_seconds, + bucket_radius=bucket_radius, + metric='liquidity' + ) + if times and prices and matrix: + # Align times to local tz-naive to match chart + try: + _local_tz = datetime.now().astimezone().tzinfo + except Exception: + _local_tz = None + x_vals = [] + for t in times: + try: + if hasattr(t, 'tzinfo') and t.tzinfo is not None: + tt = t.astimezone(_local_tz) if _local_tz else t + tt = tt.replace(tzinfo=None) + else: + tt = t + x_vals.append(tt) + except Exception: + x_vals.append(t) + # Plot heatmap beneath the 1s price line + fig.add_trace( + go.Heatmap( + x=x_vals, + y=prices, # numeric prices to share axis with 1s line + z=matrix, + colorscale='Turbo', + showscale=False, + zsmooth='best', + opacity=0.8, + name='COB Liquidity' + ), + row=2, col=1 + ) + except Exception as e: + logger.debug(f"COB heatmap overlay skipped: {e}") # Align mini chart to local tz-naive try: if hasattr(ws_data_1s.index, 'tz') and ws_data_1s.index.tz is not None: