Files
gogo2/core/robust_cob_provider.py
Dobromir Popov 12865fd3ef replay system
2025-07-20 12:37:02 +03:00

460 lines
17 KiB
Python

"""
Robust COB (Consolidated Order Book) Provider
This module provides a robust COB data provider that handles:
- HTTP 418 errors from Binance (rate limiting)
- Thread safety issues
- API rate limiting and backoff
- Fallback data sources
- Error recovery strategies
Features:
- Automatic rate limiting and backoff
- Multiple exchange support with fallbacks
- Thread-safe operations
- Comprehensive error handling
- Data validation and integrity checking
"""
import asyncio
import logging
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from collections import deque
import json
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from .api_rate_limiter import get_rate_limiter, RateLimitConfig
logger = logging.getLogger(__name__)
@dataclass
class COBData:
    """Consolidated Order Book snapshot for a single symbol.

    ``bids`` and ``asks`` are lists of ``(price, quantity)`` tuples; the first
    entry of each list is used as the best bid / best ask when deriving the
    spread and mid price.  The derived fields are recomputed from the book in
    ``__post_init__``, so values passed for them to the constructor are
    overwritten whenever the corresponding side(s) carry data.
    """

    symbol: str
    timestamp: datetime
    bids: List[Tuple[float, float]]  # [(price, quantity), ...]
    asks: List[Tuple[float, float]]  # [(price, quantity), ...]

    # Derived metrics
    spread: float = 0.0
    mid_price: float = 0.0
    total_bid_volume: float = 0.0
    total_ask_volume: float = 0.0

    # Data quality
    data_source: str = 'unknown'
    quality_score: float = 1.0

    def __post_init__(self):
        """Calculate derived metrics and the completeness-based quality score."""
        bids = self.bids or []
        asks = self.asks or []

        # Volumes are per-side quantities, so a one-sided book still reports
        # the volume of the side that is present.
        if bids:
            self.total_bid_volume = sum(qty for _, qty in bids)
        if asks:
            self.total_ask_volume = sum(qty for _, qty in asks)

        # Spread and mid price need both a best bid and a best ask.
        if bids and asks:
            best_bid = bids[0][0]
            best_ask = asks[0][0]
            self.spread = best_ask - best_bid
            self.mid_price = (best_ask + best_bid) / 2

        # Quality reflects completeness: 20 levels per side is a full score,
        # and an empty side scores 0.0 instead of keeping the 1.0 default.
        self.quality_score = min(
            len(bids) / 20,  # Expect at least 20 bid levels
            len(asks) / 20,  # Expect at least 20 ask levels
            1.0,
        )
class RobustCOBProvider:
    """Robust COB provider with error handling and rate limiting.

    Order books are requested through the shared rate limiter, first from
    Binance and then from MEXC; when both fail, the last Binance snapshot is
    served as degraded fallback data.  Results are kept in a short-lived
    per-symbol cache that is either refreshed by one background thread per
    symbol or filled on demand by ``get_cob_data``.

    All shared dictionaries and counters are guarded by ``self.lock``.
    """

    # Fixed feature layout used by get_cob_features().
    _DEPTH_LEVELS = 20   # (price, volume) pairs emitted per side
    _PRICE_LEVELS = 10   # trailing price-only levels emitted per side
    _IMBALANCE_LEVELS = 5  # top levels used for the volume imbalance

    def __init__(self, symbols: Optional[List[str]] = None):
        """Create a provider for ``symbols`` (defaults to ETHUSDT and BTCUSDT)."""
        self.symbols = symbols or ['ETHUSDT', 'BTCUSDT']

        # Rate limiter
        self.rate_limiter = get_rate_limiter()

        # Thread safety
        self.lock = threading.RLock()

        # Data cache
        self.cob_cache: Dict[str, "COBData"] = {}
        self.cache_timestamps: Dict[str, datetime] = {}
        self.cache_ttl = timedelta(seconds=5)  # 5 second cache TTL

        # Error tracking
        self.error_counts: Dict[str, int] = {}
        self.last_successful_fetch: Dict[str, datetime] = {}

        # Background fetching
        self.is_running = False
        # Set on stop so workers waiting between fetches wake up immediately.
        self._stop_event = threading.Event()
        self.fetch_threads: Dict[str, threading.Thread] = {}
        self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="COB-Fetcher")

        # Fallback data
        self.fallback_data: Dict[str, "COBData"] = {}

        # Performance tracking
        self.fetch_stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limited_requests': 0,
            'cache_hits': 0,
            'fallback_uses': 0
        }

        logger.info(f"Robust COB Provider initialized for symbols: {self.symbols}")

    def _bump_stat(self, key: str) -> None:
        """Increment one ``fetch_stats`` counter under the provider lock."""
        with self.lock:
            self.fetch_stats[key] += 1

    @staticmethod
    def _parse_levels(raw_levels) -> List[Tuple[float, float]]:
        """Convert raw ``[price, qty]`` pairs into ``(float, float)`` tuples."""
        return [(float(price), float(qty)) for price, qty in raw_levels]

    def start_background_fetching(self):
        """Start background COB data fetching (one daemon thread per symbol)."""
        if self.is_running:
            logger.warning("Background fetching already running")
            return

        self.is_running = True
        self._stop_event.clear()

        # Start fetching thread for each symbol
        for symbol in self.symbols:
            thread = threading.Thread(
                target=self._background_fetch_worker,
                args=(symbol,),
                name=f"COB-{symbol}",
                daemon=True
            )
            self.fetch_threads[symbol] = thread
            thread.start()

        logger.info(f"Started background COB fetching for {len(self.symbols)} symbols")

    def stop_background_fetching(self):
        """Stop background COB data fetching and wait briefly for the workers."""
        self.is_running = False
        # Wake workers that are waiting out their fetch interval / backoff.
        self._stop_event.set()

        # Wait for threads to finish
        for symbol, thread in list(self.fetch_threads.items()):
            thread.join(timeout=5)
            logger.debug(f"Stopped COB fetching for {symbol}")

        # Shutdown executor.  Executor.shutdown() accepts no timeout argument,
        # so passing one would raise TypeError.
        self.executor.shutdown(wait=True)

        logger.info("Stopped background COB fetching")

    def _background_fetch_worker(self, symbol: str):
        """Background loop that keeps the cache entry for ``symbol`` updated.

        The wait between fetches starts at 2 seconds and doubles with each
        consecutive failure (capped at 60 seconds).  Data served from the
        fallback cache is stored so consumers still get a book, but it counts
        as a failure: no exchange answered, so the backoff must keep growing.
        """
        logger.info(f"Started COB fetching worker for {symbol}")

        while self.is_running:
            try:
                # Fetch COB data
                cob_data = self._fetch_cob_data_safe(symbol)
                is_live = cob_data is not None and cob_data.data_source != 'fallback_cache'

                with self.lock:
                    now = datetime.now()
                    if cob_data is not None:
                        self.cob_cache[symbol] = cob_data
                        self.cache_timestamps[symbol] = now
                    if is_live:
                        self.last_successful_fetch[symbol] = now
                        self.error_counts[symbol] = 0  # Reset error count on success
                    else:
                        self.error_counts[symbol] = self.error_counts.get(symbol, 0) + 1
                    error_count = self.error_counts[symbol]

                if is_live:
                    logger.debug(f"Updated COB cache for {symbol}")
                else:
                    logger.debug(f"Failed to fetch COB for {symbol}, error count: {error_count}")

                # Wait before next fetch (adaptive based on errors)
                base_interval = 2.0  # Base 2 second interval
                wait_seconds = min(base_interval * (2 ** min(error_count, 5)), 60.0)  # Max 60s
            except Exception as e:
                logger.error(f"Error in COB fetching worker for {symbol}: {e}")
                wait_seconds = 10.0  # Wait 10s on unexpected errors

            # Interruptible sleep: returns True as soon as stop is requested.
            if self._stop_event.wait(wait_seconds):
                break

        logger.info(f"Stopped COB fetching worker for {symbol}")

    def _fetch_cob_data_safe(self, symbol: str) -> Optional["COBData"]:
        """Fetch COB data, trying Binance, then MEXC, then the fallback cache.

        Returns:
            A ``COBData`` instance, or ``None`` when no source produced data.
            Fallback results carry ``data_source == 'fallback_cache'`` and a
            quality score halved on every consecutive fallback use.
        """
        try:
            self._bump_stat('total_requests')

            # Try Binance first
            cob_data = self._fetch_binance_cob(symbol)
            if cob_data is not None:
                self._bump_stat('successful_requests')
                return cob_data

            # Try MEXC as fallback
            cob_data = self._fetch_mexc_cob(symbol)
            if cob_data is not None:
                self._bump_stat('successful_requests')
                cob_data.data_source = 'mexc_fallback'
                return cob_data

            # Use cached fallback data if available
            with self.lock:
                stale = self.fallback_data.get(symbol)
                if stale is not None:
                    self.fetch_stats['fallback_uses'] += 1
                    # Build a new object rather than mutating the stored one:
                    # that instance may already sit in the cache or be held
                    # by a caller.
                    degraded = COBData(
                        symbol=stale.symbol,
                        timestamp=datetime.now(),
                        bids=stale.bids,
                        asks=stale.asks,
                        data_source='fallback_cache'
                    )
                    # Reduce quality score for old data; storing the copy keeps
                    # the decay cumulative across consecutive fallback uses.
                    degraded.quality_score = stale.quality_score * 0.5
                    self.fallback_data[symbol] = degraded
                    return degraded

            self._bump_stat('failed_requests')
            return None

        except Exception as e:
            logger.error(f"Error fetching COB data for {symbol}: {e}")
            self._bump_stat('failed_requests')
            return None

    def _fetch_binance_cob(self, symbol: str) -> Optional["COBData"]:
        """Fetch COB data from Binance with rate limiting.

        A successful snapshot is also remembered as fallback data for the
        symbol.  Returns ``None`` on any failure.
        """
        try:
            url = "https://api.binance.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100  # Get 100 levels
            }

            # Use rate limiter
            response = self.rate_limiter.make_request(
                'binance_api',
                url,
                method='GET',
                params=params
            )

            # NOTE(review): make_request presumably returns None when the
            # limiter blocks the call - confirm.  The status code is inspected
            # explicitly because a requests.Response is falsy for 4xx/5xx, so
            # a bare truthiness test would hide the status warning below.
            status_code = getattr(response, 'status_code', None)
            if status_code is None:
                self._bump_stat('rate_limited_requests')
                return None

            if status_code != 200:
                if status_code in (418, 429):
                    # Binance's rate-limit (429) and IP-ban (418) responses.
                    self._bump_stat('rate_limited_requests')
                logger.warning(f"Binance COB API returned {response.status_code} for {symbol}")
                return None

            data = response.json()

            # Parse order book data
            bids = self._parse_levels(data.get('bids', []))
            asks = self._parse_levels(data.get('asks', []))

            if not bids or not asks:
                logger.warning(f"Empty order book data from Binance for {symbol}")
                return None

            cob_data = COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='binance'
            )

            # Store as fallback for future use
            with self.lock:
                self.fallback_data[symbol] = cob_data

            return cob_data

        except Exception as e:
            logger.error(f"Error fetching Binance COB for {symbol}: {e}")
            return None

    def _fetch_mexc_cob(self, symbol: str) -> Optional["COBData"]:
        """Fetch COB data from MEXC as fallback; returns ``None`` on any failure."""
        try:
            url = "https://api.mexc.com/api/v3/depth"
            params = {
                'symbol': symbol,
                'limit': 100
            }

            response = self.rate_limiter.make_request(
                'mexc_api',
                url,
                method='GET',
                params=params
            )

            # Covers both "no response object" and any non-200 status.
            if getattr(response, 'status_code', None) != 200:
                return None

            data = response.json()

            # Parse order book data
            bids = self._parse_levels(data.get('bids', []))
            asks = self._parse_levels(data.get('asks', []))

            if not bids or not asks:
                return None

            return COBData(
                symbol=symbol,
                timestamp=datetime.now(),
                bids=bids,
                asks=asks,
                data_source='mexc'
            )

        except Exception as e:
            logger.debug(f"Error fetching MEXC COB for {symbol}: {e}")
            return None

    def get_cob_data(self, symbol: str) -> Optional["COBData"]:
        """Get COB data for a symbol (from cache or fresh fetch).

        A cache entry younger than ``cache_ttl`` is always returned.  While
        background fetching is running, a stale entry (or ``None`` when there
        is none yet) is returned instead of issuing another request.
        Otherwise the data is fetched directly and stored in the cache.
        """
        with self.lock:
            # Check cache first
            cached_data = self.cob_cache.get(symbol)
            if cached_data is not None:
                cache_time = self.cache_timestamps.get(symbol, datetime.min)
                # Return cached data if still fresh
                if datetime.now() - cache_time < self.cache_ttl:
                    self.fetch_stats['cache_hits'] += 1
                    return cached_data

            # If background fetching is running, return cached data even if stale
            if self.is_running:
                return cached_data

        # Fetch outside the lock so a slow HTTP request cannot block other
        # threads that only need cache or status access.
        fresh_data = self._fetch_cob_data_safe(symbol)
        if fresh_data is not None:
            # Remember the result so calls within the TTL reuse it instead of
            # hitting the exchange again.
            with self.lock:
                self.cob_cache[symbol] = fresh_data
                self.cache_timestamps[symbol] = datetime.now()
        return fresh_data

    def get_cob_features(self, symbol: str, feature_count: int = 120) -> Optional[np.ndarray]:
        """
        Get COB features for ML models.

        Layout before padding/truncation to ``feature_count``:
        5 summary metrics, 20 bid (price, volume) pairs, 20 ask (price,
        volume) pairs, 1 top-5 volume imbalance, 10 bid prices, 10 ask prices.
        Missing levels are zero-filled so every feature keeps a fixed index.

        Args:
            symbol: Trading symbol
            feature_count: Number of features to return

        Returns:
            float32 Numpy array of COB features or None if no data
        """
        cob_data = self.get_cob_data(symbol)
        if not cob_data:
            return None

        try:
            bids = cob_data.bids
            asks = cob_data.asks

            # Basic market metrics
            features = [
                cob_data.mid_price,
                cob_data.spread,
                cob_data.total_bid_volume,
                cob_data.total_ask_volume,
                cob_data.quality_score
            ]

            # Bid then ask levels (price and volume), zero-padded per side
            for side in (bids, asks):
                top_levels = side[:self._DEPTH_LEVELS]
                for price, volume in top_levels:
                    features.extend([price, volume])
                features.extend([0.0] * (2 * (self._DEPTH_LEVELS - len(top_levels))))

            # Calculate additional features
            if len(bids) > 0 and len(asks) > 0:
                # Volume imbalance
                bid_volume_5 = sum(vol for _, vol in bids[:self._IMBALANCE_LEVELS])
                ask_volume_5 = sum(vol for _, vol in asks[:self._IMBALANCE_LEVELS])
                combined_volume = bid_volume_5 + ask_volume_5
                if combined_volume > 0:
                    volume_imbalance = (bid_volume_5 - ask_volume_5) / combined_volume
                else:
                    volume_imbalance = 0
                features.append(volume_imbalance)

                # Price levels, padded per side so the ask prices always start
                # at the same index even when the bid side is shallow.
                for side in (bids, asks):
                    prices = [price for price, _ in side[:self._PRICE_LEVELS]]
                    prices.extend([0.0] * (self._PRICE_LEVELS - len(prices)))
                    features.extend(prices)

            # Pad or truncate to desired feature count
            if len(features) < feature_count:
                features.extend([0.0] * (feature_count - len(features)))
            else:
                features = features[:feature_count]

            return np.array(features, dtype=np.float32)

        except Exception as e:
            logger.error(f"Error creating COB features for {symbol}: {e}")
            return None

    def get_provider_status(self) -> Dict[str, Any]:
        """Get provider status and statistics as a plain dictionary."""
        with self.lock:
            status = {
                'is_running': self.is_running,
                'symbols': self.symbols,
                'cache_status': {},
                'error_counts': self.error_counts.copy(),
                'last_successful_fetch': {
                    symbol: timestamp.isoformat()
                    for symbol, timestamp in self.last_successful_fetch.items()
                },
                'fetch_stats': self.fetch_stats.copy(),
                'rate_limiter_status': self.rate_limiter.get_all_endpoint_status()
            }

            # Cache status for each symbol
            now = datetime.now()
            for symbol in self.symbols:
                cache_time = self.cache_timestamps.get(symbol)
                cached = self.cob_cache.get(symbol)
                status['cache_status'][symbol] = {
                    'has_data': symbol in self.cob_cache,
                    'cache_time': cache_time.isoformat() if cache_time else None,
                    'cache_age_seconds': (now - cache_time).total_seconds() if cache_time else None,
                    'data_quality': cached.quality_score if symbol in self.cob_cache else 0.0
                }

            return status

    def reset_errors(self):
        """Reset error counts and rate limiter."""
        with self.lock:
            self.error_counts.clear()
            self.rate_limiter.reset_all_endpoints()

        logger.info("Reset all error counts and rate limiter")

    def force_refresh(self, symbol: str = None):
        """Force refresh COB data for symbol(s).

        This only drops the cached entries (for ``symbol``, or for every
        configured symbol when it is omitted); the data itself is fetched
        again by the next worker cycle or ``get_cob_data`` call.
        """
        symbols_to_refresh = [symbol] if symbol else self.symbols

        for sym in symbols_to_refresh:
            # Clear cache to force refresh
            with self.lock:
                self.cob_cache.pop(sym, None)
                self.cache_timestamps.pop(sym, None)

            logger.info(f"Forced refresh for {sym}")
# Module-wide COB provider, created lazily by get_cob_provider()
_global_cob_provider = None


def get_cob_provider(symbols: List[str] = None) -> RobustCOBProvider:
    """Return the shared COB provider, creating it on first use.

    ``symbols`` is only consulted when the shared instance does not exist
    yet; later calls return the existing provider unchanged.
    """
    global _global_cob_provider

    provider = _global_cob_provider
    if provider is not None:
        return provider

    provider = RobustCOBProvider(symbols)
    _global_cob_provider = provider
    return provider