""" Multi-Timeframe, Multi-Symbol Data Provider This module consolidates all data functionality including: - Historical data fetching from Binance API - Real-time data streaming via WebSocket - Multi-timeframe candle generation - Caching and data management - Technical indicators calculation - Williams Market Structure pivot points with monthly data analysis - Pivot-based feature normalization for improved model training - Centralized data distribution to multiple subscribers (AI models, dashboard, etc.) """ import asyncio import json import logging import os import time import uuid import websockets import requests import pandas as pd import numpy as np import pickle from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field import ta from threading import Thread, Lock from collections import deque import math from .config import get_config from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar from .cnn_monitor import log_cnn_prediction from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket from .cob_integration import COBIntegration logger = logging.getLogger(__name__) @dataclass class PivotBounds: """Pivot-based normalization bounds derived from Williams Market Structure""" symbol: str price_max: float price_min: float volume_max: float volume_min: float pivot_support_levels: List[float] pivot_resistance_levels: List[float] pivot_context: Dict[str, Any] created_timestamp: datetime data_period_start: datetime data_period_end: datetime total_candles_analyzed: int def get_price_range(self) -> float: """Get price range for normalization""" return self.price_max - self.price_min def normalize_price(self, price: float) -> float: """Normalize price using pivot bounds""" return (price - self.price_min) / self.get_price_range() def get_nearest_support_distance(self, current_price: float) -> float: """Get distance to nearest support level (normalized)""" if not self.pivot_support_levels: return 0.5 distances = [abs(current_price - s) for s in self.pivot_support_levels] return min(distances) / self.get_price_range() def get_nearest_resistance_distance(self, current_price: float) -> float: """Get distance to nearest resistance level (normalized)""" if not self.pivot_resistance_levels: return 0.5 distances = [abs(current_price - r) for r in self.pivot_resistance_levels] return min(distances) / self.get_price_range() @dataclass class MarketTick: """Standardized market tick data structure""" symbol: str timestamp: datetime price: float volume: float quantity: float side: str # 'buy' or 'sell' trade_id: str is_buyer_maker: bool raw_data: Dict[str, Any] = field(default_factory=dict) @dataclass class DataSubscriber: """Data subscriber information""" subscriber_id: str callback: Callable[[MarketTick], None] symbols: List[str] active: bool = True last_update: datetime = field(default_factory=datetime.now) tick_count: int = 0 subscriber_name: str = "unknown" class DataProvider: """Unified data provider for historical and real-time market data with centralized distribution""" def __init__(self, symbols: List[str] = None, timeframes: List[str] = None): """Initialize the data provider""" self.config = get_config() self.symbols = symbols or self.config.symbols self.timeframes = timeframes or self.config.timeframes # Cache settings (initialize first) self.cache_enabled = self.config.data.get('cache_enabled', True) self.cache_dir = Path(self.config.data.get('cache_dir', 'cache')) self.cache_dir.mkdir(parents=True, exist_ok=True) # Data storage self.historical_data = {} # {symbol: {timeframe: DataFrame}} self.real_time_data = {} # {symbol: {timeframe: deque}} self.current_prices = {} # {symbol: float} # Pivot-based normalization system self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds} self.pivot_cache_dir = self.cache_dir / 'pivot_bounds' self.pivot_cache_dir.mkdir(parents=True, exist_ok=True) self.pivot_refresh_interval = timedelta(days=1) # Refresh pivot bounds daily self.monthly_data_cache_dir = self.cache_dir / 'monthly_1s_data' self.monthly_data_cache_dir.mkdir(parents=True, exist_ok=True) # Enhanced WebSocket integration self.enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None self.websocket_tasks = {} self.is_streaming = False self.data_lock = Lock() # COB data from enhanced WebSocket self.cob_websocket_data: Dict[str, Dict] = {} # Latest COB data from WebSocket self.cob_websocket_status: Dict[str, str] = {} # WebSocket status per symbol # Subscriber management for centralized data distribution self.subscribers: Dict[str, DataSubscriber] = {} self.subscriber_lock = Lock() self.tick_buffers: Dict[str, deque] = {} self.buffer_size = 1000 # Keep last 1000 ticks per symbol # Initialize tick buffers for symbol in self.symbols: binance_symbol = symbol.replace('/', '').upper() self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size) # BOM (Book of Market) data caching - 1s resolution for last 5 minutes self.bom_cache_duration = 300 # 5 minutes in seconds self.bom_feature_count = 120 # Number of BOM features per timestamp self.bom_data_cache: Dict[str, deque] = {} # {symbol: deque of (timestamp, bom_features)} # Initialize BOM cache for each symbol for symbol in self.symbols: # Store 300 seconds worth of 1s BOM data self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration) # Initialize tick aggregator for raw tick processing binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols] self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols) # Raw tick and OHLCV bar callbacks self.raw_tick_callbacks = [] self.ohlcv_bar_callbacks = [] # Performance tracking for subscribers self.distribution_stats = { 'total_ticks_received': 0, 'total_ticks_distributed': 0, 'distribution_errors': 0, 'last_tick_time': {}, 'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols}, 'raw_ticks_processed': 0, 'ohlcv_bars_created': 0, 'patterns_detected': 0 } # Data validation self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols} self.price_change_threshold = 0.1 # 10% price change threshold for validation # Timeframe conversion self.timeframe_seconds = { '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } # Williams Market Structure integration self.williams_structure: Dict[str, WilliamsMarketStructure] = {} for symbol in self.symbols: self.williams_structure[symbol] = WilliamsMarketStructure(min_pivot_distance=3) # Pivot point caching self.pivot_points_cache: Dict[str, Dict[int, TrendLevel]] = {} # {symbol: {level: TrendLevel}} self.last_pivot_calculation: Dict[str, datetime] = {} self.pivot_calculation_interval = timedelta(minutes=5) # Recalculate every 5 minutes # Auto-fix corrupted cache files on startup self._auto_fix_corrupted_cache() # Load existing pivot bounds from cache self._load_all_pivot_bounds() # Centralized data collection for models and dashboard self.cob_integration = COBIntegration(data_provider=self, symbols=self.symbols) self.cob_data_cache: Dict[str, deque] = {} # COB data for models self.training_data_cache: Dict[str, deque] = {} # Training data for models self.model_data_subscribers: Dict[str, List[Callable]] = {} # Model-specific data callbacks # Callbacks for data distribution self.cob_data_callbacks: List[Callable] = [] # COB data callbacks self.training_data_callbacks: List[Callable] = [] # Training data callbacks self.model_prediction_callbacks: List[Callable] = [] # Model prediction callbacks # Initialize data caches for symbol in self.symbols: binance_symbol = symbol.replace('/', '').upper() self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer # Pre-built OHLCV cache for instant BaseDataInput building (optimization from SimplifiedDataIntegration) self._ohlcv_cache = {} # {symbol: {timeframe: List[OHLCVBar]}} self._ohlcv_cache_lock = Lock() self._last_cache_update = {} # {symbol: {timeframe: datetime}} self._cache_refresh_interval = 5 # seconds # Data collection threads self.data_collection_active = False # COB data collection self.cob_collection_active = False self.cob_collection_thread = None # Training data collection self.training_data_collection_active = False self.training_data_thread = None # Price-level bucketing self.bucketed_cob_data: Dict[str, Dict] = {} self.bucket_sizes = [1, 10] # $1 and $10 buckets self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes} logger.info(f"DataProvider initialized for symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") logger.info("Centralized data distribution enabled") logger.info("Pivot-based normalization system enabled") logger.info("Williams Market Structure integration enabled") logger.info("COB and training data collection enabled") # Rate limiting self.last_request_time = {} self.request_interval = 0.2 # 200ms between requests self.retry_delay = 60 # 1 minute retry delay for 451 errors self.max_retries = 3 # Start COB integration self.start_cob_integration() def start_cob_integration(self): """Starts the COB integration in a background thread.""" cob_thread = Thread(target=self._run_cob_integration, daemon=True) cob_thread.start() def _run_cob_integration(self): """Runs the asyncio event loop for COB integration.""" try: asyncio.run(self.cob_integration.start()) except Exception as e: logger.error(f"Error running COB integration: {e}") def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame: """Ensure dataframe has proper datetime index""" if df is None or df.empty: return df try: # If we already have a proper DatetimeIndex, return as is if isinstance(df.index, pd.DatetimeIndex): return df # If timestamp column exists, use it as index if 'timestamp' in df.columns: df['timestamp'] = pd.to_datetime(df['timestamp']) df.set_index('timestamp', inplace=True) return df # If we have a RangeIndex or other non-datetime index, create datetime index if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex): # Use current time and work backwards for realistic timestamps from datetime import datetime, timedelta end_time = datetime.now() start_time = end_time - timedelta(minutes=len(df)) df.index = pd.date_range(start=start_time, end=end_time, periods=len(df)) logger.debug(f"Converted RangeIndex to DatetimeIndex for {len(df)} records") return df except Exception as e: logger.warning(f"Error ensuring datetime index: {e}") return df def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]: """Get historical OHLCV data for a symbol and timeframe""" try: # If refresh=True, always fetch fresh data (skip cache for real-time updates) if not refresh: if self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) >= limit * 0.8: # Ensure proper datetime index for cached data cached_data = self._ensure_datetime_index(cached_data) # logger.info(f"Using cached data for {symbol} {timeframe}") return cached_data.tail(limit) # Check if we need to preload 300s of data for first load should_preload = self._should_preload_data(symbol, timeframe, limit) if should_preload: logger.info(f"Preloading 300s of data for {symbol} {timeframe}") df = self._preload_300s_data(symbol, timeframe) else: # Fetch from API with requested limit (Binance primary, MEXC fallback) logger.info(f"Fetching historical data for {symbol} {timeframe}") df = self._fetch_from_binance(symbol, timeframe, limit) # Fallback to MEXC if Binance fails if df is None or df.empty: logger.info(f"Binance failed, trying MEXC fallback for {symbol}") df = self._fetch_from_mexc(symbol, timeframe, limit) if df is not None and not df.empty: # Ensure proper datetime index df = self._ensure_datetime_index(df) # Add technical indicators. temporarily disabled to save time as it is not working as expected. # df = self._add_technical_indicators(df) # Cache the data if self.cache_enabled: self._save_to_cache(df, symbol, timeframe) # Store in memory if symbol not in self.historical_data: self.historical_data[symbol] = {} self.historical_data[symbol][timeframe] = df # Return requested amount return df.tail(limit) logger.warning(f"No data received for {symbol} {timeframe}") return None except Exception as e: logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}") return None def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool: """Determine if we should preload 300s of data""" try: # Check if we have any cached data if self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) > 0: return False # Already have some data # Check if we have data in memory if (symbol in self.historical_data and timeframe in self.historical_data[symbol] and len(self.historical_data[symbol][timeframe]) > 0): return False # Already have data in memory # Calculate if 300s worth of data would be more than requested limit timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) candles_in_300s = 300 // timeframe_seconds # Preload if we need more than the requested limit or if it's a short timeframe if candles_in_300s > limit or timeframe in ['1s', '1m']: return True return False except Exception as e: logger.error(f"Error determining if should preload data: {e}") return False def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]: """Preload 300 seconds worth of data for better initial performance""" try: # Calculate how many candles we need for 300 seconds timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles # For very short timeframes, limit to reasonable amount if timeframe == '1s': candles_needed = min(candles_needed, 300) # Max 300 1s candles elif timeframe == '1m': candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour) else: candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)") # Fetch the data (Binance primary, MEXC fallback) df = self._fetch_from_binance(symbol, timeframe, candles_needed) # Fallback to MEXC if Binance fails if df is None or df.empty: logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}") df = self._fetch_from_mexc(symbol, timeframe, candles_needed) if df is not None and not df.empty: logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}") return df else: logger.warning(f"Failed to preload data for {symbol} {timeframe}") return None except Exception as e: logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}") return None def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]: """Preload 300s of data for all symbols and timeframes""" try: if timeframes is None: timeframes = self.timeframes preload_results = {} for symbol in self.symbols: preload_results[symbol] = {} for timeframe in timeframes: try: logger.info(f"Preloading data for {symbol} {timeframe}") # Check if we should preload if self._should_preload_data(symbol, timeframe, 100): df = self._preload_300s_data(symbol, timeframe) if df is not None and not df.empty: # Add technical indicators df = self._add_technical_indicators(df) # Cache the data if self.cache_enabled: self._save_to_cache(df, symbol, timeframe) # Store in memory if symbol not in self.historical_data: self.historical_data[symbol] = {} self.historical_data[symbol][timeframe] = df preload_results[symbol][timeframe] = True logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}") else: preload_results[symbol][timeframe] = False logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}") else: preload_results[symbol][timeframe] = True # Already have data logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)") except Exception as e: logger.error(f"Error preloading {symbol} {timeframe}: {e}") preload_results[symbol][timeframe] = False # Log summary total_pairs = len(self.symbols) * len(timeframes) successful_pairs = sum(1 for symbol_results in preload_results.values() for success in symbol_results.values() if success) logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded") return preload_results except Exception as e: logger.error(f"Error in preload_all_symbols_data: {e}") return {} def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from MEXC API (fallback data source when Binance is unavailable)""" try: # MEXC doesn't support 1s intervals if timeframe == '1s': logger.warning(f"MEXC doesn't support 1s intervals, skipping {symbol}") return None # Convert symbol format mexc_symbol = symbol.replace('/', '').upper() # Convert timeframe for MEXC (excluding 1s) timeframe_map = { '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d' } mexc_timeframe = timeframe_map.get(timeframe) if mexc_timeframe is None: logger.warning(f"MEXC doesn't support timeframe {timeframe}, skipping {symbol}") return None # MEXC API request url = "https://api.mexc.com/api/v3/klines" params = { 'symbol': mexc_symbol, 'interval': mexc_timeframe, 'limit': limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Convert to DataFrame (MEXC uses 8 columns vs Binance's 12) df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] df = df.sort_values('timestamp').reset_index(drop=True) logger.info(f"MEXC: Fetched {len(df)} candles for {symbol} {timeframe}") return df except Exception as e: logger.error(f"MEXC: Error fetching data: {e}") return None def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from Binance API with robust rate limiting and error handling""" try: from .api_rate_limiter import get_rate_limiter # Convert symbol format binance_symbol = symbol.replace('/', '').upper() # Convert timeframe (now includes 1s support) timeframe_map = { '1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d' } binance_timeframe = timeframe_map.get(timeframe, '1h') # Use rate limiter for API requests rate_limiter = get_rate_limiter() # Check if we can make request can_request, wait_time = rate_limiter.can_make_request('binance_api') if not can_request: logger.debug(f"Binance rate limited, waiting {wait_time:.1f}s for {symbol} {timeframe}") if wait_time > 30: # If wait is too long, use fallback return self._get_fallback_data(symbol, timeframe, limit) time.sleep(min(wait_time, 5)) # Cap wait at 5 seconds # API request with rate limiter url = "https://api.binance.com/api/v3/klines" params = { 'symbol': binance_symbol, 'interval': binance_timeframe, 'limit': limit } response = rate_limiter.make_request('binance_api', url, 'GET', params=params) if response is None: logger.warning(f"Binance API request failed for {symbol} {timeframe} - using fallback") return self._get_fallback_data(symbol, timeframe, limit) if response.status_code != 200: logger.warning(f"Binance API returned {response.status_code} for {symbol} {timeframe}") return self._get_fallback_data(symbol, timeframe, limit) data = response.json() # Convert to DataFrame df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] df = df.sort_values('timestamp').reset_index(drop=True) logger.info(f"Binance: Fetched {len(df)} candles for {symbol} {timeframe}") return df except Exception as e: if "451" in str(e) or "Client Error" in str(e): logger.warning(f"Binance API access blocked (451) for {symbol} {timeframe} - using fallback") return self._get_fallback_data(symbol, timeframe, limit) else: logger.error(f"Error fetching from Binance API: {e}") return self._get_fallback_data(symbol, timeframe, limit) def _get_fallback_data(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Get fallback data when Binance API is unavailable - REAL DATA ONLY""" try: logger.info(f"FALLBACK: Attempting to get real cached data for {symbol} {timeframe}") # ONLY try cached data cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and not cached_data.empty: # Limit to requested amount limited_data = cached_data.tail(limit) if len(cached_data) > limit else cached_data logger.info(f"FALLBACK: Using cached real data for {symbol} {timeframe}: {len(limited_data)} bars") return limited_data # Try MEXC as secondary real data source mexc_data = self._fetch_from_mexc(symbol, timeframe, limit) if mexc_data is not None and not mexc_data.empty: logger.info(f"FALLBACK: Using MEXC real data for {symbol} {timeframe}: {len(mexc_data)} bars") return mexc_data # NO SYNTHETIC DATA - Return None if no real data available logger.warning(f"FALLBACK: No real data available for {symbol} {timeframe} - waiting for real data") return None except Exception as e: logger.error(f"Error getting fallback data: {e}") return None def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Add comprehensive technical indicators AND pivot-based normalization context""" try: df = df.copy() # Ensure we have enough data for indicators if len(df) < 50: logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows") return self._add_basic_indicators(df) # === EXISTING TECHNICAL INDICATORS === # Moving averages (multiple timeframes) df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10) df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20) df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50) df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12) df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26) df['ema_50'] = ta.trend.ema_indicator(df['close'], window=50) # MACD family macd = ta.trend.MACD(df['close']) df['macd'] = macd.macd() df['macd_signal'] = macd.macd_signal() df['macd_histogram'] = macd.macd_diff() # ADX (Average Directional Index) adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close']) df['adx'] = adx.adx() df['adx_pos'] = adx.adx_pos() df['adx_neg'] = adx.adx_neg() # Parabolic SAR psar = ta.trend.PSARIndicator(df['high'], df['low'], df['close']) df['psar'] = psar.psar() # === MOMENTUM INDICATORS === # RSI (multiple periods) df['rsi_14'] = ta.momentum.rsi(df['close'], window=14) df['rsi_7'] = ta.momentum.rsi(df['close'], window=7) df['rsi_21'] = ta.momentum.rsi(df['close'], window=21) # Stochastic Oscillator stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close']) df['stoch_k'] = stoch.stoch() df['stoch_d'] = stoch.stoch_signal() # Williams %R df['williams_r'] = ta.momentum.williams_r(df['high'], df['low'], df['close']) # Ultimate Oscillator (instead of CCI which isn't available) df['ultimate_osc'] = ta.momentum.ultimate_oscillator(df['high'], df['low'], df['close']) # === VOLATILITY INDICATORS === # Bollinger Bands bollinger = ta.volatility.BollingerBands(df['close']) df['bb_upper'] = bollinger.bollinger_hband() df['bb_lower'] = bollinger.bollinger_lband() df['bb_middle'] = bollinger.bollinger_mavg() df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle'] df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower']) # Average True Range df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close']) # Keltner Channels keltner = ta.volatility.KeltnerChannel(df['high'], df['low'], df['close']) df['keltner_upper'] = keltner.keltner_channel_hband() df['keltner_lower'] = keltner.keltner_channel_lband() df['keltner_middle'] = keltner.keltner_channel_mband() # === VOLUME INDICATORS === # Volume moving averages df['volume_sma_10'] = df['volume'].rolling(window=10).mean() df['volume_sma_20'] = df['volume'].rolling(window=20).mean() df['volume_sma_50'] = df['volume'].rolling(window=50).mean() # On Balance Volume df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume']) # Volume Price Trend df['vpt'] = ta.volume.volume_price_trend(df['close'], df['volume']) # Money Flow Index df['mfi'] = ta.volume.money_flow_index(df['high'], df['low'], df['close'], df['volume']) # Accumulation/Distribution Line df['ad_line'] = ta.volume.acc_dist_index(df['high'], df['low'], df['close'], df['volume']) # Volume Weighted Average Price (VWAP) df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum() # === PRICE ACTION INDICATORS === # Price position relative to range df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low']) # True Range (use ATR calculation for true range) df['true_range'] = df['atr'] # ATR is based on true range, so use it directly # Rate of Change df['roc'] = ta.momentum.roc(df['close'], window=10) # === CUSTOM INDICATORS === # Trend strength (combination of multiple trend indicators) df['trend_strength'] = ( (df['close'] > df['sma_20']).astype(int) + (df['sma_10'] > df['sma_20']).astype(int) + (df['macd'] > df['macd_signal']).astype(int) + (df['adx'] > 25).astype(int) ) / 4.0 # Momentum composite df['momentum_composite'] = ( (df['rsi_14'] / 100) + ((df['stoch_k'] + 50) / 100) + # Normalize stoch_k ((df['williams_r'] + 50) / 100) # Normalize williams_r ) / 3.0 # Volatility regime df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True) # === WILLIAMS MARKET STRUCTURE PIVOT CONTEXT === # Check if we need to refresh pivot bounds for this symbol symbol = self._extract_symbol_from_dataframe(df) if symbol and self._should_refresh_pivot_bounds(symbol): logger.info(f"Refreshing pivot bounds for {symbol}") self._refresh_pivot_bounds_for_symbol(symbol) # Add pivot-based context features if symbol and symbol in self.pivot_bounds: df = self._add_pivot_context_features(df, symbol) # === FILL NaN VALUES === # Forward fill first, then backward fill, then zero fill df = df.ffill().bfill().fillna(0) logger.debug(f"Added technical indicators + pivot context for {len(df)} rows") return df except Exception as e: logger.error(f"Error adding comprehensive technical indicators: {e}") # Fallback to basic indicators return self._add_basic_indicators(df) # === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM === def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]: """Collect 30 days of 1m candles with smart gap-filling cache system""" try: # Check for cached data and determine what we need to fetch cached_data = self._load_monthly_data_from_cache(symbol) end_time = datetime.now() start_time = end_time - timedelta(days=30) if cached_data is not None and not cached_data.empty: logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles") # Check cache data range cache_start = cached_data['timestamp'].min() cache_end = cached_data['timestamp'].max() logger.info(f"Cache range: {cache_start} to {cache_end}") # Remove data older than 30 days cached_data = cached_data[cached_data['timestamp'] >= start_time] # Check if we need to fill gaps gap_start = cache_end + timedelta(minutes=1) if gap_start < end_time: # Need to fill gap from cache_end to now logger.info(f"Filling gap from {gap_start} to {end_time}") gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time) if gap_data is not None and not gap_data.empty: # Combine cached data with gap data monthly_df = pd.concat([cached_data, gap_data], ignore_index=True) monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True) logger.info(f"Combined cache + gap: {len(monthly_df)} total candles") else: monthly_df = cached_data logger.info(f"Using cached data only: {len(monthly_df)} candles") else: monthly_df = cached_data logger.info(f"Cache is up to date: {len(monthly_df)} candles") else: # No cache - fetch full 30 days logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}") monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time) if monthly_df is not None and not monthly_df.empty: # Final cleanup: ensure exactly 30 days monthly_df = monthly_df[monthly_df['timestamp'] >= start_time] monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True) logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}") # Update cache self._save_monthly_data_to_cache(symbol, monthly_df) return monthly_df else: logger.error(f"No monthly 1m data collected for {symbol}") return None except Exception as e: logger.error(f"Error collecting monthly 1m data for {symbol}: {e}") return None def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]: """Fetch a batch of 1s candles ending at specific time""" try: binance_symbol = symbol.replace('/', '').upper() # Convert end_time to milliseconds end_ms = int(end_time.timestamp() * 1000) # API request url = "https://api.binance.com/api/v3/klines" params = { 'symbol': binance_symbol, 'interval': '1s', 'endTime': end_ms, 'limit': limit } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() if not data: return None # Convert to DataFrame df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] return df except Exception as e: logger.error(f"Error fetching 1s batch for {symbol}: {e}") return None def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]: """Fetch 1m candles for a specific time range with efficient batching""" try: # Convert symbol format for Binance API if '/' in symbol: api_symbol = symbol.replace('/', '') else: api_symbol = symbol logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}") all_candles = [] current_start = start_time batch_size = 1000 # Binance limit api_calls_made = 0 while current_start < end_time and api_calls_made < 50: # Safety limit for 30 days try: # Calculate end time for this batch batch_end = min(current_start + timedelta(minutes=batch_size), end_time) # Convert to milliseconds start_timestamp = int(current_start.timestamp() * 1000) end_timestamp = int(batch_end.timestamp() * 1000) # Binance API call url = "https://api.binance.com/api/v3/klines" params = { 'symbol': api_symbol, 'interval': '1m', 'startTime': start_timestamp, 'endTime': end_timestamp, 'limit': batch_size } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() api_calls_made += 1 if not data: logger.warning(f"No data returned for batch {current_start} to {batch_end}") break # Convert to DataFrame batch_df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: batch_df[col] = batch_df[col].astype(float) # Keep only OHLCV columns batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] all_candles.append(batch_df) # Move to next batch (add 1 minute to avoid overlap) current_start = batch_end + timedelta(minutes=1) # Rate limiting (Binance allows 1200/min) time.sleep(0.05) # 50ms delay # Progress logging if api_calls_made % 10 == 0: total_candles = sum(len(df) for df in all_candles) logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected") except Exception as e: logger.error(f"Error in batch {current_start} to {batch_end}: {e}") current_start += timedelta(minutes=batch_size) time.sleep(1) # Wait longer on error continue if not all_candles: logger.error(f"No data collected for {symbol}") return None # Combine all batches df = pd.concat(all_candles, ignore_index=True) df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True) logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)") return df except Exception as e: logger.error(f"Error fetching 1m data range for {symbol}: {e}") return None def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]: """Extract pivot bounds using Williams Market Structure analysis""" try: logger.info(f"Analyzing {len(monthly_data)} candles for pivot extraction...") # Convert DataFrame to numpy array format expected by Williams Market Structure ohlcv_array = monthly_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']].copy() # Convert timestamp to numeric for Williams analysis ohlcv_array['timestamp'] = ohlcv_array['timestamp'].astype(np.int64) // 10**9 # Convert to seconds ohlcv_array = ohlcv_array.to_numpy() # Initialize Williams Market Structure analyzer try: from training.williams_market_structure import WilliamsMarketStructure williams = WilliamsMarketStructure( swing_strengths=[2, 3, 5, 8], # Multi-strength pivot detection enable_cnn_feature=False # We just want pivot data, not CNN training ) # Calculate 5 levels of recursive pivot points logger.info("Running Williams Market Structure analysis...") pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array) except ImportError: logger.warning("Williams Market Structure not available, using simplified pivot detection") pivot_levels = self._simple_pivot_detection(monthly_data) # Extract bounds from pivot analysis bounds = self._extract_bounds_from_pivot_levels(symbol, monthly_data, pivot_levels) return bounds except Exception as e: logger.error(f"Error extracting pivot bounds for {symbol}: {e}") return None def _extract_bounds_from_pivot_levels(self, symbol: str, monthly_data: pd.DataFrame, pivot_levels: Dict[str, Any]) -> PivotBounds: """Extract normalization bounds from Williams pivot levels""" try: # Initialize bounds price_max = monthly_data['high'].max() price_min = monthly_data['low'].min() volume_max = monthly_data['volume'].max() volume_min = monthly_data['volume'].min() support_levels = [] resistance_levels = [] # Extract pivot points from all Williams levels for level_key, level_data in pivot_levels.items(): if level_data and hasattr(level_data, 'swing_points') and level_data.swing_points: # Get prices from swing points level_prices = [sp.price for sp in level_data.swing_points] # Update overall price bounds price_max = max(price_max, max(level_prices)) price_min = min(price_min, min(level_prices)) # Extract support and resistance levels if hasattr(level_data, 'support_levels') and level_data.support_levels: support_levels.extend(level_data.support_levels) if hasattr(level_data, 'resistance_levels') and level_data.resistance_levels: resistance_levels.extend(level_data.resistance_levels) # Remove duplicates and sort support_levels = sorted(list(set(support_levels))) resistance_levels = sorted(list(set(resistance_levels))) # Create PivotBounds object bounds = PivotBounds( symbol=symbol, price_max=float(price_max), price_min=float(price_min), volume_max=float(volume_max), volume_min=float(volume_min), pivot_support_levels=support_levels, pivot_resistance_levels=resistance_levels, pivot_context=pivot_levels, created_timestamp=datetime.now(), data_period_start=monthly_data['timestamp'].min(), data_period_end=monthly_data['timestamp'].max(), total_candles_analyzed=len(monthly_data) ) logger.info(f"Extracted pivot bounds for {symbol}:") logger.info(f" Price range: ${bounds.price_min:.2f} - ${bounds.price_max:.2f}") logger.info(f" Volume range: {bounds.volume_min:.2f} - {bounds.volume_max:.2f}") logger.info(f" Support levels: {len(bounds.pivot_support_levels)}") logger.info(f" Resistance levels: {len(bounds.pivot_resistance_levels)}") return bounds except Exception as e: logger.error(f"Error extracting bounds from pivot levels: {e}") # Fallback to simple min/max bounds return PivotBounds( symbol=symbol, price_max=float(monthly_data['high'].max()), price_min=float(monthly_data['low'].min()), volume_max=float(monthly_data['volume'].max()), volume_min=float(monthly_data['volume'].min()), pivot_support_levels=[], pivot_resistance_levels=[], pivot_context={}, created_timestamp=datetime.now(), data_period_start=monthly_data['timestamp'].min(), data_period_end=monthly_data['timestamp'].max(), total_candles_analyzed=len(monthly_data) ) def _simple_pivot_detection(self, monthly_data: pd.DataFrame) -> Dict[str, Any]: """Simple pivot detection fallback when Williams Market Structure is not available""" try: # Simple high/low pivot detection using rolling windows highs = monthly_data['high'] lows = monthly_data['low'] # Find local maxima and minima using different windows pivot_highs = [] pivot_lows = [] for window in [5, 10, 20, 50]: if len(monthly_data) > window * 2: # Rolling max/min detection rolling_max = highs.rolling(window=window, center=True).max() rolling_min = lows.rolling(window=window, center=True).min() # Find pivot highs (local maxima) high_pivots = monthly_data[highs == rolling_max]['high'].tolist() pivot_highs.extend(high_pivots) # Find pivot lows (local minima) low_pivots = monthly_data[lows == rolling_min]['low'].tolist() pivot_lows.extend(low_pivots) # Create mock level structure mock_level = type('MockLevel', (), { 'swing_points': [], 'support_levels': list(set(pivot_lows)), 'resistance_levels': list(set(pivot_highs)) })() return {'level_0': mock_level} except Exception as e: logger.error(f"Error in simple pivot detection: {e}") return {} def _should_refresh_pivot_bounds(self, symbol: str) -> bool: """Check if pivot bounds need refreshing""" try: if symbol not in self.pivot_bounds: return True bounds = self.pivot_bounds[symbol] age = datetime.now() - bounds.created_timestamp return age > self.pivot_refresh_interval except Exception as e: logger.error(f"Error checking pivot bounds refresh: {e}") return True def _refresh_pivot_bounds_for_symbol(self, symbol: str): """Refresh pivot bounds for a specific symbol""" try: # Collect monthly 1m data monthly_data = self._collect_monthly_1m_data(symbol) if monthly_data is None or monthly_data.empty: logger.warning(f"Could not collect monthly data for {symbol}") return # Extract pivot bounds bounds = self._extract_pivot_bounds_from_monthly_data(symbol, monthly_data) if bounds is None: logger.warning(f"Could not extract pivot bounds for {symbol}") return # Store bounds self.pivot_bounds[symbol] = bounds # Save to cache self._save_pivot_bounds_to_cache(symbol, bounds) logger.info(f"Successfully refreshed pivot bounds for {symbol}") except Exception as e: logger.error(f"Error refreshing pivot bounds for {symbol}: {e}") def _add_pivot_context_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame: """Add pivot-derived context features for normalization""" try: if symbol not in self.pivot_bounds: return df bounds = self.pivot_bounds[symbol] current_prices = df['close'] # Distance to nearest support/resistance levels (normalized) df['pivot_support_distance'] = current_prices.apply(bounds.get_nearest_support_distance) df['pivot_resistance_distance'] = current_prices.apply(bounds.get_nearest_resistance_distance) # Price position within pivot range (0 = price_min, 1 = price_max) df['pivot_price_position'] = current_prices.apply(bounds.normalize_price).clip(0, 1) # Add binary features for proximity to key levels price_range = bounds.get_price_range() proximity_threshold = price_range * 0.02 # 2% of price range df['near_pivot_support'] = 0 df['near_pivot_resistance'] = 0 for price in current_prices: # Check if near any support level if any(abs(price - s) <= proximity_threshold for s in bounds.pivot_support_levels): df.loc[df['close'] == price, 'near_pivot_support'] = 1 # Check if near any resistance level if any(abs(price - r) <= proximity_threshold for r in bounds.pivot_resistance_levels): df.loc[df['close'] == price, 'near_pivot_resistance'] = 1 logger.debug(f"Added pivot context features for {symbol}") return df except Exception as e: logger.warning(f"Error adding pivot context features for {symbol}: {e}") return df def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]: """Extract symbol from dataframe context (basic implementation)""" # This is a simple implementation - in a real system, you might pass symbol explicitly # or store it as metadata in the dataframe for symbol in self.symbols: # Check if this dataframe might belong to this symbol based on current processing return symbol # Return first symbol for now - can be improved return None # === CACHE MANAGEMENT === def _auto_fix_corrupted_cache(self): """Automatically fix corrupted cache files on startup""" try: from utils.cache_manager import get_cache_manager cache_manager = get_cache_manager() # Quick health check health_summary = cache_manager.get_cache_summary() if health_summary['corrupted_files'] > 0: logger.warning(f"Found {health_summary['corrupted_files']} corrupted cache files, cleaning up...") # Auto-cleanup corrupted files (no confirmation needed) deleted_files = cache_manager.cleanup_corrupted_files(dry_run=False) deleted_count = 0 for cache_dir, files in deleted_files.items(): for file_info in files: if "DELETED:" in file_info: deleted_count += 1 logger.info(f"Auto-cleaned {deleted_count} corrupted cache files") else: logger.debug("Cache health check passed - no corrupted files found") except Exception as e: logger.warning(f"Cache auto-fix failed: {e}") # === PIVOT BOUNDS CACHING === def _load_all_pivot_bounds(self): """Load all cached pivot bounds on startup""" try: for symbol in self.symbols: bounds = self._load_pivot_bounds_from_cache(symbol) if bounds: self.pivot_bounds[symbol] = bounds logger.info(f"Loaded cached pivot bounds for {symbol}") except Exception as e: logger.error(f"Error loading pivot bounds from cache: {e}") def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]: """Load pivot bounds from cache""" try: cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" if cache_file.exists(): with open(cache_file, 'rb') as f: bounds = pickle.load(f) # Check if bounds are still valid (not too old) age = datetime.now() - bounds.created_timestamp if age <= self.pivot_refresh_interval: return bounds else: logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)") return None except Exception as e: logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}") return None def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds): """Save pivot bounds to cache""" try: cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" with open(cache_file, 'wb') as f: pickle.dump(bounds, f) logger.debug(f"Saved pivot bounds to cache for {symbol}") except Exception as e: logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}") def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]: """Load monthly 1m data from cache""" try: cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" if cache_file.exists(): try: df = pd.read_parquet(cache_file) logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}") return df except Exception as parquet_e: # Handle corrupted Parquet file - expanded error detection error_str = str(parquet_e).lower() corrupted_indicators = [ "parquet magic bytes not found", "corrupted", "couldn't deserialize thrift", "don't know what type", "invalid parquet file", "unexpected end of file", "invalid metadata" ] if any(indicator in error_str for indicator in corrupted_indicators): logger.warning(f"Corrupted Parquet cache file for {symbol}, removing and returning None: {parquet_e}") try: cache_file.unlink() # Delete corrupted file logger.info(f"Deleted corrupted monthly cache file: {cache_file}") except Exception as delete_e: logger.error(f"Failed to delete corrupted monthly cache file: {delete_e}") return None else: raise parquet_e return None except Exception as e: logger.warning(f"Error loading monthly data from cache for {symbol}: {e}") return None def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame): """Save monthly 1m data to cache""" try: cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" df.to_parquet(cache_file, index=False) logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}") except Exception as e: logger.warning(f"Error saving monthly data to cache for {symbol}: {e}") def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]: """Get pivot bounds for a symbol""" return self.pivot_bounds.get(symbol) def get_pivot_normalized_features(self, symbol: str, df: pd.DataFrame) -> Optional[pd.DataFrame]: """Get dataframe with pivot-normalized features""" try: if symbol not in self.pivot_bounds: logger.warning(f"No pivot bounds available for {symbol}") return df bounds = self.pivot_bounds[symbol] normalized_df = df.copy() # Normalize price columns using pivot bounds price_range = bounds.get_price_range() for col in ['open', 'high', 'low', 'close']: if col in normalized_df.columns: normalized_df[col] = (normalized_df[col] - bounds.price_min) / price_range # Normalize volume using pivot bounds volume_range = bounds.volume_max - bounds.volume_min if volume_range > 0 and 'volume' in normalized_df.columns: normalized_df['volume'] = (normalized_df['volume'] - bounds.volume_min) / volume_range return normalized_df except Exception as e: logger.error(f"Error applying pivot normalization for {symbol}: {e}") return df def build_base_data_input(self, symbol: str) -> Optional['BaseDataInput']: """ Build BaseDataInput from cached data (optimized for speed) Args: symbol: Trading symbol Returns: BaseDataInput with consistent data structure """ try: from .data_models import BaseDataInput # Get OHLCV data directly from optimized cache (no validation checks for speed) ohlcv_1s_list = self._get_cached_ohlcv_bars(symbol, '1s', 300) ohlcv_1m_list = self._get_cached_ohlcv_bars(symbol, '1m', 300) ohlcv_1h_list = self._get_cached_ohlcv_bars(symbol, '1h', 300) ohlcv_1d_list = self._get_cached_ohlcv_bars(symbol, '1d', 300) # Get BTC reference data btc_symbol = 'BTC/USDT' btc_ohlcv_1s_list = self._get_cached_ohlcv_bars(btc_symbol, '1s', 300) if not btc_ohlcv_1s_list: # Use ETH data as fallback btc_ohlcv_1s_list = ohlcv_1s_list # Get cached data (fast lookups) technical_indicators = self._get_latest_technical_indicators(symbol) cob_data = self._get_latest_cob_data_object(symbol) last_predictions = {} # TODO: Implement model prediction caching # Build BaseDataInput (no validation for speed - assume data is good) base_data = BaseDataInput( symbol=symbol, timestamp=datetime.now(), ohlcv_1s=ohlcv_1s_list, ohlcv_1m=ohlcv_1m_list, ohlcv_1h=ohlcv_1h_list, ohlcv_1d=ohlcv_1d_list, btc_ohlcv_1s=btc_ohlcv_1s_list, technical_indicators=technical_indicators, cob_data=cob_data, last_predictions=last_predictions ) return base_data except Exception as e: logger.error(f"Error building BaseDataInput for {symbol}: {e}") return None def _get_cached_ohlcv_bars(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']: """Get OHLCV data list from pre-built cache for instant access""" try: with self._ohlcv_cache_lock: cache_key = f"{symbol}_{timeframe}" # Check if we have fresh cached data (updated within last 5 seconds) last_update = self._last_cache_update.get(cache_key) if (last_update and (datetime.now() - last_update).total_seconds() < self._cache_refresh_interval and cache_key in self._ohlcv_cache): cached_data = self._ohlcv_cache[cache_key] return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data # Need to rebuild cache for this symbol/timeframe data_list = self._build_ohlcv_bar_cache(symbol, timeframe, max_count) # Cache the result self._ohlcv_cache[cache_key] = data_list self._last_cache_update[cache_key] = datetime.now() return data_list[-max_count:] if len(data_list) >= max_count else data_list except Exception as e: logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}") return [] def _build_ohlcv_bar_cache(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']: """Build OHLCV bar cache from historical and current data""" try: from .data_models import OHLCVBar data_list = [] # Get historical data first (this should be fast as it's already cached) historical_df = self.get_historical_data(symbol, timeframe, limit=max_count) if historical_df is not None and not historical_df.empty: # Convert historical data to OHLCVBar objects for idx, row in historical_df.tail(max_count).iterrows(): bar = OHLCVBar( symbol=symbol, timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(), open=float(row['open']), high=float(row['high']), low=float(row['low']), close=float(row['close']), volume=float(row['volume']), timeframe=timeframe ) data_list.append(bar) return data_list except Exception as e: logger.error(f"Error building OHLCV bar cache for {symbol}/{timeframe}: {e}") return [] def _get_latest_technical_indicators(self, symbol: str) -> Dict[str, float]: """Get latest technical indicators for a symbol""" try: # Get latest data and calculate indicators df = self.get_historical_data(symbol, '1h', limit=50) if df is not None and not df.empty: df_with_indicators = self._add_technical_indicators(df) if not df_with_indicators.empty: # Return the latest indicators as a dict latest_row = df_with_indicators.iloc[-1] indicators = {} for col in df_with_indicators.columns: if col not in ['open', 'high', 'low', 'close', 'volume', 'timestamp']: indicators[col] = float(latest_row[col]) if pd.notna(latest_row[col]) else 0.0 return indicators return {} except Exception as e: logger.error(f"Error getting technical indicators for {symbol}: {e}") return {} def _get_latest_cob_data_object(self, symbol: str) -> Optional['COBData']: """Get latest COB data as COBData object""" try: from .data_models import COBData # Get latest COB data from cache cob_data = self.get_latest_cob_data(symbol) if cob_data and 'current_price' in cob_data: return COBData( symbol=symbol, timestamp=datetime.now(), current_price=cob_data['current_price'], bucket_size=1.0 if 'ETH' in symbol else 10.0, price_buckets=cob_data.get('price_buckets', {}), bid_ask_imbalance=cob_data.get('bid_ask_imbalance', {}), volume_weighted_prices=cob_data.get('volume_weighted_prices', {}), order_flow_metrics=cob_data.get('order_flow_metrics', {}), ma_1s_imbalance=cob_data.get('ma_1s_imbalance', {}), ma_5s_imbalance=cob_data.get('ma_5s_imbalance', {}), ma_15s_imbalance=cob_data.get('ma_15s_imbalance', {}), ma_60s_imbalance=cob_data.get('ma_60s_imbalance', {}) ) return None except Exception as e: logger.error(f"Error getting COB data object for {symbol}: {e}") return None def invalidate_ohlcv_cache(self, symbol: str): """Invalidate OHLCV cache for a symbol when new data arrives""" try: with self._ohlcv_cache_lock: # Remove cached data for all timeframes of this symbol keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")] for key in keys_to_remove: if key in self._ohlcv_cache: del self._ohlcv_cache[key] if key in self._last_cache_update: del self._last_cache_update[key] except Exception as e: logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}") def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Add basic indicators for small datasets""" try: df = df.copy() # Basic moving averages if len(df) >= 20: df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20) df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12) # Basic RSI if len(df) >= 14: df['rsi_14'] = ta.momentum.rsi(df['close'], window=14) # Basic volume indicators if len(df) >= 10: df['volume_sma_10'] = df['volume'].rolling(window=10).mean() # Basic price action df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low']) df['price_position'] = df['price_position'].fillna(0.5) # Default to middle # Fill NaN values df = df.ffill().bfill().fillna(0) return df except Exception as e: logger.error(f"Error adding basic indicators: {e}") return df def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]: """Load data from cache""" try: cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet" if cache_file.exists(): # Check if cache is recent - stricter rules for startup cache_age = time.time() - cache_file.stat().st_mtime # For 1m data, use cache only if less than 5 minutes old to avoid gaps if timeframe == '1m': max_age = 300 # 5 minutes else: max_age = 3600 # 1 hour for other timeframes if cache_age < max_age: try: df = pd.read_parquet(cache_file) logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe} (age: {cache_age/60:.1f}min)") return df except Exception as parquet_e: # Handle corrupted Parquet file - expanded error detection error_str = str(parquet_e).lower() corrupted_indicators = [ "parquet magic bytes not found", "corrupted", "couldn't deserialize thrift", "don't know what type", "invalid parquet file", "unexpected end of file", "invalid metadata" ] if any(indicator in error_str for indicator in corrupted_indicators): logger.warning(f"Corrupted Parquet cache file for {symbol} {timeframe}, removing and returning None: {parquet_e}") try: cache_file.unlink() # Delete corrupted file logger.info(f"Deleted corrupted cache file: {cache_file}") except Exception as delete_e: logger.error(f"Failed to delete corrupted cache file: {delete_e}") return None else: raise parquet_e else: logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/60:.1f}min > {max_age/60:.1f}min)") return None except Exception as e: logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}") return None def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str): """Save data to cache""" try: cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet" df.to_parquet(cache_file, index=False) logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}") except Exception as e: logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}") async def start_real_time_streaming(self): """Start real-time data streaming using COBIntegration""" if self.is_streaming: logger.warning("Real-time streaming already active") return self.is_streaming = True logger.info("Starting real-time streaming via COBIntegration") # COBIntegration is started in the constructor async def stop_real_time_streaming(self): """Stop real-time data streaming""" if not self.is_streaming: return logger.info("Stopping Enhanced COB WebSocket streaming") self.is_streaming = False # Stop COB Integration if self.cob_integration: try: await self.cob_integration.stop() logger.info("COB Integration stopped") except Exception as e: logger.error(f"Error stopping COB Integration: {e}") # Stop Enhanced COB WebSocket if self.enhanced_cob_websocket: try: await self.enhanced_cob_websocket.stop() self.enhanced_cob_websocket = None logger.info("Enhanced COB WebSocket stopped") except Exception as e: logger.error(f"Error stopping Enhanced COB WebSocket: {e}") # Cancel any remaining WebSocket tasks for symbol, task in self.websocket_tasks.items(): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass self.websocket_tasks.clear() async def _on_enhanced_cob_data(self, symbol: str, cob_data: Dict): """Handle COB data from Enhanced WebSocket""" try: # This method will now be called by COBIntegration # Ensure cob_websocket_data is initialized if not hasattr(self, 'cob_websocket_data'): self.cob_websocket_data = {} # Store the latest COB data self.cob_websocket_data[symbol] = cob_data # Trigger bucketing self._update_price_buckets(symbol, cob_data) # Ensure cob_data_cache is initialized if not hasattr(self, 'cob_data_cache'): self.cob_data_cache = {} # Update COB data cache for distribution binance_symbol = symbol.replace('/', '').upper() if binance_symbol not in self.cob_data_cache or self.cob_data_cache[binance_symbol] is None: self.cob_data_cache[binance_symbol] = deque(maxlen=300) # Ensure the deque is properly initialized if not isinstance(self.cob_data_cache[binance_symbol], deque): self.cob_data_cache[binance_symbol] = deque(maxlen=300) self.cob_data_cache[binance_symbol].append({ 'timestamp': cob_data.get('timestamp', datetime.now()), 'data': cob_data, 'source': 'enhanced_websocket' }) # Ensure cob_data_callbacks is initialized if not hasattr(self, 'cob_data_callbacks'): self.cob_data_callbacks = [] # Distribute to COB data callbacks for callback in self.cob_data_callbacks: try: callback(symbol, cob_data) except Exception as e: logger.error(f"Error in COB data callback: {e}") # Ensure distribution_stats is initialized if not hasattr(self, 'distribution_stats'): self.distribution_stats = { 'total_ticks_received': 0, 'last_tick_time': {} } # Update distribution stats self.distribution_stats['total_ticks_received'] += 1 self.distribution_stats['last_tick_time'][symbol] = datetime.now() logger.debug(f"Enhanced COB data received for {symbol}: {len(cob_data.get('bids', []))} bids, {len(cob_data.get('asks', []))} asks") except Exception as e: logger.error(f"Error handling enhanced COB data for {symbol}: {e}", exc_info=True) async def _on_websocket_status_update(self, status_data: Dict): """Handle WebSocket status updates""" try: symbol = status_data.get('symbol') status = status_data.get('status') message = status_data.get('message', '') # Ensure cob_websocket_status is initialized if not hasattr(self, 'cob_websocket_status'): self.cob_websocket_status = {} if symbol: self.cob_websocket_status[symbol] = status logger.info(f"🔌 Enhanced WebSocket status for {symbol}: {status} - {message}") except Exception as e: logger.error(f"Error handling WebSocket status update: {e}", exc_info=True) async def _start_fallback_websocket_streaming(self): """Fallback to old WebSocket method if Enhanced COB WebSocket fails""" try: logger.warning("⚠️ Starting fallback WebSocket streaming") # Start old WebSocket for each symbol for symbol in self.symbols: task = asyncio.create_task(self._websocket_stream(symbol)) self.websocket_tasks[symbol] = task except Exception as e: logger.error(f"❌ Error starting fallback WebSocket: {e}") def get_cob_websocket_status(self) -> Dict[str, Any]: """Get COB WebSocket status for dashboard""" try: if self.enhanced_cob_websocket: return self.enhanced_cob_websocket.get_status_summary() else: return { 'overall_status': 'not_initialized', 'symbols': {}, 'websockets_available': False } except Exception as e: logger.error(f"Error getting COB WebSocket status: {e}") return { 'overall_status': 'error', 'symbols': {}, 'error': str(e) } def get_latest_cob_data(self, symbol: str) -> Optional[Dict]: """Get latest COB data from Enhanced WebSocket""" try: return self.cob_websocket_data.get(symbol) except Exception as e: logger.error(f"Error getting latest COB data for {symbol}: {e}") return None async def _websocket_stream(self, symbol: str): """WebSocket stream for a single symbol using trade stream for better granularity""" binance_symbol = symbol.replace('/', '').upper() url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade" while self.is_streaming: try: logger.info(f"Connecting to WebSocket for {symbol}: {url}") async with websockets.connect(url) as websocket: logger.info(f"WebSocket connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: await self._process_trade_message(binance_symbol, message) except Exception as e: logger.warning(f"Error processing trade message for {symbol}: {e}") except Exception as e: logger.error(f"WebSocket error for {symbol}: {e}") self.distribution_stats['distribution_errors'] += 1 if self.is_streaming: logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...") await asyncio.sleep(5) async def _process_trade_message(self, symbol: str, message: str): """Process incoming trade message and distribute to subscribers""" try: trade_data = json.loads(message) # Extract trade information price = float(trade_data.get('p', 0)) quantity = float(trade_data.get('q', 0)) timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000) is_buyer_maker = trade_data.get('m', False) trade_id = trade_data.get('t', '') # Calculate volume in USDT volume_usdt = price * quantity # Data validation if not self._validate_tick_data(symbol, price, volume_usdt): logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}") return # Process raw tick through aggregator side = 'sell' if is_buyer_maker else 'buy' raw_tick, completed_bar = self.tick_aggregator.process_tick( symbol=symbol, timestamp=timestamp, price=price, volume=volume_usdt, quantity=quantity, side=side, trade_id=str(trade_id) ) # Update statistics self.distribution_stats['total_ticks_received'] += 1 self.distribution_stats['ticks_per_symbol'][symbol] += 1 self.distribution_stats['last_tick_time'][symbol] = timestamp self.last_prices[symbol] = price if raw_tick: self.distribution_stats['raw_ticks_processed'] += 1 # Notify raw tick callbacks for callback in self.raw_tick_callbacks: try: callback(raw_tick) except Exception as e: logger.error(f"Error in raw tick callback: {e}") if completed_bar: self.distribution_stats['ohlcv_bars_created'] += 1 # Notify OHLCV bar callbacks for callback in self.ohlcv_bar_callbacks: try: callback(completed_bar) except Exception as e: logger.error(f"Error in OHLCV bar callback: {e}") # Create standardized tick for legacy compatibility tick = MarketTick( symbol=symbol, timestamp=timestamp, price=price, volume=volume_usdt, quantity=quantity, side=side, trade_id=str(trade_id), is_buyer_maker=is_buyer_maker, raw_data=trade_data ) # Add to buffer self.tick_buffers[symbol].append(tick) # Update current prices and candles await self._process_tick(symbol, tick) # Distribute to all subscribers self._distribute_tick(tick) except Exception as e: logger.error(f"Error processing trade message for {symbol}: {e}") async def _process_tick(self, symbol: str, tick: MarketTick): """Process a single tick and update candles""" try: # Update current price with self.data_lock: self.current_prices[symbol] = tick.price # Initialize real-time data structure if needed if symbol not in self.real_time_data: self.real_time_data[symbol] = {} for tf in self.timeframes: self.real_time_data[symbol][tf] = deque(maxlen=1000) # Create tick record for candle updates tick_record = { 'timestamp': tick.timestamp, 'price': tick.price, 'volume': tick.volume } # Update all timeframes for timeframe in self.timeframes: self._update_candle(symbol, timeframe, tick_record) except Exception as e: logger.error(f"Error processing tick for {symbol}: {e}") def _update_candle(self, symbol: str, timeframe: str, tick: Dict): """Update candle for specific timeframe""" try: timeframe_secs = self.timeframe_seconds.get(timeframe, 3600) current_time = tick['timestamp'] # Calculate candle start time using proper datetime truncation if isinstance(current_time, datetime): timestamp_seconds = current_time.timestamp() else: timestamp_seconds = current_time.timestamp() if hasattr(current_time, 'timestamp') else current_time # Truncate to timeframe boundary candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs candle_start = datetime.fromtimestamp(candle_start_seconds) # Get current candle queue candle_queue = self.real_time_data[symbol][timeframe] # Check if we need a new candle if not candle_queue or candle_queue[-1]['timestamp'] != candle_start: # Create new candle new_candle = { 'timestamp': candle_start, 'open': tick['price'], 'high': tick['price'], 'low': tick['price'], 'close': tick['price'], 'volume': tick['volume'] } candle_queue.append(new_candle) else: # Update existing candle current_candle = candle_queue[-1] current_candle['high'] = max(current_candle['high'], tick['price']) current_candle['low'] = min(current_candle['low'], tick['price']) current_candle['close'] = tick['price'] current_candle['volume'] += tick['volume'] except Exception as e: logger.error(f"Error updating candle for {symbol} {timeframe}: {e}") def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame: """Get the latest candles combining historical and real-time data""" try: # Get historical data historical_df = self.get_historical_data(symbol, timeframe, limit=limit) # Get real-time data with self.data_lock: if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]: real_time_candles = list(self.real_time_data[symbol][timeframe]) if real_time_candles: # Convert to DataFrame rt_df = pd.DataFrame(real_time_candles) if historical_df is not None: # Combine historical and real-time # Remove overlapping candles from historical data if not rt_df.empty: cutoff_time = rt_df['timestamp'].min() historical_df = historical_df[historical_df['timestamp'] < cutoff_time] # Concatenate combined_df = pd.concat([historical_df, rt_df], ignore_index=True) else: combined_df = rt_df return combined_df.tail(limit) # Return just historical data if no real-time data return historical_df.tail(limit) if historical_df is not None else pd.DataFrame() except Exception as e: logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}") return pd.DataFrame() def get_current_price(self, symbol: str) -> Optional[float]: """Get current price for a symbol from latest candle""" try: # Try to get from 1s candle first (most recent) for tf in ['1s', '1m', '5m', '1h']: df = self.get_latest_candles(symbol, tf, limit=1) if df is not None and not df.empty: return float(df.iloc[-1]['close']) # Fallback to any available data key = f"{symbol}_{self.timeframes[0]}" if key in self.historical_data and not self.historical_data[key].empty: return float(self.historical_data[key].iloc[-1]['close']) logger.warning(f"No price data available for {symbol}") return None except Exception as e: logger.error(f"Error getting current price for {symbol}: {e}") return None def calculate_williams_pivot_points(self, symbol: str, force_recalculate: bool = False) -> Dict[int, TrendLevel]: """ Calculate Williams Market Structure pivot points for a symbol Args: symbol: Trading symbol (e.g., 'ETH/USDT') force_recalculate: Force recalculation even if cache is fresh Returns: Dictionary of trend levels with pivot points """ try: # Check if we need to recalculate now = datetime.now() if (not force_recalculate and symbol in self.last_pivot_calculation and now - self.last_pivot_calculation[symbol] < self.pivot_calculation_interval): # Return cached results return self.pivot_points_cache.get(symbol, {}) # Get 1s OHLCV data for Williams Market Structure calculation df_1s = self.get_historical_data(symbol, '1s', limit=1000) if df_1s is None or len(df_1s) < 50: logger.warning(f"Insufficient 1s data for Williams pivot calculation: {symbol}") return {} # Convert DataFrame to numpy array for Williams calculation # Format: [timestamp_ms, open, high, low, close, volume] ohlcv_array = np.column_stack([ df_1s.index.astype(np.int64) // 10**6, # Convert to milliseconds df_1s['open'].values, df_1s['high'].values, df_1s['low'].values, df_1s['close'].values, df_1s['volume'].values ]) # Calculate recursive pivot points using Williams Market Structure williams = self.williams_structure[symbol] pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array) # Cache the results self.pivot_points_cache[symbol] = pivot_levels self.last_pivot_calculation[symbol] = now logger.debug(f"Calculated Williams pivot points for {symbol}: {len(pivot_levels)} levels") return pivot_levels except Exception as e: logger.error(f"Error calculating Williams pivot points for {symbol}: {e}") return {} def get_pivot_features_for_ml(self, symbol: str) -> np.ndarray: """ Get pivot point features for machine learning models Returns a 250-element feature vector containing: - Recent pivot points (price, strength, type) for each level - Trend direction and strength for each level - Time since last pivot for each level """ try: # Ensure we have fresh pivot points pivot_levels = self.calculate_williams_pivot_points(symbol) if not pivot_levels: logger.warning(f"No pivot points available for {symbol}") return np.zeros(250, dtype=np.float32) # Use Williams Market Structure to extract ML features williams = self.williams_structure[symbol] features = williams.get_pivot_features_for_ml(symbol) return features except Exception as e: logger.error(f"Error getting pivot features for ML: {e}") return np.zeros(250, dtype=np.float32) def get_market_structure_summary(self, symbol: str) -> Dict[str, Any]: """ Get current market structure summary for dashboard display Returns: Dictionary containing market structure information """ try: # Ensure we have fresh pivot points pivot_levels = self.calculate_williams_pivot_points(symbol) if not pivot_levels: return { 'symbol': symbol, 'levels': {}, 'overall_trend': 'sideways', 'overall_strength': 0.0, 'last_update': datetime.now().isoformat(), 'error': 'No pivot points available' } # Use Williams Market Structure to get summary williams = self.williams_structure[symbol] structure = williams.get_current_market_structure() structure['symbol'] = symbol return structure except Exception as e: logger.error(f"Error getting market structure summary for {symbol}: {e}") return { 'symbol': symbol, 'levels': {}, 'overall_trend': 'sideways', 'overall_strength': 0.0, 'last_update': datetime.now().isoformat(), 'error': str(e) } def get_recent_pivot_points(self, symbol: str, level: int = 1, count: int = 10) -> List[PivotPoint]: """ Get recent pivot points for a specific level Args: symbol: Trading symbol level: Pivot level (1-5) count: Number of recent pivots to return Returns: List of recent pivot points """ try: pivot_levels = self.calculate_williams_pivot_points(symbol) if level not in pivot_levels: return [] trend_level = pivot_levels[level] recent_pivots = trend_level.pivot_points[-count:] if len(trend_level.pivot_points) >= count else trend_level.pivot_points return recent_pivots except Exception as e: logger.error(f"Error getting recent pivot points for {symbol} level {level}: {e}") return [] def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]: """Get price at specific index for backtesting""" try: key = f"{symbol}_{timeframe}" if key in self.historical_data: df = self.historical_data[key] if 0 <= index < len(df): return float(df.iloc[index]['close']) return None except Exception as e: logger.error(f"Error getting price at index {index}: {e}") return None def get_feature_matrix(self, symbol: str, timeframes: List[str] = None, window_size: int = 20) -> Optional[np.ndarray]: """ Get comprehensive feature matrix for multiple timeframes with technical indicators Returns: np.ndarray: Shape (n_timeframes, window_size, n_features) Each timeframe becomes a separate channel for CNN """ try: if timeframes is None: timeframes = self.timeframes feature_channels = [] common_feature_names = None # First pass: determine common features across all timeframes timeframe_features = {} for tf in timeframes: logger.debug(f"Processing timeframe {tf} for {symbol}") df = self.get_latest_candles(symbol, tf, limit=window_size + 100) if df is None or len(df) < window_size: logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows") continue # Get feature columns basic_cols = ['open', 'high', 'low', 'close', 'volume'] indicator_cols = [col for col in df.columns if col not in basic_cols + ['timestamp'] and not col.startswith('unnamed')] selected_features = self._select_cnn_features(df, basic_cols, indicator_cols) timeframe_features[tf] = (df, selected_features) if common_feature_names is None: common_feature_names = set(selected_features) else: common_feature_names = common_feature_names.intersection(set(selected_features)) if not common_feature_names: logger.error(f"No common features found across timeframes for {symbol}") return None # Convert to sorted list for consistent ordering common_feature_names = sorted(list(common_feature_names)) # logger.info(f"Using {len(common_feature_names)} common features: {common_feature_names}") # Second pass: create feature channels with common features for tf in timeframes: if tf not in timeframe_features: continue df, _ = timeframe_features[tf] # Use only common features try: tf_features = self._normalize_features(df[common_feature_names].tail(window_size), symbol=symbol) if tf_features is not None and len(tf_features) == window_size: feature_channels.append(tf_features.values) logger.debug(f"Added {len(common_feature_names)} features for {tf}") else: logger.warning(f"Feature normalization failed for {tf}") except Exception as e: logger.error(f"Error processing features for {tf}: {e}") continue if not feature_channels: logger.error(f"No valid feature channels created for {symbol}") return None # Verify all channels have the same shape shapes = [channel.shape for channel in feature_channels] if len(set(shapes)) > 1: logger.error(f"Shape mismatch in feature channels: {shapes}") return None # Stack all timeframe channels feature_matrix = np.stack(feature_channels, axis=0) logger.debug(f"Created feature matrix for {symbol}: {feature_matrix.shape} " f"({len(feature_channels)} timeframes, {window_size} steps, {len(common_feature_names)} features)") return feature_matrix except Exception as e: logger.error(f"Error creating feature matrix for {symbol}: {e}") import traceback logger.error(traceback.format_exc()) return None def _select_cnn_features(self, df: pd.DataFrame, basic_cols: List[str], indicator_cols: List[str]) -> List[str]: """Select the most important features for CNN training""" try: selected = [] # Always include basic OHLCV (normalized) selected.extend(basic_cols) # Priority indicators (most informative for CNNs) priority_indicators = [ # Trend indicators 'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50', 'macd', 'macd_signal', 'macd_histogram', 'adx', 'adx_pos', 'adx_neg', 'psar', # Momentum indicators 'rsi_14', 'rsi_7', 'rsi_21', 'stoch_k', 'stoch_d', 'williams_r', 'ultimate_osc', # Volatility indicators 'bb_upper', 'bb_lower', 'bb_middle', 'bb_width', 'bb_percent', 'atr', 'keltner_upper', 'keltner_lower', 'keltner_middle', # Volume indicators 'volume_sma_10', 'volume_sma_20', 'obv', 'vpt', 'mfi', 'ad_line', 'vwap', # Price action 'price_position', 'true_range', 'roc', # Custom composites 'trend_strength', 'momentum_composite', 'volatility_regime' ] # Add available priority indicators for indicator in priority_indicators: if indicator in indicator_cols: selected.append(indicator) # Add any other technical indicators not in priority list (limit to avoid curse of dimensionality) remaining_indicators = [col for col in indicator_cols if col not in selected] if remaining_indicators: # Limit to 10 additional indicators selected.extend(remaining_indicators[:10]) # Verify all selected features exist in dataframe final_selected = [col for col in selected if col in df.columns] logger.debug(f"Selected {len(final_selected)} features from {len(df.columns)} available columns") return final_selected except Exception as e: logger.error(f"Error selecting CNN features: {e}") return basic_cols # Fallback to basic OHLCV def _normalize_features(self, df: pd.DataFrame, symbol: str = None) -> Optional[pd.DataFrame]: """Normalize features for CNN training using pivot-based bounds when available""" try: df_norm = df.copy() # Try to use pivot-based normalization if available if symbol and symbol in self.pivot_bounds: bounds = self.pivot_bounds[symbol] price_range = bounds.get_price_range() # Normalize price-based features using pivot bounds price_cols = ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle', 'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap'] for col in price_cols: if col in df_norm.columns: # Use pivot bounds for normalization df_norm[col] = (df_norm[col] - bounds.price_min) / price_range # Normalize volume using pivot bounds if 'volume' in df_norm.columns: volume_range = bounds.volume_max - bounds.volume_min if volume_range > 0: df_norm['volume'] = (df_norm['volume'] - bounds.volume_min) / volume_range else: df_norm['volume'] = 0.5 # Default to middle if no volume range logger.debug(f"Applied pivot-based normalization for {symbol}") else: # Fallback to traditional normalization when pivot bounds not available logger.debug("Using traditional normalization (no pivot bounds available)") for col in df_norm.columns: if col in ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle', 'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap']: # Price-based indicators: normalize by close price if 'close' in df_norm.columns: base_price = df_norm['close'].iloc[-1] # Use latest close as reference if base_price > 0: df_norm[col] = df_norm[col] / base_price elif col == 'volume': # Volume: normalize by its own rolling mean volume_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1] if volume_mean > 0: df_norm[col] = df_norm[col] / volume_mean # Normalize indicators that have standard ranges (regardless of pivot bounds) for col in df_norm.columns: if col in ['rsi_14', 'rsi_7', 'rsi_21']: # RSI: already 0-100, normalize to 0-1 df_norm[col] = df_norm[col] / 100.0 elif col in ['stoch_k', 'stoch_d']: # Stochastic: already 0-100, normalize to 0-1 df_norm[col] = df_norm[col] / 100.0 elif col == 'williams_r': # Williams %R: -100 to 0, normalize to 0-1 df_norm[col] = (df_norm[col] + 100) / 100.0 elif col in ['macd', 'macd_signal', 'macd_histogram']: # MACD: normalize by ATR or close price if 'atr' in df_norm.columns and df_norm['atr'].iloc[-1] > 0: df_norm[col] = df_norm[col] / df_norm['atr'].iloc[-1] elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0: df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1] elif col in ['bb_width', 'bb_percent', 'price_position', 'trend_strength', 'momentum_composite', 'volatility_regime', 'pivot_price_position', 'pivot_support_distance', 'pivot_resistance_distance']: # Already normalized indicators: ensure 0-1 range df_norm[col] = np.clip(df_norm[col], 0, 1) elif col in ['atr', 'true_range']: # Volatility indicators: normalize by close price or pivot range if symbol and symbol in self.pivot_bounds: bounds = self.pivot_bounds[symbol] df_norm[col] = df_norm[col] / bounds.get_price_range() elif 'close' in df_norm.columns and df_norm['close'].iloc[-1] > 0: df_norm[col] = df_norm[col] / df_norm['close'].iloc[-1] elif col not in ['timestamp', 'near_pivot_support', 'near_pivot_resistance']: # Other indicators: z-score normalization col_mean = df_norm[col].rolling(window=min(20, len(df_norm))).mean().iloc[-1] col_std = df_norm[col].rolling(window=min(20, len(df_norm))).std().iloc[-1] if col_std > 0: df_norm[col] = (df_norm[col] - col_mean) / col_std else: df_norm[col] = 0 # Replace inf/-inf with 0 df_norm = df_norm.replace([np.inf, -np.inf], 0) # Fill any remaining NaN values df_norm = df_norm.fillna(0) return df_norm except Exception as e: logger.error(f"Error normalizing features: {e}") return df def get_multi_symbol_feature_matrix(self, symbols: List[str] = None, timeframes: List[str] = None, window_size: int = 20) -> Optional[np.ndarray]: """ Get feature matrix for multiple symbols and timeframes Returns: np.ndarray: Shape (n_symbols, n_timeframes, window_size, n_features) """ try: if symbols is None: symbols = self.symbols if timeframes is None: timeframes = self.timeframes symbol_matrices = [] for symbol in symbols: symbol_matrix = self.get_feature_matrix(symbol, timeframes, window_size) if symbol_matrix is not None: symbol_matrices.append(symbol_matrix) else: logger.warning(f"Could not create feature matrix for {symbol}") if symbol_matrices: # Stack all symbol matrices multi_symbol_matrix = np.stack(symbol_matrices, axis=0) logger.info(f"Created multi-symbol feature matrix: {multi_symbol_matrix.shape}") return multi_symbol_matrix return None except Exception as e: logger.error(f"Error creating multi-symbol feature matrix: {e}") return None def health_check(self) -> Dict[str, Any]: """Get health status of the data provider""" status = { 'streaming': self.is_streaming, 'symbols': len(self.symbols), 'timeframes': len(self.timeframes), 'current_prices': len(self.current_prices), 'websocket_tasks': len(self.websocket_tasks), 'historical_data_loaded': {} } # Check historical data availability for symbol in self.symbols: status['historical_data_loaded'][symbol] = {} for tf in self.timeframes: has_data = (symbol in self.historical_data and tf in self.historical_data[symbol] and not self.historical_data[symbol][tf].empty) status['historical_data_loaded'][symbol][tf] = has_data return status def subscribe_to_ticks(self, callback: Callable[[MarketTick], None], symbols: List[str] = None, subscriber_name: str = None) -> str: """Subscribe to real-time tick data updates""" subscriber_id = str(uuid.uuid4())[:8] subscriber_name = subscriber_name or f"subscriber_{subscriber_id}" # Convert symbols to Binance format if symbols: binance_symbols = [s.replace('/', '').upper() for s in symbols] else: binance_symbols = [s.replace('/', '').upper() for s in self.symbols] subscriber = DataSubscriber( subscriber_id=subscriber_id, callback=callback, symbols=binance_symbols, subscriber_name=subscriber_name ) with self.subscriber_lock: self.subscribers[subscriber_id] = subscriber logger.info(f"New tick subscriber registered: {subscriber_name} ({subscriber_id}) for symbols: {binance_symbols}") # Send recent tick data to new subscriber self._send_recent_ticks_to_subscriber(subscriber) return subscriber_id def unsubscribe_from_ticks(self, subscriber_id: str): """Unsubscribe from tick data updates""" with self.subscriber_lock: if subscriber_id in self.subscribers: subscriber_name = self.subscribers[subscriber_id].subscriber_name self.subscribers[subscriber_id].active = False del self.subscribers[subscriber_id] logger.info(f"Subscriber {subscriber_name} ({subscriber_id}) unsubscribed") def _send_recent_ticks_to_subscriber(self, subscriber: DataSubscriber): """Send recent tick data to a new subscriber""" try: for symbol in subscriber.symbols: if symbol in self.tick_buffers: # Send last 50 ticks to get subscriber up to speed recent_ticks = list(self.tick_buffers[symbol])[-50:] for tick in recent_ticks: try: subscriber.callback(tick) except Exception as e: logger.warning(f"Error sending recent tick to subscriber {subscriber.subscriber_id}: {e}") except Exception as e: logger.error(f"Error sending recent ticks: {e}") def _distribute_tick(self, tick: MarketTick): """Distribute tick to all relevant subscribers""" distributed_count = 0 with self.subscriber_lock: subscribers_to_remove = [] for subscriber_id, subscriber in self.subscribers.items(): if not subscriber.active: subscribers_to_remove.append(subscriber_id) continue if tick.symbol in subscriber.symbols: try: # Call subscriber callback in a thread to avoid blocking def call_callback(): try: subscriber.callback(tick) subscriber.tick_count += 1 subscriber.last_update = datetime.now() except Exception as e: logger.warning(f"Error in subscriber {subscriber_id} callback: {e}") subscriber.active = False # Use thread to avoid blocking the main data processing Thread(target=call_callback, daemon=True).start() distributed_count += 1 except Exception as e: logger.warning(f"Error distributing tick to subscriber {subscriber_id}: {e}") subscriber.active = False # Remove inactive subscribers for subscriber_id in subscribers_to_remove: if subscriber_id in self.subscribers: del self.subscribers[subscriber_id] self.distribution_stats['total_ticks_distributed'] += distributed_count def _validate_tick_data(self, symbol: str, price: float, volume: float) -> bool: """Validate incoming tick data for quality""" try: # Basic validation if price <= 0 or volume < 0: return False # Price change validation last_price = self.last_prices.get(symbol, 0) if last_price > 0: price_change_pct = abs(price - last_price) / last_price if price_change_pct > self.price_change_threshold: logger.warning(f"Large price change for {symbol}: {price_change_pct:.2%}") # Don't reject, just warn - could be legitimate return True except Exception as e: logger.error(f"Error validating tick data: {e}") return False def get_recent_ticks(self, symbol: str, count: int = 100) -> List[MarketTick]: """Get recent ticks for a symbol""" binance_symbol = symbol.replace('/', '').upper() if binance_symbol in self.tick_buffers: return list(self.tick_buffers[binance_symbol])[-count:] return [] def subscribe_to_raw_ticks(self, callback: Callable[[RawTick], None]) -> str: """Subscribe to raw tick data with timing information""" subscriber_id = str(uuid.uuid4()) self.raw_tick_callbacks.append(callback) logger.info(f"Raw tick subscriber added: {subscriber_id}") return subscriber_id def subscribe_to_ohlcv_bars(self, callback: Callable[[OHLCVBar], None]) -> str: """Subscribe to 1s OHLCV bars calculated from ticks""" subscriber_id = str(uuid.uuid4()) self.ohlcv_bar_callbacks.append(callback) logger.info(f"OHLCV bar subscriber added: {subscriber_id}") return subscriber_id def get_raw_tick_features(self, symbol: str, window_size: int = 50) -> Optional[np.ndarray]: """Get raw tick features for model consumption""" return self.tick_aggregator.get_tick_features_for_model(symbol, window_size) def get_ohlcv_features(self, symbol: str, window_size: int = 60) -> Optional[np.ndarray]: """Get 1s OHLCV features for model consumption""" return self.tick_aggregator.get_ohlcv_features_for_model(symbol, window_size) def get_detected_patterns(self, symbol: str, count: int = 50) -> List: """Get recently detected tick patterns""" return self.tick_aggregator.get_detected_patterns(symbol, count) def get_tick_aggregator_stats(self) -> Dict[str, Any]: """Get tick aggregator statistics""" return self.tick_aggregator.get_statistics() def get_subscriber_stats(self) -> Dict[str, Any]: """Get subscriber and distribution statistics""" with self.subscriber_lock: active_subscribers = len([s for s in self.subscribers.values() if s.active]) subscriber_stats = { sid: { 'name': s.subscriber_name, 'active': s.active, 'symbols': s.symbols, 'tick_count': s.tick_count, 'last_update': s.last_update.isoformat() if s.last_update else None } for sid, s in self.subscribers.items() } # Get tick aggregator stats aggregator_stats = self.get_tick_aggregator_stats() return { 'active_subscribers': active_subscribers, 'total_subscribers': len(self.subscribers), 'raw_tick_callbacks': len(self.raw_tick_callbacks), 'ohlcv_bar_callbacks': len(self.ohlcv_bar_callbacks), 'subscriber_details': subscriber_stats, 'distribution_stats': self.distribution_stats.copy(), 'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()}, 'tick_aggregator': aggregator_stats } def update_bom_cache(self, symbol: str, bom_features: List[float], cob_integration=None): """ Update BOM cache with latest features for a symbol Args: symbol: Trading symbol (e.g., 'ETH/USDT') bom_features: List of BOM features (should be 120 features) cob_integration: Optional COB integration instance for real BOM data """ try: current_time = datetime.now() # Ensure we have exactly 120 features if len(bom_features) != self.bom_feature_count: if len(bom_features) > self.bom_feature_count: bom_features = bom_features[:self.bom_feature_count] else: bom_features.extend([0.0] * (self.bom_feature_count - len(bom_features))) # Convert to numpy array for efficient storage bom_array = np.array(bom_features, dtype=np.float32) # Add timestamp and features to cache with self.data_lock: self.bom_data_cache[symbol].append((current_time, bom_array)) logger.debug(f"Updated BOM cache for {symbol}: {len(self.bom_data_cache[symbol])} timestamps cached") except Exception as e: logger.error(f"Error updating BOM cache for {symbol}: {e}") def get_bom_matrix_for_cnn(self, symbol: str, sequence_length: int = 50) -> Optional[np.ndarray]: """ Get BOM matrix for CNN input from cached 1s data Args: symbol: Trading symbol (e.g., 'ETH/USDT') sequence_length: Required sequence length (default 50) Returns: np.ndarray: BOM matrix of shape (sequence_length, 120) or None if insufficient data """ try: with self.data_lock: if symbol not in self.bom_data_cache or len(self.bom_data_cache[symbol]) == 0: logger.warning(f"No BOM data cached for {symbol}") return None # Get recent data cached_data = list(self.bom_data_cache[symbol]) if len(cached_data) < sequence_length: logger.warning(f"Insufficient BOM data for {symbol}: {len(cached_data)} < {sequence_length}") # Pad with zeros if we don't have enough data bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32) # Fill available data at the end for i, (timestamp, features) in enumerate(cached_data): if i < sequence_length: bom_matrix[sequence_length - len(cached_data) + i] = features return bom_matrix # Take the most recent sequence_length samples recent_data = cached_data[-sequence_length:] # Create matrix bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32) for i, (timestamp, features) in enumerate(recent_data): bom_matrix[i] = features logger.debug(f"Retrieved BOM matrix for {symbol}: shape={bom_matrix.shape}") return bom_matrix except Exception as e: logger.error(f"Error getting BOM matrix for {symbol}: {e}") return None def get_real_bom_features(self, symbol: str) -> Optional[List[float]]: """ Get REAL BOM features from actual market data ONLY NO SYNTHETIC DATA - Returns None if real data is not available """ try: # Try to get real COB data from integration if hasattr(self, 'cob_integration') and self.cob_integration: return self._extract_real_bom_features(symbol, self.cob_integration) # No real data available - return None instead of synthetic logger.warning(f"No real BOM data available for {symbol} - waiting for real market data") return None except Exception as e: logger.error(f"Error getting real BOM features for {symbol}: {e}") return None def start_bom_cache_updates(self, cob_integration=None): """ Start background updates of BOM cache every second Args: cob_integration: Optional COB integration instance for real data """ try: def update_loop(): while self.is_streaming: try: for symbol in self.symbols: if cob_integration: # Try to get real BOM features from COB integration try: bom_features = self._extract_real_bom_features(symbol, cob_integration) if bom_features: self.update_bom_cache(symbol, bom_features, cob_integration) else: # NO SYNTHETIC FALLBACK - Wait for real data logger.warning(f"No real BOM features available for {symbol} - waiting for real data") except Exception as e: logger.warning(f"Error getting real BOM features for {symbol}: {e}") logger.warning(f"Waiting for real data instead of using synthetic") else: # NO SYNTHETIC FEATURES - Wait for real COB integration logger.warning(f"No COB integration available for {symbol} - waiting for real data") time.sleep(1.0) # Update every second except Exception as e: logger.error(f"Error in BOM cache update loop: {e}") time.sleep(5.0) # Wait longer on error # Start background thread bom_thread = Thread(target=update_loop, daemon=True) bom_thread.start() logger.info("Started BOM cache updates (1s resolution)") except Exception as e: logger.error(f"Error starting BOM cache updates: {e}") def _extract_real_bom_features(self, symbol: str, cob_integration) -> Optional[List[float]]: """Extract real BOM features from COB integration""" try: features = [] # Get consolidated order book if hasattr(cob_integration, 'get_consolidated_orderbook'): cob_snapshot = cob_integration.get_consolidated_orderbook(symbol) if cob_snapshot: # Extract order book features (40 features) features.extend(self._extract_orderbook_features(cob_snapshot)) else: features.extend([0.0] * 40) else: features.extend([0.0] * 40) # Get volume profile features (30 features) if hasattr(cob_integration, 'get_session_volume_profile'): volume_profile = cob_integration.get_session_volume_profile(symbol) if volume_profile: features.extend(self._extract_volume_profile_features(volume_profile)) else: features.extend([0.0] * 30) else: features.extend([0.0] * 30) # Add flow and microstructure features (50 features) features.extend(self._extract_flow_microstructure_features(symbol, cob_integration)) # Ensure exactly 120 features if len(features) > 120: features = features[:120] elif len(features) < 120: features.extend([0.0] * (120 - len(features))) return features except Exception as e: logger.warning(f"Error extracting real BOM features for {symbol}: {e}") return None def _extract_orderbook_features(self, cob_snapshot) -> List[float]: """Extract order book features from COB snapshot""" features = [] try: # Top 10 bid levels for i in range(10): if i < len(cob_snapshot.consolidated_bids): level = cob_snapshot.consolidated_bids[i] price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid volume_normalized = level.total_volume_usd / 1000000 features.extend([price_offset, volume_normalized]) else: features.extend([0.0, 0.0]) # Top 10 ask levels for i in range(10): if i < len(cob_snapshot.consolidated_asks): level = cob_snapshot.consolidated_asks[i] price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid volume_normalized = level.total_volume_usd / 1000000 features.extend([price_offset, volume_normalized]) else: features.extend([0.0, 0.0]) except Exception as e: logger.warning(f"Error extracting order book features: {e}") features = [0.0] * 40 return features[:40] def _extract_volume_profile_features(self, volume_profile) -> List[float]: """Extract volume profile features""" features = [] try: if 'data' in volume_profile: svp_data = volume_profile['data'] top_levels = sorted(svp_data, key=lambda x: x.get('total_volume', 0), reverse=True)[:10] for level in top_levels: buy_percent = level.get('buy_percent', 50.0) / 100.0 sell_percent = level.get('sell_percent', 50.0) / 100.0 total_volume = level.get('total_volume', 0.0) / 1000000 features.extend([buy_percent, sell_percent, total_volume]) # Pad to 30 features while len(features) < 30: features.extend([0.5, 0.5, 0.0]) except Exception as e: logger.warning(f"Error extracting volume profile features: {e}") features = [0.0] * 30 return features[:30] def _extract_flow_microstructure_features(self, symbol: str, cob_integration) -> List[float]: """Extract flow and microstructure features""" try: # For now, return synthetic features since full implementation would be complex # NO SYNTHETIC DATA - Return None if no real microstructure data logger.warning(f"No real microstructure data available for {symbol}") return None except: return [0.0] * 50 def _handle_rate_limit(self, url: str): """Handle rate limiting with exponential backoff""" current_time = time.time() # Check if we need to wait if url in self.last_request_time: time_since_last = current_time - self.last_request_time[url] if time_since_last < self.request_interval: sleep_time = self.request_interval - time_since_last logger.info(f"Rate limiting: sleeping {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request_time[url] = time.time() def _make_request_with_retry(self, url: str, params: dict = None): """Make HTTP request with retry logic for 451 errors""" for attempt in range(self.max_retries): try: self._handle_rate_limit(url) response = requests.get(url, params=params, timeout=30) if response.status_code == 451: logger.warning(f"Rate limit hit (451), attempt {attempt + 1}/{self.max_retries}") if attempt < self.max_retries - 1: sleep_time = self.retry_delay * (2 ** attempt) # Exponential backoff logger.info(f"Waiting {sleep_time}s before retry...") time.sleep(sleep_time) continue else: logger.error("Max retries reached, using cached data") return None response.raise_for_status() return response except Exception as e: logger.error(f"Request failed (attempt {attempt + 1}): {e}") if attempt < self.max_retries - 1: time.sleep(5 * (attempt + 1)) return None # ===== CENTRALIZED DATA COLLECTION METHODS ===== def start_centralized_data_collection(self): """Start all centralized data collection processes""" logger.info("Starting centralized data collection for all models and dashboard") # Start COB data collection self.start_cob_data_collection() # Start training data collection self.start_training_data_collection() logger.info("All centralized data collection processes started") def stop_centralized_data_collection(self): """Stop all centralized data collection processes""" logger.info("Stopping centralized data collection") # Stop COB collection self.cob_collection_active = False if self.cob_collection_thread and self.cob_collection_thread.is_alive(): self.cob_collection_thread.join(timeout=5) # Stop training data collection self.training_data_collection_active = False if self.training_data_thread and self.training_data_thread.is_alive(): self.training_data_thread.join(timeout=5) logger.info("Centralized data collection stopped") def start_cob_data_collection(self): """Start COB (Consolidated Order Book) data collection prioritizing WebSocket""" if self.cob_collection_active: logger.warning("COB data collection already active") return # Start real-time WebSocket streaming first (no rate limits) if not self.is_streaming: logger.info("Auto-starting WebSocket streaming for COB data (rate limit free)") self.start_real_time_streaming() self.cob_collection_active = True self.cob_collection_thread = Thread(target=self._cob_collection_worker, daemon=True) self.cob_collection_thread.start() logger.info("COB data collection started (WebSocket priority, minimal REST API)") def _cob_collection_worker(self): """Worker thread for COB data collection with WebSocket priority""" import requests import time import threading logger.info("COB data collection worker started (WebSocket-first approach)") # Significantly reduced frequency for REST API fallback only def collect_symbol_data(symbol): rest_api_fallback_count = 0 last_rest_api_call = 0 # Track last REST API call time while self.cob_collection_active: try: # PRIORITY 1: Try to use WebSocket data first ws_data = self._get_websocket_cob_data(symbol) if ws_data and len(ws_data) > 0: # Distribute WebSocket COB data self._distribute_cob_data(symbol, ws_data) rest_api_fallback_count = 0 # Reset fallback counter # Much longer sleep since WebSocket provides real-time data time.sleep(10.0) # Only check every 10 seconds when WS is working else: # FALLBACK: Only use REST API if WebSocket fails AND rate limit allows rest_api_fallback_count += 1 current_time = time.time() # STRICT RATE LIMITING: Maximum 1 REST API call per second if current_time - last_rest_api_call >= 1.0: # At least 1 second between calls if rest_api_fallback_count <= 3: # Limited fallback attempts logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}") self._collect_cob_data_for_symbol(symbol) last_rest_api_call = current_time # Update last call time else: logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)") else: logger.debug(f"Rate limiting REST API for {symbol} - waiting {1.0 - (current_time - last_rest_api_call):.1f}s") # Much longer sleep when using REST API fallback time.sleep(30.0) # 30 seconds between REST calls except Exception as e: logger.error(f"Error collecting COB data for {symbol}: {e}") time.sleep(10) # Longer recovery time # Start a thread for each symbol threads = [] for symbol in self.symbols: thread = threading.Thread(target=collect_symbol_data, args=(symbol,), daemon=True) thread.start() threads.append(thread) # Keep the main thread alive while self.cob_collection_active: time.sleep(1) # Join threads when collection is stopped for thread in threads: thread.join(timeout=1) def _get_websocket_cob_data(self, symbol: str) -> Optional[Dict]: """Get COB data from Enhanced WebSocket (primary source)""" try: # First check Enhanced COB WebSocket data if self.enhanced_cob_websocket: latest_data = self.enhanced_cob_websocket.latest_cob_data.get(symbol) if latest_data: # Check data freshness timestamp = latest_data.get('timestamp') if isinstance(timestamp, datetime): data_age = (datetime.now() - timestamp).total_seconds() if data_age < 5.0: # Data is fresh (less than 5 seconds old) logger.debug(f"✅ Using Enhanced WebSocket COB data for {symbol} (age: {data_age:.1f}s)") return latest_data else: logger.debug(f"⚠️ Enhanced WebSocket COB data for {symbol} is stale (age: {data_age:.1f}s)") else: # If no timestamp, assume it's fresh logger.debug(f"✅ Using Enhanced WebSocket COB data for {symbol} (no timestamp)") return latest_data # Fallback to cached WebSocket data if hasattr(self, 'cob_websocket_data') and symbol in self.cob_websocket_data: cached_data = self.cob_websocket_data[symbol] if cached_data: logger.debug(f"Using cached Enhanced WebSocket COB data for {symbol}") return cached_data # Check legacy COB data cache if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache: cached_data = self.cob_data_cache[symbol] if cached_data and isinstance(cached_data, dict): # Check if data is recent (within last 10 seconds as fallback) import time current_time = time.time() data_age = current_time - cached_data.get('timestamp', 0) if data_age < 10.0: # Allow slightly older data as fallback logger.debug(f"Using legacy WebSocket COB data for {symbol} (age: {data_age:.1f}s)") return cached_data logger.debug(f"No WebSocket COB data available for {symbol}") return None except Exception as e: logger.debug(f"Error getting WebSocket COB data for {symbol}: {e}") return None def _collect_cob_data_for_symbol(self, symbol: str): """Collect COB data for a specific symbol using Binance REST API with rate limiting""" try: import requests import time # Basic rate limiting check if not self._handle_rate_limit(f"https://api.binance.com/api/v3/depth"): logger.debug(f"Rate limited for {symbol}, skipping COB collection") return # Convert symbol format binance_symbol = symbol.replace('/', '').upper() # Get order book data with reduced limit to minimize load url = f"https://api.binance.com/api/v3/depth" params = { 'symbol': binance_symbol, 'limit': 50 # Reduced from 100 to 50 levels to reduce load } # Add headers to reduce detection headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, params=params, headers=headers, timeout=10) if response.status_code == 200: order_book = response.json() # Process and cache the data cob_snapshot = self._process_order_book_data(symbol, order_book) # Store in cache (ensure cache exists) if binance_symbol not in self.cob_data_cache: self.cob_data_cache[binance_symbol] = deque(maxlen=300) self.cob_data_cache[binance_symbol].append(cob_snapshot) # Distribute to COB data subscribers self._distribute_cob_data(symbol, cob_snapshot) elif response.status_code in [418, 429, 451]: logger.warning(f"Rate limited (HTTP {response.status_code}) for {symbol} COB collection") # Don't retry immediately, let the sleep in the worker handle it else: logger.debug(f"Failed to fetch COB data for {symbol}: {response.status_code}") except Exception as e: logger.debug(f"Error collecting COB data for {symbol}: {e}") def _process_order_book_data(self, symbol: str, order_book: dict) -> dict: """Process raw order book data into structured COB snapshot with multi-timeframe imbalance metrics""" try: bids = [[float(price), float(qty)] for price, qty in order_book.get('bids', [])] asks = [[float(price), float(qty)] for price, qty in order_book.get('asks', [])] # Calculate statistics total_bid_volume = sum(qty for _, qty in bids) total_ask_volume = sum(qty for _, qty in asks) best_bid = bids[0][0] if bids else 0 best_ask = asks[0][0] if asks else 0 mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0 spread = best_ask - best_bid if best_bid and best_ask else 0 spread_bps = (spread / mid_price * 10000) if mid_price > 0 else 0 # Calculate current imbalance imbalance = (total_bid_volume - total_ask_volume) / (total_bid_volume + total_ask_volume) if (total_bid_volume + total_ask_volume) > 0 else 0 # Calculate multi-timeframe imbalances binance_symbol = symbol.replace('/', '').upper() # Initialize imbalance metrics imbalance_1s = imbalance # Current imbalance is 1s imbalance_5s = imbalance # Default to current if not enough history imbalance_15s = imbalance imbalance_60s = imbalance # Calculate historical imbalances if we have enough data if binance_symbol in self.cob_data_cache: cache = list(self.cob_data_cache[binance_symbol]) now = datetime.now() # Get snapshots for different timeframes # Make sure we're using the actual timestamp from the snapshot snapshots_5s = [] snapshots_15s = [] snapshots_60s = [] for s in cache: if not isinstance(s, dict) or 'timestamp' not in s: continue snapshot_time = s['timestamp'] if not isinstance(snapshot_time, datetime): try: # Try to convert string timestamp to datetime if isinstance(snapshot_time, str): snapshot_time = datetime.fromisoformat(snapshot_time.replace('Z', '+00:00')) elif isinstance(snapshot_time, (int, float)): snapshot_time = datetime.fromtimestamp(snapshot_time) except: # If conversion fails, use current time minus index position snapshot_time = now - timedelta(seconds=cache.index(s)) time_diff = (now - snapshot_time).total_seconds() if time_diff <= 5: snapshots_5s.append(s) if time_diff <= 15: snapshots_15s.append(s) if time_diff <= 60: snapshots_60s.append(s) # Calculate imbalances for each timeframe using EMA for smoother results if snapshots_5s: # Calculate simple sum first bid_vol_5s = sum(s['stats'].get('bid_liquidity', 0) for s in snapshots_5s) ask_vol_5s = sum(s['stats'].get('ask_liquidity', 0) for s in snapshots_5s) total_vol_5s = bid_vol_5s + ask_vol_5s # Calculate volume-weighted average imbalance if total_vol_5s > 0: # Get individual imbalances imbalances = [s['stats'].get('imbalance', 0) for s in snapshots_5s] # Calculate EMA with alpha = 2/(n+1) alpha = 2.0 / (len(imbalances) + 1) ema = imbalances[0] if imbalances else 0 for val in imbalances[1:]: ema = alpha * val + (1 - alpha) * ema imbalance_5s = ema if snapshots_15s: # Calculate simple sum first bid_vol_15s = sum(s['stats'].get('bid_liquidity', 0) for s in snapshots_15s) ask_vol_15s = sum(s['stats'].get('ask_liquidity', 0) for s in snapshots_15s) total_vol_15s = bid_vol_15s + ask_vol_15s # Calculate volume-weighted average imbalance if total_vol_15s > 0: # Get individual imbalances imbalances = [s['stats'].get('imbalance', 0) for s in snapshots_15s] # Calculate EMA with alpha = 2/(n+1) alpha = 2.0 / (len(imbalances) + 1) ema = imbalances[0] if imbalances else 0 for val in imbalances[1:]: ema = alpha * val + (1 - alpha) * ema imbalance_15s = ema if snapshots_60s: # Calculate simple sum first bid_vol_60s = sum(s['stats'].get('bid_liquidity', 0) for s in snapshots_60s) ask_vol_60s = sum(s['stats'].get('ask_liquidity', 0) for s in snapshots_60s) total_vol_60s = bid_vol_60s + ask_vol_60s # Calculate volume-weighted average imbalance if total_vol_60s > 0: # Get individual imbalances imbalances = [s['stats'].get('imbalance', 0) for s in snapshots_60s] # Calculate EMA with alpha = 2/(n+1) alpha = 2.0 / (len(imbalances) + 1) ema = imbalances[0] if imbalances else 0 for val in imbalances[1:]: ema = alpha * val + (1 - alpha) * ema imbalance_60s = ema # Log imbalance values for debugging logger.debug(f"Imbalance values for {symbol}: 1s={imbalance_1s:.3f}, 5s={imbalance_5s:.3f}, 15s={imbalance_15s:.3f}, 60s={imbalance_60s:.3f}") cob_snapshot = { 'symbol': symbol, 'timestamp': datetime.now(), 'bids': bids[:20], # Top 20 levels 'asks': asks[:20], # Top 20 levels 'stats': { 'best_bid': best_bid, 'best_ask': best_ask, 'mid_price': mid_price, 'spread': spread, 'spread_bps': spread_bps, 'bid_liquidity': total_bid_volume, 'ask_liquidity': total_ask_volume, 'total_liquidity': total_bid_volume + total_ask_volume, 'imbalance': imbalance, 'imbalance_1s': imbalance_1s, 'imbalance_5s': imbalance_5s, 'imbalance_15s': imbalance_15s, 'imbalance_60s': imbalance_60s, 'levels': len(bids) + len(asks) } } return cob_snapshot except Exception as e: logger.error(f"Error processing order book data for {symbol}: {e}") return {} def start_training_data_collection(self): """Start training data collection for models""" if self.training_data_collection_active: logger.warning("Training data collection already active") return self.training_data_collection_active = True self.training_data_thread = Thread(target=self._training_data_collection_worker, daemon=True) self.training_data_thread.start() logger.info("Training data collection started") def _training_data_collection_worker(self): """Worker thread for training data collection""" import time logger.info("Training data collection worker started") while self.training_data_collection_active: try: # Collect training data for all symbols for symbol in self.symbols: training_sample = self._collect_training_sample(symbol) if training_sample: binance_symbol = symbol.replace('/', '').upper() self.training_data_cache[binance_symbol].append(training_sample) # Distribute to training data subscribers self._distribute_training_data(symbol, training_sample) # Sleep for 5 seconds between collections time.sleep(5) except Exception as e: logger.error(f"Error in training data collection worker: {e}") time.sleep(10) # Wait longer on error def _collect_training_sample(self, symbol: str) -> Optional[dict]: """Collect a training sample for a specific symbol""" try: # Get recent market data recent_data = self.get_historical_data(symbol, '1m', limit=100) if recent_data is None or len(recent_data) < 50: return None # Get recent ticks recent_ticks = self.get_recent_ticks(symbol, count=100) if len(recent_ticks) < 10: return None # Get COB data binance_symbol = symbol.replace('/', '').upper() recent_cob = list(self.cob_data_cache.get(binance_symbol, []))[-10:] if binance_symbol in self.cob_data_cache else [] # Create training sample training_sample = { 'symbol': symbol, 'timestamp': datetime.now(), 'ohlcv_data': recent_data.tail(50).to_dict('records'), 'tick_data': [ { 'price': tick.price, 'volume': tick.volume, 'timestamp': tick.timestamp } for tick in recent_ticks[-50:] ], 'cob_data': recent_cob, 'features': self._extract_training_features(symbol, recent_data, recent_ticks, recent_cob) } return training_sample except Exception as e: logger.error(f"Error collecting training sample for {symbol}: {e}") return None def _extract_training_features(self, symbol: str, ohlcv_data: pd.DataFrame, recent_ticks: List[MarketTick], recent_cob: List[dict]) -> dict: """Extract features for training from various data sources""" try: features = {} # OHLCV features if len(ohlcv_data) > 0: latest = ohlcv_data.iloc[-1] features.update({ 'price': latest['close'], 'volume': latest['volume'], 'price_change_1m': (latest['close'] - ohlcv_data.iloc[-2]['close']) / ohlcv_data.iloc[-2]['close'] if len(ohlcv_data) > 1 else 0, 'volume_ratio': latest['volume'] / ohlcv_data['volume'].mean() if len(ohlcv_data) > 1 else 1, 'volatility': ohlcv_data['close'].pct_change().std() if len(ohlcv_data) > 1 else 0 }) # Tick features if recent_ticks: tick_prices = [tick.price for tick in recent_ticks] tick_volumes = [tick.volume for tick in recent_ticks] features.update({ 'tick_price_std': np.std(tick_prices) if len(tick_prices) > 1 else 0, 'tick_volume_mean': np.mean(tick_volumes), 'tick_count': len(recent_ticks) }) # COB features if recent_cob: latest_cob = recent_cob[-1] if 'stats' in latest_cob: stats = latest_cob['stats'] features.update({ 'spread_bps': stats.get('spread_bps', 0), 'imbalance': stats.get('imbalance', 0), 'liquidity': stats.get('total_liquidity', 0), 'cob_levels': stats.get('levels', 0) }) return features except Exception as e: logger.error(f"Error extracting training features for {symbol}: {e}") return {} # ===== SUBSCRIPTION METHODS FOR MODELS AND DASHBOARD ===== def subscribe_to_cob_data(self, callback: Callable[[str, dict], None]) -> str: """Subscribe to COB data updates""" subscriber_id = str(uuid.uuid4()) self.cob_data_callbacks.append(callback) logger.info(f"COB data subscriber added: {subscriber_id}") return subscriber_id def subscribe_to_training_data(self, callback: Callable[[str, dict], None]) -> str: """Subscribe to training data updates""" subscriber_id = str(uuid.uuid4()) self.training_data_callbacks.append(callback) logger.info(f"Training data subscriber added: {subscriber_id}") return subscriber_id def subscribe_to_model_predictions(self, callback: Callable[[str, dict], None]) -> str: """Subscribe to model prediction updates""" subscriber_id = str(uuid.uuid4()) self.model_prediction_callbacks.append(callback) logger.info(f"Model prediction subscriber added: {subscriber_id}") return subscriber_id def _distribute_cob_data(self, symbol: str, cob_snapshot: dict): """Distribute COB data to all subscribers""" for callback in self.cob_data_callbacks: try: Thread(target=lambda: callback(symbol, cob_snapshot), daemon=True).start() except Exception as e: logger.error(f"Error distributing COB data: {e}") def _distribute_training_data(self, symbol: str, training_sample: dict): """Distribute training data to all subscribers""" for callback in self.training_data_callbacks: try: Thread(target=lambda: callback(symbol, training_sample), daemon=True).start() except Exception as e: logger.error(f"Error distributing training data: {e}") def _distribute_model_predictions(self, symbol: str, prediction: dict): """Distribute model predictions to all subscribers""" for callback in self.model_prediction_callbacks: try: Thread(target=lambda: callback(symbol, prediction), daemon=True).start() except Exception as e: logger.error(f"Error distributing model prediction: {e}") # ===== DATA ACCESS METHODS FOR MODELS AND DASHBOARD ===== def get_cob_data(self, symbol: str, count: int = 50) -> List[dict]: """Get recent COB data for a symbol""" binance_symbol = symbol.replace('/', '').upper() if binance_symbol in self.cob_data_cache: return list(self.cob_data_cache[binance_symbol])[-count:] return [] def get_training_data(self, symbol: str, count: int = 100) -> List[dict]: """Get recent training data for a symbol""" binance_symbol = symbol.replace('/', '').upper() if binance_symbol in self.training_data_cache: return list(self.training_data_cache[binance_symbol])[-count:] return [] def collect_cob_data(self, symbol: str) -> dict: """ Collect Consolidated Order Book (COB) data for a symbol using REST API This centralized method collects COB data for all consumers (models, dashboard, etc.) """ try: import requests import time # Check rate limits before making request if not self._handle_rate_limit(f"https://api.binance.com/api/v3/depth"): logger.warning(f"Rate limited for {symbol}, using cached data") # Return cached data if available binance_symbol = symbol.replace('/', '').upper() if binance_symbol in self.cob_data_cache and self.cob_data_cache[binance_symbol]: return self.cob_data_cache[binance_symbol][-1] return {} # Use Binance REST API for order book data with reduced limit binance_symbol = symbol.replace('/', '') url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=100" # Reduced from 500 # Add headers to reduce detection headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: data = response.json() # Process order book data bids = [[float(price), float(qty)] for price, qty in data.get('bids', [])] asks = [[float(price), float(qty)] for price, qty in data.get('asks', [])] # Calculate mid price best_bid = bids[0][0] if bids else 0 best_ask = asks[0][0] if asks else 0 mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0 # Calculate order book stats bid_liquidity = sum(qty for _, qty in bids[:20]) ask_liquidity = sum(qty for _, qty in asks[:20]) total_liquidity = bid_liquidity + ask_liquidity # Calculate imbalance imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0 # Calculate spread in basis points spread = (best_ask - best_bid) / mid_price * 10000 if mid_price > 0 else 0 # Create COB snapshot cob_snapshot = { 'symbol': symbol, 'timestamp': int(time.time() * 1000), 'bids': bids[:50], # Limit to top 50 levels 'asks': asks[:50], # Limit to top 50 levels 'stats': { 'mid_price': mid_price, 'best_bid': best_bid, 'best_ask': best_ask, 'bid_liquidity': bid_liquidity, 'ask_liquidity': ask_liquidity, 'total_liquidity': total_liquidity, 'imbalance': imbalance, 'spread_bps': spread } } # Store in cache with self.subscriber_lock: if not hasattr(self, 'cob_data_cache'): self.cob_data_cache = {} if symbol not in self.cob_data_cache: self.cob_data_cache[symbol] = [] # Add to cache with max size limit self.cob_data_cache[symbol].append(cob_snapshot) if len(self.cob_data_cache[symbol]) > 300: # Keep 5 minutes of 1s data self.cob_data_cache[symbol].pop(0) # Notify subscribers self._notify_cob_subscribers(symbol, cob_snapshot) return cob_snapshot elif response.status_code in [418, 429, 451]: logger.warning(f"Rate limited (HTTP {response.status_code}) for {symbol}, using cached data") # Return cached data if available binance_symbol = symbol.replace('/', '').upper() if binance_symbol in self.cob_data_cache and self.cob_data_cache[binance_symbol]: return self.cob_data_cache[binance_symbol][-1] return {} else: logger.warning(f"Failed to fetch COB data for {symbol}: {response.status_code}") return {} except Exception as e: logger.debug(f"Error collecting COB data for {symbol}: {e}") return {} def start_cob_collection(self): """ Start enhanced COB data collection with WebSocket and raw tick aggregation """ try: # Initialize COB WebSocket system self._initialize_enhanced_cob_websocket() # Start aggregation system self._start_cob_tick_aggregation() logger.info("Enhanced COB data collection started with WebSocket and tick aggregation") except Exception as e: logger.error(f"Error starting enhanced COB collection: {e}") # Fallback to REST-only collection self._start_rest_only_cob_collection() def _initialize_enhanced_cob_websocket(self): """Initialize the enhanced COB WebSocket system""" try: from .enhanced_cob_websocket import EnhancedCOBWebSocket # Initialize WebSocket with our symbols self.cob_websocket = EnhancedCOBWebSocket( symbols=['ETH/USDT', 'BTC/USDT'], dashboard_callback=self._on_cob_websocket_status ) # Add callback for COB data self.cob_websocket.add_cob_callback(self._on_cob_websocket_data) # Start WebSocket in background thread import threading import asyncio def run_websocket(): """Run WebSocket in separate thread with its own event loop""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.cob_websocket.start()) loop.run_forever() except Exception as e: logger.error(f"Error in COB WebSocket thread: {e}") websocket_thread = threading.Thread(target=run_websocket, daemon=True) websocket_thread.start() logger.info("Enhanced COB WebSocket initialized and started") except ImportError: logger.warning("Enhanced COB WebSocket not available, falling back to REST") self._start_rest_only_cob_collection() except Exception as e: logger.error(f"Error initializing COB WebSocket: {e}") self._start_rest_only_cob_collection() def _start_cob_tick_aggregation(self): """Start COB tick aggregation system""" try: # Initialize tick storage if not hasattr(self, 'cob_raw_ticks'): self.cob_raw_ticks = { 'ETH/USDT': [], 'BTC/USDT': [] } if not hasattr(self, 'cob_1s_aggregated'): self.cob_1s_aggregated = { 'ETH/USDT': [], 'BTC/USDT': [] } # Start aggregation thread import threading import time def tick_aggregator(): """Aggregate raw ticks into 1-second intervals""" logger.info("Starting COB tick aggregation system") while True: try: current_time = time.time() current_second = int(current_time) # Process each symbol for symbol in ['ETH/USDT', 'BTC/USDT']: self._aggregate_ticks_for_symbol(symbol, current_second) # Sleep until next second boundary sleep_time = 1.0 - (current_time % 1.0) time.sleep(sleep_time) except Exception as e: logger.error(f"Error in tick aggregation: {e}") time.sleep(1) aggregation_thread = threading.Thread(target=tick_aggregator, daemon=True) aggregation_thread.start() logger.info("COB tick aggregation system started") except Exception as e: logger.error(f"Error starting tick aggregation: {e}") def _start_rest_only_cob_collection(self): """Fallback to REST-only COB collection""" try: import threading import time def cob_collector(): """Collect COB data using REST API calls""" logger.info("Starting REST-only COB data collection") while True: try: # Collect data for both symbols for symbol in ['ETH/USDT', 'BTC/USDT']: self.collect_cob_data(symbol) # Sleep for 1 second between collections time.sleep(1) except Exception as e: logger.debug(f"Error in COB collection: {e}") time.sleep(5) # Wait longer on error # Start collector in background thread if not hasattr(self, '_cob_thread_started') or not self._cob_thread_started: cob_thread = threading.Thread(target=cob_collector, daemon=True) cob_thread.start() self._cob_thread_started = True logger.info("REST-only COB data collection started") except Exception as e: logger.error(f"Error starting REST-only COB collection: {e}") async def _on_cob_websocket_data(self, symbol: str, cob_data: dict): """Handle COB data from WebSocket (100+ updates per second)""" try: import time # Add timestamp if not present if 'timestamp' not in cob_data: cob_data['timestamp'] = time.time() elif hasattr(cob_data['timestamp'], 'timestamp'): # Convert datetime to timestamp cob_data['timestamp'] = cob_data['timestamp'].timestamp() # Store raw tick - ensure proper initialization if not hasattr(self, 'cob_raw_ticks'): self.cob_raw_ticks = {} # Ensure symbol keys exist in the dictionary for sym in ['ETH/USDT', 'BTC/USDT']: if sym not in self.cob_raw_ticks: self.cob_raw_ticks[sym] = [] # Add to raw ticks with size limit (keep last 10 seconds of data) max_ticks = 1000 # ~10 seconds at 100 updates/sec self.cob_raw_ticks[symbol].append(cob_data) if len(self.cob_raw_ticks[symbol]) > max_ticks: self.cob_raw_ticks[symbol] = self.cob_raw_ticks[symbol][-max_ticks:] # Update latest data cache for immediate access with self.subscriber_lock: if not hasattr(self, 'cob_data_cache'): self.cob_data_cache = {} # Ensure symbol key exists in the cache if symbol not in self.cob_data_cache: self.cob_data_cache[symbol] = [] # Convert WebSocket format to standard format standard_cob_data = { 'symbol': symbol, 'timestamp': int(cob_data['timestamp'] * 1000), # Convert to milliseconds 'bids': [[bid['price'], bid['size']] for bid in cob_data.get('bids', [])[:50]], 'asks': [[ask['price'], ask['size']] for ask in cob_data.get('asks', [])[:50]], 'stats': cob_data.get('stats', {}) } # Add to cache if symbol not in self.cob_data_cache: self.cob_data_cache[symbol] = [] elif not isinstance(self.cob_data_cache[symbol], (list, deque)): self.cob_data_cache[symbol] = [] self.cob_data_cache[symbol].append(standard_cob_data) if len(self.cob_data_cache[symbol]) > 300: # Keep 5 minutes self.cob_data_cache[symbol].pop(0) # Notify subscribers self._notify_cob_subscribers(symbol, standard_cob_data) logger.debug(f"Processed WebSocket COB tick for {symbol}: {len(cob_data.get('bids', []))} bids, {len(cob_data.get('asks', []))} asks") except Exception as e: logger.error(f"Error processing WebSocket COB data for {symbol}: {e}", exc_info=True) def _on_cob_websocket_status(self, status_data: dict): """Handle WebSocket status updates""" try: symbol = status_data.get('symbol') status = status_data.get('status') message = status_data.get('message', '') logger.info(f"COB WebSocket status for {symbol}: {status} - {message}") except Exception as e: logger.error(f"Error handling WebSocket status: {e}") def _aggregate_ticks_for_symbol(self, symbol: str, current_second: int): """Aggregate raw ticks for a symbol into 1-second intervals""" try: if not hasattr(self, 'cob_raw_ticks') or symbol not in self.cob_raw_ticks: return # Get ticks for the previous second target_second = current_second - 1 target_ticks = [] # Filter ticks for the target second for tick in self.cob_raw_ticks[symbol]: tick_time = tick.get('timestamp', 0) if isinstance(tick_time, (int, float)): tick_second = int(tick_time) if tick_second == target_second: target_ticks.append(tick) if not target_ticks: return # Aggregate the ticks aggregated_data = self._create_1s_aggregation(symbol, target_ticks, target_second) # Store aggregated data if not hasattr(self, 'cob_1s_aggregated'): self.cob_1s_aggregated = {'ETH/USDT': [], 'BTC/USDT': []} self.cob_1s_aggregated[symbol].append(aggregated_data) # Keep only last 300 seconds (5 minutes) if len(self.cob_1s_aggregated[symbol]) > 300: self.cob_1s_aggregated[symbol] = self.cob_1s_aggregated[symbol][-300:] logger.debug(f"Aggregated {len(target_ticks)} ticks for {symbol} at second {target_second}") except Exception as e: logger.error(f"Error aggregating ticks for {symbol}: {e}") def _create_1s_aggregation(self, symbol: str, ticks: list, timestamp: int) -> dict: """Create 1-second aggregation from raw ticks""" try: if not ticks: return {} # Get first and last tick for open/close first_tick = ticks[0] last_tick = ticks[-1] # Extract price data prices = [] volumes = [] spreads = [] imbalances = [] best_bids = [] best_asks = [] for tick in ticks: stats = tick.get('stats', {}) if stats: mid_price = stats.get('mid_price', 0) if mid_price > 0: prices.append(mid_price) # Volume data bid_vol = stats.get('bid_volume', 0) ask_vol = stats.get('ask_volume', 0) total_vol = bid_vol + ask_vol if total_vol > 0: volumes.append(total_vol) # Spread data spread_bps = stats.get('spread_bps', 0) if spread_bps > 0: spreads.append(spread_bps) # Imbalance data imbalance = stats.get('imbalance', 0) imbalances.append(imbalance) # Best bid/ask best_bid = stats.get('best_bid', 0) best_ask = stats.get('best_ask', 0) if best_bid > 0: best_bids.append(best_bid) if best_ask > 0: best_asks.append(best_ask) # Calculate OHLC for prices if prices: open_price = prices[0] close_price = prices[-1] high_price = max(prices) low_price = min(prices) else: open_price = close_price = high_price = low_price = 0 # Calculate aggregated metrics avg_volume = sum(volumes) / len(volumes) if volumes else 0 avg_spread = sum(spreads) / len(spreads) if spreads else 0 avg_imbalance = sum(imbalances) / len(imbalances) if imbalances else 0 # Best bid/ask aggregation avg_best_bid = sum(best_bids) / len(best_bids) if best_bids else 0 avg_best_ask = sum(best_asks) / len(best_asks) if best_asks else 0 # Order book depth aggregation total_bid_levels = 0 total_ask_levels = 0 total_bid_liquidity = 0 total_ask_liquidity = 0 for tick in ticks: stats = tick.get('stats', {}) total_bid_levels += stats.get('bid_levels', 0) total_ask_levels += stats.get('ask_levels', 0) total_bid_liquidity += stats.get('bid_volume', 0) total_ask_liquidity += stats.get('ask_volume', 0) avg_bid_levels = total_bid_levels / len(ticks) if ticks else 0 avg_ask_levels = total_ask_levels / len(ticks) if ticks else 0 avg_bid_liquidity = total_bid_liquidity / len(ticks) if ticks else 0 avg_ask_liquidity = total_ask_liquidity / len(ticks) if ticks else 0 # Create aggregated data structure aggregated = { 'symbol': symbol, 'timestamp': timestamp, 'tick_count': len(ticks), 'price_ohlc': { 'open': open_price, 'high': high_price, 'low': low_price, 'close': close_price }, 'volume': { 'average': avg_volume, 'total_bid': total_bid_liquidity, 'total_ask': total_ask_liquidity, 'average_bid': avg_bid_liquidity, 'average_ask': avg_ask_liquidity }, 'spread': { 'average_bps': avg_spread, 'min_bps': min(spreads) if spreads else 0, 'max_bps': max(spreads) if spreads else 0 }, 'imbalance': { 'average': avg_imbalance, 'min': min(imbalances) if imbalances else 0, 'max': max(imbalances) if imbalances else 0 }, 'depth': { 'average_bid_levels': avg_bid_levels, 'average_ask_levels': avg_ask_levels, 'total_levels': avg_bid_levels + avg_ask_levels }, 'best_prices': { 'average_bid': avg_best_bid, 'average_ask': avg_best_ask, 'average_mid': (avg_best_bid + avg_best_ask) / 2 if (avg_best_bid > 0 and avg_best_ask > 0) else 0 }, 'raw_tick_data': { 'first_tick_time': first_tick.get('timestamp', 0), 'last_tick_time': last_tick.get('timestamp', 0), 'source': first_tick.get('source', 'unknown') } } return aggregated except Exception as e: logger.error(f"Error creating 1s aggregation for {symbol}: {e}") return {} def _notify_cob_subscribers(self, symbol: str, cob_snapshot: dict): """Notify subscribers of new COB data""" with self.subscriber_lock: if not hasattr(self, 'cob_subscribers'): self.cob_subscribers = {} # Notify all subscribers for this symbol for subscriber_id, callback in self.cob_subscribers.items(): try: callback(symbol, cob_snapshot) except Exception as e: logger.debug(f"Error notifying COB subscriber {subscriber_id}: {e}") def subscribe_to_cob(self, callback) -> str: """Subscribe to COB data updates""" with self.subscriber_lock: if not hasattr(self, 'cob_subscribers'): self.cob_subscribers = {} subscriber_id = str(uuid.uuid4()) self.cob_subscribers[subscriber_id] = callback # Start collection if not already started self.start_cob_collection() return subscriber_id def get_latest_cob_data(self, symbol: str) -> dict: """Get latest COB data for a symbol""" with self.subscriber_lock: # Use the original symbol format for cache lookup (matches how data is stored) logger.debug(f"Getting COB data for {symbol}") if not hasattr(self, 'cob_data_cache'): logger.debug("COB data cache not initialized") return {} if symbol not in self.cob_data_cache: logger.debug(f"Symbol {symbol} not in COB cache. Available: {list(self.cob_data_cache.keys())}") return {} if not self.cob_data_cache[symbol]: logger.debug(f"COB cache for {symbol} is empty") return {} latest_data = self.cob_data_cache[symbol][-1] logger.debug(f"Latest COB data type for {symbol}: {type(latest_data)}") return latest_data def get_cob_raw_ticks(self, symbol: str, count: int = 100) -> List[dict]: """Get raw COB ticks for a symbol (100+ updates per second)""" try: if not hasattr(self, 'cob_raw_ticks') or symbol not in self.cob_raw_ticks: return [] # Return the most recent 'count' ticks return list(self.cob_raw_ticks[symbol])[-count:] except Exception as e: logger.error(f"Error getting raw COB ticks for {symbol}: {e}") return [] def get_cob_1s_aggregated(self, symbol: str, count: int = 60) -> List[dict]: """Get 1-second aggregated COB data for a symbol""" try: if not hasattr(self, 'cob_1s_aggregated') or symbol not in self.cob_1s_aggregated: return [] # Return the most recent 'count' 1-second aggregations return list(self.cob_1s_aggregated[symbol])[-count:] except Exception as e: logger.error(f"Error getting 1s aggregated COB data for {symbol}: {e}") return [] def get_combined_ohlcv_cob_data(self, symbol: str, timeframe: str = '1s', count: int = 60) -> dict: """ Get combined OHLCV and COB data for model inputs Returns: dict: { 'ohlcv': DataFrame with OHLCV data, 'cob_1s': List of 1-second aggregated COB data, 'cob_raw_ticks': List of raw COB ticks, 'timestamps_aligned': bool } """ try: # Get OHLCV data ohlcv_data = self.get_historical_data(symbol, timeframe, limit=count, refresh=True) # Get COB data cob_1s_data = self.get_cob_1s_aggregated(symbol, count) cob_raw_ticks = self.get_cob_raw_ticks(symbol, count * 10) # More raw ticks # Check timestamp alignment timestamps_aligned = False if ohlcv_data is not None and cob_1s_data: try: # Get latest timestamps latest_ohlcv_time = ohlcv_data.index[-1].timestamp() if hasattr(ohlcv_data.index[-1], 'timestamp') else 0 latest_cob_time = cob_1s_data[-1].get('timestamp', 0) # Check if timestamps are within 5 seconds of each other time_diff = abs(latest_ohlcv_time - latest_cob_time) timestamps_aligned = time_diff <= 5 except Exception as e: logger.debug(f"Error checking timestamp alignment: {e}") result = { 'symbol': symbol, 'timeframe': timeframe, 'ohlcv': ohlcv_data, 'cob_1s': cob_1s_data, 'cob_raw_ticks': cob_raw_ticks, 'timestamps_aligned': timestamps_aligned, 'ohlcv_count': len(ohlcv_data) if ohlcv_data is not None else 0, 'cob_1s_count': len(cob_1s_data), 'cob_raw_count': len(cob_raw_ticks), 'data_quality': self._assess_data_quality(ohlcv_data, cob_1s_data, cob_raw_ticks) } logger.debug(f"Combined data for {symbol}: OHLCV={result['ohlcv_count']}, COB_1s={result['cob_1s_count']}, COB_raw={result['cob_raw_count']}, aligned={timestamps_aligned}") return result except Exception as e: logger.error(f"Error getting combined OHLCV+COB data for {symbol}: {e}") return { 'symbol': symbol, 'timeframe': timeframe, 'ohlcv': None, 'cob_1s': [], 'cob_raw_ticks': [], 'timestamps_aligned': False, 'ohlcv_count': 0, 'cob_1s_count': 0, 'cob_raw_count': 0, 'data_quality': 'error' } def _assess_data_quality(self, ohlcv_data, cob_1s_data, cob_raw_ticks) -> str: """Assess the quality of combined data""" try: # Check if we have all data types has_ohlcv = ohlcv_data is not None and not ohlcv_data.empty has_cob_1s = len(cob_1s_data) > 0 has_cob_raw = len(cob_raw_ticks) > 0 if has_ohlcv and has_cob_1s and has_cob_raw: # Check data freshness (within last 60 seconds) import time current_time = time.time() # Check OHLCV freshness ohlcv_fresh = False if has_ohlcv: try: latest_ohlcv_time = ohlcv_data.index[-1].timestamp() ohlcv_fresh = (current_time - latest_ohlcv_time) <= 60 except: pass # Check COB freshness cob_fresh = False if has_cob_1s: try: latest_cob_time = cob_1s_data[-1].get('timestamp', 0) cob_fresh = (current_time - latest_cob_time) <= 60 except: pass if ohlcv_fresh and cob_fresh: return 'excellent' elif has_ohlcv and has_cob_1s: return 'good' else: return 'fair' elif has_ohlcv and has_cob_1s: return 'good' elif has_ohlcv or has_cob_1s: return 'limited' else: return 'poor' except Exception as e: logger.error(f"Error assessing data quality: {e}") return 'unknown' def get_model_input_features(self, symbol: str, feature_count: int = 100) -> dict: """ Get comprehensive model input features combining OHLCV and COB data Returns: dict: { 'features': numpy array of shape (feature_count,), 'feature_names': list of feature names, 'timestamp': latest timestamp, 'data_sources': list of data sources used } """ try: import numpy as np # Get combined data combined_data = self.get_combined_ohlcv_cob_data(symbol, '1s', count=60) features = [] feature_names = [] data_sources = [] # OHLCV features (40 features) if combined_data['ohlcv'] is not None and not combined_data['ohlcv'].empty: ohlcv_df = combined_data['ohlcv'].tail(20) # Last 20 seconds data_sources.append('ohlcv') # Price features (20 features) for i, (_, row) in enumerate(ohlcv_df.iterrows()): if len(features) < 20: features.extend([ row.get('close', 0) / 100000, # Normalized price row.get('volume', 0) / 1000000, # Normalized volume ]) feature_names.extend([f'ohlcv_close_{i}', f'ohlcv_volume_{i}']) # Technical indicators (20 features) if len(ohlcv_df) > 0: latest_row = ohlcv_df.iloc[-1] tech_features = [ latest_row.get('sma_10', 0) / 100000, latest_row.get('sma_20', 0) / 100000, latest_row.get('ema_12', 0) / 100000, latest_row.get('ema_26', 0) / 100000, latest_row.get('rsi', 50) / 100, latest_row.get('macd', 0) / 1000, latest_row.get('bb_upper', 0) / 100000, latest_row.get('bb_lower', 0) / 100000, latest_row.get('atr', 0) / 1000, latest_row.get('adx', 0) / 100, ] # Pad to 20 features tech_features.extend([0.0] * (20 - len(tech_features))) features.extend(tech_features[:20]) feature_names.extend([f'tech_{i}' for i in range(20)]) else: # Pad with zeros if no OHLCV data features.extend([0.0] * 40) feature_names.extend([f'ohlcv_missing_{i}' for i in range(40)]) # COB 1s aggregated features (40 features) if combined_data['cob_1s']: data_sources.append('cob_1s') cob_1s_data = combined_data['cob_1s'][-20:] # Last 20 seconds for i, cob_data in enumerate(cob_1s_data): if len(features) < 80: # 40 OHLCV + 40 COB price_ohlc = cob_data.get('price_ohlc', {}) volume_data = cob_data.get('volume', {}) features.extend([ price_ohlc.get('close', 0) / 100000, # Normalized close price volume_data.get('average', 0) / 1000000, # Normalized volume ]) feature_names.extend([f'cob_1s_close_{i}', f'cob_1s_volume_{i}']) else: # Pad with zeros if no COB 1s data features.extend([0.0] * 40) feature_names.extend([f'cob_1s_missing_{i}' for i in range(40)]) # COB raw tick features (20 features) if combined_data['cob_raw_ticks']: data_sources.append('cob_raw') raw_ticks = combined_data['cob_raw_ticks'][-100:] # Last 100 ticks # Aggregate raw tick statistics if raw_ticks: spreads = [] imbalances = [] volumes = [] for tick in raw_ticks: stats = tick.get('stats', {}) if stats: spreads.append(stats.get('spread_bps', 0)) imbalances.append(stats.get('imbalance', 0)) volumes.append(stats.get('bid_volume', 0) + stats.get('ask_volume', 0)) # Statistical features from raw ticks raw_features = [ np.mean(spreads) / 100 if spreads else 0, # Average spread np.std(spreads) / 100 if spreads else 0, # Spread volatility np.mean(imbalances) if imbalances else 0, # Average imbalance np.std(imbalances) if imbalances else 0, # Imbalance volatility np.mean(volumes) / 1000000 if volumes else 0, # Average volume len(raw_ticks) / 100, # Tick frequency (normalized) ] # Pad to 20 features raw_features.extend([0.0] * (20 - len(raw_features))) features.extend(raw_features[:20]) feature_names.extend([f'cob_raw_{i}' for i in range(20)]) else: features.extend([0.0] * 20) feature_names.extend([f'cob_raw_empty_{i}' for i in range(20)]) else: # Pad with zeros if no raw tick data features.extend([0.0] * 20) feature_names.extend([f'cob_raw_missing_{i}' for i in range(20)]) # Ensure we have exactly the requested number of features if len(features) > feature_count: features = features[:feature_count] feature_names = feature_names[:feature_count] elif len(features) < feature_count: padding_needed = feature_count - len(features) features.extend([0.0] * padding_needed) feature_names.extend([f'padding_{i}' for i in range(padding_needed)]) # Get latest timestamp latest_timestamp = 0 if combined_data['ohlcv'] is not None and not combined_data['ohlcv'].empty: try: latest_timestamp = combined_data['ohlcv'].index[-1].timestamp() except: pass elif combined_data['cob_1s']: try: latest_timestamp = combined_data['cob_1s'][-1].get('timestamp', 0) except: pass result = { 'features': np.array(features, dtype=np.float32), 'feature_names': feature_names, 'timestamp': latest_timestamp, 'data_sources': data_sources, 'data_quality': combined_data['data_quality'], 'feature_count': len(features) } logger.debug(f"Generated {len(features)} model features for {symbol} from sources: {data_sources}") return result except Exception as e: logger.error(f"Error generating model input features for {symbol}: {e}") return { 'features': np.zeros(feature_count, dtype=np.float32), 'feature_names': [f'error_{i}' for i in range(feature_count)], 'timestamp': 0, 'data_sources': [], 'data_quality': 'error', 'feature_count': feature_count } def get_cob_data(self, symbol: str, count: int = 50) -> List[dict]: """Get recent COB data for a symbol""" with self.subscriber_lock: # Use the original symbol format for cache lookup (matches how data is stored) if not hasattr(self, 'cob_data_cache') or symbol not in self.cob_data_cache: return [] # Return the most recent 'count' snapshots return list(self.cob_data_cache[symbol])[-count:] def get_data_summary(self) -> dict: """Get summary of all collected data""" summary = { 'symbols': self.symbols, 'subscribers': { 'tick_subscribers': len(self.subscribers), 'cob_subscribers': len(self.cob_data_callbacks), 'training_subscribers': len(self.training_data_callbacks), 'prediction_subscribers': len(self.model_prediction_callbacks) }, 'data_counts': {}, 'collection_status': { 'cob_collection': self.cob_collection_active, 'training_collection': self.training_data_collection_active, 'streaming': self.is_streaming } } # Add data counts for each symbol for symbol in self.symbols: binance_symbol = symbol.replace('/', '').upper() summary['data_counts'][symbol] = { 'ticks': len(self.tick_buffers.get(binance_symbol, [])), 'cob_snapshots': len(self.cob_data_cache.get(binance_symbol, [])), 'training_samples': len(self.training_data_cache.get(binance_symbol, [])) } return summary def _update_price_buckets(self, symbol: str, cob_data: Dict): """Update price-level buckets based on new COB data.""" try: bids = cob_data.get('bids', []) asks = cob_data.get('asks', []) for size in self.bucket_sizes: bid_buckets = self._calculate_buckets(bids, size) ask_buckets = self._calculate_buckets(asks, size) bucketed_data = { 'symbol': symbol, 'timestamp': datetime.now(), 'bucket_size': size, 'bids': bid_buckets, 'asks': ask_buckets } if symbol not in self.bucketed_cob_data: self.bucketed_cob_data[symbol] = {} self.bucketed_cob_data[symbol][size] = bucketed_data # Distribute to subscribers self._distribute_bucketed_data(symbol, size, bucketed_data) except Exception as e: logger.error(f"Error updating price buckets for {symbol}: {e}") def _calculate_buckets(self, levels: List[Dict], bucket_size: int) -> Dict[float, float]: """Calculates aggregated volume for price buckets.""" buckets = {} for level in levels: price = level.get('price', 0) volume = level.get('volume', 0) if price > 0 and volume > 0: bucket = math.floor(price / bucket_size) * bucket_size if bucket not in buckets: buckets[bucket] = 0 buckets[bucket] += volume return buckets def subscribe_to_bucketed_cob(self, bucket_size: int, callback: Callable): """Subscribe to bucketed COB data.""" if bucket_size in self.bucketed_cob_callbacks: self.bucketed_cob_callbacks[bucket_size].append(callback) logger.info(f"New subscriber for ${bucket_size} bucketed COB data.") else: logger.warning(f"Bucket size {bucket_size} not supported.") def _distribute_bucketed_data(self, symbol: str, bucket_size: int, data: Dict): """Distribute bucketed data to subscribers.""" if bucket_size in self.bucketed_cob_callbacks: for callback in self.bucketed_cob_callbacks[bucket_size]: try: callback(symbol, data) except Exception as e: logger.error(f"Error in bucketed COB callback: {e}")