gogo2/core/multi_exchange_cob_provider.py
"""
Multi-Exchange Consolidated Order Book (COB) Data Provider
This module aggregates order book data from multiple cryptocurrency exchanges to provide:
- Consolidated Order Book (COB) data across multiple exchanges
- Fine-grain volume buckets at configurable price levels
- Real-time order book depth aggregation
- Volume-weighted consolidated pricing
- Exchange-specific order flow analysis
- Liquidity distribution metrics
Supported Exchanges:
- Binance (via WebSocket depth streams)
- Coinbase Pro (via WebSocket level2 updates)
- Kraken (via WebSocket book updates)
- Huobi (via WebSocket mbp updates)
- Bitfinex (via WebSocket book updates)
Data is structured for consumption by CNN/DQN models and trading dashboards.
"""
import asyncio
import json
import logging
import time
try:
import websockets
from websockets.client import connect as websockets_connect
except ImportError:
# Fallback for environments where websockets is not available
websockets = None
websockets_connect = None
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable, Union, Awaitable
from collections import deque, defaultdict
from dataclasses import dataclass, field
from threading import Thread, Lock
import requests
import ccxt
from enum import Enum
import math
import aiohttp
import aiohttp.resolver
logger = logging.getLogger(__name__)
class ExchangeType(Enum):
BINANCE = "binance"
COINBASE = "coinbase"
KRAKEN = "kraken"
HUOBI = "huobi"
BITFINEX = "bitfinex"
@dataclass
class ExchangeOrderBookLevel:
"""Single order book level with exchange attribution"""
exchange: str
price: float
size: float
volume_usd: float
orders_count: int
side: str # 'bid' or 'ask'
timestamp: datetime
raw_data: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ConsolidatedOrderBookLevel:
"""Consolidated order book level across multiple exchanges"""
price: float
total_size: float
total_volume_usd: float
total_orders: int
side: str
exchange_breakdown: Dict[str, ExchangeOrderBookLevel]
dominant_exchange: str
liquidity_score: float
timestamp: datetime
@dataclass
class COBSnapshot:
"""Complete Consolidated Order Book snapshot"""
symbol: str
timestamp: datetime
consolidated_bids: List[ConsolidatedOrderBookLevel]
consolidated_asks: List[ConsolidatedOrderBookLevel]
exchanges_active: List[str]
volume_weighted_mid: float
total_bid_liquidity: float
total_ask_liquidity: float
spread_bps: float
liquidity_imbalance: float
price_buckets: Dict[str, Dict[int, Dict[str, Any]]] # Fine-grain volume buckets
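# Layout (see _generate_price_buckets): {'bids': {bucket_key: {'price', 'volume_usd', 'size', 'orders', 'exchanges'}}, 'asks': {...}}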
@dataclass
class ExchangeConfig:
"""Exchange configuration for COB aggregation"""
exchange_type: ExchangeType
weight: float = 1.0
enabled: bool = True
websocket_url: str = ""
rest_api_url: str = ""
symbols_mapping: Dict[str, str] = field(default_factory=dict)
rate_limits: Dict[str, int] = field(default_factory=dict)
class MultiExchangeCOBProvider:
"""
Multi-Exchange Consolidated Order Book Data Provider
Aggregates real-time order book data from multiple cryptocurrency exchanges
to create a consolidated view of market liquidity and pricing.
"""
def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
"""
Initialize Multi-Exchange COB Provider
Args:
symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT'])
bucket_size_bps: Price bucket size in basis points for fine-grain analysis
"""
self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
self.bucket_size_bps = bucket_size_bps
self.bucket_update_frequency = 100 # ms
self.consolidation_frequency = 100 # ms
# REST API configuration for deep order book
self.rest_api_frequency = 1000 # ms - full snapshot every 1 second
self.rest_depth_limit = 500 # Number of order book levels fetched per REST snapshot
# Exchange configurations
self.exchange_configs = self._initialize_exchange_configs()
# Order book storage - now with deep and live separation
self.exchange_order_books = {
symbol: {
exchange.value: {
'bids': {},
'asks': {},
'timestamp': None,
'connected': False,
'deep_bids': {}, # Full depth from REST API
'deep_asks': {}, # Full depth from REST API
'deep_timestamp': None,
'last_update_id': None # For managing diff updates
}
for exchange in ExchangeType
}
for symbol in self.symbols
}
# Consolidated order books
self.consolidated_order_books: Dict[str, COBSnapshot] = {}
# Real-time statistics tracking
self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
self.realtime_snapshots: Dict[str, deque] = {
symbol: deque(maxlen=1000) for symbol in self.symbols
}
# Session tracking for SVP
self.session_start_time = datetime.now()
self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols}
self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
# Fixed USD bucket sizes for different symbols as requested
self.fixed_usd_buckets = {
'BTC/USDT': 10.0, # $10 buckets for BTC
'ETH/USDT': 1.0, # $1 buckets for ETH
}
# WebSocket management
self.is_streaming = False
self.active_exchanges = ['binance'] # Start with Binance only
# Callbacks for real-time updates
self.cob_update_callbacks = []
self.bucket_update_callbacks = []
# Performance tracking
self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType}
self.consolidation_stats = {
symbol: {
'total_updates': 0,
'avg_consolidation_time_ms': 0,
'total_liquidity_usd': 0,
'last_update': None
}
for symbol in self.symbols
}
self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)}
# Thread safety
self.data_lock = asyncio.Lock()
# Initialize aiohttp session and connector to None, will be set up in start_streaming
self.session: Optional[aiohttp.ClientSession] = None
self.connector: Optional[aiohttp.TCPConnector] = None
self.rest_session: Optional[aiohttp.ClientSession] = None
# The REST session (and its Windows-safe ThreadedResolver connector) is created in
# _setup_http_session() once the event loop is running; constructing an
# aiohttp.ClientSession in a synchronous __init__ is unreliable, and any session
# created here would be replaced by start_streaming() anyway.
# Initialize data structures
for symbol in self.symbols:
self.exchange_order_books[symbol]['binance']['connected'] = False
self.exchange_order_books[symbol]['binance']['deep_bids'] = {}
self.exchange_order_books[symbol]['binance']['deep_asks'] = {}
self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None
self.exchange_order_books[symbol]['binance']['last_update_id'] = None
self.realtime_snapshots[symbol].append(COBSnapshot(
symbol=symbol,
timestamp=datetime.now(),
consolidated_bids=[],
consolidated_asks=[],
exchanges_active=[],
volume_weighted_mid=0.0,
total_bid_liquidity=0.0,
total_ask_liquidity=0.0,
spread_bps=0.0,
liquidity_imbalance=0.0,
price_buckets={}
))
logger.info(f"Multi-Exchange COB Provider initialized")
logger.info(f"Symbols: {self.symbols}")
logger.info(f"Bucket size: {bucket_size_bps} bps")
logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}")
logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}")
def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
"""Initialize exchange configurations"""
configs = {}
# Binance configuration
configs[ExchangeType.BINANCE.value] = ExchangeConfig(
exchange_type=ExchangeType.BINANCE,
weight=0.3, # Higher weight due to volume
websocket_url="wss://stream.binance.com:9443/ws/",
rest_api_url="https://api.binance.com",
symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000}
)
# Coinbase Pro configuration
configs[ExchangeType.COINBASE.value] = ExchangeConfig(
exchange_type=ExchangeType.COINBASE,
weight=0.25,
websocket_url="wss://ws-feed.exchange.coinbase.com",
rest_api_url="https://api.exchange.coinbase.com",
symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'},
rate_limits={'requests_per_minute': 600}
)
# Kraken configuration
configs[ExchangeType.KRAKEN.value] = ExchangeConfig(
exchange_type=ExchangeType.KRAKEN,
weight=0.2,
websocket_url="wss://ws.kraken.com",
rest_api_url="https://api.kraken.com",
symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'},
rate_limits={'requests_per_minute': 900}
)
# Huobi configuration
configs[ExchangeType.HUOBI.value] = ExchangeConfig(
exchange_type=ExchangeType.HUOBI,
weight=0.15,
websocket_url="wss://api.huobi.pro/ws",
rest_api_url="https://api.huobi.pro",
symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'},
rate_limits={'requests_per_minute': 2000}
)
# Bitfinex configuration
configs[ExchangeType.BITFINEX.value] = ExchangeConfig(
exchange_type=ExchangeType.BITFINEX,
weight=0.1,
websocket_url="wss://api-pub.bitfinex.com/ws/2",
rest_api_url="https://api-pub.bitfinex.com",
symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'},
rate_limits={'requests_per_minute': 1000}
)
return configs
async def start_streaming(self):
"""Start real-time order book streaming from all configured exchanges"""
logger.info(f"Starting COB streaming for symbols: {self.symbols}")
self.is_streaming = True
# Setup aiohttp session here, within the async context
await self._setup_http_session()
# Start WebSocket connections for each active exchange and symbol
tasks = []
for symbol in self.symbols:
for exchange_name, config in self.exchange_configs.items():
if config.enabled and exchange_name in self.active_exchanges:
# Start WebSocket stream
tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))
# Start deep order book (REST API) stream
tasks.append(self._stream_deep_orderbook(exchange_name, symbol))
# Start trade stream (for SVP)
if exchange_name == 'binance': # Only Binance for now
tasks.append(self._stream_binance_trades(symbol))
# Start continuous consolidation and bucket updates
tasks.append(self._continuous_consolidation())
tasks.append(self._continuous_bucket_updates())
logger.info(f"Starting {len(tasks)} COB streaming tasks")
await asyncio.gather(*tasks)
async def _setup_http_session(self):
"""Setup aiohttp session and connector"""
self.connector = aiohttp.TCPConnector(
resolver=aiohttp.ThreadedResolver(), # Windows aiodns workaround; created inside the running event loop
use_dns_cache=False
)
self.session = aiohttp.ClientSession(connector=self.connector)
self.rest_session = aiohttp.ClientSession(connector=self.connector) # REST session for deep order book snapshots
logger.info("aiohttp session and connector setup completed")
async def stop_streaming(self):
"""Stop real-time order book streaming and close sessions"""
logger.info("Stopping COB Integration")
self.is_streaming = False
if self.session and not self.session.closed:
await self.session.close()
logger.info("aiohttp session closed")
if self.rest_session and not self.rest_session.closed:
await self.rest_session.close()
logger.info("aiohttp REST session closed")
if self.connector and not self.connector.closed:
await self.connector.close()
logger.info("aiohttp connector closed")
logger.info("COB Integration stopped")
async def _stream_deep_orderbook(self, exchange_name: str, symbol: str):
"""Fetch deep order book data via REST API periodically"""
while self.is_streaming:
try:
start_time = time.time()
if exchange_name == 'binance':
await self._fetch_binance_deep_orderbook(symbol)
# Add other exchanges here as needed
processing_time = (time.time() - start_time) * 1000
self.processing_times['rest_api'].append(processing_time)
logger.debug(f"Deep order book fetch for {symbol} took {processing_time:.2f}ms")
# Wait before next fetch
await asyncio.sleep(self.rest_api_frequency / 1000)
except Exception as e:
logger.error(f"Error fetching deep order book for {symbol}: {e}")
await asyncio.sleep(5) # Wait 5 seconds on error
async def _fetch_binance_deep_orderbook(self, symbol: str):
"""Fetch deep order book from Binance REST API"""
try:
if not self.rest_session:
return
# Convert symbol format for Binance
binance_symbol = symbol.replace('/', '').upper()
url = f"https://api.binance.com/api/v3/depth"
params = {
'symbol': binance_symbol,
'limit': self.rest_depth_limit
}
async with self.rest_session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
await self._process_binance_deep_orderbook(symbol, data)
else:
logger.error(f"Binance REST API error {response.status} for {symbol}")
except Exception as e:
logger.error(f"Error fetching Binance deep order book for {symbol}: {e}")
async def _process_binance_deep_orderbook(self, symbol: str, data: Dict):
"""Process deep order book data from Binance REST API"""
try:
timestamp = datetime.now()
exchange_name = 'binance'
# Parse deep bids and asks
deep_bids = {}
deep_asks = {}
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if size > 0:
deep_bids[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=timestamp
)
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if size > 0:
deep_asks[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=timestamp
)
# Update deep order book storage
async with self.data_lock:
self.exchange_order_books[symbol][exchange_name]['deep_bids'] = deep_bids
self.exchange_order_books[symbol][exchange_name]['deep_asks'] = deep_asks
self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp
self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId')
logger.debug(f"Updated deep order book for {symbol}: {len(deep_bids)} bids, {len(deep_asks)} asks")
except Exception as e:
logger.error(f"Error processing deep order book for {symbol}: {e}")
async def _stream_exchange_orderbook(self, exchange_name: str, symbol: str):
"""Stream order book data from specific exchange"""
config = self.exchange_configs[exchange_name]
try:
if exchange_name == ExchangeType.BINANCE.value:
await self._stream_binance_orderbook(symbol, config)
elif exchange_name == ExchangeType.COINBASE.value:
await self._stream_coinbase_orderbook(symbol, config)
elif exchange_name == ExchangeType.KRAKEN.value:
await self._stream_kraken_orderbook(symbol, config)
elif exchange_name == ExchangeType.HUOBI.value:
await self._stream_huobi_orderbook(symbol, config)
elif exchange_name == ExchangeType.BITFINEX.value:
await self._stream_bitfinex_orderbook(symbol, config)
except Exception as e:
logger.error(f"Error streaming {exchange_name} for {symbol}: {e}")
await asyncio.sleep(5) # Wait before reconnecting
async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream order book data from Binance"""
try:
ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms"
logger.info(f"Connecting to Binance WebSocket: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
self.exchange_order_books[symbol]['binance']['connected'] = True
logger.info(f"Connected to Binance order book stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_orderbook(symbol, data)
# Also track trades for SVP
await self._track_binance_trades(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance message: {e}")
except Exception as e:
logger.error(f"Error processing Binance data: {e}")
except Exception as e:
logger.error(f"Binance WebSocket error for {symbol}: {e}")
finally:
self.exchange_order_books[symbol]['binance']['connected'] = False
logger.info(f"Disconnected from Binance order book stream for {symbol}")
async def _track_binance_trades(self, symbol: str, data: Dict):
"""Track executed trades from Binance for SVP calculation"""
try:
# The depth stream itself does not carry trade events; trades for SVP arrive via the
# dedicated @trade stream (_stream_binance_trades), so this branch only fires if a
# combined stream ever delivers a trade payload here.
if 'e' in data and data['e'] == 'trade':
trade = {
'exchange': 'binance',
'symbol': symbol,
'price': float(data['p']),
'quantity': float(data['q']),
'side': 'buy' if not data['m'] else 'sell', # 'm' is True when the buyer is the maker, i.e. an aggressive sell
'timestamp': datetime.fromtimestamp(data['T'] / 1000),
'volume_usd': float(data['p']) * float(data['q'])
}
await self._add_trade_to_svp(symbol, trade)
except Exception as e:
logger.error(f"Error tracking Binance trade: {e}")
async def _add_trade_to_svp(self, symbol: str, trade: Dict):
"""Add trade to session volume profile"""
try:
async with self.data_lock:
# Add to session trades
self.session_trades[symbol].append(trade)
# Update SVP cache
price = trade['price']
side = trade['side']
volume = trade['volume_usd']
if price not in self.svp_cache[symbol]:
self.svp_cache[symbol][price] = {'buy_volume': 0.0, 'sell_volume': 0.0}
if side == 'buy':
self.svp_cache[symbol][price]['buy_volume'] += volume
else:
self.svp_cache[symbol][price]['sell_volume'] += volume
# Keep only recent trades (last 24 hours)
cutoff_time = datetime.now() - timedelta(hours=24)
self.session_trades[symbol] = [
t for t in self.session_trades[symbol]
if t['timestamp'] > cutoff_time
]
except Exception as e:
logger.error(f"Error adding trade to SVP: {e}")
def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict:
"""Get session volume profile for a symbol"""
try:
if bucket_size is None:
bucket_size = self.fixed_usd_buckets.get(symbol, 1.0)
svp_data = {}
# Access SVP cache without lock for read-only operations (generally safe)
try:
for price, volumes in self.svp_cache[symbol].items():
bucket_price = math.floor(price / bucket_size) * bucket_size
if bucket_price not in svp_data:
svp_data[bucket_price] = {
'buy_volume': 0.0,
'sell_volume': 0.0,
'total_volume': 0.0,
'trade_count': 0
}
svp_data[bucket_price]['buy_volume'] += volumes['buy_volume']
svp_data[bucket_price]['sell_volume'] += volumes['sell_volume']
svp_data[bucket_price]['total_volume'] += volumes['buy_volume'] + volumes['sell_volume']
svp_data[bucket_price]['trade_count'] += 1
except Exception as e:
logger.error(f"Error accessing SVP cache for {symbol}: {e}")
return {}
# Convert to sorted list
svp_list = []
for price in sorted(svp_data.keys()):
data = svp_data[price]
if data['total_volume'] > 0:
svp_list.append({
'price': price,
'buy_volume': data['buy_volume'],
'sell_volume': data['sell_volume'],
'total_volume': data['total_volume'],
'trade_count': data['trade_count'],
'buy_percent': (data['buy_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0,
'sell_percent': (data['sell_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0
})
return {
'symbol': symbol,
'session_start': self.session_start_time.isoformat(),
'bucket_size': bucket_size,
'data': svp_list
}
except Exception as e:
logger.error(f"Error getting session volume profile for {symbol}: {e}")
return {}
async def _process_binance_orderbook(self, symbol: str, data: Dict):
"""Process Binance order book update"""
try:
timestamp = datetime.now()
exchange_name = ExchangeType.BINANCE.value
# Parse bids and asks
bids = {}
asks = {}
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if size > 0: # Only include non-zero sizes
bids[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='bid',
timestamp=timestamp
)
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if size > 0:
asks[price] = ExchangeOrderBookLevel(
exchange=exchange_name,
price=price,
size=size,
volume_usd=price * size,
orders_count=1,
side='ask',
timestamp=timestamp
)
# Update exchange order book
async with self.data_lock:
self.exchange_order_books[symbol][exchange_name].update({
'bids': bids,
'asks': asks,
'timestamp': timestamp,
'connected': True
})
logger.debug(f"Updated Binance order book for {symbol}: {len(bids)} bids, {len(asks)} asks")
self.exchange_update_counts[exchange_name] += 1
# Log every 1000th update
if self.exchange_update_counts[exchange_name] % 1000 == 0:
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}")
except Exception as e:
logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True)
async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Coinbase order book data (placeholder implementation)"""
try:
# For now, just log that Coinbase streaming is not implemented
logger.info(f"Coinbase streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Coinbase order book for {symbol}: {e}")
async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Kraken order book data (placeholder implementation)"""
try:
logger.info(f"Kraken streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Kraken order book for {symbol}: {e}")
async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Huobi order book data (placeholder implementation)"""
try:
logger.info(f"Huobi streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Huobi order book for {symbol}: {e}")
async def _stream_bitfinex_orderbook(self, symbol: str, config: ExchangeConfig):
"""Stream Bitfinex order book data (placeholder implementation)"""
try:
logger.info(f"Bitfinex streaming for {symbol} not yet implemented")
await asyncio.sleep(60) # Sleep to prevent spam
except Exception as e:
logger.error(f"Error streaming Bitfinex order book for {symbol}: {e}")
async def _stream_binance_trades(self, symbol: str):
"""Stream trade data from Binance for SVP calculation"""
try:
config = self.exchange_configs[ExchangeType.BINANCE.value]
ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade"
logger.info(f"Connecting to Binance trade stream: {ws_url}")
if websockets is None or websockets_connect is None:
raise ImportError("websockets module not available")
async with websockets_connect(ws_url) as websocket:
logger.info(f"Connected to Binance trade stream for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
data = json.loads(message)
await self._process_binance_trade(symbol, data)
except json.JSONDecodeError as e:
logger.error(f"Error parsing Binance trade message: {e}")
except Exception as e:
logger.error(f"Error processing Binance trade: {e}")
except Exception as e:
logger.error(f"Binance trade stream error for {symbol}: {e}")
finally:
logger.info(f"Disconnected from Binance trade stream for {symbol}")
async def _process_binance_trade(self, symbol: str, data: Dict):
"""Process Binance trade data for SVP calculation"""
try:
if 'e' in data and data['e'] == 'trade':
trade = {
'exchange': 'binance',
'symbol': symbol,
'price': float(data['p']),
'quantity': float(data['q']),
'side': 'buy' if not data['m'] else 'sell', # 'm' is True when the buyer is the maker, i.e. an aggressive sell
'timestamp': datetime.fromtimestamp(data['T'] / 1000),
'volume_usd': float(data['p']) * float(data['q'])
}
await self._add_trade_to_svp(symbol, trade)
# Log every 1000th trade
if len(self.session_trades[symbol]) % 1000 == 0:
logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}")
except Exception as e:
logger.error(f"Error processing Binance trade for {symbol}: {e}")
async def _continuous_consolidation(self):
"""Continuously consolidate order books from all exchanges"""
while self.is_streaming:
try:
start_time = time.time()
for symbol in self.symbols:
logger.debug(f"Starting consolidation for {symbol}")
await self._consolidate_symbol_orderbook(symbol)
processing_time = (time.time() - start_time) * 1000
self.processing_times['consolidation'].append(processing_time)
# Log consolidation performance every 100 iterations
if len(self.processing_times['consolidation']) % 100 == 0:
avg_time = sum(self.processing_times['consolidation']) / len(self.processing_times['consolidation'])
logger.debug(f"Average consolidation time: {avg_time:.2f}ms")
await asyncio.sleep(self.consolidation_frequency / 1000) # Default 100ms consolidation cadence
except Exception as e:
logger.error(f"Error in consolidation loop: {e}", exc_info=True)
await asyncio.sleep(1)
async def _consolidate_symbol_orderbook(self, symbol: str):
"""Consolidate order book for a specific symbol across all exchanges"""
try:
timestamp = datetime.now()
consolidated_bids = {}
consolidated_asks = {}
active_exchanges = []
# Collect order book data from all connected exchanges
async with self.data_lock:
logger.debug(f"Collecting order book data for {symbol}")
for exchange_name, exchange_data in self.exchange_order_books[symbol].items():
if exchange_data.get('connected', False):
active_exchanges.append(exchange_name)
# Get real-time WebSocket data (top 20 levels)
live_bids = exchange_data.get('bids', {})
live_asks = exchange_data.get('asks', {})
# Get deep REST API data (up to self.rest_depth_limit levels from the periodic snapshots)
deep_bids = exchange_data.get('deep_bids', {})
deep_asks = exchange_data.get('deep_asks', {})
# Merge data: prioritize live data for top levels, add deep data for others
merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid')
merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask')
bid_count = len(merged_bids)
ask_count = len(merged_asks)
logger.debug(f"{exchange_name} data for {symbol}: {bid_count} bids ({len(live_bids)} live), {ask_count} asks ({len(live_asks)} live)")
# Process merged bids
for price, level in merged_bids.items():
if price not in consolidated_bids:
consolidated_bids[price] = ConsolidatedOrderBookLevel(
price=price,
total_size=0,
total_volume_usd=0,
total_orders=0,
side='bid',
exchange_breakdown={},
dominant_exchange=exchange_name,
liquidity_score=0,
timestamp=timestamp
)
consolidated_bids[price].total_size += level.size
consolidated_bids[price].total_volume_usd += level.volume_usd
consolidated_bids[price].total_orders += level.orders_count
consolidated_bids[price].exchange_breakdown[exchange_name] = level
# Update dominant exchange based on volume
if level.volume_usd > consolidated_bids[price].exchange_breakdown.get(
consolidated_bids[price].dominant_exchange,
type('obj', (object,), {'volume_usd': 0})()
).volume_usd:
consolidated_bids[price].dominant_exchange = exchange_name
# Process merged asks (similar logic)
for price, level in merged_asks.items():
if price not in consolidated_asks:
consolidated_asks[price] = ConsolidatedOrderBookLevel(
price=price,
total_size=0,
total_volume_usd=0,
total_orders=0,
side='ask',
exchange_breakdown={},
dominant_exchange=exchange_name,
liquidity_score=0,
timestamp=timestamp
)
consolidated_asks[price].total_size += level.size
consolidated_asks[price].total_volume_usd += level.volume_usd
consolidated_asks[price].total_orders += level.orders_count
consolidated_asks[price].exchange_breakdown[exchange_name] = level
if level.volume_usd > consolidated_asks[price].exchange_breakdown.get(
consolidated_asks[price].dominant_exchange,
type('obj', (object,), {'volume_usd': 0})()
).volume_usd:
consolidated_asks[price].dominant_exchange = exchange_name
logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}")
# Sort and calculate consolidated metrics
sorted_bids = sorted(consolidated_bids.values(), key=lambda x: x.price, reverse=True)
sorted_asks = sorted(consolidated_asks.values(), key=lambda x: x.price)
# Calculate consolidated metrics
volume_weighted_mid = self._calculate_volume_weighted_mid(sorted_bids, sorted_asks)
total_bid_liquidity = sum(level.total_volume_usd for level in sorted_bids)
total_ask_liquidity = sum(level.total_volume_usd for level in sorted_asks)
spread_bps = 0
liquidity_imbalance = 0
if sorted_bids and sorted_asks:
best_bid = sorted_bids[0].price
best_ask = sorted_asks[0].price
spread_bps = ((best_ask - best_bid) / volume_weighted_mid) * 10000
if total_bid_liquidity + total_ask_liquidity > 0:
liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity)
logger.debug(f"{symbol} metrics - Mid: ${volume_weighted_mid:.2f}, Spread: {spread_bps:.1f}bps, " +
f"Imbalance: {liquidity_imbalance:.2%}")
# Generate fine-grain price buckets
price_buckets = self._generate_price_buckets(symbol, sorted_bids, sorted_asks, volume_weighted_mid)
# Create consolidated snapshot with more levels for dashboard
cob_snapshot = COBSnapshot(
symbol=symbol,
timestamp=timestamp,
consolidated_bids=sorted_bids[:100], # Top 100 levels for better dashboard display
consolidated_asks=sorted_asks[:100],
exchanges_active=active_exchanges,
volume_weighted_mid=volume_weighted_mid,
total_bid_liquidity=total_bid_liquidity,
total_ask_liquidity=total_ask_liquidity,
spread_bps=spread_bps,
liquidity_imbalance=liquidity_imbalance,
price_buckets=price_buckets
)
# Store consolidated order book
self.consolidated_order_books[symbol] = cob_snapshot
# (history append to realtime_snapshots happens in _update_realtime_stats below, to avoid double-counting)
# Update real-time statistics
self._update_realtime_stats(symbol, cob_snapshot)
# Update consolidation statistics
async with self.data_lock:
self.consolidation_stats[symbol]['total_updates'] += 1
self.consolidation_stats[symbol]['active_price_levels'] = len(sorted_bids) + len(sorted_asks)
self.consolidation_stats[symbol]['total_liquidity_usd'] = total_bid_liquidity + total_ask_liquidity
# Notify callbacks with real-time data
for callback in self.cob_update_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(symbol, cob_snapshot))
else:
callback(symbol, cob_snapshot)
except Exception as e:
logger.error(f"Error in COB update callback: {e}")
logger.debug(f"Notified {len(self.cob_update_callbacks)} COB callbacks for {symbol}")
logger.debug(f"Completed consolidation for {symbol} - {len(active_exchanges)} exchanges active")
except Exception as e:
logger.error(f"Error consolidating order book for {symbol}: {e}", exc_info=True)
def _merge_orderbook_data(self, live_data: Dict, deep_data: Dict, side: str) -> Dict:
"""
Merge live WebSocket data with deep REST API data
Strategy: Use live data for top levels (lowest latency), deep data for additional depth
"""
try:
merged = {}
# Always prioritize live WebSocket data (top 20 levels)
for price, level in live_data.items():
merged[price] = level
# Add deep data that's not already covered by live data
for price, level in deep_data.items():
if price not in merged:
# Deep levels keep their original (older) REST snapshot timestamp
merged[price] = level
# Sort to find the cutoff point for live vs deep data
if side == 'bid':
# For bids, higher prices are better (closer to mid)
sorted_prices = sorted(merged.keys(), reverse=True)
else:
# For asks, lower prices are better (closer to mid)
sorted_prices = sorted(merged.keys())
# Limit total depth to prevent memory issues (keep top 200 levels)
max_levels = 200
if len(sorted_prices) > max_levels:
cutoff_price = sorted_prices[max_levels - 1]
if side == 'bid':
merged = {p: level for p, level in merged.items() if p >= cutoff_price}
else:
merged = {p: level for p, level in merged.items() if p <= cutoff_price}
return merged
except Exception as e:
logger.error(f"Error merging order book data: {e}")
return live_data # Fallback to live data only
def _generate_price_buckets(self, symbol: str, bids: List[ConsolidatedOrderBookLevel],
asks: List[ConsolidatedOrderBookLevel], mid_price: float) -> Dict[str, Dict[str, float]]:
"""Generate fine-grain price buckets for volume analysis"""
try:
buckets = {'bids': {}, 'asks': {}}
# Use fixed USD bucket size if configured for this symbol
if symbol in self.fixed_usd_buckets:
bucket_size = self.fixed_usd_buckets[symbol]
logger.debug(f"Using fixed USD bucket size {bucket_size} for {symbol}")
else:
bucket_size = mid_price * (self.bucket_size_bps / 10000) # Convert bps to decimal
# Process bids (below mid price)
for level in bids:
if level.price <= mid_price:
bucket_key = int((mid_price - level.price) / bucket_size)
bucket_price = mid_price - (bucket_key * bucket_size)
if bucket_key not in buckets['bids']:
buckets['bids'][bucket_key] = {
'price': bucket_price,
'volume_usd': 0,
'size': 0,
'orders': 0,
'exchanges': set()
}
buckets['bids'][bucket_key]['volume_usd'] += level.total_volume_usd
buckets['bids'][bucket_key]['size'] += level.total_size
buckets['bids'][bucket_key]['orders'] += level.total_orders
buckets['bids'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())
# Process asks (above mid price)
for level in asks:
if level.price >= mid_price:
bucket_key = int((level.price - mid_price) / bucket_size)
bucket_price = mid_price + (bucket_key * bucket_size)
if bucket_key not in buckets['asks']:
buckets['asks'][bucket_key] = {
'price': bucket_price,
'volume_usd': 0,
'size': 0,
'orders': 0,
'exchanges': set()
}
buckets['asks'][bucket_key]['volume_usd'] += level.total_volume_usd
buckets['asks'][bucket_key]['size'] += level.total_size
buckets['asks'][bucket_key]['orders'] += level.total_orders
buckets['asks'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())
# Convert sets to lists for JSON serialization
for side in ['bids', 'asks']:
for bucket_key in buckets[side]:
buckets[side][bucket_key]['exchanges'] = list(buckets[side][bucket_key]['exchanges'])
return buckets
except Exception as e:
logger.error(f"Error generating price buckets for {symbol}: {e}")
return {'bids': {}, 'asks': {}}
def _calculate_volume_weighted_mid(self, bids: List[ConsolidatedOrderBookLevel],
asks: List[ConsolidatedOrderBookLevel]) -> float:
"""Calculate volume-weighted mid price across all exchanges"""
if not bids or not asks:
return 0.0
try:
# Take top 5 levels for volume weighting
top_bids = bids[:5]
top_asks = asks[:5]
total_bid_volume = sum(level.total_volume_usd for level in top_bids)
total_ask_volume = sum(level.total_volume_usd for level in top_asks)
if total_bid_volume + total_ask_volume == 0:
return (bids[0].price + asks[0].price) / 2
weighted_bid = sum(level.price * level.total_volume_usd for level in top_bids) / total_bid_volume if total_bid_volume > 0 else bids[0].price
weighted_ask = sum(level.price * level.total_volume_usd for level in top_asks) / total_ask_volume if total_ask_volume > 0 else asks[0].price
bid_weight = total_bid_volume / (total_bid_volume + total_ask_volume)
ask_weight = total_ask_volume / (total_bid_volume + total_ask_volume)
return (weighted_bid * ask_weight) + (weighted_ask * bid_weight)
except Exception as e:
logger.error(f"Error calculating volume weighted mid: {e}")
return (bids[0].price + asks[0].price) / 2 if bids and asks else 0.0
async def _continuous_bucket_updates(self):
"""Continuously update and optimize price buckets"""
while self.is_streaming:
try:
for symbol in self.symbols:
if symbol in self.consolidated_order_books:
cob = self.consolidated_order_books[symbol]
# Notify bucket update callbacks
for callback in self.bucket_update_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(symbol, cob.price_buckets))
else:
callback(symbol, cob.price_buckets)
except Exception as e:
logger.warning(f"Error in bucket update callback: {e}")
await asyncio.sleep(self.bucket_update_frequency / 1000) # Convert ms to seconds
except Exception as e:
logger.error(f"Error in bucket update loop: {e}")
await asyncio.sleep(1)
# Public interface methods
def subscribe_to_cob_updates(self, callback: Callable[[str, COBSnapshot], Awaitable[None]]):
"""Subscribe to consolidated order book updates"""
self.cob_update_callbacks.append(callback)
logger.info(f"Added COB update callback: {len(self.cob_update_callbacks)} total")
def subscribe_to_bucket_updates(self, callback: Callable[[str, Dict], Awaitable[None]]):
"""Subscribe to price bucket updates"""
self.bucket_update_callbacks.append(callback)
logger.info(f"Added bucket update callback: {len(self.bucket_update_callbacks)} total")
def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]:
"""Get current consolidated order book snapshot"""
return self.consolidated_order_books.get(symbol)
def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]:
"""Get fine-grain price buckets for a symbol"""
if symbol not in self.consolidated_order_books:
return None
cob = self.consolidated_order_books[symbol]
return cob.price_buckets
def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]:
"""Get breakdown of liquidity by exchange"""
if symbol not in self.consolidated_order_books:
return None
cob = self.consolidated_order_books[symbol]
breakdown = {}
for exchange in cob.exchanges_active:
breakdown[exchange] = {
'bid_liquidity': 0,
'ask_liquidity': 0,
'total_liquidity': 0,
'market_share': 0
}
# Calculate liquidity by exchange
for level in cob.consolidated_bids + cob.consolidated_asks:
for exchange, exchange_level in level.exchange_breakdown.items():
if level.side == 'bid':
breakdown[exchange]['bid_liquidity'] += exchange_level.volume_usd
else:
breakdown[exchange]['ask_liquidity'] += exchange_level.volume_usd
breakdown[exchange]['total_liquidity'] += exchange_level.volume_usd
# Calculate market share
total_market_liquidity = sum(data['total_liquidity'] for data in breakdown.values())
if total_market_liquidity > 0:
for exchange in breakdown:
breakdown[exchange]['market_share'] = breakdown[exchange]['total_liquidity'] / total_market_liquidity
return breakdown
def get_statistics(self) -> Dict[str, Any]:
"""Get provider statistics"""
return {
'symbols': self.symbols,
'is_streaming': self.is_streaming,
'active_exchanges': self.active_exchanges,
'exchange_update_counts': dict(self.exchange_update_counts),
'consolidation_stats': dict(self.consolidation_stats),
'bucket_size_bps': self.bucket_size_bps,
'cob_update_callbacks': len(self.cob_update_callbacks),
'bucket_update_callbacks': len(self.bucket_update_callbacks),
'avg_processing_time_ms': np.mean(self.processing_times.get('consolidation', [0])) if self.processing_times.get('consolidation') else 0
}
def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]:
"""Get detailed market depth analysis"""
if symbol not in self.consolidated_order_books:
return None
cob = self.consolidated_order_books[symbol]
# Analyze depth distribution
bid_levels = cob.consolidated_bids[:depth_levels]
ask_levels = cob.consolidated_asks[:depth_levels]
analysis = {
'symbol': symbol,
'timestamp': cob.timestamp.isoformat(),
'volume_weighted_mid': cob.volume_weighted_mid,
'spread_bps': cob.spread_bps,
'total_bid_liquidity': cob.total_bid_liquidity,
'total_ask_liquidity': cob.total_ask_liquidity,
'liquidity_imbalance': cob.liquidity_imbalance,
'exchanges_active': cob.exchanges_active,
'depth_analysis': {
'bid_levels': len(bid_levels),
'ask_levels': len(ask_levels),
'bid_liquidity_distribution': [],
'ask_liquidity_distribution': [],
'dominant_exchanges': {}
}
}
# Analyze liquidity distribution
for i, level in enumerate(bid_levels):
analysis['depth_analysis']['bid_liquidity_distribution'].append({
'level': i + 1,
'price': level.price,
'volume_usd': level.total_volume_usd,
'size': level.total_size,
'dominant_exchange': level.dominant_exchange,
'exchange_count': len(level.exchange_breakdown)
})
for i, level in enumerate(ask_levels):
analysis['depth_analysis']['ask_liquidity_distribution'].append({
'level': i + 1,
'price': level.price,
'volume_usd': level.total_volume_usd,
'size': level.total_size,
'dominant_exchange': level.dominant_exchange,
'exchange_count': len(level.exchange_breakdown)
})
# Count dominant exchanges
for level in bid_levels + ask_levels:
exchange = level.dominant_exchange
if exchange not in analysis['depth_analysis']['dominant_exchanges']:
analysis['depth_analysis']['dominant_exchanges'][exchange] = 0
analysis['depth_analysis']['dominant_exchanges'][exchange] += 1
return analysis
def _update_realtime_stats(self, symbol: str, cob_snapshot: COBSnapshot):
"""Update real-time statistics for 1s and 5s windows"""
try:
current_time = datetime.now()
# Add to history
self.realtime_snapshots[symbol].append(cob_snapshot)
# Calculate 1s and 5s windows
window_1s = current_time - timedelta(seconds=1)
window_5s = current_time - timedelta(seconds=5)
# Get data within windows
data_1s = [snapshot for snapshot in self.realtime_snapshots[symbol]
if snapshot.timestamp >= window_1s]
data_5s = [snapshot for snapshot in self.realtime_snapshots[symbol]
if snapshot.timestamp >= window_5s]
# Update 1s stats
if data_1s:
self.realtime_stats[symbol]['1s_stats'] = self._calculate_window_stats(data_1s)
# Update 5s stats
if data_5s:
self.realtime_stats[symbol]['5s_stats'] = self._calculate_window_stats(data_5s)
except Exception as e:
logger.error(f"Error updating real-time stats for {symbol}: {e}")
def _calculate_window_stats(self, snapshots: List[COBSnapshot]) -> Dict:
"""Calculate statistics for a time window"""
if not snapshots:
return {}
mid_prices = [s.volume_weighted_mid for s in snapshots]
spreads = [s.spread_bps for s in snapshots]
bid_liquidity = [s.total_bid_liquidity for s in snapshots]
ask_liquidity = [s.total_ask_liquidity for s in snapshots]
imbalances = [s.liquidity_imbalance for s in snapshots]
return {
'max_mid_price': max(mid_prices),
'min_mid_price': min(mid_prices),
'avg_mid_price': sum(mid_prices) / len(mid_prices),
'max_spread_bps': max(spreads),
'avg_spread_bps': sum(spreads) / len(spreads),
'max_bid_liquidity': max(bid_liquidity),
'avg_bid_liquidity': sum(bid_liquidity) / len(bid_liquidity),
'max_ask_liquidity': max(ask_liquidity),
'avg_ask_liquidity': sum(ask_liquidity) / len(ask_liquidity),
'max_imbalance': max(imbalances),
'avg_imbalance': sum(imbalances) / len(imbalances),
'update_count': len(snapshots)
}
def get_realtime_stats(self, symbol: str) -> Dict:
"""Get current real-time statistics for a symbol"""
try:
return self.realtime_stats.get(symbol, {})
except Exception as e:
logger.error(f"Error getting real-time stats for {symbol}: {e}")
return {}
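# Minimal usage sketch (not part of the provider API). The helper names (_print_cob,
# _demo) and the 30-second run duration are illustrative assumptions for running this
# module as a standalone script.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _print_cob(symbol: str, snapshot: COBSnapshot) -> None:
        # One-line summary of each consolidated snapshot
        print(f"{symbol}: mid=${snapshot.volume_weighted_mid:,.2f} "
              f"spread={snapshot.spread_bps:.1f}bps "
              f"exchanges={snapshot.exchanges_active}")

    async def _demo() -> None:
        provider = MultiExchangeCOBProvider(symbols=['BTC/USDT'], bucket_size_bps=1.0)
        provider.subscribe_to_cob_updates(_print_cob)
        streaming_task = asyncio.create_task(provider.start_streaming())
        try:
            await asyncio.sleep(30)  # Let the streams run briefly
            print(provider.get_statistics())
        finally:
            await provider.stop_streaming()
            streaming_task.cancel()

    asyncio.run(_demo())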