488 lines
20 KiB
Python
488 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Enhanced COB WebSocket Implementation
|
|
|
|
Robust WebSocket implementation for Consolidated Order Book data with:
|
|
- Maximum allowed depth subscription
|
|
- Clear error handling and warnings
|
|
- Automatic reconnection with exponential backoff
|
|
- Fallback to REST API when WebSocket fails
|
|
- Dashboard integration with status updates
|
|
|
|
This replaces the existing COB WebSocket implementation with a more reliable version.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
from collections import deque, defaultdict
|
|
from dataclasses import dataclass
|
|
import aiohttp
|
|
import weakref
|
|
|
|
try:
|
|
import websockets
|
|
from websockets.client import connect as websockets_connect
|
|
from websockets.exceptions import ConnectionClosed, WebSocketException
|
|
WEBSOCKETS_AVAILABLE = True
|
|
except ImportError:
|
|
websockets = None
|
|
websockets_connect = None
|
|
ConnectionClosed = Exception
|
|
WebSocketException = Exception
|
|
WEBSOCKETS_AVAILABLE = False
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class COBWebSocketStatus:
|
|
"""Status tracking for COB WebSocket connections"""
|
|
connected: bool = False
|
|
last_message_time: Optional[datetime] = None
|
|
connection_attempts: int = 0
|
|
last_error: Optional[str] = None
|
|
reconnect_delay: float = 1.0
|
|
max_reconnect_delay: float = 60.0
|
|
messages_received: int = 0
|
|
|
|
def reset_reconnect_delay(self):
|
|
"""Reset reconnect delay on successful connection"""
|
|
self.reconnect_delay = 1.0
|
|
|
|
def increase_reconnect_delay(self):
|
|
"""Increase reconnect delay with exponential backoff"""
|
|
self.reconnect_delay = min(self.max_reconnect_delay, self.reconnect_delay * 2)
|
|
|
|
class EnhancedCOBWebSocket:
|
|
"""Enhanced COB WebSocket with robust error handling and fallback"""
|
|
|
|
def __init__(self, symbols: List[str] = None, dashboard_callback: Callable = None):
|
|
"""
|
|
Initialize Enhanced COB WebSocket
|
|
|
|
Args:
|
|
symbols: List of symbols to monitor (default: ['BTC/USDT', 'ETH/USDT'])
|
|
dashboard_callback: Callback function for dashboard status updates
|
|
"""
|
|
self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
|
|
self.dashboard_callback = dashboard_callback
|
|
|
|
# Connection status tracking
|
|
self.status: Dict[str, COBWebSocketStatus] = {
|
|
symbol: COBWebSocketStatus() for symbol in self.symbols
|
|
}
|
|
|
|
# Data callbacks
|
|
self.cob_callbacks: List[Callable] = []
|
|
self.error_callbacks: List[Callable] = []
|
|
|
|
# Latest data cache
|
|
self.latest_cob_data: Dict[str, Dict] = {}
|
|
|
|
# WebSocket connections
|
|
self.websocket_tasks: Dict[str, asyncio.Task] = {}
|
|
|
|
# REST API fallback
|
|
self.rest_session: Optional[aiohttp.ClientSession] = None
|
|
self.rest_fallback_active: Dict[str, bool] = {symbol: False for symbol in self.symbols}
|
|
self.rest_tasks: Dict[str, asyncio.Task] = {}
|
|
|
|
# Configuration
|
|
self.max_depth = 1000 # Maximum depth for order book
|
|
self.update_speed = '100ms' # Binance update speed
|
|
|
|
logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}")
|
|
if not WEBSOCKETS_AVAILABLE:
|
|
logger.error("⚠️ WebSockets module not available - COB data will be limited to REST API")
|
|
|
|
def add_cob_callback(self, callback: Callable):
|
|
"""Add callback for COB data updates"""
|
|
self.cob_callbacks.append(callback)
|
|
|
|
def add_error_callback(self, callback: Callable):
|
|
"""Add callback for error notifications"""
|
|
self.error_callbacks.append(callback)
|
|
|
|
async def start(self):
|
|
"""Start COB WebSocket connections"""
|
|
logger.info("🚀 Starting Enhanced COB WebSocket system")
|
|
|
|
# Initialize REST session for fallback
|
|
await self._init_rest_session()
|
|
|
|
# Start WebSocket connections for each symbol
|
|
for symbol in self.symbols:
|
|
await self._start_symbol_websocket(symbol)
|
|
|
|
# Start monitoring task
|
|
asyncio.create_task(self._monitor_connections())
|
|
|
|
logger.info("✅ Enhanced COB WebSocket system started")
|
|
|
|
async def stop(self):
|
|
"""Stop all WebSocket connections"""
|
|
logger.info("🛑 Stopping Enhanced COB WebSocket system")
|
|
|
|
# Cancel all WebSocket tasks
|
|
for symbol, task in self.websocket_tasks.items():
|
|
if task and not task.done():
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Cancel all REST tasks
|
|
for symbol, task in self.rest_tasks.items():
|
|
if task and not task.done():
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Close REST session
|
|
if self.rest_session:
|
|
await self.rest_session.close()
|
|
|
|
logger.info("✅ Enhanced COB WebSocket system stopped")
|
|
|
|
async def _init_rest_session(self):
|
|
"""Initialize REST API session for fallback"""
|
|
try:
|
|
timeout = aiohttp.ClientTimeout(total=10)
|
|
self.rest_session = aiohttp.ClientSession(timeout=timeout)
|
|
logger.info("✅ REST API session initialized for fallback")
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to initialize REST session: {e}")
|
|
|
|
async def _start_symbol_websocket(self, symbol: str):
|
|
"""Start WebSocket connection for a specific symbol"""
|
|
if not WEBSOCKETS_AVAILABLE:
|
|
logger.warning(f"⚠️ WebSockets not available for {symbol}, starting REST fallback")
|
|
await self._start_rest_fallback(symbol)
|
|
return
|
|
|
|
# Cancel existing task if running
|
|
if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done():
|
|
self.websocket_tasks[symbol].cancel()
|
|
|
|
# Start new WebSocket task
|
|
self.websocket_tasks[symbol] = asyncio.create_task(
|
|
self._websocket_connection_loop(symbol)
|
|
)
|
|
|
|
logger.info(f"🔌 Started WebSocket task for {symbol}")
|
|
|
|
async def _websocket_connection_loop(self, symbol: str):
|
|
"""Main WebSocket connection loop with reconnection logic"""
|
|
status = self.status[symbol]
|
|
|
|
while True:
|
|
try:
|
|
logger.info(f"🔌 Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})")
|
|
status.connection_attempts += 1
|
|
|
|
# Create WebSocket URL with maximum depth
|
|
ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT
|
|
ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@{self.update_speed}"
|
|
|
|
logger.info(f"🔗 Connecting to: {ws_url}")
|
|
|
|
async with websockets_connect(ws_url) as websocket:
|
|
# Connection successful
|
|
status.connected = True
|
|
status.last_error = None
|
|
status.reset_reconnect_delay()
|
|
|
|
logger.info(f"✅ WebSocket connected for {symbol}")
|
|
await self._notify_dashboard_status(symbol, "connected", "WebSocket connected")
|
|
|
|
# Deactivate REST fallback
|
|
if self.rest_fallback_active[symbol]:
|
|
await self._stop_rest_fallback(symbol)
|
|
|
|
# Message receiving loop
|
|
async for message in websocket:
|
|
try:
|
|
data = json.loads(message)
|
|
await self._process_websocket_message(symbol, data)
|
|
|
|
status.last_message_time = datetime.now()
|
|
status.messages_received += 1
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"⚠️ Invalid JSON from {symbol} WebSocket: {e}")
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}")
|
|
|
|
except ConnectionClosed as e:
|
|
status.connected = False
|
|
status.last_error = f"Connection closed: {e}"
|
|
logger.warning(f"🔌 WebSocket connection closed for {symbol}: {e}")
|
|
|
|
except WebSocketException as e:
|
|
status.connected = False
|
|
status.last_error = f"WebSocket error: {e}"
|
|
logger.error(f"❌ WebSocket error for {symbol}: {e}")
|
|
|
|
except Exception as e:
|
|
status.connected = False
|
|
status.last_error = f"Unexpected error: {e}"
|
|
logger.error(f"❌ Unexpected WebSocket error for {symbol}: {e}")
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Connection failed or closed - start REST fallback
|
|
await self._notify_dashboard_status(symbol, "disconnected", status.last_error)
|
|
await self._start_rest_fallback(symbol)
|
|
|
|
# Wait before reconnecting
|
|
status.increase_reconnect_delay()
|
|
logger.info(f"⏳ Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}")
|
|
await asyncio.sleep(status.reconnect_delay)
|
|
|
|
async def _process_websocket_message(self, symbol: str, data: Dict):
|
|
"""Process WebSocket message and convert to COB format"""
|
|
try:
|
|
# Binance depth stream format
|
|
if 'b' in data and 'a' in data: # bids and asks
|
|
cob_data = {
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now(),
|
|
'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['b']],
|
|
'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['a']],
|
|
'source': 'websocket',
|
|
'exchange': 'binance'
|
|
}
|
|
|
|
# Calculate stats
|
|
if cob_data['bids'] and cob_data['asks']:
|
|
best_bid = max(cob_data['bids'], key=lambda x: x['price'])
|
|
best_ask = min(cob_data['asks'], key=lambda x: x['price'])
|
|
|
|
cob_data['stats'] = {
|
|
'best_bid': best_bid['price'],
|
|
'best_ask': best_ask['price'],
|
|
'spread': best_ask['price'] - best_bid['price'],
|
|
'mid_price': (best_bid['price'] + best_ask['price']) / 2,
|
|
'bid_volume': sum(bid['size'] for bid in cob_data['bids']),
|
|
'ask_volume': sum(ask['size'] for ask in cob_data['asks'])
|
|
}
|
|
|
|
# Update cache
|
|
self.latest_cob_data[symbol] = cob_data
|
|
|
|
# Notify callbacks
|
|
for callback in self.cob_callbacks:
|
|
try:
|
|
await callback(symbol, cob_data)
|
|
except Exception as e:
|
|
logger.error(f"❌ Error in COB callback: {e}")
|
|
|
|
logger.debug(f"📊 Processed WebSocket COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}")
|
|
|
|
async def _start_rest_fallback(self, symbol: str):
|
|
"""Start REST API fallback for a symbol"""
|
|
if self.rest_fallback_active[symbol]:
|
|
return # Already active
|
|
|
|
self.rest_fallback_active[symbol] = True
|
|
|
|
# Cancel existing REST task
|
|
if symbol in self.rest_tasks and not self.rest_tasks[symbol].done():
|
|
self.rest_tasks[symbol].cancel()
|
|
|
|
# Start new REST task
|
|
self.rest_tasks[symbol] = asyncio.create_task(
|
|
self._rest_fallback_loop(symbol)
|
|
)
|
|
|
|
logger.warning(f"⚠️ Started REST API fallback for {symbol}")
|
|
await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback")
|
|
|
|
async def _stop_rest_fallback(self, symbol: str):
|
|
"""Stop REST API fallback for a symbol"""
|
|
if not self.rest_fallback_active[symbol]:
|
|
return
|
|
|
|
self.rest_fallback_active[symbol] = False
|
|
|
|
if symbol in self.rest_tasks and not self.rest_tasks[symbol].done():
|
|
self.rest_tasks[symbol].cancel()
|
|
|
|
logger.info(f"✅ Stopped REST API fallback for {symbol}")
|
|
|
|
async def _rest_fallback_loop(self, symbol: str):
|
|
"""REST API fallback loop"""
|
|
while self.rest_fallback_active[symbol]:
|
|
try:
|
|
await self._fetch_rest_orderbook(symbol)
|
|
await asyncio.sleep(1) # Update every second
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"❌ REST fallback error for {symbol}: {e}")
|
|
await asyncio.sleep(5) # Wait longer on error
|
|
|
|
async def _fetch_rest_orderbook(self, symbol: str):
|
|
"""Fetch order book data via REST API"""
|
|
try:
|
|
if not self.rest_session:
|
|
return
|
|
|
|
# Binance REST API
|
|
rest_symbol = symbol.replace('/', '') # BTCUSDT, ETHUSDT
|
|
url = f"https://api.binance.com/api/v3/depth?symbol={rest_symbol}&limit=1000"
|
|
|
|
async with self.rest_session.get(url) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
|
|
cob_data = {
|
|
'symbol': symbol,
|
|
'timestamp': datetime.now(),
|
|
'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['bids']],
|
|
'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['asks']],
|
|
'source': 'rest_fallback',
|
|
'exchange': 'binance'
|
|
}
|
|
|
|
# Calculate stats
|
|
if cob_data['bids'] and cob_data['asks']:
|
|
best_bid = max(cob_data['bids'], key=lambda x: x['price'])
|
|
best_ask = min(cob_data['asks'], key=lambda x: x['price'])
|
|
|
|
cob_data['stats'] = {
|
|
'best_bid': best_bid['price'],
|
|
'best_ask': best_ask['price'],
|
|
'spread': best_ask['price'] - best_bid['price'],
|
|
'mid_price': (best_bid['price'] + best_ask['price']) / 2,
|
|
'bid_volume': sum(bid['size'] for bid in cob_data['bids']),
|
|
'ask_volume': sum(ask['size'] for ask in cob_data['asks'])
|
|
}
|
|
|
|
# Update cache
|
|
self.latest_cob_data[symbol] = cob_data
|
|
|
|
# Notify callbacks
|
|
for callback in self.cob_callbacks:
|
|
try:
|
|
await callback(symbol, cob_data)
|
|
except Exception as e:
|
|
logger.error(f"❌ Error in COB callback: {e}")
|
|
|
|
logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks")
|
|
|
|
else:
|
|
logger.warning(f"⚠️ REST API error for {symbol}: HTTP {response.status}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error fetching REST order book for {symbol}: {e}")
|
|
|
|
async def _monitor_connections(self):
|
|
"""Monitor WebSocket connections and provide status updates"""
|
|
while True:
|
|
try:
|
|
await asyncio.sleep(10) # Check every 10 seconds
|
|
|
|
for symbol in self.symbols:
|
|
status = self.status[symbol]
|
|
|
|
# Check for stale connections
|
|
if status.connected and status.last_message_time:
|
|
time_since_last = datetime.now() - status.last_message_time
|
|
if time_since_last > timedelta(seconds=30):
|
|
logger.warning(f"⚠️ No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s")
|
|
await self._notify_dashboard_status(symbol, "stale", "No recent messages")
|
|
|
|
# Log status
|
|
if status.connected:
|
|
logger.debug(f"✅ {symbol}: Connected, {status.messages_received} messages received")
|
|
elif self.rest_fallback_active[symbol]:
|
|
logger.debug(f"⚠️ {symbol}: Using REST fallback")
|
|
else:
|
|
logger.debug(f"❌ {symbol}: Disconnected, last error: {status.last_error}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error in connection monitor: {e}")
|
|
|
|
async def _notify_dashboard_status(self, symbol: str, status: str, message: str):
|
|
"""Notify dashboard of status changes"""
|
|
try:
|
|
if self.dashboard_callback:
|
|
await self.dashboard_callback({
|
|
'type': 'cob_status',
|
|
'symbol': symbol,
|
|
'status': status,
|
|
'message': message,
|
|
'timestamp': datetime.now().isoformat()
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"❌ Error notifying dashboard: {e}")
|
|
|
|
def get_status_summary(self) -> Dict[str, Any]:
|
|
"""Get status summary for all symbols"""
|
|
summary = {
|
|
'websockets_available': WEBSOCKETS_AVAILABLE,
|
|
'symbols': {},
|
|
'overall_status': 'unknown'
|
|
}
|
|
|
|
connected_count = 0
|
|
fallback_count = 0
|
|
|
|
for symbol in self.symbols:
|
|
status = self.status[symbol]
|
|
symbol_status = {
|
|
'connected': status.connected,
|
|
'last_message_time': status.last_message_time.isoformat() if status.last_message_time else None,
|
|
'connection_attempts': status.connection_attempts,
|
|
'last_error': status.last_error,
|
|
'messages_received': status.messages_received,
|
|
'rest_fallback_active': self.rest_fallback_active[symbol]
|
|
}
|
|
|
|
if status.connected:
|
|
connected_count += 1
|
|
elif self.rest_fallback_active[symbol]:
|
|
fallback_count += 1
|
|
|
|
summary['symbols'][symbol] = symbol_status
|
|
|
|
# Determine overall status
|
|
if connected_count == len(self.symbols):
|
|
summary['overall_status'] = 'all_connected'
|
|
elif connected_count + fallback_count == len(self.symbols):
|
|
summary['overall_status'] = 'partial_fallback'
|
|
else:
|
|
summary['overall_status'] = 'degraded'
|
|
|
|
return summary
|
|
|
|
# Global instance for easy access
|
|
enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None
|
|
|
|
async def get_enhanced_cob_websocket(symbols: List[str] = None, dashboard_callback: Callable = None) -> EnhancedCOBWebSocket:
|
|
"""Get or create the global enhanced COB WebSocket instance"""
|
|
global enhanced_cob_websocket
|
|
|
|
if enhanced_cob_websocket is None:
|
|
enhanced_cob_websocket = EnhancedCOBWebSocket(symbols, dashboard_callback)
|
|
await enhanced_cob_websocket.start()
|
|
|
|
return enhanced_cob_websocket
|
|
|
|
async def stop_enhanced_cob_websocket():
|
|
"""Stop the global enhanced COB WebSocket instance"""
|
|
global enhanced_cob_websocket
|
|
|
|
if enhanced_cob_websocket:
|
|
await enhanced_cob_websocket.stop()
|
|
enhanced_cob_websocket = None |