""" Multi-Timeframe, Multi-Symbol Data Provider This module consolidates all data functionality including: - Historical data fetching from Binance API - Real-time data streaming via WebSocket - Multi-timeframe candle generation - Caching and data management - Technical indicators calculation - Williams Market Structure pivot points with monthly data analysis - Pivot-based feature normalization for improved model training - Centralized data distribution to multiple subscribers (AI models, dashboard, etc.) """ import asyncio import json import logging import os import time import uuid import websockets import requests import pandas as pd import numpy as np import pickle from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field import ta from threading import Thread, Lock from collections import deque from .config import get_config from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar from .cnn_monitor import log_cnn_prediction logger = logging.getLogger(__name__) @dataclass class PivotBounds: """Pivot-based normalization bounds derived from Williams Market Structure""" symbol: str price_max: float price_min: float volume_max: float volume_min: float pivot_support_levels: List[float] pivot_resistance_levels: List[float] pivot_context: Dict[str, Any] created_timestamp: datetime data_period_start: datetime data_period_end: datetime total_candles_analyzed: int def get_price_range(self) -> float: """Get price range for normalization""" return self.price_max - self.price_min def normalize_price(self, price: float) -> float: """Normalize price using pivot bounds""" return (price - self.price_min) / self.get_price_range() def get_nearest_support_distance(self, current_price: float) -> float: """Get distance to nearest support level (normalized)""" if not self.pivot_support_levels: return 0.5 distances = [abs(current_price - s) for s in self.pivot_support_levels] return min(distances) / self.get_price_range() def get_nearest_resistance_distance(self, current_price: float) -> float: """Get distance to nearest resistance level (normalized)""" if not self.pivot_resistance_levels: return 0.5 distances = [abs(current_price - r) for r in self.pivot_resistance_levels] return min(distances) / self.get_price_range() @dataclass class MarketTick: """Standardized market tick data structure""" symbol: str timestamp: datetime price: float volume: float quantity: float side: str # 'buy' or 'sell' trade_id: str is_buyer_maker: bool raw_data: Dict[str, Any] = field(default_factory=dict) @dataclass class DataSubscriber: """Data subscriber information""" subscriber_id: str callback: Callable[[MarketTick], None] symbols: List[str] active: bool = True last_update: datetime = field(default_factory=datetime.now) tick_count: int = 0 subscriber_name: str = "unknown" class DataProvider: """Unified data provider for historical and real-time market data with centralized distribution""" def __init__(self, symbols: List[str] = None, timeframes: List[str] = None): """Initialize the data provider""" self.config = get_config() self.symbols = symbols or self.config.symbols self.timeframes = timeframes or self.config.timeframes # Cache settings (initialize first) self.cache_enabled = self.config.data.get('cache_enabled', True) self.cache_dir = Path(self.config.data.get('cache_dir', 'cache')) self.cache_dir.mkdir(parents=True, exist_ok=True) # Data storage self.historical_data = {} # {symbol: {timeframe: DataFrame}} self.real_time_data = {} # {symbol: {timeframe: deque}} self.current_prices = {} # {symbol: float} # Pivot-based normalization system self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds} self.pivot_cache_dir = self.cache_dir / 'pivot_bounds' self.pivot_cache_dir.mkdir(parents=True, exist_ok=True) self.pivot_refresh_interval = timedelta(days=1) # Refresh pivot bounds daily self.monthly_data_cache_dir = self.cache_dir / 'monthly_1s_data' self.monthly_data_cache_dir.mkdir(parents=True, exist_ok=True) # Real-time processing self.websocket_tasks = {} self.is_streaming = False self.data_lock = Lock() # Subscriber management for centralized data distribution self.subscribers: Dict[str, DataSubscriber] = {} self.subscriber_lock = Lock() self.tick_buffers: Dict[str, deque] = {} self.buffer_size = 1000 # Keep last 1000 ticks per symbol # Initialize tick buffers for symbol in self.symbols: binance_symbol = symbol.replace('/', '').upper() self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size) # BOM (Book of Market) data caching - 1s resolution for last 5 minutes self.bom_cache_duration = 300 # 5 minutes in seconds self.bom_feature_count = 120 # Number of BOM features per timestamp self.bom_data_cache: Dict[str, deque] = {} # {symbol: deque of (timestamp, bom_features)} # Initialize BOM cache for each symbol for symbol in self.symbols: # Store 300 seconds worth of 1s BOM data self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration) # Initialize tick aggregator for raw tick processing binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols] self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols) # Raw tick and OHLCV bar callbacks self.raw_tick_callbacks = [] self.ohlcv_bar_callbacks = [] # Performance tracking for subscribers self.distribution_stats = { 'total_ticks_received': 0, 'total_ticks_distributed': 0, 'distribution_errors': 0, 'last_tick_time': {}, 'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols}, 'raw_ticks_processed': 0, 'ohlcv_bars_created': 0, 'patterns_detected': 0 } # Data validation self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols} self.price_change_threshold = 0.1 # 10% price change threshold for validation # Timeframe conversion self.timeframe_seconds = { '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } # Load existing pivot bounds from cache self._load_all_pivot_bounds() logger.info(f"DataProvider initialized for symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") logger.info("Centralized data distribution enabled") logger.info("Pivot-based normalization system enabled") # Rate limiting self.last_request_time = {} self.request_interval = 0.2 # 200ms between requests self.retry_delay = 60 # 1 minute retry delay for 451 errors self.max_retries = 3 def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame: """Ensure dataframe has proper datetime index""" if df is None or df.empty: return df try: # If we already have a proper DatetimeIndex, return as is if isinstance(df.index, pd.DatetimeIndex): return df # If timestamp column exists, use it as index if 'timestamp' in df.columns: df['timestamp'] = pd.to_datetime(df['timestamp']) df.set_index('timestamp', inplace=True) return df # If we have a RangeIndex or other non-datetime index, create datetime index if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex): # Use current time and work backwards for realistic timestamps from datetime import datetime, timedelta end_time = datetime.now() start_time = end_time - timedelta(minutes=len(df)) df.index = pd.date_range(start=start_time, end=end_time, periods=len(df)) logger.debug(f"Converted RangeIndex to DatetimeIndex for {len(df)} records") return df except Exception as e: logger.warning(f"Error ensuring datetime index: {e}") return df def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]: """Get historical OHLCV data for a symbol and timeframe""" try: # If refresh=True, always fetch fresh data (skip cache for real-time updates) if not refresh: if self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) >= limit * 0.8: # Ensure proper datetime index for cached data cached_data = self._ensure_datetime_index(cached_data) # logger.info(f"Using cached data for {symbol} {timeframe}") return cached_data.tail(limit) # Check if we need to preload 300s of data for first load should_preload = self._should_preload_data(symbol, timeframe, limit) if should_preload: logger.info(f"Preloading 300s of data for {symbol} {timeframe}") df = self._preload_300s_data(symbol, timeframe) else: # Fetch from API with requested limit (Binance primary, MEXC fallback) logger.info(f"Fetching historical data for {symbol} {timeframe}") df = self._fetch_from_binance(symbol, timeframe, limit) # Fallback to MEXC if Binance fails if df is None or df.empty: logger.info(f"Binance failed, trying MEXC fallback for {symbol}") df = self._fetch_from_mexc(symbol, timeframe, limit) if df is not None and not df.empty: # Ensure proper datetime index df = self._ensure_datetime_index(df) # Add technical indicators. temporarily disabled to save time as it is not working as expected. # df = self._add_technical_indicators(df) # Cache the data if self.cache_enabled: self._save_to_cache(df, symbol, timeframe) # Store in memory if symbol not in self.historical_data: self.historical_data[symbol] = {} self.historical_data[symbol][timeframe] = df # Return requested amount return df.tail(limit) logger.warning(f"No data received for {symbol} {timeframe}") return None except Exception as e: logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}") return None def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool: """Determine if we should preload 300s of data""" try: # Check if we have any cached data if self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) > 0: return False # Already have some data # Check if we have data in memory if (symbol in self.historical_data and timeframe in self.historical_data[symbol] and len(self.historical_data[symbol][timeframe]) > 0): return False # Already have data in memory # Calculate if 300s worth of data would be more than requested limit timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) candles_in_300s = 300 // timeframe_seconds # Preload if we need more than the requested limit or if it's a short timeframe if candles_in_300s > limit or timeframe in ['1s', '1m']: return True return False except Exception as e: logger.error(f"Error determining if should preload data: {e}") return False def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]: """Preload 300 seconds worth of data for better initial performance""" try: # Calculate how many candles we need for 300 seconds timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles # For very short timeframes, limit to reasonable amount if timeframe == '1s': candles_needed = min(candles_needed, 300) # Max 300 1s candles elif timeframe == '1m': candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour) else: candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)") # Fetch the data (Binance primary, MEXC fallback) df = self._fetch_from_binance(symbol, timeframe, candles_needed) # Fallback to MEXC if Binance fails if df is None or df.empty: logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}") df = self._fetch_from_mexc(symbol, timeframe, candles_needed) if df is not None and not df.empty: logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}") return df else: logger.warning(f"Failed to preload data for {symbol} {timeframe}") return None except Exception as e: logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}") return None def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]: """Preload 300s of data for all symbols and timeframes""" try: if timeframes is None: timeframes = self.timeframes preload_results = {} for symbol in self.symbols: preload_results[symbol] = {} for timeframe in timeframes: try: logger.info(f"Preloading data for {symbol} {timeframe}") # Check if we should preload if self._should_preload_data(symbol, timeframe, 100): df = self._preload_300s_data(symbol, timeframe) if df is not None and not df.empty: # Add technical indicators df = self._add_technical_indicators(df) # Cache the data if self.cache_enabled: self._save_to_cache(df, symbol, timeframe) # Store in memory if symbol not in self.historical_data: self.historical_data[symbol] = {} self.historical_data[symbol][timeframe] = df preload_results[symbol][timeframe] = True logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}") else: preload_results[symbol][timeframe] = False logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}") else: preload_results[symbol][timeframe] = True # Already have data logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)") except Exception as e: logger.error(f"Error preloading {symbol} {timeframe}: {e}") preload_results[symbol][timeframe] = False # Log summary total_pairs = len(self.symbols) * len(timeframes) successful_pairs = sum(1 for symbol_results in preload_results.values() for success in symbol_results.values() if success) logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded") return preload_results except Exception as e: logger.error(f"Error in preload_all_symbols_data: {e}") return {} def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from MEXC API (fallback data source when Binance is unavailable)""" try: # MEXC doesn't support 1s intervals if timeframe == '1s': logger.warning(f"MEXC doesn't support 1s intervals, skipping {symbol}") return None # Convert symbol format mexc_symbol = symbol.replace('/', '').upper() # Convert timeframe for MEXC (excluding 1s) timeframe_map = { '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d' } mexc_timeframe = timeframe_map.get(timeframe) if mexc_timeframe is None: logger.warning(f"MEXC doesn't support timeframe {timeframe}, skipping {symbol}") return None # MEXC API request url = "https://api.mexc.com/api/v3/klines" params = { 'symbol': mexc_symbol, 'interval': mexc_timeframe, 'limit': limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Convert to DataFrame (MEXC uses 8 columns vs Binance's 12) df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] df = df.sort_values('timestamp').reset_index(drop=True) logger.info(f"MEXC: Fetched {len(df)} candles for {symbol} {timeframe}") return df except Exception as e: logger.error(f"MEXC: Error fetching data: {e}") return None def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from Binance API (primary data source) with HTTP 451 error handling""" try: # Convert symbol format binance_symbol = symbol.replace('/', '').upper() # Convert timeframe (now includes 1s support) timeframe_map = { '1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d' } binance_timeframe = timeframe_map.get(timeframe, '1h') # API request with timeout and better headers url = "https://api.binance.com/api/v3/klines" params = { 'symbol': binance_symbol, 'interval': binance_timeframe, 'limit': limit } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json', 'Connection': 'keep-alive' } response = requests.get(url, params=params, headers=headers, timeout=10) # Handle HTTP 451 (Unavailable For Legal Reasons) specifically if response.status_code == 451: logger.warning(f"Binance API returned 451 (blocked) for {symbol} {timeframe} - using fallback") return self._get_fallback_data(symbol, timeframe, limit) response.raise_for_status() data = response.json() # Convert to DataFrame df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] df = df.sort_values('timestamp').reset_index(drop=True) logger.info(f"Binance: Fetched {len(df)} candles for {symbol} {timeframe}") return df except Exception as e: if "451" in str(e) or "Client Error" in str(e): logger.warning(f"Binance API access blocked (451) for {symbol} {timeframe} - using fallback") return self._get_fallback_data(symbol, timeframe, limit) else: logger.error(f"Error fetching from Binance API: {e}") return self._get_fallback_data(symbol, timeframe, limit) def _get_fallback_data(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Get fallback data when Binance API is unavailable - REAL DATA ONLY""" try: logger.info(f"FALLBACK: Attempting to get real cached data for {symbol} {timeframe}") # ONLY try cached data cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and not cached_data.empty: # Limit to requested amount limited_data = cached_data.tail(limit) if len(cached_data) > limit else cached_data logger.info(f"FALLBACK: Using cached real data for {symbol} {timeframe}: {len(limited_data)} bars") return limited_data # Try MEXC as secondary real data source mexc_data = self._fetch_from_mexc(symbol, timeframe, limit) if mexc_data is not None and not mexc_data.empty: logger.info(f"FALLBACK: Using MEXC real data for {symbol} {timeframe}: {len(mexc_data)} bars") return mexc_data # NO SYNTHETIC DATA - Return None if no real data available logger.warning(f"FALLBACK: No real data available for {symbol} {timeframe} - waiting for real data") return None except Exception as e: logger.error(f"Error getting fallback data: {e}") return None def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Add comprehensive technical indicators AND pivot-based normalization context""" try: df = df.copy() # Ensure we have enough data for indicators if len(df) < 50: logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows") return self._add_basic_indicators(df) # === EXISTING TECHNICAL INDICATORS === # Moving averages (multiple timeframes) df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10) df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20) df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50) df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12) df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26) df['ema_50'] = ta.trend.ema_indicator(df['close'], window=50) # MACD family macd = ta.trend.MACD(df['close']) df['macd'] = macd.macd() df['macd_signal'] = macd.macd_signal() df['macd_histogram'] = macd.macd_diff() # ADX (Average Directional Index) adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close']) df['adx'] = adx.adx() df['adx_pos'] = adx.adx_pos() df['adx_neg'] = adx.adx_neg() # Parabolic SAR psar = ta.trend.PSARIndicator(df['high'], df['low'], df['close']) df['psar'] = psar.psar() # === MOMENTUM INDICATORS === # RSI (multiple periods) df['rsi_14'] = ta.momentum.rsi(df['close'], window=14) df['rsi_7'] = ta.momentum.rsi(df['close'], window=7) df['rsi_21'] = ta.momentum.rsi(df['close'], window=21) # Stochastic Oscillator stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close']) df['stoch_k'] = stoch.stoch() df['stoch_d'] = stoch.stoch_signal() # Williams %R df['williams_r'] = ta.momentum.williams_r(df['high'], df['low'], df['close']) # Ultimate Oscillator (instead of CCI which isn't available) df['ultimate_osc'] = ta.momentum.ultimate_oscillator(df['high'], df['low'], df['close']) # === VOLATILITY INDICATORS === # Bollinger Bands bollinger = ta.volatility.BollingerBands(df['close']) df['bb_upper'] = bollinger.bollinger_hband() df['bb_lower'] = bollinger.bollinger_lband() df['bb_middle'] = bollinger.bollinger_mavg() df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle'] df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower']) # Average True Range df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close']) # Keltner Channels keltner = ta.volatility.KeltnerChannel(df['high'], df['low'], df['close']) df['keltner_upper'] = keltner.keltner_channel_hband() df['keltner_lower'] = keltner.keltner_channel_lband() df['keltner_middle'] = keltner.keltner_channel_mband() # === VOLUME INDICATORS === # Volume moving averages df['volume_sma_10'] = df['volume'].rolling(window=10).mean() df['volume_sma_20'] = df['volume'].rolling(window=20).mean() df['volume_sma_50'] = df['volume'].rolling(window=50).mean() # On Balance Volume df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume']) # Volume Price Trend df['vpt'] = ta.volume.volume_price_trend(df['close'], df['volume']) # Money Flow Index df['mfi'] = ta.volume.money_flow_index(df['high'], df['low'], df['close'], df['volume']) # Accumulation/Distribution Line df['ad_line'] = ta.volume.acc_dist_index(df['high'], df['low'], df['close'], df['volume']) # Volume Weighted Average Price (VWAP) df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum() # === PRICE ACTION INDICATORS === # Price position relative to range df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low']) # True Range (use ATR calculation for true range) df['true_range'] = df['atr'] # ATR is based on true range, so use it directly # Rate of Change df['roc'] = ta.momentum.roc(df['close'], window=10) # === CUSTOM INDICATORS === # Trend strength (combination of multiple trend indicators) df['trend_strength'] = ( (df['close'] > df['sma_20']).astype(int) + (df['sma_10'] > df['sma_20']).astype(int) + (df['macd'] > df['macd_signal']).astype(int) + (df['adx'] > 25).astype(int) ) / 4.0 # Momentum composite df['momentum_composite'] = ( (df['rsi_14'] / 100) + ((df['stoch_k'] + 50) / 100) + # Normalize stoch_k ((df['williams_r'] + 50) / 100) # Normalize williams_r ) / 3.0 # Volatility regime df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True) # === WILLIAMS MARKET STRUCTURE PIVOT CONTEXT === # Check if we need to refresh pivot bounds for this symbol symbol = self._extract_symbol_from_dataframe(df) if symbol and self._should_refresh_pivot_bounds(symbol): logger.info(f"Refreshing pivot bounds for {symbol}") self._refresh_pivot_bounds_for_symbol(symbol) # Add pivot-based context features if symbol and symbol in self.pivot_bounds: df = self._add_pivot_context_features(df, symbol) # === FILL NaN VALUES === # Forward fill first, then backward fill, then zero fill df = df.ffill().bfill().fillna(0) logger.debug(f"Added technical indicators + pivot context for {len(df)} rows") return df except Exception as e: logger.error(f"Error adding comprehensive technical indicators: {e}") # Fallback to basic indicators return self._add_basic_indicators(df) # === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM === def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]: """Collect 30 days of 1m candles with smart gap-filling cache system""" try: # Check for cached data and determine what we need to fetch cached_data = self._load_monthly_data_from_cache(symbol) end_time = datetime.now() start_time = end_time - timedelta(days=30) if cached_data is not None and not cached_data.empty: logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles") # Check cache data range cache_start = cached_data['timestamp'].min() cache_end = cached_data['timestamp'].max() logger.info(f"Cache range: {cache_start} to {cache_end}") # Remove data older than 30 days cached_data = cached_data[cached_data['timestamp'] >= start_time] # Check if we need to fill gaps gap_start = cache_end + timedelta(minutes=1) if gap_start < end_time: # Need to fill gap from cache_end to now logger.info(f"Filling gap from {gap_start} to {end_time}") gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time) if gap_data is not None and not gap_data.empty: # Combine cached data with gap data monthly_df = pd.concat([cached_data, gap_data], ignore_index=True) monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True) logger.info(f"Combined cache + gap: {len(monthly_df)} total candles") else: monthly_df = cached_data logger.info(f"Using cached data only: {len(monthly_df)} candles") else: monthly_df = cached_data logger.info(f"Cache is up to date: {len(monthly_df)} candles") else: # No cache - fetch full 30 days logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}") monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time) if monthly_df is not None and not monthly_df.empty: # Final cleanup: ensure exactly 30 days monthly_df = monthly_df[monthly_df['timestamp'] >= start_time] monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True) logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}") # Update cache self._save_monthly_data_to_cache(symbol, monthly_df) return monthly_df else: logger.error(f"No monthly 1m data collected for {symbol}") return None except Exception as e: logger.error(f"Error collecting monthly 1m data for {symbol}: {e}") return None def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]: """Fetch a batch of 1s candles ending at specific time""" try: binance_symbol = symbol.replace('/', '').upper() # Convert end_time to milliseconds end_ms = int(end_time.timestamp() * 1000) # API request url = "https://api.binance.com/api/v3/klines" params = { 'symbol': binance_symbol, 'interval': '1s', 'endTime': end_ms, 'limit': limit } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() if not data: return None # Convert to DataFrame df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = df[col].astype(float) # Keep only OHLCV columns df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] return df except Exception as e: logger.error(f"Error fetching 1s batch for {symbol}: {e}") return None def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]: """Fetch 1m candles for a specific time range with efficient batching""" try: # Convert symbol format for Binance API if '/' in symbol: api_symbol = symbol.replace('/', '') else: api_symbol = symbol logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}") all_candles = [] current_start = start_time batch_size = 1000 # Binance limit api_calls_made = 0 while current_start < end_time and api_calls_made < 50: # Safety limit for 30 days try: # Calculate end time for this batch batch_end = min(current_start + timedelta(minutes=batch_size), end_time) # Convert to milliseconds start_timestamp = int(current_start.timestamp() * 1000) end_timestamp = int(batch_end.timestamp() * 1000) # Binance API call url = "https://api.binance.com/api/v3/klines" params = { 'symbol': api_symbol, 'interval': '1m', 'startTime': start_timestamp, 'endTime': end_timestamp, 'limit': batch_size } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'application/json' } response = requests.get(url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() api_calls_made += 1 if not data: logger.warning(f"No data returned for batch {current_start} to {batch_end}") break # Convert to DataFrame batch_df = pd.DataFrame(data, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'trades', 'taker_buy_base', 'taker_buy_quote', 'ignore' ]) # Process columns batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms') for col in ['open', 'high', 'low', 'close', 'volume']: batch_df[col] = batch_df[col].astype(float) # Keep only OHLCV columns batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']] all_candles.append(batch_df) # Move to next batch (add 1 minute to avoid overlap) current_start = batch_end + timedelta(minutes=1) # Rate limiting (Binance allows 1200/min) time.sleep(0.05) # 50ms delay # Progress logging if api_calls_made % 10 == 0: total_candles = sum(len(df) for df in all_candles) logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected") except Exception as e: logger.error(f"Error in batch {current_start} to {batch_end}: {e}") current_start += timedelta(minutes=batch_size) time.sleep(1) # Wait longer on error continue if not all_candles: logger.error(f"No data collected for {symbol}") return None # Combine all batches df = pd.concat(all_candles, ignore_index=True) df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True) logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)") return df except Exception as e: logger.error(f"Error fetching 1m data range for {symbol}: {e}") return None def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]: """Extract pivot bounds using Williams Market Structure analysis""" try: logger.info(f"Analyzing {len(monthly_data)} candles for pivot extraction...") # Convert DataFrame to numpy array format expected by Williams Market Structure ohlcv_array = monthly_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']].copy() # Convert timestamp to numeric for Williams analysis ohlcv_array['timestamp'] = ohlcv_array['timestamp'].astype(np.int64) // 10**9 # Convert to seconds ohlcv_array = ohlcv_array.to_numpy() # Initialize Williams Market Structure analyzer try: from training.williams_market_structure import WilliamsMarketStructure williams = WilliamsMarketStructure( swing_strengths=[2, 3, 5, 8], # Multi-strength pivot detection enable_cnn_feature=False # We just want pivot data, not CNN training ) # Calculate 5 levels of recursive pivot points logger.info("Running Williams Market Structure analysis...") pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array) except ImportError: logger.warning("Williams Market Structure not available, using simplified pivot detection") pivot_levels = self._simple_pivot_detection(monthly_data) # Extract bounds from pivot analysis bounds = self._extract_bounds_from_pivot_levels(symbol, monthly_data, pivot_levels) return bounds except Exception as e: logger.error(f"Error extracting pivot bounds for {symbol}: {e}") return None def _extract_bounds_from_pivot_levels(self, symbol: str, monthly_data: pd.DataFrame, pivot_levels: Dict[str, Any]) -> PivotBounds: """Extract normalization bounds from Williams pivot levels""" try: # Initialize bounds price_max = monthly_data['high'].max() price_min = monthly_data['low'].min() volume_max = monthly_data['volume'].max() volume_min = monthly_data['volume'].min() support_levels = [] resistance_levels = [] # Extract pivot points from all Williams levels for level_key, level_data in pivot_levels.items(): if level_data and hasattr(level_data, 'swing_points') and level_data.swing_points: # Get prices from swing points level_prices = [sp.price for sp in level_data.swing_points] # Update overall price bounds price_max = max(price_max, max(level_prices)) price_min = min(price_min, min(level_prices)) # Extract support and resistance levels if hasattr(level_data, 'support_levels') and level_data.support_levels: support_levels.extend(level_data.support_levels) if hasattr(level_data, 'resistance_levels') and level_data.resistance_levels: resistance_levels.extend(level_data.resistance_levels) # Remove duplicates and sort support_levels = sorted(list(set(support_levels))) resistance_levels = sorted(list(set(resistance_levels))) # Create PivotBounds object bounds = PivotBounds( symbol=symbol, price_max=float(price_max), price_min=float(price_min), volume_max=float(volume_max), volume_min=float(volume_min), pivot_support_levels=support_levels, pivot_resistance_levels=resistance_levels, pivot_context=pivot_levels, created_timestamp=datetime.now(), data_period_start=monthly_data['timestamp'].min(), data_period_end=monthly_data['timestamp'].max(), total_candles_analyzed=len(monthly_data) ) logger.info(f"Extracted pivot bounds for {symbol}:") logger.info(f" Price range: ${bounds.price_min:.2f} - ${bounds.price_max:.2f}") logger.info(f" Volume range: {bounds.volume_min:.2f} - {bounds.volume_max:.2f}") logger.info(f" Support levels: {len(bounds.pivot_support_levels)}") logger.info(f" Resistance levels: {len(bounds.pivot_resistance_levels)}") return bounds except Exception as e: logger.error(f"Error extracting bounds from pivot levels: {e}") # Fallback to simple min/max bounds return PivotBounds( symbol=symbol, price_max=float(monthly_data['high'].max()), price_min=float(monthly_data['low'].min()), volume_max=float(monthly_data['volume'].max()), volume_min=float(monthly_data['volume'].min()), pivot_support_levels=[], pivot_resistance_levels=[], pivot_context={}, created_timestamp=datetime.now(), data_period_start=monthly_data['timestamp'].min(), data_period_end=monthly_data['timestamp'].max(), total_candles_analyzed=len(monthly_data) ) def _simple_pivot_detection(self, monthly_data: pd.DataFrame) -> Dict[str, Any]: """Simple pivot detection fallback when Williams Market Structure is not available""" try: # Simple high/low pivot detection using rolling windows highs = monthly_data['high'] lows = monthly_data['low'] # Find local maxima and minima using different windows pivot_highs = [] pivot_lows = [] for window in [5, 10, 20, 50]: if len(monthly_data) > window * 2: # Rolling max/min detection rolling_max = highs.rolling(window=window, center=True).max() rolling_min = lows.rolling(window=window, center=True).min() # Find pivot highs (local maxima) high_pivots = monthly_data[highs == rolling_max]['high'].tolist() pivot_highs.extend(high_pivots) # Find pivot lows (local minima) low_pivots = monthly_data[lows == rolling_min]['low'].tolist() pivot_lows.extend(low_pivots) # Create mock level structure mock_level = type('MockLevel', (), { 'swing_points': [], 'support_levels': list(set(pivot_lows)), 'resistance_levels': list(set(pivot_highs)) })() return {'level_0': mock_level} except Exception as e: logger.error(f"Error in simple pivot detection: {e}") return {} def _should_refresh_pivot_bounds(self, symbol: str) -> bool: """Check if pivot bounds need refreshing""" try: if symbol not in self.pivot_bounds: return True bounds = self.pivot_bounds[symbol] age = datetime.now() - bounds.created_timestamp return age > self.pivot_refresh_interval except Exception as e: logger.error(f"Error checking pivot bounds refresh: {e}") return True def _refresh_pivot_bounds_for_symbol(self, symbol: str): """Refresh pivot bounds for a specific symbol""" try: # Collect monthly 1m data monthly_data = self._collect_monthly_1m_data(symbol) if monthly_data is None or monthly_data.empty: logger.warning(f"Could not collect monthly data for {symbol}") return # Extract pivot bounds bounds = self._extract_pivot_bounds_from_monthly_data(symbol, monthly_data) if bounds is None: logger.warning(f"Could not extract pivot bounds for {symbol}") return # Store bounds self.pivot_bounds[symbol] = bounds # Save to cache self._save_pivot_bounds_to_cache(symbol, bounds) logger.info(f"Successfully refreshed pivot bounds for {symbol}") except Exception as e: logger.error(f"Error refreshing pivot bounds for {symbol}: {e}") def _add_pivot_context_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame: """Add pivot-derived context features for normalization""" try: if symbol not in self.pivot_bounds: return df bounds = self.pivot_bounds[symbol] current_prices = df['close'] # Distance to nearest support/resistance levels (normalized) df['pivot_support_distance'] = current_prices.apply(bounds.get_nearest_support_distance) df['pivot_resistance_distance'] = current_prices.apply(bounds.get_nearest_resistance_distance) # Price position within pivot range (0 = price_min, 1 = price_max) df['pivot_price_position'] = current_prices.apply(bounds.normalize_price).clip(0, 1) # Add binary features for proximity to key levels price_range = bounds.get_price_range() proximity_threshold = price_range * 0.02 # 2% of price range df['near_pivot_support'] = 0 df['near_pivot_resistance'] = 0 for price in current_prices: # Check if near any support level if any(abs(price - s) <= proximity_threshold for s in bounds.pivot_support_levels): df.loc[df['close'] == price, 'near_pivot_support'] = 1 # Check if near any resistance level if any(abs(price - r) <= proximity_threshold for r in bounds.pivot_resistance_levels): df.loc[df['close'] == price, 'near_pivot_resistance'] = 1 logger.debug(f"Added pivot context features for {symbol}") return df except Exception as e: logger.warning(f"Error adding pivot context features for {symbol}: {e}") return df def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]: """Extract symbol from dataframe context (basic implementation)""" # This is a simple implementation - in a real system, you might pass symbol explicitly # or store it as metadata in the dataframe for symbol in self.symbols: # Check if this dataframe might belong to this symbol based on current processing return symbol # Return first symbol for now - can be improved return None # === PIVOT BOUNDS CACHING === def _load_all_pivot_bounds(self): """Load all cached pivot bounds on startup""" try: for symbol in self.symbols: bounds = self._load_pivot_bounds_from_cache(symbol) if bounds: self.pivot_bounds[symbol] = bounds logger.info(f"Loaded cached pivot bounds for {symbol}") except Exception as e: logger.error(f"Error loading pivot bounds from cache: {e}") def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]: """Load pivot bounds from cache""" try: cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" if cache_file.exists(): with open(cache_file, 'rb') as f: bounds = pickle.load(f) # Check if bounds are still valid (not too old) age = datetime.now() - bounds.created_timestamp if age <= self.pivot_refresh_interval: return bounds else: logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)") return None except Exception as e: logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}") return None def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds): """Save pivot bounds to cache""" try: cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" with open(cache_file, 'wb') as f: pickle.dump(bounds, f) logger.debug(f"Saved pivot bounds to cache for {symbol}") except Exception as e: logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}") def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]: """Load monthly 1m data from cache""" try: cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" if cache_file.exists(): try: df = pd.read_parquet(cache_file) logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}") return df except Exception as parquet_e: # Handle corrupted Parquet file if "Parquet magic bytes not found" in str(parquet_e) or "corrupted" in str(parquet_e).lower(): logger.warning(f"Corrupted Parquet cache file for {symbol}, removing and returning None: {parquet_e}") try: cache_file.unlink() # Delete corrupted file except Exception: pass return None else: raise parquet_e return None except Exception as e: logger.warning(f"Error loading monthly data from cache for {symbol}: {e}") return None def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame): """Save monthly 1m data to cache""" try: cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" df.to_parquet(cache_file, index=False) logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}") except Exception as e: logger.warning(f"Error saving monthly data to cache for {symbol}: {e}") def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]: """Get pivot bounds for a symbol""" return self.pivot_bounds.get(symbol) def get_pivot_normalized_features(self, symbol: str, df: pd.DataFrame) -> Optional[pd.DataFrame]: """Get dataframe with pivot-normalized features""" try: if symbol not in self.pivot_bounds: logger.warning(f"No pivot bounds available for {symbol}") return df bounds = self.pivot_bounds[symbol] normalized_df = df.copy() # Normalize price columns using pivot bounds price_range = bounds.get_price_range() for col in ['open', 'high', 'low', 'close']: if col in normalized_df.columns: normalized_df[col] = (normalized_df[col] - bounds.price_min) / price_range # Normalize volume using pivot bounds volume_range = bounds.volume_max - bounds.volume_min if volume_range > 0 and 'volume' in normalized_df.columns: normalized_df['volume'] = (normalized_df['volume'] - bounds.volume_min) / volume_range return normalized_df except Exception as e: logger.error(f"Error applying pivot normalization for {symbol}: {e}") return df def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Add basic indicators for small datasets""" try: df = df.copy() # Basic moving averages if len(df) >= 20: df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20) df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12) # Basic RSI if len(df) >= 14: df['rsi_14'] = ta.momentum.rsi(df['close'], window=14) # Basic volume indicators if len(df) >= 10: df['volume_sma_10'] = df['volume'].rolling(window=10).mean() # Basic price action df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low']) df['price_position'] = df['price_position'].fillna(0.5) # Default to middle # Fill NaN values df = df.ffill().bfill().fillna(0) return df except Exception as e: logger.error(f"Error adding basic indicators: {e}") return df def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]: """Load data from cache""" try: cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet" if cache_file.exists(): # Check if cache is recent - stricter rules for startup cache_age = time.time() - cache_file.stat().st_mtime # For 1m data, use cache only if less than 5 minutes old to avoid gaps if timeframe == '1m': max_age = 300 # 5 minutes else: max_age = 3600 # 1 hour for other timeframes if cache_age < max_age: try: df = pd.read_parquet(cache_file) logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe} (age: {cache_age/60:.1f}min)") return df except Exception as parquet_e: # Handle corrupted Parquet file if "Parquet magic bytes not found" in str(parquet_e) or "corrupted" in str(parquet_e).lower(): logger.warning(f"Corrupted Parquet cache file for {symbol} {timeframe}, removing and returning None: {parquet_e}") try: cache_file.unlink() # Delete corrupted file except Exception: pass return None else: raise parquet_e else: logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/60:.1f}min > {max_age/60:.1f}min)") return None except Exception as e: logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}") return None def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str): """Save data to cache""" try: cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet" df.to_parquet(cache_file, index=False) logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}") except Exception as e: logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}") async def start_real_time_streaming(self): """Start real-time data streaming for all symbols""" if self.is_streaming: logger.warning("Real-time streaming already active") return self.is_streaming = True logger.info("Starting real-time data streaming") # Start WebSocket for each symbol for symbol in self.symbols: task = asyncio.create_task(self._websocket_stream(symbol)) self.websocket_tasks[symbol] = task async def stop_real_time_streaming(self): """Stop real-time data streaming""" if not self.is_streaming: return logger.info("Stopping real-time data streaming") self.is_streaming = False # Cancel all WebSocket tasks for symbol, task in self.websocket_tasks.items(): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass self.websocket_tasks.clear() async def _websocket_stream(self, symbol: str): """WebSocket stream for a single symbol using trade stream for better granularity""" binance_symbol = symbol.replace('/', '').upper() url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade" while self.is_streaming: try: logger.info(f"Connecting to WebSocket for {symbol}: {url}") async with websockets.connect(url) as websocket: logger.info(f"WebSocket connected for {symbol}") async for message in websocket: if not self.is_streaming: break try: await self._process_trade_message(binance_symbol, message) except Exception as e: logger.warning(f"Error processing trade message for {symbol}: {e}") except Exception as e: logger.error(f"WebSocket error for {symbol}: {e}") self.distribution_stats['distribution_errors'] += 1 if self.is_streaming: logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...") await asyncio.sleep(5) async def _process_trade_message(self, symbol: str, message: str): """Process incoming trade message and distribute to subscribers""" try: trade_data = json.loads(message) # Extract trade information price = float(trade_data.get('p', 0)) quantity = float(trade_data.get('q', 0)) timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000) is_buyer_maker = trade_data.get('m', False) trade_id = trade_data.get('t', '') # Calculate volume in USDT volume_usdt = price * quantity # Data validation if not self._validate_tick_data(symbol, price, volume_usdt): logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}") return # Process raw tick through aggregator side = 'sell' if is_buyer_maker else 'buy' raw_tick, completed_bar = self.tick_aggregator.process_tick( symbol=symbol, timestamp=timestamp, price=price, volume=volume_usdt, quantity=quantity, side=side, trade_id=str(trade_id) ) # Update statistics self.distribution_stats['total_ticks_received'] += 1 self.distribution_stats['ticks_per_symbol'][symbol] += 1 self.distribution_stats['last_tick_time'][symbol] = timestamp self.last_prices[symbol] = price if raw_tick: self.distribution_stats['raw_ticks_processed'] += 1 # Notify raw tick callbacks for callback in self.raw_tick_callbacks: try: callback(raw_tick) except Exception as e: logger.error(f"Error in raw tick callback: {e}") if completed_bar: self.distribution_stats['ohlcv_bars_created'] += 1 # Notify OHLCV bar callbacks for callback in self.ohlcv_bar_callbacks: try: callback(completed_bar) except Exception as e: logger.error(f"Error in OHLCV bar callback: {e}") # Create standardized tick for legacy compatibility tick = MarketTick( symbol=symbol, timestamp=timestamp, price=price, volume=volume_usdt, quantity=quantity, side=side, trade_id=str(trade_id), is_buyer_maker=is_buyer_maker, raw_data=trade_data ) # Add to buffer self.tick_buffers[symbol].append(tick) # Update current prices and candles await self._process_tick(symbol, tick) # Distribute to all subscribers self._distribute_tick(tick) except Exception as e: logger.error(f"Error processing trade message for {symbol}: {e}") async def _process_tick(self, symbol: str, tick: MarketTick): """Process a single tick and update candles""" try: # Update current price with self.data_lock: self.current_prices[symbol] = tick.price # Initialize real-time data structure if needed if symbol not in self.real_time_data: self.real_time_data[symbol] = {} for tf in self.timeframes: self.real_time_data[symbol][tf] = deque(maxlen=1000) # Create tick record for candle updates tick_record = { 'timestamp': tick.timestamp, 'price': tick.price, 'volume': tick.volume } # Update all timeframes for timeframe in self.timeframes: self._update_candle(symbol, timeframe, tick_record) except Exception as e: logger.error(f"Error processing tick for {symbol}: {e}") def _update_candle(self, symbol: str, timeframe: str, tick: Dict): """Update candle for specific timeframe""" try: timeframe_secs = self.timeframe_seconds.get(timeframe, 3600) current_time = tick['timestamp'] # Calculate candle start time using proper datetime truncation if isinstance(current_time, datetime): timestamp_seconds = current_time.timestamp() else: timestamp_seconds = current_time.timestamp() if hasattr(current_time, 'timestamp') else current_time # Truncate to timeframe boundary candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs candle_start = datetime.fromtimestamp(candle_start_seconds) # Get current candle queue candle_queue = self.real_time_data[symbol][timeframe] # Check if we need a new candle if not candle_queue or candle_queue[-1]['timestamp'] != candle_start: # Create new candle new_candle = { 'timestamp': candle_start, 'open': tick['price'], 'high': tick['price'], 'low': tick['price'], 'close': tick['price'], 'volume': tick['volume'] } candle_queue.append(new_candle) else: # Update existing candle current_candle = candle_queue[-1] current_candle['high'] = max(current_candle['high'], tick['price']) current_candle['low'] = min(current_candle['low'], tick['price']) current_candle['close'] = tick['price'] current_candle['volume'] += tick['volume'] except Exception as e: logger.error(f"Error updating candle for {symbol} {timeframe}: {e}") def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame: """Get the latest candles combining historical and real-time data""" try: # Get historical data historical_df = self.get_historical_data(symbol, timeframe, limit=limit) # Get real-time data with self.data_lock: if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]: real_time_candles = list(self.real_time_data[symbol][timeframe]) if real_time_candles: # Convert to DataFrame rt_df = pd.DataFrame(real_time_candles) if historical_df is not None: # Combine historical and real-time # Remove overlapping candles from historical data if not rt_df.empty: cutoff_time = rt_df['timestamp'].min() historical_df = historical_df[historical_df['timestamp'] < cutoff_time] # Concatenate combined_df = pd.concat([historical_df, rt_df], ignore_index=True) else: combined_df = rt_df return combined_df.tail(limit) # Return just historical data if no real-time data return historical_df.tail(limit) if historical_df is not None else pd.DataFrame() except Exception as e: logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}") return pd.DataFrame() def get_current_price(self, symbol: str) -> Optional[float]: """Get current price for a symbol from latest candle""" try: # Try to get from 1s candle first (most recent) for tf in ['1s', '1m', '5m', '1h']: df = self.get_latest_candles(symbol, tf, limit=1) if df is not None and not df.empty: return float(df.iloc[-1]['close']) # Fallback to any available data key = f"{symbol}_{self.timeframes[0]}" if key in self.historical_data and not self.historical_data[key].empty: return float(self.historical_data[key].iloc[-1]['close']) logger.warning(f"No price data available for {symbol}") return None except Exception as e: logger.error(f"Error getting current price for {symbol}: {e}") return None def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]: """Get price at specific index for backtesting""" try: key = f"{symbol}_{timeframe}" if key in self.historical_data: df = self.historical_data[key] if 0 <= index < len(df): return float(df.iloc[index]['close']) return None except Exception as e: logger.error(f"Error getting price at index {index}: {e}") return None def get_feature_matrix(self, symbol: str, timeframes: List[str] = None, window_size: int = 20) -> Optional[np.ndarray]: """ Get comprehensive feature matrix for multiple timeframes with technical indicators Returns: np.ndarray: Shape (n_timeframes, window_size, n_features) Each timeframe becomes a separate channel for CNN """ try: if timeframes is None: timeframes = self.timeframes feature_channels = [] common_feature_names = None # First pass: determine common features across all timeframes timeframe_features = {} for tf in timeframes: logger.debug(f"Processing timeframe {tf} for {symbol}") df = self.get_latest_candles(symbol, tf, limit=window_size + 100) if df is None or len(df) < window_size: logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows") continue # Get feature columns basic_cols = ['open', 'high', 'low', 'close', 'volume'] indicator_cols = [col for col in df.columns if col not in basic_cols + ['timestamp'] and not col.startswith('unnamed')] selected_features = self._select_cnn_features(df, basic_cols, indicator_cols) timeframe_features[tf] = (df, selected_features) if common_feature_names is None: common_feature_names = set(selected_features) else: common_feature_names = common_feature_names.intersection(set(selected_features)) if not common_feature_names: logger.error(f"No common features found across timeframes for {symbol}") return None # Convert to sorted list for consistent ordering common_feature_names = sorted(list(common_feature_names)) # logger.info(f"Using {len(common_feature_names)} common features: {common_feature_names}") # Second pass: create feature channels with common features for tf in timeframes: if tf not in timeframe_features: continue df, _ = timeframe_features[tf] # Use only common features try: tf_features = self._normalize_features(df[common_feature_names].tail(window_size), symbol=symbol) if tf_features is not None and len(tf_features) == window_size: feature_channels.append(tf_features.values) logger.debug(f"Added {len(common_feature_names)} features for {tf}") else: logger.warning(f"Feature normalization failed for {tf}") except Exception as e: logger.error(f"Error processing features for {tf}: {e}") continue if not feature_channels: logger.error(f"No valid feature channels created for {symbol}") return None # Verify all channels have the same shape shapes = [channel.shape for channel in feature_channels] if len(set(shapes)) > 1: logger.error(f"Shape mismatch in feature channels: {shapes}") return None # Stack all timeframe channels feature_matrix = np.stack(feature_channels, axis=0) logger.debug(f"Created feature matrix for {symbol}: {feature_matrix.shape} " f"({len(feature_channels)} timeframes, {window_size} steps, {len(common_feature_names)} features)") return feature_matrix except Exception as e: logger.error(f"Error creating feature matrix for {symbol}: {e}") import traceback logger.error(traceback.format_exc()) return None def _select_cnn_features(self, df: pd.DataFrame, basic_cols: List[str], indicator_cols: List[str]) -> List[str]: """Select the most important features for CNN training""" try: selected = [] # Always include basic OHLCV (normalized) selected.extend(basic_cols) # Priority indicators (most informative for CNNs) priority_indicators = [ # Trend indicators 'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50', 'macd', 'macd_signal', 'macd_histogram', 'adx', 'adx_pos', 'adx_neg', 'psar', # Momentum indicators 'rsi_14', 'rsi_7', 'rsi_21', 'stoch_k', 'stoch_d', 'williams_r', 'ultimate_osc', # Volatility indicators 'bb_upper', 'bb_lower', 'bb_middle', 'bb_width', 'bb_percent', 'atr', 'keltner_upper', 'keltner_lower', 'keltner_middle', # Volume indicators 'volume_sma_10', 'volume_sma_20', 'obv', 'vpt', 'mfi', 'ad_line', 'vwap', # Price action 'price_position', 'true_range', 'roc', # Custom composites 'trend_strength', 'momentum_composite', 'volatility_regime' ] # Add available priority indicators for indicator in priority_indicators: if indicator in indicator_cols: selected.append(indicator) # Add any other technical indicators not in priority list (limit to avoid curse of dimensionality) remaining_indicators = [col for col in indicator_cols if col not in selected] if remaining_indicators: # Limit to 10 additional indicators selected.extend(remaining_indicators[:10]) # Verify all selected features exist in dataframe final_selected = [col for col in selected if col in df.columns] logger.debug(f"Selected {len(final_selected)} features from {len(df.columns)} available columns") return final_selected except Exception as e: logger.error(f"Error selecting CNN features: {e}") return basic_cols # Fallback to basic OHLCV def _normalize_features(self, df: pd.DataFrame, symbol: str = None) -> Optional[pd.DataFrame]: """Normalize features for CNN training using pivot-based bounds when available""" try: df_norm = df.copy() # Try to use pivot-based normalization if available if symbol and symbol in self.pivot_bounds: bounds = self.pivot_bounds[symbol] price_range = bounds.get_price_range() # Normalize price-based features using pivot bounds price_cols = ['open', 'high', 'low', 'close', 'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50', 'bb_upper', 'bb_lower', 'bb_middle', 'keltner_upper', 'keltner_lower', 'keltner_middle', 'psar', 'vwap'] for col in price_cols: if col in df_norm.columns: # Use pivot bounds for normalization df_norm[col] = (df_norm[col] - bounds.price_min) / price_range # Normalize volume using pivot bounds if 'volume' in df_norm.columns: volume_range = bounds.volume_max - bounds.volume_min if volume_range > 0: df_norm['volume'] = (df_norm['volume'] - bounds.volume_min) / volume_range else: df_norm['volume'] = 0.5 # Default to middle if no volume range logger.debug(f"Applied pivot-based normalization for {symbol}") else: # Use symbol-grouped normalization with consistent ranges df_norm = self._apply_symbol_grouped_normalization(df_norm, symbol) # Fill any remaining NaN values df_norm = df_norm.fillna(0.0) return df_norm except Exception as e: logger.error(f"Error normalizing features for {symbol}: {e}") return df.fillna(0.0) if df is not None else None def _apply_symbol_grouped_normalization(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame: """Apply symbol-grouped normalization with consistent ranges across timeframes""" try: df_norm = df.copy() # Get symbol-specific price ranges for consistent normalization symbol_price_ranges = { 'ETH/USDT': {'min': 1000, 'max': 5000}, # ETH price range 'BTC/USDT': {'min': 90000, 'max': 120000} # BTC price range } if symbol in symbol_price_ranges: price_range = symbol_price_ranges[symbol] range_size = price_range['max'] - price_range['min'] # Normalize price columns to [0, 1] range specific to symbol price_cols = ['open', 'high', 'low', 'close'] for col in price_cols: if col in df_norm.columns: df_norm[col] = (df_norm[col] - price_range['min']) / range_size df_norm[col] = np.clip(df_norm[col], 0, 1) # Ensure [0,1] range # Normalize volume to [0, 1] using log scale if 'volume' in df_norm.columns: df_norm['volume'] = np.log1p(df_norm['volume']) vol_max = df_norm['volume'].max() if vol_max > 0: df_norm['volume'] = df_norm['volume'] / vol_max logger.debug(f"Applied symbol-grouped normalization for {symbol}") # Fill any NaN values df_norm = df_norm.fillna(0) return df_norm except Exception as e: logger.error(f"Error in symbol-grouped normalization for {symbol}: {e}") return df def get_historical_data_for_inference(self, symbol: str, timeframe: str, limit: int = 300) -> Optional[pd.DataFrame]: """Get normalized historical data specifically for model inference""" try: # Get raw historical data raw_df = self.get_historical_data(symbol, timeframe, limit) if raw_df is None or raw_df.empty: return None # Apply normalization for inference normalized_df = self._normalize_features(raw_df, symbol) logger.debug(f"Retrieved normalized historical data for inference: {symbol} {timeframe} ({len(normalized_df)} records)") return normalized_df except Exception as e: logger.error(f"Error getting normalized historical data for inference: {symbol} {timeframe}: {e}") return None def get_multi_symbol_features_for_inference(self, symbols_timeframes: List[Tuple[str, str]], limit: int = 300) -> Dict[str, Dict[str, pd.DataFrame]]: """Get normalized multi-symbol feature matrices for model inference""" try: logger.info("Preparing normalized multi-symbol features for model inference...") symbol_features = {} for symbol, timeframe in symbols_timeframes: if symbol not in symbol_features: symbol_features[symbol] = {} # Get normalized data for inference normalized_df = self.get_historical_data_for_inference(symbol, timeframe, limit) if normalized_df is not None and not normalized_df.empty: symbol_features[symbol][timeframe] = normalized_df logger.debug(f"Prepared normalized features for {symbol} {timeframe}: {len(normalized_df)} records") else: logger.warning(f"No normalized data available for {symbol} {timeframe}") symbol_features[symbol][timeframe] = None return symbol_features except Exception as e: logger.error(f"Error preparing multi-symbol features for inference: {e}") return {} def get_cnn_features_for_inference(self, symbol: str, timeframe: str = '1m', window_size: int = 60) -> Optional[np.ndarray]: """Get normalized CNN features for a specific symbol and timeframe""" try: # Get normalized data df = self.get_historical_data_for_inference(symbol, timeframe, limit=300) if df is None or df.empty: return None # Extract recent window for CNN recent_data = df.tail(window_size) # Extract OHLCV features features = recent_data[['open', 'high', 'low', 'close', 'volume']].values logger.debug(f"Extracted CNN features for {symbol} {timeframe}: {features.shape}") return features.flatten() except Exception as e: logger.error(f"Error extracting CNN features for {symbol} {timeframe}: {e}") return None def get_dqn_state_for_inference(self, symbols_timeframes: List[Tuple[str, str]], target_size: int = 100) -> Optional[np.ndarray]: """Get normalized DQN state vector combining multiple symbols and timeframes""" try: state_components = [] for symbol, timeframe in symbols_timeframes: df = self.get_historical_data_for_inference(symbol, timeframe, limit=50) if df is not None and not df.empty: # Extract key features for state latest = df.iloc[-1] state_features = [ latest['close'], # Current price (normalized) latest['volume'], # Current volume (normalized) df['close'].pct_change().iloc[-1] if len(df) > 1 else 0, # Price change ] state_components.extend(state_features) if state_components: # Pad or truncate to expected DQN state size if len(state_components) < target_size: state_components.extend([0] * (target_size - len(state_components))) else: state_components = state_components[:target_size] state_vector = np.array(state_components, dtype=np.float32) logger.debug(f"Created DQN state vector: {len(state_vector)} dimensions") return state_vector return None except Exception as e: logger.error(f"Error creating DQN state for inference: {e}") return None def get_transformer_sequences_for_inference(self, symbols_timeframes: List[Tuple[str, str]], seq_length: int = 150) -> List[np.ndarray]: """Get normalized sequences for transformer inference""" try: sequences = [] for symbol, timeframe in symbols_timeframes: df = self.get_historical_data_for_inference(symbol, timeframe, limit=300) if df is not None and not df.empty: # Use last seq_length points as sequence sequence = df.tail(seq_length)[['open', 'high', 'low', 'close', 'volume']].values sequences.append(sequence) logger.debug(f"Created transformer sequence for {symbol} {timeframe}: {sequence.shape}") return sequences except Exception as e: logger.error(f"Error creating transformer sequences for inference: {e}") return []