From a94b80c1f481e03ae2383fcb37b67dd64a9ace3b Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 27 Jul 2025 17:28:07 +0300 Subject: [PATCH] decouple external API and local data consumption --- core/data_provider.py | 537 +++++++++++++++---------------- test_simplified_data_provider.py | 60 ++++ 2 files changed, 317 insertions(+), 280 deletions(-) create mode 100644 test_simplified_data_provider.py diff --git a/core/data_provider.py b/core/data_provider.py index 24be006..51ac1fc 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -109,19 +109,26 @@ class DataProvider: def __init__(self, symbols: List[str] = None, timeframes: List[str] = None): """Initialize the data provider""" self.config = get_config() - self.symbols = symbols or self.config.symbols - self.timeframes = timeframes or self.config.timeframes + # Fixed symbols and timeframes for caching + self.symbols = ['ETH/USDT', 'BTC/USDT'] + self.timeframes = ['1s', '1m', '1h', '1d'] # Cache settings (initialize first) - self.cache_enabled = self.config.data.get('cache_enabled', True) - self.cache_dir = Path(self.config.data.get('cache_dir', 'cache')) + self.cache_enabled = True + self.cache_dir = Path('cache') self.cache_dir.mkdir(parents=True, exist_ok=True) - # Data storage - self.historical_data = {} # {symbol: {timeframe: DataFrame}} + # Data storage - cached OHLCV data (1500 candles each) + self.cached_data = {} # {symbol: {timeframe: DataFrame}} self.real_time_data = {} # {symbol: {timeframe: deque}} self.current_prices = {} # {symbol: float} + # Initialize cached data structure + for symbol in self.symbols: + self.cached_data[symbol] = {} + for timeframe in self.timeframes: + self.cached_data[symbol][timeframe] = pd.DataFrame() + # Pivot-based normalization system self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds} self.pivot_cache_dir = self.cache_dir / 'pivot_bounds' @@ -224,12 +231,6 @@ class DataProvider: self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer - # Pre-built OHLCV cache for instant BaseDataInput building (optimization from SimplifiedDataIntegration) - self._ohlcv_cache = {} # {symbol: {timeframe: List[OHLCVBar]}} - self._ohlcv_cache_lock = Lock() - self._last_cache_update = {} # {symbol: {timeframe: datetime}} - self._cache_refresh_interval = 5 # seconds - # Data collection threads self.data_collection_active = False @@ -246,8 +247,21 @@ class DataProvider: self.bucket_sizes = [1, 10] # $1 and $10 buckets self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes} + # Automatic data maintenance + self.data_maintenance_active = False + self.data_maintenance_thread = None + + # Timeframe intervals in seconds for automatic updates + self.timeframe_intervals = { + '1s': 1, + '1m': 60, + '1h': 3600, + '1d': 86400 + } + logger.info(f"DataProvider initialized for symbols: {self.symbols}") logger.info(f"Timeframes: {self.timeframes}") + logger.info("Automatic data maintenance enabled") logger.info("Centralized data distribution enabled") logger.info("Pivot-based normalization system enabled") logger.info("Williams Market Structure integration enabled") @@ -255,13 +269,134 @@ class DataProvider: # Rate limiting self.last_request_time = {} - self.request_interval = 0.2 # 200ms between requests + self.request_interval = 0.5 # 500ms between requests to avoid rate limits self.retry_delay = 60 # 1 minute retry delay for 451 errors 
self.max_retries = 3 + # Start automatic data maintenance + self.start_automatic_data_maintenance() + # Start COB integration self.start_cob_integration() + def start_automatic_data_maintenance(self): + """Start automatic data maintenance system""" + if self.data_maintenance_active: + logger.warning("Data maintenance already active") + return + + self.data_maintenance_active = True + self.data_maintenance_thread = Thread(target=self._data_maintenance_worker, daemon=True) + self.data_maintenance_thread.start() + logger.info("Automatic data maintenance started") + + def stop_automatic_data_maintenance(self): + """Stop automatic data maintenance system""" + self.data_maintenance_active = False + if self.data_maintenance_thread and self.data_maintenance_thread.is_alive(): + self.data_maintenance_thread.join(timeout=5) + logger.info("Automatic data maintenance stopped") + + def _data_maintenance_worker(self): + """Worker thread for automatic data maintenance""" + logger.info("Data maintenance worker started") + + # Initial data load + self._initial_data_load() + + # Track last update times for each symbol/timeframe + last_updates = {} + for symbol in self.symbols: + last_updates[symbol] = {} + for timeframe in self.timeframes: + last_updates[symbol][timeframe] = 0 + + while self.data_maintenance_active: + try: + current_time = time.time() + + # Check each symbol/timeframe for updates + for symbol in self.symbols: + for timeframe in self.timeframes: + interval = self.timeframe_intervals[timeframe] + half_interval = interval / 2 + + # Update every half candle period + if current_time - last_updates[symbol][timeframe] >= half_interval: + self._update_cached_data(symbol, timeframe) + last_updates[symbol][timeframe] = current_time + + # Sleep for 1 second before next check + time.sleep(1) + + except Exception as e: + logger.error(f"Error in data maintenance worker: {e}") + time.sleep(10) # Wait longer on error + + def _initial_data_load(self): + """Load initial 1500 candles for each symbol/timeframe""" + logger.info("Starting initial data load (1500 candles each)") + + for symbol in self.symbols: + for timeframe in self.timeframes: + try: + logger.info(f"Loading initial data for {symbol} {timeframe}") + df = self._fetch_from_binance(symbol, timeframe, 1500) + + if df is None or df.empty: + logger.warning(f"Binance failed for {symbol} {timeframe}, trying MEXC") + df = self._fetch_from_mexc(symbol, timeframe, 1500) + + if df is not None and not df.empty: + # Ensure proper datetime index + df = self._ensure_datetime_index(df) + + # Store in cached data + self.cached_data[symbol][timeframe] = df + logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}") + else: + logger.error(f"Failed to load initial data for {symbol} {timeframe}") + + # Rate limiting between requests + time.sleep(0.5) + + except Exception as e: + logger.error(f"Error loading initial data for {symbol} {timeframe}: {e}") + + logger.info("Initial data load completed") + + def _update_cached_data(self, symbol: str, timeframe: str): + """Update cached data by fetching last 2 candles""" + try: + # Fetch last 2 candles + df = self._fetch_from_binance(symbol, timeframe, 2) + + if df is None or df.empty: + df = self._fetch_from_mexc(symbol, timeframe, 2) + + if df is not None and not df.empty: + # Ensure proper datetime index + df = self._ensure_datetime_index(df) + + # Get existing cached data + existing_df = self.cached_data[symbol][timeframe] + + if not existing_df.empty: + # Merge new data with existing, avoiding duplicates + 
combined_df = pd.concat([existing_df, df], ignore_index=False) + combined_df = combined_df[~combined_df.index.duplicated(keep='last')] + combined_df = combined_df.sort_index() + + # Keep only last 1500 candles + self.cached_data[symbol][timeframe] = combined_df.tail(1500) + else: + self.cached_data[symbol][timeframe] = df + + logger.debug(f"Updated cached data for {symbol} {timeframe}: {len(self.cached_data[symbol][timeframe])} candles") + + except Exception as e: + logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}") + def start_cob_integration(self): """Starts the COB integration in a background thread.""" cob_thread = Thread(target=self._run_cob_integration, daemon=True) @@ -306,182 +441,23 @@ class DataProvider: return df def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]: - """Get historical OHLCV data for a symbol and timeframe""" + """Get historical OHLCV data from cache only - no external API calls""" try: - # If refresh=True, always fetch fresh data (skip cache for real-time updates) - if not refresh: - if self.cache_enabled: - cached_data = self._load_from_cache(symbol, timeframe) - if cached_data is not None and len(cached_data) >= limit * 0.8: - # Ensure proper datetime index for cached data - cached_data = self._ensure_datetime_index(cached_data) - # logger.info(f"Using cached data for {symbol} {timeframe}") - return cached_data.tail(limit) - - # Check if we need to preload 300s of data for first load - should_preload = self._should_preload_data(symbol, timeframe, limit) + # Only return cached data - never trigger external API calls + if symbol in self.cached_data and timeframe in self.cached_data[symbol]: + cached_df = self.cached_data[symbol][timeframe] + if not cached_df.empty: + # Return requested amount from cached data + return cached_df.tail(limit) - if should_preload: - logger.info(f"Preloading 300s of data for {symbol} {timeframe}") - df = self._preload_300s_data(symbol, timeframe) - else: - # Fetch from API with requested limit (Binance primary, MEXC fallback) - logger.info(f"Fetching historical data for {symbol} {timeframe}") - df = self._fetch_from_binance(symbol, timeframe, limit) - - # Fallback to MEXC if Binance fails - if df is None or df.empty: - logger.info(f"Binance failed, trying MEXC fallback for {symbol}") - df = self._fetch_from_mexc(symbol, timeframe, limit) - - if df is not None and not df.empty: - # Ensure proper datetime index - df = self._ensure_datetime_index(df) - - # Add technical indicators. temporarily disabled to save time as it is not working as expected. 
- # df = self._add_technical_indicators(df) - - # Cache the data - if self.cache_enabled: - self._save_to_cache(df, symbol, timeframe) - - # Store in memory - if symbol not in self.historical_data: - self.historical_data[symbol] = {} - self.historical_data[symbol][timeframe] = df - - # Return requested amount - return df.tail(limit) - - logger.warning(f"No data received for {symbol} {timeframe}") + logger.warning(f"No cached data available for {symbol} {timeframe}") return None except Exception as e: - logger.error(f"Error fetching historical data for {symbol} {timeframe}: {e}") + logger.error(f"Error getting cached data for {symbol} {timeframe}: {e}") return None - def _should_preload_data(self, symbol: str, timeframe: str, limit: int) -> bool: - """Determine if we should preload 300s of data""" - try: - # Check if we have any cached data - if self.cache_enabled: - cached_data = self._load_from_cache(symbol, timeframe) - if cached_data is not None and len(cached_data) > 0: - return False # Already have some data - - # Check if we have data in memory - if (symbol in self.historical_data and - timeframe in self.historical_data[symbol] and - len(self.historical_data[symbol][timeframe]) > 0): - return False # Already have data in memory - - # Calculate if 300s worth of data would be more than requested limit - timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) - candles_in_300s = 300 // timeframe_seconds - - # Preload if we need more than the requested limit or if it's a short timeframe - if candles_in_300s > limit or timeframe in ['1s', '1m']: - return True - - return False - - except Exception as e: - logger.error(f"Error determining if should preload data: {e}") - return False - - def _preload_300s_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]: - """Preload 300 seconds worth of data for better initial performance""" - try: - # Calculate how many candles we need for 300 seconds - timeframe_seconds = self.timeframe_seconds.get(timeframe, 60) - candles_needed = max(300 // timeframe_seconds, 100) # At least 100 candles - - # For very short timeframes, limit to reasonable amount - if timeframe == '1s': - candles_needed = min(candles_needed, 300) # Max 300 1s candles - elif timeframe == '1m': - candles_needed = min(candles_needed, 60) # Max 60 1m candles (1 hour) - else: - candles_needed = min(candles_needed, 500) # Max 500 candles for other timeframes - - logger.info(f"Preloading {candles_needed} candles for {symbol} {timeframe} (300s worth)") - - # Fetch the data (Binance primary, MEXC fallback) - df = self._fetch_from_binance(symbol, timeframe, candles_needed) - - # Fallback to MEXC if Binance fails - if df is None or df.empty: - logger.info(f"Binance failed, trying MEXC fallback for preload {symbol}") - df = self._fetch_from_mexc(symbol, timeframe, candles_needed) - - if df is not None and not df.empty: - logger.info(f"Successfully preloaded {len(df)} candles for {symbol} {timeframe}") - return df - else: - logger.warning(f"Failed to preload data for {symbol} {timeframe}") - return None - - except Exception as e: - logger.error(f"Error preloading 300s data for {symbol} {timeframe}: {e}") - return None - - def preload_all_symbols_data(self, timeframes: List[str] = None) -> Dict[str, Dict[str, bool]]: - """Preload 300s of data for all symbols and timeframes""" - try: - if timeframes is None: - timeframes = self.timeframes - - preload_results = {} - - for symbol in self.symbols: - preload_results[symbol] = {} - - for timeframe in timeframes: - try: - 
logger.info(f"Preloading data for {symbol} {timeframe}") - - # Check if we should preload - if self._should_preload_data(symbol, timeframe, 100): - df = self._preload_300s_data(symbol, timeframe) - - if df is not None and not df.empty: - # Add technical indicators - df = self._add_technical_indicators(df) - - # Cache the data - if self.cache_enabled: - self._save_to_cache(df, symbol, timeframe) - - # Store in memory - if symbol not in self.historical_data: - self.historical_data[symbol] = {} - self.historical_data[symbol][timeframe] = df - - preload_results[symbol][timeframe] = True - logger.info(f"OK: Preloaded {len(df)} candles for {symbol} {timeframe}") - else: - preload_results[symbol][timeframe] = False - logger.warning(f"FAIL: Failed to preload {symbol} {timeframe}") - else: - preload_results[symbol][timeframe] = True # Already have data - logger.info(f"SKIP: Skipped preloading {symbol} {timeframe} (already have data)") - - except Exception as e: - logger.error(f"Error preloading {symbol} {timeframe}: {e}") - preload_results[symbol][timeframe] = False - - # Log summary - total_pairs = len(self.symbols) * len(timeframes) - successful_pairs = sum(1 for symbol_results in preload_results.values() - for success in symbol_results.values() if success) - - logger.info(f"Preloading completed: {successful_pairs}/{total_pairs} symbol-timeframe pairs loaded") - - return preload_results - - except Exception as e: - logger.error(f"Error in preload_all_symbols_data: {e}") - return {} + def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]: """Fetch data from MEXC API (fallback data source when Binance is unavailable)""" @@ -1445,60 +1421,33 @@ class DataProvider: return None def _get_cached_ohlcv_bars(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']: - """Get OHLCV data list from pre-built cache for instant access""" - try: - with self._ohlcv_cache_lock: - cache_key = f"{symbol}_{timeframe}" - - # Check if we have fresh cached data (updated within last 5 seconds) - last_update = self._last_cache_update.get(cache_key) - if (last_update and - (datetime.now() - last_update).total_seconds() < self._cache_refresh_interval and - cache_key in self._ohlcv_cache): - - cached_data = self._ohlcv_cache[cache_key] - return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data - - # Need to rebuild cache for this symbol/timeframe - data_list = self._build_ohlcv_bar_cache(symbol, timeframe, max_count) - - # Cache the result - self._ohlcv_cache[cache_key] = data_list - self._last_cache_update[cache_key] = datetime.now() - - return data_list[-max_count:] if len(data_list) >= max_count else data_list - - except Exception as e: - logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}") - return [] - - def _build_ohlcv_bar_cache(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']: - """Build OHLCV bar cache from historical and current data""" + """Get OHLCV data list from cached data""" try: from .data_models import OHLCVBar data_list = [] - # Get historical data first (this should be fast as it's already cached) - historical_df = self.get_historical_data(symbol, timeframe, limit=max_count) - if historical_df is not None and not historical_df.empty: - # Convert historical data to OHLCVBar objects - for idx, row in historical_df.tail(max_count).iterrows(): - bar = OHLCVBar( - symbol=symbol, - timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(), - open=float(row['open']), - 
high=float(row['high']), - low=float(row['low']), - close=float(row['close']), - volume=float(row['volume']), - timeframe=timeframe - ) - data_list.append(bar) + # Get cached data + if symbol in self.cached_data and timeframe in self.cached_data[symbol]: + cached_df = self.cached_data[symbol][timeframe] + if not cached_df.empty: + # Convert cached data to OHLCVBar objects + for idx, row in cached_df.tail(max_count).iterrows(): + bar = OHLCVBar( + symbol=symbol, + timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(), + open=float(row['open']), + high=float(row['high']), + low=float(row['low']), + close=float(row['close']), + volume=float(row['volume']), + timeframe=timeframe + ) + data_list.append(bar) return data_list except Exception as e: - logger.error(f"Error building OHLCV bar cache for {symbol}/{timeframe}: {e}") + logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}") return [] def _get_latest_technical_indicators(self, symbol: str) -> Dict[str, float]: @@ -1548,19 +1497,7 @@ class DataProvider: logger.error(f"Error getting COB data object for {symbol}: {e}") return None - def invalidate_ohlcv_cache(self, symbol: str): - """Invalidate OHLCV cache for a symbol when new data arrives""" - try: - with self._ohlcv_cache_lock: - # Remove cached data for all timeframes of this symbol - keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")] - for key in keys_to_remove: - if key in self._ohlcv_cache: - del self._ohlcv_cache[key] - if key in self._last_cache_update: - del self._last_cache_update[key] - except Exception as e: - logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}") + def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """Add basic indicators for small datasets""" @@ -1997,12 +1934,12 @@ class DataProvider: logger.error(f"Error updating candle for {symbol} {timeframe}: {e}") def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame: - """Get the latest candles combining historical and real-time data""" + """Get the latest candles from cached data only""" try: - # Get historical data - historical_df = self.get_historical_data(symbol, timeframe, limit=limit) + # Get cached data + cached_df = self.get_historical_data(symbol, timeframe, limit=limit) - # Get real-time data + # Get real-time data if available with self.data_lock: if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]: real_time_candles = list(self.real_time_data[symbol][timeframe]) @@ -2011,42 +1948,38 @@ class DataProvider: # Convert to DataFrame rt_df = pd.DataFrame(real_time_candles) - if historical_df is not None: - # Combine historical and real-time - # Remove overlapping candles from historical data + if cached_df is not None and not cached_df.empty: + # Combine cached and real-time + # Remove overlapping candles from cached data if not rt_df.empty: cutoff_time = rt_df['timestamp'].min() - historical_df = historical_df[historical_df['timestamp'] < cutoff_time] + cached_df = cached_df[cached_df.index < cutoff_time] # Concatenate - combined_df = pd.concat([historical_df, rt_df], ignore_index=True) + combined_df = pd.concat([cached_df, rt_df], ignore_index=True) else: combined_df = rt_df return combined_df.tail(limit) - # Return just historical data if no real-time data - return historical_df.tail(limit) if historical_df is not None else pd.DataFrame() + # Return just cached data if no real-time data + return cached_df.tail(limit) if cached_df is not None else 
pd.DataFrame() except Exception as e: logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}") return pd.DataFrame() def get_current_price(self, symbol: str) -> Optional[float]: - """Get current price for a symbol from latest candle""" + """Get current price for a symbol from cached data""" try: # Try to get from 1s candle first (most recent) - for tf in ['1s', '1m', '5m', '1h']: - df = self.get_latest_candles(symbol, tf, limit=1) - if df is not None and not df.empty: - return float(df.iloc[-1]['close']) + for tf in ['1s', '1m', '1h', '1d']: + if symbol in self.cached_data and tf in self.cached_data[symbol]: + df = self.cached_data[symbol][tf] + if not df.empty: + return float(df.iloc[-1]['close']) - # Fallback to any available data - key = f"{symbol}_{self.timeframes[0]}" - if key in self.historical_data and not self.historical_data[key].empty: - return float(self.historical_data[key].iloc[-1]['close']) - - logger.warning(f"No price data available for {symbol}") + logger.warning(f"No cached price data available for {symbol}") return None except Exception as e: @@ -2199,12 +2132,11 @@ class DataProvider: return [] def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]: - """Get price at specific index for backtesting""" + """Get price at specific index for backtesting from cached data""" try: - key = f"{symbol}_{timeframe}" - if key in self.historical_data: - df = self.historical_data[key] - if 0 <= index < len(df): + if symbol in self.cached_data and timeframe in self.cached_data[symbol]: + df = self.cached_data[symbol][timeframe] + if not df.empty and 0 <= index < len(df): return float(df.iloc[index]['close']) return None except Exception as e: @@ -2231,10 +2163,16 @@ class DataProvider: timeframe_features = {} for tf in timeframes: logger.debug(f"Processing timeframe {tf} for {symbol}") - df = self.get_latest_candles(symbol, tf, limit=window_size + 100) - - if df is None or len(df) < window_size: - logger.warning(f"Insufficient data for {symbol} {tf}: {len(df) if df is not None else 0} rows") + # Use cached data directly + if symbol in self.cached_data and tf in self.cached_data[symbol]: + df = self.cached_data[symbol][tf] + if not df.empty and len(df) >= window_size: + df = df.tail(window_size + 100) # Get enough data for indicators + else: + logger.warning(f"Insufficient cached data for {symbol} {tf}: {len(df) if not df.empty else 0} rows") + continue + else: + logger.warning(f"No cached data for {symbol} {tf}") continue # Get feature columns @@ -2502,23 +2440,62 @@ class DataProvider: """Get health status of the data provider""" status = { 'streaming': self.is_streaming, + 'data_maintenance_active': self.data_maintenance_active, 'symbols': len(self.symbols), 'timeframes': len(self.timeframes), 'current_prices': len(self.current_prices), 'websocket_tasks': len(self.websocket_tasks), - 'historical_data_loaded': {} + 'cached_data_loaded': {} } - # Check historical data availability + # Check cached data availability for symbol in self.symbols: - status['historical_data_loaded'][symbol] = {} + status['cached_data_loaded'][symbol] = {} for tf in self.timeframes: - has_data = (symbol in self.historical_data and - tf in self.historical_data[symbol] and - not self.historical_data[symbol][tf].empty) - status['historical_data_loaded'][symbol][tf] = has_data + has_data = (symbol in self.cached_data and + tf in self.cached_data[symbol] and + not self.cached_data[symbol][tf].empty) + candle_count = len(self.cached_data[symbol][tf]) if 
has_data else 0 + status['cached_data_loaded'][symbol][tf] = { + 'has_data': has_data, + 'candle_count': candle_count + } - return status + return status + + def get_cached_data_summary(self) -> Dict[str, Any]: + """Get summary of cached data""" + summary = { + 'symbols': self.symbols, + 'timeframes': self.timeframes, + 'data_maintenance_active': self.data_maintenance_active, + 'cached_data': {} + } + + for symbol in self.symbols: + summary['cached_data'][symbol] = {} + for timeframe in self.timeframes: + if symbol in self.cached_data and timeframe in self.cached_data[symbol]: + df = self.cached_data[symbol][timeframe] + if not df.empty: + summary['cached_data'][symbol][timeframe] = { + 'candle_count': len(df), + 'start_time': df.index[0].isoformat() if hasattr(df.index[0], 'isoformat') else str(df.index[0]), + 'end_time': df.index[-1].isoformat() if hasattr(df.index[-1], 'isoformat') else str(df.index[-1]), + 'latest_price': float(df.iloc[-1]['close']) + } + else: + summary['cached_data'][symbol][timeframe] = { + 'candle_count': 0, + 'status': 'empty' + } + else: + summary['cached_data'][symbol][timeframe] = { + 'candle_count': 0, + 'status': 'not_initialized' + } + + return summary def subscribe_to_ticks(self, callback: Callable[[MarketTick], None], symbols: List[str] = None, diff --git a/test_simplified_data_provider.py b/test_simplified_data_provider.py new file mode 100644 index 0000000..d7c7488 --- /dev/null +++ b/test_simplified_data_provider.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +Test script for the simplified data provider +""" + +import time +import logging +from core.data_provider import DataProvider + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def test_data_provider(): + """Test the simplified data provider""" + logger.info("Testing simplified data provider...") + + # Initialize data provider + dp = DataProvider() + + # Wait for initial data load + logger.info("Waiting for initial data load...") + time.sleep(10) + + # Check health + health = dp.health_check() + logger.info(f"Health check: {health}") + + # Get cached data summary + summary = dp.get_cached_data_summary() + logger.info(f"Cached data summary: {summary}") + + # Test getting historical data (should be from cache only) + for symbol in ['ETH/USDT', 'BTC/USDT']: + for timeframe in ['1s', '1m', '1h', '1d']: + data = dp.get_historical_data(symbol, timeframe, limit=10) + if data is not None and not data.empty: + logger.info(f"{symbol} {timeframe}: {len(data)} candles, latest price: {data.iloc[-1]['close']}") + else: + logger.warning(f"{symbol} {timeframe}: No data available") + + # Test current prices + for symbol in ['ETH/USDT', 'BTC/USDT']: + price = dp.get_current_price(symbol) + logger.info(f"Current price for {symbol}: {price}") + + # Wait and check if data is being updated + logger.info("Waiting 30 seconds to check data updates...") + time.sleep(30) + + # Check data again + summary2 = dp.get_cached_data_summary() + logger.info(f"Updated cached data summary: {summary2}") + + # Stop data maintenance + dp.stop_automatic_data_maintenance() + logger.info("Test completed") + +if __name__ == "__main__": + test_data_provider() \ No newline at end of file
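
Usage sketch: a minimal consumer example, assuming the patch above is applied and core.data_provider is importable. It illustrates the decoupling this commit introduces - callers read OHLCV candles and prices from the in-memory cache only, while the background maintenance thread is the sole component that talks to Binance/MEXC. The method names below are taken from the diff; the 10-second warm-up mirrors the bundled test script and is an assumption about how long the initial 1500-candle load needs.

import time
from core.data_provider import DataProvider

dp = DataProvider()    # constructor starts automatic data maintenance and the initial load
time.sleep(10)         # assumed warm-up while the maintenance thread fills the cache

# Cache-only reads: these never trigger external API calls.
candles = dp.get_historical_data('ETH/USDT', '1m', limit=300)
if candles is not None and not candles.empty:
    print(f"ETH/USDT 1m: {len(candles)} candles, last close {candles.iloc[-1]['close']}")

price = dp.get_current_price('BTC/USDT')   # latest close from the 1s/1m/1h/1d cache
print(f"BTC/USDT current price: {price}")

print(dp.get_cached_data_summary())        # per-symbol/timeframe candle counts and time ranges

dp.stop_automatic_data_maintenance()       # stop the background refresh when done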