"""
Robust COB (Consolidated Order Book) Provider

This module provides a COB data provider that handles:
- HTTP 418/429 responses from Binance (rate limiting)
- Thread safety of its caches and counters
- API rate limiting and backoff
- Fallback data sources
- Error recovery strategies

Features:
- Automatic rate limiting and backoff
- Multiple exchange support with fallbacks
- Thread-safe operations
- Comprehensive error handling
- Data validation and integrity checking
"""

import asyncio
import json
import logging
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable

import numpy as np
import requests

from .api_rate_limiter import get_rate_limiter, RateLimitConfig

logger = logging.getLogger(__name__)


@dataclass
class COBData:
    """Consolidated Order Book snapshot for one symbol.

    The derived metrics and ``quality_score`` are always recomputed in
    ``__post_init__`` from ``bids``/``asks``; values passed for them to the
    constructor are overwritten.
    """

    symbol: str
    timestamp: datetime
    bids: List[Tuple[float, float]]  # [(price, quantity), ...]
    asks: List[Tuple[float, float]]  # [(price, quantity), ...]

    # Derived metrics
    spread: float = 0.0
    mid_price: float = 0.0
    total_bid_volume: float = 0.0
    total_ask_volume: float = 0.0

    # Data quality
    data_source: str = 'unknown'
    quality_score: float = 1.0

    def __post_init__(self):
        """Calculate derived metrics and the completeness-based quality score."""
        if self.bids and self.asks:
            # Index 0 is used as the top of book on each side
            # (assumes best-first ordering from the exchange -- TODO confirm).
            best_bid = self.bids[0][0]
            best_ask = self.asks[0][0]
            self.spread = best_ask - best_bid
            self.mid_price = (best_ask + best_bid) / 2
            self.total_bid_volume = sum(qty for _, qty in self.bids)
            self.total_ask_volume = sum(qty for _, qty in self.asks)

        # Quality is the fraction of the expected 20 levels present on the
        # thinner side, capped at 1.0 (0.0 when either side is empty).
        self.quality_score = min(
            len(self.bids) / 20,  # Expect at least 20 bid levels
            len(self.asks) / 20,  # Expect at least 20 ask levels
            1.0
        )


class RobustCOBProvider:
    """COB provider with error handling, caching, fallbacks and rate limiting.

    All shared state (caches, counters, statistics) is guarded by ``self.lock``
    (a re-entrant lock). Network requests are made outside the lock.
    """

    def __init__(self, symbols: Optional[List[str]] = None):
        """Create a provider.

        Args:
            symbols: Symbols to track; defaults to ``['ETHUSDT', 'BTCUSDT']``
                when ``None`` or empty.
        """
        self.symbols = symbols or ['ETHUSDT', 'BTCUSDT']

        # Rate limiter
        self.rate_limiter = get_rate_limiter()

        # Thread safety
        self.lock = threading.RLock()

        # Data cache
        self.cob_cache: Dict[str, COBData] = {}
        self.cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = timedelta(seconds=5)  # 5 second cache TTL

        # Error tracking
        self.error_counts: Dict[str, int] = {}
        self.last_successful_fetch: Dict[str, datetime] = {}

        # Background fetching
        self.is_running = False
        self.fetch_threads: Dict[str, threading.Thread] = {}
        self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="COB-Fetcher")
        # Set on stop so that workers waiting out a backoff wake up at once.
        self._stop_event = threading.Event()

        # Fallback data
        self.fallback_data: Dict[str, COBData] = {}

        # Performance tracking
        self.fetch_stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limited_requests': 0,
            'cache_hits': 0,
            'fallback_uses': 0
        }

        logger.info(f"Robust COB Provider initialized for symbols: {self.symbols}")

    def _bump_stat(self, key: str) -> None:
        """Increment one ``fetch_stats`` counter under the lock."""
        with self.lock:
            self.fetch_stats[key] += 1

    def _record_fetch_result(self, symbol: str, cob_data: Optional[COBData]) -> int:
        """Store a fetch outcome in the cache and update error tracking.

        A live result resets the symbol's error count. ``None`` or data served
        from the fallback cache counts as an error (fallback data is still
        cached so readers keep getting something).

        Returns:
            The symbol's error count after the update.
        """
        now = datetime.now()
        with self.lock:
            if cob_data is not None:
                self.cob_cache[symbol] = cob_data
                self.cache_timestamps[symbol] = now

            if cob_data is None or cob_data.data_source == 'fallback_cache':
                self.error_counts[symbol] = self.error_counts.get(symbol, 0) + 1
            else:
                self.last_successful_fetch[symbol] = now
                self.error_counts[symbol] = 0  # Reset error count on success

            return self.error_counts[symbol]

    def start_background_fetching(self):
        """Start one daemon fetch thread per symbol (no-op if already running)."""
        with self.lock:
            if self.is_running:
                logger.warning("Background fetching already running")
                return

            self.is_running = True
            self._stop_event.clear()

            # Start fetching thread for each symbol
            for symbol in self.symbols:
                existing = self.fetch_threads.get(symbol)
                if existing is not None and existing.is_alive():
                    # A worker that outlived a previous stop() resumes on its
                    # own now that is_running is True again; don't duplicate it.
                    continue
                thread = threading.Thread(
                    target=self._background_fetch_worker,
                    args=(symbol,),
                    name=f"COB-{symbol}",
                    daemon=True
                )
                self.fetch_threads[symbol] = thread
                thread.start()

        logger.info(f"Started background COB fetching for {len(self.symbols)} symbols")

    def stop_background_fetching(self):
        """Stop background fetching, waiting up to 5 seconds per worker thread."""
        self.is_running = False
        self._stop_event.set()  # Wake workers that are waiting between fetches

        with self.lock:
            threads = list(self.fetch_threads.items())

        # Join outside the lock: workers need the lock to finish their loop.
        for symbol, thread in threads:
            thread.join(timeout=5)
            logger.debug(f"Stopped COB fetching for {symbol}")

        # Executor.shutdown() accepts no timeout argument.
        self.executor.shutdown(wait=True)

        logger.info("Stopped background COB fetching")

    def _background_fetch_worker(self, symbol: str):
        """Fetch loop for one symbol; runs until background fetching is stopped."""
        logger.info(f"Started COB fetching worker for {symbol}")

        while self.is_running and not self._stop_event.is_set():
            try:
                cob_data = self._fetch_cob_data_safe(symbol)
                error_count = self._record_fetch_result(symbol, cob_data)

                if cob_data:
                    logger.debug(f"Updated COB cache for {symbol}")
                else:
                    logger.debug(f"Failed to fetch COB for {symbol}, error count: {error_count}")

                # Wait before next fetch (adaptive based on errors)
                base_interval = 2.0  # Base 2 second interval
                delay = min(base_interval * (2 ** min(error_count, 5)), 60.0)  # Max 60s
            except Exception as e:
                logger.error(f"Error in COB fetching worker for {symbol}: {e}")
                delay = 10.0  # Wait 10s on unexpected errors

            # Interruptible sleep: returns True as soon as stop is requested.
            if self._stop_event.wait(delay):
                break

        logger.info(f"Stopped COB fetching worker for {symbol}")

    def _fetch_cob_data_safe(self, symbol: str) -> Optional[COBData]:
        """Fetch COB data, trying Binance, then MEXC, then the stored fallback.

        Returns:
            A ``COBData`` snapshot, or ``None`` when every source failed.
            Never raises; unexpected errors are logged and counted as failures.
        """
        try:
            self._bump_stat('total_requests')

            # Try Binance first
            cob_data = self._fetch_binance_cob(symbol)
            if cob_data:
                self._bump_stat('successful_requests')
                return cob_data

            # Try MEXC as fallback
            cob_data = self._fetch_mexc_cob(symbol)
            if cob_data:
                self._bump_stat('successful_requests')
                cob_data.data_source = 'mexc_fallback'
                return cob_data

            # Use cached fallback data if available
            with self.lock:
                stored = self.fallback_data.get(symbol)
                if stored is not None:
                    self.fetch_stats['fallback_uses'] += 1
                    # Build a new object instead of mutating the stored one:
                    # the stored snapshot may also live in cob_cache or be held
                    # by callers. The timestamp is the time of reuse.
                    degraded = COBData(
                        symbol=stored.symbol,
                        timestamp=datetime.now(),
                        bids=stored.bids,
                        asks=stored.asks,
                        data_source='fallback_cache'
                    )
                    # Reduce quality score for old data; storing the degraded
                    # copy makes the reduction compound on repeated reuse.
                    degraded.quality_score = stored.quality_score * 0.5
                    self.fallback_data[symbol] = degraded
                    return degraded

            self._bump_stat('failed_requests')
            return None

        except Exception as e:
            logger.error(f"Error fetching COB data for {symbol}: {e}")
            self._bump_stat('failed_requests')
            return None

    def _fetch_binance_cob(self, symbol: str) -> Optional[COBData]:
        """Fetch a 100-level order book from Binance through the rate limiter.

        A successful snapshot is also stored as the symbol's fallback.

        Returns:
            ``COBData`` on success; ``None`` on a missing response, a non-200
            status, an empty book, or any exception (logged).
        """
        try:
            url = "https://api.binance.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100  # Get 100 levels
            }

            # Use rate limiter
            response = self.rate_limiter.make_request(
                'binance_api',
                url,
                method='GET',
                params=params
            )

            # Don't test the response for truthiness: requests.Response is
            # falsy for every 4xx/5xx status, which would hide the status code.
            status_code = getattr(response, 'status_code', None)
            if status_code is None:
                # No response object was returned by the rate limiter.
                self._bump_stat('rate_limited_requests')
                return None

            if status_code != 200:
                if status_code in (418, 429):
                    self._bump_stat('rate_limited_requests')
                logger.warning(f"Binance COB API returned {status_code} for {symbol}")
                return None

            data = response.json()

            # Parse order book data
            bids = [(float(price), float(qty)) for price, qty in data.get('bids', [])]
            asks = [(float(price), float(qty)) for price, qty in data.get('asks', [])]

            if not bids or not asks:
                logger.warning(f"Empty order book data from Binance for {symbol}")
                return None

            cob_data = COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='binance'
            )

            # Store as fallback for future use
            with self.lock:
                self.fallback_data[symbol] = cob_data

            return cob_data

        except Exception as e:
            logger.error(f"Error fetching Binance COB for {symbol}: {e}")
            return None

    def _fetch_mexc_cob(self, symbol: str) -> Optional[COBData]:
        """Fetch a 100-level order book from MEXC as a fallback source.

        Returns:
            ``COBData`` on success; ``None`` on a missing response, a non-200
            status, an empty book, or any exception (logged at debug level).
        """
        try:
            url = "https://api.mexc.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100
            }

            response = self.rate_limiter.make_request(
                'mexc_api',
                url,
                method='GET',
                params=params
            )

            if getattr(response, 'status_code', None) != 200:
                return None

            data = response.json()

            # Parse order book data
            bids = [(float(price), float(qty)) for price, qty in data.get('bids', [])]
            asks = [(float(price), float(qty)) for price, qty in data.get('asks', [])]

            if not bids or not asks:
                return None

            return COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='mexc'
            )

        except Exception as e:
            logger.debug(f"Error fetching MEXC COB for {symbol}: {e}")
            return None

    def get_cob_data(self, symbol: str) -> Optional[COBData]:
        """Get COB data for a symbol (from cache or fresh fetch).

        While background fetching is running only the cache is consulted
        (stale entries included). Otherwise a stale or missing entry triggers
        a synchronous fetch whose result is cached for ``cache_ttl``.

        Returns:
            The snapshot, or ``None`` when nothing is cached (background mode)
            or the fetch failed.
        """
        with self.lock:
            running = self.is_running
            cached_data = self.cob_cache.get(symbol)
            if cached_data is not None:
                cache_time = self.cache_timestamps.get(symbol, datetime.min)

                # Return cached data if still fresh
                if datetime.now() - cache_time < self.cache_ttl:
                    self.fetch_stats['cache_hits'] += 1
                    return cached_data

                # If background fetching is running, return cached data even if stale
                if running:
                    return cached_data

        if running:
            return None

        # Fetch outside the lock so other threads are not blocked on network
        # I/O, then cache the result so the TTL also applies to on-demand use.
        cob_data = self._fetch_cob_data_safe(symbol)
        self._record_fetch_result(symbol, cob_data)
        return cob_data

    def get_cob_features(self, symbol: str, feature_count: int = 120) -> Optional[np.ndarray]:
        """
        Get COB features for ML models

        Layout before padding/truncation: 5 summary metrics, 20 bid
        (price, volume) pairs, 20 ask (price, volume) pairs, then -- when both
        sides are non-empty -- the top-5 volume imbalance followed by 10 bid
        prices and 10 ask prices. Every section is zero-padded to its fixed
        width so feature positions do not depend on book depth.

        Args:
            symbol: Trading symbol
            feature_count: Number of features to return

        Returns:
            float32 Numpy array of ``feature_count`` COB features, or None if
            no data is available or feature construction fails
        """
        cob_data = self.get_cob_data(symbol)
        if not cob_data:
            return None

        try:
            # Basic market metrics
            features = [
                cob_data.mid_price,
                cob_data.spread,
                cob_data.total_bid_volume,
                cob_data.total_ask_volume,
                cob_data.quality_score
            ]

            # Bid levels then ask levels (price and volume), padded to 20 each
            for side in (cob_data.bids, cob_data.asks):
                levels = side[:20]
                for price, volume in levels:
                    features.extend([price, volume])
                features.extend([0.0] * (2 * (20 - len(levels))))

            # Calculate additional features
            if cob_data.bids and cob_data.asks:
                # Volume imbalance over the top 5 levels
                bid_volume_5 = sum(vol for _, vol in cob_data.bids[:5])
                ask_volume_5 = sum(vol for _, vol in cob_data.asks[:5])
                total_volume_5 = bid_volume_5 + ask_volume_5
                volume_imbalance = (
                    (bid_volume_5 - ask_volume_5) / total_volume_5
                    if total_volume_5 > 0 else 0
                )
                features.append(volume_imbalance)

                # Price levels, each side padded to 10 so ask prices keep a
                # fixed position even when the bid side is shallow
                for side in (cob_data.bids, cob_data.asks):
                    prices = [price for price, _ in side[:10]]
                    features.extend(prices)
                    features.extend([0.0] * (10 - len(prices)))

            # Pad or truncate to desired feature count
            if len(features) < feature_count:
                features.extend([0.0] * (feature_count - len(features)))
            else:
                features = features[:feature_count]

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error creating COB features for {symbol}: {e}")
            return None

    def get_provider_status(self) -> Dict[str, Any]:
        """Get provider status and statistics.

        Returns:
            A dict with the running flag, symbols, per-symbol cache status,
            error counts, last successful fetch times (ISO format), fetch
            statistics and the rate limiter's endpoint status.
        """
        with self.lock:
            status = {
                'is_running': self.is_running,
                'symbols': self.symbols,
                'cache_status': {},
                'error_counts': self.error_counts.copy(),
                'last_successful_fetch': {
                    symbol: timestamp.isoformat()
                    for symbol, timestamp in self.last_successful_fetch.items()
                },
                'fetch_stats': self.fetch_stats.copy(),
                'rate_limiter_status': self.rate_limiter.get_all_endpoint_status()
            }

            # Cache status for each symbol
            for symbol in self.symbols:
                cache_time = self.cache_timestamps.get(symbol)
                cached = self.cob_cache.get(symbol)
                status['cache_status'][symbol] = {
                    'has_data': symbol in self.cob_cache,
                    'cache_time': cache_time.isoformat() if cache_time else None,
                    'cache_age_seconds': (datetime.now() - cache_time).total_seconds() if cache_time else None,
                    'data_quality': cached.quality_score if cached is not None else 0.0
                }

            return status

    def reset_errors(self):
        """Reset error counts and rate limiter"""
        with self.lock:
            self.error_counts.clear()
            self.rate_limiter.reset_all_endpoints()
            logger.info("Reset all error counts and rate limiter")

    def force_refresh(self, symbol: Optional[str] = None):
        """Drop cached COB data so the next read or fetch replaces it.

        Args:
            symbol: Symbol to refresh; all configured symbols when ``None``
                or empty.
        """
        symbols_to_refresh = [symbol] if symbol else self.symbols

        for sym in symbols_to_refresh:
            # Clear cache to force refresh
            with self.lock:
                self.cob_cache.pop(sym, None)
                self.cache_timestamps.pop(sym, None)

            logger.info(f"Forced refresh for {sym}")


# Global COB provider instance
_global_cob_provider: Optional[RobustCOBProvider] = None


def get_cob_provider(symbols: Optional[List[str]] = None) -> RobustCOBProvider:
    """Get the global COB provider instance, creating it on first use.

    Args:
        symbols: Symbols for the provider. Only used by the call that creates
            the instance; later calls return the existing provider unchanged.
    """
    global _global_cob_provider
    if _global_cob_provider is None:
        _global_cob_provider = RobustCOBProvider(symbols)
    return _global_cob_provider