"""
|
|
Robust COB (Consolidated Order Book) Provider
|
|
|
|
This module provides a robust COB data provider that handles:
|
|
- HTTP 418 errors from Binance (rate limiting)
|
|
- Thread safety issues
|
|
- API rate limiting and backoff
|
|
- Fallback data sources
|
|
- Error recovery strategies
|
|
|
|
Features:
|
|
- Automatic rate limiting and backoff
|
|
- Multiple exchange support with fallbacks
|
|
- Thread-safe operations
|
|
- Comprehensive error handling
|
|
- Data validation and integrity checking
|
|
"""

import copy
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from .api_rate_limiter import get_rate_limiter

logger = logging.getLogger(__name__)

@dataclass
class COBData:
    """Consolidated Order Book data structure"""
    symbol: str
    timestamp: datetime
    bids: List[Tuple[float, float]]  # [(price, quantity), ...]
    asks: List[Tuple[float, float]]  # [(price, quantity), ...]

    # Derived metrics
    spread: float = 0.0
    mid_price: float = 0.0
    total_bid_volume: float = 0.0
    total_ask_volume: float = 0.0

    # Data quality
    data_source: str = 'unknown'
    quality_score: float = 1.0

    def __post_init__(self):
        """Calculate derived metrics"""
        if self.bids and self.asks:
            self.spread = self.asks[0][0] - self.bids[0][0]
            self.mid_price = (self.asks[0][0] + self.bids[0][0]) / 2
            self.total_bid_volume = sum(qty for _, qty in self.bids)
            self.total_ask_volume = sum(qty for _, qty in self.asks)

            # Calculate quality score based on data completeness
            self.quality_score = min(
                len(self.bids) / 20,  # Expect at least 20 bid levels
                len(self.asks) / 20,  # Expect at least 20 ask levels
                1.0
            )
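            # Worked example (illustrative values): a snapshot with 10 bid levels
            # and 25 ask levels scores min(10/20, 25/20, 1.0) = 0.5, so a thin book
            # is penalized while 20+ levels on both sides scores a full 1.0.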


class RobustCOBProvider:
    """Robust COB provider with error handling and rate limiting"""

    def __init__(self, symbols: Optional[List[str]] = None):
        self.symbols = symbols or ['ETHUSDT', 'BTCUSDT']

        # Rate limiter
        self.rate_limiter = get_rate_limiter()

        # Thread safety
        self.lock = threading.RLock()

        # Data cache
        self.cob_cache: Dict[str, COBData] = {}
        self.cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = timedelta(seconds=5)  # 5 second cache TTL

        # Error tracking
        self.error_counts: Dict[str, int] = {}
        self.last_successful_fetch: Dict[str, datetime] = {}

        # Background fetching
        self.is_running = False
        self.fetch_threads: Dict[str, threading.Thread] = {}
        self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="COB-Fetcher")

        # Fallback data
        self.fallback_data: Dict[str, COBData] = {}

        # Performance tracking
        self.fetch_stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limited_requests': 0,
            'cache_hits': 0,
            'fallback_uses': 0
        }

        logger.info(f"Robust COB Provider initialized for symbols: {self.symbols}")

    def start_background_fetching(self):
        """Start background COB data fetching"""
        if self.is_running:
            logger.warning("Background fetching already running")
            return

        self.is_running = True

        # Start a fetching thread for each symbol
        for symbol in self.symbols:
            thread = threading.Thread(
                target=self._background_fetch_worker,
                args=(symbol,),
                name=f"COB-{symbol}",
                daemon=True
            )
            self.fetch_threads[symbol] = thread
            thread.start()

        logger.info(f"Started background COB fetching for {len(self.symbols)} symbols")

    def stop_background_fetching(self):
        """Stop background COB data fetching"""
        self.is_running = False

        # Wait for worker threads to finish
        for symbol, thread in self.fetch_threads.items():
            thread.join(timeout=5)
            logger.debug(f"Stopped COB fetching for {symbol}")

        # Shut down the executor (ThreadPoolExecutor.shutdown takes no timeout argument)
        self.executor.shutdown(wait=True)

        logger.info("Stopped background COB fetching")

    def _background_fetch_worker(self, symbol: str):
        """Background worker for fetching COB data"""
        logger.info(f"Started COB fetching worker for {symbol}")

        while self.is_running:
            try:
                # Fetch COB data
                cob_data = self._fetch_cob_data_safe(symbol)

                if cob_data:
                    with self.lock:
                        self.cob_cache[symbol] = cob_data
                        self.cache_timestamps[symbol] = datetime.now()
                        self.last_successful_fetch[symbol] = datetime.now()
                        self.error_counts[symbol] = 0  # Reset error count on success

                    logger.debug(f"Updated COB cache for {symbol}")
                else:
                    with self.lock:
                        self.error_counts[symbol] = self.error_counts.get(symbol, 0) + 1

                    logger.debug(f"Failed to fetch COB for {symbol}, error count: {self.error_counts.get(symbol, 0)}")

                # Wait before the next fetch (adaptive, based on recent errors)
                error_count = self.error_counts.get(symbol, 0)
                base_interval = 2.0  # Base 2 second interval
                backoff_interval = min(base_interval * (2 ** min(error_count, 5)), 60.0)  # Max 60s
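                # Example schedule: 0 errors -> 2s, 1 -> 4s, 2 -> 8s, 3 -> 16s,
                # 4 -> 32s, 5 or more -> 60s (64s capped at the maximum).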

                time.sleep(backoff_interval)

            except Exception as e:
                logger.error(f"Error in COB fetching worker for {symbol}: {e}")
                time.sleep(10)  # Wait 10s on unexpected errors

        logger.info(f"Stopped COB fetching worker for {symbol}")

    def _fetch_cob_data_safe(self, symbol: str) -> Optional[COBData]:
        """Safely fetch COB data with error handling"""
        try:
            self.fetch_stats['total_requests'] += 1

            # Try Binance first
            cob_data = self._fetch_binance_cob(symbol)
            if cob_data:
                self.fetch_stats['successful_requests'] += 1
                return cob_data

            # Try MEXC as a fallback
            cob_data = self._fetch_mexc_cob(symbol)
            if cob_data:
                self.fetch_stats['successful_requests'] += 1
                cob_data.data_source = 'mexc_fallback'
                return cob_data

            # Use cached fallback data if available. Return a shallow copy so
            # repeated fallback hits do not degrade the stored snapshot itself.
            if symbol in self.fallback_data:
                self.fetch_stats['fallback_uses'] += 1
                fallback = copy.copy(self.fallback_data[symbol])
                fallback.timestamp = datetime.now()
                fallback.data_source = 'fallback_cache'
                fallback.quality_score *= 0.5  # Reduce quality score for stale data
                return fallback

            self.fetch_stats['failed_requests'] += 1
            return None

        except Exception as e:
            logger.error(f"Error fetching COB data for {symbol}: {e}")
            self.fetch_stats['failed_requests'] += 1
            return None

    def _fetch_binance_cob(self, symbol: str) -> Optional[COBData]:
        """Fetch COB data from Binance with rate limiting"""
        try:
            url = "https://api.binance.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100  # Get 100 levels
            }

            # Use the rate limiter
            response = self.rate_limiter.make_request(
                'binance_api',
                url,
                method='GET',
                params=params
            )

            if not response:
                self.fetch_stats['rate_limited_requests'] += 1
                return None

            if response.status_code != 200:
                logger.warning(f"Binance COB API returned {response.status_code} for {symbol}")
                return None

            data = response.json()

            # Parse order book data (the API returns price/quantity as strings)
            bids = [(float(price), float(qty)) for price, qty in data.get('bids', [])]
            asks = [(float(price), float(qty)) for price, qty in data.get('asks', [])]

            if not bids or not asks:
                logger.warning(f"Empty order book data from Binance for {symbol}")
                return None

            cob_data = COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='binance'
            )

            # Store as fallback for future use
            self.fallback_data[symbol] = cob_data

            return cob_data

        except Exception as e:
            logger.error(f"Error fetching Binance COB for {symbol}: {e}")
            return None

    def _fetch_mexc_cob(self, symbol: str) -> Optional[COBData]:
        """Fetch COB data from MEXC as fallback"""
        try:
            url = "https://api.mexc.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100
            }

            response = self.rate_limiter.make_request(
                'mexc_api',
                url,
                method='GET',
                params=params
            )

            if not response or response.status_code != 200:
                return None

            data = response.json()

            # Parse order book data
            bids = [(float(price), float(qty)) for price, qty in data.get('bids', [])]
            asks = [(float(price), float(qty)) for price, qty in data.get('asks', [])]

            if not bids or not asks:
                return None

            return COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='mexc'
            )

        except Exception as e:
            logger.debug(f"Error fetching MEXC COB for {symbol}: {e}")
            return None

    def get_cob_data(self, symbol: str) -> Optional[COBData]:
        """Get COB data for a symbol (from cache or fresh fetch)"""
        with self.lock:
            # Check cache first
            if symbol in self.cob_cache:
                cached_data = self.cob_cache[symbol]
                cache_time = self.cache_timestamps.get(symbol, datetime.min)

                # Return cached data if still fresh
                if datetime.now() - cache_time < self.cache_ttl:
                    self.fetch_stats['cache_hits'] += 1
                    return cached_data

            # If background fetching is running, return cached data even if stale
            if self.is_running and symbol in self.cob_cache:
                return self.cob_cache[symbol]

            # Fetch fresh data synchronously when background fetching is off
            if not self.is_running:
                return self._fetch_cob_data_safe(symbol)

            return None

    def get_cob_features(self, symbol: str, feature_count: int = 120) -> Optional[np.ndarray]:
        """
        Get COB features for ML models

        Args:
            symbol: Trading symbol
            feature_count: Number of features to return

        Returns:
            Numpy array of COB features, or None if no data is available
        """
        cob_data = self.get_cob_data(symbol)
        if not cob_data:
            return None

        try:
            features = []
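            # Resulting layout, when both sides of the book are populated
            # (before padding/truncation to feature_count):
            #   [0:5]    mid price, spread, total bid volume, total ask volume, quality score
            #   [5:45]   top 20 bid levels as (price, volume) pairs
            #   [45:85]  top 20 ask levels as (price, volume) pairs
            #   [85]     5-level volume imbalance
            #   [86:106] top 10 bid prices followed by top 10 ask prices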

            # Basic market metrics
            features.extend([
                cob_data.mid_price,
                cob_data.spread,
                cob_data.total_bid_volume,
                cob_data.total_ask_volume,
                cob_data.quality_score
            ])

            # Bid levels (price and volume)
            max_levels = min(len(cob_data.bids), 20)
            for i in range(max_levels):
                price, volume = cob_data.bids[i]
                features.extend([price, volume])

            # Pad bid levels if needed
            for i in range(max_levels, 20):
                features.extend([0.0, 0.0])

            # Ask levels (price and volume)
            max_levels = min(len(cob_data.asks), 20)
            for i in range(max_levels):
                price, volume = cob_data.asks[i]
                features.extend([price, volume])

            # Pad ask levels if needed
            for i in range(max_levels, 20):
                features.extend([0.0, 0.0])

            # Calculate additional features
            if len(cob_data.bids) > 0 and len(cob_data.asks) > 0:
                # Volume imbalance over the top 5 levels
                bid_volume_5 = sum(vol for _, vol in cob_data.bids[:5])
                ask_volume_5 = sum(vol for _, vol in cob_data.asks[:5])
                volume_imbalance = (bid_volume_5 - ask_volume_5) / (bid_volume_5 + ask_volume_5) if (bid_volume_5 + ask_volume_5) > 0 else 0
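                # Worked example (illustrative values): bid_volume_5 = 30 and
                # ask_volume_5 = 10 give (30 - 10) / 40 = +0.5, i.e. bid-heavy.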
                features.append(volume_imbalance)

                # Price levels
                bid_price_levels = [price for price, _ in cob_data.bids[:10]]
                ask_price_levels = [price for price, _ in cob_data.asks[:10]]
                features.extend(bid_price_levels + ask_price_levels)

            # Pad or truncate to the desired feature count
            if len(features) < feature_count:
                features.extend([0.0] * (feature_count - len(features)))
            else:
                features = features[:feature_count]

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error creating COB features for {symbol}: {e}")
            return None

    def get_provider_status(self) -> Dict[str, Any]:
        """Get provider status and statistics"""
        with self.lock:
            status = {
                'is_running': self.is_running,
                'symbols': self.symbols,
                'cache_status': {},
                'error_counts': self.error_counts.copy(),
                'last_successful_fetch': {
                    symbol: timestamp.isoformat()
                    for symbol, timestamp in self.last_successful_fetch.items()
                },
                'fetch_stats': self.fetch_stats.copy(),
                'rate_limiter_status': self.rate_limiter.get_all_endpoint_status()
            }

            # Cache status for each symbol
            for symbol in self.symbols:
                cache_time = self.cache_timestamps.get(symbol)
                status['cache_status'][symbol] = {
                    'has_data': symbol in self.cob_cache,
                    'cache_time': cache_time.isoformat() if cache_time else None,
                    'cache_age_seconds': (datetime.now() - cache_time).total_seconds() if cache_time else None,
                    'data_quality': self.cob_cache[symbol].quality_score if symbol in self.cob_cache else 0.0
                }

            return status
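
    # Sample status shape for one symbol (hypothetical values, for orientation):
    # {'is_running': True, 'symbols': ['ETHUSDT'], 'error_counts': {'ETHUSDT': 0},
    #  'fetch_stats': {'total_requests': 42, 'cache_hits': 30, ...},
    #  'cache_status': {'ETHUSDT': {'has_data': True, 'cache_age_seconds': 1.3, ...}}}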

    def reset_errors(self):
        """Reset error counts and the rate limiter"""
        with self.lock:
            self.error_counts.clear()
            self.rate_limiter.reset_all_endpoints()
            logger.info("Reset all error counts and rate limiter")

    def force_refresh(self, symbol: Optional[str] = None):
        """Force a refresh of COB data for one symbol, or for all symbols"""
        symbols_to_refresh = [symbol] if symbol else self.symbols

        for sym in symbols_to_refresh:
            # Clear the cache entry to force a refresh
            with self.lock:
                if sym in self.cob_cache:
                    del self.cob_cache[sym]
                if sym in self.cache_timestamps:
                    del self.cache_timestamps[sym]

            logger.info(f"Forced refresh for {sym}")


# Global COB provider instance
_global_cob_provider = None


def get_cob_provider(symbols: Optional[List[str]] = None) -> RobustCOBProvider:
    """Get the global COB provider instance"""
    global _global_cob_provider
    if _global_cob_provider is None:
        _global_cob_provider = RobustCOBProvider(symbols)
    return _global_cob_provider
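

if __name__ == "__main__":
    # Minimal usage sketch, not part of the provider itself: start background
    # fetching, read one snapshot and its ML feature vector, then shut down.
    # Assumes network access, and that this file lives in a package so it must
    # be run as a module (e.g. `python -m <package>.robust_cob_provider`) for
    # the relative api_rate_limiter import to resolve.
    logging.basicConfig(level=logging.INFO)

    provider = get_cob_provider(['ETHUSDT'])
    provider.start_background_fetching()
    try:
        time.sleep(5)  # Give the background workers time to populate the cache

        snapshot = provider.get_cob_data('ETHUSDT')
        if snapshot:
            print(f"{snapshot.symbol}: mid={snapshot.mid_price:.2f} "
                  f"spread={snapshot.spread:.4f} source={snapshot.data_source} "
                  f"quality={snapshot.quality_score:.2f}")

        features = provider.get_cob_features('ETHUSDT')
        if features is not None:
            print(f"Feature vector shape: {features.shape}")
    finally:
        provider.stop_background_fetching()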