added CoinAPI order book integration (REST snapshot polling, merged into COB consolidation)

This commit is contained in:
Dobromir Popov
2025-08-08 01:00:38 +03:00
parent bd15bdc87d
commit ba532327b6
5 changed files with 165 additions and 8 deletions

View File

@ -292,4 +292,29 @@ def setup_logging(config: Optional[Config] = None):
log_config = config.logging
# Add separate error log with rotation and immediate flush
try:
from logging.handlers import RotatingFileHandler
error_log_dir = Path('logs')
error_log_dir.mkdir(parents=True, exist_ok=True)
error_log_path = error_log_dir / 'errors.log'
class FlushingErrorHandler(RotatingFileHandler):
    """Rotating file handler that flushes after every record.

    Ensures ERROR-level entries reach disk immediately, so the error log is
    complete even if the process crashes before normal buffer flush.
    """
    def emit(self, record):
        # Write via the normal rotating handler, then force the flush.
        super().emit(record)
        self.flush()
error_handler = FlushingErrorHandler(
str(error_log_path), maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
error_handler.setFormatter(formatter)
error_handler.setLevel(logging.ERROR)
root_logger = logging.getLogger()
root_logger.addHandler(error_handler)
logger.info("Error log handler initialized at logs/errors.log")
except Exception as e:
logger.warning(f"Failed to initialize error log handler: {e}")
logger.info("Logging configured successfully with SafeFormatter")

View File

@ -20,6 +20,7 @@ Data is structured for consumption by CNN/DQN models and trading dashboards.
"""
import asyncio
import os
import json
import logging
import time
@ -181,6 +182,8 @@ class MultiExchangeCOBProvider:
})
self.fixed_usd_buckets = {} # Fixed USD bucket sizes per symbol
self.bucket_size_bps = 10 # Default bucket size in basis points
self.exchange_update_counts = {}
self.processing_times = defaultdict(list)
# Rate limiting for REST API fallback
self.last_rest_api_call = 0
@ -237,9 +240,12 @@ class MultiExchangeCOBProvider:
# 5. Aggregate trade stream for large order detection
tasks.append(self._stream_binance_agg_trades(symbol))
else:
elif exchange_name in ['coinbase', 'kraken', 'huobi', 'bitfinex']:
# Other exchanges - WebSocket only
tasks.append(self._stream_exchange_orderbook(exchange_name, symbol))
elif exchange_name == 'coinapi':
# Use REST polling for CoinAPI depth snapshots; merged in consolidation
tasks.append(self._poll_coinapi_snapshots(symbol, config))
# Start continuous consolidation and bucket updates
tasks.append(self._continuous_consolidation())
@ -248,6 +254,95 @@ class MultiExchangeCOBProvider:
logger.info(f"Starting {len(tasks)} COB streaming tasks (WebSocket only - NO REST API)")
await asyncio.gather(*tasks)
async def _poll_coinapi_snapshots(self, symbol: str, config: ExchangeConfig):
    """Poll CoinAPI REST current order book snapshots and merge into exchange_order_books.

    Loops while ``self.is_streaming`` is True, fetching the current snapshot
    for *symbol* from CoinAPI at a rate-limited interval and storing the top
    200 levels per side under the 'coinapi' key, where the consolidation pass
    picks them up. Exits immediately (with a warning) if no API key is set in
    COINAPI_KEY / COINAPI_API_KEY.

    Args:
        symbol: Internal symbol, e.g. 'ETH/USDT'.
        config: ExchangeConfig supplying ``rest_api_url`` and optional
            ``rate_limits`` (``min_interval_ms`` controls the poll interval).
    """
    try:
        api_key = os.environ.get('COINAPI_KEY') or os.getenv('COINAPI_API_KEY')
        if not api_key:
            logger.warning("COINAPI: API key not set (COINAPI_KEY). Skipping CoinAPI polling.")
            return

        base_url = config.rest_api_url.rstrip('/')
        # Map the internal symbol to a CoinAPI symbol_id. Default to KRAKEN spot
        # for breadth, normalizing USDT -> USD (e.g. 'ETH/USDT' -> 'KRAKEN_SPOT_ETH_USD').
        coinapi_symbol = f"KRAKEN_SPOT_{symbol.replace('/', '_').replace('USDT','USD')}"

        # Poll no faster than min_interval seconds; floor at 0.5s regardless of config.
        min_interval = max(0.5, (config.rate_limits.get('min_interval_ms', 500) / 1000.0)) if config.rate_limits else 0.5
        headers = {"X-CoinAPI-Key": api_key}
        # aiohttp deprecated plain-number timeouts; use an explicit ClientTimeout.
        request_timeout = aiohttp.ClientTimeout(total=5)

        async with self.data_lock:
            if symbol not in self.exchange_order_books:
                self.exchange_order_books[symbol] = {}

        def _parse_levels(entries, side, ts):
            """Convert raw CoinAPI level dicts into ExchangeOrderBookLevel keyed by price."""
            levels = {}
            for entry in entries[:200]:
                raw_price = entry.get('price')
                raw_size = entry.get('size')
                if raw_price is None or raw_size is None:
                    continue  # skip a malformed level instead of aborting the whole snapshot
                price = float(raw_price)
                size = float(raw_size)
                if size > 0:
                    levels[price] = ExchangeOrderBookLevel(
                        exchange='coinapi',
                        price=price,
                        size=size,
                        volume_usd=price * size,
                        orders_count=1,
                        side=side,
                        timestamp=ts,
                        raw_data=entry
                    )
            return levels

        success_count = 0
        error_count = 0
        # Track last snapshot sizes explicitly instead of probing locals() at log time.
        last_bid_count = 0
        last_ask_count = 0

        while self.is_streaming:
            try:
                url = f"{base_url}/orderbooks/current?symbol_id={coinapi_symbol}"
                async with self.rest_session.get(url, headers=headers, timeout=request_timeout) as resp:
                    if resp.status == 200:
                        payload = await resp.json()
                        # CoinAPI may return a list of snapshots; take the first one.
                        ob = payload[0] if isinstance(payload, list) and payload else payload
                        ts = datetime.utcnow()
                        bids = _parse_levels(ob.get('bids', []), 'bid', ts)
                        asks = _parse_levels(ob.get('asks', []), 'ask', ts)
                        async with self.data_lock:
                            self.exchange_order_books[symbol]['coinapi'] = {
                                'bids': bids,
                                'asks': asks,
                                'last_update': ts,
                                'connected': True
                            }
                        logger.debug(f"COINAPI snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
                        success_count += 1
                        last_bid_count = len(bids)
                        last_ask_count = len(asks)
                    else:
                        logger.debug(f"COINAPI HTTP {resp.status} for {symbol}")
                        error_count += 1
            except asyncio.CancelledError:
                break  # task cancelled during shutdown; stop polling cleanly
            except Exception as e:
                logger.debug(f"COINAPI error for {symbol}: {e}")
                error_count += 1

            # Periodic audit logs every ~100 polls
            total = success_count + error_count
            if total and total % 100 == 0:
                rate = success_count / total
                logger.info(f"COINAPI {symbol}: success rate {rate:.2%} over {total} polls; last snapshot bids={last_bid_count} asks={last_ask_count}")
            await asyncio.sleep(min_interval)
    except Exception as e:
        logger.error(f"Error in CoinAPI polling for {symbol}: {e}")
async def _setup_http_session(self):
"""Setup aiohttp session and connector"""
self.connector = aiohttp.TCPConnector(
@ -1035,6 +1130,10 @@ class MultiExchangeCOBProvider:
deep_asks = exchange_data.get('deep_asks', {})
# Merge data: prioritize live data for top levels, add deep data for others
# Treat CoinAPI snapshot as deep data when deep_* not present
if exchange_name == 'coinapi':
deep_bids = deep_bids or live_bids
deep_asks = deep_asks or live_asks
merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid')
merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask')