From ba532327b606625c99337a8dcd7c8068ea02cf58 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 8 Aug 2025 01:00:38 +0300 Subject: [PATCH] added coin API WS implementation --- core/config.py | 25 +++++++ core/multi_exchange_cob_provider.py | 101 +++++++++++++++++++++++++++- web/clean_dashboard.py | 27 ++++++-- web/cob_realtime_dashboard.py | 5 +- web/component_manager.py | 15 ++++- 5 files changed, 165 insertions(+), 8 deletions(-) diff --git a/core/config.py b/core/config.py index 6a0f027..72e7210 100644 --- a/core/config.py +++ b/core/config.py @@ -292,4 +292,29 @@ def setup_logging(config: Optional[Config] = None): log_config = config.logging + # Add separate error log with rotation and immediate flush + try: + from logging.handlers import RotatingFileHandler + error_log_dir = Path('logs') + error_log_dir.mkdir(parents=True, exist_ok=True) + error_log_path = error_log_dir / 'errors.log' + + class FlushingErrorHandler(RotatingFileHandler): + def emit(self, record): + super().emit(record) + self.flush() + + error_handler = FlushingErrorHandler( + str(error_log_path), maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' + ) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + error_handler.setFormatter(formatter) + error_handler.setLevel(logging.ERROR) + + root_logger = logging.getLogger() + root_logger.addHandler(error_handler) + logger.info("Error log handler initialized at logs/errors.log") + except Exception as e: + logger.warning(f"Failed to initialize error log handler: {e}") + logger.info("Logging configured successfully with SafeFormatter") diff --git a/core/multi_exchange_cob_provider.py b/core/multi_exchange_cob_provider.py index d10fb5b..1d879b4 100644 --- a/core/multi_exchange_cob_provider.py +++ b/core/multi_exchange_cob_provider.py @@ -20,6 +20,7 @@ Data is structured for consumption by CNN/DQN models and trading dashboards. """ import asyncio +import os import json import logging import time @@ -181,6 +182,8 @@ class MultiExchangeCOBProvider: }) self.fixed_usd_buckets = {} # Fixed USD bucket sizes per symbol self.bucket_size_bps = 10 # Default bucket size in basis points + self.exchange_update_counts = {} + self.processing_times = defaultdict(list) # Rate limiting for REST API fallback self.last_rest_api_call = 0 @@ -237,9 +240,12 @@ class MultiExchangeCOBProvider: # 5. Aggregate trade stream for large order detection tasks.append(self._stream_binance_agg_trades(symbol)) - else: + elif exchange_name in ['coinbase', 'kraken', 'huobi', 'bitfinex']: # Other exchanges - WebSocket only tasks.append(self._stream_exchange_orderbook(exchange_name, symbol)) + elif exchange_name == 'coinapi': + # Use REST polling for CoinAPI depth snapshots; merged in consolidation + tasks.append(self._poll_coinapi_snapshots(symbol, config)) # Start continuous consolidation and bucket updates tasks.append(self._continuous_consolidation()) @@ -248,6 +254,95 @@ class MultiExchangeCOBProvider: logger.info(f"Starting {len(tasks)} COB streaming tasks (WebSocket only - NO REST API)") await asyncio.gather(*tasks) + async def _poll_coinapi_snapshots(self, symbol: str, config: ExchangeConfig): + """Poll CoinAPI REST current order book snapshots and merge into exchange_order_books.""" + try: + api_key = os.environ.get('COINAPI_KEY') or os.getenv('COINAPI_API_KEY') + if not api_key: + logger.warning("COINAPI: API key not set (COINAPI_KEY). Skipping CoinAPI polling.") + return + base_url = config.rest_api_url.rstrip('/') + # Map symbol to CoinAPI symbol_id (e.g., BINANCE_SPOT_ETH_USD). We'll default to KRAKEN for breadth. + # Use multiple source symbols if needed; start with KRAKEN spot. + coinapi_symbol = f"KRAKEN_SPOT_{symbol.replace('/', '_').replace('USDT','USD')}" + + min_interval = max(0.5, (config.rate_limits.get('min_interval_ms', 500) / 1000.0)) if config.rate_limits else 0.5 + headers = {"X-CoinAPI-Key": api_key} + + async with self.data_lock: + if symbol not in self.exchange_order_books: + self.exchange_order_books[symbol] = {} + + success_count = 0 + error_count = 0 + while self.is_streaming: + try: + url = f"{base_url}/orderbooks/current?symbol_id={coinapi_symbol}" + async with self.rest_session.get(url, headers=headers, timeout=5) as resp: + if resp.status == 200: + payload = await resp.json() + # CoinAPI may return list; normalize + if isinstance(payload, list) and payload: + ob = payload[0] + else: + ob = payload + bids = {} + asks = {} + ts = datetime.utcnow() + for b in ob.get('bids', [])[:200]: + price = float(b.get('price')) + size = float(b.get('size')) + if size > 0: + bids[price] = ExchangeOrderBookLevel( + exchange='coinapi', + price=price, + size=size, + volume_usd=price * size, + orders_count=1, + side='bid', + timestamp=ts, + raw_data=b + ) + for a in ob.get('asks', [])[:200]: + price = float(a.get('price')) + size = float(a.get('size')) + if size > 0: + asks[price] = ExchangeOrderBookLevel( + exchange='coinapi', + price=price, + size=size, + volume_usd=price * size, + orders_count=1, + side='ask', + timestamp=ts, + raw_data=a + ) + async with self.data_lock: + self.exchange_order_books[symbol]['coinapi'] = { + 'bids': bids, + 'asks': asks, + 'last_update': ts, + 'connected': True + } + logger.debug(f"COINAPI snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks") + success_count += 1 + else: + logger.debug(f"COINAPI HTTP {resp.status} for {symbol}") + error_count += 1 + except asyncio.CancelledError: + break + except Exception as e: + logger.debug(f"COINAPI error for {symbol}: {e}") + error_count += 1 + # Periodic audit logs every ~100 polls + total = success_count + error_count + if total and total % 100 == 0: + rate = success_count / total + logger.info(f"COINAPI {symbol}: success rate {rate:.2%} over {total} polls; last snapshot bids={len(bids) if 'bids' in locals() else 0} asks={len(asks) if 'asks' in locals() else 0}") + await asyncio.sleep(min_interval) + except Exception as e: + logger.error(f"Error in CoinAPI polling for {symbol}: {e}") + async def _setup_http_session(self): """Setup aiohttp session and connector""" self.connector = aiohttp.TCPConnector( @@ -1035,6 +1130,10 @@ class MultiExchangeCOBProvider: deep_asks = exchange_data.get('deep_asks', {}) # Merge data: prioritize live data for top levels, add deep data for others + # Treat CoinAPI snapshot as deep data when deep_* not present + if exchange_name == 'coinapi': + deep_bids = deep_bids or live_bids + deep_asks = deep_asks or live_asks merged_bids = self._merge_orderbook_data(live_bids, deep_bids, 'bid') merged_asks = self._merge_orderbook_data(live_asks, deep_asks, 'ask') diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 91ea391..de19afb 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -1379,19 +1379,38 @@ class CleanTradingDashboard: eth_recent = _recent_ticks('ETH/USDT') btc_recent = _recent_ticks('BTC/USDT') + # Include per-exchange stats when available + exchange_stats_eth = None + exchange_stats_btc = None + if hasattr(self.data_provider, 'cob_integration') and self.data_provider.cob_integration: + try: + snaps = self.data_provider.cob_integration.exchange_order_books + if 'ETH/USDT' in snaps: + exchange_stats_eth = {ex: { + 'bids': len(data.get('bids', {})), + 'asks': len(data.get('asks', {})) + } for ex, data in snaps['ETH/USDT'].items() if isinstance(data, dict)} + if 'BTC/USDT' in snaps: + exchange_stats_btc = {ex: { + 'bids': len(data.get('bids', {})), + 'asks': len(data.get('asks', {})) + } for ex, data in snaps['BTC/USDT'].items() if isinstance(data, dict)} + except Exception: + pass + eth_components = self.component_manager.format_cob_data( eth_snapshot, 'ETH/USDT', eth_imbalance_stats, cob_mode, - update_info={'update_rate': eth_rate, 'aggregated_1s': eth_agg_1s[-5:], 'recent_ticks': eth_recent} + update_info={'update_rate': eth_rate, 'aggregated_1s': eth_agg_1s[-5:], 'recent_ticks': eth_recent, 'exchanges': exchange_stats_eth} ) btc_components = self.component_manager.format_cob_data( btc_snapshot, 'BTC/USDT', btc_imbalance_stats, cob_mode, - update_info={'update_rate': btc_rate, 'aggregated_1s': btc_agg_1s[-5:], 'recent_ticks': btc_recent} + update_info={'update_rate': btc_rate, 'aggregated_1s': btc_agg_1s[-5:], 'recent_ticks': btc_recent, 'exchanges': exchange_stats_btc} ) return eth_components, btc_components @@ -7089,7 +7108,7 @@ class CleanTradingDashboard: """Initialize enhanced training system for model predictions""" try: # Try to import and initialize enhanced training system - from enhanced_realtime_training import EnhancedRealtimeTrainingSystem + from enhanced_realtime_training import EnhancedRealtimeTrainingSystem # Optional self.training_system = EnhancedRealtimeTrainingSystem( orchestrator=self.orchestrator, @@ -7106,7 +7125,7 @@ class CleanTradingDashboard: logger.debug("Enhanced training system initialized for model predictions") except ImportError: - logger.warning("Enhanced training system not available - using mock predictions") + logger.warning("Enhanced training system not available - predictions disabled for this module") self.training_system = None except Exception as e: logger.error(f"Error initializing enhanced training system: {e}") diff --git a/web/cob_realtime_dashboard.py b/web/cob_realtime_dashboard.py index 58c8fc5..38c42fe 100644 --- a/web/cob_realtime_dashboard.py +++ b/web/cob_realtime_dashboard.py @@ -356,7 +356,8 @@ class COBDashboardServer: if mid_price <= 0: return - now = datetime.now() + from datetime import timezone + now = datetime.now(timezone.utc) current_second = now.replace(microsecond=0) # Get or create current candle @@ -377,7 +378,7 @@ class COBDashboardServer: if current_second > current_candle['timestamp']: # Close previous candle finished_candle = { - 'timestamp': current_candle['timestamp'].isoformat(), + 'timestamp': current_candle['timestamp'].isoformat(), # UTC ISO8601 'open': current_candle['open'], 'high': current_candle['high'], 'low': current_candle['low'], diff --git a/web/component_manager.py b/web/component_manager.py index 3a8758b..8504726 100644 --- a/web/component_manager.py +++ b/web/component_manager.py @@ -377,8 +377,21 @@ class DashboardComponentManager: overview_panel ]) - # --- Right Panel: Compact Ladder --- + # --- Right Panel: Compact Ladder with optional exchange stats --- + exchange_stats = (update_info or {}).get('exchanges') if isinstance(update_info, dict) else None ladder_panel = self._create_cob_ladder_panel(bids, asks, mid_price, symbol) + if exchange_stats: + # Render a tiny exchange contribution summary above ladder + try: + rows = [] + for ex, stats_ex in exchange_stats.items(): + rows.append(html.Small(f"{ex}: {stats_ex.get('bids',0)}/{stats_ex.get('asks',0)}", className="text-muted me-2")) + ladder_panel = html.Div([ + html.Div(rows, className="mb-1"), + ladder_panel + ]) + except Exception: + pass # Append small extras line from aggregated_1s and recent_ticks extras = [] if update_info: