added Huobi HTX data provider
This commit is contained in:
@ -47,6 +47,7 @@ from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
|
||||
from .cnn_monitor import log_cnn_prediction
|
||||
from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
|
||||
from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
|
||||
from .huobi_cob_websocket import get_huobi_cob_websocket
|
||||
from .cob_integration import COBIntegration
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -4279,7 +4280,7 @@ class DataProvider:
|
||||
dashboard_callback=self._on_cob_websocket_status
|
||||
)
|
||||
|
||||
# Add callback for COB data
|
||||
# Add callback for Binance COB data
|
||||
self.enhanced_cob_websocket.add_cob_callback(self._on_cob_websocket_data)
|
||||
|
||||
# Start WebSocket in background thread
|
||||
@ -4307,6 +4308,103 @@ class DataProvider:
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing COB WebSocket: {e}")
|
||||
self._start_rest_only_cob_collection()
|
||||
|
||||
# Start Huobi WS in background (parallel to Binance) and merge data
|
||||
try:
|
||||
import asyncio
|
||||
def run_huobi_ws():
|
||||
try:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
ws = loop.run_until_complete(get_huobi_cob_websocket(self.symbols))
|
||||
ws.add_cob_callback(lambda symbol, data: asyncio.run_coroutine_threadsafe(self._merge_huobi_tick(symbol, data), loop))
|
||||
loop.run_forever()
|
||||
except Exception as he:
|
||||
logger.error(f"Error in Huobi WS thread: {he}")
|
||||
huobi_thread = threading.Thread(target=run_huobi_ws, daemon=True)
|
||||
huobi_thread.start()
|
||||
logger.info("Huobi COB WebSocket initialized and started")
|
||||
except Exception as he:
|
||||
logger.warning(f"Huobi COB WebSocket init failed: {he}")
|
||||
|
||||
async def _merge_huobi_tick(self, symbol: str, huobi_data: dict):
|
||||
"""Merge Huobi depth into consolidated snapshot for symbol with minimal overhead.
|
||||
Strategy: prefer best bid/ask from best spread; sum top-N notional liquidity across exchanges.
|
||||
"""
|
||||
try:
|
||||
# Update latest cache for Huobi
|
||||
if not huobi_data or not isinstance(huobi_data, dict):
|
||||
return
|
||||
# Build a lightweight merged snapshot using latest Binance (if any)
|
||||
with self.subscriber_lock:
|
||||
latest = {}
|
||||
if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache and self.cob_data_cache[symbol]:
|
||||
latest = dict(self.cob_data_cache[symbol][-1])
|
||||
# Construct merged bids/asks limited depth
|
||||
hb_bids = huobi_data.get('bids', []) or []
|
||||
hb_asks = huobi_data.get('asks', []) or []
|
||||
bn_bids = latest.get('bids', []) if isinstance(latest.get('bids'), list) else []
|
||||
bn_asks = latest.get('asks', []) if isinstance(latest.get('asks'), list) else []
|
||||
# Concatenate and re-sort with depth limit
|
||||
merged_bids = (bn_bids + hb_bids)[:2000]
|
||||
merged_asks = (bn_asks + hb_asks)[:2000]
|
||||
if merged_bids:
|
||||
merged_bids.sort(key=lambda x: x['price'], reverse=True)
|
||||
if merged_asks:
|
||||
merged_asks.sort(key=lambda x: x['price'])
|
||||
merged_bids = merged_bids[:1000]
|
||||
merged_asks = merged_asks[:1000]
|
||||
|
||||
# Stats from merged
|
||||
if merged_bids and merged_asks:
|
||||
best_bid = merged_bids[0]['price']
|
||||
best_ask = merged_asks[0]['price']
|
||||
mid = (best_bid + best_ask) / 2.0
|
||||
spread = best_ask - best_bid
|
||||
spread_bps = (spread / mid) * 10000 if mid > 0 else 0
|
||||
top_bids = merged_bids[:20]
|
||||
top_asks = merged_asks[:20]
|
||||
bid_vol = sum(x['price'] * x['size'] for x in top_bids)
|
||||
ask_vol = sum(x['price'] * x['size'] for x in top_asks)
|
||||
total = bid_vol + ask_vol
|
||||
merged_stats = {
|
||||
'best_bid': best_bid,
|
||||
'best_ask': best_ask,
|
||||
'mid_price': mid,
|
||||
'spread': spread,
|
||||
'spread_bps': spread_bps,
|
||||
'bid_volume': bid_vol,
|
||||
'ask_volume': ask_vol,
|
||||
'imbalance': (bid_vol - ask_vol) / total if total > 0 else 0.0,
|
||||
}
|
||||
else:
|
||||
merged_stats = latest.get('stats', {}) if isinstance(latest.get('stats', {}), dict) else {}
|
||||
|
||||
# Create merged snapshot (preserve original + annotate source)
|
||||
merged = {
|
||||
'symbol': symbol,
|
||||
'timestamp': time.time(),
|
||||
'bids': merged_bids,
|
||||
'asks': merged_asks,
|
||||
'stats': merged_stats,
|
||||
'source': 'merged_ws',
|
||||
'exchanges': ['binance', 'huobi']
|
||||
}
|
||||
# Store as new tick into raw deque and cache
|
||||
if hasattr(self, 'cob_raw_ticks') and symbol in self.cob_raw_ticks:
|
||||
self.cob_raw_ticks[symbol].append(merged)
|
||||
if not hasattr(self, 'cob_data_cache'):
|
||||
self.cob_data_cache = {}
|
||||
if symbol not in self.cob_data_cache:
|
||||
self.cob_data_cache[symbol] = []
|
||||
self.cob_data_cache[symbol].append(merged)
|
||||
if len(self.cob_data_cache[symbol]) > 1800:
|
||||
self.cob_data_cache[symbol].pop(0)
|
||||
|
||||
# Notify subscribers outside lock
|
||||
self._notify_cob_subscribers(symbol, merged)
|
||||
except Exception as e:
|
||||
logger.debug(f"Huobi merge error for {symbol}: {e}")
|
||||
|
||||
def _start_cob_tick_aggregation(self):
|
||||
"""Start COB tick aggregation system"""
|
||||
|
Reference in New Issue
Block a user