diff --git a/realtime.py b/realtime.py
index 60ee7eb..5b20662 100644
--- a/realtime.py
+++ b/realtime.py
@@ -636,12 +636,15 @@ class CandleCache:
     def add_candles(self, interval: str, new_candles: pd.DataFrame):
         if interval in self.candles and not new_candles.empty:
+            # Convert DataFrame rows to dicts to avoid pandas issues
             for _, row in new_candles.iterrows():
-                self.candles[interval].append(row)
+                candle_dict = row.to_dict()
+                self.candles[interval].append(candle_dict)
             logger.debug(f"Added {len(new_candles)} candles to {interval} cache")
 
     def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
         if interval in self.candles and self.candles[interval]:
+            # Convert deque to a list of dicts first
             recent_candles = list(self.candles[interval])[-count:]
             return pd.DataFrame(recent_candles)
         return pd.DataFrame()
@@ -660,27 +663,30 @@ class CandleCache:
             logger.warning(f"No timestamp column in new candles for {interval}")
             return
 
-        last_cached_time = None
-        if self.candles[interval]:
-            try:
-                # Get the timestamp from the last cached candle
-                last_cached_candle = self.candles[interval][-1]
-                if isinstance(last_cached_candle, dict) and 'timestamp' in last_cached_candle:
-                    last_cached_time = last_cached_candle['timestamp']
-                    logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
-            except (IndexError, KeyError) as e:
-                logger.error(f"Error accessing timestamp from last cached candle: {e}")
-
         try:
-            # Only filter if we have a valid last_cached_time
-            if last_cached_time is not None:
-                filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time]
-                logger.debug(f"Filtered {len(filtered_candles)} new candles for {interval}")
-                self.add_candles(interval, filtered_candles)
-            else:
-                # If no previous candles, add all
-                logger.debug(f"No previous candles, adding all {len(new_candles)} candles to {interval} cache")
+            # If we have no candles in cache, add all new candles
+            if not self.candles[interval]:
+                logger.debug(f"No existing candles for {interval}, adding all {len(new_candles)} candles")
                 self.add_candles(interval, new_candles)
+                return
+
+            # Get the timestamp from the last cached candle
+            last_cached_candle = self.candles[interval][-1]
+            if isinstance(last_cached_candle, dict) and 'timestamp' in last_cached_candle:
+                last_cached_time = last_cached_candle['timestamp']
+                logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
+
+                # Filter new candles that are after the last cached candle
+                filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time]
+
+                if not filtered_candles.empty:
+                    logger.debug(f"Adding {len(filtered_candles)} new candles for {interval}")
+                    self.add_candles(interval, filtered_candles)
+                else:
+                    logger.debug(f"No new candles to add for {interval}")
+            else:
+                logger.warning(f"Invalid last cached candle format for {interval}")
+
         except Exception as e:
             logger.error(f"Error updating cache for {interval}: {str(e)}")
             import traceback
@@ -703,6 +709,7 @@ class RealTimeChart:
         }
         self.candle_cache = CandleCache()  # Initialize local candle cache
         self.historical_data = BinanceHistoricalData()  # For fetching historical data
+        self.last_cache_save_time = time.time()  # Track last time we saved cache to disk
 
         logger.info(f"Initializing RealTimeChart for {symbol}")
 
         # Load historical data for longer timeframes at startup
@@ -1035,13 +1042,17 @@ class RealTimeChart:
             price_stats = self.tick_storage.get_price_stats()
             time_stats = self.tick_storage.get_time_based_stats()
 
+            # Periodically save candles to disk
+            if n % 60 == 0:  # Every 60 chart updates (~ every 30 seconds at 500ms interval)
+                self._save_candles_to_disk()
+
             logger.debug(f"Current price: {current_price}, Stats: {price_stats}")
 
             fig = make_subplots(
                 rows=6, cols=1,  # Adjusted to accommodate new subcharts
-                vertical_spacing=0.02,  # Reduced for better use of vertical space
+                vertical_spacing=0.05,  # Slightly larger gap so subplot titles stay readable
                 subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1s OHLCV', '1m OHLCV', '1h OHLCV', '1d OHLCV'),
-                row_heights=[0.5, 0.1, 0.1, 0.1, 0.1, 0.1]  # Give more space to main chart
+                row_heights=[0.3, 0.15, 0.15, 0.15, 0.15, 0.15]  # Rebalanced to give the OHLCV subcharts more room
             )
 
             if not df.empty and len(df) > 0:
@@ -1856,44 +1867,134 @@ class RealTimeChart:
             raise
 
     def _load_historical_data(self):
-        """Load historical data for 1m, 1h, and 1d timeframes from Binance API"""
+        """Load historical data for all timeframes from Binance API and local cache"""
         try:
             logger.info(f"Loading historical data for {self.symbol}...")
 
             # Define intervals to fetch
             intervals = {
+                '1s': 1,
                 '1m': 60,
                 '1h': 3600,
                 '1d': 86400
             }
 
+            # First try to load from local cache files
             for interval_key, interval_seconds in intervals.items():
-                # Fetch historical data
-                historical_df = self.historical_data.get_historical_candles(
-                    symbol=self.symbol,
-                    interval_seconds=interval_seconds,
-                    limit=500  # Get 500 candles
-                )
+                try:
+                    cache_file = os.path.join(self.historical_data.cache_dir,
+                                              f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
+
+                    if os.path.exists(cache_file):
+                        # Check cache freshness (1 day for most intervals, 3 days for 1d)
+                        file_age = time.time() - os.path.getmtime(cache_file)
+                        max_age = 259200 if interval_key == '1d' else 86400  # 3 days for 1d, 1 day for others
+
+                        if file_age <= max_age:
+                            cached_df = pd.read_csv(cache_file)
+                            if not cached_df.empty:
+                                # Convert timestamp strings back to datetime
+                                if 'timestamp' in cached_df.columns:
+                                    try:
+                                        cached_df['timestamp'] = pd.to_datetime(cached_df['timestamp'])
+                                    except Exception:
+                                        # If conversion fails, it might already be in the right format
+                                        pass
+
+                                # Only keep the last 500 candles
+                                if len(cached_df) > 500:
+                                    cached_df = cached_df.tail(500)
+
+                                # Add to cache
+                                for _, row in cached_df.iterrows():
+                                    candle_dict = row.to_dict()
+                                    self.candle_cache.candles[interval_key].append(candle_dict)
+
+                                # Update ohlcv_cache
+                                self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
+                                logger.info(f"Loaded {len(cached_df)} cached {interval_key} candles from disk")
+
+                                # Skip the API fetch when the cache was fresh, except for
+                                # 1h and 1d, which we always refresh
+                                if interval_key != '1d' and interval_key != '1h':
+                                    continue
+                except Exception as e:
+                    logger.error(f"Error loading cached {interval_key} candles: {str(e)}")
+
+            # For timeframes other than 1s, fetch from API as backup or for fresh data
+            for interval_key, interval_seconds in intervals.items():
+                # Skip 1s for API requests
+                if interval_key == '1s':
+                    continue
+
+                # Fetch historical data from API
+                try:
+                    logger.info(f"Fetching {interval_key} candles from API for {self.symbol}")
+                    historical_df = self.historical_data.get_historical_candles(
+                        symbol=self.symbol,
+                        interval_seconds=interval_seconds,
+                        limit=500  # Get 500 candles
+                    )
+
+                    if not historical_df.empty:
+                        logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key} from API")
+
+                        # If we already have data in cache, merge with new data to avoid duplicates
+                        if self.ohlcv_cache[interval_key] is not None and \
+                           not self.ohlcv_cache[interval_key].empty:
+                            existing_df = self.ohlcv_cache[interval_key]
+                            # Get the latest timestamp from existing data
+                            latest_time = existing_df['timestamp'].max()
+                            # Only keep newer records from the API
+                            new_candles = historical_df[historical_df['timestamp'] > latest_time]
+                            if not new_candles.empty:
+                                logger.info(f"Adding {len(new_candles)} new candles to existing {interval_key} cache")
+                                # Add to cache
+                                for _, row in new_candles.iterrows():
+                                    candle_dict = row.to_dict()
+                                    self.candle_cache.candles[interval_key].append(candle_dict)
+                        else:
+                            # No existing data, add all from API
+                            for _, row in historical_df.iterrows():
+                                candle_dict = row.to_dict()
+                                self.candle_cache.candles[interval_key].append(candle_dict)
+
+                        # Update ohlcv_cache with combined data
+                        self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
+                        logger.info(f"Total {interval_key} candles in cache: {len(self.ohlcv_cache[interval_key])}")
+                    else:
+                        logger.warning(f"No historical data available from API for {self.symbol} {interval_key}")
+                except Exception as e:
+                    logger.error(f"Error fetching {interval_key} data from API: {str(e)}")
 
-                if not historical_df.empty:
-                    logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key}")
-
-                    # Add to cache
-                    for _, row in historical_df.iterrows():
-                        # Convert to dict for storage
-                        candle_dict = row.to_dict()
-                        self.candle_cache.candles[interval_key].append(candle_dict)
-
-                    # Update ohlcv_cache
-                    self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
-                    logger.info(f"Added {len(historical_df)} candles to {interval_key} cache")
-                else:
-                    logger.warning(f"No historical data available for {self.symbol} {interval_key}")
-
         except Exception as e:
-            logger.error(f"Error loading historical data: {str(e)}")
+            logger.error(f"Error in _load_historical_data: {str(e)}")
             import traceback
             logger.error(traceback.format_exc())
 
+    def _save_candles_to_disk(self):
+        """Save current candle cache to disk for persistence between runs"""
+        try:
+            # Only save if sufficient time has passed since the last save (5 minutes)
+            current_time = time.time()
+            if current_time - self.last_cache_save_time < 300:  # 5 minutes
+                return
+
+            # Save each timeframe's candles to disk
+            for interval_key, candles in self.candle_cache.candles.items():
+                if candles:
+                    # Convert to DataFrame
+                    df = pd.DataFrame(list(candles))
+                    if not df.empty:
+                        # Save to disk in the cache directory
+                        cache_file = os.path.join(self.historical_data.cache_dir,
+                                                  f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
+                        df.to_csv(cache_file, index=False)
+                        logger.info(f"Saved {len(df)} {interval_key} candles to {cache_file}")
+
+            self.last_cache_save_time = current_time
+            logger.info(f"Saved all candle caches to disk at {datetime.now()}")
+        except Exception as e:
+            logger.error(f"Error saving candles to disk: {str(e)}")
+
 async def main():
     symbols = ["ETH/USDT", "BTC/USDT"]
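
Two notes on the behavior this patch relies on. First, deduplication: both `CandleCache.update_cache` and the API merge path keep only rows strictly newer than the newest cached timestamp, so overlapping fetches never double-append a candle. A minimal sketch of that filter under the patch's data layout (the `timestamp` column name matches the diff; the sample values are hypothetical):

```python
from collections import deque

import pandas as pd

# Cache state as add_candles() builds it: a deque of plain dicts
cached = deque([{'timestamp': pd.Timestamp('2024-01-01 00:00:01'), 'close': 100.0}])

# A fresh fetch that overlaps the cache by one candle
new_candles = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-01-01 00:00:01', '2024-01-01 00:00:02']),
    'close': [100.0, 101.0],
})

# Strictly-greater comparison drops the overlapping row
last_cached_time = cached[-1]['timestamp']
filtered = new_candles[new_candles['timestamp'] > last_cached_time]
assert len(filtered) == 1  # only the 00:00:02 candle survives
```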
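Second, the CSV round-trip: `to_csv` serializes `Timestamp` values as strings, so a cache reloaded with `read_csv` holds object-dtype timestamps, and the strictly-greater filter above would then compare strings against `Timestamp`s and fail. That is why the load path calls `pd.to_datetime` before merging. A self-contained sketch of the round-trip (the file name is hypothetical):

```python
import pandas as pd

df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-01-01 00:00:01', '2024-01-01 00:00:02']),
    'close': [100.0, 101.0],
})
df.to_csv('ETH_USDT_1m_candles.csv', index=False)  # hypothetical cache file name

# Reload: timestamps come back as strings (object dtype) ...
restored = pd.read_csv('ETH_USDT_1m_candles.csv')
assert restored['timestamp'].dtype == object

# ... until pd.to_datetime restores the datetime64 dtype, as the load path does
restored['timestamp'] = pd.to_datetime(restored['timestamp'])
assert restored['timestamp'].equals(df['timestamp'])
```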