diff --git a/core/data_provider.py b/core/data_provider.py
index d6fa99a..12c7727 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -2590,8 +2590,14 @@ class DataProvider:
     def _should_refresh_pivot_bounds(self, symbol: str) -> bool:
         """Check if pivot bounds need refreshing"""
         try:
+            # Try to load from cache if not in memory
             if symbol not in self.pivot_bounds:
-                return True
+                cached_bounds = self._load_pivot_bounds_from_duckdb(symbol)
+                if cached_bounds is None:
+                    return True  # No cache, need to refresh
+                self.pivot_bounds[symbol] = cached_bounds
+                logger.info(f"Loaded pivot bounds from cache for {symbol}")
+                # Fall through to the age check below so stale cached bounds still refresh
 
             bounds = self.pivot_bounds[symbol]
             age = datetime.now() - bounds.created_timestamp
diff --git a/core/duckdb_storage.py b/core/duckdb_storage.py
index fd3ab49..9ac6a41 100644
--- a/core/duckdb_storage.py
+++ b/core/duckdb_storage.py
@@ -18,6 +18,7 @@ from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Any
 import json
+import threading
 
 logger = logging.getLogger(__name__)
 
@@ -37,6 +38,14 @@ class DuckDBStorage:
         # Connect to DuckDB
         self.conn = duckdb.connect(str(self.db_path))
 
+        # Batch logging for compact output
+        self._batch_buffer = []  # List of (symbol, timeframe, count, total) tuples
+        self._batch_lock = threading.Lock()
+        self._batch_flush_timer = None
+        self._batch_flush_delay = 0.5  # Flush after 0.5 seconds of inactivity
+        self._batch_timer_lock = threading.Lock()
+        self._flush_in_progress = False
+
         # Initialize schema
         self._init_schema()
 
@@ -113,6 +122,52 @@ class DuckDBStorage:
 
         logger.info("DuckDB schema initialized (all data in tables)")
 
+    def _schedule_batch_flush(self):
+        """Schedule a batch flush after a delay, resetting the timer on each call"""
+        with self._batch_timer_lock:
+            # Cancel the existing timer, if any
+            if self._batch_flush_timer:
+                self._batch_flush_timer.cancel()
+
+            # Start a new timer that will flush after the delay
+            self._batch_flush_timer = threading.Timer(self._batch_flush_delay, self._flush_batch_log)
+            self._batch_flush_timer.daemon = True
+            self._batch_flush_timer.start()
+
+    def _flush_batch_log(self):
+        """Flush accumulated batch logs as a single compact line"""
+        with self._batch_lock:
+            if not self._batch_buffer or self._flush_in_progress:
+                return
+
+            self._flush_in_progress = True
+
+            # Group by symbol for better readability
+            symbol_groups = {}
+            for symbol, timeframe, count, total in self._batch_buffer:
+                if symbol not in symbol_groups:
+                    symbol_groups[symbol] = []
+                symbol_groups[symbol].append((timeframe, count, total))
+
+            # Build compact log message
+            parts = []
+            for symbol in sorted(symbol_groups.keys()):
+                symbol_parts = []
+                for timeframe, count, total in sorted(symbol_groups[symbol]):
+                    symbol_parts.append(f"{timeframe}({count}, total: {total})")
+                parts.append(f"{symbol}: {', '.join(symbol_parts)}")
+
+            log_msg = "Stored candles batch: " + " | ".join(parts)
+            logger.info(log_msg)
+
+            # Clear buffer and reset flag
+            self._batch_buffer.clear()
+            self._flush_in_progress = False
+
+        # Clear timer reference after flushing
+        with self._batch_timer_lock:
+            self._batch_flush_timer = None
+
     def store_ohlcv_data(self, symbol: str, timeframe: str, df: pd.DataFrame) -> int:
         """
         Store OHLCV data directly in DuckDB table
@@ -177,7 +232,11 @@ class DuckDBStorage:
                 VALUES (?, ?, ?, ?, ?, ?, ?)
""", (symbol, timeframe, '', first_ts, last_ts, count, now_ts)) - logger.info(f"Stored {len(df)} candles for {symbol} {timeframe} in DuckDB (total: {count})") + # Add to batch buffer instead of logging immediately + with self._batch_lock: + self._batch_buffer.append((symbol, timeframe, len(df), count)) + self._schedule_batch_flush() + return len(df) except Exception as e: @@ -534,6 +593,14 @@ class DuckDBStorage: def close(self): """Close database connection""" + # Cancel any pending timer + with self._batch_timer_lock: + if self._batch_flush_timer: + self._batch_flush_timer.cancel() + + # Flush any pending batch logs + self._flush_batch_log() + if self.conn: self.conn.close() logger.info("DuckDB connection closed")