gogo2/core/multi_exchange_cob_provider.py
Last commit fb72c93743 ("stability") by Dobromir Popov, 2025-07-28 12:10:52 +03:00

1864 lines, 87 KiB, Python

"""
Multi-Exchange Consolidated Order Book (COB) Data Provider
This module aggregates order book data from multiple cryptocurrency exchanges to provide:
- Consolidated Order Book (COB) data across multiple exchanges
- Fine-grained volume buckets at configurable price granularity
- Real-time order book depth aggregation
- Volume-weighted consolidated pricing
- Exchange-specific order flow analysis
- Liquidity distribution metrics
Supported Exchanges:
- Binance (via WebSocket depth streams)
- Coinbase Pro (via WebSocket level2 updates)
- Kraken (via WebSocket book updates)
- Huobi (via WebSocket mbp updates)
- Bitfinex (via WebSocket book updates)
Data is structured for consumption by CNN/DQN models and trading dashboards.
"""
import asyncio
import json
import logging
import time
try:
import websockets
from websockets.client import connect as websockets_connect
except ImportError:
# Fallback for environments where websockets is not available
websockets = None
websockets_connect = None
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable, Union, Awaitable
from collections import deque, defaultdict
from dataclasses import dataclass, field
from threading import Thread, Lock
import requests
import ccxt
from enum import Enum
import math
import aiohttp
import aiohttp.resolver
logger = logging.getLogger(__name__)
class SimpleRateLimiter:
"""Simple rate limiter to prevent 418 errors"""
def __init__(self, requests_per_second: float = 0.5): # Much more conservative
self.requests_per_second = requests_per_second
self.last_request_time = 0
self.min_interval = 1.0 / requests_per_second
self.consecutive_errors = 0
self.blocked_until = 0
def can_make_request(self) -> bool:
"""Check if we can make a request"""
now = time.time()
# Check if we're in a blocked state
if now < self.blocked_until:
return False
return (now - self.last_request_time) >= self.min_interval
def record_request(self, success: bool = True):
"""Record that a request was made"""
self.last_request_time = time.time()
if success:
self.consecutive_errors = 0
else:
self.consecutive_errors += 1
# Exponential backoff for errors
if self.consecutive_errors >= 3:
backoff_time = min(300, 10 * (2 ** (self.consecutive_errors - 3))) # Max 5 min
self.blocked_until = time.time() + backoff_time
logger.warning(f"Rate limiter blocked for {backoff_time}s after {self.consecutive_errors} errors")
def get_wait_time(self) -> float:
"""Get time to wait before next request"""
now = time.time()
# Check if blocked
if now < self.blocked_until:
return self.blocked_until - now
time_since_last = now - self.last_request_time
if time_since_last < self.min_interval:
return self.min_interval - time_since_last
return 0.0
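# Worked example of the backoff schedule above (illustrative): with requests_per_second=0.5
# the limiter enforces a 2s minimum interval, and after repeated failures
# record_request(success=False) blocks for 10s (3 errors), 20s (4), 40s (5), 80s (6),
# 160s (7), then caps at 300s.
#
#   limiter = SimpleRateLimiter(requests_per_second=0.5)
#   if limiter.can_make_request():
#       ...  # issue the HTTP call, then report the outcome
#       limiter.record_request(success=True)
#   else:
#       time.sleep(limiter.get_wait_time())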
class ExchangeType(Enum):
BINANCE = "binance"
COINBASE = "coinbase"
KRAKEN = "kraken"
HUOBI = "huobi"
BITFINEX = "bitfinex"
@dataclass
class ExchangeOrderBookLevel:
"""Single order book level with exchange attribution"""
exchange: str
price: float
size: float
volume_usd: float
orders_count: int
side: str # 'bid' or 'ask'
timestamp: datetime
raw_data: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ConsolidatedOrderBookLevel:
"""Consolidated order book level across multiple exchanges"""
price: float
total_size: float
total_volume_usd: float
total_orders: int
side: str
exchange_breakdown: Dict[str, ExchangeOrderBookLevel]
dominant_exchange: str
liquidity_score: float
timestamp: datetime
@dataclass
class COBSnapshot:
"""Complete Consolidated Order Book snapshot"""
symbol: str
timestamp: datetime
consolidated_bids: List[ConsolidatedOrderBookLevel]
consolidated_asks: List[ConsolidatedOrderBookLevel]
exchanges_active: List[str]
volume_weighted_mid: float
total_bid_liquidity: float
total_ask_liquidity: float
spread_bps: float
liquidity_imbalance: float
price_buckets: Dict[str, Dict[str, float]] # Fine-grain volume buckets
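# Illustrative layout of price_buckets as built by _generate_price_buckets (keys are integer
# bucket indices counted outward from the volume-weighted mid; values are hypothetical):
#   {'bids': {0: {'price': 2997.0, 'volume_usd': 89910.0, 'size': 30.0,
#                 'orders': 12, 'exchanges': ['binance']}, ...},
#    'asks': {...}}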
@dataclass
class ExchangeConfig:
"""Exchange configuration for COB aggregation"""
exchange_type: ExchangeType
weight: float = 1.0
enabled: bool = True
websocket_url: str = ""
rest_api_url: str = ""
symbols_mapping: Dict[str, str] = field(default_factory=dict)
rate_limits: Dict[str, int] = field(default_factory=dict)
class MultiExchangeCOBProvider:
"""
Multi-Exchange Consolidated Order Book Data Provider
Aggregates real-time order book data from multiple cryptocurrency exchanges
to create a consolidated view of market liquidity and pricing.
"""
def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
"""Initialize multi-exchange COB provider"""
self.symbols = symbols
self.exchange_configs = exchange_configs
self.active_exchanges = ['binance'] # Focus on Binance for now
self.is_streaming = False
self.cob_data_cache = {} # Cache for COB data
self.cob_subscribers = [] # List of callback functions
# Initialize state that is used throughout the provider
self.current_order_book = {} # Latest consolidated COBSnapshot per symbol
self.realtime_snapshots = defaultdict(list) # Real-time snapshots per symbol
self.realtime_stats = defaultdict(dict) # Rolling real-time statistics per symbol
self.cob_update_callbacks = [] # COB update callbacks (COBSnapshot objects)
self.bucket_update_callbacks = [] # Price bucket update callbacks
self.data_lock = asyncio.Lock() # Lock for async-safe data access
self.exchange_order_books = defaultdict(lambda: defaultdict(dict)) # Per-symbol, per-exchange book state
self.exchange_update_counts = defaultdict(int) # Update counters per exchange
self.processing_times = defaultdict(list) # Timing metrics per pipeline stage (ms)
self.session_trades = defaultdict(list) # Executed trades per symbol (for SVP)
self.svp_cache = defaultdict(dict) # Session volume profile cache per symbol
self.session_start_time = datetime.now() # Start of the current data session
self.consolidation_stats = defaultdict(lambda: {
'total_updates': 0,
'active_price_levels': 0,
'total_liquidity_usd': 0.0
})
self.fixed_usd_buckets = {} # Fixed USD bucket sizes per symbol
self.bucket_size_bps = 10 # Default bucket size in basis points
self.bucket_update_frequency = 100 # Bucket callback interval in ms (assumed default)
# Rate limiting and HTTP session state for the REST API fallback
self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=0.5)
self.rest_api_frequency = 5000 # REST depth refresh interval in ms (assumed default)
self.rest_depth_limit = 1000 # Depth levels requested via REST (assumed default)
self.last_rest_api_call = 0
self.rest_api_call_count = 0
self.session = None # aiohttp session, created in _setup_http_session()
self.rest_session = None # aiohttp session for REST fallback, created in _setup_http_session()
self.connector = None # aiohttp TCP connector, created in _setup_http_session()
logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")
def subscribe_to_raw_cob_updates(self, callback):
"""Subscribe to raw full-depth COB dict updates delivered by _notify_cob_subscribers.
Named distinctly from subscribe_to_cob_updates() below so the two subscription paths
do not shadow each other on the class."""
self.cob_subscribers.append(callback)
logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")
async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
"""Notify all subscribers of COB data updates"""
try:
for callback in self.cob_subscribers:
try:
if asyncio.iscoroutinefunction(callback):
await callback(symbol, cob_snapshot)
else:
callback(symbol, cob_snapshot)
except Exception as e:
logger.error(f"Error in COB subscriber callback: {e}")
except Exception as e:
logger.error(f"Error notifying COB subscribers: {e}")
async def start_streaming(self):
"""Start real-time order book streaming from all configured exchanges using only WebSocket"""
logger.info(f"Starting COB streaming for symbols: {self.symbols}")
self.is_streaming = True
# Setup aiohttp session here, within the async context
await self._setup_http_session()
# Start WebSocket connections for each active exchange and symbol
tasks = []
for symbol in self.symbols:
for exchange_name, config in self.exchange_configs.items():
if config.enabled and exchange_name in self.active_exchanges:
if exchange_name == 'binance':
# Enhanced Binance WebSocket streams (NO REST API)
# 1. Partial depth stream (20 levels, 100ms updates) - for real-time updates
tasks.append(self._stream_binance_orderbook(symbol, config))
# 2. Full depth stream (1000 levels, 1000ms updates) - replaces REST API
tasks.append(self._stream_binance_full_depth(symbol))
# 3. Trade stream for order flow analysis
tasks.append(self._stream_binance_trades(symbol))
# 4. Book ticker stream for best bid/ask real-time
tasks.append(self._stream_binance_book_ticker(symbol))
# 5. Aggregate trade stream for large order detection
tasks.append(self._stream_binance_agg_trades(symbol))
else:
# Other exchanges - WebSocket only
tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))
# Start continuous consolidation and bucket updates
tasks.append(self._continuous_consolidation())
tasks.append(self._continuous_bucket_updates())
logger.info(f"Starting {len(tasks)} COB streaming tasks (WebSocket only - NO REST API)")
await asyncio.gather(*tasks)
async def _setup_http_session(self):
"""Setup aiohttp session and connector"""
self.connector = aiohttp.TCPConnector(
resolver=aiohttp.ThreadedResolver() # Resolver must be created inside a running event loop
)
self.session = aiohttp.ClientSession(connector=self.connector)
self.rest_session = aiohttp.ClientSession(connector=self.connector) # Separate session for REST fallback calls
logger.info("aiohttp session and connector setup completed")
async def stop_streaming(self):
"""Stop real-time order book streaming and close sessions"""
logger.info("Stopping COB Integration")
self.is_streaming = False
if self.session and not self.session.closed:
await self.session.close()
logger.info("aiohttp session closed")
if self.rest_session and not self.rest_session.closed:
await self.rest_session.close()
logger.info("aiohttp REST session closed")
if self.connector and not self.connector.closed:
await self.connector.close()
logger.info("aiohttp connector closed")
logger.info("COB Integration stopped")
async def _stream_deep_orderbook(self, exchange_name: str, symbol: str):
"""Fetch deep order book data via REST API periodically"""
while self.is_streaming:
try:
start_time = time.time()
if exchange_name == 'binance':
await self._fetch_binance_deep_orderbook(symbol)
# Add other exchanges here as needed
processing_time = (time.time() - start_time) * 1000
self.processing_times['rest_api'].append(processing_time)
logger.debug(f"Deep order book fetch for {symbol} took {processing_time:.2f}ms")
# Wait before next fetch
await asyncio.sleep(self.rest_api_frequency / 1000)
except Exception as e:
logger.error(f"Error fetching deep order book for {symbol}: {e}")
await asyncio.sleep(5) # Wait 5 seconds on error
async def _fetch_binance_deep_orderbook(self, symbol: str):
"""Fetch deep order book from Binance REST API with rate limiting"""
try:
if not self.rest_session:
return
# Check rate limiter before making request
if not self.rest_rate_limiter.can_make_request():
wait_time = self.rest_rate_limiter.get_wait_time()
if wait_time > 0:
logger.debug(f"Rate limited, waiting {wait_time:.1f}s before {symbol} request")
await asyncio.sleep(wait_time)
return # Skip this cycle
# Convert symbol format for Binance
binance_symbol = symbol.replace('/', '').upper()
url = f"https://api.binance.com/api/v3/depth"
params = {
'symbol': binance_symbol,
'limit': self.rest_depth_limit
}
# Add headers to reduce detection
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
}
async with self.rest_session.get(url, params=params, headers=headers) as response:
if response.status == 200:
data = await response.json()
await self._process_binance_deep_orderbook(symbol, data)
self.rest_rate_limiter.record_request() # Record successful request
elif response.status in [418, 429, 451]:
logger.warning(f"Binance REST API rate limited (HTTP {response.status}) for {symbol}")
self.rest_rate_limiter.record_request(success=False) # Feed the limiter so exponential backoff engages
await asyncio.sleep(10) # Extra wait on explicit rate limiting
else:
logger.error(f"Binance REST API error {response.status} for {symbol}")
self.rest_rate_limiter.record_request(success=False)
except Exception as e:
logger.error(f"Error fetching Binance deep order book for {symbol}: {e}")
async def _process_binance_deep_orderbook(self, symbol: str, data: Dict):
"""Process deep order book data from Binance REST API"""
try:
timestamp = datetime.now()
exchange_name = 'binance'
# Parse deep bids and asks
deep_bids = {}
deep_asks = {}
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if size > 0:
deep_bids[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=timestamp
)
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if size > 0:
deep_asks[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=timestamp
)
# Update deep order book storage
async with self.data_lock:
self.exchange_order_books[symbol][exchange_name]['deep_bids'] = deep_bids
self.exchange_order_books[symbol][exchange_name]['deep_asks'] = deep_asks
self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp
self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId')
logger.debug(f"Updated deep order book for {symbol}: {len(deep_bids)} bids, {len(deep_asks)} asks")
except Exception as e:
logger.error(f"Error processing deep order book for {symbol}: {e}")
async def _stream_exchange_orderbook(self, exchange_name: str, symbol: str):
"""Stream order book data from specific exchange"""
config = self.exchange_configs[exchange_name]
try:
if exchange_name == ExchangeType.BINANCE.value:
await self._stream_binance_orderbook(symbol, config)
elif exchange_name == ExchangeType.COINBASE.value:
await self._stream_coinbase_orderbook(symbol, config)
elif exchange_name == ExchangeType.KRAKEN.value:
await self._stream_kraken_orderbook(symbol, config)
elif exchange_name == ExchangeType.HUOBI.value:
await self._stream_huobi_orderbook(symbol, config)
elif exchange_name == ExchangeType.BITFINEX.value:
await self._stream_bitfinex_orderbook(symbol, config)
except Exception as e:
logger.error(f"Error streaming {exchange_name} for {symbol}: {e}")
await asyncio.sleep(5) # Wait before reconnecting
async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream order book data from Binance"""
try:
ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms"
logger.info(f"Connecting to Binance WebSocket: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
self.exchange_order_books[symbol]['binance']['connected'] = True
logger.info(f"Connected to Binance order book stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_orderbook(symbol, data)
# Depth messages never carry trade events ('e' == 'trade'), so this call is a
# no-op safeguard here; actual trades arrive via _stream_binance_trades
await self._track_binance_trades(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance message: {e}")
except Exception as e:
logger.error(f"Error processing Binance data: {e}")
except Exception as e:
logger.error(f"Binance WebSocket error for {symbol}: {e}")
finally:
self.exchange_order_books[symbol]['binance']['connected'] = False
logger.info(f"Disconnected from Binance order book stream for {symbol}")
async def _track_binance_trades(self, symbol: str, data: Dict):
"""Track executed trades from Binance for SVP calculation"""
try:
# Binance depth stream doesn't include trades, so we need to connect to trade stream
if 'e' in data and data['e'] == 'trade':
trade = {
'exchange': 'binance',
'symbol': symbol,
'price': float(data['p']),
'quantity': float(data['q']),
'side': 'sell' if data['m'] else 'buy', # m=True means the buyer was the maker, i.e. the aggressor sold
'timestamp': datetime.fromtimestamp(data['T'] / 1000),
'volume_usd': float(data['p']) * float(data['q'])
}
await self._add_trade_to_svp(symbol, trade)
except Exception as e:
logger.error(f"Error tracking Binance trade: {e}")
async def _add_trade_to_svp(self, symbol: str, trade: Dict):
"""Add trade to session volume profile"""
try:
async with self.data_lock:
# Add to session trades
self.session_trades[symbol].append(trade)
# Update SVP cache
price = trade['price']
side = trade['side']
volume = trade['volume_usd']
if price not in self.svp_cache[symbol]:
self.svp_cache[symbol][price] = {'buy_volume': 0.0, 'sell_volume': 0.0}
if side == 'buy':
self.svp_cache[symbol][price]['buy_volume'] += volume
else:
self.svp_cache[symbol][price]['sell_volume'] += volume
# Keep only recent trades (last 24 hours)
cutoff_time = datetime.now() - timedelta(hours=24)
self.session_trades[symbol] = [
t for t in self.session_trades[symbol]
if t['timestamp'] > cutoff_time
]
except Exception as e:
logger.error(f"Error adding trade to SVP: {e}")
def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict:
"""Get session volume profile for a symbol"""
try:
if bucket_size is None:
bucket_size = self.fixed_usd_buckets.get(symbol, 1.0)
svp_data = {}
# Read the SVP cache without the async lock: within the event loop this block has no awaits,
# so writers cannot interleave (callers on other threads would still race)
try:
for price, volumes in self.svp_cache[symbol].items():
bucket_price = math.floor(price / bucket_size) * bucket_size
if bucket_price not in svp_data:
svp_data[bucket_price] = {
'buy_volume': 0.0,
'sell_volume': 0.0,
'total_volume': 0.0,
'trade_count': 0
}
svp_data[bucket_price]['buy_volume'] += volumes['buy_volume']
svp_data[bucket_price]['sell_volume'] += volumes['sell_volume']
svp_data[bucket_price]['total_volume'] += volumes['buy_volume'] + volumes['sell_volume']
svp_data[bucket_price]['trade_count'] += 1
except Exception as e:
logger.error(f"Error accessing SVP cache for {symbol}: {e}")
return {}
# Convert to sorted list
svp_list = []
for price in sorted(svp_data.keys()):
data = svp_data[price]
if data['total_volume'] > 0:
svp_list.append({
'price': price,
'buy_volume': data['buy_volume'],
'sell_volume': data['sell_volume'],
'total_volume': data['total_volume'],
'trade_count': data['trade_count'],
'buy_percent': (data['buy_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0,
'sell_percent': (data['sell_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0
})
return {
'symbol': symbol,
'session_start': self.session_start_time.isoformat(),
'bucket_size': bucket_size,
'data': svp_list
}
except Exception as e:
logger.error(f"Error getting session volume profile for {symbol}: {e}")
return {}
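# Illustrative return shape of get_session_volume_profile (values hypothetical):
#   {'symbol': 'ETH/USDT', 'session_start': '2025-07-28T00:00:00', 'bucket_size': 1.0,
#    'data': [{'price': 3000.0, 'buy_volume': 152000.0, 'sell_volume': 98000.0,
#              'total_volume': 250000.0, 'trade_count': 3,
#              'buy_percent': 60.8, 'sell_percent': 39.2}, ...]}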
async def _process_binance_orderbook(self, symbol: str, data: Dict):
"""Process Binance order book update"""
try:
timestamp = datetime.now()
exchange_name = ExchangeType.BINANCE.value
# Parse bids and asks
bids = {}
asks = {}
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if size > 0: # Only include non-zero sizes
bids[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=timestamp
)
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if size > 0:
asks[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=timestamp
)
# Update exchange order book
async with self.data_lock:
self.exchange_order_books[symbol][exchange_name].update({
'bids': bids,
'asks': asks,
'timestamp': timestamp,
'connected': True
})
logger.debug(f"Updated Binance order book for {symbol}: {len(bids)} bids, {len(asks)} asks")
self.exchange_update_counts[exchange_name] += 1
# Log every 1000th update
if self.exchange_update_counts[exchange_name] % 1000 == 0:
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}")
except Exception as e:
logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True)
async def _process_coinbase_orderbook(self, symbol: str, data: Dict):
"""Process Coinbase order book data"""
try:
if data.get('type') == 'snapshot':
# Initial snapshot
bids = {}
asks = {}
for bid_data in data.get('bids', []):
price, size = float(bid_data[0]), float(bid_data[1])
if size > 0:
bids[price] = ExchangeOrderBookLevel(
exchange='coinbase',
price=price,
size=size,
volume_usd=price * size,
orders_count=1, # Coinbase doesn't provide order count
side='bid',
timestamp=datetime.now(),
raw_data=bid_data
)
for ask_data in data.get('asks', []):
price, size = float(ask_data[0]), float(ask_data[1])
if size > 0:
asks[price] = ExchangeOrderBookLevel(
exchange='coinbase',
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=datetime.now(),
raw_data=ask_data
)
# Update order book
async with self.data_lock:
if symbol not in self.exchange_order_books:
self.exchange_order_books[symbol] = {}
self.exchange_order_books[symbol]['coinbase'] = {
'bids': bids,
'asks': asks,
'last_update': datetime.now(),
'connected': True
}
logger.info(f"Coinbase snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
elif data.get('type') == 'l2update':
# Level 2 update
async with self.data_lock:
if symbol in self.exchange_order_books and 'coinbase' in self.exchange_order_books[symbol]:
coinbase_data = self.exchange_order_books[symbol]['coinbase']
for change in data.get('changes', []):
side, price_str, size_str = change
price, size = float(price_str), float(size_str)
if side == 'buy':
if size == 0:
# Remove level
coinbase_data['bids'].pop(price, None)
else:
# Update level
coinbase_data['bids'][price] = ExchangeOrderBookLevel(
exchange='coinbase',
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=datetime.now(),
raw_data=change
)
elif side == 'sell':
if size == 0:
# Remove level
coinbase_data['asks'].pop(price, None)
else:
# Update level
coinbase_data['asks'][price] = ExchangeOrderBookLevel(
exchange='coinbase',
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=datetime.now(),
raw_data=change
)
coinbase_data['last_update'] = datetime.now()
# Update exchange count
exchange_name = 'coinbase'
if exchange_name not in self.exchange_update_counts:
self.exchange_update_counts[exchange_name] = 0
self.exchange_update_counts[exchange_name] += 1
# Log every 1000th update
if self.exchange_update_counts[exchange_name] % 1000 == 0:
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Coinbase updates for {symbol}")
except Exception as e:
logger.error(f"Error processing Coinbase order book for {symbol}: {e}", exc_info=True)
async def _process_kraken_orderbook(self, symbol: str, data: Dict):
"""Process Kraken order book data"""
try:
# Kraken sends different message types
if isinstance(data, list) and len(data) > 1:
# Order book update format: [channel_id, data, channel_name, pair]
if len(data) >= 4 and data[2] == "book-25":
book_data = data[1]
# Check for snapshot vs update
if 'bs' in book_data and 'as' in book_data:
# Snapshot
bids = {}
asks = {}
for bid_data in book_data.get('bs', []):
price, volume, timestamp = float(bid_data[0]), float(bid_data[1]), float(bid_data[2])
if volume > 0:
bids[price] = ExchangeOrderBookLevel(
exchange='kraken',
price=price,
size=volume,
volume_usd=price * volume,
orders_count=1, # Kraken doesn't provide order count in book feed
side='bid',
timestamp=datetime.fromtimestamp(timestamp),
raw_data=bid_data
)
for ask_data in book_data.get('as', []):
price, volume, timestamp = float(ask_data[0]), float(ask_data[1]), float(ask_data[2])
if volume > 0:
asks[price] = ExchangeOrderBookLevel(
exchange='kraken',
price=price,
size=volume,
volume_usd=price * volume,
orders_count=1,
side='ask',
timestamp=datetime.fromtimestamp(timestamp),
raw_data=ask_data
)
# Update order book
async with self.data_lock:
if symbol not in self.exchange_order_books:
self.exchange_order_books[symbol] = {}
self.exchange_order_books[symbol]['kraken'] = {
'bids': bids,
'asks': asks,
'last_update': datetime.now(),
'connected': True
}
logger.info(f"Kraken snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
else:
# Incremental update
async with self.data_lock:
if symbol in self.exchange_order_books and 'kraken' in self.exchange_order_books[symbol]:
kraken_data = self.exchange_order_books[symbol]['kraken']
# Process bid updates
for bid_update in book_data.get('b', []):
price, volume, timestamp = float(bid_update[0]), float(bid_update[1]), float(bid_update[2])
if volume == 0:
# Remove level
kraken_data['bids'].pop(price, None)
else:
# Update level
kraken_data['bids'][price] = ExchangeOrderBookLevel(
exchange='kraken',
price=price,
size=volume,
volume_usd=price * volume,
orders_count=1,
side='bid',
timestamp=datetime.fromtimestamp(timestamp),
raw_data=bid_update
)
# Process ask updates
for ask_update in book_data.get('a', []):
price, volume, timestamp = float(ask_update[0]), float(ask_update[1]), float(ask_update[2])
if volume == 0:
# Remove level
kraken_data['asks'].pop(price, None)
else:
# Update level
kraken_data['asks'][price] = ExchangeOrderBookLevel(
exchange='kraken',
price=price,
size=volume,
volume_usd=price * volume,
orders_count=1,
side='ask',
timestamp=datetime.fromtimestamp(timestamp),
raw_data=ask_update
)
kraken_data['last_update'] = datetime.now()
# Update exchange count
exchange_name = 'kraken'
if exchange_name not in self.exchange_update_counts:
self.exchange_update_counts[exchange_name] = 0
self.exchange_update_counts[exchange_name] += 1
# Log every 1000th update
if self.exchange_update_counts[exchange_name] % 1000 == 0:
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Kraken updates for {symbol}")
except Exception as e:
logger.error(f"Error processing Kraken order book for {symbol}: {e}", exc_info=True)
async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Coinbase order book data via WebSocket"""
try:
import json
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
# Coinbase Pro WebSocket URL
ws_url = "wss://ws-feed.pro.coinbase.com"
coinbase_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '-'))
# Subscribe message for level2 order book updates
subscribe_message = {
"type": "subscribe",
"product_ids": [coinbase_symbol],
"channels": ["level2"]
}
logger.info(f"Connecting to Coinbase order book stream for {symbol}")
async with websockets_connect(ws_url) as websocket:
# Send subscription
await websocket.send(json.dumps(subscribe_message))
logger.info(f"Subscribed to Coinbase level2 for {coinbase_symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_coinbase_orderbook(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Coinbase message: {e}")
except Exception as e:
logger.error(f"Error processing Coinbase orderbook: {e}")
except Exception as e:
logger.error(f"Coinbase order book stream error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Coinbase order book stream for {symbol}")
async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Kraken order book data via WebSocket"""
try:
import json
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
# Kraken WebSocket URL
ws_url = "wss://ws.kraken.com"
kraken_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', ''))
# Subscribe message for book updates
subscribe_message = {
"event": "subscribe",
"pair": [kraken_symbol],
"subscription": {"name": "book", "depth": 25}
}
logger.info(f"Connecting to Kraken order book stream for {symbol}")
async with websockets_connect(ws_url) as websocket:
# Send subscription
await websocket.send(json.dumps(subscribe_message))
logger.info(f"Subscribed to Kraken book for {kraken_symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_kraken_orderbook(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Kraken message: {e}")
except Exception as e:
logger.error(f"Error processing Kraken orderbook: {e}")
except Exception as e:
logger.error(f"Kraken order book stream error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Kraken order book stream for {symbol}")
async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Huobi order book data (placeholder implementation)"""
try:
logger.info(f"Huobi streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Huobi order book for {symbol}: {e}")
async def _stream_bitfinex_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Bitfinex order book data (placeholder implementation)"""
try:
logger.info(f"Bitfinex streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Bitfinex order book for {symbol}: {e}")
async def _stream_binance_trades(self, symbol: str):
"""Stream trade data from Binance for SVP calculation"""
try:
config = self.exchange_configs[ExchangeType.BINANCE.value]
ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade"
logger.info(f"Connecting to Binance trade stream: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
logger.info(f"Connected to Binance trade stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_trade(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance trade message: {e}")
except Exception as e:
logger.error(f"Error processing Binance trade: {e}")
except Exception as e:
logger.error(f"Binance trade stream error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Binance trade stream for {symbol}")
async def _process_binance_trade(self, symbol: str, data: Dict):
"""Process Binance trade data for SVP calculation"""
try:
if 'e' in data and data['e'] == 'trade':
trade = {
'exchange': 'binance',
'symbol': symbol,
'price': float(data['p']),
'quantity': float(data['q']),
'side': 'buy' if not data['m'] else 'sell', # m=True means the buyer was the maker, so the aggressor sold
'timestamp': datetime.fromtimestamp(data['T'] / 1000),
'volume_usd': float(data['p']) * float(data['q'])
}
await self._add_trade_to_svp(symbol, trade)
# Log every 1000th trade
if len(self.session_trades[symbol]) % 1000 == 0:
logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}")
except Exception as e:
logger.error(f"Error processing Binance trade for {symbol}: {e}")
async def _continuous_consolidation(self):
"""Continuously consolidate order books from all exchanges"""
while self.is_streaming:
try:
start_time = time.time()
for symbol in self.symbols:
logger.debug(f"Starting consolidation for {symbol}")
await self._consolidate_symbol_orderbook(symbol)
processing_time = (time.time() - start_time) * 1000
self.processing_times['consolidation'].append(processing_time)
# Log consolidation performance every 100 iterations
if len(self.processing_times['consolidation']) % 100 == 0:
avg_time = sum(self.processing_times['consolidation']) / len(self.processing_times['consolidation'])
logger.debug(f"Average consolidation time: {avg_time:.2f}ms")
await asyncio.sleep(0.1) # 100ms consolidation frequency
except Exception as e:
logger.error(f"Error in consolidation loop: {e}", exc_info=True)
await asyncio.sleep(1)
async def _consolidate_symbol_orderbook(self, symbol: str):
"""Consolidate order book for a specific symbol across all exchanges"""
try:
timestamp = datetime.now()
consolidated_bids = {}
consolidated_asks = {}
active_exchanges = []
# Collect order book data from all connected exchanges
async with self.data_lock:
logger.debug(f"Collecting order book data for {symbol}")
for exchange_name, exchange_data in self.exchange_order_books[symbol].items():
if exchange_data.get('connected', False):
active_exchanges.append(exchange_name)
# Get real-time WebSocket data (top 20 levels)
live_bids = exchange_data.get('bids', {})
live_asks = exchange_data.get('asks', {})
# Get deep REST API data (up to 1000 levels)
deep_bids = exchange_data.get('deep_bids', {})
deep_asks = exchange_data.get('deep_asks', {})
# Merge data: prioritize live data for top levels, add deep data for others
merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid')
merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask')
bid_count = len(merged_bids)
ask_count = len(merged_asks)
logger.debug(f"{exchange_name} data for {symbol}: {bid_count} bids ({len(live_bids)} live), {ask_count} asks ({len(live_asks)} live)")
# Process merged bids
for price, level in merged_bids.items():
if price not in consolidated_bids:
consolidated_bids[price] = ConsolidatedOrderBookLevel(
price=price,
total_size=0,
total_volume_usd=0,
total_orders=0,
side='bid',
exchange_breakdown={},
dominant_exchange=exchange_name,
liquidity_score=0,
timestamp=timestamp
)
consolidated_bids[price].total_size += level.size
consolidated_bids[price].total_volume_usd += level.volume_usd
consolidated_bids[price].total_orders += level.orders_count
consolidated_bids[price].exchange_breakdown[exchange_name] = level
# Update dominant exchange based on volume
current_dominant = consolidated_bids[price].exchange_breakdown.get(
consolidated_bids[price].dominant_exchange
)
current_volume = current_dominant.volume_usd if current_dominant else 0
if level.volume_usd > current_volume:
consolidated_bids[price].dominant_exchange = exchange_name
# Process merged asks (similar logic)
for price, level in merged_asks.items():
if price not in consolidated_asks:
consolidated_asks[price] = ConsolidatedOrderBookLevel(
price=price,
total_size=0,
total_volume_usd=0,
total_orders=0,
side='ask',
exchange_breakdown={},
dominant_exchange=exchange_name,
liquidity_score=0,
timestamp=timestamp
)
consolidated_asks[price].total_size += level.size
consolidated_asks[price].total_volume_usd += level.volume_usd
consolidated_asks[price].total_orders += level.orders_count
consolidated_asks[price].exchange_breakdown[exchange_name] = level
current_dominant = consolidated_asks[price].exchange_breakdown.get(
consolidated_asks[price].dominant_exchange
)
current_volume = current_dominant.volume_usd if current_dominant else 0
if level.volume_usd > current_volume:
consolidated_asks[price].dominant_exchange = exchange_name
logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}")
# Sort and calculate consolidated metrics
sorted_bids = sorted(consolidated_bids.values(), key=lambda x: x.price, reverse=True)
sorted_asks = sorted(consolidated_asks.values(), key=lambda x: x.price)
# Calculate consolidated metrics
volume_weighted_mid = self._calculate_volume_weighted_mid(sorted_bids, sorted_asks)
total_bid_liquidity = sum(level.total_volume_usd for level in sorted_bids)
total_ask_liquidity = sum(level.total_volume_usd for level in sorted_asks)
spread_bps = 0
liquidity_imbalance = 0
if sorted_bids and sorted_asks:
best_bid = sorted_bids[0].price
best_ask = sorted_asks[0].price
spread_bps = ((best_ask - best_bid) / volume_weighted_mid) * 10000
if total_bid_liquidity + total_ask_liquidity > 0:
liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity)
logger.debug(f"{symbol} metrics - Mid: ${volume_weighted_mid:.2f}, Spread: {spread_bps:.1f}bps, " +
f"Imbalance: {liquidity_imbalance:.2%}")
# Generate fine-grain price buckets
price_buckets = self._generate_price_buckets(symbol, sorted_bids, sorted_asks, volume_weighted_mid)
# Create consolidated snapshot with more levels for dashboard
cob_snapshot = COBSnapshot(
symbol=symbol,
timestamp=timestamp,
consolidated_bids=sorted_bids[:100], # Top 100 levels for better dashboard display
consolidated_asks=sorted_asks[:100],
exchanges_active=active_exchanges,
volume_weighted_mid=volume_weighted_mid,
total_bid_liquidity=total_bid_liquidity,
total_ask_liquidity=total_ask_liquidity,
spread_bps=spread_bps,
liquidity_imbalance=liquidity_imbalance,
price_buckets=price_buckets
)
# Store consolidated order book
self.current_order_book[symbol] = cob_snapshot
self.realtime_snapshots[symbol].append(cob_snapshot)
# Update real-time statistics
self._update_realtime_stats(symbol, cob_snapshot)
# Update consolidation statistics
async with self.data_lock:
self.consolidation_stats[symbol]['total_updates'] += 1
self.consolidation_stats[symbol]['active_price_levels'] = len(sorted_bids) + len(sorted_asks)
self.consolidation_stats[symbol]['total_liquidity_usd'] = total_bid_liquidity + total_ask_liquidity
# Notify callbacks with real-time data
for callback in self.cob_update_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(symbol, cob_snapshot))
else:
callback(symbol, cob_snapshot)
except Exception as e:
logger.error(f"Error in COB update callback: {e}")
logger.debug(f"Notified {len(self.cob_update_callbacks)} COB callbacks for {symbol}")
logger.debug(f"Completed consolidation for {symbol} - {len(active_exchanges)} exchanges active")
except Exception as e:
logger.error(f"Error consolidating order book for {symbol}: {e}", exc_info=True)
def _merge_orderbook_data(self, live_data: Dict, deep_data: Dict, side: str) -> Dict:
"""
Merge live WebSocket data with deep REST API data
Strategy: Use live data for top levels (lowest latency), deep data for additional depth
"""
try:
merged = {}
# Always prioritize live WebSocket data (top 20 levels)
for price, level in live_data.items():
merged[price] = level
# Add deep data that's not already covered by live data
for price, level in deep_data.items():
if price not in merged:
# Deep data keeps its original (older) timestamp but extends depth coverage
merged[price] = level
# Sort to find the cutoff point for live vs deep data
if side == 'bid':
# For bids, higher prices are better (closer to mid)
sorted_prices = sorted(merged.keys(), reverse=True)
else:
# For asks, lower prices are better (closer to mid)
sorted_prices = sorted(merged.keys())
# Limit total depth to prevent memory issues (keep top 200 levels)
max_levels = 200
if len(sorted_prices) > max_levels:
cutoff_price = sorted_prices[max_levels - 1]
if side == 'bid':
merged = {p: level for p, level in merged.items() if p >= cutoff_price}
else:
merged = {p: level for p, level in merged.items() if p <= cutoff_price}
return merged
except Exception as e:
logger.error(f"Error merging order book data: {e}")
return live_data # Fallback to live data only
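# Illustrative merge (hypothetical prices): if the live bid side holds 20 levels down to
# 3499.0 and the deep snapshot adds non-overlapping levels from 3498.9 down to 3450.0, the
# merged book keeps every live level, appends the extra deep levels, and then trims to the
# best 200 bid prices (highest first); asks are trimmed to the 200 lowest prices.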
def _generate_price_buckets(self, symbol: str, bids: List[ConsolidatedOrderBookLevel],
asks: List[ConsolidatedOrderBookLevel], mid_price: float) -> Dict[str, Dict[str, float]]:
"""Generate fine-grain price buckets for volume analysis"""
try:
buckets = {'bids': {}, 'asks': {}}
# Use fixed USD bucket size if configured for this symbol
if symbol in self.fixed_usd_buckets:
bucket_size = self.fixed_usd_buckets[symbol]
logger.debug(f"Using fixed USD bucket size {bucket_size} for {symbol}")
else:
bucket_size = mid_price * (self.bucket_size_bps / 10000) # Convert bps to decimal
# Process bids (below mid price)
for level in bids:
if level.price <= mid_price:
bucket_key = int((mid_price - level.price) / bucket_size)
bucket_price = mid_price - (bucket_key * bucket_size)
if bucket_key not in buckets['bids']:
buckets['bids'][bucket_key] = {
'price': bucket_price,
'volume_usd': 0,
'size': 0,
'orders': 0,
'exchanges': set()
}
buckets['bids'][bucket_key]['volume_usd'] += level.total_volume_usd
buckets['bids'][bucket_key]['size'] += level.total_size
buckets['bids'][bucket_key]['orders'] += level.total_orders
buckets['bids'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())
# Process asks (above mid price)
for level in asks:
if level.price >= mid_price:
bucket_key = int((level.price - mid_price) / bucket_size)
bucket_price = mid_price + (bucket_key * bucket_size)
if bucket_key not in buckets['asks']:
buckets['asks'][bucket_key] = {
'price': bucket_price,
'volume_usd': 0,
'size': 0,
'orders': 0,
'exchanges': set()
}
buckets['asks'][bucket_key]['volume_usd'] += level.total_volume_usd
buckets['asks'][bucket_key]['size'] += level.total_size
buckets['asks'][bucket_key]['orders'] += level.total_orders
buckets['asks'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())
# Convert sets to lists for JSON serialization
for side in ['bids', 'asks']:
for bucket_key in buckets[side]:
buckets[side][bucket_key]['exchanges'] = list(buckets[side][bucket_key]['exchanges'])
return buckets
except Exception as e:
logger.error(f"Error generating price buckets for {symbol}: {e}")
return {'bids': {}, 'asks': {}}
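# Worked example (hypothetical numbers): with mid_price = 3000.0 and bucket_size_bps = 10,
# bucket_size = 3000.0 * 10 / 10000 = 3.0 USD, so a bid at 2994.2 maps to
# bucket_key = int((3000.0 - 2994.2) / 3.0) = 1 and bucket_price = 3000.0 - 1 * 3.0 = 2997.0.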
def _calculate_volume_weighted_mid(self, bids: List[ConsolidatedOrderBookLevel],
asks: List[ConsolidatedOrderBookLevel]) -> float:
"""Calculate volume-weighted mid price across all exchanges"""
if not bids or not asks:
return 0.0
try:
# Take top 5 levels for volume weighting
top_bids = bids[:5]
top_asks = asks[:5]
total_bid_volume = sum(level.total_volume_usd for level in top_bids)
total_ask_volume = sum(level.total_volume_usd for level in top_asks)
if total_bid_volume + total_ask_volume == 0:
return (bids[0].price + asks[0].price) / 2
weighted_bid = sum(level.price * level.total_volume_usd for level in top_bids) / total_bid_volume if total_bid_volume > 0 else bids[0].price
weighted_ask = sum(level.price * level.total_volume_usd for level in top_asks) / total_ask_volume if total_ask_volume > 0 else asks[0].price
bid_weight = total_bid_volume / (total_bid_volume + total_ask_volume)
ask_weight = total_ask_volume / (total_bid_volume + total_ask_volume)
return (weighted_bid * ask_weight) + (weighted_ask * bid_weight)
except Exception as e:
logger.error(f"Error calculating volume weighted mid: {e}")
return (bids[0].price + asks[0].price) / 2 if bids and asks else 0.0
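# Worked example (hypothetical numbers): if the top 5 bid levels carry $300k and the top 5
# ask levels carry $100k, then bid_weight = 0.75 and ask_weight = 0.25, so the result is
# weighted_bid * 0.25 + weighted_ask * 0.75: the heavier bid side pulls the mid toward the
# ask prices (cross-weighting), rather than a plain average of best bid and best ask.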
async def _continuous_bucket_updates(self):
"""Continuously update and optimize price buckets"""
while self.is_streaming:
try:
for symbol in self.symbols:
if symbol in self.current_order_book:
cob = self.current_order_book[symbol]
# Notify bucket update callbacks
for callback in self.bucket_update_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(symbol, cob.price_buckets))
else:
callback(symbol, cob.price_buckets)
except Exception as e:
logger.warning(f"Error in bucket update callback: {e}")
await asyncio.sleep(self.bucket_update_frequency / 1000) # Convert ms to seconds
except Exception as e:
logger.error(f"Error in bucket update loop: {e}")
await asyncio.sleep(1)
# Public interface methods
def subscribe_to_cob_updates(self, callback: Callable[[str, COBSnapshot], Awaitable[None]]):
"""Subscribe to consolidated order book updates"""
self.cob_update_callbacks.append(callback)
logger.info(f"Added COB update callback: {len(self.cob_update_callbacks)} total")
def subscribe_to_bucket_updates(self, callback: Callable[[str, Dict], Awaitable[None]]):
"""Subscribe to price bucket updates"""
self.bucket_update_callbacks.append(callback)
logger.info(f"Added bucket update callback: {len(self.bucket_update_callbacks)} total")
def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]:
"""Get current consolidated order book snapshot"""
return self.current_order_book.get(symbol)
def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]:
"""Get fine-grain price buckets for a symbol"""
if symbol not in self.current_order_book:
return None
cob = self.current_order_book[symbol]
return cob.price_buckets
def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]:
"""Get breakdown of liquidity by exchange"""
if symbol not in self.current_order_book:
return None
cob = self.current_order_book[symbol]
breakdown = {}
for exchange in cob.exchanges_active:
breakdown[exchange] = {
'bid_liquidity': 0,
'ask_liquidity': 0,
'total_liquidity': 0,
'market_share': 0
}
# Calculate liquidity by exchange
for level in cob.consolidated_bids + cob.consolidated_asks:
for exchange, exchange_level in level.exchange_breakdown.items():
if level.side == 'bid':
breakdown[exchange]['bid_liquidity'] += exchange_level.volume_usd
else:
breakdown[exchange]['ask_liquidity'] += exchange_level.volume_usd
breakdown[exchange]['total_liquidity'] += exchange_level.volume_usd
# Calculate market share
total_market_liquidity = sum(data['total_liquidity'] for data in breakdown.values())
if total_market_liquidity > 0:
for exchange in breakdown:
breakdown[exchange]['market_share'] = breakdown[exchange]['total_liquidity'] / total_market_liquidity
return breakdown
def get_statistics(self) -> Dict[str, Any]:
"""Get provider statistics"""
return {
'symbols': self.symbols,
'is_streaming': self.is_streaming,
'active_exchanges': self.active_exchanges,
'exchange_update_counts': dict(self.exchange_update_counts),
'consolidation_stats': dict(self.consolidation_stats),
'bucket_size_bps': self.bucket_size_bps,
'cob_update_callbacks': len(self.cob_update_callbacks),
'bucket_update_callbacks': len(self.bucket_update_callbacks),
'avg_processing_time_ms': np.mean(self.processing_times.get('consolidation', [0])) if self.processing_times.get('consolidation') else 0
}
def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]:
"""Get detailed market depth analysis"""
if symbol not in self.current_order_book:
return None
cob = self.current_order_book[symbol]
# Analyze depth distribution
bid_levels = cob.consolidated_bids[:depth_levels]
ask_levels = cob.consolidated_asks[:depth_levels]
analysis = {
'symbol': symbol,
'timestamp': cob.timestamp.isoformat(),
'volume_weighted_mid': cob.volume_weighted_mid,
'spread_bps': cob.spread_bps,
'total_bid_liquidity': cob.total_bid_liquidity,
'total_ask_liquidity': cob.total_ask_liquidity,
'liquidity_imbalance': cob.liquidity_imbalance,
'exchanges_active': cob.exchanges_active,
'depth_analysis': {
'bid_levels': len(bid_levels),
'ask_levels': len(ask_levels),
'bid_liquidity_distribution': [],
'ask_liquidity_distribution': [],
'dominant_exchanges': {}
}
}
# Analyze liquidity distribution
for i, level in enumerate(bid_levels):
analysis['depth_analysis']['bid_liquidity_distribution'].append({
'level': i + 1,
'price': level.price,
'volume_usd': level.total_volume_usd,
'size': level.total_size,
'dominant_exchange': level.dominant_exchange,
'exchange_count': len(level.exchange_breakdown)
})
for i, level in enumerate(ask_levels):
analysis['depth_analysis']['ask_liquidity_distribution'].append({
'level': i + 1,
'price': level.price,
'volume_usd': level.total_volume_usd,
'size': level.total_size,
'dominant_exchange': level.dominant_exchange,
'exchange_count': len(level.exchange_breakdown)
})
# Count dominant exchanges
for level in bid_levels + ask_levels:
exchange = level.dominant_exchange
if exchange not in analysis['depth_analysis']['dominant_exchanges']:
analysis['depth_analysis']['dominant_exchanges'][exchange] = 0
analysis['depth_analysis']['dominant_exchanges'][exchange] += 1
return analysis
def _update_realtime_stats(self, symbol: str, cob_snapshot: COBSnapshot):
"""Update real-time statistics for 1s and 5s windows"""
try:
current_time = datetime.now()
# The snapshot has already been appended to realtime_snapshots by the caller
# (_consolidate_symbol_orderbook), so only the windowed statistics are computed here
# Calculate 1s and 5s windows
window_1s = current_time - timedelta(seconds=1)
window_5s = current_time - timedelta(seconds=5)
# Get data within windows
data_1s = [snapshot for snapshot in self.realtime_snapshots[symbol]
if snapshot.timestamp >= window_1s]
data_5s = [snapshot for snapshot in self.realtime_snapshots[symbol]
if snapshot.timestamp >= window_5s]
# Update 1s stats
if data_1s:
self.realtime_stats[symbol]['1s_stats'] = self._calculate_window_stats(data_1s)
# Update 5s stats
if data_5s:
self.realtime_stats[symbol]['5s_stats'] = self._calculate_window_stats(data_5s)
except Exception as e:
logger.error(f"Error updating real-time stats for {symbol}: {e}")
def _calculate_window_stats(self, snapshots: List[COBSnapshot]) -> Dict:
"""Calculate statistics for a time window"""
if not snapshots:
return {}
mid_prices = [s.volume_weighted_mid for s in snapshots]
spreads = [s.spread_bps for s in snapshots]
bid_liquidity = [s.total_bid_liquidity for s in snapshots]
ask_liquidity = [s.total_ask_liquidity for s in snapshots]
imbalances = [s.liquidity_imbalance for s in snapshots]
return {
'max_mid_price': max(mid_prices),
'min_mid_price': min(mid_prices),
'avg_mid_price': sum(mid_prices) / len(mid_prices),
'max_spread_bps': max(spreads),
'avg_spread_bps': sum(spreads) / len(spreads),
'max_bid_liquidity': max(bid_liquidity),
'avg_bid_liquidity': sum(bid_liquidity) / len(bid_liquidity),
'max_ask_liquidity': max(ask_liquidity),
'avg_ask_liquidity': sum(ask_liquidity) / len(ask_liquidity),
'max_imbalance': max(imbalances),
'avg_imbalance': sum(imbalances) / len(imbalances),
'update_count': len(snapshots)
}
def get_realtime_stats(self, symbol: str) -> Dict:
"""Get current real-time statistics for a symbol"""
try:
return self.realtime_stats.get(symbol, {})
except Exception as e:
logger.error(f"Error getting real-time stats for {symbol}: {e}")
return {}
async def _stream_binance_full_depth(self, symbol: str):
"""Stream full depth order book from Binance WebSocket (replaces REST API)"""
try:
binance_symbol = symbol.replace('/', '').upper()
# Full depth stream with 1000 levels, updated every 1000ms
ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@depth@1000ms"
logger.info(f"Connecting to Binance full depth WebSocket: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
logger.info(f"Connected to Binance full depth stream for {symbol}")
while self.is_streaming:
try:
message = await websocket.recv()
data = json.loads(message)
# Process full depth data
if 'bids' in data and 'asks' in data:
# Create comprehensive COB snapshot
cob_snapshot = {
'symbol': symbol,
'timestamp': time.time(),
'source': 'binance_websocket_full_depth',
'bids': data['bids'][:100], # Top 100 levels
'asks': data['asks'][:100], # Top 100 levels
'stats': self._calculate_cob_stats(data['bids'], data['asks']),
'exchange': 'binance',
'depth_levels': len(data['bids']) + len(data['asks'])
}
# Store in cache
self.cob_data_cache[symbol] = cob_snapshot
# Notify subscribers
await self._notify_cob_subscribers(symbol, cob_snapshot)
logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")
except Exception as e:
# Closed connections end the read loop; other errors are logged and retried after a pause
if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower():
logger.warning(f"Binance full depth WebSocket connection closed for {symbol}")
break
logger.error(f"Error processing full depth data for {symbol}: {e}")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error in Binance full depth stream for {symbol}: {e}")
def _calculate_cob_stats(self, bids: List, asks: List) -> Dict:
"""Calculate COB statistics from order book data"""
try:
if not bids or not asks:
return {
'mid_price': 0,
'spread_bps': 0,
'imbalance': 0,
'bid_liquidity': 0,
'ask_liquidity': 0
}
# Convert string values to float
bid_prices = [float(bid[0]) for bid in bids]
bid_sizes = [float(bid[1]) for bid in bids]
ask_prices = [float(ask[0]) for ask in asks]
ask_sizes = [float(ask[1]) for ask in asks]
# Calculate best bid/ask
best_bid = max(bid_prices)
best_ask = min(ask_prices)
mid_price = (best_bid + best_ask) / 2
# Calculate spread
spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0
# Calculate liquidity
bid_liquidity = sum(bid_sizes[:20]) # Top 20 levels
ask_liquidity = sum(ask_sizes[:20]) # Top 20 levels
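# Note: bid_liquidity / ask_liquidity here are summed base-asset sizes over the top 20
# levels, not USD notional (unlike total_bid_liquidity on COBSnapshot, which is in USD)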
total_liquidity = bid_liquidity + ask_liquidity
# Calculate imbalance
imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
return {
'mid_price': mid_price,
'spread_bps': spread_bps,
'imbalance': imbalance,
'bid_liquidity': bid_liquidity,
'ask_liquidity': ask_liquidity,
'best_bid': best_bid,
'best_ask': best_ask
}
except Exception as e:
logger.error(f"Error calculating COB stats: {e}")
return {
'mid_price': 0,
'spread_bps': 0,
'imbalance': 0,
'bid_liquidity': 0,
'ask_liquidity': 0
}
async def _stream_binance_book_ticker(self, symbol: str):
"""Stream best bid/ask prices from Binance WebSocket"""
try:
binance_symbol = symbol.replace('/', '').upper()
ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@bookTicker"
logger.info(f"Connecting to Binance book ticker WebSocket: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
logger.info(f"Connected to Binance book ticker stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_book_ticker(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance book ticker message: {e}")
except Exception as e:
logger.error(f"Error processing Binance book ticker: {e}")
except Exception as e:
logger.error(f"Binance book ticker WebSocket error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Binance book ticker stream for {symbol}")
async def _stream_binance_agg_trades(self, symbol: str):
"""Stream aggregated trades from Binance WebSocket for large order detection"""
try:
binance_symbol = symbol.replace('/', '').upper()
ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@aggTrade"
logger.info(f"Connecting to Binance aggregate trades WebSocket: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
logger.info(f"Connected to Binance aggregate trades stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_agg_trade(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance agg trade message: {e}")
except Exception as e:
logger.error(f"Error processing Binance agg trade: {e}")
except Exception as e:
logger.error(f"Binance aggregate trades WebSocket error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Binance aggregate trades stream for {symbol}")
async def _process_binance_full_depth(self, symbol: str, data: Dict):
"""Process full depth order book data from WebSocket (replaces REST API)"""
try:
timestamp = datetime.now()
exchange_name = 'binance'
# Parse full depth bids and asks (up to 1000 levels)
full_bids = {}
full_asks = {}
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if size > 0:
full_bids[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=timestamp
)
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if size > 0:
full_asks[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=timestamp
)
# Update full depth storage (replaces REST API data)
async with self.data_lock:
self.exchange_order_books[symbol][exchange_name]['deep_bids'] = full_bids
self.exchange_order_books[symbol][exchange_name]['deep_asks'] = full_asks
self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp
self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId')
logger.debug(f"Updated full depth via WebSocket for {symbol}: {len(full_bids)} bids, {len(full_asks)} asks")
except Exception as e:
logger.error(f"Error processing full depth WebSocket data for {symbol}: {e}")
async def _process_binance_book_ticker(self, symbol: str, data: Dict):
"""Process book ticker data for best bid/ask tracking"""
try:
timestamp = datetime.now()
best_bid_price = float(data.get('b', 0))
best_bid_qty = float(data.get('B', 0))
best_ask_price = float(data.get('a', 0))
best_ask_qty = float(data.get('A', 0))
# Store best bid/ask data
async with self.data_lock:
if symbol not in self.realtime_stats:
self.realtime_stats[symbol] = {}
self.realtime_stats[symbol].update({
'best_bid_price': best_bid_price,
'best_bid_qty': best_bid_qty,
'best_ask_price': best_ask_price,
'best_ask_qty': best_ask_qty,
'spread': best_ask_price - best_bid_price,
'mid_price': (best_bid_price + best_ask_price) / 2,
'book_ticker_timestamp': timestamp
})
logger.debug(f"Book ticker update for {symbol}: Bid {best_bid_price}@{best_bid_qty}, Ask {best_ask_price}@{best_ask_qty}")
except Exception as e:
logger.error(f"Error processing book ticker for {symbol}: {e}")
async def _process_binance_agg_trade(self, symbol: str, data: Dict):
"""Process aggregate trade data for large order detection"""
try:
timestamp = datetime.fromtimestamp(int(data['T']) / 1000)
price = float(data['p'])
quantity = float(data['q'])
is_buyer_maker = data['m']
agg_trade_id = data['a']
first_trade_id = data['f']
last_trade_id = data['l']
# Calculate trade value and size
trade_value_usd = price * quantity
trade_count = last_trade_id - first_trade_id + 1
# Detect large orders (institutional activity)
is_large_order = trade_value_usd > 10000 # $10k+ trades
is_whale_order = trade_value_usd > 100000 # $100k+ trades
agg_trade = {
'symbol': symbol,
'timestamp': timestamp,
'price': price,
'quantity': quantity,
'value_usd': trade_value_usd,
'trade_count': trade_count,
'is_buyer_maker': is_buyer_maker,
'side': 'sell' if is_buyer_maker else 'buy', # Opposite of maker
'is_large_order': is_large_order,
'is_whale_order': is_whale_order,
'agg_trade_id': agg_trade_id
}
# Add to aggregate trade tracking
await self._add_agg_trade_to_analysis(symbol, agg_trade)
# Log significant trades
if is_whale_order:
logger.info(f"WHALE ORDER detected for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()} at ${price}")
elif is_large_order:
logger.debug(f"Large order for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()}")
except Exception as e:
logger.error(f"Error processing aggregate trade for {symbol}: {e}")
async def _add_agg_trade_to_analysis(self, symbol: str, agg_trade: Dict):
"""Add aggregate trade to analysis queues"""
try:
async with self.data_lock:
# Initialize if needed
if symbol not in self.realtime_stats:
self.realtime_stats[symbol] = {}
if 'agg_trades' not in self.realtime_stats[symbol]:
self.realtime_stats[symbol]['agg_trades'] = deque(maxlen=1000)
# Add to aggregate trade history
self.realtime_stats[symbol]['agg_trades'].append(agg_trade)
# Update real-time aggregate statistics
recent_trades = list(self.realtime_stats[symbol]['agg_trades'])[-100:] # Last 100 trades
if recent_trades:
total_buy_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'buy')
total_sell_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'sell')
total_volume = total_buy_volume + total_sell_volume
large_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_large_order'])
large_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_large_order'])
whale_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_whale_order'])
whale_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_whale_order'])
# Calculate order flow metrics
self.realtime_stats[symbol].update({
'buy_sell_ratio': total_buy_volume / total_sell_volume if total_sell_volume > 0 else float('inf'),
'total_volume_100': total_volume,
'large_order_ratio': (large_buy_count + large_sell_count) / len(recent_trades),
'whale_activity': whale_buy_count + whale_sell_count,
'institutional_flow': 'BULLISH' if total_buy_volume > total_sell_volume * 1.2 else 'BEARISH' if total_sell_volume > total_buy_volume * 1.2 else 'NEUTRAL'
})
except Exception as e:
logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")
def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
"""Get latest COB data for a symbol from cache"""
try:
if symbol in self.cob_data_cache:
return self.cob_data_cache[symbol]
return None
except Exception as e:
logger.error(f"Error getting latest COB data for {symbol}: {e}")
return None
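# Illustrative polling sketch for consumers that prefer pulling over callbacks (assumes a
# provider that is already streaming; the method names are the ones defined above):
#
#   cob = provider.get_consolidated_orderbook('BTC/USDT')   # COBSnapshot or None
#   raw = provider.get_latest_cob_data('BTC/USDT')          # dict from the full-depth stream or None
#   stats = provider.get_realtime_stats('BTC/USDT')         # rolling 1s/5s window statistics
#   if cob:
#       print(cob.volume_weighted_mid, cob.spread_bps, cob.liquidity_imbalance)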