diff --git a/core/data_provider.py b/core/data_provider.py index 745dce3..0b24f34 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -196,11 +196,39 @@ class DataProvider: # Load existing pivot bounds from cache self._load_all_pivot_bounds() + # Centralized data collection for models and dashboard + self.cob_data_cache: Dict[str, deque] = {} # COB data for models + self.training_data_cache: Dict[str, deque] = {} # Training data for models + self.model_data_subscribers: Dict[str, List[Callable]] = {} # Model-specific data callbacks + + # Callbacks for data distribution + self.cob_data_callbacks: List[Callable] = [] # COB data callbacks + self.training_data_callbacks: List[Callable] = [] # Training data callbacks + self.model_prediction_callbacks: List[Callable] = [] # Model prediction callbacks + + # Initialize data caches + for symbol in self.symbols: + binance_symbol = symbol.replace('/', '').upper() + self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data + self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer + + # Data collection threads + self.data_collection_active = False + + # COB data collection + self.cob_collection_active = False + self.cob_collection_thread = None + + # Training data collection + self.training_data_collection_active = False + self.training_data_thread = None + logger.info(f"DataProvider initialized for symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") logger.info("Centralized data distribution enabled") logger.info("Pivot-based normalization system enabled") logger.info("Williams Market Structure integration enabled") + logger.info("COB and training data collection enabled") # Rate limiting self.last_request_time = {} @@ -2559,4 +2587,591 @@ class DataProvider: if attempt < self.max_retries - 1: time.sleep(5 * (attempt + 1)) - return None \ No newline at end of file + return None + # ===== CENTRALIZED DATA COLLECTION METHODS ===== + + def start_centralized_data_collection(self): + """Start all centralized data collection processes""" + logger.info("Starting centralized data collection for all models and dashboard") + + # Start COB data collection + self.start_cob_data_collection() + + # Start training data collection + self.start_training_data_collection() + + logger.info("All centralized data collection processes started") + + def stop_centralized_data_collection(self): + """Stop all centralized data collection processes""" + logger.info("Stopping centralized data collection") + + # Stop COB collection + self.cob_collection_active = False + if self.cob_collection_thread and self.cob_collection_thread.is_alive(): + self.cob_collection_thread.join(timeout=5) + + # Stop training data collection + self.training_data_collection_active = False + if self.training_data_thread and self.training_data_thread.is_alive(): + self.training_data_thread.join(timeout=5) + + logger.info("Centralized data collection stopped") + + def start_cob_data_collection(self): + """Start COB (Consolidated Order Book) data collection""" + if self.cob_collection_active: + logger.warning("COB data collection already active") + return + + self.cob_collection_active = True + self.cob_collection_thread = Thread(target=self._cob_collection_worker, daemon=True) + self.cob_collection_thread.start() + logger.info("COB data collection started") + + def _cob_collection_worker(self): + """Worker thread for COB data collection""" + import requests + import time + import threading + + logger.info("COB data 
collection worker started") + + # Use separate threads for each symbol to achieve higher update frequency + def collect_symbol_data(symbol): + while self.cob_collection_active: + try: + self._collect_cob_data_for_symbol(symbol) + # Sleep for a very short time to achieve ~120 updates/sec across all symbols + # With 2 symbols, each can update at ~60/sec + time.sleep(0.016) # ~60 updates per second per symbol + except Exception as e: + logger.error(f"Error collecting COB data for {symbol}: {e}") + time.sleep(1) # Short recovery time + + # Start a thread for each symbol + threads = [] + for symbol in self.symbols: + thread = threading.Thread(target=collect_symbol_data, args=(symbol,), daemon=True) + thread.start() + threads.append(thread) + + # Keep the main thread alive + while self.cob_collection_active: + time.sleep(1) + + # Join threads when collection is stopped + for thread in threads: + thread.join(timeout=1) + + def _collect_cob_data_for_symbol(self, symbol: str): + """Collect COB data for a specific symbol using Binance REST API""" + try: + import requests + + # Convert symbol format + binance_symbol = symbol.replace('/', '').upper() + + # Get order book data + url = f"https://api.binance.com/api/v3/depth" + params = { + 'symbol': binance_symbol, + 'limit': 100 # Get top 100 levels + } + + response = requests.get(url, params=params, timeout=5) + if response.status_code == 200: + order_book = response.json() + + # Process and cache the data + cob_snapshot = self._process_order_book_data(symbol, order_book) + + # Store in cache (ensure cache exists) + if binance_symbol not in self.cob_data_cache: + self.cob_data_cache[binance_symbol] = deque(maxlen=300) + + self.cob_data_cache[binance_symbol].append(cob_snapshot) + + # Distribute to COB data subscribers + self._distribute_cob_data(symbol, cob_snapshot) + + else: + logger.debug(f"Failed to fetch COB data for {symbol}: {response.status_code}") + + except Exception as e: + logger.debug(f"Error collecting COB data for {symbol}: {e}") + + def _process_order_book_data(self, symbol: str, order_book: dict) -> dict: + """Process raw order book data into structured COB snapshot with multi-timeframe imbalance metrics""" + try: + bids = [[float(price), float(qty)] for price, qty in order_book.get('bids', [])] + asks = [[float(price), float(qty)] for price, qty in order_book.get('asks', [])] + + # Calculate statistics + total_bid_volume = sum(qty for _, qty in bids) + total_ask_volume = sum(qty for _, qty in asks) + + best_bid = bids[0][0] if bids else 0 + best_ask = asks[0][0] if asks else 0 + mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0 + spread = best_ask - best_bid if best_bid and best_ask else 0 + spread_bps = (spread / mid_price * 10000) if mid_price > 0 else 0 + + # Calculate current imbalance + imbalance = (total_bid_volume - total_ask_volume) / (total_bid_volume + total_ask_volume) if (total_bid_volume + total_ask_volume) > 0 else 0 + + # Calculate multi-timeframe imbalances + binance_symbol = symbol.replace('/', '').upper() + + # Initialize imbalance metrics + imbalance_1s = imbalance # Current imbalance is 1s + imbalance_5s = imbalance # Default to current if not enough history + imbalance_15s = imbalance + imbalance_60s = imbalance + + # Calculate historical imbalances if we have enough data + if binance_symbol in self.cob_data_cache: + cache = self.cob_data_cache[binance_symbol] + now = datetime.now() + + # Get snapshots for different timeframes + snapshots_5s = [s for s in cache if (now - 
s['timestamp']).total_seconds() <= 5] + snapshots_15s = [s for s in cache if (now - s['timestamp']).total_seconds() <= 15] + snapshots_60s = [s for s in cache if (now - s['timestamp']).total_seconds() <= 60] + + # Calculate imbalances for each timeframe + if snapshots_5s: + bid_vol_5s = sum(s['stats']['bid_liquidity'] for s in snapshots_5s) + ask_vol_5s = sum(s['stats']['ask_liquidity'] for s in snapshots_5s) + total_vol_5s = bid_vol_5s + ask_vol_5s + imbalance_5s = (bid_vol_5s - ask_vol_5s) / total_vol_5s if total_vol_5s > 0 else 0 + + if snapshots_15s: + bid_vol_15s = sum(s['stats']['bid_liquidity'] for s in snapshots_15s) + ask_vol_15s = sum(s['stats']['ask_liquidity'] for s in snapshots_15s) + total_vol_15s = bid_vol_15s + ask_vol_15s + imbalance_15s = (bid_vol_15s - ask_vol_15s) / total_vol_15s if total_vol_15s > 0 else 0 + + if snapshots_60s: + bid_vol_60s = sum(s['stats']['bid_liquidity'] for s in snapshots_60s) + ask_vol_60s = sum(s['stats']['ask_liquidity'] for s in snapshots_60s) + total_vol_60s = bid_vol_60s + ask_vol_60s + imbalance_60s = (bid_vol_60s - ask_vol_60s) / total_vol_60s if total_vol_60s > 0 else 0 + + cob_snapshot = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'bids': bids[:20], # Top 20 levels + 'asks': asks[:20], # Top 20 levels + 'stats': { + 'best_bid': best_bid, + 'best_ask': best_ask, + 'mid_price': mid_price, + 'spread': spread, + 'spread_bps': spread_bps, + 'bid_liquidity': total_bid_volume, + 'ask_liquidity': total_ask_volume, + 'total_liquidity': total_bid_volume + total_ask_volume, + 'imbalance': imbalance, + 'imbalance_1s': imbalance_1s, + 'imbalance_5s': imbalance_5s, + 'imbalance_15s': imbalance_15s, + 'imbalance_60s': imbalance_60s, + 'levels': len(bids) + len(asks) + } + } + + return cob_snapshot + + except Exception as e: + logger.error(f"Error processing order book data for {symbol}: {e}") + return {} + + def start_training_data_collection(self): + """Start training data collection for models""" + if self.training_data_collection_active: + logger.warning("Training data collection already active") + return + + self.training_data_collection_active = True + self.training_data_thread = Thread(target=self._training_data_collection_worker, daemon=True) + self.training_data_thread.start() + logger.info("Training data collection started") + + def _training_data_collection_worker(self): + """Worker thread for training data collection""" + import time + + logger.info("Training data collection worker started") + + while self.training_data_collection_active: + try: + # Collect training data for all symbols + for symbol in self.symbols: + training_sample = self._collect_training_sample(symbol) + if training_sample: + binance_symbol = symbol.replace('/', '').upper() + self.training_data_cache[binance_symbol].append(training_sample) + + # Distribute to training data subscribers + self._distribute_training_data(symbol, training_sample) + + # Sleep for 5 seconds between collections + time.sleep(5) + + except Exception as e: + logger.error(f"Error in training data collection worker: {e}") + time.sleep(10) # Wait longer on error + + def _collect_training_sample(self, symbol: str) -> Optional[dict]: + """Collect a training sample for a specific symbol""" + try: + # Get recent market data + recent_data = self.get_historical_data(symbol, '1m', limit=100) + if recent_data is None or len(recent_data) < 50: + return None + + # Get recent ticks + recent_ticks = self.get_recent_ticks(symbol, count=100) + if len(recent_ticks) < 10: + return None + + # Get COB data + 
binance_symbol = symbol.replace('/', '').upper() + recent_cob = list(self.cob_data_cache.get(binance_symbol, []))[-10:] + + # Create training sample + training_sample = { + 'symbol': symbol, + 'timestamp': datetime.now(), + 'ohlcv_data': recent_data.tail(50).to_dict('records'), + 'tick_data': [ + { + 'price': tick.price, + 'volume': tick.volume, + 'timestamp': tick.timestamp + } for tick in recent_ticks[-50:] + ], + 'cob_data': recent_cob, + 'features': self._extract_training_features(symbol, recent_data, recent_ticks, recent_cob) + } + + return training_sample + + except Exception as e: + logger.error(f"Error collecting training sample for {symbol}: {e}") + return None + + def _extract_training_features(self, symbol: str, ohlcv_data: pd.DataFrame, + recent_ticks: List[MarketTick], recent_cob: List[dict]) -> dict: + """Extract features for training from various data sources""" + try: + features = {} + + # OHLCV features + if len(ohlcv_data) > 0: + latest = ohlcv_data.iloc[-1] + features.update({ + 'price': latest['close'], + 'volume': latest['volume'], + 'price_change_1m': (latest['close'] - ohlcv_data.iloc[-2]['close']) / ohlcv_data.iloc[-2]['close'] if len(ohlcv_data) > 1 else 0, + 'volume_ratio': latest['volume'] / ohlcv_data['volume'].mean() if len(ohlcv_data) > 1 else 1, + 'volatility': ohlcv_data['close'].pct_change().std() if len(ohlcv_data) > 1 else 0 + }) + + # Tick features + if recent_ticks: + tick_prices = [tick.price for tick in recent_ticks] + tick_volumes = [tick.volume for tick in recent_ticks] + features.update({ + 'tick_price_std': np.std(tick_prices) if len(tick_prices) > 1 else 0, + 'tick_volume_mean': np.mean(tick_volumes), + 'tick_count': len(recent_ticks) + }) + + # COB features + if recent_cob: + latest_cob = recent_cob[-1] + if 'stats' in latest_cob: + stats = latest_cob['stats'] + features.update({ + 'spread_bps': stats.get('spread_bps', 0), + 'imbalance': stats.get('imbalance', 0), + 'liquidity': stats.get('total_liquidity', 0), + 'cob_levels': stats.get('levels', 0) + }) + + return features + + except Exception as e: + logger.error(f"Error extracting training features for {symbol}: {e}") + return {} + + # ===== SUBSCRIPTION METHODS FOR MODELS AND DASHBOARD ===== + + def subscribe_to_cob_data(self, callback: Callable[[str, dict], None]) -> str: + """Subscribe to COB data updates""" + subscriber_id = str(uuid.uuid4()) + self.cob_data_callbacks.append(callback) + logger.info(f"COB data subscriber added: {subscriber_id}") + return subscriber_id + + def subscribe_to_training_data(self, callback: Callable[[str, dict], None]) -> str: + """Subscribe to training data updates""" + subscriber_id = str(uuid.uuid4()) + self.training_data_callbacks.append(callback) + logger.info(f"Training data subscriber added: {subscriber_id}") + return subscriber_id + + def subscribe_to_model_predictions(self, callback: Callable[[str, dict], None]) -> str: + """Subscribe to model prediction updates""" + subscriber_id = str(uuid.uuid4()) + self.model_prediction_callbacks.append(callback) + logger.info(f"Model prediction subscriber added: {subscriber_id}") + return subscriber_id
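For context, the subscription hooks above are fire-and-forget callbacks dispatched on worker threads. A minimal sketch of how a model-side consumer might wire in (the `dp` instance and the callback bodies are hypothetical illustrations, not part of this diff):

```python
# Hypothetical consumer of the new DataProvider hooks; `dp` is an already
# constructed DataProvider instance (construction elided).
def on_cob(symbol: str, snapshot: dict) -> None:
    # Callbacks receive (symbol, payload); snapshot['stats'] carries
    # mid_price, spread_bps and the imbalance_* fields built above.
    stats = snapshot.get('stats', {})
    print(symbol, stats.get('mid_price'), stats.get('imbalance_1s'))

def on_training(symbol: str, sample: dict) -> None:
    # Training samples bundle OHLCV rows, ticks, COB history and features.
    print(symbol, sample.get('features', {}).get('volatility'))

cob_id = dp.subscribe_to_cob_data(on_cob)        # returns a UUID subscriber id
train_id = dp.subscribe_to_training_data(on_training)
dp.start_centralized_data_collection()           # starts COB + training workers
```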
+ def _distribute_cob_data(self, symbol: str, cob_snapshot: dict): + """Distribute COB data to all subscribers""" + for callback in self.cob_data_callbacks: + try: + # Pass the callback through args= so each thread binds the current + # callback; a bare lambda here would late-bind the loop variable + Thread(target=callback, args=(symbol, cob_snapshot), daemon=True).start() + except Exception as e: + logger.error(f"Error distributing COB data: {e}") + + def _distribute_training_data(self, symbol: str, training_sample: dict): + """Distribute training data to all subscribers""" + for callback in self.training_data_callbacks: + try: + Thread(target=callback, args=(symbol, training_sample), daemon=True).start() + except Exception as e: + logger.error(f"Error distributing training data: {e}") + + def _distribute_model_predictions(self, symbol: str, prediction: dict): + """Distribute model predictions to all subscribers""" + for callback in self.model_prediction_callbacks: + try: + Thread(target=callback, args=(symbol, prediction), daemon=True).start() + except Exception as e: + logger.error(f"Error distributing model prediction: {e}") + + # ===== DATA ACCESS METHODS FOR MODELS AND DASHBOARD ===== + + def get_training_data(self, symbol: str, count: int = 100) -> List[dict]: + """Get recent training data for a symbol""" + binance_symbol = symbol.replace('/', '').upper() + if binance_symbol in self.training_data_cache: + return list(self.training_data_cache[binance_symbol])[-count:] + return [] + + def collect_cob_data(self, symbol: str) -> dict: + """ + Collect Consolidated Order Book (COB) data for a symbol using REST API + + This centralized method collects COB data for all consumers (models, dashboard, etc.) + """ + try: + import requests + + # Use Binance REST API for order book data + binance_symbol = symbol.replace('/', '').upper() + url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500" + + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + + # Process order book data + bids = [[float(price), float(qty)] for price, qty in data.get('bids', [])] + asks = [[float(price), float(qty)] for price, qty in data.get('asks', [])] + + # Calculate mid price + best_bid = bids[0][0] if bids else 0 + best_ask = asks[0][0] if asks else 0 + mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0 + + # Calculate order book stats + bid_liquidity = sum(qty for _, qty in bids[:20]) + ask_liquidity = sum(qty for _, qty in asks[:20]) + total_liquidity = bid_liquidity + ask_liquidity + + # Calculate imbalance + imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 + + # Calculate spread in basis points + spread = (best_ask - best_bid) / mid_price * 10000 if mid_price > 0 else 0 + + # Create COB snapshot + cob_snapshot = { + 'symbol': symbol, + 'timestamp': datetime.now(), # datetime, not epoch ms, so the timeframe windows in _process_order_book_data can subtract it + 'bids': bids[:50], # Limit to top 50 levels + 'asks': asks[:50], # Limit to top 50 levels + 'stats': { + 'mid_price': mid_price, + 'best_bid': best_bid, + 'best_ask': best_ask, + 'bid_liquidity': bid_liquidity, + 'ask_liquidity': ask_liquidity, + 'total_liquidity': total_liquidity, + 'imbalance': imbalance, + 'spread_bps': spread + } + } + + # Store under the Binance-format key ('ETHUSDT') that get_latest_cob_data() + # and the worker threads use; caching under 'ETH/USDT' here would leave + # readers looking at a permanently empty entry + with self.subscriber_lock: + if binance_symbol not in self.cob_data_cache: + self.cob_data_cache[binance_symbol] = deque(maxlen=300) # Keep 5 minutes of 1s data + self.cob_data_cache[binance_symbol].append(cob_snapshot) + + # Notify subscribers outside the lock (_notify_cob_subscribers acquires it itself) + self._notify_cob_subscribers(symbol, cob_snapshot) + + return cob_snapshot + else: + logger.warning(f"Failed to fetch COB data for {symbol}: {response.status_code}") + return {} + + except Exception as e: + logger.debug(f"Error collecting COB data for {symbol}: {e}") + return {}
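Note that `collect_cob_data()` issues one REST call per invocation, and Binance applies per-IP request-weight limits to `/api/v3/depth` (the exact weight depends on the `limit` parameter). Callers that loop over symbols should pace themselves. A minimal sketch of a per-symbol throttle, assuming a 1-second floor to mirror the `cob_collector` loop below (this helper is illustrative and does not exist in the codebase):

```python
import time

class SimpleThrottle:
    """Minimal per-key throttle (illustrative; not Binance's official limiter)."""

    def __init__(self, min_interval: float = 1.0):
        self.min_interval = min_interval
        self._last: dict = {}  # key -> monotonic timestamp of last call

    def wait(self, key: str) -> None:
        # Sleep just long enough to keep at least min_interval between calls.
        now = time.monotonic()
        elapsed = now - self._last.get(key, 0.0)
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last[key] = time.monotonic()

# Usage sketch: throttle.wait('ETHUSDT') before each requests.get(...)
throttle = SimpleThrottle(min_interval=1.0)
```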
+ def start_cob_collection(self): + """ + Start COB data collection in background thread + """ + try: + import threading + import time + + def cob_collector(): + """Collect COB data using REST API calls""" + logger.info("Starting centralized COB data collection") + while True: + try: + # Collect data for both symbols + for symbol in ['ETH/USDT', 'BTC/USDT']: + self.collect_cob_data(symbol) + + # Sleep for 1 second between collections + time.sleep(1) + except Exception as e: + logger.debug(f"Error in COB collection: {e}") + time.sleep(5) # Wait longer on error + + # Start collector in background thread + if not hasattr(self, '_cob_thread_started') or not self._cob_thread_started: + cob_thread = threading.Thread(target=cob_collector, daemon=True) + cob_thread.start() + self._cob_thread_started = True + logger.info("Centralized COB data collection started") + + except Exception as e: + logger.error(f"Error starting COB collection: {e}") + + def _notify_cob_subscribers(self, symbol: str, cob_snapshot: dict): + """Notify subscribers of new COB data""" + with self.subscriber_lock: + if not hasattr(self, 'cob_subscribers'): + self.cob_subscribers = {} + + # Notify all subscribers for this symbol + for subscriber_id, callback in self.cob_subscribers.items(): + try: + callback(symbol, cob_snapshot) + except Exception as e: + logger.debug(f"Error notifying COB subscriber {subscriber_id}: {e}") + + def subscribe_to_cob(self, callback) -> str: + """Subscribe to COB data updates""" + with self.subscriber_lock: + if not hasattr(self, 'cob_subscribers'): + self.cob_subscribers = {} + + subscriber_id = str(uuid.uuid4()) + self.cob_subscribers[subscriber_id] = callback + + # Start collection if not already started + self.start_cob_collection() + + return subscriber_id + + def get_latest_cob_data(self, symbol: str) -> dict: + """Get latest COB data for a symbol""" + with self.subscriber_lock: + # Convert symbol to Binance format for cache lookup + binance_symbol = symbol.replace('/', '').upper() + + logger.debug(f"Getting COB data for {symbol} (binance: {binance_symbol})") + + if not hasattr(self, 'cob_data_cache'): + logger.debug("COB data cache not initialized") + return {} + + if binance_symbol not in self.cob_data_cache: + logger.debug(f"Symbol {binance_symbol} not in COB cache. 
Available: {list(self.cob_data_cache.keys())}") + return {} + + if not self.cob_data_cache[binance_symbol]: + logger.debug(f"COB cache for {binance_symbol} is empty") + return {} + + latest_data = self.cob_data_cache[binance_symbol][-1] + logger.debug(f"Latest COB data type for {binance_symbol}: {type(latest_data)}") + return latest_data + + def get_cob_data(self, symbol: str, count: int = 50) -> List[dict]: + """Get recent COB data for a symbol""" + with self.subscriber_lock: + # Convert symbol to Binance format for cache lookup + binance_symbol = symbol.replace('/', '').upper() + + if not hasattr(self, 'cob_data_cache') or binance_symbol not in self.cob_data_cache: + return [] + + # Return the most recent 'count' snapshots + return list(self.cob_data_cache[binance_symbol])[-count:] + + def get_data_summary(self) -> dict: + """Get summary of all collected data""" + summary = { + 'symbols': self.symbols, + 'subscribers': { + 'tick_subscribers': len(self.subscribers), + 'cob_subscribers': len(self.cob_data_callbacks), + 'training_subscribers': len(self.training_data_callbacks), + 'prediction_subscribers': len(self.model_prediction_callbacks) + }, + 'data_counts': {}, + 'collection_status': { + 'cob_collection': self.cob_collection_active, + 'training_collection': self.training_data_collection_active, + 'streaming': self.is_streaming + } + } + + # Add data counts for each symbol + for symbol in self.symbols: + binance_symbol = symbol.replace('/', '').upper() + summary['data_counts'][symbol] = { + 'ticks': len(self.tick_buffers.get(binance_symbol, [])), + 'cob_snapshots': len(self.cob_data_cache.get(binance_symbol, [])), + 'training_samples': len(self.training_data_cache.get(binance_symbol, [])) + } + + return summary \ No newline at end of file diff --git a/core/orchestrator.py b/core/orchestrator.py index 0e4bb25..469b85b 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -215,6 +215,11 @@ class TradingOrchestrator: logger.info(f"Symbols: {self.symbols}") logger.info("Universal Data Adapter integrated for centralized data flow") + # Start centralized data collection for all models and dashboard + logger.info("Starting centralized data collection...") + self.data_provider.start_centralized_data_collection() + logger.info("Centralized data collection started - all models and dashboard will receive data") + # Initialize models, COB integration, and training system self._initialize_ml_models() self._initialize_cob_integration() diff --git a/kill_stale_processes.py b/kill_stale_processes.py index 7d7df6c..de74b8c 100644 --- a/kill_stale_processes.py +++ b/kill_stale_processes.py @@ -1,40 +1,331 @@ -import psutil +""" +Kill Stale Processes + +This script identifies and kills stale Python processes that might be causing +the dashboard startup freeze. It looks for: +1. Hanging dashboard processes +2. Stale COB data collection threads +3. Matplotlib GUI processes +4. 
Blocked network connections + +Usage: + python kill_stale_processes.py +""" + +import os import sys +import psutil +import signal +import time +from datetime import datetime -try: - current_pid = psutil.Process().pid - processes = [ - p for p in psutil.process_iter() - if any(x in p.name().lower() for x in ["python", "tensorboard"]) - and any(x in ' '.join(p.cmdline()) for x in ["scalping", "training", "tensorboard"]) - and p.pid != current_pid - ] - for p in processes: - try: - p.kill() - print(f"Killed process: PID={p.pid}, Name={p.name()}") - except Exception as e: - print(f"Error killing PID={p.pid}: {e}") - - killed_pids = set() - for port in range(8050, 8052): - for proc in psutil.process_iter(): - if proc.pid == current_pid: - continue +def find_python_processes(): + """Find all Python processes""" + python_processes = [] + + try: + for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time', 'status']): try: - for conn in proc.connections(kind="inet"): - if conn.laddr.port == port: - if proc.pid not in killed_pids: - proc.kill() - print(f"Killed process on port {port}: PID={proc.pid}, Name={proc.name()}") - killed_pids.add(proc.pid) - except (psutil.AccessDenied, psutil.NoSuchProcess): + if proc.info['name'] and 'python' in proc.info['name'].lower(): + # Get command line to identify dashboard processes + cmdline = ' '.join(proc.info['cmdline']) if proc.info['cmdline'] else '' + + python_processes.append({ + 'pid': proc.info['pid'], + 'name': proc.info['name'], + 'cmdline': cmdline, + 'create_time': proc.info['create_time'], + 'status': proc.info['status'], + 'process': proc + }) + except (psutil.NoSuchProcess, psutil.AccessDenied): continue - except Exception as e: - print(f"Error checking/killing PID={proc.pid} for port {port}: {e}") - if not any(pid for pid in killed_pids): - print(f"No process found using port {port}") - print("Stale processes killed") -except Exception as e: - print(f"Error in kill_stale_processes.py: {e}") - sys.exit(1) \ No newline at end of file + + except Exception as e: + print(f"Error finding Python processes: {e}") + + return python_processes + +def identify_dashboard_processes(python_processes): + """Identify processes related to the dashboard""" + dashboard_processes = [] + + dashboard_keywords = [ + 'clean_dashboard', + 'run_clean_dashboard', + 'dashboard', + 'trading', + 'cob_data', + 'orchestrator', + 'data_provider' + ] + + for proc_info in python_processes: + cmdline = proc_info['cmdline'].lower() + + # Check if this is a dashboard-related process + is_dashboard = any(keyword in cmdline for keyword in dashboard_keywords) + + if is_dashboard: + dashboard_processes.append(proc_info) + + return dashboard_processes + +def identify_stale_processes(python_processes): + """Identify potentially stale processes""" + stale_processes = [] + current_time = time.time() + + for proc_info in python_processes: + try: + proc = proc_info['process'] + + # Check if process is in a problematic state + if proc_info['status'] in ['zombie', 'stopped']: + stale_processes.append({ + **proc_info, + 'reason': f"Process status: {proc_info['status']}" + }) + continue + + # Check if process has been running for a very long time without activity + age_hours = (current_time - proc_info['create_time']) / 3600 + if age_hours > 24: # Running for more than 24 hours + try: + # Check CPU usage + cpu_percent = proc.cpu_percent(interval=1) + if cpu_percent < 0.1: # Very low CPU usage + stale_processes.append({ + **proc_info, + 'reason': f"Old process ({age_hours:.1f}h) with 
low CPU usage ({cpu_percent:.1f}%)" + }) + except: + pass + + # Check for processes with high memory usage but no activity + try: + memory_info = proc.memory_info() + memory_mb = memory_info.rss / 1024 / 1024 + + if memory_mb > 500: # More than 500MB + cpu_percent = proc.cpu_percent(interval=1) + if cpu_percent < 0.1: + stale_processes.append({ + **proc_info, + 'reason': f"High memory usage ({memory_mb:.1f}MB) with low CPU usage ({cpu_percent:.1f}%)" + }) + except: + pass + + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + return stale_processes + +def kill_process_safely(proc_info, force=False): + """Kill a process safely""" + try: + proc = proc_info['process'] + pid = proc_info['pid'] + + print(f"Attempting to {'force kill' if force else 'terminate'} PID {pid}: {proc_info['name']}") + + if force: + # Force kill + if os.name == 'nt': # Windows + os.system(f"taskkill /F /PID {pid}") + else: # Unix/Linux + os.kill(pid, signal.SIGKILL) + else: + # Graceful termination + proc.terminate() + + # Wait for termination + try: + proc.wait(timeout=5) + print(f"✅ Process {pid} terminated gracefully") + return True + except psutil.TimeoutExpired: + print(f"⚠️ Process {pid} didn't terminate gracefully, will force kill") + return False + + print(f"✅ Process {pid} killed") + return True + + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + print(f"⚠️ Could not kill process {proc_info['pid']}: {e}") + return False + except Exception as e: + print(f"❌ Error killing process {proc_info['pid']}: {e}") + return False + +def check_port_usage(): + """Check if dashboard port is in use""" + try: + import socket + + # Check if port 8050 is in use + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('localhost', 8050)) + sock.close() + + if result == 0: + print("⚠️ Port 8050 is in use") + + # Find process using the port + for conn in psutil.net_connections(): + if conn.laddr.port == 8050: + try: + proc = psutil.Process(conn.pid) + print(f" Port 8050 used by PID {conn.pid}: {proc.name()}") + return conn.pid + except: + pass + else: + print("✅ Port 8050 is available") + return None + + except Exception as e: + print(f"Error checking port usage: {e}") + return None + +def main(): + """Main function""" + print("🔍 Stale Process Killer") + print("=" * 50) + + try: + # Step 1: Find all Python processes + print("🔍 Finding Python processes...") + python_processes = find_python_processes() + print(f"Found {len(python_processes)} Python processes") + + # Step 2: Identify dashboard processes + print("\n🎯 Identifying dashboard processes...") + dashboard_processes = identify_dashboard_processes(python_processes) + + if dashboard_processes: + print(f"Found {len(dashboard_processes)} dashboard-related processes:") + for proc in dashboard_processes: + age_hours = (time.time() - proc['create_time']) / 3600 + print(f" PID {proc['pid']}: {proc['name']} (age: {age_hours:.1f}h, status: {proc['status']})") + print(f" Command: {proc['cmdline'][:100]}...") + else: + print("No dashboard processes found") + + # Step 3: Check port usage + print("\n🌐 Checking port usage...") + port_pid = check_port_usage() + + # Step 4: Identify stale processes + print("\n🕵️ Identifying stale processes...") + stale_processes = identify_stale_processes(python_processes) + + if stale_processes: + print(f"Found {len(stale_processes)} potentially stale processes:") + for proc in stale_processes: + print(f" PID {proc['pid']}: {proc['name']} - {proc['reason']}") + else: + print("No stale processes 
identified") + + # Step 5: Ask user what to do + if dashboard_processes or stale_processes or port_pid: + print("\n🤔 What would you like to do?") + print("1. Kill all dashboard processes") + print("2. Kill only stale processes") + print("3. Kill process using port 8050") + print("4. Kill all identified processes") + print("5. Show process details and exit") + print("6. Exit without killing anything") + + try: + choice = input("\nEnter your choice (1-6): ").strip() + + if choice == '1': + # Kill dashboard processes + print("\n🔫 Killing dashboard processes...") + for proc in dashboard_processes: + if not kill_process_safely(proc): + kill_process_safely(proc, force=True) + + elif choice == '2': + # Kill stale processes + print("\n🔫 Killing stale processes...") + for proc in stale_processes: + if not kill_process_safely(proc): + kill_process_safely(proc, force=True) + + elif choice == '3': + # Kill process using port 8050 + if port_pid: + print(f"\n🔫 Killing process using port 8050 (PID {port_pid})...") + try: + proc = psutil.Process(port_pid) + proc_info = { + 'pid': port_pid, + 'name': proc.name(), + 'process': proc + } + if not kill_process_safely(proc_info): + kill_process_safely(proc_info, force=True) + except: + print(f"❌ Could not kill process {port_pid}") + else: + print("No process found using port 8050") + + elif choice == '4': + # Kill all identified processes + print("\n🔫 Killing all identified processes...") + all_processes = dashboard_processes + stale_processes + if port_pid: + try: + proc = psutil.Process(port_pid) + all_processes.append({ + 'pid': port_pid, + 'name': proc.name(), + 'process': proc + }) + except: + pass + + for proc in all_processes: + if not kill_process_safely(proc): + kill_process_safely(proc, force=True) + + elif choice == '5': + # Show details + print("\n📋 Process Details:") + all_processes = dashboard_processes + stale_processes + for proc in all_processes: + print(f"\nPID {proc['pid']}: {proc['name']}") + print(f" Status: {proc['status']}") + print(f" Command: {proc['cmdline']}") + print(f" Created: {datetime.fromtimestamp(proc['create_time'])}") + + elif choice == '6': + print("👋 Exiting without killing processes") + + else: + print("❌ Invalid choice") + + except KeyboardInterrupt: + print("\n👋 Cancelled by user") + else: + print("\n✅ No problematic processes found") + + print("\n" + "=" * 50) + print("💡 After killing processes, you can try:") + print(" python run_lightweight_dashboard.py") + print(" or") + print(" python fix_startup_freeze.py") + + return True + + except Exception as e: + print(f"❌ Error in main function: {e}") + return False + +if __name__ == "__main__": + success = main() + if not success: + sys.exit(1) \ No newline at end of file diff --git a/run_clean_dashboard.py b/run_clean_dashboard.py index 4790260..4a5bea0 100644 --- a/run_clean_dashboard.py +++ b/run_clean_dashboard.py @@ -9,6 +9,10 @@ import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' os.environ['OMP_NUM_THREADS'] = '4' +# Fix matplotlib backend issue - set non-interactive backend before any imports +import matplotlib +matplotlib.use('Agg') # Use non-interactive Agg backend + import asyncio import logging import sys diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 48ebf09..ef300a5 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -18,6 +18,15 @@ This ensures consistent data across all models and components. 
diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 48ebf09..ef300a5 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -18,6 +18,15 @@ This ensures consistent data across all models and components. Uses layout and component managers to reduce file size and improve maintainability """ +# Force matplotlib to use non-interactive backend before any imports +import os +os.environ['MPLBACKEND'] = 'Agg' + +# Set matplotlib configuration +import matplotlib +matplotlib.use('Agg') # Use non-interactive Agg backend +matplotlib.interactive(False) # Disable interactive mode + import dash from dash import Dash, dcc, html, Input, Output, State import plotly.graph_objects as go @@ -33,6 +42,7 @@ import threading from typing import Dict, List, Optional, Any, Union import os import asyncio +import sys # Import sys for global exception handler import dash_bootstrap_components as dbc from dash.exceptions import PreventUpdate from collections import deque @@ -236,7 +246,7 @@ class CleanTradingDashboard: logger.debug("Clean Trading Dashboard initialized with HIGH-FREQUENCY COB integration and signal generation") logger.info("🌙 Overnight Training Coordinator ready - call start_overnight_training() to begin") - + def start_overnight_training(self): """Start the overnight training session""" try: @@ -411,6 +421,19 @@ class CleanTradingDashboard: logger.error(f"Error getting model status: {e}") return {'loaded_models': {}, 'total_models': 0, 'system_status': 'ERROR'} + def _safe_strftime(self, timestamp_val, format_str='%H:%M:%S'): + """Safely format timestamp, handling both string and datetime objects""" + try: + if isinstance(timestamp_val, str): + return timestamp_val + elif hasattr(timestamp_val, 'strftime'): + return timestamp_val.strftime(format_str) + else: + return datetime.now().strftime(format_str) + except Exception as e: + logger.debug(f"Error formatting timestamp {timestamp_val}: {e}") + return datetime.now().strftime(format_str) + def _get_initial_balance(self) -> float: """Get initial balance from trading executor or default""" try: @@ -616,7 +639,18 @@ class CleanTradingDashboard: # Only show signals that are significantly different or from different time periods signal_key = f"{action}_{int(price)}_{int(confidence*100)}" - time_key = int(timestamp.timestamp() // 30) # Group by 30-second intervals + # Handle timestamp safely - could be string or datetime + if isinstance(timestamp, str): + try: + # Try to parse string timestamp + timestamp_dt = datetime.strptime(timestamp, '%H:%M:%S') + time_key = int(timestamp_dt.timestamp() // 30) + except (ValueError, OSError): # unparseable string, or pre-epoch .timestamp() on Windows + time_key = int(datetime.now().timestamp() // 30) + elif hasattr(timestamp, 'timestamp'): + time_key = int(timestamp.timestamp() // 30) + else: + time_key = int(datetime.now().timestamp() // 30) full_key = f"{signal_key}_{time_key}" if full_key not in seen_signals: @@ -709,6 +743,9 @@ class CleanTradingDashboard: btc_data_time = self.cob_last_update.get('BTC/USDT', 0) if hasattr(self, 'cob_last_update') else 0 import time current_time = time.time() + # Ensure data times are not None + eth_data_time = eth_data_time or 0 + btc_data_time = btc_data_time or 0 logger.info(f"COB Data Age: ETH: {current_time - eth_data_time:.1f}s, BTC: {current_time - btc_data_time:.1f}s") eth_imbalance_stats = self._calculate_cumulative_imbalance('ETH/USDT') @@ -717,6 +754,20 @@ class CleanTradingDashboard: # Determine COB data source mode cob_mode = self._get_cob_mode() + # Debug: Log snapshot types only occasionally (every 1000 intervals) + if n % 1000 == 0: + logger.debug(f"ETH snapshot type: {type(eth_snapshot)}, BTC snapshot type: {type(btc_snapshot)}") + if isinstance(eth_snapshot, list): + logger.error(f"ETH snapshot is a list with {len(eth_snapshot)} items: 
{eth_snapshot[:2] if eth_snapshot else 'empty'}") + if isinstance(btc_snapshot, list): + logger.error(f"BTC snapshot is a list with {len(btc_snapshot)} items: {btc_snapshot[:2] if btc_snapshot else 'empty'}") + + # If we get a list, don't pass it to the formatter - create a proper object or return None + if isinstance(eth_snapshot, list): + eth_snapshot = None + if isinstance(btc_snapshot, list): + btc_snapshot = None + eth_components = self.component_manager.format_cob_data(eth_snapshot, 'ETH/USDT', eth_imbalance_stats, cob_mode) btc_components = self.component_manager.format_cob_data(btc_snapshot, 'BTC/USDT', btc_imbalance_stats, cob_mode) @@ -2252,16 +2303,73 @@ class CleanTradingDashboard: return {'error': str(e), 'cob_status': 'Error Getting Status', 'orchestrator_type': 'Unknown'} def _get_cob_snapshot(self, symbol: str) -> Optional[Any]: - """Get COB snapshot for symbol - PERFORMANCE OPTIMIZED: Use orchestrator's COB integration""" + """Get COB snapshot for symbol - CENTRALIZED: Use data provider's COB data""" try: - # PERFORMANCE FIX: Use orchestrator's COB integration instead of separate dashboard integration - # This eliminates redundant COB providers and improves performance + # Priority 1: Use data provider's centralized COB data (primary source) + if self.data_provider: + try: + cob_data = self.data_provider.get_latest_cob_data(symbol) + logger.debug(f"COB data type for {symbol}: {type(cob_data)}, data: {cob_data}") + + if cob_data and isinstance(cob_data, dict) and 'stats' in cob_data: + logger.debug(f"COB snapshot available for {symbol} from centralized data provider") + + # Create a snapshot object from the data provider's data + class COBSnapshot: + def __init__(self, data): + # Convert list format [[price, qty], ...] to dictionary format + raw_bids = data.get('bids', []) + raw_asks = data.get('asks', []) + + # Convert to dictionary format expected by component manager + self.consolidated_bids = [] + for bid in raw_bids: + if isinstance(bid, list) and len(bid) >= 2: + self.consolidated_bids.append({ + 'price': bid[0], + 'size': bid[1], + 'total_size': bid[1], + 'total_volume_usd': bid[0] * bid[1] + }) + + self.consolidated_asks = [] + for ask in raw_asks: + if isinstance(ask, list) and len(ask) >= 2: + self.consolidated_asks.append({ + 'price': ask[0], + 'size': ask[1], + 'total_size': ask[1], + 'total_volume_usd': ask[0] * ask[1] + }) + + self.stats = data.get('stats', {}) + # Add direct attributes for new format compatibility + self.volume_weighted_mid = self.stats.get('mid_price', 0) + self.spread_bps = self.stats.get('spread_bps', 0) + self.liquidity_imbalance = self.stats.get('imbalance', 0) + self.total_bid_liquidity = self.stats.get('bid_liquidity', 0) + self.total_ask_liquidity = self.stats.get('ask_liquidity', 0) + self.exchanges_active = ['Binance'] # Default for now + + return COBSnapshot(cob_data) + else: + logger.warning(f"Invalid COB data for {symbol}: type={type(cob_data)}, has_stats={'stats' in cob_data if isinstance(cob_data, dict) else False}") + except Exception as e: + logger.error(f"Error getting COB data from data provider: {e}") + + # Priority 2: Use orchestrator's COB integration (secondary source) if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration: - # First try to get snapshot from orchestrator's COB integration + # Try to get snapshot from orchestrator's COB integration snapshot = self.orchestrator.cob_integration.get_cob_snapshot(symbol) if snapshot: - logger.debug(f"COB snapshot available for {symbol} from 
orchestrator COB integration") - return snapshot + logger.debug(f"COB snapshot available for {symbol} from orchestrator COB integration, type: {type(snapshot)}") + + # Check if it's a list (which would cause the error) + if isinstance(snapshot, list): + logger.warning(f"Orchestrator returned list instead of COB snapshot for {symbol}") + # Don't return the list, continue to other sources + else: + return snapshot # If no snapshot, try to get from orchestrator's cached data if hasattr(self.orchestrator, 'latest_cob_data') and symbol in self.orchestrator.latest_cob_data: @@ -2277,7 +2385,7 @@ class CleanTradingDashboard: return COBSnapshot(cob_data) - # Fallback: Use cached COB data if orchestrator integration not available + # Priority 3: Use dashboard's cached COB data (last resort fallback) if symbol in self.latest_cob_data and self.latest_cob_data[symbol]: cob_data = self.latest_cob_data[symbol] logger.debug(f"COB snapshot available for {symbol} from dashboard cached data (fallback)") @@ -2298,7 +2406,7 @@ class CleanTradingDashboard: return COBSnapshot(cob_data) - logger.debug(f"No COB snapshot available for {symbol} - no orchestrator integration or cached data") + logger.debug(f"No COB snapshot available for {symbol} - no data provider, orchestrator integration, or cached data") return None except Exception as e: @@ -2458,7 +2566,13 @@ class CleanTradingDashboard: if dqn_latest: last_action = dqn_latest.get('action', 'NONE') last_confidence = dqn_latest.get('confidence', 0.72) - last_timestamp = dqn_latest.get('timestamp', datetime.now()).strftime('%H:%M:%S') + timestamp_val = dqn_latest.get('timestamp', datetime.now()) + if isinstance(timestamp_val, str): + last_timestamp = timestamp_val + elif hasattr(timestamp_val, 'strftime'): + last_timestamp = timestamp_val.strftime('%H:%M:%S') + else: + last_timestamp = datetime.now().strftime('%H:%M:%S') else: if signal_generation_active and len(self.recent_decisions) > 0: recent_signal = self.recent_decisions[-1] @@ -2531,7 +2645,13 @@ class CleanTradingDashboard: if cnn_latest: cnn_action = cnn_latest.get('action', 'PATTERN_ANALYSIS') cnn_confidence = cnn_latest.get('confidence', 0.68) - cnn_timestamp = cnn_latest.get('timestamp', datetime.now()).strftime('%H:%M:%S') + timestamp_val = cnn_latest.get('timestamp', datetime.now()) + if isinstance(timestamp_val, str): + cnn_timestamp = timestamp_val + elif hasattr(timestamp_val, 'strftime'): + cnn_timestamp = timestamp_val.strftime('%H:%M:%S') + else: + cnn_timestamp = datetime.now().strftime('%H:%M:%S') cnn_predicted_price = cnn_latest.get('predicted_price', 0) else: cnn_action = 'PATTERN_ANALYSIS' @@ -2594,7 +2714,13 @@ class CleanTradingDashboard: if transformer_latest: transformer_action = transformer_latest.get('action', 'PRICE_PREDICTION') transformer_confidence = transformer_latest.get('confidence', 0.75) - transformer_timestamp = transformer_latest.get('timestamp', datetime.now()).strftime('%H:%M:%S') + timestamp_val = transformer_latest.get('timestamp', datetime.now()) + if isinstance(timestamp_val, str): + transformer_timestamp = timestamp_val + elif hasattr(timestamp_val, 'strftime'): + transformer_timestamp = timestamp_val.strftime('%H:%M:%S') + else: + transformer_timestamp = datetime.now().strftime('%H:%M:%S') transformer_predicted_price = transformer_latest.get('predicted_price', 0) transformer_price_change = transformer_latest.get('price_change', 0) else: @@ -5159,11 +5285,11 @@ class CleanTradingDashboard: self.position_sync_enabled = False def _initialize_cob_integration(self): - 
"""Initialize COB integration using orchestrator's COB system""" + """Initialize COB integration using centralized data provider""" try: - logger.info("Initializing COB integration via orchestrator") + logger.info("Initializing COB integration via centralized data provider") - # Initialize COB data storage (for fallback) + # Initialize COB data storage (for dashboard display) self.cob_data_history = { 'ETH/USDT': [], 'BTC/USDT': [] @@ -5181,9 +5307,15 @@ class CleanTradingDashboard: 'BTC/USDT': None } - # Check if orchestrator has COB integration + # Primary approach: Use the data provider's centralized COB collection + if self.data_provider: + logger.info("Using centralized data provider for COB data collection") + self._start_simple_cob_collection() # This now uses the data provider + + # Secondary approach: If orchestrator has COB integration, use that as well + # This ensures we have multiple data sources for redundancy if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration: - logger.info("Using orchestrator's COB integration") + logger.info("Also using orchestrator's COB integration as secondary source") # Start orchestrator's COB integration in background def start_orchestrator_cob(): @@ -5199,137 +5331,129 @@ class CleanTradingDashboard: cob_thread = threading.Thread(target=start_orchestrator_cob, daemon=True) cob_thread.start() - logger.info("Orchestrator COB integration started successfully") - - else: - logger.warning("Orchestrator COB integration not available, using fallback simple collection") - # Fallback to simple collection - self._start_simple_cob_collection() - - # ALWAYS start simple collection as backup even if orchestrator COB exists - # This ensures we have data flowing while orchestrator COB integration starts up - logger.info("Starting simple COB collection as backup/fallback") - self._start_simple_cob_collection() + logger.info("Orchestrator COB integration started as secondary source") except Exception as e: logger.error(f"Error initializing COB integration: {e}") - # Fallback to simple collection - self._start_simple_cob_collection() + # Last resort fallback + if self.data_provider: + logger.warning("Falling back to direct data provider COB collection") + self._start_simple_cob_collection() def _start_simple_cob_collection(self): - """Start simple COB data collection using REST APIs (no async required)""" + """Start COB data collection using the centralized data provider""" try: - import threading - import time - - def cob_collector(): - """Collect COB data using simple REST API calls""" - while True: + # Use the data provider's COB collection instead of implementing our own + if self.data_provider: + # Start the centralized COB data collection in the data provider + self.data_provider.start_cob_collection() + + # Subscribe to COB updates from the data provider + def cob_update_callback(symbol, cob_snapshot): + """Callback for COB data updates from data provider""" try: - # Collect data for both symbols - for symbol in ['ETH/USDT', 'BTC/USDT']: - self._collect_simple_cob_data(symbol) + # Store the latest COB data + if not hasattr(self, 'latest_cob_data'): + self.latest_cob_data = {} - # Sleep for 1 second between collections - time.sleep(1) + self.latest_cob_data[symbol] = cob_snapshot + + # Update current price from COB data + if 'stats' in cob_snapshot and 'mid_price' in cob_snapshot['stats']: + self.current_prices[symbol] = cob_snapshot['stats']['mid_price'] except Exception as e: - logger.debug(f"Error in COB collection: {e}") 
- time.sleep(5) # Wait longer on error - - # Start collector in background thread - cob_thread = threading.Thread(target=cob_collector, daemon=True) - cob_thread.start() - - logger.info("Simple COB data collection started") + logger.debug(f"Error in COB update callback: {e}") + + # Register for COB updates + self.data_provider.subscribe_to_cob(cob_update_callback) + + logger.info("Centralized COB data collection started via data provider") + else: + logger.error("Cannot start COB collection - data provider not available") except Exception as e: logger.error(f"Error starting COB collection: {e}") def _collect_simple_cob_data(self, symbol: str): - """Collect simple COB data using Binance REST API""" + """Get COB data from the centralized data provider""" try: + import time # kept as a local import: time.time() is still used for cache bookkeeping below - import requests - import time - - # Use Binance REST API for order book data - binance_symbol = symbol.replace('/', '') - url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500" - - response = requests.get(url, timeout=5) - if response.status_code == 200: - data = response.json() + # Use the data provider to get COB data + if self.data_provider: + # Get the COB data from the data provider + cob_snapshot = self.data_provider.collect_cob_data(symbol) - # Process order book data - bids = [] - asks = [] - - # Process bids (buy orders) - for bid in data['bids'][:100]: # Top 100 levels - price = float(bid[0]) - size = float(bid[1]) - bids.append({ - 'price': price, - 'size': size, - 'total': price * size - }) - - # Process asks (sell orders) - for ask in data['asks'][:100]: # Top 100 levels - price = float(ask[0]) - size = float(ask[1]) - asks.append({ - 'price': price, - 'size': size, - 'total': price * size - }) - - # Calculate statistics - if bids and asks: - best_bid = max(bids, key=lambda x: x['price']) - best_ask = min(asks, key=lambda x: x['price']) - mid_price = (best_bid['price'] + best_ask['price']) / 2 - spread_bps = ((best_ask['price'] - best_bid['price']) / mid_price) * 10000 if mid_price > 0 else 0 + if cob_snapshot and 'stats' in cob_snapshot: + # Process the COB data for dashboard display - total_bid_liquidity = sum(bid['total'] for bid in bids[:20]) - total_ask_liquidity = sum(ask['total'] for ask in asks[:20]) - total_liquidity = total_bid_liquidity + total_ask_liquidity - imbalance = (total_bid_liquidity - total_ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 + # Format the data for our dashboard + bids = [] + asks = [] - # Create COB snapshot - cob_snapshot = { + # Process bids + for bid_price, bid_size in cob_snapshot.get('bids', [])[:100]: + bids.append({ + 'price': bid_price, + 'size': bid_size, + 'total': bid_price * bid_size + }) + + # Process asks + for ask_price, ask_size in cob_snapshot.get('asks', [])[:100]: + asks.append({ + 'price': ask_price, + 'size': ask_size, + 'total': ask_price * ask_size + }) + + # Create dashboard-friendly COB snapshot + dashboard_cob_snapshot = { 'symbol': symbol, - 'timestamp': time.time(), + 'timestamp': time.time(), # dashboard-side clock; provider snapshots carry datetime objects 'bids': bids, 'asks': asks, 'stats': { - 'mid_price': mid_price, - 'spread_bps': spread_bps, - 'total_bid_liquidity': total_bid_liquidity, - 'total_ask_liquidity': total_ask_liquidity, - 'imbalance': imbalance, + 'mid_price': cob_snapshot['stats'].get('mid_price', 0), + 'spread_bps': cob_snapshot['stats'].get('spread_bps', 0), + 'total_bid_liquidity': cob_snapshot['stats'].get('bid_liquidity', 0), + 'total_ask_liquidity': cob_snapshot['stats'].get('ask_liquidity', 0), + 'imbalance': 
cob_snapshot['stats'].get('imbalance', 0), 'exchanges_active': ['Binance'] } } + # Initialize history if needed + if not hasattr(self, 'cob_data_history'): + self.cob_data_history = {} + + if symbol not in self.cob_data_history: + self.cob_data_history[symbol] = [] + # Store in history (keep last 15 seconds) - self.cob_data_history[symbol].append(cob_snapshot) + self.cob_data_history[symbol].append(dashboard_cob_snapshot) if len(self.cob_data_history[symbol]) > 15: # Keep 15 seconds self.cob_data_history[symbol] = self.cob_data_history[symbol][-15:] + # Initialize latest data if needed + if not hasattr(self, 'latest_cob_data'): + self.latest_cob_data = {} + + if not hasattr(self, 'cob_last_update'): + self.cob_last_update = {} + # Update latest data - self.latest_cob_data[symbol] = cob_snapshot + self.latest_cob_data[symbol] = dashboard_cob_snapshot self.cob_last_update[symbol] = time.time() # Generate bucketed data for models - self._generate_bucketed_cob_data(symbol, cob_snapshot) + self._generate_bucketed_cob_data(symbol, dashboard_cob_snapshot) # Generate COB signals based on imbalance - self._generate_cob_signal(symbol, cob_snapshot) + self._generate_cob_signal(symbol, dashboard_cob_snapshot) - logger.debug(f"COB data collected for {symbol}: {len(bids)} bids, {len(asks)} asks") - + logger.debug(f"COB data retrieved from data provider for {symbol}: {len(bids)} bids, {len(asks)} asks") + except Exception as e: - logger.debug(f"Error collecting COB data for {symbol}: {e}") + logger.debug(f"Error getting COB data for {symbol}: {e}") def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict): """Generate bucketed COB data for model feeding""" diff --git a/web/component_manager.py b/web/component_manager.py index f32d822..1b530be 100644 --- a/web/component_manager.py +++ b/web/component_manager.py @@ -296,6 +296,27 @@ class DashboardComponentManager: html.P(f"Mode: {cob_mode}", className="text-muted small") ]) + # Defensive: If cob_snapshot is a list, log and return error + if isinstance(cob_snapshot, list): + logger.error(f"COB snapshot for {symbol} is a list, expected object. 
Data: {cob_snapshot}") + return html.Div([ + html.H6(f"{symbol} COB", className="mb-2"), + html.P("Invalid COB data format (list)", className="text-danger small"), + html.P(f"Mode: {cob_mode}", className="text-muted small") + ]) + + # Debug: Log the type and structure of cob_snapshot + logger.debug(f"COB snapshot type for {symbol}: {type(cob_snapshot)}") + + # Handle case where cob_snapshot is a list (error case) + if isinstance(cob_snapshot, list): + logger.error(f"COB snapshot is a list for {symbol}, expected object or dict") + return html.Div([ + html.H6(f"{symbol} COB", className="mb-2"), + html.P("Invalid COB data format (list)", className="text-danger small"), + html.P(f"Mode: {cob_mode}", className="text-muted small") + ]) + # Handle both old format (with stats attribute) and new format (direct attributes) if hasattr(cob_snapshot, 'stats'): # Old format with stats attribute @@ -385,6 +406,18 @@ class DashboardComponentManager: html.Span(imbalance_text, className=f"fw-bold small {imbalance_color}") ]), + # Multi-timeframe imbalance metrics + html.Div([ + html.Strong("Timeframe Imbalances:", className="small d-block mt-2 mb-1") + ]), + + html.Div([ + self._create_timeframe_imbalance("1s", stats.get('imbalance_1s', imbalance)), + self._create_timeframe_imbalance("5s", stats.get('imbalance_5s', imbalance)), + self._create_timeframe_imbalance("15s", stats.get('imbalance_15s', imbalance)), + self._create_timeframe_imbalance("60s", stats.get('imbalance_60s', imbalance)), + ], className="d-flex justify-content-between mb-2"), + html.Div(imbalance_stats_display), html.Hr(className="my-2"), @@ -417,6 +450,22 @@ class DashboardComponentManager: html.Div(title, className="small text-muted"), html.Div(value, className="fw-bold") ], className="text-center") + + def _create_timeframe_imbalance(self, timeframe, value): + """Helper for creating timeframe imbalance indicators.""" + color = "text-success" if value > 0 else "text-danger" if value < 0 else "text-muted" + icon = "fas fa-chevron-up" if value > 0 else "fas fa-chevron-down" if value < 0 else "fas fa-minus" + + # Format the value with sign and 2 decimal places + formatted_value = f"{value:+.2f}" + + return html.Div([ + html.Div(timeframe, className="small text-muted"), + html.Div([ + html.I(className=f"{icon} me-1"), + html.Span(formatted_value, className="small") + ], className=color) + ], className="text-center") def _create_cob_ladder_panel(self, bids, asks, mid_price, symbol=""): """Creates the right panel with the compact COB ladder.""" diff --git a/web/layout_manager.py b/web/layout_manager.py index f539929..172cb2a 100644 --- a/web/layout_manager.py +++ b/web/layout_manager.py @@ -42,7 +42,7 @@ class DashboardLayoutManager: """Create the auto-refresh interval component""" return dcc.Interval( id='interval-component', - interval=1000, # Update every 1 second for maximum responsiveness + interval=250, # Update every 250 ms (4 Hz) n_intervals=0 )