diff --git a/core/data_provider.py b/core/data_provider.py index 857f9ce..3b6de80 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -351,6 +351,9 @@ class DataProvider: # Start COB WebSocket integration self.start_cob_websocket_integration() + + # Start periodic DuckDB storage of live data + self._start_periodic_duckdb_storage() # =================================================================== # UNIFIED STORAGE SYSTEM METHODS @@ -700,7 +703,7 @@ class DataProvider: self.catch_up_completed = True def _update_cached_data(self, symbol: str, timeframe: str): - """Update cached data by fetching last 2 candles with thread-safe locking""" + """Update cached data by fetching last 2 candles and storing to DuckDB""" try: # Fetch last 2 candles (outside lock - network I/O) df = self._fetch_from_binance(symbol, timeframe, 2) @@ -712,6 +715,14 @@ class DataProvider: # Ensure proper datetime index df = self._ensure_datetime_index(df) + # Store to DuckDB immediately (live data persistence) + if self.duckdb_storage: + try: + self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df) + logger.debug(f"Stored live data to DuckDB: {symbol} {timeframe} ({len(df)} candles)") + except Exception as e: + logger.warning(f"Could not store live data to DuckDB: {e}") + # Update cached data with lock with self.data_lock: existing_df = self.cached_data[symbol][timeframe] @@ -722,7 +733,7 @@ class DataProvider: combined_df = combined_df[~combined_df.index.duplicated(keep='last')] combined_df = combined_df.sort_index() - # Keep only last 1500 candles + # Keep only last 1500 candles in memory self.cached_data[symbol][timeframe] = combined_df.tail(1500) else: self.cached_data[symbol][timeframe] = df @@ -2037,10 +2048,22 @@ class DataProvider: # === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM === def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]: - """Collect 30 days of 1m candles with smart gap-filling cache system""" + """Collect 30 days of 1m candles with DuckDB cache system""" try: - # Check for cached data and determine what we need to fetch - cached_data = self._load_monthly_data_from_cache(symbol) + # Check for cached data in DuckDB + cached_data = None + if self.duckdb_storage: + try: + end_time_check = datetime.utcnow() + start_time_check = end_time_check - timedelta(days=30) + cached_data = self.duckdb_storage.get_ohlcv_data( + symbol=symbol, + timeframe='1m', + start_time=start_time_check, + end_time=end_time_check + ) + except Exception as e: + logger.debug(f"No cached monthly data in DuckDB: {e}") import pytz utc = pytz.UTC @@ -2097,8 +2120,12 @@ class DataProvider: logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}") - # Update cache - self._save_monthly_data_to_cache(symbol, monthly_df) + # Update DuckDB cache + if self.duckdb_storage: + try: + self.duckdb_storage.store_ohlcv_data(symbol, '1m', monthly_df) + except Exception as e: + logger.warning(f"Could not cache monthly data in DuckDB: {e}") return monthly_df else: @@ -2473,8 +2500,37 @@ class DataProvider: # Store bounds self.pivot_bounds[symbol] = bounds - # Save to cache - self._save_pivot_bounds_to_cache(symbol, bounds) + # Save to DuckDB as JSON + if self.duckdb_storage: + try: + import pickle + bounds_json = { + 'symbol': symbol, + 'price_max': bounds.price_max, + 'price_min': bounds.price_min, + 'volume_max': bounds.volume_max, + 'volume_min': bounds.volume_min, + 'pivot_support_levels': bounds.pivot_support_levels, + 'pivot_resistance_levels': bounds.pivot_resistance_levels, + 
'created_timestamp': bounds.created_timestamp.isoformat(), + 'data_period_start': bounds.data_period_start.isoformat(), + 'data_period_end': bounds.data_period_end.isoformat(), + 'total_candles_analyzed': bounds.total_candles_analyzed + } + # Store in DuckDB metadata table + self.duckdb_storage.conn.execute(""" + CREATE TABLE IF NOT EXISTS pivot_bounds_cache ( + symbol VARCHAR PRIMARY KEY, + bounds_data JSON, + updated_at BIGINT + ) + """) + self.duckdb_storage.conn.execute(""" + INSERT OR REPLACE INTO pivot_bounds_cache (symbol, bounds_data, updated_at) + VALUES (?, ?, ?) + """, (symbol, json.dumps(bounds_json), int(datetime.now().timestamp() * 1000))) + except Exception as e: + logger.warning(f"Could not save pivot bounds to DuckDB: {e}") logger.info(f"Successfully refreshed pivot bounds for {symbol}") @@ -2530,135 +2586,82 @@ class DataProvider: return symbol # Return first symbol for now - can be improved return None - # === CACHE MANAGEMENT === + # === DUCKDB CACHE MANAGEMENT === - def _auto_fix_corrupted_cache(self): - """Automatically fix corrupted cache files on startup""" - try: - from utils.cache_manager import get_cache_manager - cache_manager = get_cache_manager() + def _start_periodic_duckdb_storage(self): + """Start periodic task to store live data to DuckDB""" + def storage_worker(): + """Worker thread that periodically stores cached data to DuckDB""" + import time - # Quick health check - health_summary = cache_manager.get_cache_summary() + logger.info("Started periodic DuckDB storage worker (every 60 seconds)") - if health_summary['corrupted_files'] > 0: - logger.warning(f"Found {health_summary['corrupted_files']} corrupted cache files, cleaning up...") - - # Auto-cleanup corrupted files (no confirmation needed) - deleted_files = cache_manager.cleanup_corrupted_files(dry_run=False) - - deleted_count = 0 - for cache_dir, files in deleted_files.items(): - for file_info in files: - if "DELETED:" in file_info: - deleted_count += 1 - - logger.info(f"Auto-cleaned {deleted_count} corrupted cache files") - else: - logger.debug("Cache health check passed - no corrupted files found") - - except Exception as e: - logger.warning(f"Cache auto-fix failed: {e}") - - # === PIVOT BOUNDS CACHING === - - def _load_all_pivot_bounds(self): - """Load all cached pivot bounds on startup""" - try: - for symbol in self.symbols: - bounds = self._load_pivot_bounds_from_cache(symbol) - if bounds: - self.pivot_bounds[symbol] = bounds - logger.info(f"Loaded cached pivot bounds for {symbol}") - except Exception as e: - logger.error(f"Error loading pivot bounds from cache: {e}") - - def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]: - """Load pivot bounds from cache""" - try: - cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" - if cache_file.exists(): - with open(cache_file, 'rb') as f: - bounds = pickle.load(f) - - # Check if bounds are still valid (not too old) - age = datetime.now() - bounds.created_timestamp - if age <= self.pivot_refresh_interval: - return bounds - else: - logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)") - - return None - - except Exception as e: - logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}") - return None - - def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds): - """Save pivot bounds to cache""" - try: - cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl" - with open(cache_file, 'wb') as f: - pickle.dump(bounds, f) 
- logger.debug(f"Saved pivot bounds to cache for {symbol}") - except Exception as e: - logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}") - - def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]: - """Load monthly 1m data from cache""" - try: - cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" - if cache_file.exists(): + while True: try: - df = pd.read_parquet(cache_file) - # Ensure cached monthly data has proper timezone (UTC to match COB WebSocket data) - if not df.empty and 'timestamp' in df.columns: - if df['timestamp'].dt.tz is None: - # If no timezone info, assume UTC and keep in UTC - df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True) - elif str(df['timestamp'].dt.tz) != 'UTC': - # Convert to UTC if different timezone - df['timestamp'] = df['timestamp'].dt.tz_convert('UTC') - logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}") - return df - except Exception as parquet_e: - # Handle corrupted Parquet file - expanded error detection - error_str = str(parquet_e).lower() - corrupted_indicators = [ - "parquet magic bytes not found", - "corrupted", - "couldn't deserialize thrift", - "don't know what type", - "invalid parquet file", - "unexpected end of file", - "invalid metadata" - ] + time.sleep(60) # Store every minute - if any(indicator in error_str for indicator in corrupted_indicators): - logger.warning(f"Corrupted Parquet cache file for {symbol}, removing and returning None: {parquet_e}") - try: - cache_file.unlink() # Delete corrupted file - logger.info(f"Deleted corrupted monthly cache file: {cache_file}") - except Exception as delete_e: - logger.error(f"Failed to delete corrupted monthly cache file: {delete_e}") - return None - else: - raise parquet_e - - return None - - except Exception as e: - logger.warning(f"Error loading monthly data from cache for {symbol}: {e}") - return None + if not self.duckdb_storage: + continue + + # Store all cached data to DuckDB + for symbol in self.symbols: + for timeframe in self.timeframes: + with self.data_lock: + df = self.cached_data.get(symbol, {}).get(timeframe) + + if df is not None and not df.empty: + try: + # Store last 100 candles to avoid duplicates + self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df.tail(100)) + logger.debug(f"Periodic storage: {symbol} {timeframe} ({len(df.tail(100))} candles)") + except Exception as e: + logger.warning(f"Error in periodic storage for {symbol} {timeframe}: {e}") + + except Exception as e: + logger.error(f"Error in periodic DuckDB storage worker: {e}") + + # Start worker thread + storage_thread = Thread(target=storage_worker, daemon=True, name="DuckDBStorageWorker") + storage_thread.start() + logger.info("Periodic DuckDB storage worker started") - def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame): - """Save monthly 1m data to cache""" + def _load_pivot_bounds_from_duckdb(self, symbol: str) -> Optional[PivotBounds]: + """Load pivot bounds from DuckDB""" + if not self.duckdb_storage: + return None + try: - cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" - df.to_parquet(cache_file, index=False) - logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}") + result = self.duckdb_storage.conn.execute(""" + SELECT bounds_data FROM pivot_bounds_cache WHERE symbol = ? 
+ """, (symbol,)).fetchone() + + if result: + bounds_json = json.loads(result[0]) + bounds = PivotBounds( + symbol=bounds_json['symbol'], + price_max=bounds_json['price_max'], + price_min=bounds_json['price_min'], + volume_max=bounds_json['volume_max'], + volume_min=bounds_json['volume_min'], + pivot_support_levels=bounds_json['pivot_support_levels'], + pivot_resistance_levels=bounds_json['pivot_resistance_levels'], + pivot_context={}, + created_timestamp=datetime.fromisoformat(bounds_json['created_timestamp']), + data_period_start=datetime.fromisoformat(bounds_json['data_period_start']), + data_period_end=datetime.fromisoformat(bounds_json['data_period_end']), + total_candles_analyzed=bounds_json['total_candles_analyzed'] + ) + + # Check if cache is recent + age = datetime.now() - bounds.created_timestamp + if age < self.pivot_refresh_interval: + logger.debug(f"Loaded pivot bounds from DuckDB for {symbol}") + return bounds + except Exception as e: - logger.warning(f"Error saving monthly data to cache for {symbol}: {e}") + logger.debug(f"Could not load pivot bounds from DuckDB for {symbol}: {e}") + + return None def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]: """Get pivot bounds for a symbol""" diff --git a/core/duckdb_storage.py b/core/duckdb_storage.py index 7ee55c2..cc1ece3 100644 --- a/core/duckdb_storage.py +++ b/core/duckdb_storage.py @@ -30,8 +30,8 @@ class DuckDBStorage: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) - # Parquet storage directory - self.parquet_dir = self.db_path.parent / "parquet_store" + # Parquet storage directory (only for annotation snapshots) + self.parquet_dir = self.db_path.parent / "annotation_snapshots" self.parquet_dir.mkdir(parents=True, exist_ok=True) # Connect to DuckDB @@ -41,12 +41,42 @@ class DuckDBStorage: self._init_schema() logger.info(f"DuckDB storage initialized: {self.db_path}") - logger.info(f"Parquet storage: {self.parquet_dir}") + logger.info(f"Annotation snapshots: {self.parquet_dir}") def _init_schema(self): - """Initialize database schema with Parquet integration""" + """Initialize database schema - all data in DuckDB tables""" - # Create annotations table (metadata only) + # Create OHLCV data table - stores ALL candles + self.conn.execute(""" + CREATE SEQUENCE IF NOT EXISTS ohlcv_id_seq START 1 + """) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS ohlcv_data ( + id INTEGER PRIMARY KEY DEFAULT nextval('ohlcv_id_seq'), + symbol VARCHAR NOT NULL, + timeframe VARCHAR NOT NULL, + timestamp BIGINT NOT NULL, + open DOUBLE NOT NULL, + high DOUBLE NOT NULL, + low DOUBLE NOT NULL, + close DOUBLE NOT NULL, + volume DOUBLE NOT NULL, + created_at BIGINT NOT NULL, + UNIQUE(symbol, timeframe, timestamp) + ) + """) + + # Create indexes for fast queries + self.conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe + ON ohlcv_data(symbol, timeframe) + """) + self.conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_ohlcv_timestamp + ON ohlcv_data(timestamp) + """) + + # Create annotations table self.conn.execute(""" CREATE TABLE IF NOT EXISTS annotations ( annotation_id VARCHAR PRIMARY KEY, @@ -72,7 +102,7 @@ class DuckDBStorage: CREATE TABLE IF NOT EXISTS cache_metadata ( symbol VARCHAR NOT NULL, timeframe VARCHAR NOT NULL, - parquet_path VARCHAR NOT NULL, + parquet_path VARCHAR, first_timestamp BIGINT NOT NULL, last_timestamp BIGINT NOT NULL, candle_count INTEGER NOT NULL, @@ -81,11 +111,11 @@ class DuckDBStorage: ) """) - logger.info("DuckDB schema initialized") + 
logger.info("DuckDB schema initialized (all data in tables)") def store_ohlcv_data(self, symbol: str, timeframe: str, df: pd.DataFrame) -> int: """ - Store OHLCV data as Parquet file and register in DuckDB + Store OHLCV data directly in DuckDB table Args: symbol: Trading symbol @@ -113,38 +143,41 @@ class DuckDBStorage: # Add metadata df_copy['symbol'] = symbol df_copy['timeframe'] = timeframe + df_copy['created_at'] = int(datetime.now().timestamp() * 1000) - # Define parquet file path - parquet_file = self.parquet_dir / f"{symbol.replace('/', '_')}_{timeframe}.parquet" + # Select columns in correct order + columns = ['symbol', 'timeframe', 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'created_at'] + df_insert = df_copy[columns] - # Load existing data if file exists - if parquet_file.exists(): - try: - existing_df = pd.read_parquet(parquet_file) - # Combine with new data - df_copy = pd.concat([existing_df, df_copy], ignore_index=True) - # Remove duplicates - df_copy = df_copy.drop_duplicates(subset=['timestamp'], keep='last') - df_copy = df_copy.sort_values('timestamp') - except Exception as e: - logger.warning(f"Could not load existing parquet: {e}") + # Insert data directly into DuckDB (ignore duplicates) + # Note: id column is auto-generated, so we don't include it + self.conn.execute(""" + INSERT INTO ohlcv_data (symbol, timeframe, timestamp, open, high, low, close, volume, created_at) + SELECT symbol, timeframe, timestamp, open, high, low, close, volume, created_at + FROM df_insert + ON CONFLICT DO NOTHING + """) - # Save to parquet - df_copy.to_parquet(parquet_file, index=False, compression='snappy') + # Update metadata + result = self.conn.execute(""" + SELECT + MIN(timestamp) as first_ts, + MAX(timestamp) as last_ts, + COUNT(*) as count + FROM ohlcv_data + WHERE symbol = ? AND timeframe = ? + """, (symbol, timeframe)).fetchone() - # Update metadata in DuckDB - first_ts = int(df_copy['timestamp'].min()) - last_ts = int(df_copy['timestamp'].max()) - count = len(df_copy) + first_ts, last_ts, count = result now_ts = int(datetime.now().timestamp() * 1000) self.conn.execute(""" INSERT OR REPLACE INTO cache_metadata (symbol, timeframe, parquet_path, first_timestamp, last_timestamp, candle_count, last_update) VALUES (?, ?, ?, ?, ?, ?, ?) - """, (symbol, timeframe, str(parquet_file), first_ts, last_ts, count, now_ts)) + """, (symbol, timeframe, '', first_ts, last_ts, count, now_ts)) - logger.info(f"Stored {len(df)} candles for {symbol} {timeframe} in Parquet (total: {count})") + logger.info(f"Stored {len(df)} candles for {symbol} {timeframe} in DuckDB (total: {count})") return len(df) except Exception as e: @@ -158,7 +191,7 @@ class DuckDBStorage: end_time: Optional[datetime] = None, limit: Optional[int] = None) -> Optional[pd.DataFrame]: """ - Query OHLCV data directly from Parquet using DuckDB + Query OHLCV data directly from DuckDB table Args: symbol: Trading symbol @@ -171,26 +204,10 @@ class DuckDBStorage: DataFrame with OHLCV data """ try: - # Get parquet file path from metadata - result = self.conn.execute(""" - SELECT parquet_path FROM cache_metadata - WHERE symbol = ? AND timeframe = ? - """, (symbol, timeframe)).fetchone() - - if not result: - logger.debug(f"No data found for {symbol} {timeframe}") - return None - - parquet_path = result[0] - - if not Path(parquet_path).exists(): - logger.warning(f"Parquet file not found: {parquet_path}") - return None - - # Build query - DuckDB can query Parquet directly! 
- query = f""" + # Build query + query = """ SELECT timestamp, open, high, low, close, volume - FROM read_parquet('{parquet_path}') + FROM ohlcv_data WHERE symbol = ? AND timeframe = ? """ params = [symbol, timeframe] @@ -219,7 +236,7 @@ class DuckDBStorage: df = df.set_index('timestamp') df = df.sort_index() - logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe} from Parquet") + logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe} from DuckDB") return df except Exception as e:
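
Below is a minimal, self-contained sketch of the DuckDB idiom this patch relies on in store_ohlcv_data() and get_ohlcv_data(): inserting a pandas DataFrame through DuckDB's replacement scan with ON CONFLICT DO NOTHING for deduplication, then reading it back with parameterized SQL. It is a sketch of the pattern, not the repository's DuckDBStorage class: it uses an in-memory connection, omits the sequence-backed id column, and the symbol and candle values are made-up sample data.

# Standalone sketch of the insert/read round-trip used by the new ohlcv_data table.
# Assumes only the `duckdb` and `pandas` packages; sample data is illustrative.
import duckdb
import pandas as pd
from datetime import datetime, timezone

conn = duckdb.connect()  # in-memory here; the patch opens a file-backed database

# Same shape as the ohlcv_data schema in _init_schema, with the id column omitted for brevity
conn.execute("""
    CREATE TABLE ohlcv_data (
        symbol VARCHAR NOT NULL,
        timeframe VARCHAR NOT NULL,
        timestamp BIGINT NOT NULL,
        open DOUBLE NOT NULL,
        high DOUBLE NOT NULL,
        low DOUBLE NOT NULL,
        close DOUBLE NOT NULL,
        volume DOUBLE NOT NULL,
        created_at BIGINT NOT NULL,
        UNIQUE(symbol, timeframe, timestamp)
    )
""")

now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
df_insert = pd.DataFrame({
    'symbol': ['ETH/USDT', 'ETH/USDT'],
    'timeframe': ['1m', '1m'],
    'timestamp': [now_ms - 60_000, now_ms],  # millisecond epoch, matching the BIGINT column
    'open': [3000.0, 3001.0],
    'high': [3002.0, 3003.0],
    'low': [2999.0, 3000.5],
    'close': [3001.0, 3002.5],
    'volume': [12.5, 9.8],
    'created_at': [now_ms, now_ms],
})

# DuckDB's replacement scan lets SQL reference the local DataFrame by name;
# ON CONFLICT DO NOTHING deduplicates on (symbol, timeframe, timestamp),
# so re-storing overlapping candles is a no-op rather than an error.
conn.execute("INSERT INTO ohlcv_data SELECT * FROM df_insert ON CONFLICT DO NOTHING")
conn.execute("INSERT INTO ohlcv_data SELECT * FROM df_insert ON CONFLICT DO NOTHING")  # duplicate batch ignored

# Read back the way get_ohlcv_data() does, converting ms timestamps to a DatetimeIndex
df = conn.execute("""
    SELECT timestamp, open, high, low, close, volume
    FROM ohlcv_data
    WHERE symbol = ? AND timeframe = ?
    ORDER BY timestamp
""", ['ETH/USDT', '1m']).fetchdf()
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
print(df.set_index('timestamp'))  # two rows; the second insert changed nothing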