caching system fixes
```diff
@@ -2590,8 +2590,15 @@ class DataProvider:
     def _should_refresh_pivot_bounds(self, symbol: str) -> bool:
         """Check if pivot bounds need refreshing"""
         try:
+            # Try to load from cache if not in memory
             if symbol not in self.pivot_bounds:
-                return True
+                cached_bounds = self._load_pivot_bounds_from_duckdb(symbol)
+                if cached_bounds:
+                    self.pivot_bounds[symbol] = cached_bounds
+                    logger.info(f"Loaded pivot bounds from cache for {symbol}")
+                    return False  # Cache is fresh, no need to refresh
+                else:
+                    return True  # No cache, need to refresh
 
             bounds = self.pivot_bounds[symbol]
             age = datetime.now() - bounds.created_timestamp
```
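The refresh check is now a read-through lookup: on an in-memory miss it first consults the DuckDB cache, and only reports stale when both layers miss. A disk hit is promoted into memory and immediately treated as fresh, which means it short-circuits the age check that follows. A minimal sketch of the same decision flow, with illustrative names standing in for the repo's helpers:

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

class BoundsCache:
    """Minimal sketch of the two-layer refresh check (names illustrative)."""

    MAX_AGE = timedelta(hours=24)  # assumed staleness threshold

    def __init__(self) -> None:
        self.memory: Dict[str, Dict[str, Any]] = {}

    def load_from_disk(self, symbol: str) -> Optional[Dict[str, Any]]:
        # Stand-in for _load_pivot_bounds_from_duckdb(); None means a miss.
        return None

    def should_refresh(self, symbol: str) -> bool:
        if symbol not in self.memory:
            cached = self.load_from_disk(symbol)
            if cached:
                self.memory[symbol] = cached  # promote the disk hit to memory
                return False                  # disk hit is treated as fresh
            return True                       # miss in both layers: refresh
        age = datetime.now() - self.memory[symbol]["created_timestamp"]
        return age > self.MAX_AGE
```

Whether a promoted disk hit should also pass through the age check depends on whether `_load_pivot_bounds_from_duckdb` already filters out stale rows; the sketch mirrors the diff and skips it.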
```diff
@@ -18,6 +18,7 @@ from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Any
 import json
+import threading
 
 logger = logging.getLogger(__name__)
 
```
```diff
@@ -37,6 +38,14 @@ class DuckDBStorage:
         # Connect to DuckDB
         self.conn = duckdb.connect(str(self.db_path))
 
+        # Batch logging for compact output
+        self._batch_buffer = []  # List of (symbol, timeframe, count, total) tuples
+        self._batch_lock = threading.Lock()
+        self._batch_flush_timer = None
+        self._batch_flush_delay = 0.5  # Flush after 0.5 seconds of inactivity
+        self._batch_timer_lock = threading.Lock()
+        self._flush_in_progress = False
+
         # Initialize schema
         self._init_schema()
 
```
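Two locks are introduced rather than one, presumably so that re-arming the timer never contends with threads touching the buffer: `_batch_lock` guards `_batch_buffer`, while `_batch_timer_lock` guards only the `threading.Timer` handle. A tiny sketch of the invariant each lock protects (stdlib only, names illustrative):

```python
import threading
from typing import List, Optional, Tuple

class BatchState:
    """The two independent critical sections added to __init__ above."""

    def __init__(self, flush_delay: float = 0.5) -> None:
        # Invariant 1: _buffer is only read or mutated under _buffer_lock.
        self._buffer: List[Tuple[str, str, int, int]] = []
        self._buffer_lock = threading.Lock()
        # Invariant 2: the Timer handle is only swapped under _timer_lock,
        # so cancel()/start() pairs cannot interleave across threads.
        self._timer: Optional[threading.Timer] = None
        self._timer_lock = threading.Lock()
        self._flush_delay = flush_delay  # quiet period before a flush fires
```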
```diff
@@ -113,6 +122,52 @@ class DuckDBStorage:
 
         logger.info("DuckDB schema initialized (all data in tables)")
 
+    def _schedule_batch_flush(self):
+        """Schedule a batch flush after delay, resetting timer on each call"""
+        with self._batch_timer_lock:
+            # Cancel existing timer if any
+            if self._batch_flush_timer:
+                self._batch_flush_timer.cancel()
+
+            # Start new timer that will flush after delay
+            self._batch_flush_timer = threading.Timer(self._batch_flush_delay, self._flush_batch_log)
+            self._batch_flush_timer.daemon = True
+            self._batch_flush_timer.start()
+
+    def _flush_batch_log(self):
+        """Flush accumulated batch logs as a single compact line"""
+        with self._batch_lock:
+            if not self._batch_buffer or self._flush_in_progress:
+                return
+
+            self._flush_in_progress = True
+
+            # Group by symbol for better readability
+            symbol_groups = {}
+            for symbol, timeframe, count, total in self._batch_buffer:
+                if symbol not in symbol_groups:
+                    symbol_groups[symbol] = []
+                symbol_groups[symbol].append((timeframe, count, total))
+
+            # Build compact log message
+            parts = []
+            for symbol in sorted(symbol_groups.keys()):
+                symbol_parts = []
+                for timeframe, count, total in sorted(symbol_groups[symbol]):
+                    symbol_parts.append(f"{timeframe}({count}, total: {total})")
+                parts.append(f"{symbol}: {', '.join(symbol_parts)}")
+
+            log_msg = "Stored candles batch: " + " | ".join(parts)
+            logger.info(log_msg)
+
+            # Clear buffer and reset flag
+            self._batch_buffer.clear()
+            self._flush_in_progress = False
+
+        # Clear timer reference after flushing
+        with self._batch_timer_lock:
+            self._batch_flush_timer = None
+
     def store_ohlcv_data(self, symbol: str, timeframe: str, df: pd.DataFrame) -> int:
         """
         Store OHLCV data directly in DuckDB table
```
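Together these two methods implement a debounced flush: every store re-arms a 0.5 s `threading.Timer`, so a burst of writes produces a single "Stored candles batch" line once the burst goes quiet. A self-contained demo of the pattern (illustrative names, not the repo's API):

```python
import threading
import time

class Debouncer:
    """Coalesce bursts of events into one callback after a quiet period."""

    def __init__(self, delay: float, callback) -> None:
        self._delay = delay
        self._callback = callback
        self._timer = None
        self._lock = threading.Lock()

    def poke(self) -> None:
        # Each call cancels the pending timer and starts a fresh one, so the
        # callback only fires `delay` seconds after the LAST poke.
        with self._lock:
            if self._timer:
                self._timer.cancel()
            self._timer = threading.Timer(self._delay, self._callback)
            self._timer.daemon = True
            self._timer.start()

events = []
flusher = Debouncer(0.5, lambda: events.append("flushed"))

for _ in range(100):   # a burst of 100 stores...
    flusher.poke()
time.sleep(1.0)
assert events == ["flushed"]  # ...yields exactly one flush
```

Since the whole body of `_flush_batch_log` runs under `_batch_lock`, concurrent flushes already serialize and a second caller sees an empty buffer; the `_flush_in_progress` flag appears to be belt-and-braces on top of that.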
```diff
@@ -177,7 +232,11 @@ class DuckDBStorage:
                 VALUES (?, ?, ?, ?, ?, ?, ?)
             """, (symbol, timeframe, '', first_ts, last_ts, count, now_ts))
 
-            logger.info(f"Stored {len(df)} candles for {symbol} {timeframe} in DuckDB (total: {count})")
+            # Add to batch buffer instead of logging immediately
+            with self._batch_lock:
+                self._batch_buffer.append((symbol, timeframe, len(df), count))
+            self._schedule_batch_flush()
+
             return len(df)
 
         except Exception as e:
```
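With this change, each `store_ohlcv_data` call appends to the buffer under `_batch_lock` and re-arms the timer instead of emitting one log line per write. Replaying the flush's grouping logic over a hand-made buffer (symbols and counts invented for illustration) shows the shape of the compact line:

```python
buffer = [
    ("BTC/USDT", "1m", 60, 5000),   # invented sample rows
    ("BTC/USDT", "5m", 12, 1000),
    ("ETH/USDT", "1m", 60, 4800),
]

# Group rows by symbol, as _flush_batch_log does.
groups = {}
for symbol, timeframe, count, total in buffer:
    groups.setdefault(symbol, []).append((timeframe, count, total))

parts = []
for symbol in sorted(groups):
    tf_parts = [f"{tf}({count}, total: {total})"
                for tf, count, total in sorted(groups[symbol])]
    parts.append(f"{symbol}: {', '.join(tf_parts)}")

print("Stored candles batch: " + " | ".join(parts))
# Stored candles batch: BTC/USDT: 1m(60, total: 5000), 5m(12, total: 1000) | ETH/USDT: 1m(60, total: 4800)
```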
```diff
@@ -534,6 +593,14 @@ class DuckDBStorage:
 
     def close(self):
         """Close database connection"""
+        # Cancel any pending timer
+        with self._batch_timer_lock:
+            if self._batch_flush_timer:
+                self._batch_flush_timer.cancel()
+
+        # Flush any pending batch logs
+        self._flush_batch_log()
+
         if self.conn:
             self.conn.close()
             logger.info("DuckDB connection closed")
```
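`close()` now drains the batch logger deterministically: cancel any pending timer so a background flush cannot race shutdown, flush the remaining buffer synchronously on the calling thread, and only then close the connection. A condensed sketch of that ordering (illustrative, not the repo's API):

```python
import threading
from typing import List, Optional, Tuple

class FlushOnClose:
    """Drain-then-close ordering, mirroring close() above."""

    def __init__(self) -> None:
        self._timer: Optional[threading.Timer] = None
        self._timer_lock = threading.Lock()
        self._buffer: List[Tuple] = []
        self._buffer_lock = threading.Lock()
        self._conn = object()  # stand-in for the DuckDB connection

    def _flush(self) -> None:
        with self._buffer_lock:
            if not self._buffer:
                return  # idempotent: a duplicate drain is a no-op
            print("flushing", len(self._buffer), "buffered rows")
            self._buffer.clear()

    def close(self) -> None:
        # 1. Stop any timer that has not fired yet.
        with self._timer_lock:
            if self._timer:
                self._timer.cancel()
        # 2. Drain whatever is still buffered, synchronously.
        self._flush()
        # 3. Only now release the connection.
        self._conn = None
```

Note that `Timer.cancel()` does not stop a callback that is already running, but the empty-buffer guard in `_flush_batch_log` makes a duplicate flush harmless.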