commit fadfa8c741 (parent c21d8cbea1)
Author: Dobromir Popov
Date: 2025-12-10 00:45:41 +02:00

5 changed files with 256 additions and 117 deletions


@@ -38,6 +38,10 @@ class DuckDBStorage:
         # Connect to DuckDB
         self.conn = duckdb.connect(str(self.db_path))
+        # CRITICAL: DuckDB connections are NOT thread-safe
+        # All database operations must be serialized with this lock
+        self._conn_lock = threading.RLock()  # Use RLock to allow reentrant calls from same thread
         # Batch logging for compact output
         self._batch_buffer = []  # List of (symbol, timeframe, count, total) tuples
         self._batch_lock = threading.Lock()
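
The pattern this commit applies throughout the file is a single shared connection guarded by one reentrant lock. A minimal sketch of the idea, with a hypothetical `LockedDuckDB` wrapper standing in for the real class:

```python
import threading

import duckdb

# Minimal sketch of the locking pattern, not the project's actual class:
# one shared connection, every operation serialized on the same RLock.
class LockedDuckDB:
    def __init__(self, db_path: str):
        self.conn = duckdb.connect(db_path)
        # RLock (not Lock): a locked method may call another locked method
        # on the same thread without deadlocking.
        self._conn_lock = threading.RLock()

    def fetchone(self, sql: str, params=None):
        with self._conn_lock:
            return self.conn.execute(sql, params or []).fetchone()
```
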
@@ -54,9 +58,10 @@ class DuckDBStorage:
     def _init_schema(self):
         """Initialize database schema - all data in DuckDB tables"""
-        # Create OHLCV data table - stores ALL candles
-        self.conn.execute("""
+        # CRITICAL: Schema initialization must be serialized
+        with self._conn_lock:
+            # Create OHLCV data table - stores ALL candles
+            self.conn.execute("""
                 CREATE SEQUENCE IF NOT EXISTS ohlcv_id_seq START 1
             """)
             self.conn.execute("""
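
The reentrancy matters because `_init_schema` itself now takes the lock: a caller that already holds `self._conn_lock` can still invoke it on the same thread. A hedged sketch with a hypothetical recovery helper (not part of the commit):

```python
# Hypothetical helper, shown only to illustrate reentrant acquisition.
def _recreate_if_missing(self):
    with self._conn_lock:  # outer acquisition
        tables = self.conn.execute("SHOW TABLES").fetchall()
        if not any(row[0] == 'ohlcv_data' for row in tables):
            self._init_schema()  # re-acquires the same RLock; no deadlock
```
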
@@ -207,34 +212,36 @@ class DuckDBStorage:
         # Insert data directly into DuckDB (ignore duplicates)
         # Note: id column is auto-generated, so we don't include it
         # Using INSERT OR IGNORE for better DuckDB compatibility
-        self.conn.execute("""
-            INSERT OR IGNORE INTO ohlcv_data (symbol, timeframe, timestamp, open, high, low, close, volume, created_at)
-            SELECT symbol, timeframe, timestamp, open, high, low, close, volume, created_at
-            FROM df_insert
-        """)
-        # Update metadata
-        result = self.conn.execute("""
-            SELECT
-                MIN(timestamp) as first_ts,
-                MAX(timestamp) as last_ts,
-                COUNT(*) as count
-            FROM ohlcv_data
-            WHERE symbol = ? AND timeframe = ?
-        """, (symbol, timeframe)).fetchone()
-        # Handle case where no data exists yet
-        if result is None or result[0] is None:
-            first_ts, last_ts, count = 0, 0, 0
-        else:
-            first_ts, last_ts, count = result
-        now_ts = int(datetime.now().timestamp() * 1000)
-        self.conn.execute("""
-            INSERT OR REPLACE INTO cache_metadata
-            (symbol, timeframe, parquet_path, first_timestamp, last_timestamp, candle_count, last_update)
-            VALUES (?, ?, ?, ?, ?, ?, ?)
-        """, (symbol, timeframe, '', first_ts, last_ts, count, now_ts))
+        # CRITICAL: All DuckDB operations must be serialized with lock
+        with self._conn_lock:
+            self.conn.execute("""
+                INSERT OR IGNORE INTO ohlcv_data (symbol, timeframe, timestamp, open, high, low, close, volume, created_at)
+                SELECT symbol, timeframe, timestamp, open, high, low, close, volume, created_at
+                FROM df_insert
+            """)
+            # Update metadata
+            result = self.conn.execute("""
+                SELECT
+                    MIN(timestamp) as first_ts,
+                    MAX(timestamp) as last_ts,
+                    COUNT(*) as count
+                FROM ohlcv_data
+                WHERE symbol = ? AND timeframe = ?
+            """, (symbol, timeframe)).fetchone()
+            # Handle case where no data exists yet
+            if result is None or result[0] is None:
+                first_ts, last_ts, count = 0, 0, 0
+            else:
+                first_ts, last_ts, count = result
+            now_ts = int(datetime.now().timestamp() * 1000)
+            self.conn.execute("""
+                INSERT OR REPLACE INTO cache_metadata
+                (symbol, timeframe, parquet_path, first_timestamp, last_timestamp, candle_count, last_update)
+                VALUES (?, ?, ?, ?, ?, ?, ?)
+            """, (symbol, timeframe, '', first_ts, last_ts, count, now_ts))
         # Add to batch buffer instead of logging immediately
         with self._batch_lock:
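
Holding the lock across both statements means the row insert and the `cache_metadata` upsert behave as one unit with respect to other threads. Note that `df_insert` in the SQL is resolved by DuckDB's replacement scan of local pandas DataFrames; a hedged caller sketch, with column names assumed from the schema above and `storage` standing in for a `DuckDBStorage` instance:

```python
from datetime import datetime, timezone

import pandas as pd

# Sketch only: df is assumed to carry timestamp/open/high/low/close/volume.
def store_batch(storage, symbol: str, timeframe: str, df: pd.DataFrame):
    df_insert = df.copy()
    df_insert['symbol'] = symbol
    df_insert['timeframe'] = timeframe
    df_insert['created_at'] = int(datetime.now(timezone.utc).timestamp() * 1000)
    with storage._conn_lock:  # one acquisition covers the whole write path
        storage.conn.execute("""
            INSERT OR IGNORE INTO ohlcv_data
                (symbol, timeframe, timestamp, open, high, low, close, volume, created_at)
            SELECT symbol, timeframe, timestamp, open, high, low, close, volume, created_at
            FROM df_insert  -- replacement scan finds the local DataFrame
        """)
```
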
@@ -303,8 +310,9 @@ class DuckDBStorage:
         if limit:
             query += f" LIMIT {limit}"
-        # Execute query
-        df = self.conn.execute(query, params).df()
+        # Execute query with thread-safe lock
+        with self._conn_lock:
+            df = self.conn.execute(query, params).df()
         if df.empty:
             return None
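
With the lock in place, concurrent readers no longer race on the shared connection; they simply serialize. An illustrative stress of the read path (the symbol and timeframes are assumed, `storage` stands in for the real object):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(storage, symbol: str, timeframe: str):
    with storage._conn_lock:
        return storage.conn.execute(
            "SELECT * FROM ohlcv_data WHERE symbol = ? AND timeframe = ? "
            "ORDER BY timestamp DESC LIMIT 100",
            [symbol, timeframe],
        ).df()

with ThreadPoolExecutor(max_workers=4) as pool:
    frames = list(pool.map(lambda tf: fetch(storage, 'ETH/USDT', tf),
                           ['1m', '5m', '1h', '1d']))
```
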
@@ -341,7 +349,8 @@ class DuckDBStorage:
             WHERE symbol = ? AND timeframe = ?
         """
-        result = self.conn.execute(query, [symbol, timeframe]).fetchone()
+        with self._conn_lock:
+            result = self.conn.execute(query, [symbol, timeframe]).fetchone()
         if result and result[0] is not None:
             last_timestamp = pd.to_datetime(result[0], unit='ms', utc=True)
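
This last-timestamp lookup is what enables incremental refresh: only candles newer than the cached maximum need to be fetched again. A hedged sketch of that use:

```python
import pandas as pd

# Sketch: returns the newest cached candle time, or None when the cache is
# empty and the caller should backfill from scratch. `storage` is assumed.
def newest_cached(storage, symbol: str, timeframe: str):
    with storage._conn_lock:
        row = storage.conn.execute(
            "SELECT MAX(timestamp) FROM ohlcv_data WHERE symbol = ? AND timeframe = ?",
            [symbol, timeframe],
        ).fetchone()
    if row and row[0] is not None:
        return pd.to_datetime(row[0], unit='ms', utc=True)
    return None
```
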
@@ -385,7 +394,8 @@ class DuckDBStorage:
             limit
         ]
-        df = self.conn.execute(query, params).df()
+        with self._conn_lock:
+            df = self.conn.execute(query, params).df()
         if df.empty:
             return None
@@ -449,14 +459,15 @@ class DuckDBStorage:
         df_copy.to_parquet(parquet_file, index=False, compression='snappy')
         # Store annotation metadata in DuckDB
-        self.conn.execute("""
-            INSERT OR REPLACE INTO annotations
-            (annotation_id, symbol, timeframe, direction,
-             entry_timestamp, entry_price, exit_timestamp, exit_price,
-             profit_loss_pct, notes, created_at, market_context,
-             model_features, pivot_data, parquet_path)
-            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-        """, (
+        with self._conn_lock:
+            self.conn.execute("""
+                INSERT OR REPLACE INTO annotations
+                (annotation_id, symbol, timeframe, direction,
+                 entry_timestamp, entry_price, exit_timestamp, exit_price,
+                 profit_loss_pct, notes, created_at, market_context,
+                 model_features, pivot_data, parquet_path)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """, (
                 annotation_id,
                 annotation_data.get('symbol'),
                 annotation_data.get('timeframe'),
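
The `market_context`, `model_features`, and `pivot_data` values are dict-valued in the application but bound as text here, which implies JSON round-tripping (the read path below comments "Parse JSON fields"). A small sketch of the assumed convention:

```python
import json

payload = {'trend': 'up', 'atr_14': 12.5}   # hypothetical market context
stored = json.dumps(payload)                # what gets bound to the ? placeholder
restored = json.loads(stored)               # what the read path recovers
assert restored == payload
```
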
@@ -495,15 +506,16 @@ class DuckDBStorage:
"""
try:
# Get annotation metadata
result = self.conn.execute("""
SELECT * FROM annotations WHERE annotation_id = ?
""", (annotation_id,)).fetchone()
if not result:
return None
# Parse annotation data
columns = [desc[0] for desc in self.conn.description]
with self._conn_lock:
result = self.conn.execute("""
SELECT * FROM annotations WHERE annotation_id = ?
""", (annotation_id,)).fetchone()
if not result:
return None
# Parse annotation data
columns = [desc[0] for desc in self.conn.description]
annotation = dict(zip(columns, result))
# Parse JSON fields
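
Keeping `self.conn.description` inside the same locked section as the `execute` is deliberate: `description` reflects the connection's most recent statement, so reading it after releasing the lock could observe another thread's query. A sketch of the same idea written against the statement's return value (in the Python client, `execute()` returns the connection itself; `storage` and `annotation_id` are assumed from context):

```python
with storage._conn_lock:
    rel = storage.conn.execute(
        "SELECT * FROM annotations WHERE annotation_id = ?", (annotation_id,)
    )
    columns = [d[0] for d in rel.description]  # still tied to this statement
    result = rel.fetchone()
```
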
@@ -520,11 +532,12 @@ class DuckDBStorage:
             timeframe = parquet_file.stem
             # Query parquet directly with DuckDB
-            df = self.conn.execute(f"""
-                SELECT timestamp, open, high, low, close, volume
-                FROM read_parquet('{parquet_file}')
-                ORDER BY timestamp
-            """).df()
+            with self._conn_lock:
+                df = self.conn.execute(f"""
+                    SELECT timestamp, open, high, low, close, volume
+                    FROM read_parquet('{parquet_file}')
+                    ORDER BY timestamp
+                """).df()
             if not df.empty:
                 df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
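
One caveat the lock does not address: the parquet path is spliced into the SQL with an f-string, so a single quote in the path would break the statement. A hedged hardening sketch; escaping is the conservative fix, and newer DuckDB releases can reportedly also bind the path as a parameter:

```python
# Sketch: escape embedded single quotes before interpolation.
safe_path = str(parquet_file).replace("'", "''")
with storage._conn_lock:
    df = storage.conn.execute(
        f"SELECT timestamp, open, high, low, close, volume "
        f"FROM read_parquet('{safe_path}') ORDER BY timestamp"
    ).df()
```
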
@@ -550,12 +563,13 @@ class DuckDBStorage:
             DataFrame with results
         """
         try:
-            if params:
-                result = self.conn.execute(query, params)
-            else:
-                result = self.conn.execute(query)
-            return result.df()
+            with self._conn_lock:
+                if params:
+                    result = self.conn.execute(query, params)
+                else:
+                    result = self.conn.execute(query)
+                return result.df()
         except Exception as e:
             logger.error(f"Error executing query: {e}")
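
Because the whole helper body sits under the lock, ad-hoc queries from any thread go through the same serialization. A usage sketch (the method name is not shown in this hunk; `query` is assumed here):

```python
# Assumed method name; returns a pandas DataFrame per the docstring above.
df = storage.query(
    "SELECT timeframe, COUNT(*) AS n FROM ohlcv_data WHERE symbol = ? GROUP BY timeframe",
    ['ETH/USDT'],
)
```
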
@@ -564,26 +578,27 @@ class DuckDBStorage:
     def get_cache_stats(self) -> Dict[str, Any]:
         """Get cache statistics"""
         try:
-            # Get OHLCV stats
-            ohlcv_stats = self.conn.execute("""
-                SELECT symbol, timeframe, candle_count, first_timestamp, last_timestamp
-                FROM cache_metadata
-                ORDER BY symbol, timeframe
-            """).df()
-            if not ohlcv_stats.empty:
-                ohlcv_stats['first_timestamp'] = pd.to_datetime(ohlcv_stats['first_timestamp'], unit='ms')
-                ohlcv_stats['last_timestamp'] = pd.to_datetime(ohlcv_stats['last_timestamp'], unit='ms')
-            # Get annotation count
-            annotation_count = self.conn.execute("""
-                SELECT COUNT(*) as count FROM annotations
-            """).fetchone()[0]
-            # Get total candles
-            total_candles = self.conn.execute("""
-                SELECT SUM(candle_count) as total FROM cache_metadata
-            """).fetchone()[0] or 0
+            with self._conn_lock:
+                # Get OHLCV stats
+                ohlcv_stats = self.conn.execute("""
+                    SELECT symbol, timeframe, candle_count, first_timestamp, last_timestamp
+                    FROM cache_metadata
+                    ORDER BY symbol, timeframe
+                """).df()
+                if not ohlcv_stats.empty:
+                    ohlcv_stats['first_timestamp'] = pd.to_datetime(ohlcv_stats['first_timestamp'], unit='ms')
+                    ohlcv_stats['last_timestamp'] = pd.to_datetime(ohlcv_stats['last_timestamp'], unit='ms')
+                # Get annotation count
+                annotation_count = self.conn.execute("""
+                    SELECT COUNT(*) as count FROM annotations
+                """).fetchone()[0]
+                # Get total candles
+                total_candles = self.conn.execute("""
+                    SELECT SUM(candle_count) as total FROM cache_metadata
+                """).fetchone()[0] or 0
             return {
                 'ohlcv_stats': ohlcv_stats.to_dict('records') if not ohlcv_stats.empty else [],