#!/usr/bin/env python3
"""
Enhanced COB WebSocket Implementation
Robust WebSocket implementation for Consolidated Order Book data with:
- Maximum allowed depth subscription
- Clear error handling and warnings
- Automatic reconnection with exponential backoff
- Fallback to REST API when WebSocket fails
- Dashboard integration with status updates
This replaces the existing COB WebSocket implementation with a more reliable version.
Binance docs: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams
"""
import asyncio
import json
import logging
import time
import traceback
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Callable
from collections import deque, defaultdict
from dataclasses import dataclass
import aiohttp
import weakref
try:
    import websockets
    from websockets.client import connect as websockets_connect
    from websockets.exceptions import ConnectionClosed, WebSocketException
    WEBSOCKETS_AVAILABLE = True
except ImportError:
    websockets = None
    websockets_connect = None
    ConnectionClosed = Exception
    WebSocketException = Exception
    WEBSOCKETS_AVAILABLE = False

logger = logging.getLogger(__name__)

@dataclass
class COBWebSocketStatus:
    """Status tracking for COB WebSocket connections"""
    connected: bool = False
    last_message_time: Optional[datetime] = None
    connection_attempts: int = 0
    last_error: Optional[str] = None
    reconnect_delay: float = 1.0
    max_reconnect_delay: float = 60.0
    messages_received: int = 0

    def reset_reconnect_delay(self):
        """Reset reconnect delay on successful connection"""
        self.reconnect_delay = 1.0

    def increase_reconnect_delay(self):
        """Increase reconnect delay with exponential backoff"""
        self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2)
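
    # Illustrative backoff progression (assuming the defaults above):
    # 1.0s -> 2.0s -> 4.0s -> 8.0s -> 16.0s -> 32.0s -> 60.0s (capped at
    # max_reconnect_delay); reset_reconnect_delay() drops it back to 1.0s.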


class EnhancedCOBWebSocket:
    """Enhanced COB WebSocket with multi-stream support and robust error handling

    Subscribes to multiple Binance WebSocket streams:
    - Order book depth updates (@depth@1000ms)
    - 1-second candlestick data (@kline_1s) with timezone support
    - 24hr ticker statistics (@ticker) - includes volume data
    - Aggregated trade data (@aggTrade) - for real-time volume analysis

    Features:
    - Binance WebSocket compliance (ping/pong, rate limiting, 24hr reconnection)
    - Proper order book synchronization with REST API snapshots
    - Real-time volume metrics and trade analysis
    - Timezone offset support for kline streams (UTC or UTC+8)
    - Fallback to REST API when WebSocket fails

    Usage Examples:
        # UTC timezone (default)
        cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT'])

        # UTC+8 timezone for kline streams
        cob_ws = EnhancedCOBWebSocket(['BTC/USDT', 'ETH/USDT'], timezone_offset='+08:00')

        # Get kline data with timezone conversion
        kline_utc8 = cob_ws.get_kline_with_timezone('BTC/USDT', '+08:00')
    """

    _instances = {}  # Track instances by symbols to prevent duplicates

    def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None):
        """
        Initialize Enhanced COB WebSocket

        Args:
            symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT'])
            dashboard_callback: Callback function for dashboard status updates
            timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8)
        """
        self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
        self.dashboard_callback = dashboard_callback
        self.timezone_offset = timezone_offset  # None for UTC, '+08:00' for UTC+8
        # Check for existing instances to prevent duplicate connections
        symbols_key = tuple(sorted(self.symbols))
        if symbols_key in EnhancedCOBWebSocket._instances:
            logger.warning(f"EnhancedCOBWebSocket already exists for symbols {self.symbols} - reusing existing instance")
            existing = EnhancedCOBWebSocket._instances[symbols_key]
            # Copy existing instance data
            self.__dict__.update(existing.__dict__)
            return
        # Register this instance
        EnhancedCOBWebSocket._instances[symbols_key] = self
        # Connection status tracking
        self.status: Dict[str, COBWebSocketStatus] = {
            symbol: COBWebSocketStatus() for symbol in self.symbols
        }
        # Data callbacks
        self.cob_callbacks: List[Callable] = []
        self.error_callbacks: List[Callable] = []
        # Latest data cache
        self.latest_cob_data: Dict[str, Dict] = {}
        # Order book management for proper diff depth stream handling (Binance compliant)
        self.order_books: Dict[str, Dict] = {}  # {symbol: {'bids': {price: qty}, 'asks': {price: qty}}}
        self.last_update_ids: Dict[str, int] = {}  # Track last update IDs for synchronization
        self.order_book_initialized: Dict[str, bool] = {}  # Track if order book is properly initialized
        # Event buffering for proper Binance order book synchronization
        self.event_buffers: Dict[str, List[Dict]] = {}  # Buffer events before snapshot
        self.first_event_u: Dict[str, int] = {}  # Track first event U for synchronization
        self.snapshot_in_progress: Dict[str, bool] = {}  # Track snapshot initialization
        # Rate limiting for message processing (Binance: max 5 messages per second)
        self.last_message_time: Dict[str, datetime] = {}
        self.min_message_interval = 0.2  # 200ms = 5 messages per second compliance
        self.message_count: Dict[str, int] = {}
        self.message_window_start: Dict[str, datetime] = {}
        # WebSocket connections
        self.websocket_tasks: Dict[str, asyncio.Task] = {}
        # REST API fallback
        self.rest_session: Optional[aiohttp.ClientSession] = None
        self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols}
        self.rest_tasks: Dict[str, asyncio.Task] = {}
        # Configuration
        self.max_depth = 1000  # Maximum depth for order book
        self.update_speed = '1000ms'  # Binance update speed - reduced for stability
        # Timezone configuration
        if self.timezone_offset == '+08:00':
            logger.info("Configured for UTC+8 timezone (Asian markets)")
        elif self.timezone_offset:
            logger.info(f"Configured for {self.timezone_offset} timezone")
        else:
            logger.info("Configured for UTC timezone (default)")
        logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}")
        if not WEBSOCKETS_AVAILABLE:
            logger.error("WebSockets module not available - COB data will be limited to REST API")

    def add_cob_callback(self, callback: Callable):
        """Add callback for COB data updates"""
        self.cob_callbacks.append(callback)

    def add_error_callback(self, callback: Callable):
        """Add callback for error notifications"""
        self.error_callbacks.append(callback)

    async def start(self):
        """Start COB WebSocket connections"""
        logger.info("Starting Enhanced COB WebSocket system")
        # Initialize REST session for fallback
        await self._init_rest_session()
        # Start WebSocket connections for each symbol
        for symbol in self.symbols:
            await self._start_symbol_websocket(symbol)
        # Start monitoring task
        asyncio.create_task(self._monitor_connections())
        logger.info("Enhanced COB WebSocket system started")

    async def stop(self):
        """Stop all WebSocket connections"""
        logger.info("Stopping Enhanced COB WebSocket system")
        # Cancel all WebSocket tasks
        for symbol, task in self.websocket_tasks.items():
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        # Cancel all REST tasks
        for symbol, task in self.rest_tasks.items():
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        # Close REST session
        if self.rest_session:
            await self.rest_session.close()
        # Remove from instances registry
        symbols_key = tuple(sorted(self.symbols))
        if symbols_key in EnhancedCOBWebSocket._instances:
            del EnhancedCOBWebSocket._instances[symbols_key]
        logger.info("Enhanced COB WebSocket system stopped")

    async def _init_rest_session(self):
        """Initialize REST API session - Windows uses requests-only mode"""
        import platform
        # On Windows, completely skip aiohttp to avoid aiodns issues
        is_windows = platform.system().lower() == 'windows'
        if is_windows:
            logger.info("Windows detected - using WebSocket-only mode (skipping REST API due to rate limits)")
            self.rest_session = None  # Force requests fallback
            return
        # Non-Windows: try aiohttp
        try:
            connector = aiohttp.TCPConnector(
                limit=100,
                limit_per_host=20,
                enable_cleanup_closed=True,
                use_dns_cache=True
            )
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            self.rest_session = aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
                headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'}
            )
            logger.info("REST API session initialized (Unix/Linux)")
        except Exception as e:
            logger.warning(f"Failed to initialize aiohttp session: {e}")
            logger.info("Falling back to requests-only mode")
            self.rest_session = None

    async def _get_order_book_snapshot(self, symbol: str):
        """Get initial order book snapshot from REST API

        This is necessary for properly maintaining the order book state
        with the WebSocket depth stream.
        """
        try:
            # Ensure REST session is available
            if not self.rest_session:
                await self._init_rest_session()
            if not self.rest_session:
                logger.warning(f"Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only")
                return
            # Convert symbol format for Binance API
            binance_symbol = symbol.replace('/', '')
            # Get order book snapshot with maximum depth
            url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000"
            logger.debug(f"Getting order book snapshot for {symbol} from {url}")
            async with self.rest_session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    # Validate response structure
                    if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data:
                        logger.error(f"Invalid order book snapshot response for {symbol}: missing bids/asks")
                        return
                    # Initialize order book state for proper WebSocket synchronization
                    if symbol not in self.order_books:
                        self.order_books[symbol] = {'bids': {}, 'asks': {}}
                    # Clear existing data and populate with snapshot
                    self.order_books[symbol]['bids'] = {float(price): float(qty) for price, qty in data['bids'] if float(qty) > 0}
                    self.order_books[symbol]['asks'] = {float(price): float(qty) for price, qty in data['asks'] if float(qty) > 0}
                    # Store last update ID for synchronization
                    if 'lastUpdateId' in data:
                        self.last_update_ids[symbol] = data['lastUpdateId']
                        logger.debug(f"Order book snapshot for {symbol}: lastUpdateId = {data['lastUpdateId']}")
                    # Mark as initialized
                    self.order_book_initialized[symbol] = True
                    logger.info(f"Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")
                    # Create initial COB data from snapshot
                    bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0]
                    asks = [{'price': float(price), 'size': float(qty)} for price, qty in data['asks'] if float(qty) > 0]
                    # Sort bids (descending) and asks (ascending)
                    bids.sort(key=lambda x: x['price'], reverse=True)
                    asks.sort(key=lambda x: x['price'])
                    # Create COB data structure if we have valid data
                    if bids and asks:
                        best_bid = bids[0]
                        best_ask = asks[0]
                        mid_price = (best_bid['price'] + best_ask['price']) / 2
                        spread = best_ask['price'] - best_bid['price']
                        spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
                        # Calculate volumes
                        bid_volume = sum(bid['size'] * bid['price'] for bid in bids)
                        ask_volume = sum(ask['size'] * ask['price'] for ask in asks)
                        total_volume = bid_volume + ask_volume
                        cob_data = {
                            'symbol': symbol,
                            'timestamp': datetime.now(),
                            'bids': bids,
                            'asks': asks,
                            'source': 'rest_snapshot',
                            'exchange': 'binance',
                            'stats': {
                                'best_bid': best_bid['price'],
                                'best_ask': best_ask['price'],
                                'mid_price': mid_price,
                                'spread': spread,
                                'spread_bps': spread_bps,
                                'bid_volume': bid_volume,
                                'ask_volume': ask_volume,
                                'total_bid_volume': bid_volume,
                                'total_ask_volume': ask_volume,
                                'imbalance': (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0,
                                'bid_levels': len(bids),
                                'ask_levels': len(asks),
                                'timestamp': datetime.now().isoformat()
                            }
                        }
                        # Update cache
                        self.latest_cob_data[symbol] = cob_data
                        # Notify callbacks
                        for callback in self.cob_callbacks:
                            try:
                                await callback(symbol, cob_data)
                            except Exception as e:
                                logger.error(f"❌ Error in COB callback: {e}")
                        logger.debug(f"Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps")
                    else:
                        logger.warning(f"No valid bid/ask data in snapshot for {symbol}")
                elif response.status == 429:
                    logger.warning(f"Rate limited getting snapshot for {symbol}, will continue with WebSocket only")
                else:
                    logger.error(f"Failed to get order book snapshot for {symbol}: HTTP {response.status}")
                    response_text = await response.text()
                    logger.debug(f"Response: {response_text}")
        except asyncio.TimeoutError:
            logger.warning(f"Timeout getting order book snapshot for {symbol}, will continue with WebSocket only")
        except Exception as e:
            logger.warning(f"Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only")
            logger.debug(f"Snapshot error details: {e}")
            # Don't fail the entire connection due to snapshot issues

    async def _start_symbol_websocket(self, symbol: str):
        """Start WebSocket connection for a specific symbol"""
        if not WEBSOCKETS_AVAILABLE:
            logger.warning(f"WebSockets not available for {symbol}, starting REST fallback")
            await self._start_rest_fallback(symbol)
            return
        # Cancel existing task if running
        if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done():
            self.websocket_tasks[symbol].cancel()
        # Start new WebSocket task
        self.websocket_tasks[symbol] = asyncio.create_task(
            self._websocket_connection_loop(symbol)
        )
        logger.info(f"Started WebSocket task for {symbol}")

    async def _websocket_connection_loop(self, symbol: str):
        """Main WebSocket connection loop with reconnection logic

        Subscribes to multiple streams via combined stream endpoint:
        - Order book depth (@depth@1000ms)
        - 1-second candlesticks (@kline_1s)
        - 24hr ticker with volume (@ticker)
        - Aggregated trades (@aggTrade)
        """
        status = self.status[symbol]
        while True:
            try:
                logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})")
                status.connection_attempts += 1
                # Create WebSocket URL with combined streams for multiple data types
                ws_symbol = symbol.replace('/', '').lower()  # BTCUSDT, ETHUSDT
                # Subscribe to multiple streams:
                # 1. depth@1000ms - Order book depth updates
                # 2. kline_1s - 1-second candlestick data (with optional timezone offset)
                # 3. ticker - 24hr ticker statistics (includes volume)
                # 4. aggTrade - Aggregated trade data for volume analysis
                # Configure kline stream with timezone offset if specified
                if self.timezone_offset:
                    kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}"
                    logger.info(f"Using timezone offset {self.timezone_offset} for kline stream")
                else:
                    kline_stream = f"{ws_symbol}@kline_1s"
                    logger.info("Using UTC timezone for kline stream")
                streams = [
                    f"{ws_symbol}@depth@1000ms",  # Order book depth
                    kline_stream,                 # 1-second candlesticks (with timezone)
                    f"{ws_symbol}@ticker",        # 24hr ticker with volume
                    f"{ws_symbol}@aggTrade"       # Aggregated trades
                ]
                # Use combined stream endpoint
                streams_param = "/".join(streams)
                ws_url = f"wss://stream.binance.com:9443/stream?streams={streams_param}"
                logger.info(f"Connecting to: {ws_url}")
                # Add ping/pong handling and proper connection management
                async with websockets_connect(
                    ws_url,
                    ping_interval=20,  # Binance sends ping every 20 seconds
                    ping_timeout=60,   # Binance disconnects after 1 minute without pong
                    close_timeout=10
                ) as websocket:
                    # Connection successful
                    status.connected = True
                    status.last_error = None
                    status.reset_reconnect_delay()
                    logger.info(f"WebSocket connected for {symbol} with ping/pong handling")
                    await self._notify_dashboard_status(symbol, "connected", "WebSocket connected")
                    # Deactivate REST fallback
                    if self.rest_fallback_active[symbol]:
                        await self._stop_rest_fallback(symbol)
                    # Track connection start time for 24-hour limit compliance
                    connection_start = datetime.now()
                    # Message receiving loop with proper ping/pong handling
                    try:
                        async for message in websocket:
                            try:
                                # Check for 24-hour connection limit (Binance requirement)
                                if (datetime.now() - connection_start).total_seconds() > 23.5 * 3600:  # 23.5 hours
                                    logger.info(f"Approaching 24-hour limit for {symbol}, reconnecting...")
                                    break
                                # Handle different message types
                                if isinstance(message, bytes):
                                    # Ping frames are handled automatically by the websockets library
                                    continue
                                # Rate limiting: Binance allows max 5 messages per second
                                now = datetime.now()
                                # Initialize rate limiting tracking
                                if symbol not in self.message_window_start:
                                    self.message_window_start[symbol] = now
                                    self.message_count[symbol] = 0
                                # Reset counter every second
                                if (now - self.message_window_start[symbol]).total_seconds() >= 1.0:
                                    self.message_window_start[symbol] = now
                                    self.message_count[symbol] = 0
                                # Check rate limit (5 messages per second)
                                if self.message_count[symbol] >= 5:
                                    continue  # Skip this message to comply with rate limit
                                self.message_count[symbol] += 1
                                self.last_message_time[symbol] = now
                                # Parse JSON message
                                data = json.loads(message)
                                # Handle combined stream format
                                if 'stream' in data and 'data' in data:
                                    # Combined stream format: {"stream":"<streamName>","data":<rawPayload>}
                                    stream_name = data['stream']
                                    stream_data = data['data']
                                    await self._process_combined_stream_message(symbol, stream_name, stream_data)
                                else:
                                    # Single stream format (fallback)
                                    await self._process_websocket_message(symbol, data)
                                status.last_message_time = now
                                status.messages_received += 1
                            except json.JSONDecodeError as e:
                                logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}")
                            except Exception as e:
                                logger.error(f"Error processing WebSocket message for {symbol}: {e}")
                    except websockets.exceptions.ConnectionClosedOK:
                        logger.info(f"WebSocket connection closed normally for {symbol}")
                    except websockets.exceptions.ConnectionClosedError as e:
                        logger.warning(f"WebSocket connection closed with error for {symbol}: {e}")
                        raise
            except ConnectionClosed as e:
                status.connected = False
                status.last_error = f"Connection closed: {e}"
                logger.warning(f"WebSocket connection closed for {symbol}: {e}")
            except WebSocketException as e:
                status.connected = False
                status.last_error = f"WebSocket error: {e}"
                logger.error(f"WebSocket error for {symbol}: {e}")
            except Exception as e:
                status.connected = False
                status.last_error = f"Unexpected error: {e}"
                logger.error(f"Unexpected WebSocket error for {symbol}: {e}")
                logger.error(traceback.format_exc())
            # Connection failed or closed - start REST fallback
            await self._notify_dashboard_status(symbol, "disconnected", status.last_error)
            await self._start_rest_fallback(symbol)
            # Wait before reconnecting
            status.increase_reconnect_delay()
            logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}")
            await asyncio.sleep(status.reconnect_delay)

    async def _process_websocket_message(self, symbol: str, data: Dict):
        """Process WebSocket message and convert to COB format

        Properly handles Binance depth stream format according to documentation:
        - depthUpdate events with U (first update ID) and u (final update ID)
        - Maintains local order book with proper synchronization
        """
        try:
            # Check if this is a depth update event
            if data.get('e') == 'depthUpdate':
                # Handle diff depth stream according to Binance documentation
                await self._handle_depth_update(symbol, data)
                return
            # Fallback: handle partial book depth format (depth5, depth10, depth20)
            # Extract bids and asks from the message - handle all possible formats
            bids_data = data.get('bids', data.get('b', []))
            asks_data = data.get('asks', data.get('a', []))
            # Process the order book data - filter out zero quantities
            # Binance uses 0 quantity to indicate removal from the book
            valid_bids = []
            valid_asks = []
            # Process bids
            for bid in bids_data:
                try:
                    if len(bid) >= 2:
                        price = float(bid[0])
                        size = float(bid[1])
                        if size > 0:  # Only include non-zero quantities
                            valid_bids.append({'price': price, 'size': size})
                except (IndexError, ValueError, TypeError):
                    continue
            # Process asks
            for ask in asks_data:
                try:
                    if len(ask) >= 2:
                        price = float(ask[0])
                        size = float(ask[1])
                        if size > 0:  # Only include non-zero quantities
                            valid_asks.append({'price': price, 'size': size})
                except (IndexError, ValueError, TypeError):
                    continue
            # Sort bids (descending) and asks (ascending) for proper order book
            valid_bids.sort(key=lambda x: x['price'], reverse=True)
            valid_asks.sort(key=lambda x: x['price'])
            # Limit to maximum depth (1000 levels for maximum DOM)
            max_depth = 1000
            if len(valid_bids) > max_depth:
                valid_bids = valid_bids[:max_depth]
            if len(valid_asks) > max_depth:
                valid_asks = valid_asks[:max_depth]
            # Create COB data structure matching the working dashboard format
            cob_data = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'bids': valid_bids,
                'asks': valid_asks,
                'source': 'enhanced_websocket',
                'exchange': 'binance'
            }
            # Calculate comprehensive stats if we have valid data
            if valid_bids and valid_asks:
                best_bid = valid_bids[0]  # Already sorted, first is highest
                best_ask = valid_asks[0]  # Already sorted, first is lowest
                # Core price metrics
                mid_price = (best_bid['price'] + best_ask['price']) / 2
                spread = best_ask['price'] - best_bid['price']
                spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
                # Volume calculations (notional value) - limit to top 20 levels for performance
                top_bids = valid_bids[:20]
                top_asks = valid_asks[:20]
                bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids)
                ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks)
                # Size calculations (base currency)
                bid_size = sum(bid['size'] for bid in top_bids)
                ask_size = sum(ask['size'] for ask in top_asks)
                # Imbalance calculations
                total_volume = bid_volume + ask_volume
                volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0
                total_size = bid_size + ask_size
                size_imbalance = (bid_size - ask_size) / total_size if total_size > 0 else 0
                cob_data['stats'] = {
                    'best_bid': best_bid['price'],
                    'best_ask': best_ask['price'],
                    'mid_price': mid_price,
                    'spread': spread,
                    'spread_bps': spread_bps,
                    'bid_volume': bid_volume,
                    'ask_volume': ask_volume,
                    'total_bid_volume': bid_volume,
                    'total_ask_volume': ask_volume,
                    'bid_liquidity': bid_volume,  # Add liquidity fields
                    'ask_liquidity': ask_volume,
                    'total_bid_liquidity': bid_volume,
                    'total_ask_liquidity': ask_volume,
                    'bid_size': bid_size,
                    'ask_size': ask_size,
                    'volume_imbalance': volume_imbalance,
                    'size_imbalance': size_imbalance,
                    'imbalance': volume_imbalance,  # Default to volume imbalance
                    'bid_levels': len(valid_bids),
                    'ask_levels': len(valid_asks),
                    'timestamp': datetime.now().isoformat(),
                    'update_id': data.get('u', 0),  # Binance update ID
                    'event_time': data.get('E', 0)  # Binance event time
                }
            else:
                # Provide default stats if no valid data
                cob_data['stats'] = {
                    'best_bid': 0,
                    'best_ask': 0,
                    'mid_price': 0,
                    'spread': 0,
                    'spread_bps': 0,
                    'bid_volume': 0,
                    'ask_volume': 0,
                    'total_bid_volume': 0,
                    'total_ask_volume': 0,
                    'bid_size': 0,
                    'ask_size': 0,
                    'volume_imbalance': 0,
                    'size_imbalance': 0,
                    'imbalance': 0,
                    'bid_levels': 0,
                    'ask_levels': 0,
                    'timestamp': datetime.now().isoformat(),
                    'update_id': data.get('u', 0),
                    'event_time': data.get('E', 0)
                }
            # Update cache
            self.latest_cob_data[symbol] = cob_data
            # Notify callbacks
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, cob_data)
                except Exception as e:
                    logger.error(f"Error in COB callback: {e}")
            # Log success with key metrics (only for non-empty updates)
            if valid_bids and valid_asks:
                logger.debug(f"{symbol}: ${cob_data['stats']['mid_price']:.2f}, {len(valid_bids)} bids, {len(valid_asks)} asks, spread: {cob_data['stats']['spread_bps']:.1f} bps")
        except Exception as e:
            logger.error(f"Error processing WebSocket message for {symbol}: {e}")
            logger.debug(traceback.format_exc())

    async def _handle_depth_update(self, symbol: str, data: Dict):
        """Handle Binance depth update events following official recommendations

        Binance Order Book Management Procedure:
        1. Buffer events received from stream, note U of first event
        2. Get depth snapshot from REST API
        3. If snapshot lastUpdateId < first event U, get new snapshot
        4. Discard buffered events where u <= snapshot lastUpdateId
        5. First buffered event should have lastUpdateId within [U;u] range
        6. Set local order book to snapshot
        7. Apply buffered events, then subsequent events
        """
        try:
            # Extract update IDs
            first_update_id = data.get('U')  # First update ID in event
            final_update_id = data.get('u')  # Final update ID in event
            if first_update_id is None or final_update_id is None:
                logger.warning(f"Missing update IDs in depth update for {symbol}")
                return
            # Initialize buffers if needed
            if symbol not in self.event_buffers:
                self.event_buffers[symbol] = []
                self.first_event_u[symbol] = first_update_id
                self.snapshot_in_progress[symbol] = False
                logger.debug(f"Started buffering events for {symbol}, first U: {first_update_id}")
            # Check if order book is initialized and synchronized
            if not self.order_book_initialized.get(symbol, False):
                # Buffer this event
                self.event_buffers[symbol].append(data)
                logger.debug(f"Buffered event for {symbol}: U={first_update_id}, u={final_update_id}")
                # Start snapshot initialization if not already in progress
                if not self.snapshot_in_progress.get(symbol, False):
                    self.snapshot_in_progress[symbol] = True
                    await self._initialize_order_book_with_buffering(symbol)
                return
            # Order book is initialized, apply update directly
            # In WebSocket-only mode (no REST snapshot), be more lenient with update IDs
            last_update_id = self.last_update_ids.get(symbol, 0)
            if last_update_id == 0:
                # WebSocket-only mode: accept any update as starting point
                logger.debug(f"WebSocket-only mode for {symbol}: accepting update as starting point (U={first_update_id}, u={final_update_id})")
                await self._apply_depth_update(symbol, data)
            elif final_update_id <= last_update_id:
                # Event is older than our current state, ignore
                logger.debug(f"Ignoring old update for {symbol}: {final_update_id} <= {last_update_id}")
                return
            elif first_update_id > last_update_id + 1:
                # Gap detected - in WebSocket-only mode, just continue (less strict)
                logger.warning(f"Gap detected for {symbol}: {first_update_id} > {last_update_id + 1}, continuing anyway (WebSocket-only mode)")
                await self._apply_depth_update(symbol, data)
            else:
                # Normal update
                await self._apply_depth_update(symbol, data)
        except Exception as e:
            logger.error(f"Error handling depth update for {symbol}: {e}")

    async def _initialize_order_book_with_buffering(self, symbol: str):
        """Initialize order book following Binance recommendations with event buffering"""
        try:
            # On Windows, skip REST API and go directly to simplified mode to avoid rate limits
            import platform
            if platform.system().lower() == 'windows':
                logger.info(f"Windows detected - using WebSocket-only mode for {symbol} (avoiding REST API rate limits)")
                await self._init_simplified_mode(symbol)
                return
            max_attempts = 3
            attempt = 0
            while attempt < max_attempts:
                attempt += 1
                logger.info(f"Initializing order book for {symbol} (attempt {attempt})")
                # Get snapshot from REST API
                snapshot_data = await self._get_order_book_snapshot_data(symbol)
                if not snapshot_data:
                    logger.error(f"Failed to get snapshot for {symbol}")
                    await asyncio.sleep(1)
                    continue
                snapshot_last_update_id = snapshot_data.get('lastUpdateId', 0)
                first_buffered_u = self.first_event_u.get(symbol, 0)
                logger.debug(f"Snapshot lastUpdateId: {snapshot_last_update_id}, First buffered U: {first_buffered_u}")
                # Check if snapshot is valid (step 3 in Binance procedure)
                if snapshot_last_update_id < first_buffered_u:
                    logger.warning(f"Snapshot too old for {symbol}: {snapshot_last_update_id} < {first_buffered_u}, retrying...")
                    await asyncio.sleep(0.5)
                    continue
                # Snapshot is valid, set up order book
                await self._setup_order_book_from_snapshot(symbol, snapshot_data)
                # Process buffered events (steps 4-7 in Binance procedure)
                await self._process_buffered_events(symbol, snapshot_last_update_id)
                # Mark as initialized
                self.order_book_initialized[symbol] = True
                self.snapshot_in_progress[symbol] = False
                logger.info(f"Order book initialized for {symbol} with {len(self.event_buffers[symbol])} buffered events")
                return
            logger.error(f"Failed to initialize order book for {symbol} after {max_attempts} attempts")
            logger.info(f"Switching to simplified mode for {symbol} (no order book sync)")
            # Use simplified mode that doesn't maintain order book
            await self._init_simplified_mode(symbol)
        except Exception as e:
            logger.error(f"Error initializing order book with buffering for {symbol}: {e}")
            self.snapshot_in_progress[symbol] = False

    async def _init_simplified_mode(self, symbol: str):
        """Initialize simplified mode that processes WebSocket data without order book sync"""
        try:
            # Mark as using simplified mode
            if not hasattr(self, 'simplified_mode'):
                self.simplified_mode = {}
            self.simplified_mode[symbol] = True
            # Clear any existing state
            if symbol in self.event_buffers:
                self.event_buffers[symbol] = []
            if symbol in self.order_books:
                del self.order_books[symbol]
            # Mark as initialized
            self.order_book_initialized[symbol] = True
            self.snapshot_in_progress[symbol] = False
            logger.info(f"Simplified mode initialized for {symbol} - will process raw WebSocket data")
        except Exception as e:
            logger.error(f"Error initializing simplified mode for {symbol}: {e}")

    async def _process_simplified_depth_data(self, symbol: str, data: Dict):
        """Process depth data in simplified mode without order book synchronization"""
        try:
            # Extract bids and asks directly from the update
            bids_data = data.get('b', [])
            asks_data = data.get('a', [])
            if not bids_data and not asks_data:
                return
            # Convert to simple format
            bids = []
            asks = []
            for bid in bids_data:
                if len(bid) >= 2:
                    price = float(bid[0])
                    size = float(bid[1])
                    if size > 0:  # Only include non-zero quantities
                        bids.append({'price': price, 'size': size})
            for ask in asks_data:
                if len(ask) >= 2:
                    price = float(ask[0])
                    size = float(ask[1])
                    if size > 0:  # Only include non-zero quantities
                        asks.append({'price': price, 'size': size})
            if not bids and not asks:
                return
            # Sort for best bid/ask calculation
            if bids:
                bids.sort(key=lambda x: x['price'], reverse=True)
            if asks:
                asks.sort(key=lambda x: x['price'])
            # Create simplified COB data
            cob_data = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'bids': bids,
                'asks': asks,
                'source': 'simplified_depth_stream',
                'exchange': 'binance'
            }
            # Calculate basic stats if we have data
            if bids and asks:
                best_bid = bids[0]['price']
                best_ask = asks[0]['price']
                mid_price = (best_bid + best_ask) / 2
                spread = best_ask - best_bid
                spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
                cob_data['stats'] = {
                    'best_bid': best_bid,
                    'best_ask': best_ask,
                    'mid_price': mid_price,
                    'spread': spread,
                    'spread_bps': spread_bps,
                    'bid_levels': len(bids),
                    'ask_levels': len(asks),
                    'timestamp': datetime.now().isoformat(),
                    'mode': 'simplified'
                }
            # Update cache
            self.latest_cob_data[symbol] = cob_data
            # Notify callbacks
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, cob_data)
                except Exception as e:
                    logger.error(f"Error in COB callback: {e}")
            logger.debug(f"{symbol}: Simplified depth - {len(bids)} bids, {len(asks)} asks")
        except Exception as e:
            logger.error(f"Error processing simplified depth data for {symbol}: {e}")

    async def _get_order_book_snapshot_data(self, symbol: str) -> Optional[Dict]:
        """Get order book snapshot data from REST API with Windows fallback"""
        try:
            # Try aiohttp first
            if not self.rest_session:
                await self._init_rest_session()
            if self.rest_session:
                # Convert symbol format for Binance API
                binance_symbol = symbol.replace('/', '')
                url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000"
                async with self.rest_session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        logger.error(f"Failed to get snapshot for {symbol}: HTTP {response.status}")
                        return None
            else:
                # Fallback to synchronous requests (Windows compatible)
                logger.info(f"Using synchronous HTTP fallback for {symbol} snapshot")
                return await self._get_snapshot_with_requests(symbol)
        except Exception as e:
            logger.error(f"Error getting snapshot data for {symbol}: {e}")
            # Try synchronous fallback
            try:
                return await self._get_snapshot_with_requests(symbol)
            except Exception as e2:
                logger.error(f"Fallback also failed for {symbol}: {e2}")
                return None

    async def _get_snapshot_with_requests(self, symbol: str) -> Optional[Dict]:
        """Fallback method using synchronous requests library (Windows compatible)"""
        try:
            import requests
            # Convert symbol format for Binance API
            binance_symbol = symbol.replace('/', '')
            url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=5000"
            # Run in thread pool to avoid blocking the event loop
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.get(url, timeout=10, headers={'User-Agent': 'COB-WebSocket/1.0'})
            )
            if response.status_code == 200:
                logger.info(f"Got snapshot for {symbol} using requests fallback")
                return response.json()
            else:
                logger.error(f"Requests fallback failed for {symbol}: HTTP {response.status_code}")
                return None
        except ImportError:
            logger.error("requests library not available for fallback")
            return None
        except Exception as e:
            logger.error(f"Error in requests fallback for {symbol}: {e}")
            return None

    async def _setup_order_book_from_snapshot(self, symbol: str, snapshot_data: Dict):
        """Set up local order book from REST API snapshot"""
        try:
            # Initialize order book structure
            if symbol not in self.order_books:
                self.order_books[symbol] = {'bids': {}, 'asks': {}}
            # Clear existing data and populate with snapshot
            self.order_books[symbol]['bids'] = {
                float(price): float(qty) for price, qty in snapshot_data['bids'] if float(qty) > 0
            }
            self.order_books[symbol]['asks'] = {
                float(price): float(qty) for price, qty in snapshot_data['asks'] if float(qty) > 0
            }
            # Set last update ID from snapshot
            self.last_update_ids[symbol] = snapshot_data['lastUpdateId']
            logger.debug(f"Set up order book for {symbol}: {len(self.order_books[symbol]['bids'])} bids, {len(self.order_books[symbol]['asks'])} asks, lastUpdateId: {snapshot_data['lastUpdateId']}")
        except Exception as e:
            logger.error(f"Error setting up order book from snapshot for {symbol}: {e}")

    async def _process_buffered_events(self, symbol: str, snapshot_last_update_id: int):
        """Process buffered events according to Binance procedure"""
        try:
            buffered_events = self.event_buffers.get(symbol, [])
            valid_events = []
            # Step 4: Discard events where u <= lastUpdateId of snapshot
            for event in buffered_events:
                event_final_u = event.get('u', 0)
                if event_final_u > snapshot_last_update_id:
                    valid_events.append(event)
                else:
                    logger.debug(f"Discarded buffered event for {symbol}: u={event_final_u} <= snapshot lastUpdateId={snapshot_last_update_id}")
            # Step 5: Validate first buffered event
            if valid_events:
                first_event = valid_events[0]
                first_u = first_event.get('U', 0)
                first_final_u = first_event.get('u', 0)
                if not (first_u <= snapshot_last_update_id <= first_final_u):
                    logger.error(f"Synchronization error for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} not in range [{first_u}, {first_final_u}]")
                    # Reset and try again
                    self.order_book_initialized[symbol] = False
                    self.event_buffers[symbol] = []
                    return
                logger.debug(f"First buffered event valid for {symbol}: snapshot lastUpdateId {snapshot_last_update_id} in range [{first_u}, {first_final_u}]")
            # Steps 6-7: Apply all valid buffered events
            for event in valid_events:
                await self._apply_depth_update(symbol, event)
            # Clear buffer
            self.event_buffers[symbol] = []
            logger.info(f"Processed {len(valid_events)} buffered events for {symbol}")
        except Exception as e:
            logger.error(f"Error processing buffered events for {symbol}: {e}")

    async def _apply_depth_update(self, symbol: str, data: Dict):
        """Apply a single depth update to the local order book"""
        try:
            first_update_id = data.get('U', 0)
            final_update_id = data.get('u', 0)
            last_update_id = self.last_update_ids.get(symbol, 0)
            # Validate update sequence
            if final_update_id <= last_update_id:
                # Event is older than our current state, ignore
                logger.debug(f"Ignoring old event for {symbol}: u={final_update_id} <= lastUpdateId={last_update_id}")
                return
            if first_update_id > last_update_id + 1:
                # Gap detected, need to reinitialize
                logger.warning(f"Gap detected in depth updates for {symbol}: U={first_update_id} > lastUpdateId+1={last_update_id + 1}")
                self.order_book_initialized[symbol] = False
                self.event_buffers[symbol] = []
                self.first_event_u[symbol] = first_update_id
                await self._initialize_order_book_with_buffering(symbol)
                return
            # Apply updates to local order book
            bids_updates = data.get('b', [])
            asks_updates = data.get('a', [])
            # Update bids
            for bid_update in bids_updates:
                if len(bid_update) >= 2:
                    price = float(bid_update[0])
                    quantity = float(bid_update[1])
                    if quantity == 0:
                        # Remove price level
                        self.order_books[symbol]['bids'].pop(price, None)
                    else:
                        # Update price level
                        self.order_books[symbol]['bids'][price] = quantity
            # Update asks
            for ask_update in asks_updates:
                if len(ask_update) >= 2:
                    price = float(ask_update[0])
                    quantity = float(ask_update[1])
                    if quantity == 0:
                        # Remove price level
                        self.order_books[symbol]['asks'].pop(price, None)
                    else:
                        # Update price level
                        self.order_books[symbol]['asks'][price] = quantity
            # Update last update ID
            self.last_update_ids[symbol] = final_update_id
            # Convert to COB format and notify callbacks
            await self._create_cob_from_order_book(symbol)
            logger.debug(f"Applied depth update for {symbol}: U={first_update_id}, u={final_update_id}")
        except Exception as e:
            logger.error(f"Error applying depth update for {symbol}: {e}")

    async def _initialize_order_book(self, symbol: str):
        """Initialize order book from REST API snapshot (legacy method)"""
        try:
            # Use the new buffering approach for proper synchronization
            if symbol not in self.event_buffers:
                self.event_buffers[symbol] = []
                self.first_event_u[symbol] = 0
                self.snapshot_in_progress[symbol] = False
            await self._initialize_order_book_with_buffering(symbol)
        except Exception as e:
            logger.error(f"Failed to initialize order book for {symbol}: {e}")
            self.order_book_initialized[symbol] = False

    async def _create_cob_from_order_book(self, symbol: str):
        """Create COB data from maintained order book"""
        try:
            if symbol not in self.order_books:
                return
            order_book = self.order_books[symbol]
            # Sort and convert to list format
            bids = [{'price': price, 'size': qty} for price, qty in order_book['bids'].items()]
            asks = [{'price': price, 'size': qty} for price, qty in order_book['asks'].items()]
            # Sort bids (descending) and asks (ascending)
            bids.sort(key=lambda x: x['price'], reverse=True)
            asks.sort(key=lambda x: x['price'])
            # Limit depth
            max_depth = 1000
            if len(bids) > max_depth:
                bids = bids[:max_depth]
            if len(asks) > max_depth:
                asks = asks[:max_depth]
            # Create COB data structure
            cob_data = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'bids': bids,
                'asks': asks,
                'source': 'binance_depth_stream',
                'exchange': 'binance'
            }
            # Calculate stats if we have valid data
            if bids and asks:
                best_bid = bids[0]
                best_ask = asks[0]
                mid_price = (best_bid['price'] + best_ask['price']) / 2
                spread = best_ask['price'] - best_bid['price']
                spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
                # Calculate volumes (top 20 levels for performance)
                top_bids = bids[:20]
                top_asks = asks[:20]
                bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids)
                ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks)
                total_volume = bid_volume + ask_volume
                volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0
                cob_data['stats'] = {
                    'best_bid': best_bid['price'],
                    'best_ask': best_ask['price'],
                    'mid_price': mid_price,
                    'spread': spread,
                    'spread_bps': spread_bps,
                    'bid_volume': bid_volume,
                    'ask_volume': ask_volume,
                    'imbalance': volume_imbalance,
                    'bid_levels': len(bids),
                    'ask_levels': len(asks),
                    'timestamp': datetime.now().isoformat(),
                    'update_id': self.last_update_ids.get(symbol, 0)
                }
            # Update cache
            self.latest_cob_data[symbol] = cob_data
            # Notify callbacks
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, cob_data)
                except Exception as e:
                    logger.error(f"Error in COB callback: {e}")
            logger.debug(f"{symbol}: Depth stream update - ${cob_data.get('stats', {}).get('mid_price', 0):.2f}")
        except Exception as e:
            logger.error(f"Error creating COB from order book for {symbol}: {e}")

    async def _process_combined_stream_message(self, symbol: str, stream_name: str, data: Dict):
        """Process messages from combined stream format"""
        try:
            # Extract stream type from stream name (e.g., "btcusdt@depth@1000ms" -> "depth")
            stream_parts = stream_name.split('@')
            if len(stream_parts) < 2:
                logger.warning(f"Invalid stream name format: {stream_name}")
                return
            stream_type = stream_parts[1]
            # Route to appropriate handler based on stream type
            if stream_type == 'depth':
                await self._handle_depth_stream(symbol, data)
            elif stream_type == 'kline_1s':
                await self._handle_kline_stream(symbol, data)
            elif stream_type == 'ticker':
                await self._handle_ticker_stream(symbol, data)
            elif stream_type == 'aggTrade':
                await self._handle_aggtrade_stream(symbol, data)
            else:
                logger.debug(f"Unhandled stream type: {stream_type} for {symbol}")
        except Exception as e:
            logger.error(f"Error processing combined stream message for {symbol}: {e}")

    async def _handle_depth_stream(self, symbol: str, data: Dict):
        """Handle depth stream data (order book updates)"""
        try:
            # Check if we're in simplified mode
            if hasattr(self, 'simplified_mode') and self.simplified_mode.get(symbol, False):
                # Simplified mode: process raw data without order book sync
                await self._process_simplified_depth_data(symbol, data)
                return
            # Normal mode: try to maintain order book
            if data.get('e') == 'depthUpdate':
                await self._handle_depth_update(symbol, data)
            else:
                # Handle partial book depth format
                await self._process_websocket_message(symbol, data)
        except Exception as e:
            logger.error(f"Error handling depth stream for {symbol}: {e}")
            # Fall back to simplified mode on repeated errors
            if not hasattr(self, 'simplified_mode'):
                self.simplified_mode = {}
            if not self.simplified_mode.get(symbol, False):
                logger.info(f"Switching {symbol} to simplified mode due to errors")
                await self._init_simplified_mode(symbol)

    async def _handle_kline_stream(self, symbol: str, data: Dict):
        """Handle 1-second kline/candlestick data with timezone support"""
        try:
            if data.get('e') != 'kline':
                return
            kline_data = data.get('k', {})
            if not kline_data:
                return
            # Extract timestamps (always in UTC milliseconds according to Binance docs)
            open_time_ms = kline_data.get('t', 0)
            close_time_ms = kline_data.get('T', 0)
            event_time_ms = data.get('E', 0)
            # Convert timestamps to timezone-aware UTC datetimes (tz=timezone.utc,
            # so the *_utc names actually hold UTC rather than local time)
            open_time_utc = datetime.fromtimestamp(open_time_ms / 1000, tz=timezone.utc) if open_time_ms else None
            close_time_utc = datetime.fromtimestamp(close_time_ms / 1000, tz=timezone.utc) if close_time_ms else None
            event_time_utc = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc) if event_time_ms else datetime.now(timezone.utc)
            # Extract candlestick data
            candlestick = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'event_time': event_time_utc,
                'open_time': open_time_ms,  # Raw timestamp (ms)
                'close_time': close_time_ms,  # Raw timestamp (ms)
                'open_time_utc': open_time_utc,  # Converted to UTC datetime
                'close_time_utc': close_time_utc,  # Converted to UTC datetime
                'interval': kline_data.get('i', '1s'),
                'open_price': float(kline_data.get('o', 0)),
                'high_price': float(kline_data.get('h', 0)),
                'low_price': float(kline_data.get('l', 0)),
                'close_price': float(kline_data.get('c', 0)),
                'volume': float(kline_data.get('v', 0)),  # Base asset volume
                'quote_volume': float(kline_data.get('q', 0)),  # Quote asset volume
                'trades_count': kline_data.get('n', 0),
                'taker_buy_volume': float(kline_data.get('V', 0)),  # Taker buy base asset volume
                'taker_buy_quote_volume': float(kline_data.get('Q', 0)),  # Taker buy quote asset volume
                'is_closed': kline_data.get('x', False),  # Is this kline closed?
                'first_trade_id': kline_data.get('f', 0),
                'last_trade_id': kline_data.get('L', 0),
                'timezone_offset': self.timezone_offset,  # Timezone info
                'source': f'binance_kline_1s_{self.timezone_offset or "utc"}'
            }
            # Store latest candlestick data
            if not hasattr(self, 'latest_candlestick_data'):
                self.latest_candlestick_data = {}
            self.latest_candlestick_data[symbol] = candlestick
            # Notify callbacks with candlestick data
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, {'type': 'candlestick', 'data': candlestick})
                except Exception as e:
                    logger.error(f"Error in candlestick callback: {e}")
            logger.debug(f"{symbol}: 1s Kline - O:{candlestick['open_price']:.2f} H:{candlestick['high_price']:.2f} L:{candlestick['low_price']:.2f} C:{candlestick['close_price']:.2f} V:{candlestick['volume']:.2f}")
        except Exception as e:
            logger.error(f"Error handling kline stream for {symbol}: {e}")

    async def _handle_ticker_stream(self, symbol: str, data: Dict):
        """Handle 24hr ticker data (includes volume statistics)"""
        try:
            if data.get('e') != '24hrTicker':
                return
            # Extract ticker data
            ticker = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'price_change': float(data.get('p', 0)),
                'price_change_percent': float(data.get('P', 0)),
                'weighted_avg_price': float(data.get('w', 0)),
                'last_price': float(data.get('c', 0)),
                'last_qty': float(data.get('Q', 0)),
                'best_bid_price': float(data.get('b', 0)),
                'best_bid_qty': float(data.get('B', 0)),
                'best_ask_price': float(data.get('a', 0)),
                'best_ask_qty': float(data.get('A', 0)),
                'open_price': float(data.get('o', 0)),
                'high_price': float(data.get('h', 0)),
                'low_price': float(data.get('l', 0)),
                'volume': float(data.get('v', 0)),  # 24hr volume
                'quote_volume': float(data.get('q', 0)),  # 24hr quote volume
                'open_time': data.get('O', 0),
                'close_time': data.get('C', 0),
                'first_trade_id': data.get('F', 0),
                'last_trade_id': data.get('L', 0),
                'trades_count': data.get('n', 0),
                'source': 'binance_24hr_ticker'
            }
            # Store latest ticker data
            if not hasattr(self, 'latest_ticker_data'):
                self.latest_ticker_data = {}
            self.latest_ticker_data[symbol] = ticker
            # Notify callbacks with ticker data
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, {'type': 'ticker', 'data': ticker})
                except Exception as e:
                    logger.error(f"Error in ticker callback: {e}")
            logger.debug(f"{symbol}: 24hr Ticker - Price:{ticker['last_price']:.2f} Volume:{ticker['volume']:.2f} Change:{ticker['price_change_percent']:.2f}%")
        except Exception as e:
            logger.error(f"Error handling ticker stream for {symbol}: {e}")

    async def _handle_aggtrade_stream(self, symbol: str, data: Dict):
        """Handle aggregated trade data for real-time volume analysis"""
        try:
            if data.get('e') != 'aggTrade':
                return
            # Extract aggregated trade data
            agg_trade = {
                'symbol': symbol,
                'timestamp': datetime.now(),
                'trade_id': data.get('a', 0),
                'price': float(data.get('p', 0)),
                'quantity': float(data.get('q', 0)),
                'first_trade_id': data.get('f', 0),
                'last_trade_id': data.get('l', 0),
                'trade_time': data.get('T', 0),
                'is_buyer_maker': data.get('m', False),  # True if buyer is market maker
                'notional_value': float(data.get('p', 0)) * float(data.get('q', 0)),
                'source': 'binance_aggtrade'
            }
            # Store latest trade data
            if not hasattr(self, 'latest_trade_data'):
                self.latest_trade_data = {}
            if symbol not in self.latest_trade_data:
                self.latest_trade_data[symbol] = []
            # Keep last 100 trades for volume analysis
            self.latest_trade_data[symbol].append(agg_trade)
            if len(self.latest_trade_data[symbol]) > 100:
                self.latest_trade_data[symbol] = self.latest_trade_data[symbol][-100:]
            # Calculate real-time volume metrics
            recent_trades = self.latest_trade_data[symbol][-10:]  # Last 10 trades
            buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker'])
            sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker'])
            total_volume = buy_volume + sell_volume
            volume_metrics = {
                'buy_volume': buy_volume,
                'sell_volume': sell_volume,
                'total_volume': total_volume,
                'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'),
                'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0
            }
            # Notify callbacks with trade data
            for callback in self.cob_callbacks:
                try:
                    await callback(symbol, {
                        'type': 'aggtrade',
                        'data': agg_trade,
                        'volume_metrics': volume_metrics
                    })
                except Exception as e:
                    logger.error(f"Error in aggtrade callback: {e}")
            logger.debug(f"{symbol}: AggTrade - Price:{agg_trade['price']:.2f} Qty:{agg_trade['quantity']:.4f} BuyerMaker:{agg_trade['is_buyer_maker']}")
        except Exception as e:
            logger.error(f"Error handling aggtrade stream for {symbol}: {e}")

    async def _start_rest_fallback(self, symbol: str):
        """Start REST API fallback for a symbol"""
        if self.rest_fallback_active[symbol]:
            return  # Already active
        self.rest_fallback_active[symbol] = True
        # Cancel existing REST task
        if symbol in self.rest_tasks and not self.rest_tasks[symbol].done():
            self.rest_tasks[symbol].cancel()
        # Start new REST task
        self.rest_tasks[symbol] = asyncio.create_task(
            self._rest_fallback_loop(symbol)
        )
        logger.warning(f"Started REST API fallback for {symbol}")
        await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback")

    async def _stop_rest_fallback(self, symbol: str):
        """Stop REST API fallback for a symbol"""
        if not self.rest_fallback_active[symbol]:
            return
        self.rest_fallback_active[symbol] = False
        if symbol in self.rest_tasks and not self.rest_tasks[symbol].done():
            self.rest_tasks[symbol].cancel()
        logger.info(f"Stopped REST API fallback for {symbol}")

    async def _rest_fallback_loop(self, symbol: str):
        """REST API fallback loop"""
        while self.rest_fallback_active[symbol]:
            try:
                await self._fetch_rest_orderbook(symbol)
                await asyncio.sleep(1)  # Update every second
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"REST fallback error for {symbol}: {e}")
                await asyncio.sleep(5)  # Wait longer on error

    async def _fetch_rest_orderbook(self, symbol: str):
        """Fetch order book data via REST API"""
        try:
            if not self.rest_session:
                return
            # Binance REST API
            rest_symbol = symbol.replace('/', '')  # BTCUSDT, ETHUSDT
            url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000"
            async with self.rest_session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    cob_data = {
                        'symbol': symbol,
                        'timestamp': datetime.now(),
                        'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']],
                        'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']],
                        'source': 'rest_fallback',
                        'exchange': 'binance'
                    }
                    # Calculate stats
                    if cob_data['bids'] and cob_data['asks']:
                        best_bid = max(cob_data['bids'], key=lambda x: x['price'])
                        best_ask = min(cob_data['asks'], key=lambda x: x['price'])
                        cob_data['stats'] = {
                            'best_bid': best_bid['price'],
                            'best_ask': best_ask['price'],
                            'spread': best_ask['price'] - best_bid['price'],
                            'mid_price': (best_bid['price'] + best_ask['price']) / 2,
                            'bid_volume': sum(bid['size'] for bid in cob_data['bids']),
                            'ask_volume': sum(ask['size'] for ask in cob_data['asks'])
                        }
                    # Update cache
                    self.latest_cob_data[symbol] = cob_data
                    # Notify callbacks
                    for callback in self.cob_callbacks:
                        try:
                            await callback(symbol, cob_data)
                        except Exception as e:
                            logger.error(f"Error in COB callback: {e}")
                    logger.debug(f"Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks")
                else:
                    logger.warning(f"REST API error for {symbol}: HTTP {response.status}")
        except Exception as e:
            logger.error(f"Error fetching REST order book for {symbol}: {e}")

    async def _monitor_connections(self):
        """Monitor WebSocket connections and provide status updates"""
        while True:
            try:
                await asyncio.sleep(10)  # Check every 10 seconds
                for symbol in self.symbols:
                    status = self.status[symbol]
                    # Check for stale connections (no messages for 30 seconds)
                    if status.connected and status.last_message_time:
                        time_since_last = (datetime.now() - status.last_message_time).total_seconds()
                        if time_since_last > 30:
                            logger.warning(f"Stale connection detected for {symbol}: {time_since_last:.1f}s since last message")
                            status.connected = False
                            status.last_error = f"Stale connection: {time_since_last:.1f}s without messages"
                            await self._notify_dashboard_status(symbol, "stale", status.last_error)
                    # Log connection status and order book sync status
                    if status.connected:
                        ob_status = "Synced" if self.order_book_initialized.get(symbol, False) else "Syncing"
                        buffered_count = len(self.event_buffers.get(symbol, []))
                        logger.debug(f"{symbol}: Connected, {status.messages_received} messages, OB: {ob_status}, Buffered: {buffered_count}")
                    else:
                        logger.debug(f"{symbol}: Disconnected - {status.last_error}")
                # Log overall system status
                connected_count = sum(1 for status in self.status.values() if status.connected)
                logger.debug(f"COB WebSocket status: {connected_count}/{len(self.symbols)} symbols connected")
            except Exception as e:
                logger.error(f"Error in connection monitor: {e}")
                await asyncio.sleep(10)

    def get_latest_candlestick(self, symbol: str) -> Optional[Dict]:
        """Get the latest 1-second candlestick data for a symbol"""
        if hasattr(self, 'latest_candlestick_data'):
            return self.latest_candlestick_data.get(symbol)
        return None

    def get_latest_ticker(self, symbol: str) -> Optional[Dict]:
        """Get the latest 24hr ticker data for a symbol"""
        if hasattr(self, 'latest_ticker_data'):
            return self.latest_ticker_data.get(symbol)
        return None

    def get_latest_trades(self, symbol: str, count: int = 10) -> List[Dict]:
        """Get the latest aggregated trades for a symbol"""
        if hasattr(self, 'latest_trade_data') and symbol in self.latest_trade_data:
            return self.latest_trade_data[symbol][-count:]
        return []

    def get_volume_metrics(self, symbol: str, timeframe_seconds: int = 60) -> Dict:
        """Calculate volume metrics for a given timeframe"""
        try:
            if not hasattr(self, 'latest_trade_data') or symbol not in self.latest_trade_data:
                return {'error': 'No trade data available'}
            current_time = datetime.now().timestamp() * 1000  # Convert to milliseconds
            cutoff_time = current_time - (timeframe_seconds * 1000)
            # Filter trades within timeframe
            recent_trades = [
                trade for trade in self.latest_trade_data[symbol]
                if trade['trade_time'] >= cutoff_time
            ]
            if not recent_trades:
                return {'error': 'No recent trades in timeframe'}
            # Calculate metrics
            buy_volume = sum(trade['notional_value'] for trade in recent_trades if not trade['is_buyer_maker'])
            sell_volume = sum(trade['notional_value'] for trade in recent_trades if trade['is_buyer_maker'])
            total_volume = buy_volume + sell_volume
            trade_count = len(recent_trades)
            avg_trade_size = total_volume / trade_count if trade_count > 0 else 0
            return {
                'timeframe_seconds': timeframe_seconds,
                'trade_count': trade_count,
                'buy_volume': buy_volume,
                'sell_volume': sell_volume,
                'total_volume': total_volume,
                'buy_sell_ratio': buy_volume / sell_volume if sell_volume > 0 else float('inf'),
                'volume_imbalance': (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0,
                'avg_trade_size': avg_trade_size,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error calculating volume metrics for {symbol}: {e}")
            return {'error': str(e)}
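
    # Example call (illustrative; values are hypothetical):
    #   ws.get_volume_metrics('BTC/USDT', timeframe_seconds=60)
    # returns a dict such as {'trade_count': 42, 'buy_volume': ..., 'sell_volume': ...,
    # 'volume_imbalance': 0.12, ...}, or {'error': ...} when no trades are cached.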

    def _convert_timestamp_to_timezone(self, timestamp_ms: int, target_timezone: str = None) -> datetime:
        """Convert UTC timestamp to specified timezone

        Args:
            timestamp_ms: UTC timestamp in milliseconds
            target_timezone: Target timezone (e.g., '+08:00' for UTC+8)

        Returns:
            datetime object in target timezone
        """
        try:
            # Convert to UTC datetime
            utc_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
            if not target_timezone:
                return utc_dt
            # Parse timezone offset (e.g., '+08:00')
            if target_timezone.startswith('+') or target_timezone.startswith('-'):
                sign = 1 if target_timezone.startswith('+') else -1
                hours, minutes = map(int, target_timezone[1:].split(':'))
                offset = timedelta(hours=sign * hours, minutes=sign * minutes)
                target_tz = timezone(offset)
                return utc_dt.astimezone(target_tz)
            return utc_dt
        except Exception as e:
            logger.error(f"Error converting timestamp to timezone: {e}")
            return datetime.fromtimestamp(timestamp_ms / 1000)
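
    # Example (illustrative): _convert_timestamp_to_timezone(1_700_000_000_000, '+08:00')
    # -> 2023-11-15 06:13:20+08:00 (the UTC instant 2023-11-14 22:13:20 shifted to UTC+8).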

    def get_kline_with_timezone(self, symbol: str, target_timezone: str = None) -> Optional[Dict]:
        """Get latest kline data with timezone conversion

        Args:
            symbol: Trading symbol
            target_timezone: Target timezone (e.g., '+08:00' for UTC+8)

        Returns:
            Kline data with timezone-converted timestamps
        """
        try:
            candlestick = self.get_latest_candlestick(symbol)
            if not candlestick:
                return None
            # Create a copy with timezone conversions
            result = candlestick.copy()
            if target_timezone and candlestick.get('open_time'):
                result['open_time_local'] = self._convert_timestamp_to_timezone(
                    candlestick['open_time'], target_timezone
                )
                result['close_time_local'] = self._convert_timestamp_to_timezone(
                    candlestick['close_time'], target_timezone
                )
                result['target_timezone'] = target_timezone
            return result
        except Exception as e:
            logger.error(f"Error getting kline with timezone for {symbol}: {e}")
            return None

    def get_order_book_status(self, symbol: str) -> Dict:
        """Get detailed order book synchronization status"""
        try:
            return {
                'symbol': symbol,
                'initialized': self.order_book_initialized.get(symbol, False),
                'snapshot_in_progress': self.snapshot_in_progress.get(symbol, False),
                'buffered_events': len(self.event_buffers.get(symbol, [])),
                'first_event_u': self.first_event_u.get(symbol, 0),
                'last_update_id': self.last_update_ids.get(symbol, 0),
                'bid_levels': len(self.order_books.get(symbol, {}).get('bids', {})),
                'ask_levels': len(self.order_books.get(symbol, {}).get('asks', {})),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Error getting order book status for {symbol}: {e}")
            return {'error': str(e)}

    def get_all_order_book_status(self) -> Dict[str, Dict]:
        """Get order book status for all symbols"""
        return {symbol: self.get_order_book_status(symbol) for symbol in self.symbols}

    async def _notify_dashboard_status(self, symbol: str, status: str, message: str):
        """Notify dashboard of status changes"""
        try:
            if self.dashboard_callback:
                status_data = {
                    'type': 'cob_status',
                    'symbol': symbol,
                    'status': status,
                    'message': message,
                    'timestamp': datetime.now().isoformat()
                }
                # Check if callback is async or sync
                if asyncio.iscoroutinefunction(self.dashboard_callback):
                    await self.dashboard_callback(status_data)
                else:
                    # Call sync function directly
                    self.dashboard_callback(status_data)
        except Exception as e:
            logger.error(f"Error notifying dashboard: {e}")

    def get_status_summary(self) -> Dict[str, Any]:
        """Get status summary for all symbols"""
        summary = {
            'websockets_available': WEBSOCKETS_AVAILABLE,
            'symbols': {},
            'overall_status': 'unknown'
        }
        connected_count = 0
        fallback_count = 0
        for symbol in self.symbols:
            status = self.status[symbol]
            symbol_status = {
                'connected': status.connected,
                'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None,
                'connection_attempts': status.connection_attempts,
                'last_error': status.last_error,
                'messages_received': status.messages_received,
                'rest_fallback_active': self.rest_fallback_active[symbol]
            }
            if status.connected:
                connected_count += 1
            elif self.rest_fallback_active[symbol]:
                fallback_count += 1
            summary['symbols'][symbol] = symbol_status
        # Determine overall status
        if connected_count == len(self.symbols):
            summary['overall_status'] = 'all_connected'
        elif connected_count + fallback_count == len(self.symbols):
            summary['overall_status'] = 'partial_fallback'
        else:
            summary['overall_status'] = 'degraded'
        return summary


# Global instance for easy access
enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None


async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None, timezone_offset: str = None) -> EnhancedCOBWebSocket:
    """Get or create the global enhanced COB WebSocket instance

    Args:
        symbols: List of symbols to monitor
        dashboard_callback: Callback function for dashboard updates
        timezone_offset: Timezone offset for kline streams (None for UTC, '+08:00' for UTC+8)
    """
    global enhanced_cob_websocket
    if enhanced_cob_websocket is None:
        enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback, timezone_offset)
        await enhanced_cob_websocket.start()
    return enhanced_cob_websocket


async def stop_enhanced_cob_websocket():
    """Stop the global enhanced COB WebSocket instance"""
    global enhanced_cob_websocket
    if enhanced_cob_websocket:
        await enhanced_cob_websocket.stop()
        enhanced_cob_websocket = None
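

# --- Usage sketch (illustrative; not part of the original API) ---
# A minimal way to exercise this module from the command line: start the
# global instance, attach a print callback, stream briefly, then shut down.
# The callback name and the 30-second window are arbitrary demo choices.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _demo():
        async def print_cob(symbol: str, data: Dict):
            # COB snapshots carry a 'stats' dict; kline/ticker/aggTrade updates
            # arrive as {'type': ..., 'data': ...} payloads instead.
            stats = data.get('stats') if isinstance(data, dict) else None
            if stats:
                print(f"{symbol}: mid={stats.get('mid_price')} spread_bps={stats.get('spread_bps')}")

        ws = await get_enhanced_cob_websocket(['BTC/USDT'])
        ws.add_cob_callback(print_cob)
        await asyncio.sleep(30)  # stream for 30 seconds
        await stop_enhanced_cob_websocket()

    asyncio.run(_demo())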