From 420251f2d4de1cf7313373ad09d96899ddd4de33 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Fri, 24 Oct 2025 17:14:59 +0300
Subject: [PATCH] fix pivot cache db

---
 ANNOTATE/web/app.py    |  51 ++-----
 core/data_provider.py  | 303 +++++++++++++++++++++++++++++------------
 core/duckdb_storage.py |  78 +++++++++++
 3 files changed, 308 insertions(+), 124 deletions(-)

diff --git a/ANNOTATE/web/app.py b/ANNOTATE/web/app.py
index d1d3e6a..c5debde 100644
--- a/ANNOTATE/web/app.py
+++ b/ANNOTATE/web/app.py
@@ -140,21 +140,20 @@ class AnnotationDashboard:
             ], className="container")
         ])
 
-        # Initialize core components
-        self.data_provider = DataProvider() if DataProvider else None
+        # Initialize core components (skip initial load for fast startup)
+        self.data_provider = DataProvider(skip_initial_load=True) if DataProvider else None
 
         # Enable unified storage for real-time data access
         if self.data_provider:
             self._enable_unified_storage_async()
 
-        self.orchestrator = TradingOrchestrator(
-            data_provider=self.data_provider
-        ) if TradingOrchestrator and self.data_provider else None
+        # ANNOTATE doesn't need orchestrator - skip ML model loading for fast startup
+        self.orchestrator = None
 
         # Initialize ANNOTATE components
         self.annotation_manager = AnnotationManager()
 
         # Use REAL training adapter - NO SIMULATION!
-        self.training_adapter = RealTrainingAdapter(self.orchestrator, self.data_provider)
+        self.training_adapter = RealTrainingAdapter(None, self.data_provider)
 
         # Initialize data loader with existing DataProvider
         self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None
@@ -206,56 +205,32 @@ class AnnotationDashboard:
         storage_thread.start()
 
     def _start_background_data_refresh(self):
-        """Start background task to refresh recent data after startup"""
+        """Start background task to refresh recent data after startup - ONCE ONLY"""
         def refresh_recent_data():
             try:
                 import time
                 # Wait for app to fully start
                 time.sleep(5)
 
-                logger.info("🔄 Starting background data refresh (fetching only recent missing data)")
+                logger.info("🔄 Starting one-time background data refresh (fetching only recent missing data)")
 
                 # Disable startup mode to fetch fresh data
                 self.data_loader.disable_startup_mode()
 
-                # Fetch only last 5 minutes of 1m data and 300 seconds of 1s data
-                symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])
+                # Use the new on-demand refresh method
+                logger.info("Using on-demand refresh for recent data")
+                self.data_provider.refresh_data_on_demand()
 
-                for symbol in symbols:
-                    try:
-                        # Fetch last 5 candles of 1m data (5 minutes)
-                        logger.info(f"Refreshing recent 1m data for {symbol}")
-                        self.data_provider.get_historical_data(
-                            symbol=symbol,
-                            timeframe='1m',
-                            limit=5,
-                            refresh=True
-                        )
-
-                        # Fetch last 300 candles of 1s data (5 minutes)
-                        logger.info(f"Refreshing recent 1s data for {symbol}")
-                        self.data_provider.get_historical_data(
-                            symbol=symbol,
-                            timeframe='1s',
-                            limit=300,
-                            refresh=True
-                        )
-
-                        logger.info(f"✅ Refreshed recent data for {symbol}")
-
-                    except Exception as e:
-                        logger.warning(f"Could not refresh recent data for {symbol}: {e}")
-
-                logger.info("✅ Background data refresh completed")
+                logger.info("✅ One-time background data refresh completed")
 
             except Exception as e:
                 logger.error(f"Error in background data refresh: {e}")
 
-        # Start in background thread
+        # Start refresh in background thread
         import threading
         refresh_thread = threading.Thread(target=refresh_recent_data, daemon=True)
         refresh_thread.start()
 
-        logger.info("📊 Background data refresh scheduled")
+        logger.info("📊 One-time background data refresh scheduled")
 
     def _get_pivot_markers_for_timeframe(self, symbol: str, timeframe: str, df: pd.DataFrame) -> dict:
         """
diff --git a/core/data_provider.py b/core/data_provider.py
index 3b6de80..791e135 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -29,7 +29,7 @@ import requests
 import pandas as pd
 import numpy as np
 import pickle
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Any, Callable
 from dataclasses import dataclass, field
@@ -147,12 +147,19 @@ class DataSubscriber:
 class DataProvider:
     """Unified data provider for historical and real-time market data with centralized distribution"""
 
-    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
-        """Initialize the data provider"""
+    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None, skip_initial_load: bool = False):
+        """Initialize the data provider
+
+        Args:
+            symbols: List of trading symbols
+            timeframes: List of timeframes
+            skip_initial_load: If True, skip initial data load (use for ANNOTATE with DuckDB)
+        """
         self.config = get_config()
         # Fixed symbols and timeframes
         self.symbols = ['ETH/USDT', 'BTC/USDT']
         self.timeframes = ['1s', '1m', '1h', '1d']
+        self.skip_initial_load = skip_initial_load
 
         # Data storage - cached OHLCV data (1500 candles each)
         self.cached_data = {}  # {symbol: {timeframe: DataFrame}}
@@ -346,6 +353,9 @@ class DataProvider:
         self.retry_delay = 60  # 1 minute retry delay for 451 errors
         self.max_retries = 3
 
+        # Always load data from DuckDB immediately (synchronous for fast startup)
+        self._load_from_duckdb_sync()
+
         # Start automatic data maintenance
         self.start_automatic_data_maintenance()
 
@@ -531,75 +541,117 @@ class DataProvider:
         logger.info("Automatic data maintenance stopped")
 
     def _data_maintenance_worker(self):
-        """Worker thread for automatic data maintenance"""
+        """Worker thread for initial data load only - no continuous fetching"""
         logger.info("Data maintenance worker started")
 
-        # Initial data load
-        self._initial_data_load()
+        # Initial data load (skip if requested)
+        if not self.skip_initial_load:
+            self._initial_data_load()
+        else:
+            logger.info("Skipping initial data load (using DuckDB cache)")
 
-        # Track last update times for each symbol/timeframe
-        last_updates = {}
-        for symbol in self.symbols:
-            last_updates[symbol] = {}
-            for timeframe in self.timeframes:
-                last_updates[symbol][timeframe] = 0
+        logger.info("✅ Initial data load completed - stopping maintenance worker")
+        logger.info("📊 Data will be updated on-demand only (no continuous fetching)")
 
-        while self.data_maintenance_active:
-            try:
-                current_time = time.time()
-
-                # Check each symbol/timeframe for updates
-                for symbol in self.symbols:
-                    for timeframe in self.timeframes:
-                        interval = self.timeframe_intervals[timeframe]
-                        half_interval = interval / 2
-
-                        # Update every half candle period
-                        if current_time - last_updates[symbol][timeframe] >= half_interval:
-                            self._update_cached_data(symbol, timeframe)
-                            last_updates[symbol][timeframe] = current_time
-
-                # Sleep for 1 second before next check
-                time.sleep(1)
-
-            except Exception as e:
-                logger.error(f"Error in data maintenance worker: {e}")
-                time.sleep(10)  # Wait longer on error
+        # Stop the maintenance worker after initial load
+        self.data_maintenance_active = False
 
     def _initial_data_load(self):
-        """Load initial 1500 candles for each symbol/timeframe"""
-        logger.info("Starting initial data load (1500 candles each)")
+        """Smart incremental load - load from DuckDB + fetch only missing candles since last timestamp"""
+        logger.info("Starting smart incremental data load")
 
         for symbol in self.symbols:
             for timeframe in self.timeframes:
                 try:
-                    logger.info(f"Loading initial data for {symbol} {timeframe}")
-                    df = self._fetch_from_binance(symbol, timeframe, 1500)
+                    # Step 1: Load existing data from DuckDB (up to 1500 candles)
+                    existing_df = None
+                    last_timestamp = None
 
-                    if df is None or df.empty:
-                        logger.warning(f"Binance failed for {symbol} {timeframe}, trying MEXC")
-                        df = self._fetch_from_mexc(symbol, timeframe, 1500)
+                    if self.duckdb_storage:
+                        try:
+                            # Load existing data
+                            existing_df = self.duckdb_storage.get_ohlcv_data(
+                                symbol=symbol,
+                                timeframe=timeframe,
+                                limit=1500
+                            )
+
+                            if existing_df is not None and not existing_df.empty:
+                                # Store in memory cache
+                                with self.data_lock:
+                                    self.cached_data[symbol][timeframe] = existing_df.tail(1500)
+
+                                last_timestamp = existing_df.index.max()
+                                logger.info(f"📦 Loaded {len(existing_df)} candles from DuckDB for {symbol} {timeframe}")
+                            else:
+                                logger.debug(f"No existing data in DuckDB for {symbol} {timeframe}")
+                        except Exception as e:
+                            logger.debug(f"Error loading from DuckDB for {symbol} {timeframe}: {e}")
 
-                    if df is not None and not df.empty:
-                        # Ensure proper datetime index
-                        df = self._ensure_datetime_index(df)
+                    # Step 2: Fetch only missing candles since last timestamp
+                    if last_timestamp:
+                        # Calculate how many candles we might be missing
+                        now = datetime.now(timezone.utc)
+                        time_diff = (now - last_timestamp).total_seconds()
 
-                        # Store in cached data
-                        self.cached_data[symbol][timeframe] = df
-                        logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
+                        # Estimate missing candles based on timeframe
+                        timeframe_seconds = {'1s': 1, '1m': 60, '1h': 3600, '1d': 86400}
+                        estimated_missing = int(time_diff / timeframe_seconds.get(timeframe, 60))
+
+                        if estimated_missing > 2:  # Only fetch if more than 2 candles missing
+                            # Cap at 1500 candles maximum
+                            fetch_limit = min(estimated_missing + 10, 1500)
+
+                            logger.info(f"🔄 Fetching {fetch_limit} recent candles for {symbol} {timeframe} (since {last_timestamp})")
+                            new_df = self._fetch_from_binance(symbol, timeframe, fetch_limit)
+
+                            if new_df is None or new_df.empty:
+                                new_df = self._fetch_from_mexc(symbol, timeframe, fetch_limit)
+
+                            if new_df is not None and not new_df.empty:
+                                new_df = self._ensure_datetime_index(new_df)
+
+                                # Store to DuckDB
+                                if self.duckdb_storage:
+                                    self.duckdb_storage.store_ohlcv_data(symbol, timeframe, new_df)
+
+                                # Merge with existing data
+                                with self.data_lock:
+                                    combined_df = pd.concat([existing_df, new_df], ignore_index=False)
+                                    combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
+                                    combined_df = combined_df.sort_index()
+                                    self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+
+                                logger.info(f"✅ {symbol} {timeframe}: +{len(new_df)} new (total: {len(self.cached_data[symbol][timeframe])})")
+                        else:
+                            logger.info(f"✅ {symbol} {timeframe}: Up to date ({len(existing_df)} candles)")
                     else:
-                        logger.error(f"Failed to load initial data for {symbol} {timeframe}")
+                        # No existing data - fetch initial 1500 candles
+                        logger.info(f"🆕 No existing data, fetching 1500 candles for {symbol} {timeframe}")
+                        df = self._fetch_from_binance(symbol, timeframe, 1500)
+
+                        if df is None or df.empty:
+                            df = self._fetch_from_mexc(symbol, timeframe, 1500)
+
+                        if df is not None and not df.empty:
+                            df = self._ensure_datetime_index(df)
+
+                            # Store to DuckDB
+                            if self.duckdb_storage:
+                                self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
+
+                            with self.data_lock:
+                                self.cached_data[symbol][timeframe] = df
+
+                            logger.info(f"✅ Loaded {len(df)} candles for {symbol} {timeframe}")
 
-                    # Rate limiting between requests
-                    time.sleep(0.5)
+                    # Small delay to avoid rate limits
+                    time.sleep(0.1)
 
                 except Exception as e:
-                    logger.error(f"Error loading initial data for {symbol} {timeframe}: {e}")
+                    logger.error(f"Error loading data for {symbol} {timeframe}: {e}")
 
-        logger.info("Initial data load completed")
-
-        # Start background candle catch-up with proper locking
-        self._start_background_catch_up()
+        logger.info("✅ Smart incremental data load completed")
 
     def _start_background_catch_up(self):
         """
@@ -703,47 +755,92 @@ class DataProvider:
         self.catch_up_completed = True
 
     def _update_cached_data(self, symbol: str, timeframe: str):
-        """Update cached data by fetching last 2 candles and storing to DuckDB"""
+        """Update cached data by fetching only missing candles since last timestamp"""
         try:
-            # Fetch last 2 candles (outside lock - network I/O)
-            df = self._fetch_from_binance(symbol, timeframe, 2)
+            # Get last timestamp from DuckDB
+            last_timestamp = None
+            if self.duckdb_storage:
+                last_timestamp = self.duckdb_storage.get_last_timestamp(symbol, timeframe)
 
-            if df is None or df.empty:
-                df = self._fetch_from_mexc(symbol, timeframe, 2)
-
-            if df is not None and not df.empty:
-                # Ensure proper datetime index
-                df = self._ensure_datetime_index(df)
+            if last_timestamp:
+                # Calculate how many candles we might be missing
+                now = datetime.now(timezone.utc)
+                time_diff = (now - last_timestamp).total_seconds()
 
-                # Store to DuckDB immediately (live data persistence)
-                if self.duckdb_storage:
-                    try:
-                        self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
-                        logger.debug(f"Stored live data to DuckDB: {symbol} {timeframe} ({len(df)} candles)")
-                    except Exception as e:
-                        logger.warning(f"Could not store live data to DuckDB: {e}")
+                # Estimate missing candles based on timeframe
+                timeframe_seconds = {'1s': 1, '1m': 60, '1h': 3600, '1d': 86400}
+                estimated_missing = int(time_diff / timeframe_seconds.get(timeframe, 60))
 
-                # Update cached data with lock
-                with self.data_lock:
-                    existing_df = self.cached_data[symbol][timeframe]
+                if estimated_missing > 0:
+                    # Cap at 1500 candles maximum
+                    fetch_limit = min(estimated_missing + 5, 1500)
 
-                    if not existing_df.empty:
-                        # Merge new data with existing, avoiding duplicates
-                        combined_df = pd.concat([existing_df, df], ignore_index=False)
-                        combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
-                        combined_df = combined_df.sort_index()
+                    logger.info(f"🔄 Fetching {fetch_limit} recent candles for {symbol} {timeframe} (since {last_timestamp})")
+
+                    # Fetch missing candles
+                    df = self._fetch_from_binance(symbol, timeframe, fetch_limit)
+
+                    if df is None or df.empty:
+                        df = self._fetch_from_mexc(symbol, timeframe, fetch_limit)
+
+                    if df is not None and not df.empty:
+                        df = self._ensure_datetime_index(df)
 
-                        # Keep only last 1500 candles in memory
-                        self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+                        # Store to DuckDB
+                        if self.duckdb_storage:
+                            try:
+                                self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
+                                logger.debug(f"Stored live data to DuckDB: {symbol} {timeframe} ({len(df)} candles)")
+                            except Exception as e:
+                                logger.warning(f"Could not store live data to DuckDB: {e}")
+
+                        # Update cached data with lock
+                        with self.data_lock:
+                            existing_df = self.cached_data[symbol][timeframe]
+
+                            if not existing_df.empty:
+                                # Merge new data with existing, avoiding duplicates
+                                combined_df = pd.concat([existing_df, df], ignore_index=False)
+                                combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
+                                combined_df = combined_df.sort_index()
+
+                                # Keep only last 1500 candles in memory
+                                self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+                            else:
+                                self.cached_data[symbol][timeframe] = df
+
+                            candle_count = len(self.cached_data[symbol][timeframe])
+
+                        logger.info(f"✅ Updated {symbol} {timeframe}: +{len(df)} new (total: {candle_count})")
                     else:
-                        self.cached_data[symbol][timeframe] = df
-
-                    candle_count = len(self.cached_data[symbol][timeframe])
-
-                logger.debug(f"Updated cached data for {symbol} {timeframe}: {candle_count} candles")
+                        logger.warning(f"Could not fetch new data for {symbol} {timeframe}")
+                else:
+                    logger.debug(f"{symbol} {timeframe}: Up to date")
+            else:
+                logger.debug(f"No existing data for {symbol} {timeframe} - skipping update")
 
         except Exception as e:
             logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}")
+
+    def refresh_data_on_demand(self, symbol: str = None, timeframe: str = None):
+        """Manually refresh data for specific symbol/timeframe or all symbols"""
+        try:
+            if symbol and timeframe:
+                # Refresh specific symbol/timeframe
+                logger.info(f"🔄 Manual refresh requested for {symbol} {timeframe}")
+                self._update_cached_data(symbol, timeframe)
+            else:
+                # Refresh all symbols/timeframes
+                logger.info("🔄 Manual refresh requested for all symbols/timeframes")
+                for sym in self.symbols:
+                    for tf in self.timeframes:
+                        self._update_cached_data(sym, tf)
+                        time.sleep(0.1)  # Small delay to avoid rate limits
+
+                logger.info("✅ Manual refresh completed for all symbols/timeframes")
+
+        except Exception as e:
+            logger.error(f"Error in manual refresh: {e}")
 
     def start_cob_websocket_integration(self):
         """Start COB WebSocket integration using COBIntegration class"""
@@ -1353,7 +1450,7 @@ class DataProvider:
         # If we have a RangeIndex or other non-datetime index, create datetime index
         if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex):
             # Use current UTC time and work backwards for realistic timestamps
-            from datetime import datetime, timedelta
+            from datetime import datetime, timedelta, timezone
             end_time = datetime.utcnow()
             start_time = end_time - timedelta(minutes=len(df))
             df.index = pd.date_range(start=start_time, end=end_time, periods=len(df), tz='UTC')
@@ -3007,6 +3104,40 @@ class DataProvider:
             logger.debug(f"Error calculating RSI: {e}")
             return 50.0  # Default neutral value
 
+    def _load_from_duckdb_sync(self):
+        """Load all data from DuckDB synchronously for instant startup"""
+        if not self.duckdb_storage:
+            logger.warning("⚠️ DuckDB storage not available - cannot load cached data")
+            return
+
+        logger.info("📦 Loading cached data from DuckDB...")
+        loaded_count = 0
+
+        for symbol in self.symbols:
+            for timeframe in self.timeframes:
+                try:
+                    df = self.duckdb_storage.get_ohlcv_data(
+                        symbol=symbol,
+                        timeframe=timeframe,
+                        limit=1500
+                    )
+
+                    if df is not None and not df.empty:
+                        with self.data_lock:
+                            self.cached_data[symbol][timeframe] = df.tail(1500)
+                        logger.info(f"✅ {symbol} {timeframe}: {len(df)} candles from DuckDB")
+                        loaded_count += len(df)
+                    else:
+                        logger.debug(f"No data in DuckDB for {symbol} {timeframe} - will fetch from API")
+
+                except Exception as e:
+                    logger.error(f"❌ Error loading {symbol} {timeframe}: {e}")
+
+        if loaded_count > 0:
+            logger.info(f"✅ Loaded {loaded_count:,} candles total")
+        else:
+            logger.warning("⚠️ No cached data found - will fetch from API")
+
     def _load_from_duckdb(self, symbol: str, timeframe: str, limit: int = 1500) -> Optional[pd.DataFrame]:
         """Load data from DuckDB storage
diff --git a/core/duckdb_storage.py b/core/duckdb_storage.py
index cc1ece3..14cafa2 100644
--- a/core/duckdb_storage.py
+++ b/core/duckdb_storage.py
@@ -245,6 +245,84 @@ class DuckDBStorage:
             traceback.print_exc()
             return None
 
+    def get_last_timestamp(self, symbol: str, timeframe: str) -> Optional[datetime]:
+        """
+        Get the last timestamp for a symbol/timeframe from DuckDB
+
+        Args:
+            symbol: Trading symbol
+            timeframe: Timeframe
+
+        Returns:
+            Last timestamp or None if no data exists
+        """
+        try:
+            query = """
+                SELECT MAX(timestamp) as last_timestamp
+                FROM ohlcv_data
+                WHERE symbol = ? AND timeframe = ?
+            """
+
+            result = self.conn.execute(query, [symbol, timeframe]).fetchone()
+
+            if result and result[0] is not None:
+                last_timestamp = pd.to_datetime(result[0], unit='ms', utc=True)
+                logger.debug(f"Last timestamp for {symbol} {timeframe}: {last_timestamp}")
+                return last_timestamp
+
+            return None
+
+        except Exception as e:
+            logger.error(f"Error getting last timestamp for {symbol} {timeframe}: {e}")
+            return None
+
+    def get_ohlcv_data_since_timestamp(self, symbol: str, timeframe: str,
+                                        since_timestamp: datetime,
+                                        limit: int = 1500) -> Optional[pd.DataFrame]:
+        """
+        Get OHLCV data since a specific timestamp, capped at limit
+
+        Args:
+            symbol: Trading symbol
+            timeframe: Timeframe
+            since_timestamp: Get data since this timestamp
+            limit: Maximum number of candles (default 1500)
+
+        Returns:
+            DataFrame with OHLCV data since timestamp
+        """
+        try:
+            query = """
+                SELECT timestamp, open, high, low, close, volume
+                FROM ohlcv_data
+                WHERE symbol = ? AND timeframe = ? AND timestamp > ?
+                ORDER BY timestamp ASC
+                LIMIT ?
+            """
+
+            params = [
+                symbol,
+                timeframe,
+                int(since_timestamp.timestamp() * 1000),
+                limit
+            ]
+
+            df = self.conn.execute(query, params).df()
+
+            if df.empty:
+                return None
+
+            # Convert timestamp to datetime
+            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
+            df = df.set_index('timestamp')
+
+            logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe} since {since_timestamp}")
+            return df
+
+        except Exception as e:
+            logger.error(f"Error retrieving OHLCV data since timestamp: {e}")
+            return None
+
     def store_annotation(self, annotation_id: str, annotation_data: Dict[str, Any],
                          market_snapshots: Dict[str, pd.DataFrame],
                          model_predictions: Optional[List[Dict]] = None) -> bool:
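
Note (illustration only, not part of the patch): a minimal sketch of how the incremental-refresh path introduced above is expected to fit together. `estimate_missing_candles` is a hypothetical helper that mirrors the arithmetic the patch uses in `_initial_data_load` and `_update_cached_data`; only `DataProvider(skip_initial_load=True)` and `refresh_data_on_demand()` come from the diff itself.

    # Sketch under the assumptions stated above.
    from datetime import datetime, timezone

    TIMEFRAME_SECONDS = {'1s': 1, '1m': 60, '1h': 3600, '1d': 86400}

    def estimate_missing_candles(last_timestamp: datetime, timeframe: str) -> int:
        """Hypothetical helper: same estimate the patch uses to size incremental fetches."""
        elapsed = (datetime.now(timezone.utc) - last_timestamp).total_seconds()
        return int(elapsed / TIMEFRAME_SECONDS.get(timeframe, 60))

    # Usage mirroring ANNOTATE/web/app.py after the patch:
    # provider = DataProvider(skip_initial_load=True)  # startup reads cached candles from DuckDB only
    # provider.refresh_data_on_demand()                # later, fetch just the candles newer than the last stored timestamp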