wip
@@ -11,6 +11,7 @@ from datetime import datetime, timedelta
 import pandas as pd
 from pathlib import Path
 import pickle
+import time
 
 logger = logging.getLogger(__name__)
 
@@ -44,7 +45,7 @@ class HistoricalDataLoader:
     def get_data(self, symbol: str, timeframe: str,
                  start_time: Optional[datetime] = None,
                  end_time: Optional[datetime] = None,
-                 limit: int = 500,
+                 limit: int = 2500,
                  direction: str = 'latest') -> Optional[pd.DataFrame]:
         """
         Get historical data for symbol and timeframe
@@ -60,12 +61,15 @@ class HistoricalDataLoader:
         Returns:
             DataFrame with OHLCV data or None if unavailable
         """
-        # Check memory cache first
-        cache_key = f"{symbol}_{timeframe}_{start_time}_{end_time}_{limit}_{direction}"
-        if cache_key in self.memory_cache:
+        start_time_ms = time.time()
+
+        # Check memory cache first (exclude direction from cache key for infinite scroll)
+        cache_key = f"{symbol}_{timeframe}_{start_time}_{end_time}_{limit}"
+        if cache_key in self.memory_cache and direction == 'latest':
             cached_data, cached_time = self.memory_cache[cache_key]
             if datetime.now() - cached_time < self.cache_ttl:
-                logger.debug(f"Returning cached data for {symbol} {timeframe}")
+                elapsed_ms = (time.time() - start_time_ms) * 1000
+                logger.debug(f"⚡ Memory cache hit for {symbol} {timeframe} ({elapsed_ms:.1f}ms)")
                 return cached_data
 
         try:
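
Note on the hunk above: the cache key now excludes direction, and cached frames are only served when direction == 'latest', so paginated infinite-scroll requests always go to storage. A minimal self-contained sketch of that pattern follows; the class and method names (TTLMemoryCache, make_key, get, put) are illustrative, not from this repo, where the same logic lives inline in get_data.

from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

class TTLMemoryCache:
    def __init__(self, ttl_seconds: float = 60.0):
        self.cache_ttl = timedelta(seconds=ttl_seconds)
        self.memory_cache: Dict[str, Tuple[Any, datetime]] = {}

    def make_key(self, symbol: str, timeframe: str,
                 start_time, end_time, limit: int) -> str:
        # direction is deliberately excluded so 'latest' and paginated
        # requests for the same window share one key space
        return f"{symbol}_{timeframe}_{start_time}_{end_time}_{limit}"

    def get(self, key: str, direction: str = 'latest') -> Optional[Any]:
        # paginated scroll requests bypass the cache entirely
        if direction != 'latest' or key not in self.memory_cache:
            return None
        value, cached_at = self.memory_cache[key]
        if datetime.now() - cached_at < self.cache_ttl:
            return value
        return None  # expired; caller refetches and re-stores

    def put(self, key: str, value: Any) -> None:
        self.memory_cache[key] = (value, datetime.now())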
@@ -77,7 +81,8 @@ class HistoricalDataLoader:
             if cached_df is not None and not cached_df.empty:
                 # Use cached data if we have enough candles
                 if len(cached_df) >= min(limit, 100):  # Use cached if we have at least 100 candles
-                    logger.debug(f"Using DataProvider cached data for {symbol} {timeframe} ({len(cached_df)} candles)")
+                    elapsed_ms = (time.time() - start_time_ms) * 1000
+                    logger.debug(f"🚀 DataProvider cache hit for {symbol} {timeframe} ({len(cached_df)} candles, {elapsed_ms:.1f}ms)")
 
                     # Filter by time range with direction support
                     filtered_df = self._filter_by_time_range(
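
The hunks above thread a start_time_ms stopwatch through get_data by hand, computing elapsed_ms on every exit path. A context manager is one way to centralize the same measurement; this sketch is not a helper from this repo, just the equivalent pattern.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def log_elapsed(label: str):
    # measure wall-clock time around the wrapped block and log it in ms
    start = time.time()
    try:
        yield
    finally:
        elapsed_ms = (time.time() - start) * 1000
        logger.debug(f"{label} took {elapsed_ms:.1f}ms")

# Usage (loader and arguments are placeholders):
# with log_elapsed("get_data ETH/USDT 1m"):
#     df = loader.get_data("ETH/USDT", "1m")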
@@ -158,20 +163,29 @@ class HistoricalDataLoader:
                     logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
                     return df
 
-            # Fallback: Try DuckDB first, then fetch from API if needed
-            if self.startup_mode:
-                logger.info(f"Loading data for {symbol} {timeframe} (startup mode: allow stale cache)")
-                df = self.data_provider.get_historical_data(
+            # Check DuckDB first for historical data (always check for infinite scroll)
+            if self.data_provider.duckdb_storage and (start_time or end_time):
+                logger.info(f"Checking DuckDB for {symbol} {timeframe} historical data (direction={direction})")
+                df = self.data_provider.duckdb_storage.get_ohlcv_data(
                     symbol=symbol,
                     timeframe=timeframe,
+                    start_time=start_time,
+                    end_time=end_time,
                     limit=limit,
-                    allow_stale_cache=True
+                    direction=direction
                 )
-            else:
-                # Check DuckDB first for historical data
-                if self.data_provider.duckdb_storage and (start_time or end_time):
-                    logger.info(f"Checking DuckDB for {symbol} {timeframe} historical data (direction={direction})")
-                    df = self.data_provider.duckdb_storage.get_ohlcv_data(
+
+                if df is not None and not df.empty:
+                    elapsed_ms = (time.time() - start_time_ms) * 1000
+                    logger.info(f"✅ DuckDB hit for {symbol} {timeframe} ({len(df)} candles, {elapsed_ms:.1f}ms)")
+                    # Cache in memory
+                    self.memory_cache[cache_key] = (df.copy(), datetime.now())
+                    return df
+                else:
+                    logger.info(f"📡 No data in DuckDB, fetching from exchange API for {symbol} {timeframe}")
+
+                    # Fetch from exchange API with time range
+                    df = self._fetch_from_exchange_api(
+                        symbol=symbol,
+                        timeframe=timeframe,
+                        start_time=start_time,
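
This hunk replaces the startup-mode-first branch with a fixed lookup order whenever a time range is given: memory cache, then DuckDB, then the exchange API. A minimal sketch of that tiering follows; load_tiered and its three callables are hypothetical stand-ins for the inline logic, duckdb_storage.get_ohlcv_data, and _fetch_from_exchange_api.

from typing import Callable, Optional
import pandas as pd

def load_tiered(cache_get: Callable[[], Optional[pd.DataFrame]],
                query_duckdb: Callable[[], Optional[pd.DataFrame]],
                fetch_api: Callable[[], Optional[pd.DataFrame]]) -> Optional[pd.DataFrame]:
    df = cache_get()
    if df is not None and not df.empty:
        return df          # fastest tier: in-process memory
    df = query_duckdb()
    if df is not None and not df.empty:
        return df          # local columnar storage, no network round trip
    return fetch_api()     # slowest tier: exchange REST API

Each tier is tried only after the faster one misses, which is exactly why the new code logs a distinct latency figure per tier (⚡ memory, ✅ DuckDB, 🌐 API).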
@@ -181,40 +195,35 @@ class HistoricalDataLoader:
                     )
 
                     if df is not None and not df.empty:
-                        logger.info(f"✅ Loaded {len(df)} candles from DuckDB for {symbol} {timeframe}")
+                        elapsed_ms = (time.time() - start_time_ms) * 1000
+                        logger.info(f"🌐 Exchange API hit for {symbol} {timeframe} ({len(df)} candles, {elapsed_ms:.1f}ms)")
+
+                        # Store in DuckDB for future use
+                        if self.data_provider.duckdb_storage:
+                            stored_count = self.data_provider.duckdb_storage.store_ohlcv_data(
+                                symbol=symbol,
+                                timeframe=timeframe,
+                                df=df
+                            )
+                            logger.info(f"💾 Stored {stored_count} new candles in DuckDB")
+
                         # Cache in memory
                         self.memory_cache[cache_key] = (df.copy(), datetime.now())
                         return df
                     else:
-                        logger.info(f"📡 No data in DuckDB, fetching from exchange API for {symbol} {timeframe}")
-
-                        # Fetch from exchange API with time range
-                        df = self._fetch_from_exchange_api(
-                            symbol=symbol,
-                            timeframe=timeframe,
-                            start_time=start_time,
-                            end_time=end_time,
-                            limit=limit,
-                            direction=direction
-                        )
-
-                        if df is not None and not df.empty:
-                            # Store in DuckDB for future use
-                            if self.data_provider.duckdb_storage:
-                                stored_count = self.data_provider.duckdb_storage.store_ohlcv_data(
-                                    symbol=symbol,
-                                    timeframe=timeframe,
-                                    df=df
-                                )
-                                logger.info(f"💾 Stored {stored_count} new candles in DuckDB")
-
-                            # Cache in memory
-                            self.memory_cache[cache_key] = (df.copy(), datetime.now())
-                            return df
-                        else:
-                            logger.warning(f"No data available from exchange API for {symbol} {timeframe}")
-                            return None
+                        logger.warning(f"No data available from exchange API for {symbol} {timeframe}")
+                        return None
 
+        # Fallback: Use DataProvider for latest data (startup mode or no time range)
+        if self.startup_mode and not (start_time or end_time):
+            logger.info(f"Loading data for {symbol} {timeframe} (startup mode: allow stale cache)")
+            df = self.data_provider.get_historical_data(
+                symbol=symbol,
+                timeframe=timeframe,
+                limit=limit,
+                allow_stale_cache=True
+            )
         else:
             # Fetch from API and store in DuckDB (no time range specified)
             logger.info(f"Fetching latest data from API for {symbol} {timeframe}")
             df = self.data_provider.get_historical_data(
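
On a DuckDB miss, the new code fetches from the exchange API, writes the candles through to DuckDB, and only then memory-caches and returns them. A sketch of that write-through step, assuming hypothetical stand-ins: fetch_api for _fetch_from_exchange_api, storage for duckdb_storage (whose real store_ohlcv_data call also takes symbol and timeframe).

from datetime import datetime
from typing import Callable, Dict, Optional, Tuple
import pandas as pd

def fetch_and_persist(
    fetch_api: Callable[[], Optional[pd.DataFrame]],
    storage,
    memory_cache: Dict[str, Tuple[pd.DataFrame, datetime]],
    cache_key: str,
) -> Optional[pd.DataFrame]:
    df = fetch_api()
    if df is None or df.empty:
        return None
    if storage is not None:
        # persist so the next request is served from the DuckDB tier
        storage.store_ohlcv_data(df=df)
    # cache a copy so callers cannot mutate the cached frame in place
    memory_cache[cache_key] = (df.copy(), datetime.now())
    return df

Caching df.copy() rather than df itself mirrors the diff: the memory tier keeps an immutable-by-convention snapshot while the caller is free to transform its own frame.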
@@ -403,7 +412,7 @@ class HistoricalDataLoader:
                                timeframes: List[str],
                                start_time: Optional[datetime] = None,
                                end_time: Optional[datetime] = None,
-                               limit: int = 500) -> Dict[str, pd.DataFrame]:
+                               limit: int = 2500) -> Dict[str, pd.DataFrame]:
        """
        Get data for multiple timeframes at once
 
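
The final hunk only raises the default limit on the multi-timeframe method; its body is outside the diff. Given just the signature, the plausible shape is one get_data call per timeframe collected into a dict, as in this sketch; the function name, the loader parameter, and the loop body are all assumptions for illustration.

from datetime import datetime
from typing import Dict, List, Optional
import pandas as pd

def get_data_for_timeframes(loader, symbol: str, timeframes: List[str],
                            start_time: Optional[datetime] = None,
                            end_time: Optional[datetime] = None,
                            limit: int = 2500) -> Dict[str, pd.DataFrame]:
    result: Dict[str, pd.DataFrame] = {}
    for tf in timeframes:
        # each timeframe reuses the tiered get_data path shown above
        df = loader.get_data(symbol, tf, start_time=start_time,
                             end_time=end_time, limit=limit)
        if df is not None:
            result[tf] = df
    return result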