COB heatmap WIP

This commit is contained in:
Dobromir Popov
2025-08-10 02:51:06 +03:00
parent 4ea2386d07
commit 1cc8509e87
3 changed files with 186 additions and 101 deletions

View File

@ -248,16 +248,16 @@ class DataProvider:
# COB (Consolidated Order Book) data system using WebSocket
self.cob_integration: Optional[COBIntegration] = None
# COB data storage - 15 minutes of raw ticks and 1s aggregated data
self.cob_raw_ticks: Dict[str, deque] = {} # Raw COB ticks (15 min)
self.cob_1s_aggregated: Dict[str, deque] = {} # 1s aggregated COB data with $1 buckets
# COB data storage - 30 minutes of raw ticks and 1s aggregated data
self.cob_raw_ticks: Dict[str, deque] = {} # Raw COB ticks (30 min)
self.cob_1s_aggregated: Dict[str, deque] = {} # 1s aggregated COB data with $1 buckets (30 min)
# Initialize COB data structures
for symbol in self.symbols:
# Raw ticks: 15 minutes at ~100 ticks/second = ~90,000 ticks
self.cob_raw_ticks[symbol] = deque(maxlen=90000)
# 1s aggregated: 15 minutes = 900 seconds
self.cob_1s_aggregated[symbol] = deque(maxlen=900)
# Raw ticks: ~100 ticks/second * 30 minutes = ~180,000 ticks
self.cob_raw_ticks[symbol] = deque(maxlen=180000)
# 1s aggregated: 30 minutes = 1,800 seconds
self.cob_1s_aggregated[symbol] = deque(maxlen=1800)
# COB callbacks for data distribution
self.cob_data_callbacks: List[Callable] = []
@ -4317,7 +4317,10 @@ class DataProvider:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ws = loop.run_until_complete(get_huobi_cob_websocket(self.symbols))
ws.add_cob_callback(lambda symbol, data: asyncio.run_coroutine_threadsafe(self._merge_huobi_tick(symbol, data), loop))
# Register an async callback that runs in the same event loop
async def on_huobi(symbol, data):
await self._merge_huobi_tick(symbol, data)
ws.add_cob_callback(on_huobi)
loop.run_forever()
except Exception as he:
logger.error(f"Error in Huobi WS thread: {he}")
@ -4340,32 +4343,50 @@ class DataProvider:
latest = {}
if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache and self.cob_data_cache[symbol]:
latest = dict(self.cob_data_cache[symbol][-1])
# Construct merged bids/asks limited depth
hb_bids = huobi_data.get('bids', []) or []
hb_asks = huobi_data.get('asks', []) or []
bn_bids = latest.get('bids', []) if isinstance(latest.get('bids'), list) else []
bn_asks = latest.get('asks', []) if isinstance(latest.get('asks'), list) else []
# Normalize levels to [price, size] lists
def to_pairs(levels):
pairs = []
for lvl in levels or []:
try:
if isinstance(lvl, dict):
p = float(lvl.get('price', 0)); s = float(lvl.get('size', 0))
if s > 0:
pairs.append([p, s])
else:
# Assume [price, size]
p = float(lvl[0]); s = float(lvl[1])
if s > 0:
pairs.append([p, s])
except Exception:
continue
return pairs
hb_bids = to_pairs(huobi_data.get('bids'))
hb_asks = to_pairs(huobi_data.get('asks'))
bn_bids = to_pairs(latest.get('bids'))
bn_asks = to_pairs(latest.get('asks'))
# Concatenate and re-sort with depth limit
merged_bids = (bn_bids + hb_bids)[:2000]
merged_asks = (bn_asks + hb_asks)[:2000]
merged_bids = (bn_bids + hb_bids)
merged_asks = (bn_asks + hb_asks)
if merged_bids:
merged_bids.sort(key=lambda x: x['price'], reverse=True)
merged_bids.sort(key=lambda x: x[0], reverse=True)
if merged_asks:
merged_asks.sort(key=lambda x: x['price'])
merged_asks.sort(key=lambda x: x[0])
merged_bids = merged_bids[:1000]
merged_asks = merged_asks[:1000]
# Stats from merged
if merged_bids and merged_asks:
best_bid = merged_bids[0]['price']
best_ask = merged_asks[0]['price']
best_bid = merged_bids[0][0]
best_ask = merged_asks[0][0]
mid = (best_bid + best_ask) / 2.0
spread = best_ask - best_bid
spread_bps = (spread / mid) * 10000 if mid > 0 else 0
top_bids = merged_bids[:20]
top_asks = merged_asks[:20]
bid_vol = sum(x['price'] * x['size'] for x in top_bids)
ask_vol = sum(x['price'] * x['size'] for x in top_asks)
bid_vol = sum(x[0] * x[1] for x in top_bids)
ask_vol = sum(x[0] * x[1] for x in top_asks)
total = bid_vol + ask_vol
merged_stats = {
'best_bid': best_bid,
@ -4383,9 +4404,9 @@ class DataProvider:
# Create merged snapshot (preserve original + annotate source)
merged = {
'symbol': symbol,
'timestamp': time.time(),
'bids': merged_bids,
'asks': merged_asks,
'timestamp': int(time.time() * 1000), # milliseconds to match rest of cache
'bids': merged_bids[:200], # keep reasonable depth
'asks': merged_asks[:200],
'stats': merged_stats,
'source': 'merged_ws',
'exchanges': ['binance', 'huobi']
@ -4897,26 +4918,74 @@ class DataProvider:
values: List[List[float]] = []
mids: List[float] = []
latest = self.get_latest_cob_data(symbol)
if not latest or 'stats' not in latest:
return times, prices, values, mids
mid = float(latest['stats'].get('mid_price', 0) or 0)
if mid <= 0:
return times, prices, values, mids
bucket_size = 1.0 if 'ETH' in symbol else 10.0
center = round(mid / bucket_size) * bucket_size
prices = [center + i * bucket_size for i in range(-bucket_radius, bucket_radius + 1)]
# Build exactly 1 snapshot per second (most recent 'seconds' seconds), using cache
with self.subscriber_lock:
cache_for_symbol = getattr(self, 'cob_data_cache', {}).get(symbol, [])
snapshots = list(cache_for_symbol[-seconds:]) if cache_for_symbol else []
cache_map = getattr(self, 'cob_data_cache', {})
cache_for_symbol = cache_map.get(symbol, [])
# Fallback: try alternate key without slash (e.g., ETHUSDT)
if not cache_for_symbol:
alt_key = symbol.replace('/', '').upper()
cache_for_symbol = cache_map.get(alt_key, [])
snapshots: List[dict] = []
if cache_for_symbol:
# Walk backwards and pick the last snapshot per unique second
selected_by_sec: Dict[int, dict] = {}
for snap in reversed(cache_for_symbol):
ts = snap.get('timestamp')
if isinstance(ts, (int, float)):
sec = int(ts / 1000) if ts > 1e10 else int(ts)
if sec not in selected_by_sec:
selected_by_sec[sec] = snap
if len(selected_by_sec) >= seconds:
break
# Order by time ascending
for sec in sorted(selected_by_sec.keys()):
snapshots.append(selected_by_sec[sec])
# If dedup by second produced nothing (unexpected), fallback to last N snapshots
if not snapshots:
snapshots = list(cache_for_symbol[-seconds:])
# If no snapshots, nothing to render
if not snapshots:
return times, prices, values, mids
# Determine center price from the most recent valid snapshot in our selection
bucket_size = 1.0 if 'ETH' in symbol else 10.0
center = 0.0
for snap in reversed(snapshots):
try:
stats = snap.get('stats') or {}
center = float(stats.get('mid_price', 0) or 0)
if center <= 0:
# derive from best bid/ask
def first_price(level):
try:
return float(level.get('price')) if isinstance(level, dict) else float(level[0])
except Exception:
return 0.0
bids = snap.get('bids') or []
asks = snap.get('asks') or []
best_bid = max((first_price(b) for b in bids), default=0.0)
best_ask = min((first_price(a) for a in asks), default=0.0)
if best_bid > 0 and best_ask > 0:
center = (best_bid + best_ask) / 2.0
if center > 0:
break
except Exception:
continue
if center <= 0:
return times, prices, values, mids
center = round(center / bucket_size) * bucket_size
prices = [center + i * bucket_size for i in range(-bucket_radius, bucket_radius + 1)]
for snap in snapshots:
ts_ms = snap.get('timestamp')
if isinstance(ts_ms, (int, float)):
times.append(datetime.fromtimestamp(ts_ms / 1000.0))
# Detect if ms or s
ts_s = ts_ms / 1000.0 if ts_ms > 1e10 else float(ts_ms)
times.append(datetime.fromtimestamp(ts_s))
else:
times.append(datetime.utcnow())
@ -4926,7 +4995,11 @@ class DataProvider:
bucket_map: Dict[float, Dict[str, float]] = {}
for level in bids[:200]:
try:
price, size = float(level[0]), float(level[1])
# Level can be [price, size] or {'price': p, 'size': s}
if isinstance(level, dict):
price = float(level.get('price', 0.0)); size = float(level.get('size', 0.0))
else:
price, size = float(level[0]), float(level[1])
bp = round(price / bucket_size) * bucket_size
if bp not in bucket_map:
bucket_map[bp] = {'bid': 0.0, 'ask': 0.0}
@ -4935,7 +5008,10 @@ class DataProvider:
continue
for level in asks[:200]:
try:
price, size = float(level[0]), float(level[1])
if isinstance(level, dict):
price = float(level.get('price', 0.0)); size = float(level.get('size', 0.0))
else:
price, size = float(level[0]), float(level[1])
bp = round(price / bucket_size) * bucket_size
if bp not in bucket_map:
bucket_map[bp] = {'bid': 0.0, 'ask': 0.0}
@ -4945,8 +5021,13 @@ class DataProvider:
# Compute mid price for this snapshot
try:
best_bid = max((float(b[0]) for b in bids), default=0.0)
best_ask = min((float(a[0]) for a in asks), default=0.0)
def first_price(level):
try:
return float(level.get('price')) if isinstance(level, dict) else float(level[0])
except Exception:
return 0.0
best_bid = max((first_price(b) for b in bids), default=0.0)
best_ask = min((first_price(a) for a in asks), default=0.0)
if best_bid > 0 and best_ask > 0:
mids.append((best_bid + best_ask) / 2.0)
else: