fix merge
This commit is contained in:
@@ -47,57 +47,6 @@ import aiohttp.resolver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
<<<<<<< HEAD
|
||||
# goal: use top 10 exchanges
|
||||
# https://www.coingecko.com/en/exchanges
|
||||
=======
|
||||
class SimpleRateLimiter:
|
||||
"""Simple rate limiter to prevent 418 errors"""
|
||||
|
||||
def __init__(self, requests_per_second: float = 0.5): # Much more conservative
|
||||
self.requests_per_second = requests_per_second
|
||||
self.last_request_time = 0
|
||||
self.min_interval = 1.0 / requests_per_second
|
||||
self.consecutive_errors = 0
|
||||
self.blocked_until = 0
|
||||
|
||||
def can_make_request(self) -> bool:
|
||||
"""Check if we can make a request"""
|
||||
now = time.time()
|
||||
|
||||
# Check if we're in a blocked state
|
||||
if now < self.blocked_until:
|
||||
return False
|
||||
|
||||
return (now - self.last_request_time) >= self.min_interval
|
||||
|
||||
def record_request(self, success: bool = True):
|
||||
"""Record that a request was made"""
|
||||
self.last_request_time = time.time()
|
||||
|
||||
if success:
|
||||
self.consecutive_errors = 0
|
||||
else:
|
||||
self.consecutive_errors += 1
|
||||
# Exponential backoff for errors
|
||||
if self.consecutive_errors >= 3:
|
||||
backoff_time = min(300, 10 * (2 ** (self.consecutive_errors - 3))) # Max 5 min
|
||||
self.blocked_until = time.time() + backoff_time
|
||||
logger.warning(f"Rate limiter blocked for {backoff_time}s after {self.consecutive_errors} errors")
|
||||
|
||||
def get_wait_time(self) -> float:
|
||||
"""Get time to wait before next request"""
|
||||
now = time.time()
|
||||
|
||||
# Check if blocked
|
||||
if now < self.blocked_until:
|
||||
return self.blocked_until - now
|
||||
|
||||
time_since_last = now - self.last_request_time
|
||||
if time_since_last < self.min_interval:
|
||||
return self.min_interval - time_since_last
|
||||
return 0.0
|
||||
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
|
||||
|
||||
class ExchangeType(Enum):
|
||||
BINANCE = "binance"
|
||||
@@ -105,12 +54,6 @@ class ExchangeType(Enum):
|
||||
KRAKEN = "kraken"
|
||||
HUOBI = "huobi"
|
||||
BITFINEX = "bitfinex"
|
||||
<<<<<<< HEAD
|
||||
BYBIT = "bybit"
|
||||
BITGET = "bitget"
|
||||
=======
|
||||
COINAPI = "coinapi"
|
||||
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
|
||||
|
||||
@dataclass
|
||||
class ExchangeOrderBookLevel:
|
||||
@@ -170,74 +113,18 @@ class MultiExchangeCOBProvider:
|
||||
Aggregates real-time order book data from multiple cryptocurrency exchanges
|
||||
to create a consolidated view of market liquidity and pricing.
|
||||
"""
|
||||
|
||||
<<<<<<< HEAD
|
||||
|
||||
def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
|
||||
"""
|
||||
Initialize Multi-Exchange COB Provider
|
||||
|
||||
|
||||
Args:
|
||||
symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT'])
|
||||
bucket_size_bps: Price bucket size in basis points for fine-grain analysis
|
||||
"""
|
||||
self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
|
||||
self.bucket_size_bps = bucket_size_bps
|
||||
self.bucket_update_frequency = 100 # ms
|
||||
self.consolidation_frequency = 100 # ms
|
||||
|
||||
# REST API configuration for deep order book
|
||||
self.rest_api_frequency = 2000 # ms - full snapshot every 2 seconds (reduced frequency for deeper data)
|
||||
self.rest_depth_limit = 1000 # Increased to 1000 levels via REST for maximum depth
|
||||
|
||||
# Exchange configurations
|
||||
self.exchange_configs = self._initialize_exchange_configs()
|
||||
|
||||
# Order book storage - now with deep and live separation
|
||||
self.exchange_order_books = {
|
||||
symbol: {
|
||||
exchange.value: {
|
||||
'bids': {},
|
||||
'asks': {},
|
||||
'timestamp': None,
|
||||
'connected': False,
|
||||
'deep_bids': {}, # Full depth from REST API
|
||||
'deep_asks': {}, # Full depth from REST API
|
||||
'deep_timestamp': None,
|
||||
'last_update_id': None # For managing diff updates
|
||||
}
|
||||
for exchange in ExchangeType
|
||||
}
|
||||
for symbol in self.symbols
|
||||
}
|
||||
|
||||
# Consolidated order books
|
||||
self.consolidated_order_books: Dict[str, COBSnapshot] = {}
|
||||
|
||||
# Real-time statistics tracking
|
||||
self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
||||
self.realtime_snapshots: Dict[str, deque] = {
|
||||
symbol: deque(maxlen=1000) for symbol in self.symbols
|
||||
}
|
||||
|
||||
# Session tracking for SVP
|
||||
self.session_start_time = datetime.now()
|
||||
self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols}
|
||||
self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
||||
|
||||
# Fixed USD bucket sizes for different symbols as requested
|
||||
self.fixed_usd_buckets = {
|
||||
'BTC/USDT': 10.0, # $10 buckets for BTC
|
||||
'ETH/USDT': 1.0, # $1 buckets for ETH
|
||||
}
|
||||
|
||||
# WebSocket management
|
||||
=======
|
||||
def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
|
||||
"""Initialize multi-exchange COB provider"""
|
||||
self.symbols = symbols
|
||||
self.exchange_configs = exchange_configs
|
||||
self.active_exchanges = ['binance'] # Focus on Binance for now
|
||||
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
|
||||
|
||||
self.is_streaming = False
|
||||
self.cob_data_cache = {} # Cache for COB data
|
||||
self.cob_subscribers = [] # List of callback functions
|
||||
@@ -263,86 +150,6 @@ class MultiExchangeCOBProvider:
|
||||
|
||||
logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")
|
||||
|
||||
<<<<<<< HEAD
|
||||
def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
|
||||
"""Initialize exchange configurations"""
|
||||
configs = {}
|
||||
|
||||
# Binance configuration
|
||||
configs[ExchangeType.BINANCE.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BINANCE,
|
||||
weight=0.3, # Higher weight due to volume
|
||||
websocket_url="wss://stream.binance.com:9443/ws/",
|
||||
rest_api_url="https://api.binance.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
|
||||
rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000}
|
||||
)
|
||||
|
||||
# Coinbase Pro configuration
|
||||
configs[ExchangeType.COINBASE.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.COINBASE,
|
||||
weight=0.25,
|
||||
websocket_url="wss://ws-feed.exchange.coinbase.com",
|
||||
rest_api_url="https://api.exchange.coinbase.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'},
|
||||
rate_limits={'requests_per_minute': 600}
|
||||
)
|
||||
|
||||
# Kraken configuration
|
||||
configs[ExchangeType.KRAKEN.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.KRAKEN,
|
||||
weight=0.2,
|
||||
websocket_url="wss://ws.kraken.com",
|
||||
rest_api_url="https://api.kraken.com",
|
||||
symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'},
|
||||
rate_limits={'requests_per_minute': 900}
|
||||
)
|
||||
|
||||
# Huobi configuration
|
||||
configs[ExchangeType.HUOBI.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.HUOBI,
|
||||
weight=0.15,
|
||||
websocket_url="wss://api.huobi.pro/ws",
|
||||
rest_api_url="https://api.huobi.pro",
|
||||
symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'},
|
||||
rate_limits={'requests_per_minute': 2000}
|
||||
)
|
||||
|
||||
# Bitfinex configuration
|
||||
configs[ExchangeType.BITFINEX.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BITFINEX,
|
||||
weight=0.1,
|
||||
websocket_url="wss://api-pub.bitfinex.com/ws/2",
|
||||
rest_api_url="https://api-pub.bitfinex.com",
|
||||
symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'},
|
||||
rate_limits={'requests_per_minute': 1000}
|
||||
)
|
||||
|
||||
# Bybit configuration
|
||||
configs[ExchangeType.BYBIT.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BYBIT,
|
||||
weight=0.18,
|
||||
websocket_url="wss://stream.bybit.com/v5/public/spot",
|
||||
rest_api_url="https://api.bybit.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
|
||||
rate_limits={'requests_per_minute': 1200}
|
||||
)
|
||||
# Bitget configuration
|
||||
configs[ExchangeType.BITGET.value] = ExchangeConfig(
|
||||
exchange_type=ExchangeType.BITGET,
|
||||
weight=0.12,
|
||||
websocket_url="wss://ws.bitget.com/spot/v1/stream",
|
||||
rest_api_url="https://api.bitget.com",
|
||||
symbols_mapping={'BTC/USDT': 'BTCUSDT_SPBL', 'ETH/USDT': 'ETHUSDT_SPBL'},
|
||||
rate_limits={'requests_per_minute': 1200}
|
||||
)
|
||||
return configs
|
||||
=======
|
||||
def subscribe_to_cob_updates(self, callback):
|
||||
"""Subscribe to COB data updates"""
|
||||
self.cob_subscribers.append(callback)
|
||||
logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")
|
||||
>>>>>>> d49a473ed6f4aef55bfdd47d6370e53582be6b7b
|
||||
|
||||
async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
|
||||
"""Notify all subscribers of COB data updates"""
|
||||
|
||||
Reference in New Issue
Block a user