""" Multi-Exchange Consolidated Order Book (COB) Data Provider This module aggregates order book data from multiple cryptocurrency exchanges to provide: - Consolidated Order Book (COB) data across multiple exchanges - Fine-grain volume buckets at configurable price levels - Real-time order book depth aggregation - Volume-weighted consolidated pricing - Exchange-specific order flow analysis - Liquidity distribution metrics Supported Exchanges: - Binance (via WebSocket depth streams) - Coinbase Pro (via WebSocket level2 updates) - Kraken (via WebSocket book updates) - Huobi (via WebSocket mbp updates) - Bitfinex (via WebSocket book updates) Data is structured for consumption by CNN/DQN models and trading dashboards. """ import asyncio import json import logging import time try: import websockets from websockets.client import connect as websockets_connect except ImportError: # Fallback for environments where websockets is not available websockets = None websockets_connect = None import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Callable, Union, Awaitable from collections import deque, defaultdict from dataclasses import dataclass, field from threading import Thread, Lock import requests import ccxt from enum import Enum import math import aiohttp import aiohttp.resolver logger = logging.getLogger(__name__) class SimpleRateLimiter: """Simple rate limiter to prevent 418 errors""" def __init__(self, requests_per_second: float = 0.5): # Much more conservative self.requests_per_second = requests_per_second self.last_request_time = 0 self.min_interval = 1.0 / requests_per_second self.consecutive_errors = 0 self.blocked_until = 0 def can_make_request(self) -> bool: """Check if we can make a request""" now = time.time() # Check if we're in a blocked state if now < self.blocked_until: return False return (now - self.last_request_time) >= self.min_interval def record_request(self, success: bool = True): """Record that a request was made""" self.last_request_time = time.time() if success: self.consecutive_errors = 0 else: self.consecutive_errors += 1 # Exponential backoff for errors if self.consecutive_errors >= 3: backoff_time = min(300, 10 * (2 ** (self.consecutive_errors - 3))) # Max 5 min self.blocked_until = time.time() + backoff_time logger.warning(f"Rate limiter blocked for {backoff_time}s after {self.consecutive_errors} errors") def get_wait_time(self) -> float: """Get time to wait before next request""" now = time.time() # Check if blocked if now < self.blocked_until: return self.blocked_until - now time_since_last = now - self.last_request_time if time_since_last < self.min_interval: return self.min_interval - time_since_last return 0.0 class ExchangeType(Enum): BINANCE = "binance" COINBASE = "coinbase" KRAKEN = "kraken" HUOBI = "huobi" BITFINEX = "bitfinex" @dataclass class ExchangeOrderBookLevel: """Single order book level with exchange attribution""" exchange: str price: float size: float volume_usd: float orders_count: int side: str # 'bid' or 'ask' timestamp: datetime raw_data: Dict[str, Any] = field(default_factory=dict) @dataclass class ConsolidatedOrderBookLevel: """Consolidated order book level across multiple exchanges""" price: float total_size: float total_volume_usd: float total_orders: int side: str exchange_breakdown: Dict[str, ExchangeOrderBookLevel] dominant_exchange: str liquidity_score: float timestamp: datetime @dataclass class COBSnapshot: """Complete Consolidated Order Book snapshot""" 
    symbol: str
    timestamp: datetime
    consolidated_bids: List[ConsolidatedOrderBookLevel]
    consolidated_asks: List[ConsolidatedOrderBookLevel]
    exchanges_active: List[str]
    volume_weighted_mid: float
    total_bid_liquidity: float
    total_ask_liquidity: float
    spread_bps: float
    liquidity_imbalance: float
    price_buckets: Dict[str, Dict[str, float]]  # Fine-grain volume buckets

@dataclass
class ExchangeConfig:
    """Exchange configuration for COB aggregation"""
    exchange_type: ExchangeType
    weight: float = 1.0
    enabled: bool = True
    websocket_url: str = ""
    rest_api_url: str = ""
    symbols_mapping: Dict[str, str] = field(default_factory=dict)
    rate_limits: Dict[str, int] = field(default_factory=dict)

class MultiExchangeCOBProvider:
    """
    Multi-Exchange Consolidated Order Book Data Provider

    Aggregates real-time order book data from multiple cryptocurrency exchanges
    to create a consolidated view of market liquidity and pricing.
    """

    def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
        """Initialize multi-exchange COB provider"""
        self.symbols = symbols
        self.exchange_configs = exchange_configs
        self.active_exchanges = ['binance']  # Focus on Binance for now
        self.is_streaming = False
        self.cob_data_cache = {}  # Cache for COB data
        self.cob_subscribers = []  # List of callback functions

        # Core state used throughout the provider
        self.current_order_book = {}  # Current consolidated order book per symbol
        self.realtime_snapshots = defaultdict(list)  # Real-time snapshots per symbol
        self.cob_update_callbacks = []  # COB update callbacks
        self.bucket_update_callbacks = []  # Price bucket update callbacks
        self.data_lock = asyncio.Lock()  # Lock for async-safe data access
        self.consolidation_stats = defaultdict(lambda: {
            'total_updates': 0,
            'active_price_levels': 0,
            'total_liquidity_usd': 0.0
        })
        self.fixed_usd_buckets = {}  # Fixed USD bucket sizes per symbol
        self.bucket_size_bps = 10  # Default bucket size in basis points

        # Per-exchange order book state, trade tracking and statistics.
        # These are referenced by the streaming/consolidation methods below and
        # must exist before streaming starts.
        self.exchange_order_books = defaultdict(lambda: defaultdict(dict))
        self.exchange_update_counts = defaultdict(int)
        self.processing_times = defaultdict(lambda: deque(maxlen=1000))
        self.session_trades = defaultdict(list)  # Executed trades per symbol (for SVP)
        self.svp_cache = defaultdict(dict)  # Session volume profile cache per symbol
        self.session_start_time = datetime.now()
        self.realtime_stats = defaultdict(dict)  # 1s/5s window stats and book ticker data

        # aiohttp connector/sessions are created lazily in _setup_http_session()
        self.connector = None
        self.session = None
        self.rest_session = None

        # Rate limiting for REST API fallback
        self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=0.5)
        self.rest_api_frequency = 5000  # ms between REST depth fetches (assumed default)
        self.rest_depth_limit = 1000  # levels per REST depth request (assumed default)
        self.bucket_update_frequency = 1000  # ms between bucket update notifications (assumed default)
        self.last_rest_api_call = 0
        self.rest_api_call_count = 0

        logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")

    def subscribe_to_cob_updates(self, callback):
        """Subscribe to COB data updates

        NOTE: a typed definition of this method appears again near the end of the
        class; that later definition takes precedence at class creation time and
        registers callbacks in cob_update_callbacks instead of cob_subscribers.
        """
        self.cob_subscribers.append(callback)
        logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")

    async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
        """Notify all subscribers of COB data updates"""
        try:
            for callback in self.cob_subscribers:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(symbol, cob_snapshot)
                    else:
                        callback(symbol, cob_snapshot)
                except Exception as e:
                    logger.error(f"Error in COB subscriber callback: {e}")
        except Exception as e:
            logger.error(f"Error notifying COB subscribers: {e}")

    async def start_streaming(self):
        """Start real-time order book streaming from all configured exchanges using only WebSocket"""
        logger.info(f"Starting COB streaming for symbols: {self.symbols}")
        self.is_streaming = True

        # Setup aiohttp session here, within the async context
        await self._setup_http_session()

        # Start WebSocket connections for each active exchange and symbol
        tasks = []
        for symbol in self.symbols:
            for exchange_name, config in self.exchange_configs.items():
                if config.enabled and exchange_name in self.active_exchanges:
                    if exchange_name == 'binance':
                        # Enhanced Binance WebSocket streams (NO REST API)
                        # 1. Partial depth stream (20 levels, 100ms updates) - for real-time updates
                        tasks.append(self._stream_binance_orderbook(symbol, config))
                        # 2. Full depth stream (1000 levels, 1000ms updates) - replaces REST API
                        tasks.append(self._stream_binance_full_depth(symbol))
                        # 3. Trade stream for order flow analysis
                        tasks.append(self._stream_binance_trades(symbol))
                        # 4. Book ticker stream for best bid/ask real-time
                        tasks.append(self._stream_binance_book_ticker(symbol))
                        # 5. Aggregate trade stream for large order detection
                        tasks.append(self._stream_binance_agg_trades(symbol))
                    else:
                        # Other exchanges - WebSocket only
                        tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))

        # Start continuous consolidation and bucket updates
        tasks.append(self._continuous_consolidation())
        tasks.append(self._continuous_bucket_updates())

        logger.info(f"Starting {len(tasks)} COB streaming tasks (WebSocket only - NO REST API)")
        await asyncio.gather(*tasks)

    async def _setup_http_session(self):
        """Setup aiohttp session and connector"""
        self.connector = aiohttp.TCPConnector(
            resolver=aiohttp.ThreadedResolver()  # Created inside the async context
        )
        # connector_owner=False so closing one session does not close the shared
        # connector out from under the other; the connector is closed explicitly
        # in stop_streaming().
        self.session = aiohttp.ClientSession(connector=self.connector, connector_owner=False)
        self.rest_session = aiohttp.ClientSession(connector=self.connector, connector_owner=False)
        logger.info("aiohttp session and connector setup completed")

    async def stop_streaming(self):
        """Stop real-time order book streaming and close sessions"""
        logger.info("Stopping COB Integration")
        self.is_streaming = False
        if self.session and not self.session.closed:
            await self.session.close()
            logger.info("aiohttp session closed")
        if self.rest_session and not self.rest_session.closed:
            await self.rest_session.close()
            logger.info("aiohttp REST session closed")
        if self.connector and not self.connector.closed:
            await self.connector.close()
            logger.info("aiohttp connector closed")
        logger.info("COB Integration stopped")

    async def _stream_deep_orderbook(self, exchange_name: str, symbol: str):
        """Fetch deep order book data via REST API periodically"""
        while self.is_streaming:
            try:
                start_time = time.time()
                if exchange_name == 'binance':
                    await self._fetch_binance_deep_orderbook(symbol)
                # Add other exchanges here as needed
                processing_time = (time.time() - start_time) * 1000
                self.processing_times['rest_api'].append(processing_time)
                logger.debug(f"Deep order book fetch for {symbol} took {processing_time:.2f}ms")
                # Wait before next fetch
                await asyncio.sleep(self.rest_api_frequency / 1000)
            except Exception as e:
                logger.error(f"Error fetching deep order book for {symbol}: {e}")
                await asyncio.sleep(5)  # Wait 5 seconds on error

    async def _fetch_binance_deep_orderbook(self, symbol: str):
        """Fetch deep order book from Binance REST API with rate limiting"""
        try:
            if not self.rest_session:
                return
            # Check rate limiter before making request
            if not self.rest_rate_limiter.can_make_request():
                wait_time = self.rest_rate_limiter.get_wait_time()
                if wait_time > 0:
                    logger.debug(f"Rate limited, waiting {wait_time:.1f}s before {symbol} request")
                    await asyncio.sleep(wait_time)
                return  # Skip this cycle

            # Convert symbol format for Binance
            binance_symbol = symbol.replace('/', '').upper()
            url = "https://api.binance.com/api/v3/depth"
            params = {
                'symbol': binance_symbol,
                'limit': self.rest_depth_limit
            }
            # Add headers to reduce detection
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json'
            }
            async with self.rest_session.get(url, params=params, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    await self._process_binance_deep_orderbook(symbol, data)
                    self.rest_rate_limiter.record_request()  # Record
successful request elif response.status in [418, 429, 451]: logger.warning(f"Binance REST API rate limited (HTTP {response.status}) for {symbol}") # Increase wait time for next request await asyncio.sleep(10) # Wait 10 seconds on rate limit else: logger.error(f"Binance REST API error {response.status} for {symbol}") except Exception as e: logger.error(f"Error fetching Binance deep order book for {symbol}: {e}") async def _process_binance_deep_orderbook(self, symbol: str, data: Dict): """Process deep order book data from Binance REST API""" try: timestamp = datetime.now() exchange_name = 'binance' # Parse deep bids and asks deep_bids = {} deep_asks = {} for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) if size > 0: deep_bids[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=timestamp ) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) if size > 0: deep_asks[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=timestamp ) # Update deep order book storage async with self.data_lock: self.exchange_order_books[symbol][exchange_name]['deep_bids'] = deep_bids self.exchange_order_books[symbol][exchange_name]['deep_asks'] = deep_asks self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId') logger.debug(f"Updated deep order book for {symbol}: {len(deep_bids)} bids, {len(deep_asks)} asks") except Exception as e: logger.error(f"Error processing deep order book for {symbol}: {e}") async def _stream_exchange_orderbook(self, exchange_name: str, symbol: str): """Stream order book data from specific exchange""" config = self.exchange_configs[exchange_name] try: if exchange_name == ExchangeType.BINANCE.value: await self._stream_binance_orderbook(symbol, config) elif exchange_name == ExchangeType.COINBASE.value: await self._stream_coinbase_orderbook(symbol, config) elif exchange_name == ExchangeType.KRAKEN.value: await self._stream_kraken_orderbook(symbol, config) elif exchange_name == ExchangeType.HUOBI.value: await self._stream_huobi_orderbook(symbol, config) elif exchange_name == ExchangeType.BITFINEX.value: await self._stream_bitfinex_orderbook(symbol, config) except Exception as e: logger.error(f"Error streaming {exchange_name} for {symbol}: {e}") await asyncio.sleep(5) # Wait before reconnecting async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig): """Stream order book data from Binance""" try: ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms" logger.info(f"Connecting to Binance WebSocket: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: self.exchange_order_books[symbol]['binance']['connected'] = True logger.info(f"Connected to Binance order book stream for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_orderbook(symbol, data) # Also track trades for SVP await self._track_binance_trades(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance message: {e}") except Exception as e: logger.error(f"Error processing Binance data: {e}") 
except Exception as e: logger.error(f"Binance WebSocket error for {symbol}: {e}") finally: self.exchange_order_books[symbol]['binance']['connected'] = False logger.info(f"Disconnected from Binance order book stream for {symbol}") async def _track_binance_trades(self, symbol: str, data: Dict): """Track executed trades from Binance for SVP calculation""" try: # Binance depth stream doesn't include trades, so we need to connect to trade stream if 'e' in data and data['e'] == 'trade': trade = { 'exchange': 'binance', 'symbol': symbol, 'price': float(data['p']), 'quantity': float(data['q']), 'side': 'buy' if data['m'] else 'sell', # m is true for maker sell 'timestamp': datetime.fromtimestamp(data['T'] / 1000), 'volume_usd': float(data['p']) * float(data['q']) } await self._add_trade_to_svp(symbol, trade) except Exception as e: logger.error(f"Error tracking Binance trade: {e}") async def _add_trade_to_svp(self, symbol: str, trade: Dict): """Add trade to session volume profile""" try: async with self.data_lock: # Add to session trades self.session_trades[symbol].append(trade) # Update SVP cache price = trade['price'] side = trade['side'] volume = trade['volume_usd'] if price not in self.svp_cache[symbol]: self.svp_cache[symbol][price] = {'buy_volume': 0.0, 'sell_volume': 0.0} if side == 'buy': self.svp_cache[symbol][price]['buy_volume'] += volume else: self.svp_cache[symbol][price]['sell_volume'] += volume # Keep only recent trades (last 24 hours) cutoff_time = datetime.now() - timedelta(hours=24) self.session_trades[symbol] = [ t for t in self.session_trades[symbol] if t['timestamp'] > cutoff_time ] except Exception as e: logger.error(f"Error adding trade to SVP: {e}") def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict: """Get session volume profile for a symbol""" try: if bucket_size is None: bucket_size = self.fixed_usd_buckets.get(symbol, 1.0) svp_data = {} # Access SVP cache without lock for read-only operations (generally safe) try: for price, volumes in self.svp_cache[symbol].items(): bucket_price = math.floor(price / bucket_size) * bucket_size if bucket_price not in svp_data: svp_data[bucket_price] = { 'buy_volume': 0.0, 'sell_volume': 0.0, 'total_volume': 0.0, 'trade_count': 0 } svp_data[bucket_price]['buy_volume'] += volumes['buy_volume'] svp_data[bucket_price]['sell_volume'] += volumes['sell_volume'] svp_data[bucket_price]['total_volume'] += volumes['buy_volume'] + volumes['sell_volume'] svp_data[bucket_price]['trade_count'] += 1 except Exception as e: logger.error(f"Error accessing SVP cache for {symbol}: {e}") return {} # Convert to sorted list svp_list = [] for price in sorted(svp_data.keys()): data = svp_data[price] if data['total_volume'] > 0: svp_list.append({ 'price': price, 'buy_volume': data['buy_volume'], 'sell_volume': data['sell_volume'], 'total_volume': data['total_volume'], 'trade_count': data['trade_count'], 'buy_percent': (data['buy_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0, 'sell_percent': (data['sell_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0 }) return { 'symbol': symbol, 'session_start': self.session_start_time.isoformat(), 'bucket_size': bucket_size, 'data': svp_list } except Exception as e: logger.error(f"Error getting session volume profile for {symbol}: {e}") return {} async def _process_binance_orderbook(self, symbol: str, data: Dict): """Process Binance order book update""" try: timestamp = datetime.now() exchange_name = ExchangeType.BINANCE.value # Parse 
bids and asks bids = {} asks = {} for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) if size > 0: # Only include non-zero sizes bids[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=timestamp ) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) if size > 0: asks[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=timestamp ) # Update exchange order book async with self.data_lock: self.exchange_order_books[symbol][exchange_name].update({ 'bids': bids, 'asks': asks, 'timestamp': timestamp, 'connected': True }) logger.debug(f"Updated Binance order book for {symbol}: {len(bids)} bids, {len(asks)} asks") self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}") except Exception as e: logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True) async def _process_coinbase_orderbook(self, symbol: str, data: Dict): """Process Coinbase order book data""" try: if data.get('type') == 'snapshot': # Initial snapshot bids = {} asks = {} for bid_data in data.get('bids', []): price, size = float(bid_data[0]), float(bid_data[1]) if size > 0: bids[price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, # Coinbase doesn't provide order count side='bid', timestamp=datetime.now(), raw_data=bid_data ) for ask_data in data.get('asks', []): price, size = float(ask_data[0]), float(ask_data[1]) if size > 0: asks[price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=datetime.now(), raw_data=ask_data ) # Update order book async with self.data_lock: if symbol not in self.exchange_order_books: self.exchange_order_books[symbol] = {} self.exchange_order_books[symbol]['coinbase'] = { 'bids': bids, 'asks': asks, 'last_update': datetime.now(), 'connected': True } logger.info(f"Coinbase snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks") elif data.get('type') == 'l2update': # Level 2 update async with self.data_lock: if symbol in self.exchange_order_books and 'coinbase' in self.exchange_order_books[symbol]: coinbase_data = self.exchange_order_books[symbol]['coinbase'] for change in data.get('changes', []): side, price_str, size_str = change price, size = float(price_str), float(size_str) if side == 'buy': if size == 0: # Remove level coinbase_data['bids'].pop(price, None) else: # Update level coinbase_data['bids'][price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=datetime.now(), raw_data=change ) elif side == 'sell': if size == 0: # Remove level coinbase_data['asks'].pop(price, None) else: # Update level coinbase_data['asks'][price] = ExchangeOrderBookLevel( exchange='coinbase', price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=datetime.now(), raw_data=change ) coinbase_data['last_update'] = datetime.now() # Update exchange count exchange_name = 'coinbase' if exchange_name not in self.exchange_update_counts: self.exchange_update_counts[exchange_name] = 0 
self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Coinbase updates for {symbol}") except Exception as e: logger.error(f"Error processing Coinbase order book for {symbol}: {e}", exc_info=True) async def _process_kraken_orderbook(self, symbol: str, data: Dict): """Process Kraken order book data""" try: # Kraken sends different message types if isinstance(data, list) and len(data) > 1: # Order book update format: [channel_id, data, channel_name, pair] if len(data) >= 4 and data[2] == "book-25": book_data = data[1] # Check for snapshot vs update if 'bs' in book_data and 'as' in book_data: # Snapshot bids = {} asks = {} for bid_data in book_data.get('bs', []): price, volume, timestamp = float(bid_data[0]), float(bid_data[1]), float(bid_data[2]) if volume > 0: bids[price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, # Kraken doesn't provide order count in book feed side='bid', timestamp=datetime.fromtimestamp(timestamp), raw_data=bid_data ) for ask_data in book_data.get('as', []): price, volume, timestamp = float(ask_data[0]), float(ask_data[1]), float(ask_data[2]) if volume > 0: asks[price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='ask', timestamp=datetime.fromtimestamp(timestamp), raw_data=ask_data ) # Update order book async with self.data_lock: if symbol not in self.exchange_order_books: self.exchange_order_books[symbol] = {} self.exchange_order_books[symbol]['kraken'] = { 'bids': bids, 'asks': asks, 'last_update': datetime.now(), 'connected': True } logger.info(f"Kraken snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks") else: # Incremental update async with self.data_lock: if symbol in self.exchange_order_books and 'kraken' in self.exchange_order_books[symbol]: kraken_data = self.exchange_order_books[symbol]['kraken'] # Process bid updates for bid_update in book_data.get('b', []): price, volume, timestamp = float(bid_update[0]), float(bid_update[1]), float(bid_update[2]) if volume == 0: # Remove level kraken_data['bids'].pop(price, None) else: # Update level kraken_data['bids'][price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='bid', timestamp=datetime.fromtimestamp(timestamp), raw_data=bid_update ) # Process ask updates for ask_update in book_data.get('a', []): price, volume, timestamp = float(ask_update[0]), float(ask_update[1]), float(ask_update[2]) if volume == 0: # Remove level kraken_data['asks'].pop(price, None) else: # Update level kraken_data['asks'][price] = ExchangeOrderBookLevel( exchange='kraken', price=price, size=volume, volume_usd=price * volume, orders_count=1, side='ask', timestamp=datetime.fromtimestamp(timestamp), raw_data=ask_update ) kraken_data['last_update'] = datetime.now() # Update exchange count exchange_name = 'kraken' if exchange_name not in self.exchange_update_counts: self.exchange_update_counts[exchange_name] = 0 self.exchange_update_counts[exchange_name] += 1 # Log every 1000th update if self.exchange_update_counts[exchange_name] % 1000 == 0: logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Kraken updates for {symbol}") except Exception as e: logger.error(f"Error processing Kraken order book for {symbol}: {e}", exc_info=True) async def 
_stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Coinbase order book data via WebSocket""" try: import json if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") # Coinbase Pro WebSocket URL ws_url = "wss://ws-feed.pro.coinbase.com" coinbase_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '-')) # Subscribe message for level2 order book updates subscribe_message = { "type": "subscribe", "product_ids": [coinbase_symbol], "channels": ["level2"] } logger.info(f"Connecting to Coinbase order book stream for {symbol}") async with websockets_connect(ws_url) as websocket: # Send subscription await websocket.send(json.dumps(subscribe_message)) logger.info(f"Subscribed to Coinbase level2 for {coinbase_symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_coinbase_orderbook(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Coinbase message: {e}") except Exception as e: logger.error(f"Error processing Coinbase orderbook: {e}") except Exception as e: logger.error(f"Coinbase order book stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Coinbase order book stream for {symbol}") async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Kraken order book data via WebSocket""" try: import json if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") # Kraken WebSocket URL ws_url = "wss://ws.kraken.com" kraken_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '')) # Subscribe message for book updates subscribe_message = { "event": "subscribe", "pair": [kraken_symbol], "subscription": {"name": "book", "depth": 25} } logger.info(f"Connecting to Kraken order book stream for {symbol}") async with websockets_connect(ws_url) as websocket: # Send subscription await websocket.send(json.dumps(subscribe_message)) logger.info(f"Subscribed to Kraken book for {kraken_symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_kraken_orderbook(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Kraken message: {e}") except Exception as e: logger.error(f"Error processing Kraken orderbook: {e}") except Exception as e: logger.error(f"Kraken order book stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Kraken order book stream for {symbol}") async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Huobi order book data (placeholder implementation)""" try: logger.info(f"Huobi streaming for {symbol} not yet implemented") await asyncio.sleep(60) # Sleep to prevent spam except Exception as e: logger.error(f"Error streaming Huobi order book for {symbol}: {e}") async def _stream_bitfinex_orderbook(self, symbol: str, config: ExchangeConfig): """Stream Bitfinex order book data (placeholder implementation)""" try: logger.info(f"Bitfinex streaming for {symbol} not yet implemented") await asyncio.sleep(60) # Sleep to prevent spam except Exception as e: logger.error(f"Error streaming Bitfinex order book for {symbol}: {e}") async def _stream_binance_trades(self, symbol: str): """Stream trade data from Binance for SVP calculation""" try: config = self.exchange_configs[ExchangeType.BINANCE.value] ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade" 
logger.info(f"Connecting to Binance trade stream: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance trade stream for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_trade(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance trade message: {e}") except Exception as e: logger.error(f"Error processing Binance trade: {e}") except Exception as e: logger.error(f"Binance trade stream error for {symbol}: {e}") finally: logger.info(f"Disconnected from Binance trade stream for {symbol}") async def _process_binance_trade(self, symbol: str, data: Dict): """Process Binance trade data for SVP calculation""" try: if 'e' in data and data['e'] == 'trade': trade = { 'exchange': 'binance', 'symbol': symbol, 'price': float(data['p']), 'quantity': float(data['q']), 'side': 'buy' if not data['m'] else 'sell', # m is true for maker sell 'timestamp': datetime.fromtimestamp(data['T'] / 1000), 'volume_usd': float(data['p']) * float(data['q']) } await self._add_trade_to_svp(symbol, trade) # Log every 1000th trade if len(self.session_trades[symbol]) % 1000 == 0: logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}") except Exception as e: logger.error(f"Error processing Binance trade for {symbol}: {e}") async def _continuous_consolidation(self): """Continuously consolidate order books from all exchanges""" while self.is_streaming: try: start_time = time.time() for symbol in self.symbols: logger.debug(f"Starting consolidation for {symbol}") await self._consolidate_symbol_orderbook(symbol) processing_time = (time.time() - start_time) * 1000 self.processing_times['consolidation'].append(processing_time) # Log consolidation performance every 100 iterations if len(self.processing_times['consolidation']) % 100 == 0: avg_time = sum(self.processing_times['consolidation']) / len(self.processing_times['consolidation']) logger.debug(f"Average consolidation time: {avg_time:.2f}ms") await asyncio.sleep(0.1) # 100ms consolidation frequency except Exception as e: logger.error(f"Error in consolidation loop: {e}", exc_info=True) await asyncio.sleep(1) async def _consolidate_symbol_orderbook(self, symbol: str): """Consolidate order book for a specific symbol across all exchanges""" try: timestamp = datetime.now() consolidated_bids = {} consolidated_asks = {} active_exchanges = [] # Collect order book data from all connected exchanges async with self.data_lock: logger.debug(f"Collecting order book data for {symbol}") for exchange_name, exchange_data in self.exchange_order_books[symbol].items(): if exchange_data.get('connected', False): active_exchanges.append(exchange_name) # Get real-time WebSocket data (top 20 levels) live_bids = exchange_data.get('bids', {}) live_asks = exchange_data.get('asks', {}) # Get deep REST API data (up to 1000 levels) deep_bids = exchange_data.get('deep_bids', {}) deep_asks = exchange_data.get('deep_asks', {}) # Merge data: prioritize live data for top levels, add deep data for others merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid') merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask') bid_count = len(merged_bids) ask_count = len(merged_asks) logger.debug(f"{exchange_name} data for {symbol}: {bid_count} bids ({len(live_bids)} live), {ask_count} asks ({len(live_asks)} live)") # 
Process merged bids for price, level in merged_bids.items(): if price not in consolidated_bids: consolidated_bids[price] = ConsolidatedOrderBookLevel( price=price, total_size=0, total_volume_usd=0, total_orders=0, side='bid', exchange_breakdown={}, dominant_exchange=exchange_name, liquidity_score=0, timestamp=timestamp ) consolidated_bids[price].total_size += level.size consolidated_bids[price].total_volume_usd += level.volume_usd consolidated_bids[price].total_orders += level.orders_count consolidated_bids[price].exchange_breakdown[exchange_name] = level # Update dominant exchange based on volume current_dominant = consolidated_bids[price].exchange_breakdown.get( consolidated_bids[price].dominant_exchange ) current_volume = current_dominant.volume_usd if current_dominant else 0 if level.volume_usd > current_volume: consolidated_bids[price].dominant_exchange = exchange_name # Process merged asks (similar logic) for price, level in merged_asks.items(): if price not in consolidated_asks: consolidated_asks[price] = ConsolidatedOrderBookLevel( price=price, total_size=0, total_volume_usd=0, total_orders=0, side='ask', exchange_breakdown={}, dominant_exchange=exchange_name, liquidity_score=0, timestamp=timestamp ) consolidated_asks[price].total_size += level.size consolidated_asks[price].total_volume_usd += level.volume_usd consolidated_asks[price].total_orders += level.orders_count consolidated_asks[price].exchange_breakdown[exchange_name] = level current_dominant = consolidated_asks[price].exchange_breakdown.get( consolidated_asks[price].dominant_exchange ) current_volume = current_dominant.volume_usd if current_dominant else 0 if level.volume_usd > current_volume: consolidated_asks[price].dominant_exchange = exchange_name logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}") # Sort and calculate consolidated metrics sorted_bids = sorted(consolidated_bids.values(), key=lambda x: x.price, reverse=True) sorted_asks = sorted(consolidated_asks.values(), key=lambda x: x.price) # Calculate consolidated metrics volume_weighted_mid = self._calculate_volume_weighted_mid(sorted_bids, sorted_asks) total_bid_liquidity = sum(level.total_volume_usd for level in sorted_bids) total_ask_liquidity = sum(level.total_volume_usd for level in sorted_asks) spread_bps = 0 liquidity_imbalance = 0 if sorted_bids and sorted_asks: best_bid = sorted_bids[0].price best_ask = sorted_asks[0].price spread_bps = ((best_ask - best_bid) / volume_weighted_mid) * 10000 if total_bid_liquidity + total_ask_liquidity > 0: liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity) logger.debug(f"{symbol} metrics - Mid: ${volume_weighted_mid:.2f}, Spread: {spread_bps:.1f}bps, " + f"Imbalance: {liquidity_imbalance:.2%}") # Generate fine-grain price buckets price_buckets = self._generate_price_buckets(symbol, sorted_bids, sorted_asks, volume_weighted_mid) # Create consolidated snapshot with more levels for dashboard cob_snapshot = COBSnapshot( symbol=symbol, timestamp=timestamp, consolidated_bids=sorted_bids[:100], # Top 100 levels for better dashboard display consolidated_asks=sorted_asks[:100], exchanges_active=active_exchanges, volume_weighted_mid=volume_weighted_mid, total_bid_liquidity=total_bid_liquidity, total_ask_liquidity=total_ask_liquidity, spread_bps=spread_bps, liquidity_imbalance=liquidity_imbalance, price_buckets=price_buckets ) # Store consolidated order book self.current_order_book[symbol] = cob_snapshot 
self.realtime_snapshots[symbol].append(cob_snapshot) # Update real-time statistics self._update_realtime_stats(symbol, cob_snapshot) # Update consolidation statistics async with self.data_lock: self.consolidation_stats[symbol]['total_updates'] += 1 self.consolidation_stats[symbol]['active_price_levels'] = len(sorted_bids) + len(sorted_asks) self.consolidation_stats[symbol]['total_liquidity_usd'] = total_bid_liquidity + total_ask_liquidity # Notify callbacks with real-time data for callback in self.cob_update_callbacks: try: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(symbol, cob_snapshot)) else: callback(symbol, cob_snapshot) except Exception as e: logger.error(f"Error in COB update callback: {e}") logger.debug(f"Notified {len(self.cob_update_callbacks)} COB callbacks for {symbol}") logger.debug(f"Completed consolidation for {symbol} - {len(active_exchanges)} exchanges active") except Exception as e: logger.error(f"Error consolidating order book for {symbol}: {e}", exc_info=True) def _merge_orderbook_data(self, live_data: Dict, deep_data: Dict, side: str) -> Dict: """ Merge live WebSocket data with deep REST API data Strategy: Use live data for top levels (lowest latency), deep data for additional depth """ try: merged = {} # Always prioritize live WebSocket data (top 20 levels) for price, level in live_data.items(): merged[price] = level # Add deep data that's not already covered by live data for price, level in deep_data.items(): if price not in merged: # Mark this as deep data (older timestamp but more comprehensive) level.timestamp = level.timestamp # Keep original timestamp merged[price] = level # Sort to find the cutoff point for live vs deep data if side == 'bid': # For bids, higher prices are better (closer to mid) sorted_prices = sorted(merged.keys(), reverse=True) else: # For asks, lower prices are better (closer to mid) sorted_prices = sorted(merged.keys()) # Limit total depth to prevent memory issues (keep top 200 levels) max_levels = 200 if len(sorted_prices) > max_levels: cutoff_price = sorted_prices[max_levels - 1] if side == 'bid': merged = {p: level for p, level in merged.items() if p >= cutoff_price} else: merged = {p: level for p, level in merged.items() if p <= cutoff_price} return merged except Exception as e: logger.error(f"Error merging order book data: {e}") return live_data # Fallback to live data only def _generate_price_buckets(self, symbol: str, bids: List[ConsolidatedOrderBookLevel], asks: List[ConsolidatedOrderBookLevel], mid_price: float) -> Dict[str, Dict[str, float]]: """Generate fine-grain price buckets for volume analysis""" try: buckets = {'bids': {}, 'asks': {}} # Use fixed USD bucket size if configured for this symbol if symbol in self.fixed_usd_buckets: bucket_size = self.fixed_usd_buckets[symbol] logger.debug(f"Using fixed USD bucket size {bucket_size} for {symbol}") else: bucket_size = mid_price * (self.bucket_size_bps / 10000) # Convert bps to decimal # Process bids (below mid price) for level in bids: if level.price <= mid_price: bucket_key = int((mid_price - level.price) / bucket_size) bucket_price = mid_price - (bucket_key * bucket_size) if bucket_key not in buckets['bids']: buckets['bids'][bucket_key] = { 'price': bucket_price, 'volume_usd': 0, 'size': 0, 'orders': 0, 'exchanges': set() } buckets['bids'][bucket_key]['volume_usd'] += level.total_volume_usd buckets['bids'][bucket_key]['size'] += level.total_size buckets['bids'][bucket_key]['orders'] += level.total_orders 
buckets['bids'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys()) # Process asks (above mid price) for level in asks: if level.price >= mid_price: bucket_key = int((level.price - mid_price) / bucket_size) bucket_price = mid_price + (bucket_key * bucket_size) if bucket_key not in buckets['asks']: buckets['asks'][bucket_key] = { 'price': bucket_price, 'volume_usd': 0, 'size': 0, 'orders': 0, 'exchanges': set() } buckets['asks'][bucket_key]['volume_usd'] += level.total_volume_usd buckets['asks'][bucket_key]['size'] += level.total_size buckets['asks'][bucket_key]['orders'] += level.total_orders buckets['asks'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys()) # Convert sets to lists for JSON serialization for side in ['bids', 'asks']: for bucket_key in buckets[side]: buckets[side][bucket_key]['exchanges'] = list(buckets[side][bucket_key]['exchanges']) return buckets except Exception as e: logger.error(f"Error generating price buckets for {symbol}: {e}") return {'bids': {}, 'asks': {}} def _calculate_volume_weighted_mid(self, bids: List[ConsolidatedOrderBookLevel], asks: List[ConsolidatedOrderBookLevel]) -> float: """Calculate volume-weighted mid price across all exchanges""" if not bids or not asks: return 0.0 try: # Take top 5 levels for volume weighting top_bids = bids[:5] top_asks = asks[:5] total_bid_volume = sum(level.total_volume_usd for level in top_bids) total_ask_volume = sum(level.total_volume_usd for level in top_asks) if total_bid_volume + total_ask_volume == 0: return (bids[0].price + asks[0].price) / 2 weighted_bid = sum(level.price * level.total_volume_usd for level in top_bids) / total_bid_volume if total_bid_volume > 0 else bids[0].price weighted_ask = sum(level.price * level.total_volume_usd for level in top_asks) / total_ask_volume if total_ask_volume > 0 else asks[0].price bid_weight = total_bid_volume / (total_bid_volume + total_ask_volume) ask_weight = total_ask_volume / (total_bid_volume + total_ask_volume) return (weighted_bid * ask_weight) + (weighted_ask * bid_weight) except Exception as e: logger.error(f"Error calculating volume weighted mid: {e}") return (bids[0].price + asks[0].price) / 2 if bids and asks else 0.0 async def _continuous_bucket_updates(self): """Continuously update and optimize price buckets""" while self.is_streaming: try: for symbol in self.symbols: if symbol in self.current_order_book: cob = self.current_order_book[symbol] # Notify bucket update callbacks for callback in self.bucket_update_callbacks: try: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(symbol, cob.price_buckets)) else: callback(symbol, cob.price_buckets) except Exception as e: logger.warning(f"Error in bucket update callback: {e}") await asyncio.sleep(self.bucket_update_frequency / 1000) # Convert ms to seconds except Exception as e: logger.error(f"Error in bucket update loop: {e}") await asyncio.sleep(1) # Public interface methods def subscribe_to_cob_updates(self, callback: Callable[[str, COBSnapshot], Awaitable[None]]): """Subscribe to consolidated order book updates""" self.cob_update_callbacks.append(callback) logger.info(f"Added COB update callback: {len(self.cob_update_callbacks)} total") def subscribe_to_bucket_updates(self, callback: Callable[[str, Dict], Awaitable[None]]): """Subscribe to price bucket updates""" self.bucket_update_callbacks.append(callback) logger.info(f"Added bucket update callback: {len(self.bucket_update_callbacks)} total") def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]: 
"""Get current consolidated order book snapshot""" return self.current_order_book.get(symbol) def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]: """Get fine-grain price buckets for a symbol""" if symbol not in self.current_order_book: return None cob = self.current_order_book[symbol] return cob.price_buckets def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]: """Get breakdown of liquidity by exchange""" if symbol not in self.current_order_book: return None cob = self.current_order_book[symbol] breakdown = {} for exchange in cob.exchanges_active: breakdown[exchange] = { 'bid_liquidity': 0, 'ask_liquidity': 0, 'total_liquidity': 0, 'market_share': 0 } # Calculate liquidity by exchange for level in cob.consolidated_bids + cob.consolidated_asks: for exchange, exchange_level in level.exchange_breakdown.items(): if level.side == 'bid': breakdown[exchange]['bid_liquidity'] += exchange_level.volume_usd else: breakdown[exchange]['ask_liquidity'] += exchange_level.volume_usd breakdown[exchange]['total_liquidity'] += exchange_level.volume_usd # Calculate market share total_market_liquidity = sum(data['total_liquidity'] for data in breakdown.values()) if total_market_liquidity > 0: for exchange in breakdown: breakdown[exchange]['market_share'] = breakdown[exchange]['total_liquidity'] / total_market_liquidity return breakdown def get_statistics(self) -> Dict[str, Any]: """Get provider statistics""" return { 'symbols': self.symbols, 'is_streaming': self.is_streaming, 'active_exchanges': self.active_exchanges, 'exchange_update_counts': dict(self.exchange_update_counts), 'consolidation_stats': dict(self.consolidation_stats), 'bucket_size_bps': self.bucket_size_bps, 'cob_update_callbacks': len(self.cob_update_callbacks), 'bucket_update_callbacks': len(self.bucket_update_callbacks), 'avg_processing_time_ms': np.mean(self.processing_times.get('consolidation', [0])) if self.processing_times.get('consolidation') else 0 } def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]: """Get detailed market depth analysis""" if symbol not in self.current_order_book: return None cob = self.current_order_book[symbol] # Analyze depth distribution bid_levels = cob.consolidated_bids[:depth_levels] ask_levels = cob.consolidated_asks[:depth_levels] analysis = { 'symbol': symbol, 'timestamp': cob.timestamp.isoformat(), 'volume_weighted_mid': cob.volume_weighted_mid, 'spread_bps': cob.spread_bps, 'total_bid_liquidity': cob.total_bid_liquidity, 'total_ask_liquidity': cob.total_ask_liquidity, 'liquidity_imbalance': cob.liquidity_imbalance, 'exchanges_active': cob.exchanges_active, 'depth_analysis': { 'bid_levels': len(bid_levels), 'ask_levels': len(ask_levels), 'bid_liquidity_distribution': [], 'ask_liquidity_distribution': [], 'dominant_exchanges': {} } } # Analyze liquidity distribution for i, level in enumerate(bid_levels): analysis['depth_analysis']['bid_liquidity_distribution'].append({ 'level': i + 1, 'price': level.price, 'volume_usd': level.total_volume_usd, 'size': level.total_size, 'dominant_exchange': level.dominant_exchange, 'exchange_count': len(level.exchange_breakdown) }) for i, level in enumerate(ask_levels): analysis['depth_analysis']['ask_liquidity_distribution'].append({ 'level': i + 1, 'price': level.price, 'volume_usd': level.total_volume_usd, 'size': level.total_size, 'dominant_exchange': level.dominant_exchange, 'exchange_count': len(level.exchange_breakdown) }) # Count dominant exchanges for level in bid_levels + ask_levels: 
exchange = level.dominant_exchange if exchange not in analysis['depth_analysis']['dominant_exchanges']: analysis['depth_analysis']['dominant_exchanges'][exchange] = 0 analysis['depth_analysis']['dominant_exchanges'][exchange] += 1 return analysis def _update_realtime_stats(self, symbol: str, cob_snapshot: COBSnapshot): """Update real-time statistics for 1s and 5s windows""" try: current_time = datetime.now() # Add to history self.realtime_snapshots[symbol].append(cob_snapshot) # Calculate 1s and 5s windows window_1s = current_time - timedelta(seconds=1) window_5s = current_time - timedelta(seconds=5) # Get data within windows data_1s = [snapshot for snapshot in self.realtime_snapshots[symbol] if snapshot.timestamp >= window_1s] data_5s = [snapshot for snapshot in self.realtime_snapshots[symbol] if snapshot.timestamp >= window_5s] # Update 1s stats if data_1s: self.realtime_stats[symbol]['1s_stats'] = self._calculate_window_stats(data_1s) # Update 5s stats if data_5s: self.realtime_stats[symbol]['5s_stats'] = self._calculate_window_stats(data_5s) except Exception as e: logger.error(f"Error updating real-time stats for {symbol}: {e}") def _calculate_window_stats(self, snapshots: List[COBSnapshot]) -> Dict: """Calculate statistics for a time window""" if not snapshots: return {} mid_prices = [s.volume_weighted_mid for s in snapshots] spreads = [s.spread_bps for s in snapshots] bid_liquidity = [s.total_bid_liquidity for s in snapshots] ask_liquidity = [s.total_ask_liquidity for s in snapshots] imbalances = [s.liquidity_imbalance for s in snapshots] return { 'max_mid_price': max(mid_prices), 'min_mid_price': min(mid_prices), 'avg_mid_price': sum(mid_prices) / len(mid_prices), 'max_spread_bps': max(spreads), 'avg_spread_bps': sum(spreads) / len(spreads), 'max_bid_liquidity': max(bid_liquidity), 'avg_bid_liquidity': sum(bid_liquidity) / len(bid_liquidity), 'max_ask_liquidity': max(ask_liquidity), 'avg_ask_liquidity': sum(ask_liquidity) / len(ask_liquidity), 'max_imbalance': max(imbalances), 'avg_imbalance': sum(imbalances) / len(imbalances), 'update_count': len(snapshots) } def get_realtime_stats(self, symbol: str) -> Dict: """Get current real-time statistics for a symbol""" try: return self.realtime_stats.get(symbol, {}) except Exception as e: logger.error(f"Error getting real-time stats for {symbol}: {e}") return {} async def _stream_binance_full_depth(self, symbol: str): """Stream full depth order book from Binance WebSocket (replaces REST API)""" try: binance_symbol = symbol.replace('/', '').upper() # Full depth stream with 1000 levels, updated every 1000ms ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@depth@1000ms" logger.info(f"Connecting to Binance full depth WebSocket: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance full depth stream for {symbol}") while self.is_streaming: try: message = await websocket.recv() data = json.loads(message) # Process full depth data if 'bids' in data and 'asks' in data: # Create comprehensive COB snapshot cob_snapshot = { 'symbol': symbol, 'timestamp': time.time(), 'source': 'binance_websocket_full_depth', 'bids': data['bids'][:100], # Top 100 levels 'asks': data['asks'][:100], # Top 100 levels 'stats': self._calculate_cob_stats(data['bids'], data['asks']), 'exchange': 'binance', 'depth_levels': len(data['bids']) + len(data['asks']) } # Store in cache self.cob_data_cache[symbol] = 
cob_snapshot # Notify subscribers await self._notify_cob_subscribers(symbol, cob_snapshot) logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks") except Exception as e: if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower(): logger.warning(f"Binance full depth WebSocket connection closed for {symbol}") break except Exception as e: logger.error(f"Error processing full depth data for {symbol}: {e}") await asyncio.sleep(1) except Exception as e: logger.error(f"Error in Binance full depth stream for {symbol}: {e}") def _calculate_cob_stats(self, bids: List, asks: List) -> Dict: """Calculate COB statistics from order book data""" try: if not bids or not asks: return { 'mid_price': 0, 'spread_bps': 0, 'imbalance': 0, 'bid_liquidity': 0, 'ask_liquidity': 0 } # Convert string values to float bid_prices = [float(bid[0]) for bid in bids] bid_sizes = [float(bid[1]) for bid in bids] ask_prices = [float(ask[0]) for ask in asks] ask_sizes = [float(ask[1]) for ask in asks] # Calculate best bid/ask best_bid = max(bid_prices) best_ask = min(ask_prices) mid_price = (best_bid + best_ask) / 2 # Calculate spread spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0 # Calculate liquidity bid_liquidity = sum(bid_sizes[:20]) # Top 20 levels ask_liquidity = sum(ask_sizes[:20]) # Top 20 levels total_liquidity = bid_liquidity + ask_liquidity # Calculate imbalance imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 return { 'mid_price': mid_price, 'spread_bps': spread_bps, 'imbalance': imbalance, 'bid_liquidity': bid_liquidity, 'ask_liquidity': ask_liquidity, 'best_bid': best_bid, 'best_ask': best_ask } except Exception as e: logger.error(f"Error calculating COB stats: {e}") return { 'mid_price': 0, 'spread_bps': 0, 'imbalance': 0, 'bid_liquidity': 0, 'ask_liquidity': 0 } async def _stream_binance_book_ticker(self, symbol: str): """Stream best bid/ask prices from Binance WebSocket""" try: binance_symbol = symbol.replace('/', '').upper() ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@bookTicker" logger.info(f"Connecting to Binance book ticker WebSocket: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance book ticker stream for {symbol}") async for message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_book_ticker(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance book ticker message: {e}") except Exception as e: logger.error(f"Error processing Binance book ticker: {e}") except Exception as e: logger.error(f"Binance book ticker WebSocket error for {symbol}: {e}") finally: logger.info(f"Disconnected from Binance book ticker stream for {symbol}") async def _stream_binance_agg_trades(self, symbol: str): """Stream aggregated trades from Binance WebSocket for large order detection""" try: binance_symbol = symbol.replace('/', '').upper() ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@aggTrade" logger.info(f"Connecting to Binance aggregate trades WebSocket: {ws_url}") if websockets is None or websockets_connect is None: raise ImportError("websockets module not available") async with websockets_connect(ws_url) as websocket: logger.info(f"Connected to Binance aggregate trades stream for {symbol}") async for 
message in websocket: if not self.is_streaming: break try: data = json.loads(message) await self._process_binance_agg_trade(symbol, data) except json.JSONDecodeError as e: logger.error(f"Error parsing Binance agg trade message: {e}") except Exception as e: logger.error(f"Error processing Binance agg trade: {e}") except Exception as e: logger.error(f"Binance aggregate trades WebSocket error for {symbol}: {e}") finally: logger.info(f"Disconnected from Binance aggregate trades stream for {symbol}") async def _process_binance_full_depth(self, symbol: str, data: Dict): """Process full depth order book data from WebSocket (replaces REST API)""" try: timestamp = datetime.now() exchange_name = 'binance' # Parse full depth bids and asks (up to 1000 levels) full_bids = {} full_asks = {} for bid_data in data.get('bids', []): price = float(bid_data[0]) size = float(bid_data[1]) if size > 0: full_bids[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='bid', timestamp=timestamp ) for ask_data in data.get('asks', []): price = float(ask_data[0]) size = float(ask_data[1]) if size > 0: full_asks[price] = ExchangeOrderBookLevel( exchange=exchange_name, price=price, size=size, volume_usd=price * size, orders_count=1, side='ask', timestamp=timestamp ) # Update full depth storage (replaces REST API data) async with self.data_lock: self.exchange_order_books[symbol][exchange_name]['deep_bids'] = full_bids self.exchange_order_books[symbol][exchange_name]['deep_asks'] = full_asks self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId') logger.debug(f"Updated full depth via WebSocket for {symbol}: {len(full_bids)} bids, {len(full_asks)} asks") except Exception as e: logger.error(f"Error processing full depth WebSocket data for {symbol}: {e}") async def _process_binance_book_ticker(self, symbol: str, data: Dict): """Process book ticker data for best bid/ask tracking""" try: timestamp = datetime.now() best_bid_price = float(data.get('b', 0)) best_bid_qty = float(data.get('B', 0)) best_ask_price = float(data.get('a', 0)) best_ask_qty = float(data.get('A', 0)) # Store best bid/ask data async with self.data_lock: if symbol not in self.realtime_stats: self.realtime_stats[symbol] = {} self.realtime_stats[symbol].update({ 'best_bid_price': best_bid_price, 'best_bid_qty': best_bid_qty, 'best_ask_price': best_ask_price, 'best_ask_qty': best_ask_qty, 'spread': best_ask_price - best_bid_price, 'mid_price': (best_bid_price + best_ask_price) / 2, 'book_ticker_timestamp': timestamp }) logger.debug(f"Book ticker update for {symbol}: Bid {best_bid_price}@{best_bid_qty}, Ask {best_ask_price}@{best_ask_qty}") except Exception as e: logger.error(f"Error processing book ticker for {symbol}: {e}") async def _process_binance_agg_trade(self, symbol: str, data: Dict): """Process aggregate trade data for large order detection""" try: timestamp = datetime.fromtimestamp(int(data['T']) / 1000) price = float(data['p']) quantity = float(data['q']) is_buyer_maker = data['m'] agg_trade_id = data['a'] first_trade_id = data['f'] last_trade_id = data['l'] # Calculate trade value and size trade_value_usd = price * quantity trade_count = last_trade_id - first_trade_id + 1 # Detect large orders (institutional activity) is_large_order = trade_value_usd > 10000 # $10k+ trades is_whale_order = trade_value_usd > 100000 # $100k+ trades agg_trade = { 'symbol': symbol, 
'timestamp': timestamp, 'price': price, 'quantity': quantity, 'value_usd': trade_value_usd, 'trade_count': trade_count, 'is_buyer_maker': is_buyer_maker, 'side': 'sell' if is_buyer_maker else 'buy', # Opposite of maker 'is_large_order': is_large_order, 'is_whale_order': is_whale_order, 'agg_trade_id': agg_trade_id } # Add to aggregate trade tracking await self._add_agg_trade_to_analysis(symbol, agg_trade) # Log significant trades if is_whale_order: logger.info(f"WHALE ORDER detected for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()} at ${price}") elif is_large_order: logger.debug(f"Large order for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()}") except Exception as e: logger.error(f"Error processing aggregate trade for {symbol}: {e}") async def _add_agg_trade_to_analysis(self, symbol: str, agg_trade: Dict): """Add aggregate trade to analysis queues""" try: async with self.data_lock: # Initialize if needed if symbol not in self.realtime_stats: self.realtime_stats[symbol] = {} if 'agg_trades' not in self.realtime_stats[symbol]: self.realtime_stats[symbol]['agg_trades'] = deque(maxlen=1000) # Add to aggregate trade history self.realtime_stats[symbol]['agg_trades'].append(agg_trade) # Update real-time aggregate statistics recent_trades = list(self.realtime_stats[symbol]['agg_trades'])[-100:] # Last 100 trades if recent_trades: total_buy_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'buy') total_sell_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'sell') total_volume = total_buy_volume + total_sell_volume large_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_large_order']) large_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_large_order']) whale_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_whale_order']) whale_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_whale_order']) # Calculate order flow metrics self.realtime_stats[symbol].update({ 'buy_sell_ratio': total_buy_volume / total_sell_volume if total_sell_volume > 0 else float('inf'), 'total_volume_100': total_volume, 'large_order_ratio': (large_buy_count + large_sell_count) / len(recent_trades), 'whale_activity': whale_buy_count + whale_sell_count, 'institutional_flow': 'BULLISH' if total_buy_volume > total_sell_volume * 1.2 else 'BEARISH' if total_sell_volume > total_buy_volume * 1.2 else 'NEUTRAL' }) except Exception as e: logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}") def get_latest_cob_data(self, symbol: str) -> Optional[Dict]: """Get latest COB data for a symbol from cache""" try: if symbol in self.cob_data_cache: return self.cob_data_cache[symbol] return None except Exception as e: logger.error(f"Error getting latest COB data for {symbol}: {e}") return None
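# ---------------------------------------------------------------------------
# Example usage (illustrative sketch only; not part of the provider API).
# It assumes a Binance-only setup, which matches the active_exchanges default
# above. The websocket_url and symbols_mapping values mirror the stream URLs
# used elsewhere in this module; adjust them for your own deployment.
# ---------------------------------------------------------------------------
async def _example_cob_session():
    """Minimal sketch: configure Binance, stream briefly, then query the COB."""
    configs = {
        ExchangeType.BINANCE.value: ExchangeConfig(
            exchange_type=ExchangeType.BINANCE,
            websocket_url="wss://stream.binance.com:9443/ws/",
            rest_api_url="https://api.binance.com",
            symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
        )
    }
    provider = MultiExchangeCOBProvider(symbols=['BTC/USDT'], exchange_configs=configs)

    # Callbacks may be sync or async. Consolidated updates arrive as COBSnapshot
    # objects; full-depth cache updates arrive as plain dicts.
    def on_update(symbol: str, snapshot) -> None:
        if isinstance(snapshot, COBSnapshot):
            logger.info(f"{symbol} mid={snapshot.volume_weighted_mid:.2f} "
                        f"spread={snapshot.spread_bps:.1f}bps")

    provider.subscribe_to_cob_updates(on_update)

    # start_streaming() runs until is_streaming is cleared, so run it in the
    # background and stop after a short demo window.
    stream_task = asyncio.create_task(provider.start_streaming())
    try:
        await asyncio.sleep(30)
        cob = provider.get_consolidated_orderbook('BTC/USDT')
        if cob:
            logger.info(f"Active exchanges: {cob.exchanges_active}")
            logger.info(f"Exchange breakdown: {provider.get_exchange_breakdown('BTC/USDT')}")
            logger.info(f"SVP: {provider.get_session_volume_profile('BTC/USDT', bucket_size=1.0)}")
    finally:
        await provider.stop_streaming()
        stream_task.cancel()
        try:
            await stream_task
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_example_cob_session())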