""" Multi-Exchange Consolidated Order Book (COB) Data Provider This module aggregates order book data from multiple cryptocurrency exchanges to provide: - Consolidated Order Book (COB) data across multiple exchanges - Fine-grain volume buckets at configurable price levels - Real-time order book depth aggregation - Volume-weighted consolidated pricing - Exchange-specific order flow analysis - Liquidity distribution metrics Supported Exchanges: - Binance (via WebSocket depth streams) - Coinbase Pro (via WebSocket level2 updates) - Kraken (via WebSocket book updates) - Huobi (via WebSocket mbp updates) - Bitfinex (via WebSocket book updates) Data is structured for consumption by CNN/DQN models and trading dashboards. """ import asyncio import json import logging import time try: import websockets from websockets.client import connect as websockets_connect except ImportError: # Fallback for environments where websockets is not available websockets = None websockets_connect = None import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Callable, Union, Awaitable from collections import deque, defaultdict from dataclasses import dataclass, field from threading import Thread, Lock import requests import ccxt from enum import Enum import math import aiohttp import aiohttp.resolver logger = logging.getLogger(__name__) # goal: use top 10 exchanges # https://www.coingecko.com/en/exchanges class ExchangeType(Enum): BINANCE = "binance" COINBASE = "coinbase" KRAKEN = "kraken" HUOBI = "huobi" BITFINEX = "bitfinex" BYBIT = "bybit" BITGET = "bitget" @dataclass class ExchangeOrderBookLevel: """Single order book level with exchange attribution""" exchange: str price: float size: float volume_usd: float orders_count: int side: str # 'bid' or 'ask' timestamp: datetime raw_data: Dict[str, Any] = field(default_factory=dict) @dataclass class ConsolidatedOrderBookLevel: """Consolidated order book level across multiple exchanges""" price: float total_size: float total_volume_usd: float total_orders: int side: str exchange_breakdown: Dict[str, ExchangeOrderBookLevel] dominant_exchange: str liquidity_score: float timestamp: datetime @dataclass class COBSnapshot: """Complete Consolidated Order Book snapshot""" symbol: str timestamp: datetime consolidated_bids: List[ConsolidatedOrderBookLevel] consolidated_asks: List[ConsolidatedOrderBookLevel] exchanges_active: List[str] volume_weighted_mid: float total_bid_liquidity: float total_ask_liquidity: float spread_bps: float liquidity_imbalance: float price_buckets: Dict[str, Dict[str, float]] # Fine-grain volume buckets @dataclass class ExchangeConfig: """Exchange configuration for COB aggregation""" exchange_type: ExchangeType weight: float = 1.0 enabled: bool = True websocket_url: str = "" rest_api_url: str = "" symbols_mapping: Dict[str, str] = field(default_factory=dict) rate_limits: Dict[str, int] = field(default_factory=dict) class MultiExchangeCOBProvider: """ Multi-Exchange Consolidated Order Book Data Provider Aggregates real-time order book data from multiple cryptocurrency exchanges to create a consolidated view of market liquidity and pricing. """ def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0): """ Initialize Multi-Exchange COB Provider Args: symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT']) bucket_size_bps: Price bucket size in basis points for fine-grain analysis """ self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.bucket_size_bps = bucket_size_bps self.bucket_update_frequency = 100 # ms self.consolidation_frequency = 100 # ms # REST API configuration for deep order book self.rest_api_frequency = 2000 # ms - full snapshot every 2 seconds (reduced frequency for deeper data) self.rest_depth_limit = 1000 # Increased to 1000 levels via REST for maximum depth # Exchange configurations self.exchange_configs = self._initialize_exchange_configs() # Order book storage - now with deep and live separation self.exchange_order_books = { symbol: { exchange.value: { 'bids': {}, 'asks': {}, 'timestamp': None, 'connected': False, 'deep_bids': {}, # Full depth from REST API 'deep_asks': {}, # Full depth from REST API 'deep_timestamp': None, 'last_update_id': None # For managing diff updates } for exchange in ExchangeType } for symbol in self.symbols } # Consolidated order books self.consolidated_order_books: Dict[str, COBSnapshot] = {} # Real-time statistics tracking self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols} self.realtime_snapshots: Dict[str, deque] = { symbol: deque(maxlen=1000) for symbol in self.symbols } # Session tracking for SVP self.session_start_time = datetime.now() self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols} self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols} # Fixed USD bucket sizes for different symbols as requested self.fixed_usd_buckets = { 'BTC/USDT': 10.0, # $10 buckets for BTC 'ETH/USDT': 1.0, # $1 buckets for ETH } # WebSocket management self.is_streaming = False self.active_exchanges = ['binance'] # Start with Binance only # Callbacks for real-time updates self.cob_update_callbacks = [] self.bucket_update_callbacks = [] # Performance tracking self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType} self.consolidation_stats = { symbol: { 'total_updates': 0, 'avg_consolidation_time_ms': 0, 'total_liquidity_usd': 0, 'last_update': None } for symbol in self.symbols } self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)} # Thread safety self.data_lock = asyncio.Lock() # Initialize aiohttp session and connector to None, will be set up in start_streaming self.session: Optional[aiohttp.ClientSession] = None self.connector: Optional[aiohttp.TCPConnector] = None self.rest_session: Optional[aiohttp.ClientSession] = None # Added for explicit None initialization # Create REST API session # Fix for Windows aiodns issue - use ThreadedResolver instead connector = aiohttp.TCPConnector( resolver=aiohttp.ThreadedResolver(), use_dns_cache=False ) self.rest_session = aiohttp.ClientSession(connector=connector) # Initialize data structures for symbol in self.symbols: self.exchange_order_books[symbol]['binance']['connected'] = False self.exchange_order_books[symbol]['binance']['deep_bids'] = {} self.exchange_order_books[symbol]['binance']['deep_asks'] = {} self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None self.exchange_order_books[symbol]['binance']['last_update_id'] = None self.realtime_snapshots[symbol].append(COBSnapshot( symbol=symbol, timestamp=datetime.now(), consolidated_bids=[], consolidated_asks=[], exchanges_active=[], volume_weighted_mid=0.0, total_bid_liquidity=0.0, total_ask_liquidity=0.0, spread_bps=0.0, liquidity_imbalance=0.0, price_buckets={} )) logger.info(f"Multi-Exchange COB Provider initialized") logger.info(f"Symbols: {self.symbols}") logger.info(f"Bucket size: {bucket_size_bps} bps") logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}") logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}") def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]: """Initialize exchange configurations""" configs = {} # Binance configuration configs[ExchangeType.BINANCE.value] = ExchangeConfig( exchange_type=ExchangeType.BINANCE, weight=0.3, # Higher weight due to volume websocket_url="wss://stream.binance.com:9443/ws/", rest_api_url="https://api.binance.com", symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'}, rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000} ) # Coinbase Pro configuration configs[ExchangeType.COINBASE.value] = ExchangeConfig( exchange_type=ExchangeType.COINBASE, weight=0.25, websocket_url="wss://ws-feed.exchange.coinbase.com", rest_api_url="https://api.exchange.coinbase.com", symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'}, rate_limits={'requests_per_minute': 600} ) # Kraken configuration configs[ExchangeType.KRAKEN.value] = ExchangeConfig( exchange_type=ExchangeType.KRAKEN, weight=0.2, websocket_url="wss://ws.kraken.com", rest_api_url="https://api.kraken.com", symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'}, rate_limits={'requests_per_minute': 900} ) # Huobi configuration configs[ExchangeType.HUOBI.value] = ExchangeConfig( exchange_type=ExchangeType.HUOBI, weight=0.15, websocket_url="wss://api.huobi.pro/ws", rest_api_url="https://api.huobi.pro", symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'}, rate_limits={'requests_per_minute': 2000} ) # Bitfinex configuration configs[ExchangeType.BITFINEX.value] = ExchangeConfig( exchange_type=ExchangeType.BITFINEX, weight=0.1, websocket_url="wss://api-pub.bitfinex.com/ws/2", rest_api_url="https://api-pub.bitfinex.com", symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'}, rate_limits={'requests_per_minute': 1000} ) # Bybit configuration configs[ExchangeType.BYBIT.value] = ExchangeConfig( exchange_type=ExchangeType.BYBIT, weight=0.18, websocket_url="wss://stream.bybit.com/v5/public/spot", rest_api_url="https://api.bybit.com", symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'}, rate_limits={'requests_per_minute': 1200} ) # Bitget configuration configs[ExchangeType.BITGET.value] = ExchangeConfig( exchange_type=ExchangeType.BITGET, weight=0.12, websocket_url="wss://ws.bitget.com/spot/v1/stream", rest_api_url="https://api.bitget.com", symbols_mapping={'BTC/USDT': 'BTCUSDT_SPBL', 'ETH/USDT': 'ETHUSDT_SPBL'}, rate_limits={'requests_per_minute': 1200} ) return configs async def start_streaming(self): """Start real-time order book streaming from all configured exchanges""" logger.info(f"Starting COB streaming for symbols: {self.symbols}") self.is_streaming = True # Setup aiohttp session here, within the async context await self._setup_http_session() # Start WebSocket connections for each active exchange and symbol tasks = [] for symbol in self.symbols: for exchange_name, config in self.exchange_configs.items(): if config.enabled and exchange_name in self.active_exchanges: # Start WebSocket stream tasks.append(self._stream_exchange_orderbook(exchange_name, symbol)) # Start deep order book (REST API) stream tasks.append(self._stream_deep_orderbook(exchange_name, symbol)) # Start trade stream (for SVP) if exchange_name == 'binance': # Only Binance for now tasks.append(self._stream_binance_trades(symbol)) # Start continuous consolidation and bucket updates tasks.append(self._continuous_consolidation()) tasks.append(self._continuous_bucket_updates()) logger.info(f"Starting {len(tasks)} COB streaming tasks") await asyncio.gather(*tasks) async def _setup_http_session(self): """Setup aiohttp session and connector""" self.connector = aiohttp.TCPConnector( resolver=aiohttp.ThreadedResolver() # This is now created inside async function ) self.session = aiohttp.ClientSession(connector=self.connector) self.rest_session = aiohttp.ClientSession(connector=self.connector) # Moved here from __init__ logger.info("aiohttp session and connector setup completed") async def stop_streaming(self): """Stop real-time order book streaming and close sessions""" logger.info("Stopping COB Integration") self.is_streaming = False if self.session and not self.session.closed: await self.session.close() logger.info("aiohttp session closed") if self.rest_session and not self.rest_session.closed: await self.rest_session.close() logger.info("aiohttp REST session closed") if self.connector and not self.connector.closed: await self.connector.close() logger.info("aiohttp connector closed") logger.info("COB Integration stopped") async def _stream_deep_orderbook(self, exchange_name: str, symbol: str): """Fetch deep order book data via REST API periodically""" while self.is_streaming: try: start_time = time.time() if exchange_name == 'binance': await self._fetch_binance_deep_orderbook(symbol) # Add other exchanges here as needed processing_time = (time.time() - start_time) * 1000 self.processing_times['rest_api'].append(processing_time) logger.debug(f"Deep order book fetch for {symbol} took {processing_time:.2f}ms") # Wait before next fetch await asyncio.sleep(self.rest_api_frequency / 1000) except Exception as e: logger.error(f"Error fetching deep order book for {symbol}: {e}") await asyncio.sleep(5) # Wait 5 seconds on error async def _fetch_binance_deep_orderbook(self, symbol: str): """Fetch deep order book from Binance REST API""" try: if not self.rest_session: return # Convert symbol format for Binance binance_symbol = symbol.replace('/', '').upper() url = f"https://api.binance.com/api/v3/depth" params = { 'symbol': binance_symbol, 'limit': self.rest_depth_limit } async with self.rest_session.get(url, params=params) as response: if response.status == 200: data = await response.json() await self._process_binance_deep_orderbook(symbol, data) else: logger.error(f"Binance REST API error {response.status} for {symbol}") except Exception as e: logger.error(f"Error fetching Binance deep order book for {symbol}: {e}") async def _process_binance_deep_orderbook(self, symbol: str, data: Dict): """Process deep order book data from Binance REST API""" try: timestamp = datetime.now() exchange_name = 'binance' # Parse deep bids and asks deep_bids = {} deep_asks = {} for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) if size > 0: deep_bids[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=timestamp ) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) if size > 0: deep_asks[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=timestamp ) # Update deep order book storage async with self.data_lock: self.exchange_order_books[symbol][exchange_name]['deep_bids'] = deep_bids self.exchange_order_books[symbol][exchange_name]['deep_asks'] = deep_asks self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId') logger.debug(f"Updated deep order book for {symbol}: {len(deep_bids)} bids, {len(deep_asks)} asks") except Exception as e: logger.error(f"Error processing deep order book for {symbol}: {e}") async def _stream_exchange_orderbook(self, exchange_name: str, symbol: str): """Stream order book data from specific exchange""" config = self.exchange_configs[exchange_name] try: if exchange_name == ExchangeType.BINANCE.value: await self._stream_binance_orderbook(symbol, config) elif exchange_name == ExchangeType.COINBASE.value: await self._stream_coinbase_orderbook(symbol, config) elif exchange_name == ExchangeType.KRAKEN.value: await self._stream_kraken_orderbook(symbol, config) elif exchange_name == ExchangeType.HUOBI.value: await self._stream_huobi_orderbook(symbol, config) elif exchange_name == ExchangeType.BITFINEX.value: await self._stream_bitfinex_orderbook(symbol, config) elif exchange_name == ExchangeType.BYBIT.value: await self._stream_bybit_orderbook(symbol, config) elif exchange_name == ExchangeType.BITGET.value: await self._stream_bitget_orderbook(symbol, config) except Exception as e: logger.error(f"Error streaming {exchange_name} for {symbol}: {e}") await asyncio.sleep(5) # Wait before reconnecting async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig): """Stream order book data from Binance""" try: # Use partial book depth stream with maximum levels - Binance format # @depth20@100ms gives us 20 levels at 100ms, but we also have REST API for full depth ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms" logger.info(f"Connecting to Binance WebSocket: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: self.exchange_order_books[symbol]['binance']['connected'] = True logger.info(f"Connected to Binance order book stream for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_orderbook(symbol, data) # Also track trades for SVP await self._track_binance_trades(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance message: {e}") except Exception as e: logger.error(f"Error processing Binance data: {e}") except Exception as e: logger.error(f"Binance WebSocket error for {symbol}: {e}") finally: self.exchange_order_books[symbol]['binance']['connected'] = False logger.info(f"Disconnected from Binance order book stream for {symbol}") async def _track_binance_trades(self, symbol: str, data: Dict): """Track executed trades from Binance for SVP calculation""" try: # Binance depth stream doesn't include trades, so we need to connect to trade stream if 'e' in data and data['e'] == 'trade': trade = { 'exchange': 'binance', 'symbol': symbol, 'price': float(data['p']), 'quantity': float(data['q']), 'side': 'buy' if data['m'] else 'sell', # m is true for maker sell 'timestamp': datetime.fromtimestamp(data['T'] / 1000), 'volume_usd': float(data['p']) * float(data['q']) } await self._add_trade_to_svp(symbol, trade) except Exception as e: logger.error(f"Error tracking Binance trade: {e}") async def _add_trade_to_svp(self, symbol: str, trade: Dict): """Add trade to session volume profile""" try: async with self.data_lock: # Add to session trades self.session_trades[symbol].append(trade) # Update SVP cache price = trade['price'] side = trade['side'] volume = trade['volume_usd'] if price not in self.svp_cache[symbol]: self.svp_cache[symbol][price] = {'buy_volume': 0.0, 'sell_volume': 0.0} if side == 'buy': self.svp_cache[symbol][price]['buy_volume'] += volume else: self.svp_cache[symbol][price]['sell_volume'] += volume # Keep only recent trades (last 24 hours) cutoff_time = datetime.now() - timedelta(hours=24) self.session_trades[symbol] = [ t for t in self.session_trades[symbol] if t['timestamp'] > cutoff_time ] except Exception as e: logger.error(f"Error adding trade to SVP: {e}") def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict: """Get session volume profile for a symbol""" try: if bucket_size is None: bucket_size = self.fixed_usd_buckets.get(symbol, 1.0) svp_data = {} # Access SVP cache without lock for read-only operations (generally safe) try: for price, volumes in self.svp_cache[symbol].items(): bucket_price = math.floor(price / bucket_size) * bucket_size if bucket_price not in svp_data: svp_data[bucket_price] = { 'buy_volume': 0.0, 'sell_volume': 0.0, 'total_volume': 0.0, 'trade_count': 0 } svp_data[bucket_price]['buy_volume'] += volumes['buy_volume'] svp_data[bucket_price]['sell_volume'] += volumes['sell_volume'] svp_data[bucket_price]['total_volume'] += volumes['buy_volume'] + volumes['sell_volume'] svp_data[bucket_price]['trade_count'] += 1 except Exception as e: logger.error(f"Error accessing SVP cache for {symbol}: {e}") return {} # Convert to sorted list svp_list = [] for price in sorted(svp_data.keys()): data = svp_data[price] if data['total_volume'] > 0: svp_list.append({ 'price': price, 'buy_volume': data['buy_volume'], 'sell_volume': data['sell_volume'], 'total_volume': data['total_volume'], 'trade_count': data['trade_count'], 'buy_percent': (data['buy_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0, 'sell_percent': (data['sell_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0 }) return { 'symbol': symbol, 'session_start': self.session_start_time.isoformat(), 'bucket_size': bucket_size, 'data': svp_list } except Exception as e: logger.error(f"Error getting session volume profile for {symbol}: {e}") return {} async def _process_binance_orderbook(self, symbol: str, data: Dict): """Process Binance order book update""" try: timestamp = datetime.now() exchange_name = ExchangeType.BINANCE.value # Parse bids and asks bids = {} asks = {} for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) if size > 0: # Only include non-zero sizes bids[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=timestamp ) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) if size > 0: asks[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=timestamp ) # Update exchange order book async with self.data_lock: self.exchange_order_books[symbol][exchange_name].update({ 'bids': bids, 'asks': asks, 'timestamp': timestamp, 'connected': True }) logger.debug(f"Updated Binance order book for {symbol}: {len(bids)} bids, {len(asks)} asks") self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}") except Exception as e: logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True) async def _process_coinbase_orderbook(self, symbol: str, data: Dict): """Process Coinbase order book data""" try: if data.get('type') == 'snapshot': # Initial snapshot bids = {} asks = {} for bid_data in data.get('bids', []): price, size = float(bid_data[0]), float(bid_data[1]) if size > 0: bids[price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, # Coinbase doesn't provide order count side='bid', timestamp=datetime.now(), raw_data=bid_data ) for ask_data in data.get('asks', []): price, size = float(ask_data[0]), float(ask_data[1]) if size > 0: asks[price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=datetime.now(), raw_data=ask_data ) # Update order book async with self.data_lock: if symbol not in self.exchange_order_books: self.exchange_order_books[symbol] = {} self.exchange_order_books[symbol]['coinbase'] = { 'bids': bids, 'asks': asks, 'last_update': datetime.now(), 'connected': True } logger.info(f"Coinbase snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks") elif data.get('type') == 'l2update': # Level 2 update async with self.data_lock: if symbol in self.exchange_order_books and 'coinbase' in self.exchange_order_books[symbol]: coinbase_data = self.exchange_order_books[symbol]['coinbase'] for change in data.get('changes', []): side, price_str, size_str = change price, size = float(price_str), float(size_str) if side == 'buy': if size == 0: # Remove level coinbase_data['bids'].pop(price, None) else: # Update level coinbase_data['bids'][price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=datetime.now(), raw_data=change ) elif side == 'sell': if size == 0: # Remove level coinbase_data['asks'].pop(price, None) else: # Update level coinbase_data['asks'][price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=datetime.now(), raw_data=change ) coinbase_data['last_update'] = datetime.now() # Update exchange count exchange_name = 'coinbase' if exchange_name not in self.exchange_update_counts: self.exchange_update_counts[exchange_name] = 0 self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Coinbase updates for {symbol}") except Exception as e: logger.error(f"Error processing Coinbase order book for {symbol}: {e}", exc_info=True) async def _process_kraken_orderbook(self, symbol: str, data: Dict): """Process Kraken order book data""" try: # Kraken sends different message types if isinstance(data, list) and len(data) > 1: # Order book update format: [channel_id, data, channel_name, pair] if len(data) >= 4 and data[2] == "book-25": book_data = data[1] # Check for snapshot vs update if 'bs' in book_data and 'as' in book_data: # Snapshot bids = {} asks = {} for bid_data in book_data.get('bs', []): price, volume, timestamp = float(bid_data[0]), float(bid_data[1]), float(bid_data[2]) if volume > 0: bids[price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, # Kraken doesn't provide order count in book feed side='bid', timestamp=datetime.fromtimestamp(timestamp), raw_data=bid_data ) for ask_data in book_data.get('as', []): price, volume, timestamp = float(ask_data[0]), float(ask_data[1]), float(ask_data[2]) if volume > 0: asks[price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='ask', timestamp=datetime.fromtimestamp(timestamp), raw_data=ask_data ) # Update order book async with self.data_lock: if symbol not in self.exchange_order_books: self.exchange_order_books[symbol] = {} self.exchange_order_books[symbol]['kraken'] = { 'bids': bids, 'asks': asks, 'last_update': datetime.now(), 'connected': True } logger.info(f"Kraken snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks") else: # Incremental update async with self.data_lock: if symbol in self.exchange_order_books and 'kraken' in self.exchange_order_books[symbol]: kraken_data = self.exchange_order_books[symbol]['kraken'] # Process bid updates for bid_update in book_data.get('b', []): price, volume, timestamp = float(bid_update[0]), float(bid_update[1]), float(bid_update[2]) if volume == 0: # Remove level kraken_data['bids'].pop(price, None) else: # Update level kraken_data['bids'][price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='bid', timestamp=datetime.fromtimestamp(timestamp), raw_data=bid_update ) # Process ask updates for ask_update in book_data.get('a', []): price, volume, timestamp = float(ask_update[0]), float(ask_update[1]), float(ask_update[2]) if volume == 0: # Remove level kraken_data['asks'].pop(price, None) else: # Update level kraken_data['asks'][price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='ask', timestamp=datetime.fromtimestamp(timestamp), raw_data=ask_update ) kraken_data['last_update'] = datetime.now() # Update exchange count exchange_name = 'kraken' if exchange_name not in self.exchange_update_counts: self.exchange_update_counts[exchange_name] = 0 self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Kraken updates for {symbol}") except Exception as e: logger.error(f"Error processing Kraken order book for {symbol}: {e}", exc_info=True) async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Coinbase order book data via WebSocket""" try: import json if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") # Coinbase Pro WebSocket URL ws_url = "wss://ws-feed.pro.coinbase.com" coinbase_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '-')) # Subscribe message for level2 order book updates subscribe_message = { "type": "subscribe", "product_ids": [coinbase_symbol], "channels": ["level2"] } logger.info(f"Connecting to Coinbase order book stream for {symbol}") async with websockets_connect(ws_url) as websocket: # Send subscription await websocket.send(json.dumps(subscribe_message)) logger.info(f"Subscribed to Coinbase level2 for {coinbase_symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_coinbase_orderbook(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Coinbase message: {e}") except Exception as e: logger.error(f"Error processing Coinbase orderbook: {e}") except Exception as e: logger.error(f"Coinbase order book stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Coinbase order book stream for {symbol}") async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Kraken order book data via WebSocket""" try: import json if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") # Kraken WebSocket URL ws_url = "wss://ws.kraken.com" kraken_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '')) # Subscribe message for book updates subscribe_message = { "event": "subscribe", "pair": [kraken_symbol], "subscription": {"name": "book", "depth": 25} } logger.info(f"Connecting to Kraken order book stream for {symbol}") async with websockets_connect(ws_url) as websocket: # Send subscription await websocket.send(json.dumps(subscribe_message)) logger.info(f"Subscribed to Kraken book for {kraken_symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_kraken_orderbook(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Kraken message: {e}") except Exception as e: logger.error(f"Error processing Kraken orderbook: {e}") except Exception as e: logger.error(f"Kraken order book stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Kraken order book stream for {symbol}") async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Huobi order book data (placeholder implementation)""" try: logger.info(f"Huobi streaming for {symbol} not yet implemented") await asyncio.sleep(60) # Sleep to prevent spam except Exception as e: logger.error(f"Error streaming Huobi order book for {symbol}: {e}") async def _stream_bitfinex_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Bitfinex order book data (placeholder implementation)""" try: logger.info(f"Bitfinex streaming for {symbol} not yet implemented") await asyncio.sleep(60) # Sleep to prevent spam except Exception as e: logger.error(f"Error streaming Bitfinex order book for {symbol}: {e}") async def _stream_binance_trades(self, symbol: str): """Stream trade data from Binance for SVP calculation""" try: config = self.exchange_configs[ExchangeType.BINANCE.value] ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade" logger.info(f"Connecting to Binance trade stream: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance trade stream for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_trade(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance trade message: {e}") except Exception as e: logger.error(f"Error processing Binance trade: {e}") except Exception as e: logger.error(f"Binance trade stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Binance trade stream for {symbol}") async def _process_binance_trade(self, symbol: str, data: Dict): """Process Binance trade data for SVP calculation""" try: if 'e' in data and data['e'] == 'trade': trade = { 'exchange': 'binance', 'symbol': symbol, 'price': float(data['p']), 'quantity': float(data['q']), 'side': 'buy' if not data['m'] else 'sell', # m is true for maker sell 'timestamp': datetime.fromtimestamp(data['T'] / 1000), 'volume_usd': float(data['p']) * float(data['q']) } await self._add_trade_to_svp(symbol, trade) # Log every 1000th trade if len(self.session_trades[symbol]) % 1000 == 0: logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}") except Exception as e: logger.error(f"Error processing Binance trade for {symbol}: {e}") async def _continuous_consolidation(self): """Continuously consolidate order books from all exchanges""" while self.is_streaming: try: start_time = time.time() for symbol in self.symbols: logger.debug(f"Starting consolidation for {symbol}") await self._consolidate_symbol_orderbook(symbol) processing_time = (time.time() - start_time) * 1000 self.processing_times['consolidation'].append(processing_time) # Log consolidation performance every 100 iterations if len(self.processing_times['consolidation']) % 100 == 0: avg_time = sum(self.processing_times['consolidation']) / len(self.processing_times['consolidation']) logger.debug(f"Average consolidation time: {avg_time:.2f}ms") await asyncio.sleep(0.1) # 100ms consolidation frequency except Exception as e: logger.error(f"Error in consolidation loop: {e}", exc_info=True) await asyncio.sleep(1) async def _consolidate_symbol_orderbook(self, symbol: str): """Consolidate order book for a specific symbol across all exchanges""" try: timestamp = datetime.now() consolidated_bids = {} consolidated_asks = {} active_exchanges = [] # Collect order book data from all connected exchanges async with self.data_lock: logger.debug(f"Collecting order book data for {symbol}") for exchange_name, exchange_data in self.exchange_order_books[symbol].items(): if exchange_data.get('connected', False): active_exchanges.append(exchange_name) # Get real-time WebSocket data (top 20 levels) live_bids = exchange_data.get('bids', {}) live_asks = exchange_data.get('asks', {}) # Get deep REST API data (up to 1000 levels) deep_bids = exchange_data.get('deep_bids', {}) deep_asks = exchange_data.get('deep_asks', {}) # Merge data: prioritize live data for top levels, add deep data for others merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid') merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask') bid_count = len(merged_bids) ask_count = len(merged_asks) logger.debug(f"{exchange_name} data for {symbol}: {bid_count} bids ({len(live_bids)} live), {ask_count} asks ({len(live_asks)} live)") # Process merged bids for price, level in merged_bids.items(): if price not in consolidated_bids: consolidated_bids[price] = ConsolidatedOrderBookLevel( price=price, total_size=0, total_volume_usd=0, total_orders=0, side='bid', exchange_breakdown={}, dominant_exchange=exchange_name, liquidity_score=0, timestamp=timestamp ) consolidated_bids[price].total_size += level.size consolidated_bids[price].total_volume_usd += level.volume_usd consolidated_bids[price].total_orders += level.orders_count consolidated_bids[price].exchange_breakdown[exchange_name] = level # Update dominant exchange based on volume if level.volume_usd > consolidated_bids[price].exchange_breakdown.get( consolidated_bids[price].dominant_exchange, type('obj', (object,), {'volume_usd': 0})() ).volume_usd: consolidated_bids[price].dominant_exchange = exchange_name # Process merged asks (similar logic) for price, level in merged_asks.items(): if price not in consolidated_asks: consolidated_asks[price] = ConsolidatedOrderBookLevel( price=price, total_size=0, total_volume_usd=0, total_orders=0, side='ask', exchange_breakdown={}, dominant_exchange=exchange_name, liquidity_score=0, timestamp=timestamp ) consolidated_asks[price].total_size += level.size consolidated_asks[price].total_volume_usd += level.volume_usd consolidated_asks[price].total_orders += level.orders_count consolidated_asks[price].exchange_breakdown[exchange_name] = level if level.volume_usd > consolidated_asks[price].exchange_breakdown.get( consolidated_asks[price].dominant_exchange, type('obj', (object,), {'volume_usd': 0})() ).volume_usd: consolidated_asks[price].dominant_exchange = exchange_name logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}") # Sort and calculate consolidated metrics sorted_bids = sorted(consolidated_bids.values(), key=lambda x: x.price, reverse=True) sorted_asks = sorted(consolidated_asks.values(), key=lambda x: x.price) # Calculate consolidated metrics volume_weighted_mid = self._calculate_volume_weighted_mid(sorted_bids, sorted_asks) total_bid_liquidity = sum(level.total_volume_usd for level in sorted_bids) total_ask_liquidity = sum(level.total_volume_usd for level in sorted_asks) spread_bps = 0 liquidity_imbalance = 0 if sorted_bids and sorted_asks: best_bid = sorted_bids[0].price best_ask = sorted_asks[0].price spread_bps = ((best_ask - best_bid) / volume_weighted_mid) * 10000 if total_bid_liquidity + total_ask_liquidity > 0: liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity) logger.debug(f"{symbol} metrics - Mid: ${volume_weighted_mid:.2f}, Spread: {spread_bps:.1f}bps, " + f"Imbalance: {liquidity_imbalance:.2%}") # Generate fine-grain price buckets price_buckets = self._generate_price_buckets(symbol, sorted_bids, sorted_asks, volume_weighted_mid) # Create consolidated snapshot with more levels for dashboard cob_snapshot = COBSnapshot( symbol=symbol, timestamp=timestamp, consolidated_bids=sorted_bids[:100], # Top 100 levels for better dashboard display consolidated_asks=sorted_asks[:100], exchanges_active=active_exchanges, volume_weighted_mid=volume_weighted_mid, total_bid_liquidity=total_bid_liquidity, total_ask_liquidity=total_ask_liquidity, spread_bps=spread_bps, liquidity_imbalance=liquidity_imbalance, price_buckets=price_buckets ) # Store consolidated order book self.consolidated_order_books[symbol] = cob_snapshot self.realtime_snapshots[symbol].append(cob_snapshot) # Update real-time statistics self._update_realtime_stats(symbol, cob_snapshot) # Update consolidation statistics async with self.data_lock: self.consolidation_stats[symbol]['total_updates'] += 1 self.consolidation_stats[symbol]['active_price_levels'] = len(sorted_bids) + len(sorted_asks) self.consolidation_stats[symbol]['total_liquidity_usd'] = total_bid_liquidity + total_ask_liquidity # Notify callbacks with real-time data for callback in self.cob_update_callbacks: try: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(symbol, cob_snapshot)) else: callback(symbol, cob_snapshot) except Exception as e: logger.error(f"Error in COB update callback: {e}") logger.debug(f"Notified {len(self.cob_update_callbacks)} COB callbacks for {symbol}") logger.debug(f"Completed consolidation for {symbol} - {len(active_exchanges)} exchanges active") except Exception as e: logger.error(f"Error consolidating order book for {symbol}: {e}", exc_info=True) def _merge_orderbook_data(self, live_data: Dict, deep_data: Dict, side: str) -> Dict: """ Merge live WebSocket data with deep REST API data Strategy: Use live data for top levels (lowest latency), deep data for additional depth """ try: merged = {} # Always prioritize live WebSocket data (top 20 levels) for price, level in live_data.items(): merged[price] = level # Add deep data that's not already covered by live data for price, level in deep_data.items(): if price not in merged: # Mark this as deep data (older timestamp but more comprehensive) level.timestamp = level.timestamp # Keep original timestamp merged[price] = level # Sort to find the cutoff point for live vs deep data if side == 'bid': # For bids, higher prices are better (closer to mid) sorted_prices = sorted(merged.keys(), reverse=True) else: # For asks, lower prices are better (closer to mid) sorted_prices = sorted(merged.keys()) # Limit total depth to prevent memory issues (keep top 200 levels) max_levels = 200 if len(sorted_prices) > max_levels: cutoff_price = sorted_prices[max_levels - 1] if side == 'bid': merged = {p: level for p, level in merged.items() if p >= cutoff_price} else: merged = {p: level for p, level in merged.items() if p <= cutoff_price} return merged except Exception as e: logger.error(f"Error merging order book data: {e}") return live_data # Fallback to live data only def _generate_price_buckets(self, symbol: str, bids: List[ConsolidatedOrderBookLevel], asks: List[ConsolidatedOrderBookLevel], mid_price: float) -> Dict[str, Dict[str, float]]: """Generate fine-grain price buckets for volume analysis""" try: buckets = {'bids': {}, 'asks': {}} # Use fixed USD bucket size if configured for this symbol if symbol in self.fixed_usd_buckets: bucket_size = self.fixed_usd_buckets[symbol] logger.debug(f"Using fixed USD bucket size {bucket_size} for {symbol}") else: bucket_size = mid_price * (self.bucket_size_bps / 10000) # Convert bps to decimal # Process bids (below mid price) for level in bids: if level.price <= mid_price: bucket_key = int((mid_price - level.price) / bucket_size) bucket_price = mid_price - (bucket_key * bucket_size) if bucket_key not in buckets['bids']: buckets['bids'][bucket_key] = { 'price': bucket_price, 'volume_usd': 0, 'size': 0, 'orders': 0, 'exchanges': set() } buckets['bids'][bucket_key]['volume_usd'] += level.total_volume_usd buckets['bids'][bucket_key]['size'] += level.total_size buckets['bids'][bucket_key]['orders'] += level.total_orders buckets['bids'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys()) # Process asks (above mid price) for level in asks: if level.price >= mid_price: bucket_key = int((level.price - mid_price) / bucket_size) bucket_price = mid_price + (bucket_key * bucket_size) if bucket_key not in buckets['asks']: buckets['asks'][bucket_key] = { 'price': bucket_price, 'volume_usd': 0, 'size': 0, 'orders': 0, 'exchanges': set() } buckets['asks'][bucket_key]['volume_usd'] += level.total_volume_usd buckets['asks'][bucket_key]['size'] += level.total_size buckets['asks'][bucket_key]['orders'] += level.total_orders buckets['asks'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys()) # Convert sets to lists for JSON serialization for side in ['bids', 'asks']: for bucket_key in buckets[side]: buckets[side][bucket_key]['exchanges'] = list(buckets[side][bucket_key]['exchanges']) return buckets except Exception as e: logger.error(f"Error generating price buckets for {symbol}: {e}") return {'bids': {}, 'asks': {}} def _calculate_volume_weighted_mid(self, bids: List[ConsolidatedOrderBookLevel], asks: List[ConsolidatedOrderBookLevel]) -> float: """Calculate volume-weighted mid price across all exchanges""" if not bids or not asks: return 0.0 try: # Take top 5 levels for volume weighting top_bids = bids[:5] top_asks = asks[:5] total_bid_volume = sum(level.total_volume_usd for level in top_bids) total_ask_volume = sum(level.total_volume_usd for level in top_asks) if total_bid_volume + total_ask_volume == 0: return (bids[0].price + asks[0].price) / 2 weighted_bid = sum(level.price * level.total_volume_usd for level in top_bids) / total_bid_volume if total_bid_volume > 0 else bids[0].price weighted_ask = sum(level.price * level.total_volume_usd for level in top_asks) / total_ask_volume if total_ask_volume > 0 else asks[0].price bid_weight = total_bid_volume / (total_bid_volume + total_ask_volume) ask_weight = total_ask_volume / (total_bid_volume + total_ask_volume) return (weighted_bid * ask_weight) + (weighted_ask * bid_weight) except Exception as e: logger.error(f"Error calculating volume weighted mid: {e}") return (bids[0].price + asks[0].price) / 2 if bids and asks else 0.0 async def _continuous_bucket_updates(self): """Continuously update and optimize price buckets""" while self.is_streaming: try: for symbol in self.symbols: if symbol in self.consolidated_order_books: cob = self.consolidated_order_books[symbol] # Notify bucket update callbacks for callback in self.bucket_update_callbacks: try: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(symbol, cob.price_buckets)) else: callback(symbol, cob.price_buckets) except Exception as e: logger.warning(f"Error in bucket update callback: {e}") await asyncio.sleep(self.bucket_update_frequency / 1000) # Convert ms to seconds except Exception as e: logger.error(f"Error in bucket update loop: {e}") await asyncio.sleep(1) # Public interface methods def subscribe_to_cob_updates(self, callback: Callable[[str, COBSnapshot], Awaitable[None]]): """Subscribe to consolidated order book updates""" self.cob_update_callbacks.append(callback) logger.info(f"Added COB update callback: {len(self.cob_update_callbacks)} total") def subscribe_to_bucket_updates(self, callback: Callable[[str, Dict], Awaitable[None]]): """Subscribe to price bucket updates""" self.bucket_update_callbacks.append(callback) logger.info(f"Added bucket update callback: {len(self.bucket_update_callbacks)} total") def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]: """Get current consolidated order book snapshot""" return self.consolidated_order_books.get(symbol) def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]: """Get fine-grain price buckets for a symbol""" if symbol not in self.consolidated_order_books: return None cob = self.consolidated_order_books[symbol] return cob.price_buckets def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]: """Get breakdown of liquidity by exchange""" if symbol not in self.consolidated_order_books: return None cob = self.consolidated_order_books[symbol] breakdown = {} for exchange in cob.exchanges_active: breakdown[exchange] = { 'bid_liquidity': 0, 'ask_liquidity': 0, 'total_liquidity': 0, 'market_share': 0 } # Calculate liquidity by exchange for level in cob.consolidated_bids + cob.consolidated_asks: for exchange, exchange_level in level.exchange_breakdown.items(): if level.side == 'bid': breakdown[exchange]['bid_liquidity'] += exchange_level.volume_usd else: breakdown[exchange]['ask_liquidity'] += exchange_level.volume_usd breakdown[exchange]['total_liquidity'] += exchange_level.volume_usd # Calculate market share total_market_liquidity = sum(data['total_liquidity'] for data in breakdown.values()) if total_market_liquidity > 0: for exchange in breakdown: breakdown[exchange]['market_share'] = breakdown[exchange]['total_liquidity'] / total_market_liquidity return breakdown def get_statistics(self) -> Dict[str, Any]: """Get provider statistics""" return { 'symbols': self.symbols, 'is_streaming': self.is_streaming, 'active_exchanges': self.active_exchanges, 'exchange_update_counts': dict(self.exchange_update_counts), 'consolidation_stats': dict(self.consolidation_stats), 'bucket_size_bps': self.bucket_size_bps, 'cob_update_callbacks': len(self.cob_update_callbacks), 'bucket_update_callbacks': len(self.bucket_update_callbacks), 'avg_processing_time_ms': np.mean(self.processing_times.get('consolidation', [0])) if self.processing_times.get('consolidation') else 0 } def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]: """Get detailed market depth analysis""" if symbol not in self.consolidated_order_books: return None cob = self.consolidated_order_books[symbol] # Analyze depth distribution bid_levels = cob.consolidated_bids[:depth_levels] ask_levels = cob.consolidated_asks[:depth_levels] analysis = { 'symbol': symbol, 'timestamp': cob.timestamp.isoformat(), 'volume_weighted_mid': cob.volume_weighted_mid, 'spread_bps': cob.spread_bps, 'total_bid_liquidity': cob.total_bid_liquidity, 'total_ask_liquidity': cob.total_ask_liquidity, 'liquidity_imbalance': cob.liquidity_imbalance, 'exchanges_active': cob.exchanges_active, 'depth_analysis': { 'bid_levels': len(bid_levels), 'ask_levels': len(ask_levels), 'bid_liquidity_distribution': [], 'ask_liquidity_distribution': [], 'dominant_exchanges': {} } } # Analyze liquidity distribution for i, level in enumerate(bid_levels): analysis['depth_analysis']['bid_liquidity_distribution'].append({ 'level': i + 1, 'price': level.price, 'volume_usd': level.total_volume_usd, 'size': level.total_size, 'dominant_exchange': level.dominant_exchange, 'exchange_count': len(level.exchange_breakdown) }) for i, level in enumerate(ask_levels): analysis['depth_analysis']['ask_liquidity_distribution'].append({ 'level': i + 1, 'price': level.price, 'volume_usd': level.total_volume_usd, 'size': level.total_size, 'dominant_exchange': level.dominant_exchange, 'exchange_count': len(level.exchange_breakdown) }) # Count dominant exchanges for level in bid_levels + ask_levels: exchange = level.dominant_exchange if exchange not in analysis['depth_analysis']['dominant_exchanges']: analysis['depth_analysis']['dominant_exchanges'][exchange] = 0 analysis['depth_analysis']['dominant_exchanges'][exchange] += 1 return analysis def _update_realtime_stats(self, symbol: str, cob_snapshot: COBSnapshot): """Update real-time statistics for 1s and 5s windows""" try: current_time = datetime.now() # Add to history self.realtime_snapshots[symbol].append(cob_snapshot) # Calculate 1s and 5s windows window_1s = current_time - timedelta(seconds=1) window_5s = current_time - timedelta(seconds=5) # Get data within windows data_1s = [snapshot for snapshot in self.realtime_snapshots[symbol] if snapshot.timestamp >= window_1s] data_5s = [snapshot for snapshot in self.realtime_snapshots[symbol] if snapshot.timestamp >= window_5s] # Update 1s stats if data_1s: self.realtime_stats[symbol]['1s_stats'] = self._calculate_window_stats(data_1s) # Update 5s stats if data_5s: self.realtime_stats[symbol]['5s_stats'] = self._calculate_window_stats(data_5s) except Exception as e: logger.error(f"Error updating real-time stats for {symbol}: {e}") def _calculate_window_stats(self, snapshots: List[COBSnapshot]) -> Dict: """Calculate statistics for a time window""" if not snapshots: return {} mid_prices = [s.volume_weighted_mid for s in snapshots] spreads = [s.spread_bps for s in snapshots] bid_liquidity = [s.total_bid_liquidity for s in snapshots] ask_liquidity = [s.total_ask_liquidity for s in snapshots] imbalances = [s.liquidity_imbalance for s in snapshots] return { 'max_mid_price': max(mid_prices), 'min_mid_price': min(mid_prices), 'avg_mid_price': sum(mid_prices) / len(mid_prices), 'max_spread_bps': max(spreads), 'avg_spread_bps': sum(spreads) / len(spreads), 'max_bid_liquidity': max(bid_liquidity), 'avg_bid_liquidity': sum(bid_liquidity) / len(bid_liquidity), 'max_ask_liquidity': max(ask_liquidity), 'avg_ask_liquidity': sum(ask_liquidity) / len(ask_liquidity), 'max_imbalance': max(imbalances), 'avg_imbalance': sum(imbalances) / len(imbalances), 'update_count': len(snapshots) } def get_realtime_stats(self, symbol: str) -> Dict: """Get current real-time statistics for a symbol""" try: return self.realtime_stats.get(symbol, {}) except Exception as e: logger.error(f"Error getting real-time stats for {symbol}: {e}") return {}