WebSocket COB Data Fix Design Document
Overview
This design document outlines the approach to fixing the WebSocket COB (Consolidated Order Book) data processing issue in the trading system. The current implementation fails with 'NoneType' object has no attribute 'append' errors for both the BTC/USDT and ETH/USDT pairs, which indicates that a data structure expected to be a list is actually None. This issue prevents the dashboard from functioning properly and must be fixed to ensure reliable real-time market data processing.
Architecture
The COB data processing pipeline involves several components:
- MultiExchangeCOBProvider: Collects order book data from exchanges via WebSockets
- StandardizedDataProvider: Extends DataProvider with standardized BaseDataInput functionality
- Dashboard Components: Display COB data in the UI
The error occurs during WebSocket data processing, specifically when trying to append data to a collection that hasn't been properly initialized. The fix will focus on ensuring proper initialization of data structures and implementing robust error handling.
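The failure mode described above can be reproduced in a few lines. This is a minimal sketch, not the provider's actual code: the `history` dictionary and its contents are hypothetical and only illustrate how a per-symbol slot left as None produces the reported error, and how initializing a container for every symbol avoids it.

```python
from collections import deque

# Hypothetical per-symbol store: the key exists, but no container was created.
history = {"BTC/USDT": None}

try:
    history["BTC/USDT"].append(1.0)
except AttributeError as exc:
    # Same error class as the one reported by the dashboard.
    error_message = str(exc)

# Fix: give every symbol a real container when the object is constructed.
symbols = ["BTC/USDT", "ETH/USDT"]
history = {symbol: deque(maxlen=300) for symbol in symbols}
history["BTC/USDT"].append(1.0)
```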
Components and Interfaces
1. MultiExchangeCOBProvider
The MultiExchangeCOBProvider class is responsible for collecting order book data from exchanges and distributing it to subscribers. The issue appears to be in the WebSocket data processing logic, where data structures may not be properly initialized before use.
Key Issues to Address
- Data Structure Initialization: Ensure all data structures (particularly collections that will have append called on them) are properly initialized during object creation.
- Subscriber Notification: Fix the _notify_cob_subscribers method to handle edge cases and ensure data is properly formatted before notification.
- WebSocket Processing: Enhance error handling in WebSocket processing methods to prevent cascading failures.
Implementation Details
class MultiExchangeCOBProvider:
    def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
        # Existing initialization code...
        # Ensure all data structures are properly initialized
        self.cob_data_cache = {}  # Cache for COB data
        self.cob_subscribers = []  # List of callback functions
        self.exchange_order_books = {}
        self.session_trades = {}
        self.svp_cache = {}
        # Initialize data structures for each symbol
        for symbol in symbols:
            self.cob_data_cache[symbol] = {}
            self.exchange_order_books[symbol] = {}
            self.session_trades[symbol] = []
            self.svp_cache[symbol] = {}
            # Initialize exchange-specific data structures
            for exchange_name in self.active_exchanges:
                self.exchange_order_books[symbol][exchange_name] = {
                    'bids': {},
                    'asks': {},
                    'deep_bids': {},
                    'deep_asks': {},
                    'timestamp': datetime.now(),
                    'deep_timestamp': datetime.now(),
                    'connected': False,
                    'last_update_id': 0
                }
        logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")

    async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
        """Notify all subscribers of COB data updates with improved error handling"""
        try:
            if not cob_snapshot:
                logger.warning(f"Attempted to notify subscribers with empty COB snapshot for {symbol}")
                return
            for callback in self.cob_subscribers:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(symbol, cob_snapshot)
                    else:
                        callback(symbol, cob_snapshot)
                except Exception as e:
                    logger.error(f"Error in COB subscriber callback: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Error notifying COB subscribers: {e}", exc_info=True)
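The per-callback try/except in the notification method is what keeps one faulty subscriber from blocking the rest. The standalone sketch below illustrates that behaviour under stated assumptions: `notify_subscribers`, `broken_subscriber`, and `async_subscriber` are hypothetical names introduced here, and the snapshot is a plain dictionary with an invented `mid` key.

```python
import asyncio
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


async def notify_subscribers(
    callbacks: List[Callable], symbol: str, snapshot: Dict[str, Any]
) -> int:
    """Call every subscriber; return how many completed without raising."""
    if not snapshot:
        logger.warning("Empty COB snapshot for %s, nothing to deliver", symbol)
        return 0
    delivered = 0
    for callback in callbacks:
        try:
            # Support both coroutine and plain-function subscribers.
            if asyncio.iscoroutinefunction(callback):
                await callback(symbol, snapshot)
            else:
                callback(symbol, snapshot)
            delivered += 1
        except Exception:
            # Log and move on so later subscribers still receive the update.
            logger.exception("Error in COB subscriber callback")
    return delivered


received = []


def broken_subscriber(symbol, snapshot):
    raise RuntimeError("subscriber bug")


async def async_subscriber(symbol, snapshot):
    received.append((symbol, snapshot["mid"]))


delivered_count = asyncio.run(
    notify_subscribers([broken_subscriber, async_subscriber], "BTC/USDT", {"mid": 50000.0})
)
```

Here the first subscriber raises, the error is logged, and the second subscriber still receives the snapshot.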
2. StandardizedDataProvider
The StandardizedDataProvider class extends the base DataProvider with standardized data input functionality. It needs to properly handle COB data and ensure all data structures are initialized.
Key Issues to Address
- COB Data Handling: Ensure proper initialization and validation of COB data structures.
- Error Handling: Improve error handling when processing COB data.
- Data Structure Consistency: Maintain consistent data structures throughout the processing pipeline.
Implementation Details
class StandardizedDataProvider(DataProvider):
    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
        """Initialize the standardized data provider with proper data structure initialization"""
        super().__init__(symbols, timeframes)
        # Standardized data storage
        self.base_data_cache = {}  # {symbol: BaseDataInput}
        self.cob_data_cache = {}  # {symbol: COBData}
        # Model output management with extensible storage
        self.model_output_manager = ModelOutputManager(
            cache_dir=str(self.cache_dir / "model_outputs"),
            max_history=1000
        )
        # COB moving averages calculation
        self.cob_imbalance_history = {}  # {symbol: deque of (timestamp, imbalance_data)}
        self.ma_calculation_lock = Lock()
        # Initialize caches for each symbol
        for symbol in self.symbols:
            self.base_data_cache[symbol] = None
            self.cob_data_cache[symbol] = None
            self.cob_imbalance_history[symbol] = deque(maxlen=300)  # 5 minutes of 1s data
        # COB provider integration
        self.cob_provider = None
        self._initialize_cob_provider()
        logger.info("StandardizedDataProvider initialized with BaseDataInput support")

    def _process_cob_data(self, symbol: str, cob_snapshot: Dict):
        """Process COB data with improved error handling"""
        try:
            if not cob_snapshot:
                logger.warning(f"Received empty COB snapshot for {symbol}")
                return
            # Process COB data and update caches
            # ...
        except Exception as e:
            logger.error(f"Error processing COB data for {symbol}: {e}", exc_info=True)
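To illustrate how the bounded imbalance history and its lock might be used together, here is a rough sketch. It is not the provider's implementation: `record_imbalance` and `imbalance_moving_average` are hypothetical helpers, and the imbalance value is assumed to be a single float rather than whatever structure imbalance_data actually holds.

```python
from collections import deque
from threading import Lock
from typing import Deque, Dict, Optional, Tuple

# Module-level stand-ins for the provider attributes of the same names.
cob_imbalance_history: Dict[str, Deque[Tuple[float, float]]] = {
    "BTC/USDT": deque(maxlen=300),  # 5 minutes of 1s data
}
ma_calculation_lock = Lock()


def record_imbalance(symbol: str, timestamp: float, imbalance: float) -> None:
    """Append one sample, creating the deque if the symbol is new."""
    with ma_calculation_lock:
        # setdefault guards against symbols that were not known at construction.
        history = cob_imbalance_history.setdefault(symbol, deque(maxlen=300))
        history.append((timestamp, imbalance))


def imbalance_moving_average(symbol: str, window: int) -> Optional[float]:
    """Average the most recent `window` samples, or None if there are none."""
    with ma_calculation_lock:
        history = cob_imbalance_history.get(symbol)
        if not history:
            return None
        recent = list(history)[-window:]
    return sum(value for _, value in recent) / len(recent)


# 400 one-second samples: the deque silently drops the oldest 100.
for second in range(400):
    record_imbalance("BTC/USDT", float(second), 0.5)
record_imbalance("ETH/USDT", 0.0, -0.2)
```

Because the deque is created with maxlen=300, memory use stays bounded without any explicit trimming, and a symbol with no samples yields None instead of raising.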
3. WebSocket COB Data Processing
The WebSocket COB data processing logic needs to be enhanced to handle edge cases and ensure proper data structure initialization.
Key Issues to Address
- WebSocket Connection Management: Improve connection management to handle disconnections gracefully.
- Data Processing: Ensure data is properly validated before processing.
- Error Recovery: Implement recovery mechanisms for WebSocket failures.
Implementation Details
async def _stream_binance_orderbook(self, symbol: str, config: ExchangeConfig):
    """Stream order book data from Binance with improved error handling"""
    reconnect_delay = 1  # Start with 1 second delay
    max_reconnect_delay = 60  # Maximum delay of 60 seconds
    while self.is_streaming:
        try:
            ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms"
            logger.info(f"Connecting to Binance WebSocket: {ws_url}")
            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")
            async with websockets_connect(ws_url) as websocket:
                # Ensure data structures are initialized
                if symbol not in self.exchange_order_books:
                    self.exchange_order_books[symbol] = {}
                if 'binance' not in self.exchange_order_books[symbol]:
                    self.exchange_order_books[symbol]['binance'] = {
                        'bids': {},
                        'asks': {},
                        'deep_bids': {},
                        'deep_asks': {},
                        'timestamp': datetime.now(),
                        'deep_timestamp': datetime.now(),
                        'connected': False,
                        'last_update_id': 0
                    }
                self.exchange_order_books[symbol]['binance']['connected'] = True
                logger.info(f"Connected to Binance order book stream for {symbol}")
                # Reset reconnect delay on successful connection
                reconnect_delay = 1
                async for message in websocket:
                    if not self.is_streaming:
                        break
                    try:
                        data = json.loads(message)
                        await self._process_binance_orderbook(symbol, data)
                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Binance message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Binance data: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Binance WebSocket error for {symbol}: {e}", exc_info=True)
            # Mark as disconnected
            if symbol in self.exchange_order_books and 'binance' in self.exchange_order_books[symbol]:
                self.exchange_order_books[symbol]['binance']['connected'] = False
            # Implement exponential backoff for reconnection
            logger.info(f"Reconnecting to Binance WebSocket for {symbol} in {reconnect_delay}s")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
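The reconnection delay in the streaming loop doubles after each failure and is capped at the maximum. The sketch below isolates that schedule so it can be inspected without opening a socket; `backoff_delays` is a hypothetical helper, with the 1-second start and 60-second cap taken from the loop above.

```python
import itertools
from typing import Iterator


def backoff_delays(initial: float = 1.0, maximum: float = 60.0) -> Iterator[float]:
    """Yield reconnect delays that double each time, capped at `maximum`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)


# Delays for eight consecutive failed connection attempts.
first_eight = list(itertools.islice(backoff_delays(), 8))
print(first_eight)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With these values the cap is reached on the seventh consecutive failure, after which every retry waits 60 seconds until a connection succeeds and the delay is reset.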
Data Models
The data models remain unchanged, but we need to ensure they are properly initialized and validated throughout the system.
COBSnapshot
@dataclass
class COBSnapshot:
    """Complete Consolidated Order Book snapshot"""
    symbol: str
    timestamp: datetime
    consolidated_bids: List[ConsolidatedOrderBookLevel]
    consolidated_asks: List[ConsolidatedOrderBookLevel]
    exchanges_active: List[str]
    volume_weighted_mid: float
    total_bid_liquidity: float
    total_ask_liquidity: float
    spread_bps: float
    liquidity_imbalance: float
    price_buckets: Dict[str, Dict[str, float]]  # Fine-grain volume buckets
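One way to check that a snapshot's collection fields are initialized before it enters the pipeline is sketched below. It uses a simplified stand-in rather than the real COBSnapshot: `SnapshotSketch` keeps only a subset of the fields, represents order book levels as plain tuples, and `snapshot_problems` is a hypothetical helper. The `default_factory=list` defaults are an illustration, not a proposed change to the data model.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass
class SnapshotSketch:
    """Simplified stand-in for COBSnapshot (subset of fields, illustration only)."""
    symbol: str
    timestamp: datetime
    # default_factory gives each instance its own empty list instead of None.
    consolidated_bids: List[tuple] = field(default_factory=list)
    consolidated_asks: List[tuple] = field(default_factory=list)
    exchanges_active: List[str] = field(default_factory=list)


def snapshot_problems(snapshot: SnapshotSketch) -> List[str]:
    """Return human-readable problems; an empty list means the snapshot is usable."""
    problems = []
    if not snapshot.symbol:
        problems.append("missing symbol")
    for name in ("consolidated_bids", "consolidated_asks", "exchanges_active"):
        if not isinstance(getattr(snapshot, name), list):
            problems.append(f"{name} is not a list")
    return problems


good = SnapshotSketch(symbol="BTC/USDT", timestamp=datetime.now())
bad = SnapshotSketch(symbol="ETH/USDT", timestamp=datetime.now(), consolidated_bids=None)
```

Dataclasses do not enforce type hints at runtime, so a None can still be passed explicitly, which is why the explicit check remains useful even with defaults in place.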
Error Handling
WebSocket Connection Errors
- Implement exponential backoff for reconnection attempts
- Log detailed error information
- Maintain system operation with last valid data
Data Processing Errors
- Validate data before processing
- Handle edge cases gracefully
- Log detailed error information
- Continue operation with last valid data
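The "continue with last valid data" rule can be sketched as a small cache that only accepts snapshots passing validation. This is an illustrative assumption about how it could work: `LastValidCache` is a hypothetical class, and the validity check (non-empty `bids` and `asks` keys) is a placeholder for whatever validation the system actually applies.

```python
from typing import Any, Dict, Optional


class LastValidCache:
    """Keep the most recent valid snapshot per symbol; reject invalid updates."""

    def __init__(self) -> None:
        self._latest: Dict[str, Dict[str, Any]] = {}

    def update(self, symbol: str, snapshot: Optional[Dict[str, Any]]) -> bool:
        """Store the snapshot if it looks valid; return whether it was stored."""
        if not snapshot or not snapshot.get("bids") or not snapshot.get("asks"):
            return False  # keep serving the previous snapshot
        self._latest[symbol] = snapshot
        return True

    def get(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Return the last valid snapshot, or None if none was ever stored."""
        return self._latest.get(symbol)


cache = LastValidCache()
accepted_first = cache.update("BTC/USDT", {"bids": {50000.0: 1.2}, "asks": {50001.0: 0.8}})
accepted_empty = cache.update("BTC/USDT", {})        # malformed update is ignored
accepted_none = cache.update("BTC/USDT", None)       # so is a missing one
current = cache.get("BTC/USDT")
```

After the two rejected updates, readers such as the dashboard would still see the first snapshot rather than an empty or None value.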
Subscriber Notification Errors
- Catch and log errors in subscriber callbacks
- Prevent errors in one subscriber from affecting others
- Ensure data is properly formatted before notification
Testing Strategy
Unit Testing
- Test data structure initialization
- Test error handling in WebSocket processing
- Test subscriber notification with various edge cases
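A unit test for data structure initialization might look roughly like the following. It runs against `ProviderSketch`, a hypothetical stand-in that mirrors only the per-symbol containers of the provider, since the real class needs exchange configuration that is outside the scope of this sketch.

```python
import unittest
from typing import Dict, List


class ProviderSketch:
    """Hypothetical stand-in exposing only the provider's per-symbol containers."""

    def __init__(self, symbols: List[str], exchanges: List[str]) -> None:
        self.session_trades: Dict[str, list] = {s: [] for s in symbols}
        self.exchange_order_books = {
            s: {e: {"bids": {}, "asks": {}, "connected": False} for e in exchanges}
            for s in symbols
        }


class InitializationTest(unittest.TestCase):
    def setUp(self) -> None:
        self.provider = ProviderSketch(["BTC/USDT", "ETH/USDT"], ["binance"])

    def test_every_symbol_has_a_trade_list(self) -> None:
        for symbol, trades in self.provider.session_trades.items():
            self.assertIsInstance(trades, list, symbol)

    def test_every_exchange_book_starts_disconnected(self) -> None:
        for books in self.provider.exchange_order_books.values():
            self.assertFalse(books["binance"]["connected"])


# Run the suite programmatically so the result can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(InitializationTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```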
Integration Testing
- Test end-to-end COB data flow
- Test recovery from WebSocket disconnections
- Test handling of malformed data
System Testing
- Test dashboard operation with COB data
- Test system stability under high load
- Test recovery from various failure scenarios
Implementation Plan
- Fix data structure initialization in MultiExchangeCOBProvider
- Enhance error handling in WebSocket processing
- Improve subscriber notification logic
- Update StandardizedDataProvider to properly handle COB data
- Add comprehensive logging for debugging
- Implement recovery mechanisms for WebSocket failures
- Test all changes thoroughly
Conclusion
This design addresses the WebSocket COB data processing issue by ensuring proper initialization of data structures, implementing robust error handling, and adding recovery mechanisms for WebSocket failures. These changes will improve the reliability and stability of the trading system, allowing traders to monitor market data in real time without interruptions.