fix pivot cache db

Dobromir Popov
2025-10-24 17:14:59 +03:00
parent 4b8f44d859
commit 420251f2d4
3 changed files with 308 additions and 124 deletions

View File

@@ -140,21 +140,20 @@ class AnnotationDashboard:
             ], className="container")
         ])

-        # Initialize core components
-        self.data_provider = DataProvider() if DataProvider else None
+        # Initialize core components (skip initial load for fast startup)
+        self.data_provider = DataProvider(skip_initial_load=True) if DataProvider else None

         # Enable unified storage for real-time data access
         if self.data_provider:
             self._enable_unified_storage_async()
-        self.orchestrator = TradingOrchestrator(
-            data_provider=self.data_provider
-        ) if TradingOrchestrator and self.data_provider else None
+        # ANNOTATE doesn't need orchestrator - skip ML model loading for fast startup
+        self.orchestrator = None

         # Initialize ANNOTATE components
         self.annotation_manager = AnnotationManager()

         # Use REAL training adapter - NO SIMULATION!
-        self.training_adapter = RealTrainingAdapter(self.orchestrator, self.data_provider)
+        self.training_adapter = RealTrainingAdapter(None, self.data_provider)

         # Initialize data loader with existing DataProvider
        self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None
@@ -206,56 +205,32 @@ class AnnotationDashboard:
         storage_thread.start()

     def _start_background_data_refresh(self):
-        """Start background task to refresh recent data after startup"""
+        """Start background task to refresh recent data after startup - ONCE ONLY"""
         def refresh_recent_data():
             try:
                 import time
                 # Wait for app to fully start
                 time.sleep(5)

-                logger.info("🔄 Starting background data refresh (fetching only recent missing data)")
+                logger.info("🔄 Starting one-time background data refresh (fetching only recent missing data)")

                 # Disable startup mode to fetch fresh data
                 self.data_loader.disable_startup_mode()

-                # Fetch only last 5 minutes of 1m data and 300 seconds of 1s data
-                symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])
-
-                for symbol in symbols:
-                    try:
-                        # Fetch last 5 candles of 1m data (5 minutes)
-                        logger.info(f"Refreshing recent 1m data for {symbol}")
-                        self.data_provider.get_historical_data(
-                            symbol=symbol,
-                            timeframe='1m',
-                            limit=5,
-                            refresh=True
-                        )
-
-                        # Fetch last 300 candles of 1s data (5 minutes)
-                        logger.info(f"Refreshing recent 1s data for {symbol}")
-                        self.data_provider.get_historical_data(
-                            symbol=symbol,
-                            timeframe='1s',
-                            limit=300,
-                            refresh=True
-                        )
-
-                        logger.info(f"✅ Refreshed recent data for {symbol}")
-                    except Exception as e:
-                        logger.warning(f"Could not refresh recent data for {symbol}: {e}")
-
-                logger.info("✅ Background data refresh completed")
+                # Use the new on-demand refresh method
+                logger.info("Using on-demand refresh for recent data")
+                self.data_provider.refresh_data_on_demand()
+                logger.info("✅ One-time background data refresh completed")

             except Exception as e:
                 logger.error(f"Error in background data refresh: {e}")

-        # Start in background thread
+        # Start refresh in background thread
         import threading
         refresh_thread = threading.Thread(target=refresh_recent_data, daemon=True)
         refresh_thread.start()
-        logger.info("📊 Background data refresh scheduled")
+        logger.info("📊 One-time background data refresh scheduled")

     def _get_pivot_markers_for_timeframe(self, symbol: str, timeframe: str, df: pd.DataFrame) -> dict:
         """

View File

@@ -29,7 +29,7 @@ import requests
 import pandas as pd
 import numpy as np
 import pickle
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Any, Callable
 from dataclasses import dataclass, field
@@ -147,12 +147,19 @@ class DataSubscriber:
 class DataProvider:
     """Unified data provider for historical and real-time market data with centralized distribution"""

-    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
-        """Initialize the data provider"""
+    def __init__(self, symbols: List[str] = None, timeframes: List[str] = None, skip_initial_load: bool = False):
+        """Initialize the data provider
+
+        Args:
+            symbols: List of trading symbols
+            timeframes: List of timeframes
+            skip_initial_load: If True, skip initial data load (use for ANNOTATE with DuckDB)
+        """
         self.config = get_config()

         # Fixed symbols and timeframes
         self.symbols = ['ETH/USDT', 'BTC/USDT']
         self.timeframes = ['1s', '1m', '1h', '1d']
+        self.skip_initial_load = skip_initial_load

         # Data storage - cached OHLCV data (1500 candles each)
         self.cached_data = {}  # {symbol: {timeframe: DataFrame}}
@@ -346,6 +353,9 @@ class DataProvider:
         self.retry_delay = 60  # 1 minute retry delay for 451 errors
         self.max_retries = 3

+        # Always load data from DuckDB immediately (synchronous for fast startup)
+        self._load_from_duckdb_sync()
+
         # Start automatic data maintenance
         self.start_automatic_data_maintenance()
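
With this change the constructor always hydrates the in-memory cache from DuckDB before the maintenance worker runs; `skip_initial_load` only controls whether that worker performs its one-shot exchange fetch. A usage sketch (the import path is an assumption based on the class name, not shown in this diff):

    from core.data_provider import DataProvider  # assumed module path

    # ANNOTATE-style startup: serve DuckDB-cached candles, no exchange requests in the worker
    provider = DataProvider(skip_initial_load=True)

    # Default startup: DuckDB cache first, then one smart incremental load,
    # after which the worker sets data_maintenance_active = False and exits
    provider_full = DataProvider()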
@@ -531,75 +541,117 @@ class DataProvider:
         logger.info("Automatic data maintenance stopped")

     def _data_maintenance_worker(self):
-        """Worker thread for automatic data maintenance"""
+        """Worker thread for initial data load only - no continuous fetching"""
         logger.info("Data maintenance worker started")

-        # Initial data load
-        self._initial_data_load()
+        # Initial data load (skip if requested)
+        if not self.skip_initial_load:
+            self._initial_data_load()
+        else:
+            logger.info("Skipping initial data load (using DuckDB cache)")

-        # Track last update times for each symbol/timeframe
-        last_updates = {}
-        for symbol in self.symbols:
-            last_updates[symbol] = {}
-            for timeframe in self.timeframes:
-                last_updates[symbol][timeframe] = 0
-
-        while self.data_maintenance_active:
-            try:
-                current_time = time.time()
-
-                # Check each symbol/timeframe for updates
-                for symbol in self.symbols:
-                    for timeframe in self.timeframes:
-                        interval = self.timeframe_intervals[timeframe]
-                        half_interval = interval / 2
-
-                        # Update every half candle period
-                        if current_time - last_updates[symbol][timeframe] >= half_interval:
-                            self._update_cached_data(symbol, timeframe)
-                            last_updates[symbol][timeframe] = current_time
-
-                # Sleep for 1 second before next check
-                time.sleep(1)
-
-            except Exception as e:
-                logger.error(f"Error in data maintenance worker: {e}")
-                time.sleep(10)  # Wait longer on error
+        logger.info("✅ Initial data load completed - stopping maintenance worker")
+        logger.info("📊 Data will be updated on-demand only (no continuous fetching)")
+
+        # Stop the maintenance worker after initial load
+        self.data_maintenance_active = False

     def _initial_data_load(self):
-        """Load initial 1500 candles for each symbol/timeframe"""
-        logger.info("Starting initial data load (1500 candles each)")
+        """Smart incremental load - load from DuckDB + fetch only missing candles since last timestamp"""
+        logger.info("Starting smart incremental data load")

         for symbol in self.symbols:
             for timeframe in self.timeframes:
                 try:
-                    logger.info(f"Loading initial data for {symbol} {timeframe}")
-                    df = self._fetch_from_binance(symbol, timeframe, 1500)
-
-                    if df is None or df.empty:
-                        logger.warning(f"Binance failed for {symbol} {timeframe}, trying MEXC")
-                        df = self._fetch_from_mexc(symbol, timeframe, 1500)
-
-                    if df is not None and not df.empty:
-                        # Ensure proper datetime index
-                        df = self._ensure_datetime_index(df)
-
-                        # Store in cached data
-                        self.cached_data[symbol][timeframe] = df
-                        logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
-                    else:
-                        logger.error(f"Failed to load initial data for {symbol} {timeframe}")
+                    # Step 1: Load existing data from DuckDB (up to 1500 candles)
+                    existing_df = None
+                    last_timestamp = None
+
+                    if self.duckdb_storage:
+                        try:
+                            # Load existing data
+                            existing_df = self.duckdb_storage.get_ohlcv_data(
+                                symbol=symbol,
+                                timeframe=timeframe,
+                                limit=1500
+                            )
+
+                            if existing_df is not None and not existing_df.empty:
+                                # Store in memory cache
+                                with self.data_lock:
+                                    self.cached_data[symbol][timeframe] = existing_df.tail(1500)
+                                last_timestamp = existing_df.index.max()
+                                logger.info(f"📦 Loaded {len(existing_df)} candles from DuckDB for {symbol} {timeframe}")
+                            else:
+                                logger.debug(f"No existing data in DuckDB for {symbol} {timeframe}")
+                        except Exception as e:
+                            logger.debug(f"Error loading from DuckDB for {symbol} {timeframe}: {e}")
+
+                    # Step 2: Fetch only missing candles since last timestamp
+                    if last_timestamp:
+                        # Calculate how many candles we might be missing
+                        now = datetime.now(timezone.utc)
+                        time_diff = (now - last_timestamp).total_seconds()
+
+                        # Estimate missing candles based on timeframe
+                        timeframe_seconds = {'1s': 1, '1m': 60, '1h': 3600, '1d': 86400}
+                        estimated_missing = int(time_diff / timeframe_seconds.get(timeframe, 60))
+
+                        if estimated_missing > 2:  # Only fetch if more than 2 candles missing
+                            # Cap at 1500 candles maximum
+                            fetch_limit = min(estimated_missing + 10, 1500)
+                            logger.info(f"🔄 Fetching {fetch_limit} recent candles for {symbol} {timeframe} (since {last_timestamp})")
+
+                            new_df = self._fetch_from_binance(symbol, timeframe, fetch_limit)
+                            if new_df is None or new_df.empty:
+                                new_df = self._fetch_from_mexc(symbol, timeframe, fetch_limit)
+
+                            if new_df is not None and not new_df.empty:
+                                new_df = self._ensure_datetime_index(new_df)
+
+                                # Store to DuckDB
+                                if self.duckdb_storage:
+                                    self.duckdb_storage.store_ohlcv_data(symbol, timeframe, new_df)
+
+                                # Merge with existing data
+                                with self.data_lock:
+                                    combined_df = pd.concat([existing_df, new_df], ignore_index=False)
+                                    combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
+                                    combined_df = combined_df.sort_index()
+                                    self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+
+                                logger.info(f"{symbol} {timeframe}: +{len(new_df)} new (total: {len(self.cached_data[symbol][timeframe])})")
+                        else:
+                            logger.info(f"{symbol} {timeframe}: Up to date ({len(existing_df)} candles)")
+                    else:
+                        # No existing data - fetch initial 1500 candles
+                        logger.info(f"🆕 No existing data, fetching 1500 candles for {symbol} {timeframe}")
+                        df = self._fetch_from_binance(symbol, timeframe, 1500)
+
+                        if df is None or df.empty:
+                            df = self._fetch_from_mexc(symbol, timeframe, 1500)
+
+                        if df is not None and not df.empty:
+                            df = self._ensure_datetime_index(df)
+
+                            # Store to DuckDB
+                            if self.duckdb_storage:
+                                self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
+
+                            with self.data_lock:
+                                self.cached_data[symbol][timeframe] = df
+                            logger.info(f"✅ Loaded {len(df)} candles for {symbol} {timeframe}")

-                    # Rate limiting between requests
-                    time.sleep(0.5)
+                    # Small delay to avoid rate limits
+                    time.sleep(0.1)

                 except Exception as e:
-                    logger.error(f"Error loading initial data for {symbol} {timeframe}: {e}")
+                    logger.error(f"Error loading data for {symbol} {timeframe}: {e}")

-        logger.info("Initial data load completed")
-
-        # Start background candle catch-up with proper locking
-        self._start_background_catch_up()
+        logger.info("✅ Smart incremental data load completed")

     def _start_background_catch_up(self):
         """
@@ -703,47 +755,92 @@ class DataProvider:
         self.catch_up_completed = True

     def _update_cached_data(self, symbol: str, timeframe: str):
-        """Update cached data by fetching last 2 candles and storing to DuckDB"""
+        """Update cached data by fetching only missing candles since last timestamp"""
         try:
-            # Fetch last 2 candles (outside lock - network I/O)
-            df = self._fetch_from_binance(symbol, timeframe, 2)
-
-            if df is None or df.empty:
-                df = self._fetch_from_mexc(symbol, timeframe, 2)
-
-            if df is not None and not df.empty:
-                # Ensure proper datetime index
-                df = self._ensure_datetime_index(df)
-
-                # Store to DuckDB immediately (live data persistence)
-                if self.duckdb_storage:
-                    try:
-                        self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
-                        logger.debug(f"Stored live data to DuckDB: {symbol} {timeframe} ({len(df)} candles)")
-                    except Exception as e:
-                        logger.warning(f"Could not store live data to DuckDB: {e}")
-
-                # Update cached data with lock
-                with self.data_lock:
-                    existing_df = self.cached_data[symbol][timeframe]
-                    if not existing_df.empty:
-                        # Merge new data with existing, avoiding duplicates
-                        combined_df = pd.concat([existing_df, df], ignore_index=False)
-                        combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
-                        combined_df = combined_df.sort_index()
-
-                        # Keep only last 1500 candles in memory
-                        self.cached_data[symbol][timeframe] = combined_df.tail(1500)
-                    else:
-                        self.cached_data[symbol][timeframe] = df
-
-                    candle_count = len(self.cached_data[symbol][timeframe])
-                    logger.debug(f"Updated cached data for {symbol} {timeframe}: {candle_count} candles")
+            # Get last timestamp from DuckDB
+            last_timestamp = None
+            if self.duckdb_storage:
+                last_timestamp = self.duckdb_storage.get_last_timestamp(symbol, timeframe)
+
+            if last_timestamp:
+                # Calculate how many candles we might be missing
+                now = datetime.now(timezone.utc)
+                time_diff = (now - last_timestamp).total_seconds()
+
+                # Estimate missing candles based on timeframe
+                timeframe_seconds = {'1s': 1, '1m': 60, '1h': 3600, '1d': 86400}
+                estimated_missing = int(time_diff / timeframe_seconds.get(timeframe, 60))
+
+                if estimated_missing > 0:
+                    # Cap at 1500 candles maximum
+                    fetch_limit = min(estimated_missing + 5, 1500)
+                    logger.info(f"🔄 Fetching {fetch_limit} recent candles for {symbol} {timeframe} (since {last_timestamp})")
+
+                    # Fetch missing candles
+                    df = self._fetch_from_binance(symbol, timeframe, fetch_limit)
+
+                    if df is None or df.empty:
+                        df = self._fetch_from_mexc(symbol, timeframe, fetch_limit)
+
+                    if df is not None and not df.empty:
+                        df = self._ensure_datetime_index(df)
+
+                        # Store to DuckDB
+                        if self.duckdb_storage:
+                            try:
+                                self.duckdb_storage.store_ohlcv_data(symbol, timeframe, df)
+                                logger.debug(f"Stored live data to DuckDB: {symbol} {timeframe} ({len(df)} candles)")
+                            except Exception as e:
+                                logger.warning(f"Could not store live data to DuckDB: {e}")
+
+                        # Update cached data with lock
+                        with self.data_lock:
+                            existing_df = self.cached_data[symbol][timeframe]
+                            if not existing_df.empty:
+                                # Merge new data with existing, avoiding duplicates
+                                combined_df = pd.concat([existing_df, df], ignore_index=False)
+                                combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
+                                combined_df = combined_df.sort_index()
+
+                                # Keep only last 1500 candles in memory
+                                self.cached_data[symbol][timeframe] = combined_df.tail(1500)
+                            else:
+                                self.cached_data[symbol][timeframe] = df
+
+                            candle_count = len(self.cached_data[symbol][timeframe])
+                            logger.info(f"✅ Updated {symbol} {timeframe}: +{len(df)} new (total: {candle_count})")
+                    else:
+                        logger.warning(f"Could not fetch new data for {symbol} {timeframe}")
+                else:
+                    logger.debug(f"{symbol} {timeframe}: Up to date")
+            else:
+                logger.debug(f"No existing data for {symbol} {timeframe} - skipping update")
         except Exception as e:
             logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}")

+    def refresh_data_on_demand(self, symbol: str = None, timeframe: str = None):
+        """Manually refresh data for specific symbol/timeframe or all symbols"""
+        try:
+            if symbol and timeframe:
+                # Refresh specific symbol/timeframe
+                logger.info(f"🔄 Manual refresh requested for {symbol} {timeframe}")
+                self._update_cached_data(symbol, timeframe)
+            else:
+                # Refresh all symbols/timeframes
+                logger.info("🔄 Manual refresh requested for all symbols/timeframes")
+                for sym in self.symbols:
+                    for tf in self.timeframes:
+                        self._update_cached_data(sym, tf)
+                        time.sleep(0.1)  # Small delay to avoid rate limits
+                logger.info("✅ Manual refresh completed for all symbols/timeframes")
+        except Exception as e:
+            logger.error(f"Error in manual refresh: {e}")
+
     def start_cob_websocket_integration(self):
         """Start COB WebSocket integration using COBIntegration class"""
@@ -1353,7 +1450,7 @@ class DataProvider:
                 # If we have a RangeIndex or other non-datetime index, create datetime index
                 if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex):
                     # Use current UTC time and work backwards for realistic timestamps
-                    from datetime import datetime, timedelta
+                    from datetime import datetime, timedelta, timezone
                     end_time = datetime.utcnow()
                     start_time = end_time - timedelta(minutes=len(df))
                     df.index = pd.date_range(start=start_time, end=end_time, periods=len(df), tz='UTC')
@@ -3007,6 +3104,40 @@ class DataProvider:
             logger.debug(f"Error calculating RSI: {e}")
             return 50.0  # Default neutral value

+    def _load_from_duckdb_sync(self):
+        """Load all data from DuckDB synchronously for instant startup"""
+        if not self.duckdb_storage:
+            logger.warning("⚠️ DuckDB storage not available - cannot load cached data")
+            return
+
+        logger.info("📦 Loading cached data from DuckDB...")
+        loaded_count = 0
+
+        for symbol in self.symbols:
+            for timeframe in self.timeframes:
+                try:
+                    df = self.duckdb_storage.get_ohlcv_data(
+                        symbol=symbol,
+                        timeframe=timeframe,
+                        limit=1500
+                    )
+
+                    if df is not None and not df.empty:
+                        with self.data_lock:
+                            self.cached_data[symbol][timeframe] = df.tail(1500)
+                        logger.info(f"{symbol} {timeframe}: {len(df)} candles from DuckDB")
+                        loaded_count += len(df)
+                    else:
+                        logger.debug(f"No data in DuckDB for {symbol} {timeframe} - will fetch from API")
+                except Exception as e:
+                    logger.error(f"❌ Error loading {symbol} {timeframe}: {e}")
+
+        if loaded_count > 0:
+            logger.info(f"✅ Loaded {loaded_count:,} candles total")
+        else:
+            logger.warning("⚠️ No cached data found - will fetch from API")
+
     def _load_from_duckdb(self, symbol: str, timeframe: str, limit: int = 1500) -> Optional[pd.DataFrame]:
         """Load data from DuckDB storage

View File

@@ -245,6 +245,84 @@ class DuckDBStorage:
             traceback.print_exc()
             return None

+    def get_last_timestamp(self, symbol: str, timeframe: str) -> Optional[datetime]:
+        """
+        Get the last timestamp for a symbol/timeframe from DuckDB
+
+        Args:
+            symbol: Trading symbol
+            timeframe: Timeframe
+
+        Returns:
+            Last timestamp or None if no data exists
+        """
+        try:
+            query = """
+                SELECT MAX(timestamp) as last_timestamp
+                FROM ohlcv_data
+                WHERE symbol = ? AND timeframe = ?
+            """
+            result = self.conn.execute(query, [symbol, timeframe]).fetchone()
+
+            if result and result[0] is not None:
+                last_timestamp = pd.to_datetime(result[0], unit='ms', utc=True)
+                logger.debug(f"Last timestamp for {symbol} {timeframe}: {last_timestamp}")
+                return last_timestamp
+
+            return None
+        except Exception as e:
+            logger.error(f"Error getting last timestamp for {symbol} {timeframe}: {e}")
+            return None
+
+    def get_ohlcv_data_since_timestamp(self, symbol: str, timeframe: str,
+                                       since_timestamp: datetime,
+                                       limit: int = 1500) -> Optional[pd.DataFrame]:
+        """
+        Get OHLCV data since a specific timestamp, capped at limit
+
+        Args:
+            symbol: Trading symbol
+            timeframe: Timeframe
+            since_timestamp: Get data since this timestamp
+            limit: Maximum number of candles (default 1500)
+
+        Returns:
+            DataFrame with OHLCV data since timestamp
+        """
+        try:
+            query = """
+                SELECT timestamp, open, high, low, close, volume
+                FROM ohlcv_data
+                WHERE symbol = ? AND timeframe = ? AND timestamp > ?
+                ORDER BY timestamp ASC
+                LIMIT ?
+            """
+            params = [
+                symbol,
+                timeframe,
+                int(since_timestamp.timestamp() * 1000),
+                limit
+            ]
+
+            df = self.conn.execute(query, params).df()
+
+            if df.empty:
+                return None
+
+            # Convert timestamp to datetime
+            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
+            df = df.set_index('timestamp')
+
+            logger.debug(f"Retrieved {len(df)} candles for {symbol} {timeframe} since {since_timestamp}")
+            return df
+        except Exception as e:
+            logger.error(f"Error retrieving OHLCV data since timestamp: {e}")
+            return None
+
     def store_annotation(self, annotation_id: str, annotation_data: Dict[str, Any],
                          market_snapshots: Dict[str, pd.DataFrame],
                          model_predictions: Optional[List[Dict]] = None) -> bool:
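
These two helpers are what the provider's incremental paths build on: `get_last_timestamp()` tells you where the stored series ends, and `get_ohlcv_data_since_timestamp()` returns only the rows after that point. A minimal sketch of an incremental sync on top of them; `fetch_ohlcv` is a hypothetical exchange fetcher, not part of this codebase:

    import pandas as pd

    def sync_symbol(storage, fetch_ohlcv, symbol='ETH/USDT', timeframe='1m'):
        """Incremental sync sketch using the DuckDBStorage helpers above.

        `fetch_ohlcv(symbol, timeframe, limit)` is assumed to return a DataFrame
        indexed by UTC timestamps.
        """
        last_ts = storage.get_last_timestamp(symbol, timeframe)
        if last_ts is None:
            # Cold start: no rows yet for this series
            return fetch_ohlcv(symbol, timeframe, limit=1500)

        gap_seconds = (pd.Timestamp.now(tz='UTC') - last_ts).total_seconds()
        missing = int(gap_seconds // 60)  # '1m' candles assumed for the estimate
        if missing <= 0:
            # Already up to date: reuse what DuckDB has after the last timestamp
            return storage.get_ohlcv_data_since_timestamp(symbol, timeframe, last_ts)

        # Fetch only the gap, capped the same way the provider caps its requests
        return fetch_ohlcv(symbol, timeframe, limit=min(missing + 5, 1500))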