decouple external API and local data consumption
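In short: DataProvider now keeps a fixed in-memory OHLCV cache (1500 candles per symbol/timeframe) that a background maintenance thread refreshes, and read paths such as get_historical_data() and get_current_price() serve that cache instead of calling Binance/MEXC directly. A minimal consumption sketch under that assumption follows; the import path mirrors the test script added at the bottom of this commit, and the 10-second warm-up wait is illustrative, not part of the commit:

import time
from core.data_provider import DataProvider

provider = DataProvider()            # constructor starts automatic data maintenance in the background
time.sleep(10)                       # illustrative wait for the initial 1500-candle load to finish

# Reads are served from the in-memory cache only; no external API call is triggered here.
eth_1m = provider.get_historical_data('ETH/USDT', '1m', limit=100)
eth_price = provider.get_current_price('ETH/USDT')
print(len(eth_1m) if eth_1m is not None else 0, eth_price)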
@@ -109,19 +109,26 @@ class DataProvider:
def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
"""Initialize the data provider"""
self.config = get_config()
self.symbols = symbols or self.config.symbols
self.timeframes = timeframes or self.config.timeframes
# Fixed symbols and timeframes for caching
self.symbols = ['ETH/USDT', 'BTC/USDT']
self.timeframes = ['1s', '1m', '1h', '1d']

# Cache settings (initialize first)
self.cache_enabled = self.config.data.get('cache_enabled', True)
self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
self.cache_enabled = True
self.cache_dir = Path('cache')
self.cache_dir.mkdir(parents=True, exist_ok=True)

# Data storage
self.historical_data = {} # {symbol: {timeframe: DataFrame}}
# Data storage - cached OHLCV data (1500 candles each)
self.cached_data = {} # {symbol: {timeframe: DataFrame}}
self.real_time_data = {} # {symbol: {timeframe: deque}}
self.current_prices = {} # {symbol: float}

# Initialize cached data structure
for symbol in self.symbols:
self.cached_data[symbol] = {}
for timeframe in self.timeframes:
self.cached_data[symbol][timeframe] = pd.DataFrame()

# Pivot-based normalization system
self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds}
self.pivot_cache_dir = self.cache_dir / 'pivot_bounds'
@@ -224,12 +231,6 @@ class DataProvider:
self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data
self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer

# Pre-built OHLCV cache for instant BaseDataInput building (optimization from SimplifiedDataIntegration)
self._ohlcv_cache = {} # {symbol: {timeframe: List[OHLCVBar]}}
self._ohlcv_cache_lock = Lock()
self._last_cache_update = {} # {symbol: {timeframe: datetime}}
self._cache_refresh_interval = 5 # seconds

# Data collection threads
self.data_collection_active = False

@@ -246,8 +247,21 @@ class DataProvider:
self.bucket_sizes = [1, 10] # $1 and $10 buckets
self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes}

# Automatic data maintenance
self.data_maintenance_active = False
self.data_maintenance_thread = None

# Timeframe intervals in seconds for automatic updates
self.timeframe_intervals = {
'1s': 1,
'1m': 60,
'1h': 3600,
'1d': 86400
}

logger.info(f"DataProvider initialized for symbols: {self.symbols}")
logger.info(f"Timeframes: {self.timeframes}")
logger.info("Automatic data maintenance enabled")
logger.info("Centralized data distribution enabled")
logger.info("Pivot-based normalization system enabled")
logger.info("Williams Market Structure integration enabled")
@@ -255,13 +269,134 @@ class DataProvider:

# Rate limiting
self.last_request_time = {}
self.request_interval = 0.2 # 200ms between requests
self.request_interval = 0.5 # 500ms between requests to avoid rate limits
self.retry_delay = 60 # 1 minute retry delay for 451 errors
self.max_retries = 3

# Start automatic data maintenance
self.start_automatic_data_maintenance()

# Start COB integration
self.start_cob_integration()

def start_automatic_data_maintenance(self):
"""Start automatic data maintenance system"""
if self.data_maintenance_active:
logger.warning("Data maintenance already active")
return

self.data_maintenance_active = True
self.data_maintenance_thread = Thread(target=self._data_maintenance_worker, daemon=True)
self.data_maintenance_thread.start()
logger.info("Automatic data maintenance started")

def stop_automatic_data_maintenance(self):
"""Stop automatic data maintenance system"""
self.data_maintenance_active = False
if self.data_maintenance_thread and self.data_maintenance_thread.is_alive():
self.data_maintenance_thread.join(timeout=5)
logger.info("Automatic data maintenance stopped")

def _data_maintenance_worker(self):
"""Worker thread for automatic data maintenance"""
logger.info("Data maintenance worker started")

# Initial data load
self._initial_data_load()

# Track last update times for each symbol/timeframe
last_updates = {}
for symbol in self.symbols:
last_updates[symbol] = {}
for timeframe in self.timeframes:
last_updates[symbol][timeframe] = 0

while self.data_maintenance_active:
try:
current_time = time.time()

# Check each symbol/timeframe for updates
for symbol in self.symbols:
for timeframe in self.timeframes:
interval = self.timeframe_intervals[timeframe]
half_interval = interval / 2

# Update every half candle period
if current_time - last_updates[symbol][timeframe] >= half_interval:
self._update_cached_data(symbol, timeframe)
last_updates[symbol][timeframe] = current_time

# Sleep for 1 second before next check
time.sleep(1)

except Exception as e:
logger.error(f"Error in data maintenance worker: {e}")
time.sleep(10) # Wait longer on error

def _initial_data_load(self):
"""Load initial 1500 candles for each symbol/timeframe"""
logger.info("Starting initial data load (1500 candles each)")

for symbol in self.symbols:
for timeframe in self.timeframes:
try:
logger.info(f"Loading initial data for {symbol} {timeframe}")
df = self._fetch_from_binance(symbol, timeframe, 1500)

if df is None or df.empty:
logger.warning(f"Binance failed for {symbol} {timeframe}, trying MEXC")
df = self._fetch_from_mexc(symbol, timeframe, 1500)

if df is not None and not df.empty:
# Ensure proper datetime index
df = self._ensure_datetime_index(df)

# Store in cached data
self.cached_data[symbol][timeframe] = df
logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
else:
logger.error(f"Failed to load initial data for {symbol} {timeframe}")

# Rate limiting between requests
time.sleep(0.5)

except Exception as e:
logger.error(f"Error loading initial data for {symbol} {timeframe}: {e}")

logger.info("Initial data load completed")

def _update_cached_data(self, symbol: str, timeframe: str):
"""Update cached data by fetching last 2 candles"""
try:
# Fetch last 2 candles
df = self._fetch_from_binance(symbol, timeframe, 2)

if df is None or df.empty:
df = self._fetch_from_mexc(symbol, timeframe, 2)

if df is not None and not df.empty:
# Ensure proper datetime index
df = self._ensure_datetime_index(df)

# Get existing cached data
existing_df = self.cached_data[symbol][timeframe]

if not existing_df.empty:
# Merge new data with existing, avoiding duplicates
combined_df = pd.concat([existing_df, df], ignore_index=False)
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
combined_df = combined_df.sort_index()

# Keep only last 1500 candles
self.cached_data[symbol][timeframe] = combined_df.tail(1500)
else:
self.cached_data[symbol][timeframe] = df

logger.debug(f"Updated cached data for {symbol} {timeframe}: {len(self.cached_data[symbol][timeframe])} candles")

except Exception as e:
logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}")

def start_cob_integration(self):
"""Starts the COB integration in a background thread."""
cob_thread = Thread(target=self._run_cob_integration, daemon=True)
@@ -306,182 +441,23 @@ class DataProvider:
return df

def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
"""Get historical OHLCV data for a symbol and timeframe"""
"""Get historical OHLCV data from cache only - no external API calls"""
try:
# If refresh=True, always fetch fresh data (skip cache for real-time updates)
if not refresh:
if self.cache_enabled:
cached_data = self._load_from_cache(symbol, timeframe)
if cached_data is not None and len(cached_data) >= limit * 0.8:
# Ensure proper datetime index for cached data
cached_data = self._ensure_datetime_index(cached_data)
# logger.info(f"Using cached data for {symbol} {timeframe}")
return cached_data.tail(limit)

# Check if we need to preload 300s of data for first load
should_preload = self._should_preload_data(symbol, timeframe, limit)
# Only return cached data - never trigger external API calls
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
cached_df = self.cached_data[symbol][timeframe]
if not cached_df.empty:
# Return requested amount from cached data
return cached_df.tail(limit)

if should_preload:
logger.info(f"Preloading 300s of data for {symbol} {timeframe}")
df = self._preload_300s_data(symbol, timeframe)
else:
# Fetch from API with requested limit (Binance primary, MEXC fallback)
logger.info(f"Fetching historical data for {symbol} {timeframe}")
df = self._fetch_from_binance(symbol, timeframe, limit)

# Fallback to MEXC if Binance fails
if df is None or df.empty:
logger.info(f"Binance failed, trying MEXC fallback for {symbol}")
df = self._fetch_from_mexc(symbol, timeframe, limit)

if df is not None and not df.empty:
# Ensure proper datetime index
df = self._ensure_datetime_index(df)

# Add technical indicators. temporarily disabled to save time as it is not working as expected.
# df = self._add_technical_indicators(df)

# Cache the data
if self.cache_enabled:
self._save_to_cache(df, symbol, timeframe)

# Store in memory
if symbol not in self.historical_data:
self.historical_data[symbol] = {}
self.historical_data[symbol][timeframe] = df

# Return requested amount
return df.tail(limit)

logger.warning(f"No data received for {symbol} {timeframe}")
logger.warning(f"No cached data available for {symbol} {timeframe}")
return None

except Exception as e:
logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}")
logger.error(f"Error getting cached data for {symbol} {timeframe}: {e}")
return None

def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool:
"""Determine if we should preload 300s of data"""
try:
# Check if we have any cached data
if self.cache_enabled:
cached_data = self._load_from_cache(symbol, timeframe)
if cached_data is not None and len(cached_data) > 0:
return False # Already have some data

# Check if we have data in memory
if (symbol in self.historical_data and
timeframe in self.historical_data[symbol] and
len(self.historical_data[symbol][timeframe]) > 0):
return False # Already have data in memory

# Calculate if 300s worth of data would be more than requested limit
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
candles_in_300s = 300 // timeframe_seconds

# Preload if we need more than the requested limit or if it's a short timeframe
if candles_in_300s > limit or timeframe in ['1s', '1m']:
return True

return False

except Exception as e:
logger.error(f"Error determining if should preload data: {e}")
return False

def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
"""Preload 300 seconds worth of data for better initial performance"""
try:
# Calculate how many candles we need for 300 seconds
timeframe_seconds = self.timeframe_seconds.get(timeframe, 60)
candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles

# For very short timeframes, limit to reasonable amount
if timeframe == '1s':
candles_needed = min(candles_needed, 300) # Max 300 1s candles
elif timeframe == '1m':
candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour)
else:
candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes

logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)")

# Fetch the data (Binance primary, MEXC fallback)
df = self._fetch_from_binance(symbol, timeframe, candles_needed)

# Fallback to MEXC if Binance fails
if df is None or df.empty:
logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}")
df = self._fetch_from_mexc(symbol, timeframe, candles_needed)

if df is not None and not df.empty:
logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}")
return df
else:
logger.warning(f"Failed to preload data for {symbol} {timeframe}")
return None

except Exception as e:
logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}")
return None

def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]:
"""Preload 300s of data for all symbols and timeframes"""
try:
if timeframes is None:
timeframes = self.timeframes

preload_results = {}

for symbol in self.symbols:
preload_results[symbol] = {}

for timeframe in timeframes:
try:
logger.info(f"Preloading data for {symbol} {timeframe}")

# Check if we should preload
if self._should_preload_data(symbol, timeframe, 100):
df = self._preload_300s_data(symbol, timeframe)

if df is not None and not df.empty:
# Add technical indicators
df = self._add_technical_indicators(df)

# Cache the data
if self.cache_enabled:
self._save_to_cache(df, symbol, timeframe)

# Store in memory
if symbol not in self.historical_data:
self.historical_data[symbol] = {}
self.historical_data[symbol][timeframe] = df

preload_results[symbol][timeframe] = True
logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}")
else:
preload_results[symbol][timeframe] = False
logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}")
else:
preload_results[symbol][timeframe] = True # Already have data
logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)")

except Exception as e:
logger.error(f"Error preloading {symbol} {timeframe}: {e}")
preload_results[symbol][timeframe] = False

# Log summary
total_pairs = len(self.symbols) * len(timeframes)
successful_pairs = sum(1 for symbol_results in preload_results.values()
for success in symbol_results.values() if success)

logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded")

return preload_results

except Exception as e:
logger.error(f"Error in preload_all_symbols_data: {e}")
return {}


def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
"""Fetch data from MEXC API (fallback data source when Binance is unavailable)"""
@@ -1445,60 +1421,33 @@ class DataProvider:
return None

def _get_cached_ohlcv_bars(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']:
"""Get OHLCV data list from pre-built cache for instant access"""
try:
with self._ohlcv_cache_lock:
cache_key = f"{symbol}_{timeframe}"

# Check if we have fresh cached data (updated within last 5 seconds)
last_update = self._last_cache_update.get(cache_key)
if (last_update and
(datetime.now() - last_update).total_seconds() < self._cache_refresh_interval and
cache_key in self._ohlcv_cache):

cached_data = self._ohlcv_cache[cache_key]
return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data

# Need to rebuild cache for this symbol/timeframe
data_list = self._build_ohlcv_bar_cache(symbol, timeframe, max_count)

# Cache the result
self._ohlcv_cache[cache_key] = data_list
self._last_cache_update[cache_key] = datetime.now()

return data_list[-max_count:] if len(data_list) >= max_count else data_list

except Exception as e:
logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}")
return []

def _build_ohlcv_bar_cache(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']:
"""Build OHLCV bar cache from historical and current data"""
"""Get OHLCV data list from cached data"""
try:
from .data_models import OHLCVBar
data_list = []

# Get historical data first (this should be fast as it's already cached)
historical_df = self.get_historical_data(symbol, timeframe, limit=max_count)
if historical_df is not None and not historical_df.empty:
# Convert historical data to OHLCVBar objects
for idx, row in historical_df.tail(max_count).iterrows():
bar = OHLCVBar(
symbol=symbol,
timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
data_list.append(bar)
# Get cached data
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
cached_df = self.cached_data[symbol][timeframe]
if not cached_df.empty:
# Convert cached data to OHLCVBar objects
for idx, row in cached_df.tail(max_count).iterrows():
bar = OHLCVBar(
symbol=symbol,
timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
data_list.append(bar)

return data_list

except Exception as e:
logger.error(f"Error building OHLCV bar cache for {symbol}/{timeframe}: {e}")
logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}")
return []

def _get_latest_technical_indicators(self, symbol: str) -> Dict[str, float]:
@@ -1548,19 +1497,7 @@ class DataProvider:
logger.error(f"Error getting COB data object for {symbol}: {e}")
return None

def invalidate_ohlcv_cache(self, symbol: str):
"""Invalidate OHLCV cache for a symbol when new data arrives"""
try:
with self._ohlcv_cache_lock:
# Remove cached data for all timeframes of this symbol
keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")]
for key in keys_to_remove:
if key in self._ohlcv_cache:
del self._ohlcv_cache[key]
if key in self._last_cache_update:
del self._last_cache_update[key]
except Exception as e:
logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}")


def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add basic indicators for small datasets"""
@@ -1997,12 +1934,12 @@ class DataProvider:
logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")

def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
"""Get the latest candles combining historical and real-time data"""
"""Get the latest candles from cached data only"""
try:
# Get historical data
historical_df = self.get_historical_data(symbol, timeframe, limit=limit)
# Get cached data
cached_df = self.get_historical_data(symbol, timeframe, limit=limit)

# Get real-time data
# Get real-time data if available
with self.data_lock:
if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
real_time_candles = list(self.real_time_data[symbol][timeframe])
@@ -2011,42 +1948,38 @@ class DataProvider:
# Convert to DataFrame
rt_df = pd.DataFrame(real_time_candles)

if historical_df is not None:
# Combine historical and real-time
# Remove overlapping candles from historical data
if cached_df is not None and not cached_df.empty:
# Combine cached and real-time
# Remove overlapping candles from cached data
if not rt_df.empty:
cutoff_time = rt_df['timestamp'].min()
historical_df = historical_df[historical_df['timestamp'] < cutoff_time]
cached_df = cached_df[cached_df.index < cutoff_time]

# Concatenate
combined_df = pd.concat([historical_df, rt_df], ignore_index=True)
combined_df = pd.concat([cached_df, rt_df], ignore_index=True)
else:
combined_df = rt_df

return combined_df.tail(limit)

# Return just historical data if no real-time data
return historical_df.tail(limit) if historical_df is not None else pd.DataFrame()
# Return just cached data if no real-time data
return cached_df.tail(limit) if cached_df is not None else pd.DataFrame()

except Exception as e:
logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
return pd.DataFrame()

def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for a symbol from latest candle"""
"""Get current price for a symbol from cached data"""
try:
# Try to get from 1s candle first (most recent)
for tf in ['1s', '1m', '5m', '1h']:
df = self.get_latest_candles(symbol, tf, limit=1)
if df is not None and not df.empty:
return float(df.iloc[-1]['close'])
for tf in ['1s', '1m', '1h', '1d']:
if symbol in self.cached_data and tf in self.cached_data[symbol]:
df = self.cached_data[symbol][tf]
if not df.empty:
return float(df.iloc[-1]['close'])

# Fallback to any available data
key = f"{symbol}_{self.timeframes[0]}"
if key in self.historical_data and not self.historical_data[key].empty:
return float(self.historical_data[key].iloc[-1]['close'])

logger.warning(f"No price data available for {symbol}")
logger.warning(f"No cached price data available for {symbol}")
return None

except Exception as e:
@@ -2199,12 +2132,11 @@ class DataProvider:
return []

def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]:
"""Get price at specific index for backtesting"""
"""Get price at specific index for backtesting from cached data"""
try:
key = f"{symbol}_{timeframe}"
if key in self.historical_data:
df = self.historical_data[key]
if 0 <= index < len(df):
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
df = self.cached_data[symbol][timeframe]
if not df.empty and 0 <= index < len(df):
return float(df.iloc[index]['close'])
return None
except Exception as e:
@@ -2231,10 +2163,16 @@ class DataProvider:
timeframe_features = {}
for tf in timeframes:
logger.debug(f"Processing timeframe {tf} for {symbol}")
df = self.get_latest_candles(symbol, tf, limit=window_size + 100)

if df is None or len(df) < window_size:
logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows")
# Use cached data directly
if symbol in self.cached_data and tf in self.cached_data[symbol]:
df = self.cached_data[symbol][tf]
if not df.empty and len(df) >= window_size:
df = df.tail(window_size + 100) # Get enough data for indicators
else:
logger.warning(f"Insufficient cached data for {symbol} {tf}: {len(df) if not df.empty else 0} rows")
continue
else:
logger.warning(f"No cached data for {symbol} {tf}")
continue

# Get feature columns
@@ -2502,23 +2440,62 @@ class DataProvider:
"""Get health status of the data provider"""
status = {
'streaming': self.is_streaming,
'data_maintenance_active': self.data_maintenance_active,
'symbols': len(self.symbols),
'timeframes': len(self.timeframes),
'current_prices': len(self.current_prices),
'websocket_tasks': len(self.websocket_tasks),
'historical_data_loaded': {}
'cached_data_loaded': {}
}

# Check historical data availability
# Check cached data availability
for symbol in self.symbols:
status['historical_data_loaded'][symbol] = {}
status['cached_data_loaded'][symbol] = {}
for tf in self.timeframes:
has_data = (symbol in self.historical_data and
tf in self.historical_data[symbol] and
not self.historical_data[symbol][tf].empty)
status['historical_data_loaded'][symbol][tf] = has_data
has_data = (symbol in self.cached_data and
tf in self.cached_data[symbol] and
not self.cached_data[symbol][tf].empty)
candle_count = len(self.cached_data[symbol][tf]) if has_data else 0
status['cached_data_loaded'][symbol][tf] = {
'has_data': has_data,
'candle_count': candle_count
}

return status
return status

def get_cached_data_summary(self) -> Dict[str, Any]:
"""Get summary of cached data"""
summary = {
'symbols': self.symbols,
'timeframes': self.timeframes,
'data_maintenance_active': self.data_maintenance_active,
'cached_data': {}
}

for symbol in self.symbols:
summary['cached_data'][symbol] = {}
for timeframe in self.timeframes:
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
df = self.cached_data[symbol][timeframe]
if not df.empty:
summary['cached_data'][symbol][timeframe] = {
'candle_count': len(df),
'start_time': df.index[0].isoformat() if hasattr(df.index[0], 'isoformat') else str(df.index[0]),
'end_time': df.index[-1].isoformat() if hasattr(df.index[-1], 'isoformat') else str(df.index[-1]),
'latest_price': float(df.iloc[-1]['close'])
}
else:
summary['cached_data'][symbol][timeframe] = {
'candle_count': 0,
'status': 'empty'
}
else:
summary['cached_data'][symbol][timeframe] = {
'candle_count': 0,
'status': 'not_initialized'
}

return summary

def subscribe_to_ticks(self, callback: Callable[[MarketTick], None],
symbols: List[str] = None,
test_simplified_data_provider.py (new file, 60 lines)
@@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""
Test script for the simplified data provider
"""

import time
import logging
from core.data_provider import DataProvider

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def test_data_provider():
    """Test the simplified data provider"""
    logger.info("Testing simplified data provider...")

    # Initialize data provider
    dp = DataProvider()

    # Wait for initial data load
    logger.info("Waiting for initial data load...")
    time.sleep(10)

    # Check health
    health = dp.health_check()
    logger.info(f"Health check: {health}")

    # Get cached data summary
    summary = dp.get_cached_data_summary()
    logger.info(f"Cached data summary: {summary}")

    # Test getting historical data (should be from cache only)
    for symbol in ['ETH/USDT', 'BTC/USDT']:
        for timeframe in ['1s', '1m', '1h', '1d']:
            data = dp.get_historical_data(symbol, timeframe, limit=10)
            if data is not None and not data.empty:
                logger.info(f"{symbol} {timeframe}: {len(data)} candles, latest price: {data.iloc[-1]['close']}")
            else:
                logger.warning(f"{symbol} {timeframe}: No data available")

    # Test current prices
    for symbol in ['ETH/USDT', 'BTC/USDT']:
        price = dp.get_current_price(symbol)
        logger.info(f"Current price for {symbol}: {price}")

    # Wait and check if data is being updated
    logger.info("Waiting 30 seconds to check data updates...")
    time.sleep(30)

    # Check data again
    summary2 = dp.get_cached_data_summary()
    logger.info(f"Updated cached data summary: {summary2}")

    # Stop data maintenance
    dp.stop_automatic_data_maintenance()
    logger.info("Test completed")

if __name__ == "__main__":
    test_data_provider()
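Presumably the script is run from the repository root so that core.data_provider resolves (for example, python test_simplified_data_provider.py), and it needs live exchange access for the initial Binance/MEXC load to populate the cache.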