dash working
This commit is contained in:
@ -2653,6 +2653,7 @@ class DataProvider:
|
||||
# Significantly reduced frequency for REST API fallback only
|
||||
def collect_symbol_data(symbol):
|
||||
rest_api_fallback_count = 0
|
||||
last_rest_api_call = 0 # Track last REST API call time
|
||||
while self.cob_collection_active:
|
||||
try:
|
||||
# PRIORITY 1: Try to use WebSocket data first
|
||||
@ -2664,13 +2665,20 @@ class DataProvider:
|
||||
# Much longer sleep since WebSocket provides real-time data
|
||||
time.sleep(10.0) # Only check every 10 seconds when WS is working
|
||||
else:
|
||||
# FALLBACK: Only use REST API if WebSocket fails
|
||||
# FALLBACK: Only use REST API if WebSocket fails AND rate limit allows
|
||||
rest_api_fallback_count += 1
|
||||
if rest_api_fallback_count <= 3: # Limited fallback attempts
|
||||
logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}")
|
||||
self._collect_cob_data_for_symbol(symbol)
|
||||
current_time = time.time()
|
||||
|
||||
# STRICT RATE LIMITING: Maximum 1 REST API call per second
|
||||
if current_time - last_rest_api_call >= 1.0: # At least 1 second between calls
|
||||
if rest_api_fallback_count <= 3: # Limited fallback attempts
|
||||
logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}")
|
||||
self._collect_cob_data_for_symbol(symbol)
|
||||
last_rest_api_call = current_time # Update last call time
|
||||
else:
|
||||
logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)")
|
||||
else:
|
||||
logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)")
|
||||
logger.debug(f"Rate limiting REST API for {symbol} - waiting {1.0 - (current_time - last_rest_api_call):.1f}s")
|
||||
|
||||
# Much longer sleep when using REST API fallback
|
||||
time.sleep(30.0) # 30 seconds between REST calls
|
||||
@ -2694,49 +2702,35 @@ class DataProvider:
|
||||
for thread in threads:
|
||||
thread.join(timeout=1)
|
||||
|
||||
def _get_websocket_cob_data(self, symbol: str) -> Optional[dict]:
|
||||
"""Get COB data from WebSocket streams (rate limit free)"""
|
||||
def _get_websocket_cob_data(self, symbol: str) -> Optional[Dict]:
|
||||
"""Get COB data from WebSocket streams (primary source)"""
|
||||
try:
|
||||
binance_symbol = symbol.replace('/', '').upper()
|
||||
# Check if we have WebSocket COB data available
|
||||
if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache:
|
||||
cached_data = self.cob_data_cache[symbol]
|
||||
if cached_data and isinstance(cached_data, dict):
|
||||
# Check if data is recent (within last 5 seconds)
|
||||
import time
|
||||
current_time = time.time()
|
||||
data_age = current_time - cached_data.get('timestamp', 0)
|
||||
|
||||
if data_age < 5.0: # Data is fresh
|
||||
logger.debug(f"Using WebSocket COB data for {symbol} (age: {data_age:.1f}s)")
|
||||
return cached_data
|
||||
else:
|
||||
logger.debug(f"WebSocket COB data for {symbol} is stale (age: {data_age:.1f}s)")
|
||||
|
||||
# Check if we have recent WebSocket tick data
|
||||
if binance_symbol in self.tick_buffers and len(self.tick_buffers[binance_symbol]) > 10:
|
||||
recent_ticks = list(self.tick_buffers[binance_symbol])[-50:] # Last 50 ticks
|
||||
|
||||
if recent_ticks:
|
||||
# Calculate COB data from WebSocket ticks
|
||||
latest_tick = recent_ticks[-1]
|
||||
|
||||
# Calculate bid/ask liquidity from recent tick patterns
|
||||
buy_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'buy')
|
||||
sell_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'sell')
|
||||
total_volume = buy_volume + sell_volume
|
||||
|
||||
# Calculate metrics
|
||||
imbalance = (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0
|
||||
avg_price = sum(tick.price for tick in recent_ticks) / len(recent_ticks)
|
||||
|
||||
# Create synthetic COB snapshot from WebSocket data
|
||||
cob_snapshot = {
|
||||
'symbol': symbol,
|
||||
'timestamp': datetime.now(),
|
||||
'source': 'websocket', # Mark as WebSocket source
|
||||
'stats': {
|
||||
'mid_price': latest_tick.price,
|
||||
'avg_price': avg_price,
|
||||
'imbalance': imbalance,
|
||||
'buy_volume': buy_volume,
|
||||
'sell_volume': sell_volume,
|
||||
'total_volume': total_volume,
|
||||
'tick_count': len(recent_ticks),
|
||||
'best_bid': latest_tick.price - 0.01, # Approximate
|
||||
'best_ask': latest_tick.price + 0.01, # Approximate
|
||||
'spread_bps': 10 # Approximate spread
|
||||
}
|
||||
}
|
||||
|
||||
return cob_snapshot
|
||||
# Check if multi-exchange COB provider has WebSocket data
|
||||
if hasattr(self, 'multi_exchange_cob_provider') and self.multi_exchange_cob_provider:
|
||||
try:
|
||||
cob_data = self.multi_exchange_cob_provider.get_latest_cob_data(symbol)
|
||||
if cob_data and isinstance(cob_data, dict):
|
||||
logger.debug(f"Using multi-exchange WebSocket COB data for {symbol}")
|
||||
return cob_data
|
||||
except Exception as e:
|
||||
logger.debug(f"Error getting multi-exchange COB data for {symbol}: {e}")
|
||||
|
||||
logger.debug(f"No WebSocket COB data available for {symbol}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
|
@ -159,187 +159,40 @@ class MultiExchangeCOBProvider:
|
||||
to create a consolidated view of market liquidity and pricing.
|
||||
"""
|
||||
|
||||
def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
|
||||
"""
|
||||
Initialize Multi-Exchange COB Provider
|
||||
|
||||
Args:
|
||||
symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT'])
|
||||
bucket_size_bps: Price bucket size in basis points for fine-grain analysis
|
||||
"""
|
||||
self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
|
||||
self.bucket_size_bps = bucket_size_bps
|
||||
self.bucket_update_frequency = 100 # ms
|
||||
self.consolidation_frequency = 100 # ms
|
||||
|
||||
# REST API configuration for deep order book - REDUCED to prevent 418 errors
|
||||
self.rest_api_frequency = 5000 # ms - full snapshot every 5 seconds (reduced from 1s)
|
||||
self.rest_depth_limit = 100 # Reduced from 500 to 100 levels to reduce load
|
||||
|
||||
# Exchange configurations
|
||||
self.exchange_configs = self._initialize_exchange_configs()
|
||||
|
||||
# Rate limiter for REST API calls
|
||||
self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=2.0) # Very conservative
|
||||
|
||||
# Order book storage - now with deep and live separation
|
||||
self.exchange_order_books = {
|
||||
symbol: {
|
||||
exchange.value: {
|
||||
'bids': {},
|
||||
'asks': {},
|
||||
'timestamp': None,
|
||||
'connected': False,
|
||||
'deep_bids': {}, # Full depth from REST API
|
||||
'deep_asks': {}, # Full depth from REST API
|
||||
'deep_timestamp': None,
|
||||
'last_update_id': None # For managing diff updates
|
||||
}
|
||||
for exchange in ExchangeType
|
||||
}
|
||||
for symbol in self.symbols
|
||||
}
|
||||
|
||||
# Consolidated order books
|
||||
self.consolidated_order_books: Dict[str, COBSnapshot] = {}
|
||||
|
||||
# Real-time statistics tracking
|
||||
self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
||||
self.realtime_snapshots: Dict[str, deque] = {
|
||||
symbol: deque(maxlen=1000) for symbol in self.symbols
|
||||
}
|
||||
|
||||
# Session tracking for SVP
|
||||
self.session_start_time = datetime.now()
|
||||
self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols}
|
||||
self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
||||
|
||||
# Fixed USD bucket sizes for different symbols as requested
|
||||
self.fixed_usd_buckets = {
|
||||
'BTC/USDT': 10.0, # $10 buckets for BTC
|
||||
'ETH/USDT': 1.0, # $1 buckets for ETH
|
||||
}
|
||||
|
||||
# WebSocket management
|
||||
def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
|
||||
"""Initialize multi-exchange COB provider"""
|
||||
self.symbols = symbols
|
||||
self.exchange_configs = exchange_configs
|
||||
self.active_exchanges = ['binance'] # Focus on Binance for now
|
||||
self.is_streaming = False
|
||||
self.active_exchanges = ['binance'] # Start with Binance only
|
||||
self.cob_data_cache = {} # Cache for COB data
|
||||
self.cob_subscribers = [] # List of callback functions
|
||||
|
||||
# Callbacks for real-time updates
|
||||
self.cob_update_callbacks = []
|
||||
self.bucket_update_callbacks = []
|
||||
# Rate limiting for REST API fallback
|
||||
self.last_rest_api_call = 0
|
||||
self.rest_api_call_count = 0
|
||||
|
||||
# Performance tracking
|
||||
self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType}
|
||||
self.consolidation_stats = {
|
||||
symbol: {
|
||||
'total_updates': 0,
|
||||
'avg_consolidation_time_ms': 0,
|
||||
'total_liquidity_usd': 0,
|
||||
'last_update': None
|
||||
}
|
||||
for symbol in self.symbols
|
||||
}
|
||||
self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)}
|
||||
|
||||
# Thread safety
|
||||
self.data_lock = asyncio.Lock()
|
||||
|
||||
# Initialize aiohttp session and connector to None, will be set up in start_streaming
|
||||
self.session: Optional[aiohttp.ClientSession] = None
|
||||
self.connector: Optional[aiohttp.TCPConnector] = None
|
||||
self.rest_session: Optional[aiohttp.ClientSession] = None # Added for explicit None initialization
|
||||
|
||||
# Create REST API session
|
||||
# Fix for Windows aiodns issue - use ThreadedResolver instead
|
||||
connector = aiohttp.TCPConnector(
|
||||
resolver=aiohttp.ThreadedResolver(),
|
||||
use_dns_cache=False
|
||||
)
|
||||
self.rest_session = aiohttp.ClientSession(connector=connector)
|
||||
|
||||
# Initialize data structures
|
||||
for symbol in self.symbols:
|
||||
self.exchange_order_books[symbol]['binance']['connected'] = False
|
||||
self.exchange_order_books[symbol]['binance']['deep_bids'] = {}
|
||||
self.exchange_order_books[symbol]['binance']['deep_asks'] = {}
|
||||
self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None
|
||||
self.exchange_order_books[symbol]['binance']['last_update_id'] = None
|
||||
self.realtime_snapshots[symbol].append(COBSnapshot(
|
||||
symbol=symbol,
|
||||
timestamp=datetime.now(),
|
||||
consolidated_bids=[],
|
||||
consolidated_asks=[],
|
||||
exchanges_active=[],
|
||||
volume_weighted_mid=0.0,
|
||||
total_bid_liquidity=0.0,
|
||||
total_ask_liquidity=0.0,
|
||||
spread_bps=0.0,
|
||||
liquidity_imbalance=0.0,
|
||||
price_buckets={}
|
||||
))
|
||||
|
||||
logger.info(f"Multi-Exchange COB Provider initialized")
|
||||
logger.info(f"Symbols: {self.symbols}")
|
||||
logger.info(f"Bucket size: {bucket_size_bps} bps")
|
||||
logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}")
|
||||
logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}")
|
||||
logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")
|
||||
|
||||
def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
|
||||
"""Initialize exchange configurations"""
|
||||
configs = {}
|
||||
|
||||
# Binance configuration
|
||||
configs[ExchangeType.BINANCE.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BINANCE,
|
||||
weight=0.3, # Higher weight due to volume
|
||||
websocket_url="wss://stream.binance.com:9443/ws/",
|
||||
rest_api_url="https://api.binance.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
|
||||
rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000}
|
||||
)
|
||||
|
||||
# Coinbase Pro configuration
|
||||
configs[ExchangeType.COINBASE.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.COINBASE,
|
||||
weight=0.25,
|
||||
websocket_url="wss://ws-feed.exchange.coinbase.com",
|
||||
rest_api_url="https://api.exchange.coinbase.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'},
|
||||
rate_limits={'requests_per_minute': 600}
|
||||
)
|
||||
|
||||
# Kraken configuration
|
||||
configs[ExchangeType.KRAKEN.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.KRAKEN,
|
||||
weight=0.2,
|
||||
websocket_url="wss://ws.kraken.com",
|
||||
rest_api_url="https://api.kraken.com",
|
||||
symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'},
|
||||
rate_limits={'requests_per_minute': 900}
|
||||
)
|
||||
|
||||
# Huobi configuration
|
||||
configs[ExchangeType.HUOBI.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.HUOBI,
|
||||
weight=0.15,
|
||||
websocket_url="wss://api.huobi.pro/ws",
|
||||
rest_api_url="https://api.huobi.pro",
|
||||
symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'},
|
||||
rate_limits={'requests_per_minute': 2000}
|
||||
)
|
||||
|
||||
# Bitfinex configuration
|
||||
configs[ExchangeType.BITFINEX.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BITFINEX,
|
||||
weight=0.1,
|
||||
websocket_url="wss://api-pub.bitfinex.com/ws/2",
|
||||
rest_api_url="https://api-pub.bitfinex.com",
|
||||
symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'},
|
||||
rate_limits={'requests_per_minute': 1000}
|
||||
)
|
||||
|
||||
return configs
|
||||
def subscribe_to_cob_updates(self, callback):
|
||||
"""Subscribe to COB data updates"""
|
||||
self.cob_subscribers.append(callback)
|
||||
logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")
|
||||
|
||||
async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
|
||||
"""Notify all subscribers of COB data updates"""
|
||||
try:
|
||||
for callback in self.cob_subscribers:
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(callback):
|
||||
await callback(symbol, cob_snapshot)
|
||||
else:
|
||||
callback(symbol, cob_snapshot)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in COB subscriber callback: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error notifying COB subscribers: {e}")
|
||||
|
||||
async def start_streaming(self):
|
||||
"""Start real-time order book streaming from all configured exchanges using only WebSocket"""
|
||||
logger.info(f"Starting COB streaming for symbols: {self.symbols}")
|
||||
@ -1667,23 +1520,97 @@ class MultiExchangeCOBProvider:
|
||||
async with websockets_connect(ws_url) as websocket:
|
||||
logger.info(f"Connected to Binance full depth stream for {symbol}")
|
||||
|
||||
async for message in websocket:
|
||||
if not self.is_streaming:
|
||||
break
|
||||
|
||||
while self.is_streaming:
|
||||
try:
|
||||
message = await websocket.recv()
|
||||
data = json.loads(message)
|
||||
await self._process_binance_full_depth(symbol, data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing Binance full depth message: {e}")
|
||||
# Process full depth data
|
||||
if 'bids' in data and 'asks' in data:
|
||||
# Create comprehensive COB snapshot
|
||||
cob_snapshot = {
|
||||
'symbol': symbol,
|
||||
'timestamp': time.time(),
|
||||
'source': 'binance_websocket_full_depth',
|
||||
'bids': data['bids'][:100], # Top 100 levels
|
||||
'asks': data['asks'][:100], # Top 100 levels
|
||||
'stats': self._calculate_cob_stats(data['bids'], data['asks']),
|
||||
'exchange': 'binance',
|
||||
'depth_levels': len(data['bids']) + len(data['asks'])
|
||||
}
|
||||
|
||||
# Store in cache
|
||||
self.cob_data_cache[symbol] = cob_snapshot
|
||||
|
||||
# Notify subscribers
|
||||
await self._notify_cob_subscribers(symbol, cob_snapshot)
|
||||
|
||||
logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Binance full depth: {e}")
|
||||
if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower():
|
||||
logger.warning(f"Binance full depth WebSocket connection closed for {symbol}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing full depth data for {symbol}: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Binance full depth WebSocket error for {symbol}: {e}")
|
||||
finally:
|
||||
logger.info(f"Disconnected from Binance full depth stream for {symbol}")
|
||||
logger.error(f"Error in Binance full depth stream for {symbol}: {e}")
|
||||
|
||||
def _calculate_cob_stats(self, bids: List, asks: List) -> Dict:
|
||||
"""Calculate COB statistics from order book data"""
|
||||
try:
|
||||
if not bids or not asks:
|
||||
return {
|
||||
'mid_price': 0,
|
||||
'spread_bps': 0,
|
||||
'imbalance': 0,
|
||||
'bid_liquidity': 0,
|
||||
'ask_liquidity': 0
|
||||
}
|
||||
|
||||
# Convert string values to float
|
||||
bid_prices = [float(bid[0]) for bid in bids]
|
||||
bid_sizes = [float(bid[1]) for bid in bids]
|
||||
ask_prices = [float(ask[0]) for ask in asks]
|
||||
ask_sizes = [float(ask[1]) for ask in asks]
|
||||
|
||||
# Calculate best bid/ask
|
||||
best_bid = max(bid_prices)
|
||||
best_ask = min(ask_prices)
|
||||
mid_price = (best_bid + best_ask) / 2
|
||||
|
||||
# Calculate spread
|
||||
spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0
|
||||
|
||||
# Calculate liquidity
|
||||
bid_liquidity = sum(bid_sizes[:20]) # Top 20 levels
|
||||
ask_liquidity = sum(ask_sizes[:20]) # Top 20 levels
|
||||
total_liquidity = bid_liquidity + ask_liquidity
|
||||
|
||||
# Calculate imbalance
|
||||
imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
|
||||
|
||||
return {
|
||||
'mid_price': mid_price,
|
||||
'spread_bps': spread_bps,
|
||||
'imbalance': imbalance,
|
||||
'bid_liquidity': bid_liquidity,
|
||||
'ask_liquidity': ask_liquidity,
|
||||
'best_bid': best_bid,
|
||||
'best_ask': best_ask
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating COB stats: {e}")
|
||||
return {
|
||||
'mid_price': 0,
|
||||
'spread_bps': 0,
|
||||
'imbalance': 0,
|
||||
'bid_liquidity': 0,
|
||||
'ask_liquidity': 0
|
||||
}
|
||||
|
||||
async def _stream_binance_book_ticker(self, symbol: str):
|
||||
"""Stream best bid/ask prices from Binance WebSocket"""
|
||||
@ -1909,4 +1836,14 @@ class MultiExchangeCOBProvider:
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")
|
||||
logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")
|
||||
|
||||
def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
|
||||
"""Get latest COB data for a symbol from cache"""
|
||||
try:
|
||||
if symbol in self.cob_data_cache:
|
||||
return self.cob_data_cache[symbol]
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting latest COB data for {symbol}: {e}")
|
||||
return None
|
Reference in New Issue
Block a user