""" Historical Data Loader - Integrates with existing DataProvider Provides data loading and caching for the annotation UI, ensuring the same data quality and structure used by training and inference systems. """ import logging from typing import Dict, List, Optional, Tuple from datetime import datetime, timedelta, timezone import pandas as pd from pathlib import Path import pickle import time logger = logging.getLogger(__name__) class HistoricalDataLoader: """ Loads historical data from the main system's DataProvider Ensures consistency with training/inference data """ def __init__(self, data_provider): """ Initialize with existing DataProvider Args: data_provider: Instance of core.data_provider.DataProvider """ self.data_provider = data_provider self.cache_dir = Path("ANNOTATE/data/cache") self.cache_dir.mkdir(parents=True, exist_ok=True) # Cache for recently loaded data self.memory_cache = {} self.cache_ttl = timedelta(minutes=5) # Startup mode - allow stale cache for faster loading self.startup_mode = True logger.info("HistoricalDataLoader initialized with existing DataProvider (startup mode: ON)") def get_data(self, symbol: str, timeframe: str, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, limit: int = 2500, direction: str = 'latest') -> Optional[pd.DataFrame]: """ Get historical data for symbol and timeframe Args: symbol: Trading pair (e.g., 'ETH/USDT') timeframe: Timeframe (e.g., '1s', '1m', '1h', '1d') start_time: Start time for data range end_time: End time for data range limit: Maximum number of candles to return direction: 'latest' (most recent), 'before' (older data), 'after' (newer data) Returns: DataFrame with OHLCV data or None if unavailable """ start_time_ms = time.time() # Check memory cache first (exclude direction from cache key for infinite scroll) cache_key = f"{symbol}_{timeframe}_{start_time}_{end_time}_{limit}" # Determine TTL based on timeframe current_ttl = self.cache_ttl if timeframe == '1s': current_ttl = timedelta(seconds=1) elif timeframe == '1m': current_ttl = timedelta(seconds=5) # For 'after' direction (incremental updates), we should force a refresh if cache is stale # or simply bypass cache for 1s/1m to ensure we get the absolute latest bypass_cache = (direction == 'after' and timeframe in ['1s', '1m']) if cache_key in self.memory_cache and direction == 'latest' and not bypass_cache: cached_data, cached_time = self.memory_cache[cache_key] if datetime.now() - cached_time < current_ttl: # For 1s/1m, we want to return immediately if valid if timeframe not in ['1s', '1m']: elapsed_ms = (time.time() - start_time_ms) * 1000 logger.debug(f"Memory cache hit for {symbol} {timeframe} ({elapsed_ms:.1f}ms)") return cached_data try: # FORCE refresh for 1s/1m if requesting latest data OR incremental update # Also force refresh for live updates (small limit + direction='latest' + no time range) is_live_update = (direction == 'latest' and not start_time and not end_time and limit <= 5) force_refresh = (timeframe in ['1s', '1m'] and (bypass_cache or (not start_time and not end_time))) or is_live_update if is_live_update: logger.debug(f"Live update detected for {symbol} {timeframe} (limit={limit}, direction={direction}) - forcing refresh") # Try to get data from DataProvider's cached data first (most efficient) if hasattr(self.data_provider, 'cached_data'): with self.data_provider.data_lock: cached_df = self.data_provider.cached_data.get(symbol, {}).get(timeframe) if cached_df is not None and not cached_df.empty: # If time range is specified, 
                    use_cached_data = True

                    if start_time or end_time:
                        if isinstance(cached_df.index, pd.DatetimeIndex):
                            cache_start = cached_df.index.min()
                            cache_end = cached_df.index.max()

                            # Check if the requested range is within the cached range
                            if start_time and start_time < cache_start:
                                use_cached_data = False
                            elif end_time and end_time > cache_end:
                                use_cached_data = False
                            elif start_time and end_time:
                                # Both specified - check if the range overlaps
                                if end_time < cache_start or start_time > cache_end:
                                    use_cached_data = False

                    # Use cached data if we have enough candles and it covers the range
                    if use_cached_data and len(cached_df) >= min(limit, 100):  # Use cached if we have at least 100 candles
                        elapsed_ms = (time.time() - start_time_ms) * 1000
                        logger.debug(f"DataProvider cache hit for {symbol} {timeframe} ({len(cached_df)} candles, {elapsed_ms:.1f}ms)")

                        # Filter by time range with direction support
                        filtered_df = self._filter_by_time_range(
                            cached_df.copy(), start_time, end_time, direction, limit
                        )

                        # Only return cached data if the filter produced results
                        if filtered_df is not None and not filtered_df.empty:
                            # Cache in memory
                            self.memory_cache[cache_key] = (filtered_df, datetime.now())
                            return filtered_df
                        # If the filter returned empty, fall through to fetch from DuckDB/API

            # Try unified storage first if available
            if hasattr(self.data_provider, 'is_unified_storage_enabled') and \
               self.data_provider.is_unified_storage_enabled():
                try:
                    import asyncio

                    # Get data from unified storage
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)

                    # If we have a specific time range, get historical data
                    if start_time or end_time:
                        target_time = end_time if end_time else start_time
                        inference_data = loop.run_until_complete(
                            self.data_provider.get_inference_data_unified(
                                symbol,
                                timestamp=target_time,
                                context_window_minutes=60
                            )
                        )
                    else:
                        # Get latest real-time data
                        inference_data = loop.run_until_complete(
                            self.data_provider.get_inference_data_unified(symbol)
                        )

                    # Extract the requested timeframe
                    df = inference_data.get_timeframe_data(timeframe)

                    if df is not None and not df.empty:
                        # Limit number of candles
                        if len(df) > limit:
                            df = df.tail(limit)

                        # Cache in memory
                        self.memory_cache[cache_key] = (df.copy(), datetime.now())

                        logger.info(f"Loaded {len(df)} candles from unified storage for {symbol} {timeframe}")
                        return df

                except Exception as e:
                    logger.debug(f"Unified storage not available, falling back to cached data: {e}")

            # Fallback to the existing cached data method (duplicate of the check above -
            # kept for backward compatibility and should rarely execute)
            if hasattr(self.data_provider, 'cached_data'):
                if symbol in self.data_provider.cached_data:
                    if timeframe in self.data_provider.cached_data[symbol]:
                        df = self.data_provider.cached_data[symbol][timeframe]

                        if df is not None and not df.empty:
                            # Check if cached data covers the requested time range
                            use_cached_data = True

                            if start_time or end_time:
                                if isinstance(df.index, pd.DatetimeIndex):
                                    cache_start = df.index.min()
                                    cache_end = df.index.max()

                                    if start_time and start_time < cache_start:
                                        use_cached_data = False
                                    elif end_time and end_time > cache_end:
                                        use_cached_data = False
                                    elif start_time and end_time:
                                        if end_time < cache_start or start_time > cache_end:
                                            use_cached_data = False

                            if use_cached_data:
                                # Filter by time range with direction support
                                df = self._filter_by_time_range(
                                    df.copy(), start_time, end_time, direction, limit
                                )

                                # Only return if the filter produced results
                                if df is not None and not df.empty:
                                    # Cache in memory
                                    self.memory_cache[cache_key] = (df.copy(), datetime.now())

                                    logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
                                    return df
                            # If the filter returned empty or the range is not covered,
                            # fall through to fetch from DuckDB/API

            # Check DuckDB first for historical data (always check for infinite scroll)
            if self.data_provider.duckdb_storage and (start_time or end_time):
                logger.info(f"Checking DuckDB for {symbol} {timeframe} historical data (direction={direction})")

                df = self.data_provider.duckdb_storage.get_ohlcv_data(
                    symbol=symbol,
                    timeframe=timeframe,
                    start_time=start_time,
                    end_time=end_time,
                    limit=limit,
                    direction=direction
                )

                if df is not None and not df.empty:
                    elapsed_ms = (time.time() - start_time_ms) * 1000
                    logger.info(f"DuckDB hit for {symbol} {timeframe} ({len(df)} candles, {elapsed_ms:.1f}ms)")

                    # Cache in memory
                    self.memory_cache[cache_key] = (df.copy(), datetime.now())
                    return df
                else:
                    logger.info(f"No data in DuckDB, fetching from exchange API for {symbol} {timeframe}")

                    # Fetch from exchange API with time range
                    df = self._fetch_from_exchange_api(
                        symbol=symbol,
                        timeframe=timeframe,
                        start_time=start_time,
                        end_time=end_time,
                        limit=limit,
                        direction=direction
                    )

                    if df is not None and not df.empty:
                        elapsed_ms = (time.time() - start_time_ms) * 1000
                        logger.info(f"Exchange API hit for {symbol} {timeframe} ({len(df)} candles, {elapsed_ms:.1f}ms)")

                        # Store in DuckDB for future use
                        if self.data_provider.duckdb_storage:
                            stored_count = self.data_provider.duckdb_storage.store_ohlcv_data(
                                symbol=symbol,
                                timeframe=timeframe,
                                df=df
                            )
                            logger.info(f"Stored {stored_count} new candles in DuckDB")

                        # Cache in memory
                        self.memory_cache[cache_key] = (df.copy(), datetime.now())
                        return df
                    else:
                        logger.warning(f"No data available from exchange API for {symbol} {timeframe}")
                        return None

            # Fallback: Use DataProvider for latest data (startup mode or no time range)
            if self.startup_mode and not (start_time or end_time) and not force_refresh:
                logger.info(f"Loading data for {symbol} {timeframe} (startup mode: allow stale cache)")
                df = self.data_provider.get_historical_data(
                    symbol=symbol,
                    timeframe=timeframe,
                    limit=limit,
                    allow_stale_cache=True
                )
            elif is_live_update:
                # For live updates, use get_latest_candles which combines cached + real-time data
                logger.debug(f"Getting live candles (cached + real-time) for {symbol} {timeframe}")
                df = self.data_provider.get_latest_candles(
                    symbol=symbol,
                    timeframe=timeframe,
                    limit=limit
                )
                # Log the latest candle timestamp to help debug stale data
                if df is not None and not df.empty:
                    latest_timestamp = df.index[-1] if hasattr(df.index, '__getitem__') else df.iloc[-1].name
                    logger.debug(f"Live update for {symbol} {timeframe}: latest candle at {latest_timestamp}")
            else:
                # Fetch from API and store in DuckDB (no time range specified)
                # For 1s/1m, logging every request is too verbose, so use debug level
                if timeframe in ['1s', '1m']:
                    logger.debug(f"Fetching latest data from API for {symbol} {timeframe}")
                else:
                    logger.info(f"Fetching latest data from API for {symbol} {timeframe}")

                df = self.data_provider.get_historical_data(
                    symbol=symbol,
                    timeframe=timeframe,
                    limit=limit,
                    refresh=True  # Force API fetch
                )

            if df is not None and not df.empty:
                # Filter by time range with direction support
                df = self._filter_by_time_range(
                    df.copy(), start_time, end_time, direction, limit
                )

                # Cache in memory
                self.memory_cache[cache_key] = (df.copy(), datetime.now())

                logger.info(f"Fetched {len(df)} candles for {symbol} {timeframe}")
                return df

            logger.warning(f"No data available for {symbol} {timeframe}")
            return None

        except Exception as e:
            logger.error(f"Error loading data for {symbol} {timeframe}: {e}")
            return None
    def _fetch_from_exchange_api(self, symbol: str, timeframe: str,
                                 start_time: Optional[datetime] = None,
                                 end_time: Optional[datetime] = None,
                                 limit: int = 1000,
                                 direction: str = 'latest') -> Optional[pd.DataFrame]:
        """
        Fetch historical data from exchange API (Binance/MEXC) with time range support

        Args:
            symbol: Trading pair
            timeframe: Timeframe
            start_time: Start time for data range
            end_time: End time for data range
            limit: Maximum number of candles
            direction: 'latest', 'before', or 'after'

        Returns:
            DataFrame with OHLCV data or None
        """
        try:
            from core.api_rate_limiter import get_rate_limiter

            # Convert symbol format for Binance
            binance_symbol = symbol.replace('/', '').upper()

            # Convert timeframe
            timeframe_map = {
                '1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m',
                '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d'
            }
            binance_timeframe = timeframe_map.get(timeframe, '1m')

            # Build initial API parameters
            params = {
                'symbol': binance_symbol,
                'interval': binance_timeframe
            }

            # Add time range parameters if specified
            if direction == 'before' and end_time:
                params['endTime'] = int(end_time.timestamp() * 1000)
            elif direction == 'after' and start_time:
                params['startTime'] = int(start_time.timestamp() * 1000)
            elif start_time:
                params['startTime'] = int(start_time.timestamp() * 1000)

            if end_time and direction != 'before':
                params['endTime'] = int(end_time.timestamp() * 1000)

            # Use rate limiter
            rate_limiter = get_rate_limiter()
            url = "https://api.binance.com/api/v3/klines"

            logger.info(f"Fetching from Binance: {symbol} {timeframe} (direction={direction}, limit={limit})")

            # Pagination variables
            all_dfs = []
            total_fetched = 0
            is_fetching_forward = (direction == 'after')

            # Fetch loop
            while total_fetched < limit:
                # Calculate batch limit (max 1000 per request)
                batch_limit = min(limit - total_fetched, 1000)
                params['limit'] = batch_limit

                response = rate_limiter.make_request('binance_api', url, 'GET', params=params)

                if response is None or response.status_code != 200:
                    if total_fetched == 0:
                        logger.warning("Binance API failed, trying MEXC...")
                        return self._fetch_from_mexc_with_time_range(
                            symbol, timeframe, start_time, end_time, limit, direction
                        )
                    else:
                        logger.warning("Binance API failed during pagination, returning partial data")
                        break

                data = response.json()

                if not data:
                    if total_fetched == 0:
                        logger.warning(f"No data returned from Binance for {symbol} {timeframe}")
                        return None
                    else:
                        break

                # Convert to DataFrame
                df = pd.DataFrame(data, columns=[
                    'timestamp', 'open', 'high', 'low', 'close', 'volume',
                    'close_time', 'quote_volume', 'trades',
                    'taker_buy_base', 'taker_buy_quote', 'ignore'
                ])

                # Process columns
                df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
                for col in ['open', 'high', 'low', 'close', 'volume']:
                    df[col] = df[col].astype(float)

                # Keep only OHLCV columns
                df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
                df = df.set_index('timestamp')
                df = df.sort_index()

                if df.empty:
                    break

                all_dfs.append(df)
                total_fetched += len(df)

                # Prepare for next batch
                if total_fetched >= limit:
                    break

                # Update params for next iteration
                if is_fetching_forward:
                    # Next batch starts after the last candle
                    last_ts = df.index[-1]
                    params['startTime'] = int(last_ts.value // 10**6) + 1

                    # Check if we exceeded end_time
                    if 'endTime' in params and params['startTime'] > params['endTime']:
                        break
                else:
                    # Next batch ends before the first candle
                    first_ts = df.index[0]
                    params['endTime'] = int(first_ts.value // 10**6) - 1

                    # Check if we exceeded start_time
                    if 'startTime' in params and params['endTime'] < params['startTime']:
                        break

            # Combine all batches
            if not all_dfs:
                return None

            final_df = pd.concat(all_dfs)
            final_df = final_df.sort_index()
            final_df = final_df[~final_df.index.duplicated(keep='first')]

            logger.info(f"Fetched {len(final_df)} candles from Binance for {symbol} {timeframe} (requested {limit})")
            return final_df

        except Exception as e:
            logger.error(f"Error fetching from exchange API: {e}")
            return None

    def _fetch_from_mexc_with_time_range(self, symbol: str, timeframe: str,
                                         start_time: Optional[datetime] = None,
                                         end_time: Optional[datetime] = None,
                                         limit: int = 1000,
                                         direction: str = 'latest') -> Optional[pd.DataFrame]:
        """Fetch from MEXC with time range support (fallback)"""
        try:
            # MEXC implementation would go here
            # For now, just return None to indicate unavailable
            logger.warning("MEXC time range fetch not implemented yet")
            return None
        except Exception as e:
            logger.error(f"Error fetching from MEXC: {e}")
            return None

    def _filter_by_time_range(self, df: pd.DataFrame,
                              start_time: Optional[datetime],
                              end_time: Optional[datetime],
                              direction: str = 'latest',
                              limit: int = 500) -> pd.DataFrame:
        """
        Filter DataFrame by time range with direction support

        Args:
            df: DataFrame to filter
            start_time: Start time filter
            end_time: End time filter
            direction: 'latest', 'before', or 'after'
            limit: Maximum number of candles

        Returns:
            Filtered DataFrame
        """
        try:
            # Ensure the index is a timezone-aware (UTC) DatetimeIndex
            if not isinstance(df.index, pd.DatetimeIndex):
                df.index = pd.to_datetime(df.index, utc=True)
            elif df.index.tz is None:
                df.index = df.index.tz_localize('UTC')
            else:
                # Already timezone-aware - convert to UTC (no-op if already UTC)
                df.index = df.index.tz_convert('UTC')

            # Ensure start_time/end_time are UTC
            if start_time and start_time.tzinfo is None:
                start_time = start_time.replace(tzinfo=timezone.utc)
            elif start_time:
                start_time = start_time.astimezone(timezone.utc)

            if end_time and end_time.tzinfo is None:
                end_time = end_time.replace(tzinfo=timezone.utc)
            elif end_time:
                end_time = end_time.astimezone(timezone.utc)

            if direction == 'before' and end_time:
                # Get candles BEFORE end_time, returning the most recent N
                df = df[df.index < end_time]
                df = df.tail(limit)
            elif direction == 'after' and start_time:
                # Get candles AFTER start_time, returning the oldest N
                df = df[df.index > start_time]
                df = df.head(limit)
            else:
                # Default: filter by range
                if start_time:
                    df = df[df.index >= start_time]
                if end_time:
                    df = df[df.index <= end_time]

                # Return the most recent candles
                if len(df) > limit:
                    df = df.tail(limit)

            return df

        except Exception as e:
            logger.error(f"Error filtering data: {e}")
            # Fallback: return original or empty
            return df if not df.empty else pd.DataFrame()

    def get_multi_timeframe_data(self, symbol: str, timeframes: List[str],
                                 start_time: Optional[datetime] = None,
                                 end_time: Optional[datetime] = None,
                                 limit: int = 2500) -> Dict[str, pd.DataFrame]:
        """
        Get data for multiple timeframes at once

        Args:
            symbol: Trading pair
            timeframes: List of timeframes
            start_time: Start time for data range
            end_time: End time for data range
            limit: Maximum number of candles per timeframe

        Returns:
            Dictionary mapping timeframe to DataFrame
        """
        result = {}

        for timeframe in timeframes:
            df = self.get_data(
                symbol=symbol,
                timeframe=timeframe,
                start_time=start_time,
                end_time=end_time,
                limit=limit
            )

            if df is not None:
                result[timeframe] = df

        logger.info(f"Loaded data for {len(result)}/{len(timeframes)} timeframes")
        return result

    def prefetch_data(self, symbol: str, timeframes: List[str], limit: int = 1000):
        """
        Prefetch data for smooth scrolling

        Args:
            symbol: Trading pair
            timeframes: List of timeframes to prefetch
            limit: Number of candles to prefetch
        """
        logger.info(f"Prefetching data for {symbol}: {timeframes}")

        for timeframe in timeframes:
            self.get_data(symbol, timeframe, limit=limit)

    def clear_cache(self):
        """Clear memory cache"""
        self.memory_cache.clear()
        logger.info("Memory cache cleared")

    def disable_startup_mode(self):
        """Disable startup mode to fetch fresh data"""
        self.startup_mode = False
        logger.info("Startup mode disabled - will fetch fresh data on next request")

    def get_data_boundaries(self, symbol: str, timeframe: str) -> Tuple[Optional[datetime], Optional[datetime]]:
        """
        Get the earliest and latest available data timestamps

        Args:
            symbol: Trading pair
            timeframe: Timeframe

        Returns:
            Tuple of (earliest_time, latest_time) or (None, None) if no data
        """
        try:
            df = self.get_data(symbol, timeframe, limit=10000)

            if df is not None and not df.empty:
                return (df.index.min(), df.index.max())

            return (None, None)

        except Exception as e:
            logger.error(f"Error getting data boundaries: {e}")
            return (None, None)


class TimeRangeManager:
    """Manages time range calculations and data prefetching"""

    def __init__(self, data_loader: HistoricalDataLoader):
        """
        Initialize with data loader

        Args:
            data_loader: HistoricalDataLoader instance
        """
        self.data_loader = data_loader

        # Time range presets in seconds
        self.range_presets = {
            '1h': 3600,
            '4h': 14400,
            '1d': 86400,
            '1w': 604800,
            '1M': 2592000
        }

        logger.info("TimeRangeManager initialized")

    def calculate_time_range(self, center_time: datetime,
                             range_preset: str) -> Tuple[datetime, datetime]:
        """
        Calculate start and end times for a range preset

        Args:
            center_time: Center point of the range
            range_preset: Range preset ('1h', '4h', '1d', '1w', '1M')

        Returns:
            Tuple of (start_time, end_time)
        """
        range_seconds = self.range_presets.get(range_preset, 86400)
        half_range = timedelta(seconds=range_seconds / 2)

        start_time = center_time - half_range
        end_time = center_time + half_range

        return (start_time, end_time)

    def get_navigation_increment(self, range_preset: str) -> timedelta:
        """
        Get time increment for navigation (10% of range)

        Args:
            range_preset: Range preset

        Returns:
            timedelta for navigation increment
        """
        range_seconds = self.range_presets.get(range_preset, 86400)
        increment_seconds = range_seconds / 10

        return timedelta(seconds=increment_seconds)

    def prefetch_adjacent_ranges(self, symbol: str, timeframes: List[str],
                                 center_time: datetime, range_preset: str):
        """
        Prefetch data for adjacent time ranges for smooth scrolling

        Args:
            symbol: Trading pair
            timeframes: List of timeframes
            center_time: Current center time
            range_preset: Current range preset
        """
        increment = self.get_navigation_increment(range_preset)

        # Prefetch previous range
        prev_center = center_time - increment
        prev_start, prev_end = self.calculate_time_range(prev_center, range_preset)

        # Prefetch next range
        next_center = center_time + increment
        next_start, next_end = self.calculate_time_range(next_center, range_preset)

        logger.debug(f"Prefetching adjacent ranges for {symbol}")

        # Prefetch in background (non-blocking)
        import threading

        def prefetch():
            for timeframe in timeframes:
                self.data_loader.get_data(symbol, timeframe, prev_start, prev_end)
                self.data_loader.get_data(symbol, timeframe, next_start, next_end)

        thread = threading.Thread(target=prefetch, daemon=True)
        thread.start()
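

# Illustrative usage sketch (not part of the library surface).
# A minimal example of how the annotation UI might drive HistoricalDataLoader and
# TimeRangeManager: load a centered window for several timeframes, then page older
# data with direction='before'. It assumes core.data_provider.DataProvider can be
# constructed with no arguments; adapt the construction to however the main system
# builds its provider.
if __name__ == "__main__":
    from core.data_provider import DataProvider  # assumed zero-arg constructor; adjust as needed

    logging.basicConfig(level=logging.INFO)

    loader = HistoricalDataLoader(DataProvider())
    ranges = TimeRangeManager(loader)

    # Center a 4-hour window on "one day ago" and load it for two timeframes
    center = datetime.now(timezone.utc) - timedelta(days=1)
    start, end = ranges.calculate_time_range(center, '4h')
    frames = loader.get_multi_timeframe_data('ETH/USDT', ['1m', '1h'],
                                             start_time=start, end_time=end, limit=500)
    for tf, frame in frames.items():
        print(f"{tf}: {len(frame)} candles from {frame.index.min()} to {frame.index.max()}")

    # Infinite-scroll style request: the 500 candles immediately before the window
    older = loader.get_data('ETH/USDT', '1m', end_time=start, limit=500, direction='before')
    if older is not None and not older.empty:
        print(f"older 1m page: {len(older)} candles ending at {older.index.max()}")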