"""
Multi-Exchange Consolidated Order Book (COB) Data Provider

This module aggregates order book data from multiple cryptocurrency exchanges to provide:
- Consolidated Order Book (COB) data across multiple exchanges
- Fine-grain volume buckets at configurable price levels
- Real-time order book depth aggregation
- Volume-weighted consolidated pricing
- Exchange-specific order flow analysis
- Liquidity distribution metrics

Supported Exchanges:
- Binance (via WebSocket depth streams)
- Coinbase Pro (via WebSocket level2 updates)
- Kraken (via WebSocket book updates)
- Huobi (via WebSocket mbp updates)
- Bitfinex (via WebSocket book updates)

Data is structured for consumption by CNN/DQN models and trading dashboards.
"""

import asyncio
import json
import logging
import time

try:
    import websockets
    from websockets.client import connect as websockets_connect
except ImportError:
    # Fallback for environments where websockets is not available
    websockets = None
    websockets_connect = None

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable, Union, Awaitable
from collections import deque, defaultdict
from dataclasses import dataclass, field
from threading import Thread, Lock
import requests
import ccxt
from enum import Enum
import math
import aiohttp
import aiohttp.resolver

logger = logging.getLogger(__name__)
class ExchangeType(Enum):
    BINANCE = "binance"
    COINBASE = "coinbase"
    KRAKEN = "kraken"
    HUOBI = "huobi"
    BITFINEX = "bitfinex"


@dataclass
class ExchangeOrderBookLevel:
    """Single order book level with exchange attribution"""
    exchange: str
    price: float
    size: float
    volume_usd: float
    orders_count: int
    side: str  # 'bid' or 'ask'
    timestamp: datetime
    raw_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ConsolidatedOrderBookLevel:
    """Consolidated order book level across multiple exchanges"""
    price: float
    total_size: float
    total_volume_usd: float
    total_orders: int
    side: str
    exchange_breakdown: Dict[str, ExchangeOrderBookLevel]
    dominant_exchange: str
    liquidity_score: float
    timestamp: datetime


@dataclass
class COBSnapshot:
    """Complete Consolidated Order Book snapshot"""
    symbol: str
    timestamp: datetime
    consolidated_bids: List[ConsolidatedOrderBookLevel]
    consolidated_asks: List[ConsolidatedOrderBookLevel]
    exchanges_active: List[str]
    volume_weighted_mid: float
    total_bid_liquidity: float
    total_ask_liquidity: float
    spread_bps: float
    liquidity_imbalance: float
    price_buckets: Dict[str, Dict[str, float]]  # Fine-grain volume buckets


@dataclass
class ExchangeConfig:
    """Exchange configuration for COB aggregation"""
    exchange_type: ExchangeType
    weight: float = 1.0
    enabled: bool = True
    websocket_url: str = ""
    rest_api_url: str = ""
    symbols_mapping: Dict[str, str] = field(default_factory=dict)
    rate_limits: Dict[str, int] = field(default_factory=dict)

class MultiExchangeCOBProvider:
    """
    Multi-Exchange Consolidated Order Book Data Provider

    Aggregates real-time order book data from multiple cryptocurrency exchanges
    to create a consolidated view of market liquidity and pricing.
    """

    def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
        """
        Initialize Multi-Exchange COB Provider

        Args:
            symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT'])
            bucket_size_bps: Price bucket size in basis points for fine-grain analysis
        """
        self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
        self.bucket_size_bps = bucket_size_bps
        self.bucket_update_frequency = 100  # ms
        self.consolidation_frequency = 100  # ms

        # REST API configuration for deep order book
        self.rest_api_frequency = 2000  # ms - full snapshot every 2 seconds (reduced frequency for deeper data)
        self.rest_depth_limit = 1000  # Up to 1000 levels via REST for maximum depth

        # Exchange configurations
        self.exchange_configs = self._initialize_exchange_configs()

        # Order book storage - live (WebSocket) and deep (REST) data kept separately
        self.exchange_order_books = {
            symbol: {
                exchange.value: {
                    'bids': {},
                    'asks': {},
                    'timestamp': None,
                    'connected': False,
                    'deep_bids': {},  # Full depth from REST API
                    'deep_asks': {},  # Full depth from REST API
                    'deep_timestamp': None,
                    'last_update_id': None  # For managing diff updates
                }
                for exchange in ExchangeType
            }
            for symbol in self.symbols
        }

        # Consolidated order books
        self.consolidated_order_books: Dict[str, COBSnapshot] = {}

        # Real-time statistics tracking
        self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
        self.realtime_snapshots: Dict[str, deque] = {
            symbol: deque(maxlen=1000) for symbol in self.symbols
        }

        # Session tracking for SVP (Session Volume Profile)
        self.session_start_time = datetime.now()
        self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols}
        self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}

        # Fixed USD bucket sizes for specific symbols
        self.fixed_usd_buckets = {
            'BTC/USDT': 10.0,  # $10 buckets for BTC
            'ETH/USDT': 1.0,   # $1 buckets for ETH
        }

        # WebSocket management
        self.is_streaming = False
        self.active_exchanges = ['binance']  # Start with Binance only

        # Callbacks for real-time updates
        self.cob_update_callbacks = []
        self.bucket_update_callbacks = []

        # Performance tracking
        self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType}
        self.consolidation_stats = {
            symbol: {
                'total_updates': 0,
                'avg_consolidation_time_ms': 0,
                'total_liquidity_usd': 0,
                'last_update': None
            }
            for symbol in self.symbols
        }
        self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)}

        # Thread safety
        self.data_lock = asyncio.Lock()

        # aiohttp sessions and connector are created in _setup_http_session() once an
        # event loop is running (see start_streaming); creating them here, outside an
        # event loop, would bind them to no loop or the wrong one.
        self.session: Optional[aiohttp.ClientSession] = None
        self.connector: Optional[aiohttp.TCPConnector] = None
        self.rest_session: Optional[aiohttp.ClientSession] = None

        # Initialize per-symbol Binance storage and seed an empty snapshot
        for symbol in self.symbols:
            self.exchange_order_books[symbol]['binance']['connected'] = False
            self.exchange_order_books[symbol]['binance']['deep_bids'] = {}
            self.exchange_order_books[symbol]['binance']['deep_asks'] = {}
            self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None
            self.exchange_order_books[symbol]['binance']['last_update_id'] = None
            self.realtime_snapshots[symbol].append(COBSnapshot(
                symbol=symbol,
                timestamp=datetime.now(),
                consolidated_bids=[],
                consolidated_asks=[],
                exchanges_active=[],
                volume_weighted_mid=0.0,
                total_bid_liquidity=0.0,
                total_ask_liquidity=0.0,
                spread_bps=0.0,
                liquidity_imbalance=0.0,
                price_buckets={}
            ))

        logger.info("Multi-Exchange COB Provider initialized")
        logger.info(f"Symbols: {self.symbols}")
        logger.info(f"Bucket size: {bucket_size_bps} bps")
        logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}")
        logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}")
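
    # Worked example of the bucket sizing above (illustrative numbers, not live data):
    # with bucket_size_bps = 1.0 and a $3,000 ETH mid price, a bps-based bucket is
    # 3000 * (1.0 / 10000) = $0.30 wide, whereas the fixed_usd_buckets override for
    # 'ETH/USDT' groups volume into $1.00 buckets instead.
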
    def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
        """Initialize exchange configurations"""
        configs = {}

        # Binance configuration
        configs[ExchangeType.BINANCE.value] = ExchangeConfig(
            exchange_type=ExchangeType.BINANCE,
            weight=0.3,  # Higher weight due to volume
            websocket_url="wss://stream.binance.com:9443/ws/",
            rest_api_url="https://api.binance.com",
            symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
            rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000}
        )

        # Coinbase Pro configuration
        configs[ExchangeType.COINBASE.value] = ExchangeConfig(
            exchange_type=ExchangeType.COINBASE,
            weight=0.25,
            websocket_url="wss://ws-feed.exchange.coinbase.com",
            rest_api_url="https://api.exchange.coinbase.com",
            symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'},
            rate_limits={'requests_per_minute': 600}
        )

        # Kraken configuration
        configs[ExchangeType.KRAKEN.value] = ExchangeConfig(
            exchange_type=ExchangeType.KRAKEN,
            weight=0.2,
            websocket_url="wss://ws.kraken.com",
            rest_api_url="https://api.kraken.com",
            symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'},
            rate_limits={'requests_per_minute': 900}
        )

        # Huobi configuration
        configs[ExchangeType.HUOBI.value] = ExchangeConfig(
            exchange_type=ExchangeType.HUOBI,
            weight=0.15,
            websocket_url="wss://api.huobi.pro/ws",
            rest_api_url="https://api.huobi.pro",
            symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'},
            rate_limits={'requests_per_minute': 2000}
        )

        # Bitfinex configuration
        configs[ExchangeType.BITFINEX.value] = ExchangeConfig(
            exchange_type=ExchangeType.BITFINEX,
            weight=0.1,
            websocket_url="wss://api-pub.bitfinex.com/ws/2",
            rest_api_url="https://api-pub.bitfinex.com",
            symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'},
            rate_limits={'requests_per_minute': 1000}
        )

        return configs
    async def start_streaming(self):
        """Start real-time order book streaming from all configured exchanges"""
        logger.info(f"Starting COB streaming for symbols: {self.symbols}")
        self.is_streaming = True

        # Setup aiohttp session here, within the async context
        await self._setup_http_session()

        # Start WebSocket connections for each active exchange and symbol
        tasks = []
        for symbol in self.symbols:
            for exchange_name, config in self.exchange_configs.items():
                if config.enabled and exchange_name in self.active_exchanges:
                    # Start WebSocket stream
                    tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))

                    # Start deep order book (REST API) stream
                    tasks.append(self._stream_deep_orderbook(exchange_name, symbol))

                    # Start trade stream (for SVP)
                    if exchange_name == 'binance':  # Only Binance for now
                        tasks.append(self._stream_binance_trades(symbol))

        # Start continuous consolidation and bucket updates
        tasks.append(self._continuous_consolidation())
        tasks.append(self._continuous_bucket_updates())

        logger.info(f"Starting {len(tasks)} COB streaming tasks")
        await asyncio.gather(*tasks)

    async def _setup_http_session(self):
        """Setup aiohttp session and connector"""
        # ThreadedResolver avoids the Windows aiodns issue; the connector is created
        # here, inside the running event loop, rather than in __init__.
        self.connector = aiohttp.TCPConnector(
            resolver=aiohttp.ThreadedResolver()
        )
        self.session = aiohttp.ClientSession(connector=self.connector)
        self.rest_session = aiohttp.ClientSession(connector=self.connector)
        logger.info("aiohttp session and connector setup completed")

    async def stop_streaming(self):
        """Stop real-time order book streaming and close sessions"""
        logger.info("Stopping COB Integration")
        self.is_streaming = False

        if self.session and not self.session.closed:
            await self.session.close()
            logger.info("aiohttp session closed")

        if self.rest_session and not self.rest_session.closed:
            await self.rest_session.close()
            logger.info("aiohttp REST session closed")

        if self.connector and not self.connector.closed:
            await self.connector.close()
            logger.info("aiohttp connector closed")

        logger.info("COB Integration stopped")
    async def _stream_deep_orderbook(self, exchange_name: str, symbol: str):
        """Fetch deep order book data via REST API periodically"""
        while self.is_streaming:
            try:
                start_time = time.time()

                if exchange_name == 'binance':
                    await self._fetch_binance_deep_orderbook(symbol)
                # Add other exchanges here as needed

                processing_time = (time.time() - start_time) * 1000
                self.processing_times['rest_api'].append(processing_time)

                logger.debug(f"Deep order book fetch for {symbol} took {processing_time:.2f}ms")

                # Wait before next fetch
                await asyncio.sleep(self.rest_api_frequency / 1000)

            except Exception as e:
                logger.error(f"Error fetching deep order book for {symbol}: {e}")
                await asyncio.sleep(5)  # Wait 5 seconds on error

    async def _fetch_binance_deep_orderbook(self, symbol: str):
        """Fetch deep order book from Binance REST API"""
        try:
            if not self.rest_session:
                return

            # Convert symbol format for Binance
            binance_symbol = symbol.replace('/', '').upper()
            url = "https://api.binance.com/api/v3/depth"
            params = {
                'symbol': binance_symbol,
                'limit': self.rest_depth_limit
            }

            async with self.rest_session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    await self._process_binance_deep_orderbook(symbol, data)
                else:
                    logger.error(f"Binance REST API error {response.status} for {symbol}")

        except Exception as e:
            logger.error(f"Error fetching Binance deep order book for {symbol}: {e}")

    async def _process_binance_deep_orderbook(self, symbol: str, data: Dict):
        """Process deep order book data from Binance REST API"""
        try:
            timestamp = datetime.now()
            exchange_name = 'binance'

            # Parse deep bids and asks
            deep_bids = {}
            deep_asks = {}

            for bid_data in data.get('bids', []):
                price = float(bid_data[0])
                size = float(bid_data[1])
                if size > 0:
                    deep_bids[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='bid',
                        timestamp=timestamp
                    )

            for ask_data in data.get('asks', []):
                price = float(ask_data[0])
                size = float(ask_data[1])
                if size > 0:
                    deep_asks[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='ask',
                        timestamp=timestamp
                    )

            # Update deep order book storage
            async with self.data_lock:
                self.exchange_order_books[symbol][exchange_name]['deep_bids'] = deep_bids
                self.exchange_order_books[symbol][exchange_name]['deep_asks'] = deep_asks
                self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp
                self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId')

            logger.debug(f"Updated deep order book for {symbol}: {len(deep_bids)} bids, {len(deep_asks)} asks")

        except Exception as e:
            logger.error(f"Error processing deep order book for {symbol}: {e}")
    async def _stream_exchange_orderbook(self, exchange_name: str, symbol: str):
        """Stream order book data from specific exchange"""
        config = self.exchange_configs[exchange_name]

        try:
            if exchange_name == ExchangeType.BINANCE.value:
                await self._stream_binance_orderbook(symbol, config)
            elif exchange_name == ExchangeType.COINBASE.value:
                await self._stream_coinbase_orderbook(symbol, config)
            elif exchange_name == ExchangeType.KRAKEN.value:
                await self._stream_kraken_orderbook(symbol, config)
            elif exchange_name == ExchangeType.HUOBI.value:
                await self._stream_huobi_orderbook(symbol, config)
            elif exchange_name == ExchangeType.BITFINEX.value:
                await self._stream_bitfinex_orderbook(symbol, config)

        except Exception as e:
            logger.error(f"Error streaming {exchange_name} for {symbol}: {e}")
            await asyncio.sleep(5)  # Wait before reconnecting

    async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig):
        """Stream order book data from Binance"""
        try:
            ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth@1000ms"
            logger.info(f"Connecting to Binance WebSocket: {ws_url}")

            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            async with websockets_connect(ws_url) as websocket:
                self.exchange_order_books[symbol]['binance']['connected'] = True
                logger.info(f"Connected to Binance order book stream for {symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_binance_orderbook(symbol, data)

                        # Also track trades for SVP
                        await self._track_binance_trades(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Binance message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Binance data: {e}")

        except Exception as e:
            logger.error(f"Binance WebSocket error for {symbol}: {e}")
        finally:
            self.exchange_order_books[symbol]['binance']['connected'] = False
            logger.info(f"Disconnected from Binance order book stream for {symbol}")
    async def _track_binance_trades(self, symbol: str, data: Dict):
        """Track executed trades from Binance for SVP calculation"""
        try:
            # The Binance depth stream doesn't include trades; trade events only
            # arrive on the dedicated trade stream (see _stream_binance_trades)
            if 'e' in data and data['e'] == 'trade':
                trade = {
                    'exchange': 'binance',
                    'symbol': symbol,
                    'price': float(data['p']),
                    'quantity': float(data['q']),
                    'side': 'sell' if data['m'] else 'buy',  # m=True means buyer is maker, i.e. an aggressive sell
                    'timestamp': datetime.fromtimestamp(data['T'] / 1000),
                    'volume_usd': float(data['p']) * float(data['q'])
                }

                await self._add_trade_to_svp(symbol, trade)

        except Exception as e:
            logger.error(f"Error tracking Binance trade: {e}")

    async def _add_trade_to_svp(self, symbol: str, trade: Dict):
        """Add trade to session volume profile"""
        try:
            async with self.data_lock:
                # Add to session trades
                self.session_trades[symbol].append(trade)

                # Update SVP cache
                price = trade['price']
                side = trade['side']
                volume = trade['volume_usd']

                if price not in self.svp_cache[symbol]:
                    self.svp_cache[symbol][price] = {'buy_volume': 0.0, 'sell_volume': 0.0}

                if side == 'buy':
                    self.svp_cache[symbol][price]['buy_volume'] += volume
                else:
                    self.svp_cache[symbol][price]['sell_volume'] += volume

                # Keep only recent trades (last 24 hours)
                cutoff_time = datetime.now() - timedelta(hours=24)
                self.session_trades[symbol] = [
                    t for t in self.session_trades[symbol]
                    if t['timestamp'] > cutoff_time
                ]

        except Exception as e:
            logger.error(f"Error adding trade to SVP: {e}")

    def get_session_volume_profile(self, symbol: str, bucket_size: Optional[float] = None) -> Dict:
        """Get session volume profile for a symbol"""
        try:
            if bucket_size is None:
                bucket_size = self.fixed_usd_buckets.get(symbol, 1.0)

            svp_data = {}

            # Access SVP cache without lock for read-only operations (generally safe)
            try:
                for price, volumes in self.svp_cache[symbol].items():
                    bucket_price = math.floor(price / bucket_size) * bucket_size

                    if bucket_price not in svp_data:
                        svp_data[bucket_price] = {
                            'buy_volume': 0.0,
                            'sell_volume': 0.0,
                            'total_volume': 0.0,
                            'trade_count': 0
                        }

                    svp_data[bucket_price]['buy_volume'] += volumes['buy_volume']
                    svp_data[bucket_price]['sell_volume'] += volumes['sell_volume']
                    svp_data[bucket_price]['total_volume'] += volumes['buy_volume'] + volumes['sell_volume']
                    svp_data[bucket_price]['trade_count'] += 1
            except Exception as e:
                logger.error(f"Error accessing SVP cache for {symbol}: {e}")
                return {}

            # Convert to sorted list
            svp_list = []
            for price in sorted(svp_data.keys()):
                data = svp_data[price]
                if data['total_volume'] > 0:
                    svp_list.append({
                        'price': price,
                        'buy_volume': data['buy_volume'],
                        'sell_volume': data['sell_volume'],
                        'total_volume': data['total_volume'],
                        'trade_count': data['trade_count'],
                        'buy_percent': (data['buy_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0,
                        'sell_percent': (data['sell_volume'] / data['total_volume']) * 100 if data['total_volume'] > 0 else 0
                    })

            return {
                'symbol': symbol,
                'session_start': self.session_start_time.isoformat(),
                'bucket_size': bucket_size,
                'data': svp_list
            }

        except Exception as e:
            logger.error(f"Error getting session volume profile for {symbol}: {e}")
            return {}
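
    # Worked example of the SVP bucketing above (illustrative numbers): with the
    # 'ETH/USDT' bucket_size of 1.0, trades at 3001.37 and 3001.92 both map to
    # bucket_price = math.floor(price / 1.0) * 1.0 = 3001.0, and their USD volumes
    # are summed per aggressor side within that bucket.
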
    async def _process_binance_orderbook(self, symbol: str, data: Dict):
        """Process Binance order book update"""
        try:
            timestamp = datetime.now()
            exchange_name = ExchangeType.BINANCE.value

            # Parse bids and asks
            bids = {}
            asks = {}

            for bid_data in data.get('bids', []):
                price = float(bid_data[0])
                size = float(bid_data[1])
                if size > 0:  # Only include non-zero sizes
                    bids[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='bid',
                        timestamp=timestamp
                    )

            for ask_data in data.get('asks', []):
                price = float(ask_data[0])
                size = float(ask_data[1])
                if size > 0:
                    asks[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='ask',
                        timestamp=timestamp
                    )

            # Update exchange order book
            async with self.data_lock:
                self.exchange_order_books[symbol][exchange_name].update({
                    'bids': bids,
                    'asks': asks,
                    'timestamp': timestamp,
                    'connected': True
                })

            logger.debug(f"Updated Binance order book for {symbol}: {len(bids)} bids, {len(asks)} asks")

            self.exchange_update_counts[exchange_name] += 1

            # Log every 1000th update
            if self.exchange_update_counts[exchange_name] % 1000 == 0:
                logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Binance updates for {symbol}")

        except Exception as e:
            logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True)
    async def _process_coinbase_orderbook(self, symbol: str, data: Dict):
        """Process Coinbase order book data"""
        try:
            if data.get('type') == 'snapshot':
                # Initial snapshot
                bids = {}
                asks = {}

                for bid_data in data.get('bids', []):
                    price, size = float(bid_data[0]), float(bid_data[1])
                    if size > 0:
                        bids[price] = ExchangeOrderBookLevel(
                            exchange='coinbase',
                            price=price,
                            size=size,
                            volume_usd=price * size,
                            orders_count=1,  # Coinbase doesn't provide order count
                            side='bid',
                            timestamp=datetime.now(),
                            raw_data=bid_data
                        )

                for ask_data in data.get('asks', []):
                    price, size = float(ask_data[0]), float(ask_data[1])
                    if size > 0:
                        asks[price] = ExchangeOrderBookLevel(
                            exchange='coinbase',
                            price=price,
                            size=size,
                            volume_usd=price * size,
                            orders_count=1,
                            side='ask',
                            timestamp=datetime.now(),
                            raw_data=ask_data
                        )

                # Update order book
                async with self.data_lock:
                    if symbol not in self.exchange_order_books:
                        self.exchange_order_books[symbol] = {}

                    self.exchange_order_books[symbol]['coinbase'] = {
                        'bids': bids,
                        'asks': asks,
                        'last_update': datetime.now(),
                        'connected': True
                    }

                logger.info(f"Coinbase snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")

            elif data.get('type') == 'l2update':
                # Level 2 update
                async with self.data_lock:
                    if symbol in self.exchange_order_books and 'coinbase' in self.exchange_order_books[symbol]:
                        coinbase_data = self.exchange_order_books[symbol]['coinbase']

                        for change in data.get('changes', []):
                            side, price_str, size_str = change
                            price, size = float(price_str), float(size_str)

                            if side == 'buy':
                                if size == 0:
                                    # Remove level
                                    coinbase_data['bids'].pop(price, None)
                                else:
                                    # Update level
                                    coinbase_data['bids'][price] = ExchangeOrderBookLevel(
                                        exchange='coinbase',
                                        price=price,
                                        size=size,
                                        volume_usd=price * size,
                                        orders_count=1,
                                        side='bid',
                                        timestamp=datetime.now(),
                                        raw_data=change
                                    )
                            elif side == 'sell':
                                if size == 0:
                                    # Remove level
                                    coinbase_data['asks'].pop(price, None)
                                else:
                                    # Update level
                                    coinbase_data['asks'][price] = ExchangeOrderBookLevel(
                                        exchange='coinbase',
                                        price=price,
                                        size=size,
                                        volume_usd=price * size,
                                        orders_count=1,
                                        side='ask',
                                        timestamp=datetime.now(),
                                        raw_data=change
                                    )

                        coinbase_data['last_update'] = datetime.now()

            # Update exchange count
            exchange_name = 'coinbase'
            if exchange_name not in self.exchange_update_counts:
                self.exchange_update_counts[exchange_name] = 0
            self.exchange_update_counts[exchange_name] += 1

            # Log every 1000th update
            if self.exchange_update_counts[exchange_name] % 1000 == 0:
                logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Coinbase updates for {symbol}")

        except Exception as e:
            logger.error(f"Error processing Coinbase order book for {symbol}: {e}", exc_info=True)
    async def _process_kraken_orderbook(self, symbol: str, data: Dict):
        """Process Kraken order book data"""
        try:
            # Kraken sends different message types
            if isinstance(data, list) and len(data) > 1:
                # Order book update format: [channel_id, data, channel_name, pair]
                if len(data) >= 4 and data[2] == "book-25":
                    book_data = data[1]

                    # Check for snapshot vs update
                    if 'bs' in book_data and 'as' in book_data:
                        # Snapshot
                        bids = {}
                        asks = {}

                        for bid_data in book_data.get('bs', []):
                            price, volume, timestamp = float(bid_data[0]), float(bid_data[1]), float(bid_data[2])
                            if volume > 0:
                                bids[price] = ExchangeOrderBookLevel(
                                    exchange='kraken',
                                    price=price,
                                    size=volume,
                                    volume_usd=price * volume,
                                    orders_count=1,  # Kraken doesn't provide order count in book feed
                                    side='bid',
                                    timestamp=datetime.fromtimestamp(timestamp),
                                    raw_data=bid_data
                                )

                        for ask_data in book_data.get('as', []):
                            price, volume, timestamp = float(ask_data[0]), float(ask_data[1]), float(ask_data[2])
                            if volume > 0:
                                asks[price] = ExchangeOrderBookLevel(
                                    exchange='kraken',
                                    price=price,
                                    size=volume,
                                    volume_usd=price * volume,
                                    orders_count=1,
                                    side='ask',
                                    timestamp=datetime.fromtimestamp(timestamp),
                                    raw_data=ask_data
                                )

                        # Update order book
                        async with self.data_lock:
                            if symbol not in self.exchange_order_books:
                                self.exchange_order_books[symbol] = {}

                            self.exchange_order_books[symbol]['kraken'] = {
                                'bids': bids,
                                'asks': asks,
                                'last_update': datetime.now(),
                                'connected': True
                            }

                        logger.info(f"Kraken snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")

                    else:
                        # Incremental update
                        async with self.data_lock:
                            if symbol in self.exchange_order_books and 'kraken' in self.exchange_order_books[symbol]:
                                kraken_data = self.exchange_order_books[symbol]['kraken']

                                # Process bid updates
                                for bid_update in book_data.get('b', []):
                                    price, volume, timestamp = float(bid_update[0]), float(bid_update[1]), float(bid_update[2])
                                    if volume == 0:
                                        # Remove level
                                        kraken_data['bids'].pop(price, None)
                                    else:
                                        # Update level
                                        kraken_data['bids'][price] = ExchangeOrderBookLevel(
                                            exchange='kraken',
                                            price=price,
                                            size=volume,
                                            volume_usd=price * volume,
                                            orders_count=1,
                                            side='bid',
                                            timestamp=datetime.fromtimestamp(timestamp),
                                            raw_data=bid_update
                                        )

                                # Process ask updates
                                for ask_update in book_data.get('a', []):
                                    price, volume, timestamp = float(ask_update[0]), float(ask_update[1]), float(ask_update[2])
                                    if volume == 0:
                                        # Remove level
                                        kraken_data['asks'].pop(price, None)
                                    else:
                                        # Update level
                                        kraken_data['asks'][price] = ExchangeOrderBookLevel(
                                            exchange='kraken',
                                            price=price,
                                            size=volume,
                                            volume_usd=price * volume,
                                            orders_count=1,
                                            side='ask',
                                            timestamp=datetime.fromtimestamp(timestamp),
                                            raw_data=ask_update
                                        )

                                kraken_data['last_update'] = datetime.now()

            # Update exchange count
            exchange_name = 'kraken'
            if exchange_name not in self.exchange_update_counts:
                self.exchange_update_counts[exchange_name] = 0
            self.exchange_update_counts[exchange_name] += 1

            # Log every 1000th update
            if self.exchange_update_counts[exchange_name] % 1000 == 0:
                logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Kraken updates for {symbol}")

        except Exception as e:
            logger.error(f"Error processing Kraken order book for {symbol}: {e}", exc_info=True)
    async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig):
        """Stream Coinbase order book data via WebSocket"""
        try:
            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            # Coinbase Pro WebSocket URL
            ws_url = "wss://ws-feed.pro.coinbase.com"
            coinbase_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '-'))

            # Subscribe message for level2 order book updates
            subscribe_message = {
                "type": "subscribe",
                "product_ids": [coinbase_symbol],
                "channels": ["level2"]
            }

            logger.info(f"Connecting to Coinbase order book stream for {symbol}")

            async with websockets_connect(ws_url) as websocket:
                # Send subscription
                await websocket.send(json.dumps(subscribe_message))
                logger.info(f"Subscribed to Coinbase level2 for {coinbase_symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_coinbase_orderbook(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Coinbase message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Coinbase orderbook: {e}")

        except Exception as e:
            logger.error(f"Coinbase order book stream error for {symbol}: {e}")
        finally:
            logger.info(f"Disconnected from Coinbase order book stream for {symbol}")

    async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig):
        """Stream Kraken order book data via WebSocket"""
        try:
            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            # Kraken WebSocket URL
            ws_url = "wss://ws.kraken.com"
            kraken_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', ''))

            # Subscribe message for book updates
            subscribe_message = {
                "event": "subscribe",
                "pair": [kraken_symbol],
                "subscription": {"name": "book", "depth": 25}
            }

            logger.info(f"Connecting to Kraken order book stream for {symbol}")

            async with websockets_connect(ws_url) as websocket:
                # Send subscription
                await websocket.send(json.dumps(subscribe_message))
                logger.info(f"Subscribed to Kraken book for {kraken_symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_kraken_orderbook(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Kraken message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Kraken orderbook: {e}")

        except Exception as e:
            logger.error(f"Kraken order book stream error for {symbol}: {e}")
        finally:
            logger.info(f"Disconnected from Kraken order book stream for {symbol}")

    async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig):
        """Stream Huobi order book data (placeholder implementation)"""
        try:
            logger.info(f"Huobi streaming for {symbol} not yet implemented")
            await asyncio.sleep(60)  # Sleep to prevent spam
        except Exception as e:
            logger.error(f"Error streaming Huobi order book for {symbol}: {e}")

    async def _stream_bitfinex_orderbook(self, symbol: str, config: ExchangeConfig):
        """Stream Bitfinex order book data (placeholder implementation)"""
        try:
            logger.info(f"Bitfinex streaming for {symbol} not yet implemented")
            await asyncio.sleep(60)  # Sleep to prevent spam
        except Exception as e:
            logger.error(f"Error streaming Bitfinex order book for {symbol}: {e}")
    async def _stream_binance_trades(self, symbol: str):
        """Stream trade data from Binance for SVP calculation"""
        try:
            config = self.exchange_configs[ExchangeType.BINANCE.value]
            ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade"
            logger.info(f"Connecting to Binance trade stream: {ws_url}")

            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            async with websockets_connect(ws_url) as websocket:
                logger.info(f"Connected to Binance trade stream for {symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_binance_trade(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Binance trade message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Binance trade: {e}")

        except Exception as e:
            logger.error(f"Binance trade stream error for {symbol}: {e}")
        finally:
            logger.info(f"Disconnected from Binance trade stream for {symbol}")

    async def _process_binance_trade(self, symbol: str, data: Dict):
        """Process Binance trade data for SVP calculation"""
        try:
            if 'e' in data and data['e'] == 'trade':
                trade = {
                    'exchange': 'binance',
                    'symbol': symbol,
                    'price': float(data['p']),
                    'quantity': float(data['q']),
                    'side': 'buy' if not data['m'] else 'sell',  # m=True means buyer is maker (aggressive sell)
                    'timestamp': datetime.fromtimestamp(data['T'] / 1000),
                    'volume_usd': float(data['p']) * float(data['q'])
                }

                await self._add_trade_to_svp(symbol, trade)

                # Log every 1000th trade
                if len(self.session_trades[symbol]) % 1000 == 0:
                    logger.info(f"Tracked {len(self.session_trades[symbol])} trades for {symbol}")

        except Exception as e:
            logger.error(f"Error processing Binance trade for {symbol}: {e}")
    async def _continuous_consolidation(self):
        """Continuously consolidate order books from all exchanges"""
        while self.is_streaming:
            try:
                start_time = time.time()

                for symbol in self.symbols:
                    logger.debug(f"Starting consolidation for {symbol}")
                    await self._consolidate_symbol_orderbook(symbol)

                processing_time = (time.time() - start_time) * 1000
                self.processing_times['consolidation'].append(processing_time)

                # Log consolidation performance every 100 iterations
                if len(self.processing_times['consolidation']) % 100 == 0:
                    avg_time = sum(self.processing_times['consolidation']) / len(self.processing_times['consolidation'])
                    logger.debug(f"Average consolidation time: {avg_time:.2f}ms")

                await asyncio.sleep(0.1)  # 100ms consolidation frequency

            except Exception as e:
                logger.error(f"Error in consolidation loop: {e}", exc_info=True)
                await asyncio.sleep(1)

    async def _consolidate_symbol_orderbook(self, symbol: str):
        """Consolidate order book for a specific symbol across all exchanges"""
        try:
            timestamp = datetime.now()
            consolidated_bids = {}
            consolidated_asks = {}
            active_exchanges = []

            # Collect order book data from all connected exchanges
            async with self.data_lock:
                logger.debug(f"Collecting order book data for {symbol}")
                for exchange_name, exchange_data in self.exchange_order_books[symbol].items():
                    if exchange_data.get('connected', False):
                        active_exchanges.append(exchange_name)

                        # Get real-time WebSocket data (top 20 levels)
                        live_bids = exchange_data.get('bids', {})
                        live_asks = exchange_data.get('asks', {})

                        # Get deep REST API data (up to 1000 levels)
                        deep_bids = exchange_data.get('deep_bids', {})
                        deep_asks = exchange_data.get('deep_asks', {})

                        # Merge data: prioritize live data for top levels, add deep data for others
                        merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid')
                        merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask')

                        bid_count = len(merged_bids)
                        ask_count = len(merged_asks)
                        logger.debug(f"{exchange_name} data for {symbol}: {bid_count} bids ({len(live_bids)} live), {ask_count} asks ({len(live_asks)} live)")

                        # Process merged bids
                        for price, level in merged_bids.items():
                            if price not in consolidated_bids:
                                consolidated_bids[price] = ConsolidatedOrderBookLevel(
                                    price=price,
                                    total_size=0,
                                    total_volume_usd=0,
                                    total_orders=0,
                                    side='bid',
                                    exchange_breakdown={},
                                    dominant_exchange=exchange_name,
                                    liquidity_score=0,
                                    timestamp=timestamp
                                )

                            consolidated_bids[price].total_size += level.size
                            consolidated_bids[price].total_volume_usd += level.volume_usd
                            consolidated_bids[price].total_orders += level.orders_count
                            consolidated_bids[price].exchange_breakdown[exchange_name] = level

                            # Update dominant exchange based on volume
                            if level.volume_usd > consolidated_bids[price].exchange_breakdown.get(
                                consolidated_bids[price].dominant_exchange,
                                type('obj', (object,), {'volume_usd': 0})()
                            ).volume_usd:
                                consolidated_bids[price].dominant_exchange = exchange_name

                        # Process merged asks (similar logic)
                        for price, level in merged_asks.items():
                            if price not in consolidated_asks:
                                consolidated_asks[price] = ConsolidatedOrderBookLevel(
                                    price=price,
                                    total_size=0,
                                    total_volume_usd=0,
                                    total_orders=0,
                                    side='ask',
                                    exchange_breakdown={},
                                    dominant_exchange=exchange_name,
                                    liquidity_score=0,
                                    timestamp=timestamp
                                )

                            consolidated_asks[price].total_size += level.size
                            consolidated_asks[price].total_volume_usd += level.volume_usd
                            consolidated_asks[price].total_orders += level.orders_count
                            consolidated_asks[price].exchange_breakdown[exchange_name] = level

                            if level.volume_usd > consolidated_asks[price].exchange_breakdown.get(
                                consolidated_asks[price].dominant_exchange,
                                type('obj', (object,), {'volume_usd': 0})()
                            ).volume_usd:
                                consolidated_asks[price].dominant_exchange = exchange_name

            logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}")

            # Sort and calculate consolidated metrics
            sorted_bids = sorted(consolidated_bids.values(), key=lambda x: x.price, reverse=True)
            sorted_asks = sorted(consolidated_asks.values(), key=lambda x: x.price)

            # Calculate consolidated metrics
            volume_weighted_mid = self._calculate_volume_weighted_mid(sorted_bids, sorted_asks)
            total_bid_liquidity = sum(level.total_volume_usd for level in sorted_bids)
            total_ask_liquidity = sum(level.total_volume_usd for level in sorted_asks)

            spread_bps = 0
            liquidity_imbalance = 0

            if sorted_bids and sorted_asks:
                best_bid = sorted_bids[0].price
                best_ask = sorted_asks[0].price
                spread_bps = ((best_ask - best_bid) / volume_weighted_mid) * 10000

            if total_bid_liquidity + total_ask_liquidity > 0:
                liquidity_imbalance = (total_bid_liquidity - total_ask_liquidity) / (total_bid_liquidity + total_ask_liquidity)

            logger.debug(f"{symbol} metrics - Mid: ${volume_weighted_mid:.2f}, Spread: {spread_bps:.1f}bps, " +
                         f"Imbalance: {liquidity_imbalance:.2%}")

            # Generate fine-grain price buckets
            price_buckets = self._generate_price_buckets(symbol, sorted_bids, sorted_asks, volume_weighted_mid)

            # Create consolidated snapshot with more levels for dashboard
            cob_snapshot = COBSnapshot(
                symbol=symbol,
                timestamp=timestamp,
                consolidated_bids=sorted_bids[:100],  # Top 100 levels for better dashboard display
                consolidated_asks=sorted_asks[:100],
                exchanges_active=active_exchanges,
                volume_weighted_mid=volume_weighted_mid,
                total_bid_liquidity=total_bid_liquidity,
                total_ask_liquidity=total_ask_liquidity,
                spread_bps=spread_bps,
                liquidity_imbalance=liquidity_imbalance,
                price_buckets=price_buckets
            )

            # Store consolidated order book
            self.consolidated_order_books[symbol] = cob_snapshot

            # Update real-time statistics (this also appends the snapshot to the
            # rolling realtime_snapshots history, so it is not appended here as well)
            self._update_realtime_stats(symbol, cob_snapshot)

            # Update consolidation statistics
            async with self.data_lock:
                self.consolidation_stats[symbol]['total_updates'] += 1
                self.consolidation_stats[symbol]['active_price_levels'] = len(sorted_bids) + len(sorted_asks)
                self.consolidation_stats[symbol]['total_liquidity_usd'] = total_bid_liquidity + total_ask_liquidity

            # Notify callbacks with real-time data
            for callback in self.cob_update_callbacks:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        asyncio.create_task(callback(symbol, cob_snapshot))
                    else:
                        callback(symbol, cob_snapshot)
                except Exception as e:
                    logger.error(f"Error in COB update callback: {e}")

            logger.debug(f"Notified {len(self.cob_update_callbacks)} COB callbacks for {symbol}")
            logger.debug(f"Completed consolidation for {symbol} - {len(active_exchanges)} exchanges active")

        except Exception as e:
            logger.error(f"Error consolidating order book for {symbol}: {e}", exc_info=True)
    def _merge_orderbook_data(self, live_data: Dict, deep_data: Dict, side: str) -> Dict:
        """
        Merge live WebSocket data with deep REST API data
        Strategy: Use live data for top levels (lowest latency), deep data for additional depth
        """
        try:
            merged = {}

            # Always prioritize live WebSocket data (top 20 levels)
            for price, level in live_data.items():
                merged[price] = level

            # Add deep data that's not already covered by live data
            for price, level in deep_data.items():
                if price not in merged:
                    # Deep data keeps its original (older) timestamp but adds depth
                    merged[price] = level

            # Sort to find the cutoff point for live vs deep data
            if side == 'bid':
                # For bids, higher prices are better (closer to mid)
                sorted_prices = sorted(merged.keys(), reverse=True)
            else:
                # For asks, lower prices are better (closer to mid)
                sorted_prices = sorted(merged.keys())

            # Limit total depth to prevent memory issues (keep top 200 levels)
            max_levels = 200
            if len(sorted_prices) > max_levels:
                cutoff_price = sorted_prices[max_levels - 1]
                if side == 'bid':
                    merged = {p: level for p, level in merged.items() if p >= cutoff_price}
                else:
                    merged = {p: level for p, level in merged.items() if p <= cutoff_price}

            return merged

        except Exception as e:
            logger.error(f"Error merging order book data: {e}")
            return live_data  # Fallback to live data only
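
    # Illustrative merge behaviour (hypothetical prices): if the live bid side holds
    # levels at 100.0 and 99.9 and the REST snapshot holds 100.0, 99.9, 99.8 ... 90.0,
    # the merged book keeps the fresher live levels at 100.0 and 99.9 and appends the
    # REST-only levels below them, truncated to the best 200 prices per side.
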
    def _generate_price_buckets(self, symbol: str, bids: List[ConsolidatedOrderBookLevel],
                                asks: List[ConsolidatedOrderBookLevel], mid_price: float) -> Dict[str, Dict[str, float]]:
        """Generate fine-grain price buckets for volume analysis"""
        try:
            buckets = {'bids': {}, 'asks': {}}

            # Use fixed USD bucket size if configured for this symbol
            if symbol in self.fixed_usd_buckets:
                bucket_size = self.fixed_usd_buckets[symbol]
                logger.debug(f"Using fixed USD bucket size {bucket_size} for {symbol}")
            else:
                bucket_size = mid_price * (self.bucket_size_bps / 10000)  # Convert bps to decimal

            # Process bids (below mid price)
            for level in bids:
                if level.price <= mid_price:
                    bucket_key = int((mid_price - level.price) / bucket_size)
                    bucket_price = mid_price - (bucket_key * bucket_size)

                    if bucket_key not in buckets['bids']:
                        buckets['bids'][bucket_key] = {
                            'price': bucket_price,
                            'volume_usd': 0,
                            'size': 0,
                            'orders': 0,
                            'exchanges': set()
                        }

                    buckets['bids'][bucket_key]['volume_usd'] += level.total_volume_usd
                    buckets['bids'][bucket_key]['size'] += level.total_size
                    buckets['bids'][bucket_key]['orders'] += level.total_orders
                    buckets['bids'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())

            # Process asks (above mid price)
            for level in asks:
                if level.price >= mid_price:
                    bucket_key = int((level.price - mid_price) / bucket_size)
                    bucket_price = mid_price + (bucket_key * bucket_size)

                    if bucket_key not in buckets['asks']:
                        buckets['asks'][bucket_key] = {
                            'price': bucket_price,
                            'volume_usd': 0,
                            'size': 0,
                            'orders': 0,
                            'exchanges': set()
                        }

                    buckets['asks'][bucket_key]['volume_usd'] += level.total_volume_usd
                    buckets['asks'][bucket_key]['size'] += level.total_size
                    buckets['asks'][bucket_key]['orders'] += level.total_orders
                    buckets['asks'][bucket_key]['exchanges'].update(level.exchange_breakdown.keys())

            # Convert sets to lists for JSON serialization
            for side in ['bids', 'asks']:
                for bucket_key in buckets[side]:
                    buckets[side][bucket_key]['exchanges'] = list(buckets[side][bucket_key]['exchanges'])

            return buckets

        except Exception as e:
            logger.error(f"Error generating price buckets for {symbol}: {e}")
            return {'bids': {}, 'asks': {}}
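
    # Worked example of the bucket index math above (illustrative numbers): with a
    # mid price of 3000.00 and bucket_size 1.0, a bid at 2997.40 falls in
    # bucket_key = int((3000.00 - 2997.40) / 1.0) = 2, whose bucket_price is
    # 3000.00 - 2 * 1.0 = 2998.00; every level priced in (2997.00, 2998.00]
    # aggregates into that same bucket.
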
    def _calculate_volume_weighted_mid(self, bids: List[ConsolidatedOrderBookLevel],
                                       asks: List[ConsolidatedOrderBookLevel]) -> float:
        """Calculate volume-weighted mid price across all exchanges"""
        if not bids or not asks:
            return 0.0

        try:
            # Take top 5 levels for volume weighting
            top_bids = bids[:5]
            top_asks = asks[:5]

            total_bid_volume = sum(level.total_volume_usd for level in top_bids)
            total_ask_volume = sum(level.total_volume_usd for level in top_asks)

            if total_bid_volume + total_ask_volume == 0:
                return (bids[0].price + asks[0].price) / 2

            weighted_bid = sum(level.price * level.total_volume_usd for level in top_bids) / total_bid_volume if total_bid_volume > 0 else bids[0].price
            weighted_ask = sum(level.price * level.total_volume_usd for level in top_asks) / total_ask_volume if total_ask_volume > 0 else asks[0].price

            bid_weight = total_bid_volume / (total_bid_volume + total_ask_volume)
            ask_weight = total_ask_volume / (total_bid_volume + total_ask_volume)

            return (weighted_bid * ask_weight) + (weighted_ask * bid_weight)

        except Exception as e:
            logger.error(f"Error calculating volume weighted mid: {e}")
            return (bids[0].price + asks[0].price) / 2 if bids and asks else 0.0
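
    # Worked example of the cross-weighted mid above (illustrative numbers): with
    # weighted_bid = 100.00 on $300k of bid volume and weighted_ask = 102.00 on
    # $100k of ask volume, bid_weight = 0.75 and ask_weight = 0.25, so the mid is
    # 100.00 * 0.25 + 102.00 * 0.75 = 101.50; heavier bid liquidity shifts the
    # consolidated mid toward the ask side.
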
    async def _continuous_bucket_updates(self):
        """Continuously update and optimize price buckets"""
        while self.is_streaming:
            try:
                for symbol in self.symbols:
                    if symbol in self.consolidated_order_books:
                        cob = self.consolidated_order_books[symbol]

                        # Notify bucket update callbacks
                        for callback in self.bucket_update_callbacks:
                            try:
                                if asyncio.iscoroutinefunction(callback):
                                    asyncio.create_task(callback(symbol, cob.price_buckets))
                                else:
                                    callback(symbol, cob.price_buckets)
                            except Exception as e:
                                logger.warning(f"Error in bucket update callback: {e}")

                await asyncio.sleep(self.bucket_update_frequency / 1000)  # Convert ms to seconds

            except Exception as e:
                logger.error(f"Error in bucket update loop: {e}")
                await asyncio.sleep(1)
    # Public interface methods

    def subscribe_to_cob_updates(self, callback: Callable[[str, COBSnapshot], Awaitable[None]]):
        """Subscribe to consolidated order book updates"""
        self.cob_update_callbacks.append(callback)
        logger.info(f"Added COB update callback: {len(self.cob_update_callbacks)} total")

    def subscribe_to_bucket_updates(self, callback: Callable[[str, Dict], Awaitable[None]]):
        """Subscribe to price bucket updates"""
        self.bucket_update_callbacks.append(callback)
        logger.info(f"Added bucket update callback: {len(self.bucket_update_callbacks)} total")
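
    # Example subscriber (hypothetical consumer code, shown for reference only):
    #
    #     async def on_cob_update(symbol: str, snapshot: COBSnapshot) -> None:
    #         print(symbol, snapshot.volume_weighted_mid, snapshot.spread_bps)
    #
    #     provider.subscribe_to_cob_updates(on_cob_update)
    #
    # Plain synchronous callables are also accepted; the consolidation loop checks
    # asyncio.iscoroutinefunction() before scheduling them on the event loop.
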
    def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]:
        """Get current consolidated order book snapshot"""
        return self.consolidated_order_books.get(symbol)

    def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]:
        """Get fine-grain price buckets for a symbol"""
        if symbol not in self.consolidated_order_books:
            return None

        cob = self.consolidated_order_books[symbol]
        return cob.price_buckets

    def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]:
        """Get breakdown of liquidity by exchange"""
        if symbol not in self.consolidated_order_books:
            return None

        cob = self.consolidated_order_books[symbol]
        breakdown = {}

        for exchange in cob.exchanges_active:
            breakdown[exchange] = {
                'bid_liquidity': 0,
                'ask_liquidity': 0,
                'total_liquidity': 0,
                'market_share': 0
            }

        # Calculate liquidity by exchange
        for level in cob.consolidated_bids + cob.consolidated_asks:
            for exchange, exchange_level in level.exchange_breakdown.items():
                if level.side == 'bid':
                    breakdown[exchange]['bid_liquidity'] += exchange_level.volume_usd
                else:
                    breakdown[exchange]['ask_liquidity'] += exchange_level.volume_usd
                breakdown[exchange]['total_liquidity'] += exchange_level.volume_usd

        # Calculate market share
        total_market_liquidity = sum(data['total_liquidity'] for data in breakdown.values())
        if total_market_liquidity > 0:
            for exchange in breakdown:
                breakdown[exchange]['market_share'] = breakdown[exchange]['total_liquidity'] / total_market_liquidity

        return breakdown

    def get_statistics(self) -> Dict[str, Any]:
        """Get provider statistics"""
        return {
            'symbols': self.symbols,
            'is_streaming': self.is_streaming,
            'active_exchanges': self.active_exchanges,
            'exchange_update_counts': dict(self.exchange_update_counts),
            'consolidation_stats': dict(self.consolidation_stats),
            'bucket_size_bps': self.bucket_size_bps,
            'cob_update_callbacks': len(self.cob_update_callbacks),
            'bucket_update_callbacks': len(self.bucket_update_callbacks),
            'avg_processing_time_ms': np.mean(self.processing_times.get('consolidation', [0])) if self.processing_times.get('consolidation') else 0
        }
    def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]:
        """Get detailed market depth analysis"""
        if symbol not in self.consolidated_order_books:
            return None

        cob = self.consolidated_order_books[symbol]

        # Analyze depth distribution
        bid_levels = cob.consolidated_bids[:depth_levels]
        ask_levels = cob.consolidated_asks[:depth_levels]

        analysis = {
            'symbol': symbol,
            'timestamp': cob.timestamp.isoformat(),
            'volume_weighted_mid': cob.volume_weighted_mid,
            'spread_bps': cob.spread_bps,
            'total_bid_liquidity': cob.total_bid_liquidity,
            'total_ask_liquidity': cob.total_ask_liquidity,
            'liquidity_imbalance': cob.liquidity_imbalance,
            'exchanges_active': cob.exchanges_active,
            'depth_analysis': {
                'bid_levels': len(bid_levels),
                'ask_levels': len(ask_levels),
                'bid_liquidity_distribution': [],
                'ask_liquidity_distribution': [],
                'dominant_exchanges': {}
            }
        }

        # Analyze liquidity distribution
        for i, level in enumerate(bid_levels):
            analysis['depth_analysis']['bid_liquidity_distribution'].append({
                'level': i + 1,
                'price': level.price,
                'volume_usd': level.total_volume_usd,
                'size': level.total_size,
                'dominant_exchange': level.dominant_exchange,
                'exchange_count': len(level.exchange_breakdown)
            })

        for i, level in enumerate(ask_levels):
            analysis['depth_analysis']['ask_liquidity_distribution'].append({
                'level': i + 1,
                'price': level.price,
                'volume_usd': level.total_volume_usd,
                'size': level.total_size,
                'dominant_exchange': level.dominant_exchange,
                'exchange_count': len(level.exchange_breakdown)
            })

        # Count dominant exchanges
        for level in bid_levels + ask_levels:
            exchange = level.dominant_exchange
            if exchange not in analysis['depth_analysis']['dominant_exchanges']:
                analysis['depth_analysis']['dominant_exchanges'][exchange] = 0
            analysis['depth_analysis']['dominant_exchanges'][exchange] += 1

        return analysis
    def _update_realtime_stats(self, symbol: str, cob_snapshot: COBSnapshot):
        """Update real-time statistics for 1s and 5s windows"""
        try:
            current_time = datetime.now()

            # Add to history
            self.realtime_snapshots[symbol].append(cob_snapshot)

            # Calculate 1s and 5s windows
            window_1s = current_time - timedelta(seconds=1)
            window_5s = current_time - timedelta(seconds=5)

            # Get data within windows
            data_1s = [snapshot for snapshot in self.realtime_snapshots[symbol]
                       if snapshot.timestamp >= window_1s]
            data_5s = [snapshot for snapshot in self.realtime_snapshots[symbol]
                       if snapshot.timestamp >= window_5s]

            # Update 1s stats
            if data_1s:
                self.realtime_stats[symbol]['1s_stats'] = self._calculate_window_stats(data_1s)

            # Update 5s stats
            if data_5s:
                self.realtime_stats[symbol]['5s_stats'] = self._calculate_window_stats(data_5s)

        except Exception as e:
            logger.error(f"Error updating real-time stats for {symbol}: {e}")

    def _calculate_window_stats(self, snapshots: List[COBSnapshot]) -> Dict:
        """Calculate statistics for a time window"""
        if not snapshots:
            return {}

        mid_prices = [s.volume_weighted_mid for s in snapshots]
        spreads = [s.spread_bps for s in snapshots]
        bid_liquidity = [s.total_bid_liquidity for s in snapshots]
        ask_liquidity = [s.total_ask_liquidity for s in snapshots]
        imbalances = [s.liquidity_imbalance for s in snapshots]

        return {
            'max_mid_price': max(mid_prices),
            'min_mid_price': min(mid_prices),
            'avg_mid_price': sum(mid_prices) / len(mid_prices),
            'max_spread_bps': max(spreads),
            'avg_spread_bps': sum(spreads) / len(spreads),
            'max_bid_liquidity': max(bid_liquidity),
            'avg_bid_liquidity': sum(bid_liquidity) / len(bid_liquidity),
            'max_ask_liquidity': max(ask_liquidity),
            'avg_ask_liquidity': sum(ask_liquidity) / len(ask_liquidity),
            'max_imbalance': max(imbalances),
            'avg_imbalance': sum(imbalances) / len(imbalances),
            'update_count': len(snapshots)
        }

    def get_realtime_stats(self, symbol: str) -> Dict:
        """Get current real-time statistics for a symbol"""
        try:
            return self.realtime_stats.get(symbol, {})
        except Exception as e:
            logger.error(f"Error getting real-time stats for {symbol}: {e}")
            return {}
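

if __name__ == "__main__":
    # Minimal usage sketch (assumes network access to the exchanges configured above).
    # It shows how a consumer might start the provider and receive consolidated
    # snapshots; the callback name and 30-second run duration are illustrative and
    # not part of the provider API.
    logging.basicConfig(level=logging.INFO)

    async def _print_cob(symbol: str, snapshot: COBSnapshot) -> None:
        # Log the volume-weighted mid and spread of each consolidated snapshot
        logger.info(f"{symbol} mid={snapshot.volume_weighted_mid:.2f} spread={snapshot.spread_bps:.1f}bps")

    async def _demo() -> None:
        provider = MultiExchangeCOBProvider(symbols=['BTC/USDT'], bucket_size_bps=1.0)
        provider.subscribe_to_cob_updates(_print_cob)
        streaming = asyncio.create_task(provider.start_streaming())
        await asyncio.sleep(30)  # stream for roughly 30 seconds
        await provider.stop_streaming()
        streaming.cancel()

    asyncio.run(_demo())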