Merge commit 'd49a473ed6f4aef55bfdd47d6370e53582be6b7b' into cleanup
@@ -20,6 +20,7 @@ Data is structured for consumption by CNN/DQN models and trading dashboards.
"""

import asyncio
import os
import json
import logging
import time
@@ -46,8 +47,57 @@ import aiohttp.resolver

logger = logging.getLogger(__name__)

<<<<<<< HEAD
# goal: use top 10 exchanges
# https://www.coingecko.com/en/exchanges
=======
class SimpleRateLimiter:
    """Simple rate limiter to prevent 418 errors"""

    def __init__(self, requests_per_second: float = 0.5):  # Much more conservative
        self.requests_per_second = requests_per_second
        self.last_request_time = 0
        self.min_interval = 1.0 / requests_per_second
        self.consecutive_errors = 0
        self.blocked_until = 0

    def can_make_request(self) -> bool:
        """Check if we can make a request"""
        now = time.time()

        # Check if we're in a blocked state
        if now < self.blocked_until:
            return False

        return (now - self.last_request_time) >= self.min_interval

    def record_request(self, success: bool = True):
        """Record that a request was made"""
        self.last_request_time = time.time()

        if success:
            self.consecutive_errors = 0
        else:
            self.consecutive_errors += 1
            # Exponential backoff for errors
            if self.consecutive_errors >= 3:
                backoff_time = min(300, 10 * (2 ** (self.consecutive_errors - 3)))  # Max 5 min
                self.blocked_until = time.time() + backoff_time
                logger.warning(f"Rate limiter blocked for {backoff_time}s after {self.consecutive_errors} errors")

    def get_wait_time(self) -> float:
        """Get time to wait before next request"""
        now = time.time()

        # Check if blocked
        if now < self.blocked_until:
            return self.blocked_until - now

        time_since_last = now - self.last_request_time
        if time_since_last < self.min_interval:
            return self.min_interval - time_since_last
        return 0.0
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
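For context, a minimal usage sketch for the incoming branch's limiter (the `session`, `url`, and `polite_get` names here are illustrative, not part of this commit):

```python
import asyncio

async def polite_get(session, url: str, limiter: "SimpleRateLimiter"):
    """Fetch a URL while honoring the limiter's pacing and error backoff."""
    wait = limiter.get_wait_time()
    if wait > 0:
        await asyncio.sleep(wait)  # covers both pacing and any active block window
    if not limiter.can_make_request():
        return None  # still blocked; let the caller retry later
    try:
        async with session.get(url, timeout=5) as resp:
            limiter.record_request(success=(resp.status == 200))
            return await resp.json() if resp.status == 200 else None
    except Exception:
        limiter.record_request(success=False)  # feeds the exponential backoff
        return None
```

At the default 0.5 requests per second this paces calls to one every two seconds; from the third consecutive failure the limiter blocks for 10s, 20s, 40s, and so on, capped at five minutes.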

class ExchangeType(Enum):
    BINANCE = "binance"
@@ -55,8 +105,12 @@ class ExchangeType(Enum):
    KRAKEN = "kraken"
    HUOBI = "huobi"
    BITFINEX = "bitfinex"
<<<<<<< HEAD
    BYBIT = "bybit"
    BITGET = "bitget"
=======
    COINAPI = "coinapi"
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b

@dataclass
class ExchangeOrderBookLevel:
@@ -117,6 +171,7 @@ class MultiExchangeCOBProvider:
    to create a consolidated view of market liquidity and pricing.
    """

<<<<<<< HEAD
    def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
        """
        Initialize Multi-Exchange COB Provider
@@ -176,69 +231,39 @@ class MultiExchangeCOBProvider:
        }

        # WebSocket management
=======
    def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
        """Initialize multi-exchange COB provider"""
        self.symbols = symbols
        self.exchange_configs = exchange_configs
        self.active_exchanges = ['binance']  # Focus on Binance for now
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
        self.is_streaming = False
        self.active_exchanges = ['binance']  # Start with Binance only
        self.cob_data_cache = {}  # Cache for COB data
        self.cob_subscribers = []  # List of callback functions

        # Callbacks for real-time updates
        self.cob_update_callbacks = []
        self.bucket_update_callbacks = []
        # Initialize attributes that are used throughout the code
        self.current_order_book = {}  # Current order book data per symbol
        self.realtime_snapshots = defaultdict(list)  # Real-time snapshots per symbol
        self.data_lock = asyncio.Lock()  # Lock for thread-safe data access
        self.consolidation_stats = defaultdict(lambda: {
            'total_updates': 0,
            'active_price_levels': 0,
            'total_liquidity_usd': 0.0
        })
        self.fixed_usd_buckets = {}  # Fixed USD bucket sizes per symbol
        self.bucket_size_bps = 10  # Default bucket size in basis points

        # Performance tracking
        self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType}
        self.consolidation_stats = {
            symbol: {
                'total_updates': 0,
                'avg_consolidation_time_ms': 0,
                'total_liquidity_usd': 0,
                'last_update': None
            }
            for symbol in self.symbols
        }
        self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)}
        # Rate limiting for REST API fallback
        self.last_rest_api_call = 0
        self.rest_api_call_count = 0

        # aiohttp session and connector start as None; they are set up in start_streaming
        self.session: Optional[aiohttp.ClientSession] = None
        self.connector: Optional[aiohttp.TCPConnector] = None
        self.rest_session: Optional[aiohttp.ClientSession] = None

        # Create REST API session.
        # Fix for the Windows aiodns issue: use ThreadedResolver instead.
        connector = aiohttp.TCPConnector(
            resolver=aiohttp.ThreadedResolver(),
            use_dns_cache=False
        )
        self.rest_session = aiohttp.ClientSession(connector=connector)

        # Initialize data structures
        for symbol in self.symbols:
            self.exchange_order_books[symbol]['binance']['connected'] = False
            self.exchange_order_books[symbol]['binance']['deep_bids'] = {}
            self.exchange_order_books[symbol]['binance']['deep_asks'] = {}
            self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None
            self.exchange_order_books[symbol]['binance']['last_update_id'] = None
            self.realtime_snapshots[symbol].append(COBSnapshot(
                symbol=symbol,
                timestamp=datetime.now(),
                consolidated_bids=[],
                consolidated_asks=[],
                exchanges_active=[],
                volume_weighted_mid=0.0,
                total_bid_liquidity=0.0,
                total_ask_liquidity=0.0,
                spread_bps=0.0,
                liquidity_imbalance=0.0,
                price_buckets={}
            ))

        logger.info("Multi-Exchange COB Provider initialized")
        logger.info(f"Symbols: {self.symbols}")
        logger.info(f"Bucket size: {bucket_size_bps} bps")
        logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}")
        logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}")
        logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")
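Given the incoming branch's constructor signature, construction would look roughly like this (the `ExchangeConfig` field names are assumptions inferred from how `enabled`, `rest_api_url`, and `rate_limits` are used elsewhere in this file):

```python
configs = {
    'binance': ExchangeConfig(
        enabled=True,
        rest_api_url='https://api.binance.com/api/v3',
        rate_limits={'requests_per_minute': 1200},
    ),
    'coinapi': ExchangeConfig(
        enabled=True,
        rest_api_url='https://rest.coinapi.io/v1',
        rate_limits={'min_interval_ms': 500},
    ),
}
provider = MultiExchangeCOBProvider(
    symbols=['BTC/USDT', 'ETH/USDT'],
    exchange_configs=configs,
)
```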

<<<<<<< HEAD
    def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
        """Initialize exchange configurations"""
        configs = {}
@@ -312,9 +337,29 @@ class MultiExchangeCOBProvider:
            rate_limits={'requests_per_minute': 1200}
        )
        return configs
=======
    def subscribe_to_cob_updates(self, callback):
        """Subscribe to COB data updates"""
        self.cob_subscribers.append(callback)
        logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b

    async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
        """Notify all subscribers of COB data updates"""
        try:
            for callback in self.cob_subscribers:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(symbol, cob_snapshot)
                    else:
                        callback(symbol, cob_snapshot)
                except Exception as e:
                    logger.error(f"Error in COB subscriber callback: {e}")
        except Exception as e:
            logger.error(f"Error notifying COB subscribers: {e}")
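Since `_notify_cob_subscribers` dispatches plain functions and coroutines alike, a subscriber sketch looks like this (the snapshot keys assume the dict built in `_stream_binance_full_depth` below; `store` is a stand-in for whatever sink you use):

```python
def log_spread(symbol: str, snapshot: dict) -> None:
    stats = snapshot.get('stats', {})
    print(f"{symbol}: mid {stats.get('mid_price', 0):.2f}, "
          f"spread {stats.get('spread_bps', 0):.2f} bps")

async def persist_snapshot(symbol: str, snapshot: dict) -> None:
    await store.write(symbol, snapshot)  # 'store' is a placeholder

provider.subscribe_to_cob_updates(log_spread)        # sync callback
provider.subscribe_to_cob_updates(persist_snapshot)  # async callback, awaited by the provider
```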

    async def start_streaming(self):
        """Start real-time order book streaming from all configured exchanges using only WebSocket"""
        logger.info(f"Starting COB streaming for symbols: {self.symbols}")
        self.is_streaming = True

@@ -326,23 +371,126 @@ class MultiExchangeCOBProvider:
        for symbol in self.symbols:
            for exchange_name, config in self.exchange_configs.items():
                if config.enabled and exchange_name in self.active_exchanges:
                    if exchange_name == 'binance':
                        # Enhanced Binance WebSocket streams (NO REST API)

                        # 1. Partial depth stream (20 levels, 100ms updates) - for real-time updates
                        tasks.append(self._stream_binance_orderbook(symbol, config))

                        # 2. Full depth stream (1000 levels, 1000ms updates) - replaces REST API
                        tasks.append(self._stream_binance_full_depth(symbol))

                        # 3. Trade stream for order flow analysis
                        tasks.append(self._stream_binance_trades(symbol))

                        # 4. Book ticker stream for best bid/ask in real time
                        tasks.append(self._stream_binance_book_ticker(symbol))

                        # 5. Aggregate trade stream for large order detection
                        tasks.append(self._stream_binance_agg_trades(symbol))
                    elif exchange_name in ['coinbase', 'kraken', 'huobi', 'bitfinex']:
                        # Other exchanges - WebSocket only
                        tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))
                    elif exchange_name == 'coinapi':
                        # Use REST polling for CoinAPI depth snapshots; merged in consolidation
                        tasks.append(self._poll_coinapi_snapshots(symbol, config))

        # Start continuous consolidation and bucket updates
        tasks.append(self._continuous_consolidation())
        tasks.append(self._continuous_bucket_updates())

        logger.info(f"Starting {len(tasks)} COB streaming tasks (WebSocket only - NO REST API)")
        await asyncio.gather(*tasks)
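A minimal driver tying the pieces together (a sketch: it reuses the `configs` and `log_spread` examples above, and stops the loops by clearing `is_streaming`, since no explicit stop method appears in this diff):

```python
import asyncio

async def main() -> None:
    provider = MultiExchangeCOBProvider(symbols=['ETH/USDT'], exchange_configs=configs)
    provider.subscribe_to_cob_updates(log_spread)
    streamer = asyncio.create_task(provider.start_streaming())
    await asyncio.sleep(60)        # stream for a minute
    provider.is_streaming = False  # signal all stream loops to wind down
    await streamer

asyncio.run(main())
```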

    async def _poll_coinapi_snapshots(self, symbol: str, config: ExchangeConfig):
        """Poll CoinAPI REST current order book snapshots and merge into exchange_order_books."""
        try:
            api_key = os.environ.get('COINAPI_KEY') or os.getenv('COINAPI_API_KEY')
            if not api_key:
                logger.warning("COINAPI: API key not set (COINAPI_KEY). Skipping CoinAPI polling.")
                return
            base_url = config.rest_api_url.rstrip('/')
            # Map symbol to CoinAPI symbol_id (e.g., BINANCE_SPOT_ETH_USD). We'll default to KRAKEN for breadth.
            # Use multiple source symbols if needed; start with KRAKEN spot.
            coinapi_symbol = f"KRAKEN_SPOT_{symbol.replace('/', '_').replace('USDT', 'USD')}"

            min_interval = max(0.5, (config.rate_limits.get('min_interval_ms', 500) / 1000.0)) if config.rate_limits else 0.5
            headers = {"X-CoinAPI-Key": api_key}

            async with self.data_lock:
                if symbol not in self.exchange_order_books:
                    self.exchange_order_books[symbol] = {}

            success_count = 0
            error_count = 0
            while self.is_streaming:
                try:
                    url = f"{base_url}/orderbooks/current?symbol_id={coinapi_symbol}"
                    async with self.rest_session.get(url, headers=headers, timeout=5) as resp:
                        if resp.status == 200:
                            payload = await resp.json()
                            # CoinAPI may return a list; normalize
                            if isinstance(payload, list) and payload:
                                ob = payload[0]
                            else:
                                ob = payload
                            bids = {}
                            asks = {}
                            ts = datetime.utcnow()
                            for b in ob.get('bids', [])[:200]:
                                price = float(b.get('price'))
                                size = float(b.get('size'))
                                if size > 0:
                                    bids[price] = ExchangeOrderBookLevel(
                                        exchange='coinapi',
                                        price=price,
                                        size=size,
                                        volume_usd=price * size,
                                        orders_count=1,
                                        side='bid',
                                        timestamp=ts,
                                        raw_data=b
                                    )
                            for a in ob.get('asks', [])[:200]:
                                price = float(a.get('price'))
                                size = float(a.get('size'))
                                if size > 0:
                                    asks[price] = ExchangeOrderBookLevel(
                                        exchange='coinapi',
                                        price=price,
                                        size=size,
                                        volume_usd=price * size,
                                        orders_count=1,
                                        side='ask',
                                        timestamp=ts,
                                        raw_data=a
                                    )
                            async with self.data_lock:
                                self.exchange_order_books[symbol]['coinapi'] = {
                                    'bids': bids,
                                    'asks': asks,
                                    'last_update': ts,
                                    'connected': True
                                }
                            logger.debug(f"COINAPI snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
                            success_count += 1
                        else:
                            logger.debug(f"COINAPI HTTP {resp.status} for {symbol}")
                            error_count += 1
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.debug(f"COINAPI error for {symbol}: {e}")
                    error_count += 1
                # Periodic audit logs every ~100 polls
                total = success_count + error_count
                if total and total % 100 == 0:
                    rate = success_count / total
                    logger.info(f"COINAPI {symbol}: success rate {rate:.2%} over {total} polls; last snapshot bids={len(bids) if 'bids' in locals() else 0} asks={len(asks) if 'asks' in locals() else 0}")
                await asyncio.sleep(min_interval)
        except Exception as e:
            logger.error(f"Error in CoinAPI polling for {symbol}: {e}")
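The symbol_id mapping above deserves a concrete example; a hypothetical helper mirroring the inline expression:

```python
def to_coinapi_symbol(symbol: str, venue: str = "KRAKEN") -> str:
    """Illustrative helper mirroring the inline mapping above (not part of this commit)."""
    return f"{venue}_SPOT_{symbol.replace('/', '_').replace('USDT', 'USD')}"

assert to_coinapi_symbol('ETH/USDT') == 'KRAKEN_SPOT_ETH_USD'
assert to_coinapi_symbol('BTC/USDT', venue='BINANCE') == 'BINANCE_SPOT_BTC_USD'
```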

    async def _setup_http_session(self):
        """Setup aiohttp session and connector"""
        self.connector = aiohttp.TCPConnector(
@@ -394,11 +542,19 @@ class MultiExchangeCOBProvider:
                await asyncio.sleep(5)  # Wait 5 seconds on error

    async def _fetch_binance_deep_orderbook(self, symbol: str):
        """Fetch deep order book from Binance REST API with rate limiting"""
        try:
            if not self.rest_session:
                return

            # Check rate limiter before making request
            if not self.rest_rate_limiter.can_make_request():
                wait_time = self.rest_rate_limiter.get_wait_time()
                if wait_time > 0:
                    logger.debug(f"Rate limited, waiting {wait_time:.1f}s before {symbol} request")
                    await asyncio.sleep(wait_time)
                return  # Skip this cycle

            # Convert symbol format for Binance
            binance_symbol = symbol.replace('/', '').upper()
            url = "https://api.binance.com/api/v3/depth"
@@ -407,10 +563,21 @@ class MultiExchangeCOBProvider:
                'limit': self.rest_depth_limit
            }

            # Add headers to reduce detection
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json'
            }

            async with self.rest_session.get(url, params=params, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    await self._process_binance_deep_orderbook(symbol, data)
                    self.rest_rate_limiter.record_request()  # Record successful request
                elif response.status in [418, 429, 451]:
                    logger.warning(f"Binance REST API rate limited (HTTP {response.status}) for {symbol}")
                    # Back off before the next request
                    await asyncio.sleep(10)  # Wait 10 seconds on rate limit
                else:
                    logger.error(f"Binance REST API error {response.status} for {symbol}")
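Note that `rest_rate_limiter` and `rest_depth_limit` are used above but initialized outside this diff; a plausible sketch of the companion `__init__` lines (an assumption, not part of this commit):

```python
# Assumed companion initialization (not visible in this diff):
self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=0.5)
self.rest_depth_limit = 1000  # Binance /api/v3/depth accepts limits up to 5000
```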

@@ -1117,6 +1284,10 @@ class MultiExchangeCOBProvider:
            deep_asks = exchange_data.get('deep_asks', {})

            # Merge data: prioritize live data for top levels, add deep data for others
            # Treat CoinAPI snapshot as deep data when deep_* not present
            if exchange_name == 'coinapi':
                deep_bids = deep_bids or live_bids
                deep_asks = deep_asks or live_asks
            merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid')
            merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask')

@@ -1145,10 +1316,11 @@ class MultiExchangeCOBProvider:
                    consolidated_bids[price].exchange_breakdown[exchange_name] = level

                    # Update dominant exchange based on volume
                    current_dominant = consolidated_bids[price].exchange_breakdown.get(
                        consolidated_bids[price].dominant_exchange
                    )
                    current_volume = current_dominant.volume_usd if current_dominant else 0
                    if level.volume_usd > current_volume:
                        consolidated_bids[price].dominant_exchange = exchange_name

            # Process merged asks (similar logic)
@@ -1171,10 +1343,11 @@ class MultiExchangeCOBProvider:
                    consolidated_asks[price].total_orders += level.orders_count
                    consolidated_asks[price].exchange_breakdown[exchange_name] = level

                    current_dominant = consolidated_asks[price].exchange_breakdown.get(
                        consolidated_asks[price].dominant_exchange
                    )
                    current_volume = current_dominant.volume_usd if current_dominant else 0
                    if level.volume_usd > current_volume:
                        consolidated_asks[price].dominant_exchange = exchange_name

        logger.debug(f"Consolidated {len(consolidated_bids)} bids and {len(consolidated_asks)} asks for {symbol}")
@@ -1221,7 +1394,7 @@ class MultiExchangeCOBProvider:
            )

            # Store consolidated order book
            self.current_order_book[symbol] = cob_snapshot
            self.realtime_snapshots[symbol].append(cob_snapshot)

            # Update real-time statistics
@@ -1390,8 +1563,8 @@ class MultiExchangeCOBProvider:
        while self.is_streaming:
            try:
                for symbol in self.symbols:
                    if symbol in self.current_order_book:
                        cob = self.current_order_book[symbol]

                        # Notify bucket update callbacks
                        for callback in self.bucket_update_callbacks:
@@ -1423,22 +1596,22 @@ class MultiExchangeCOBProvider:

    def get_consolidated_orderbook(self, symbol: str) -> Optional[COBSnapshot]:
        """Get current consolidated order book snapshot"""
        return self.current_order_book.get(symbol)

    def get_price_buckets(self, symbol: str, bucket_count: int = 100) -> Optional[Dict]:
        """Get fine-grain price buckets for a symbol"""
        if symbol not in self.current_order_book:
            return None

        cob = self.current_order_book[symbol]
        return cob.price_buckets

    def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]:
        """Get breakdown of liquidity by exchange"""
        if symbol not in self.current_order_book:
            return None

        cob = self.current_order_book[symbol]
        breakdown = {}

        for exchange in cob.exchanges_active:
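To make the bucket units concrete: with the default `bucket_size_bps = 10`, each price bucket spans 0.1% of price, for example:

```python
mid_price = 2500.00                                  # e.g. ETH/USDT
bucket_size_bps = 10
bucket_width = mid_price * bucket_size_bps / 10_000  # 2500 * 10 / 10000 = $2.50 per bucket
```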
@@ -1482,10 +1655,10 @@ class MultiExchangeCOBProvider:

    def get_market_depth_analysis(self, symbol: str, depth_levels: int = 20) -> Optional[Dict]:
        """Get detailed market depth analysis"""
        if symbol not in self.current_order_book:
            return None

        cob = self.current_order_book[symbol]

        # Analyze depth distribution
        bid_levels = cob.consolidated_bids[:depth_levels]
@@ -1600,4 +1773,346 @@ class MultiExchangeCOBProvider:
            return self.realtime_stats.get(symbol, {})
        except Exception as e:
            logger.error(f"Error getting real-time stats for {symbol}: {e}")
            return {}

    async def _stream_binance_full_depth(self, symbol: str):
        """Stream full depth order book from Binance WebSocket (replaces REST API)"""
        try:
            binance_symbol = symbol.replace('/', '').upper()
            # Full depth stream with 1000 levels, updated every 1000ms
            ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@depth@1000ms"
            logger.info(f"Connecting to Binance full depth WebSocket: {ws_url}")

            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            async with websockets_connect(ws_url) as websocket:
                logger.info(f"Connected to Binance full depth stream for {symbol}")

                while self.is_streaming:
                    try:
                        message = await websocket.recv()
                        data = json.loads(message)

                        # Process full depth data
                        if 'bids' in data and 'asks' in data:
                            # Create comprehensive COB snapshot
                            cob_snapshot = {
                                'symbol': symbol,
                                'timestamp': time.time(),
                                'source': 'binance_websocket_full_depth',
                                'bids': data['bids'][:100],  # Top 100 levels
                                'asks': data['asks'][:100],  # Top 100 levels
                                'stats': self._calculate_cob_stats(data['bids'], data['asks']),
                                'exchange': 'binance',
                                'depth_levels': len(data['bids']) + len(data['asks'])
                            }

                            # Store in cache
                            self.cob_data_cache[symbol] = cob_snapshot

                            # Notify subscribers
                            await self._notify_cob_subscribers(symbol, cob_snapshot)

                            logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")

                    except Exception as e:
                        if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower():
                            logger.warning(f"Binance full depth WebSocket connection closed for {symbol}")
                            break
                        logger.error(f"Error processing full depth data for {symbol}: {e}")
                        await asyncio.sleep(1)

        except Exception as e:
            logger.error(f"Error in Binance full depth stream for {symbol}: {e}")

    def _calculate_cob_stats(self, bids: List, asks: List) -> Dict:
        """Calculate COB statistics from order book data"""
        try:
            if not bids or not asks:
                return {
                    'mid_price': 0,
                    'spread_bps': 0,
                    'imbalance': 0,
                    'bid_liquidity': 0,
                    'ask_liquidity': 0
                }

            # Convert string values to float
            bid_prices = [float(bid[0]) for bid in bids]
            bid_sizes = [float(bid[1]) for bid in bids]
            ask_prices = [float(ask[0]) for ask in asks]
            ask_sizes = [float(ask[1]) for ask in asks]

            # Calculate best bid/ask
            best_bid = max(bid_prices)
            best_ask = min(ask_prices)
            mid_price = (best_bid + best_ask) / 2

            # Calculate spread in basis points
            spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0

            # Calculate liquidity
            bid_liquidity = sum(bid_sizes[:20])  # Top 20 levels
            ask_liquidity = sum(ask_sizes[:20])  # Top 20 levels
            total_liquidity = bid_liquidity + ask_liquidity

            # Calculate imbalance
            imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0

            return {
                'mid_price': mid_price,
                'spread_bps': spread_bps,
                'imbalance': imbalance,
                'bid_liquidity': bid_liquidity,
                'ask_liquidity': ask_liquidity,
                'best_bid': best_bid,
                'best_ask': best_ask
            }

        except Exception as e:
            logger.error(f"Error calculating COB stats: {e}")
            return {
                'mid_price': 0,
                'spread_bps': 0,
                'imbalance': 0,
                'bid_liquidity': 0,
                'ask_liquidity': 0
            }
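A quick sanity check of the arithmetic above, with values chosen for easy mental math:

```python
bids = [["2499.50", "4.0"], ["2499.00", "6.0"]]
asks = [["2500.50", "5.0"], ["2501.00", "5.0"]]
# best_bid = 2499.50, best_ask = 2500.50, mid_price = 2500.00
# spread_bps = (1.00 / 2500.00) * 10000 = 4.0
# bid_liquidity = 10.0, ask_liquidity = 10.0, imbalance = 0.0
stats = provider._calculate_cob_stats(bids, asks)
assert round(stats['spread_bps'], 1) == 4.0
```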

    async def _stream_binance_book_ticker(self, symbol: str):
        """Stream best bid/ask prices from Binance WebSocket"""
        try:
            binance_symbol = symbol.replace('/', '').upper()
            ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@bookTicker"
            logger.info(f"Connecting to Binance book ticker WebSocket: {ws_url}")

            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            async with websockets_connect(ws_url) as websocket:
                logger.info(f"Connected to Binance book ticker stream for {symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_binance_book_ticker(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Binance book ticker message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Binance book ticker: {e}")

        except Exception as e:
            logger.error(f"Binance book ticker WebSocket error for {symbol}: {e}")
        finally:
            logger.info(f"Disconnected from Binance book ticker stream for {symbol}")

    async def _stream_binance_agg_trades(self, symbol: str):
        """Stream aggregated trades from Binance WebSocket for large order detection"""
        try:
            binance_symbol = symbol.replace('/', '').upper()
            ws_url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@aggTrade"
            logger.info(f"Connecting to Binance aggregate trades WebSocket: {ws_url}")

            if websockets is None or websockets_connect is None:
                raise ImportError("websockets module not available")

            async with websockets_connect(ws_url) as websocket:
                logger.info(f"Connected to Binance aggregate trades stream for {symbol}")

                async for message in websocket:
                    if not self.is_streaming:
                        break

                    try:
                        data = json.loads(message)
                        await self._process_binance_agg_trade(symbol, data)

                    except json.JSONDecodeError as e:
                        logger.error(f"Error parsing Binance agg trade message: {e}")
                    except Exception as e:
                        logger.error(f"Error processing Binance agg trade: {e}")

        except Exception as e:
            logger.error(f"Binance aggregate trades WebSocket error for {symbol}: {e}")
        finally:
            logger.info(f"Disconnected from Binance aggregate trades stream for {symbol}")

    async def _process_binance_full_depth(self, symbol: str, data: Dict):
        """Process full depth order book data from WebSocket (replaces REST API)"""
        try:
            timestamp = datetime.now()
            exchange_name = 'binance'

            # Parse full depth bids and asks (up to 1000 levels)
            full_bids = {}
            full_asks = {}

            for bid_data in data.get('bids', []):
                price = float(bid_data[0])
                size = float(bid_data[1])
                if size > 0:
                    full_bids[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='bid',
                        timestamp=timestamp
                    )

            for ask_data in data.get('asks', []):
                price = float(ask_data[0])
                size = float(ask_data[1])
                if size > 0:
                    full_asks[price] = ExchangeOrderBookLevel(
                        exchange=exchange_name,
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side='ask',
                        timestamp=timestamp
                    )

            # Update full depth storage (replaces REST API data)
            async with self.data_lock:
                self.exchange_order_books[symbol][exchange_name]['deep_bids'] = full_bids
                self.exchange_order_books[symbol][exchange_name]['deep_asks'] = full_asks
                self.exchange_order_books[symbol][exchange_name]['deep_timestamp'] = timestamp
                self.exchange_order_books[symbol][exchange_name]['last_update_id'] = data.get('lastUpdateId')

            logger.debug(f"Updated full depth via WebSocket for {symbol}: {len(full_bids)} bids, {len(full_asks)} asks")

        except Exception as e:
            logger.error(f"Error processing full depth WebSocket data for {symbol}: {e}")

    async def _process_binance_book_ticker(self, symbol: str, data: Dict):
        """Process book ticker data for best bid/ask tracking"""
        try:
            timestamp = datetime.now()

            best_bid_price = float(data.get('b', 0))
            best_bid_qty = float(data.get('B', 0))
            best_ask_price = float(data.get('a', 0))
            best_ask_qty = float(data.get('A', 0))

            # Store best bid/ask data
            async with self.data_lock:
                if symbol not in self.realtime_stats:
                    self.realtime_stats[symbol] = {}

                self.realtime_stats[symbol].update({
                    'best_bid_price': best_bid_price,
                    'best_bid_qty': best_bid_qty,
                    'best_ask_price': best_ask_price,
                    'best_ask_qty': best_ask_qty,
                    'spread': best_ask_price - best_bid_price,
                    'mid_price': (best_bid_price + best_ask_price) / 2,
                    'book_ticker_timestamp': timestamp
                })

            logger.debug(f"Book ticker update for {symbol}: Bid {best_bid_price}@{best_bid_qty}, Ask {best_ask_price}@{best_ask_qty}")

        except Exception as e:
            logger.error(f"Error processing book ticker for {symbol}: {e}")
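For reference, Binance bookTicker messages use single-letter keys, which is why the parser above reads `b`/`B` and `a`/`A` (sample values are illustrative):

```python
sample_book_ticker = {
    "u": 400900217,  # order book update id
    "s": "ETHUSDT",  # symbol
    "b": "2499.50",  # best bid price
    "B": "12.3",     # best bid quantity
    "a": "2500.50",  # best ask price
    "A": "8.7",      # best ask quantity
}
# _process_binance_book_ticker would derive:
#   spread = 2500.50 - 2499.50 = 1.00, mid_price = 2500.00
```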

    async def _process_binance_agg_trade(self, symbol: str, data: Dict):
        """Process aggregate trade data for large order detection"""
        try:
            timestamp = datetime.fromtimestamp(int(data['T']) / 1000)
            price = float(data['p'])
            quantity = float(data['q'])
            is_buyer_maker = data['m']
            agg_trade_id = data['a']
            first_trade_id = data['f']
            last_trade_id = data['l']

            # Calculate trade value and size
            trade_value_usd = price * quantity
            trade_count = last_trade_id - first_trade_id + 1

            # Detect large orders (institutional activity)
            is_large_order = trade_value_usd > 10000    # $10k+ trades
            is_whale_order = trade_value_usd > 100000   # $100k+ trades

            agg_trade = {
                'symbol': symbol,
                'timestamp': timestamp,
                'price': price,
                'quantity': quantity,
                'value_usd': trade_value_usd,
                'trade_count': trade_count,
                'is_buyer_maker': is_buyer_maker,
                'side': 'sell' if is_buyer_maker else 'buy',  # Opposite of maker
                'is_large_order': is_large_order,
                'is_whale_order': is_whale_order,
                'agg_trade_id': agg_trade_id
            }

            # Add to aggregate trade tracking
            await self._add_agg_trade_to_analysis(symbol, agg_trade)

            # Log significant trades
            if is_whale_order:
                logger.info(f"WHALE ORDER detected for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()} at ${price}")
            elif is_large_order:
                logger.debug(f"Large order for {symbol}: ${trade_value_usd:,.0f} {agg_trade['side'].upper()}")

        except Exception as e:
            logger.error(f"Error processing aggregate trade for {symbol}: {e}")
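Likewise, the aggTrade fields consumed above are Binance's single-letter keys (sample values illustrative):

```python
sample_agg_trade = {
    "a": 26129,          # aggregate trade id
    "p": "2500.00",      # price
    "q": "50.0",         # quantity
    "f": 100,            # first trade id
    "l": 104,            # last trade id
    "T": 1694365200000,  # trade time (ms since epoch)
    "m": True,           # buyer is the maker, so the taker side is 'sell'
}
# value_usd = 2500.00 * 50.0 = $125,000  -> flagged as a whale order ($100k+)
# trade_count = 104 - 100 + 1 = 5 individual fills aggregated
```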

    async def _add_agg_trade_to_analysis(self, symbol: str, agg_trade: Dict):
        """Add aggregate trade to analysis queues"""
        try:
            async with self.data_lock:
                # Initialize if needed
                if symbol not in self.realtime_stats:
                    self.realtime_stats[symbol] = {}
                if 'agg_trades' not in self.realtime_stats[symbol]:
                    self.realtime_stats[symbol]['agg_trades'] = deque(maxlen=1000)

                # Add to aggregate trade history
                self.realtime_stats[symbol]['agg_trades'].append(agg_trade)

                # Update real-time aggregate statistics
                recent_trades = list(self.realtime_stats[symbol]['agg_trades'])[-100:]  # Last 100 trades

                if recent_trades:
                    total_buy_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'buy')
                    total_sell_volume = sum(t['value_usd'] for t in recent_trades if t['side'] == 'sell')
                    total_volume = total_buy_volume + total_sell_volume

                    large_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_large_order'])
                    large_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_large_order'])

                    whale_buy_count = sum(1 for t in recent_trades if t['side'] == 'buy' and t['is_whale_order'])
                    whale_sell_count = sum(1 for t in recent_trades if t['side'] == 'sell' and t['is_whale_order'])

                    # Calculate order flow metrics
                    self.realtime_stats[symbol].update({
                        'buy_sell_ratio': total_buy_volume / total_sell_volume if total_sell_volume > 0 else float('inf'),
                        'total_volume_100': total_volume,
                        'large_order_ratio': (large_buy_count + large_sell_count) / len(recent_trades),
                        'whale_activity': whale_buy_count + whale_sell_count,
                        'institutional_flow': 'BULLISH' if total_buy_volume > total_sell_volume * 1.2 else 'BEARISH' if total_sell_volume > total_buy_volume * 1.2 else 'NEUTRAL'
                    })

        except Exception as e:
            logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")

    def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
        """Get latest COB data for a symbol from cache"""
        try:
            if symbol in self.cob_data_cache:
                return self.cob_data_cache[symbol]
            return None
        except Exception as e:
            logger.error(f"Error getting latest COB data for {symbol}: {e}")
            return None