From aec536d0077e291f05039caf551ec4820db74f48 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 2 Apr 2025 11:44:21 +0300 Subject: [PATCH] charts working with cahe historical data --- realtime.py | 254 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 165 insertions(+), 89 deletions(-) diff --git a/realtime.py b/realtime.py index d3ec3cb..749b472 100644 --- a/realtime.py +++ b/realtime.py @@ -134,8 +134,18 @@ class BinanceHistoricalData: try: cache_file = self._get_cache_filename(symbol, interval) if os.path.exists(cache_file): + # For 1s interval, check if the cache is recent (less than 10 minutes old) + if interval == "1s" or interval == 1: + file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) + time_diff = (datetime.now() - file_mod_time).total_seconds() / 60 + if time_diff > 10: + logger.info("1s cache is older than 10 minutes, skipping load") + return None + logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)") + df = pd.read_csv(cache_file) df["timestamp"] = pd.to_datetime(df["timestamp"]) + logger.info(f"Loaded {len(df)} candles from cache: {cache_file}") return df except Exception as e: logger.error(f"Error loading cached data: {str(e)}") @@ -411,6 +421,12 @@ class TickStorage: # Update all timeframe candles self._update_all_candles(tick) + # Verify 1s candles are being updated - periodically log for monitoring + if len(self.ticks) % 100 == 0: + logger.debug(f"Tick count: {len(self.ticks)}, 1s candles count: {len(self.candles['1s'])}") + + return tick + def get_latest_price(self): """Get the latest price""" return self.latest_price @@ -455,6 +471,10 @@ class TickStorage: current_candle['close'] = tick['price'] current_candle['volume'] += tick['volume'] + # For 1s timeframe specifically, log a debug message to confirm updates are occurring + if interval_key == '1s': + logger.debug(f"Updated 1s candle at {current_candle['timestamp']} with price {tick['price']}") + # Limit the number of candles to keep for each timeframe # Keep more candles for shorter timeframes, fewer for longer ones max_candles = { @@ -495,6 +515,10 @@ class TickStorage: 'volume': 0 } + # Log when we're creating a new 1s candle for debugging + if interval_key == '1s': + logger.debug(f"Creating new 1s candle at {candle_start}") + self.candles[interval_key].append(candle) return candle @@ -602,12 +626,17 @@ class TickStorage: def load_historical_data(self, historical_data, symbol): """Load historical data for all timeframes""" try: - # Load data for different timeframes + # Clear any existing 1s candles to prevent using old cached data + self.candles['1s'] = [] + + # Clear tick data to ensure we start with an empty collection + self.ticks = [] + + # Load data for different timeframes (without 1s - we'll handle it separately) timeframes = [ - (1, '1s'), # 1 second - limit to 20 minutes (1200 seconds) + (60, '1m'), # 1 minute (5, '5s'), # 5 seconds (15, '15s'), # 15 seconds - (60, '1m'), # 1 minute (300, '5m'), # 5 minutes (900, '15m'), # 15 minutes (3600, '1h'), # 1 hour @@ -615,12 +644,35 @@ class TickStorage: (86400, '1d') # 1 day ] + # For 1s, we only load from cache if available (handled in _load_from_cache method) + # The _load_from_cache method will check if cache is no more than 10 minutes old + df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Try to get 1s data from cache + if df_1s is not None and not df_1s.empty: + logger.info(f"Loaded {len(df_1s)} recent 1s candles from cache") + + # Convert to our candle format and store + candles_1s = [] + for _, row in df_1s.iterrows(): + candle = { + 'timestamp': row['timestamp'], + 'open': row['open'], + 'high': row['high'], + 'low': row['low'], + 'close': row['close'], + 'volume': row['volume'] + } + candles_1s.append(candle) + + # Add the 1s candles to our candles storage + self.candles['1s'] = candles_1s + else: + logger.info("No recent 1s cache available, starting with empty 1s data") + + # Load the remaining timeframes normally for interval_seconds, interval_key in timeframes: # Set appropriate limits based on timeframe limit = 1000 # Default - if interval_seconds == 1: - limit = 1200 # 1s data - limit to 20 minutes as requested - elif interval_seconds < 60: + if interval_seconds < 60: limit = 500 # For seconds-level data elif interval_seconds < 300: limit = 1000 # 1m @@ -632,81 +684,32 @@ class TickStorage: limit = 200 # hourly/daily data try: - # For 1s data, we might need to generate it from 1m data - if interval_seconds == 1: - # Get 1m data first - df_1m = historical_data.get_historical_candles(symbol, 60, 60) # Get 60 minutes of 1m data - if df_1m is not None and not df_1m.empty: - # Create simulated 1s data from 1m data - simulated_1s = [] - for _, row in df_1m.iterrows(): - # For each 1m candle, create 60 1s candles - start_time = row['timestamp'] - for i in range(60): - # Calculate second-level timestamp - second_time = start_time + timedelta(seconds=i) - - # Create candle with random price movement around close price - close_price = row['close'] - price_range = (row['high'] - row['low']) / 60 # Reduced range - - # Interpolate price - gradual movement from open to close - progress = i / 60 - interp_price = row['open'] + (row['close'] - row['open']) * progress - - # Add some small random movement - random_factor = np.random.normal(0, price_range * 0.5) - s_price = max(0, interp_price + random_factor) - - # Create 1s candle - s_candle = { - 'timestamp': second_time, - 'open': s_price, - 'high': s_price * 1.0001, # Tiny movement - 'low': s_price * 0.9999, # Tiny movement - 'close': s_price, - 'volume': row['volume'] / 60 # Distribute volume - } - simulated_1s.append(s_candle) - - # Add the simulated 1s candles to our candles storage - self.candles['1s'] = simulated_1s - logger.info(f"Generated {len(simulated_1s)} simulated 1s candles for {symbol}") - else: - # Load normal historical data - df = historical_data.get_historical_candles(symbol, interval_seconds, limit) - if df is not None and not df.empty: - logger.info(f"Loaded {len(df)} historical candles for {symbol} ({interval_key})") - - # Convert to our candle format and store - candles = [] - for _, row in df.iterrows(): - candle = { - 'timestamp': row['timestamp'], - 'open': row['open'], - 'high': row['high'], - 'low': row['low'], - 'close': row['close'], - 'volume': row['volume'] - } - candles.append(candle) - - # Set the candles for this timeframe - self.candles[interval_key] = candles - - # For 1m data, also use it to generate tick data - if interval_key == '1m': - for candle in candles[-20:]: # Use only the last 20 candles for tick data - self.add_tick( - price=candle['close'], - volume=candle['volume'] / 10, # Distribute volume - timestamp=candle['timestamp'] - ) - - # Set latest price from most recent candle - if candles: - self.latest_price = candles[-1]['close'] - logger.info(f"Set latest price to ${self.latest_price:.2f} from historical data") + # Load normal historical data + df = historical_data.get_historical_candles(symbol, interval_seconds, limit) + if df is not None and not df.empty: + logger.info(f"Loaded {len(df)} historical candles for {symbol} ({interval_key})") + + # Convert to our candle format and store + candles = [] + for _, row in df.iterrows(): + candle = { + 'timestamp': row['timestamp'], + 'open': row['open'], + 'high': row['high'], + 'low': row['low'], + 'close': row['close'], + 'volume': row['volume'] + } + candles.append(candle) + + # Set the candles for this timeframe + self.candles[interval_key] = candles + + # No longer load 1m data into ticks collection, as this persists the problem + # Just store the latest price from the most recent candle for reference + if interval_key == '1m' and candles: + self.latest_price = candles[-1]['close'] + logger.info(f"Set latest price to ${self.latest_price:.2f} from historical data without adding to ticks") except Exception as e: logger.error(f"Error loading {interval_key} data: {e}") @@ -767,8 +770,11 @@ class RealTimeChart: self.latest_price = None self.latest_volume = None self.latest_timestamp = None - self.positions = [] # List to store positions - self.accumulative_pnl = 0.0 # Track total PnL + + # Initialize with empty positions list to prevent old trade actions from affecting chart resizing + # We MUST start with a clean state for each new chart instance + self.positions = [] # Empty positions list - CRITICAL for proper chart resizing + self.accumulative_pnl = 0.0 # Reset PnL self.current_balance = 100.0 # Start with $100 balance # Store historical data for different timeframes @@ -992,8 +998,8 @@ class RealTimeChart: interval_key = self._get_interval_key(interval) # Make sure we have data for this interval - if interval_key not in self.tick_storage.candles or not self.tick_storage.candles[interval_key]: - logger.warning(f"No candle data available for {interval_key}") + if interval_key not in self.tick_storage.candles: + logger.warning(f"No candle data structure available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( @@ -1003,6 +1009,26 @@ class RealTimeChart: ) fig.update_layout(title=f"{self.symbol} - {interval_key}") return fig + + # For 1s specifically, log more debug info + if interval_key == '1s': + logger.info(f"1s candles count: {len(self.tick_storage.candles[interval_key])}") + logger.info(f"Ticks count: {len(self.tick_storage.ticks)}") + if not self.tick_storage.candles[interval_key]: + logger.warning("No 1s candles available - this may indicate the WebSocket isn't sending data, or candles aren't being created") + + # Check if we have any candles for this interval + if not self.tick_storage.candles[interval_key]: + logger.warning(f"No candle data available for {interval_key}") + # Return empty figure with a message + fig = go.Figure() + fig.add_annotation( + text=f"No data available for {interval_key}. Waiting for real-time data...", + xref="paper", yref="paper", + x=0.5, y=0.5, showarrow=False + ) + fig.update_layout(title=f"{self.symbol} - {interval_key} (waiting for data)") + return fig # For rendering, limit to the last 500 candles for performance candles = self.tick_storage.candles[interval_key][-500:] @@ -1047,6 +1073,7 @@ class RealTimeChart: marker_color='rgba(100, 100, 255, 0.5)' ), row=2, col=1) + # Add trading markers if available if hasattr(self, 'positions') and self.positions: # Get last 100 positions for display (to avoid too many markers) @@ -1095,6 +1122,8 @@ class RealTimeChart: ) ), row=1, col=1) + + # Update layout fig.update_layout( title=f"{self.symbol} - {interval_key}", @@ -1348,18 +1377,51 @@ class RealTimeChart: else: return f"{interval_seconds // 86400}d" + def _update_chart_and_positions(self): + """Update the chart with current data and positions""" + try: + # Force an update of the charts + self._update_main_chart(1) # Update 1s chart by default + self._update_secondary_charts() + logger.debug("Updated charts and positions") + return True + except Exception as e: + logger.error(f"Error updating chart and positions: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return False + async def start_websocket(self): """Start the websocket connection for real-time data""" try: + # Step 1: Clear everything related to positions FIRST, before any other operations + logger.info(f"Initializing fresh chart for {self.symbol} - clearing all previous positions") + self.positions = [] # Clear positions list + self.accumulative_pnl = 0.0 # Reset accumulated PnL + self.current_balance = 100.0 # Reset balance + + # Step 2: Clear any previous tick data to avoid using stale data from previous training sessions + self.tick_storage.ticks = [] + + # Step 3: Clear any previous 1s candles before loading historical data + self.tick_storage.candles['1s'] = [] + + logger.info("Initialized empty 1s candles, tick collection, and positions for fresh data") + # Load historical data first to ensure we have candles for all timeframes logger.info(f"Loading historical data for {self.symbol}") # Initialize a BinanceHistoricalData instance historical_data = BinanceHistoricalData() - # Load historical data for display + # Load historical data for all timeframes (1s will load from cache if recent, otherwise empty) self.tick_storage.load_historical_data(historical_data, self.symbol) + # Double check that we have the 1s timeframe initialized + if '1s' not in self.tick_storage.candles: + self.tick_storage.candles['1s'] = [] + logger.info(f"After loading historical data, 1s candles count: {len(self.tick_storage.candles['1s'])}") + # Make sure we update the charts once with historical data before websocket starts # Update all the charts with the initial historical data self._update_chart_and_positions() @@ -1370,6 +1432,10 @@ class RealTimeChart: logger.info(f"WebSocket connected for {self.symbol}") + # Counter for received ticks + tick_count = 0 + last_update_time = time.time() + # Start receiving data while self.websocket.running: try: @@ -1377,6 +1443,8 @@ class RealTimeChart: if data: # Process the received data if 'price' in data: + tick_count += 1 + # Update tick storage self.tick_storage.add_tick( price=data['price'], @@ -1389,9 +1457,17 @@ class RealTimeChart: self.latest_volume = data.get('volume', 0) self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000) - # Log occasional price updates (every 500 messages) - if hasattr(self.websocket.ws, 'message_count') and self.websocket.ws.message_count % 500 == 0: - logger.info(f"Current {self.symbol} price: ${self.latest_price:.2f}") + # Force chart update every 5 seconds + current_time = time.time() + if current_time - last_update_time >= 5.0: + self._update_chart_and_positions() + last_update_time = current_time + logger.debug("Forced chart update after new ticks") + + # Log tick processing for debugging (every 100 ticks) + if tick_count % 100 == 0: + logger.info(f"Processed {tick_count} ticks, current price: ${self.latest_price:.2f}") + logger.info(f"Current 1s candles count: {len(self.tick_storage.candles['1s'])}") except Exception as e: logger.error(f"Error processing websocket data: {str(e)}")