def __init__(self, symbol: str = None, max_age_seconds: int = 3600):  # 1 hour by default, up from 30 min
    """Rolling store of raw trade ticks for one symbol.

    Ticks are kept for up to ``max_age_seconds`` and periodically
    persisted to a per-symbol CSV cache so a restart can resume with
    recent data.

    Args:
        symbol: Market symbol (e.g. ``BTC/USDT``); used to name cache files.
        max_age_seconds: Maximum age of retained ticks, in seconds.
    """
    self.ticks = []
    self.symbol = symbol or "unknown"  # Use symbol name for cache files
    self.max_age_seconds = max_age_seconds
    self.last_cleanup_time = time.time()
    self.last_cache_time = time.time()
    # Adjust cleanup interval based on max_age_seconds (more time = less frequent cleanup)
    self.cleanup_interval = min(max(10, max_age_seconds // 60), 60)  # Between 10-60 seconds
    self.cache_interval = 60  # Cache ticks every 60 seconds
    self.last_tick = None
    self.cache_dir = "cache"
    os.makedirs(self.cache_dir, exist_ok=True)
    logger.info(f"Initialized TradeTickStorage for {self.symbol} with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds")

    # Try to load cached ticks on startup
    self._load_cached_ticks()

def add_tick(self, tick: Dict):
    """Add a new trade tick to storage.

    Ticks arriving with a timestamp identical to the newest stored tick
    are merged (volume accumulated, price replaced) instead of appended.
    Cleanup and disk caching run periodically, not on every tick.
    """
    if self.ticks and tick['timestamp'] == self.ticks[-1]['timestamp']:
        # For duplicate timestamps, update volume instead of adding new tick
        self.ticks[-1]['volume'] += tick.get('volume', 0)
        # Update price to the newest one
        self.ticks[-1]['price'] = tick['price']
        # Fix: keep last_tick pointing at the merged tick so
        # get_latest_price() stays correct even when the merged-into tick
        # was restored from the disk cache (last_tick was None then).
        self.last_tick = self.ticks[-1]
        logger.debug(f"Merged tick with duplicate timestamp: {tick['timestamp']}")
    else:
        # No duplicate, add as new tick
        self.ticks.append(tick)
        self.last_tick = tick
        logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}")

    # Only clean up periodically rather than on every tick
    current_time = time.time()
    if current_time - self.last_cleanup_time > self.cleanup_interval:
        self._cleanup()
        self.last_cleanup_time = current_time

    # Cache ticks periodically
    if current_time - self.last_cache_time > self.cache_interval:
        self._cache_ticks()
        self.last_cache_time = current_time

def _cleanup(self):
    """Remove ticks older than max_age_seconds."""
    # NOTE(review): the interior of this method fell between diff hunks;
    # the filtering line is reconstructed from the visible head/tail and
    # the ms-epoch timestamp convention used in _cache_ticks -- confirm
    # against the original source.
    old_count = len(self.ticks)
    cutoff = int(time.time() * 1000) - self.max_age_seconds * 1000
    self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
    removed = old_count - len(self.ticks)
    if removed > 0:
        logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}")

def _cache_ticks(self):
    """Cache recent ticks to disk"""
    if not self.ticks:
        return

    # Get ticks from last 10 minutes
    now = int(time.time() * 1000)  # Current time in ms
    cutoff = now - (600 * 1000)  # 10 minutes in ms
    recent_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]

    if not recent_ticks:
        logger.debug("No recent ticks to cache")
        return

    # Create symbol-specific filename
    symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower()
    cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv")

    # Save to disk
    try:
        tick_df = pd.DataFrame(recent_ticks)
        tick_df.to_csv(cache_file, index=False)
        logger.info(f"Cached {len(recent_ticks)} recent ticks for {self.symbol} to {cache_file}")
    except Exception as e:
        logger.error(f"Error caching ticks: {str(e)}")

def _load_cached_ticks(self):
    """Load cached ticks from disk on startup (skipped when stale)."""
    # Create symbol-specific filename
    symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower()
    cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv")

    if not os.path.exists(cache_file):
        logger.info(f"No cached ticks found for {self.symbol}")
        return

    try:
        # Check if cache is fresh (less than 10 minutes old)
        file_age = time.time() - os.path.getmtime(cache_file)
        if file_age > 600:  # 10 minutes
            logger.info(f"Cached ticks for {self.symbol} are too old ({file_age:.1f}s), skipping")
            return

        # Load cached ticks
        tick_df = pd.read_csv(cache_file)
        if tick_df.empty:
            logger.info(f"Cached ticks file for {self.symbol} is empty")
            return

        # Convert to list of dicts and add to storage
        cached_ticks = tick_df.to_dict('records')
        self.ticks.extend(cached_ticks)
        # Fix: keep the tick list ordered and expose the most recent tick
        # so get_latest_price() works immediately after a restart
        # (previously last_tick stayed None until a live tick arrived).
        self.ticks.sort(key=lambda t: t['timestamp'])
        self.last_tick = self.ticks[-1]
        logger.info(f"Loaded {len(cached_ticks)} cached ticks for {self.symbol} from {cache_file}")
    except Exception as e:
        logger.error(f"Error loading cached ticks for {self.symbol}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def get_candles(self, interval_seconds: int = 1, start_time_ms: int = None, end_time_ms: int = None) -> pd.DataFrame:
    """Convert stored ticks into OHLCV candles.

    NOTE(review): the ``def`` line sits above the visible diff hunk; the
    signature is inferred from call sites (``get_candles(interval_seconds=...)``)
    -- confirm parameter defaults against the original.

    Args:
        interval_seconds: Candle width in seconds.
        start_time_ms: Optional inclusive lower bound (epoch ms).
        end_time_ms: Optional inclusive upper bound (epoch ms).

    Returns:
        DataFrame with 'timestamp', 'open', 'high', 'low', 'close',
        'volume' columns; empty DataFrame on any failure.
    """
    try:
        if not self.ticks:
            logger.warning("No ticks available for candle formation")
            return pd.DataFrame()

        # Ensure ticks are up to date
        try:
            self._cleanup()
        except Exception as cleanup_error:
            logger.error(f"Error cleaning up ticks: {str(cleanup_error)}")
            # Continue with existing ticks

        # Get ticks from specified time range if provided
        if start_time_ms is not None or end_time_ms is not None:
            logger.debug(f"Filtering ticks for time range from {start_time_ms} to {end_time_ms}")
            try:
                filtered_ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)
                if not filtered_ticks:
                    logger.warning("No ticks in the specified time range")
                    return pd.DataFrame()
                df = pd.DataFrame(filtered_ticks)
            except Exception as filter_error:
                logger.error(f"Error filtering ticks by time: {str(filter_error)}")
                return pd.DataFrame()
        else:
            # Use all available ticks
            try:
                df = self.get_ticks_as_df()
            except Exception as df_error:
                logger.error(f"Error getting ticks as DataFrame: {str(df_error)}")
                return pd.DataFrame()

        if df.empty:
            logger.warning("Tick DataFrame is empty after filtering/conversion")
            return pd.DataFrame()

        logger.info(f"Preparing to create candles from {len(df)} ticks with {interval_seconds}s interval")

        # First, ensure all required columns exist
        required_columns = ['timestamp', 'price', 'volume']
        for col in required_columns:
            if col not in df.columns:
                logger.error(f"Required column '{col}' missing from tick data")
                return pd.DataFrame()

        # Make sure DataFrame has no duplicated timestamps before setting index
        try:
            if 'timestamp' in df.columns:
                duplicate_count = df['timestamp'].duplicated().sum()
                if duplicate_count > 0:
                    logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping the last occurrence")
                    df = df.drop_duplicates(subset='timestamp', keep='last')

                # Use timestamp column for resampling
                df = df.set_index('timestamp')

                # Make sure index is sorted (required for resampling)
                if not df.index.is_monotonic_increasing:
                    logger.warning("Timestamp index is not monotonically increasing, sorting...")
                    df = df.sort_index()
        except Exception as prep_error:
            logger.error(f"Error preprocessing DataFrame for resampling: {str(prep_error)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()

        # Create interval string for resampling - use 's' instead of deprecated 'S'
        interval_str = f'{interval_seconds}s'

        # Resample to create OHLCV candles with multiple fallback options
        logger.debug(f"Resampling with interval: {interval_str}")
        candles = None

        # First attempt - individual column resampling
        try:
            if 'price' not in df.columns:
                raise ValueError("Price column missing from DataFrame")

            if len(df) < 2:
                logger.warning("Not enough data points for resampling, using direct data")
                if len(df) == 1:
                    # Single data point -> a single degenerate candle
                    price_val = df['price'].iloc[0]
                    volume_val = df['volume'].iloc[0] if 'volume' in df.columns else 0
                    timestamp_val = df.index[0]

                    candles = pd.DataFrame({
                        'timestamp': [timestamp_val],
                        'open': [price_val],
                        'high': [price_val],
                        'low': [price_val],
                        'close': [price_val],
                        'volume': [volume_val]
                    })
                    return candles
                else:
                    # No data
                    return pd.DataFrame()

            # Resample and aggregate each column separately
            open_df = df['price'].resample(interval_str).first()
            high_df = df['price'].resample(interval_str).max()
            low_df = df['price'].resample(interval_str).min()
            close_df = df['price'].resample(interval_str).last()
            volume_df = df['volume'].resample(interval_str).sum()

            # Check for length mismatches before combining
            expected_length = len(open_df)
            if (len(high_df) != expected_length or
                    len(low_df) != expected_length or
                    len(close_df) != expected_length or
                    len(volume_df) != expected_length):
                logger.warning("Length mismatch in resampled columns, falling back to alternative method")
                raise ValueError("Length mismatch")

            # Combine into a single DataFrame
            candles = pd.DataFrame({
                'open': open_df,
                'high': high_df,
                'low': low_df,
                'close': close_df,
                'volume': volume_df
            })
            logger.debug(f"Successfully created {len(candles)} candles with individual column resampling")
        except Exception as resample_error:
            logger.error(f"Error in individual column resampling: {str(resample_error)}")

            # Second attempt - built-in agg method
            try:
                logger.debug("Trying fallback resampling method with agg()")
                candles = df.resample(interval_str).agg({
                    'price': ['first', 'max', 'min', 'last'],
                    'volume': 'sum'
                })
                # Flatten MultiIndex columns
                candles.columns = ['open', 'high', 'low', 'close', 'volume']
                logger.debug(f"Successfully created {len(candles)} candles with agg() method")
            except Exception as agg_error:
                logger.error(f"Error in agg() resampling: {str(agg_error)}")

                # Third attempt - manual candle construction
                try:
                    logger.debug("Trying manual candle construction method")
                    resampler = df.resample(interval_str)
                    candle_data = []

                    for name, group in resampler:
                        if not group.empty:
                            candle = {
                                'timestamp': name,
                                'open': group['price'].iloc[0],
                                'high': group['price'].max(),
                                'low': group['price'].min(),
                                'close': group['price'].iloc[-1],
                                'volume': group['volume'].sum() if 'volume' in group.columns else 0
                            }
                            candle_data.append(candle)

                    if candle_data:
                        candles = pd.DataFrame(candle_data)
                        logger.debug(f"Successfully created {len(candles)} candles with manual method")
                    else:
                        logger.warning("No candles created with manual method")
                        return pd.DataFrame()
                except Exception as manual_error:
                    logger.error(f"Error in manual candle construction: {str(manual_error)}")
                    import traceback
                    logger.error(traceback.format_exc())
                    return pd.DataFrame()

        # Ensure the result isn't empty
        if candles is None or candles.empty:
            logger.warning("No candles were created after all resampling attempts")
            return pd.DataFrame()

        # Reset index to get timestamp as column.
        # Fix: only reset when 'timestamp' is not already a column --
        # the manual-construction fallback already produced one, and an
        # unconditional reset_index() would inject a spurious 'index'
        # column in that case.
        if 'timestamp' not in candles.columns:
            try:
                candles = candles.reset_index()
            except Exception as reset_error:
                logger.error(f"Error resetting index: {str(reset_error)}")
                # Try to create a new DataFrame with the timestamp index as a column
                try:
                    timestamp_col = candles.index.to_list()
                    candles_dict = candles.to_dict('list')
                    candles_dict['timestamp'] = timestamp_col
                    candles = pd.DataFrame(candles_dict)
                except Exception as fallback_error:
                    logger.error(f"Error in fallback index reset: {str(fallback_error)}")
                    return pd.DataFrame()

        # Ensure no NaN values
        try:
            nan_count_before = candles.isna().sum().sum()
            if nan_count_before > 0:
                logger.warning(f"Found {nan_count_before} NaN values in candles, dropping them")

            candles = candles.dropna()
        except Exception as nan_error:
            logger.error(f"Error handling NaN values: {str(nan_error)}")
            # Try to fill NaN values instead of dropping.
            # Fix: fillna(method=...) is deprecated/removed in pandas 2.x;
            # use ffill()/bfill(), and don't swallow via a bare except.
            try:
                candles = candles.ffill().bfill()
            except Exception as fill_error:
                logger.error(f"Error filling NaN values: {str(fill_error)}")

        logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks")
        return candles

    except Exception as e:
        logger.error(f"Unhandled error in candle formation: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return pd.DataFrame()
def get_candle_stats(self) -> Dict:
    """Get statistics about cached candles for different intervals.

    Returns:
        Mapping like ``{'1s': {'count': int, 'time_range': str | None}, ...}``
        for the intervals 1, 5, 15, 60, 300, 900 and 3600 seconds.
    """
    stats = {}

    # Define intervals to check
    intervals = [1, 5, 15, 60, 300, 900, 3600]

    for interval in intervals:
        candles = self.get_candles(interval_seconds=interval)
        count = len(candles) if not candles.empty else 0

        # Get time range if we have candles
        time_range = None
        if count > 0:
            # Fix: narrowed bare `except:` (which would also swallow
            # KeyboardInterrupt/SystemExit) to `except Exception`.
            try:
                start_time = candles['timestamp'].min()
                end_time = candles['timestamp'].max()
                if isinstance(start_time, pd.Timestamp):
                    start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
                if isinstance(end_time, pd.Timestamp):
                    end_time = end_time.strftime('%Y-%m-%d %H:%M:%S')
                time_range = f"{start_time} to {end_time}"
            except Exception:
                time_range = "Unknown"

        stats[f"{interval}s"] = {
            'count': count,
            'time_range': time_range
        }

    return stats
def update_cache(self, interval: str, new_candles: pd.DataFrame):
    """
    Update the candle cache for a specific timeframe with new candles

    Args:
        interval: The timeframe interval ('1s', '1m', '1h', '1d')
        new_candles: DataFrame with new candles to add to the cache
    """
    if interval not in self.candles:
        logger.warning(f"Invalid interval {interval} for cache update")
        return

    if new_candles is None or new_candles.empty:
        logger.debug(f"No new candles to update {interval} cache")
        return

    # NOTE(review): the original had additional guard context between
    # diff hunks here (likely a 'timestamp'-column presence check) that
    # is not visible in this chunk -- confirm against the source.
    if 'timestamp' not in new_candles.columns:
        logger.warning(f"No timestamp column in new candles for {interval}")
        return

    try:
        # Fix: take the defensive copy BEFORE converting timestamps.
        # The original converted new_candles['timestamp'] in place and
        # only then copied, mutating the caller's DataFrame.
        new_candles_copy = new_candles.copy()

        # Ensure timestamp is datetime for proper comparison
        try:
            if not pd.api.types.is_datetime64_any_dtype(new_candles_copy['timestamp']):
                logger.debug(f"Converting timestamps to datetime for {interval}")
                new_candles_copy['timestamp'] = pd.to_datetime(new_candles_copy['timestamp'])
        except Exception as e:
            logger.error(f"Error converting timestamps: {str(e)}")
            # Try a different approach
            try:
                new_candles_copy['timestamp'] = pd.to_datetime(new_candles_copy['timestamp'], errors='coerce')
                # Drop any rows where conversion failed
                new_candles_copy = new_candles_copy.dropna(subset=['timestamp'])
                if new_candles_copy.empty:
                    logger.warning(f"All timestamps conversion failed for {interval}")
                    return
            except Exception as e2:
                logger.error(f"Second attempt to convert timestamps failed: {str(e2)}")
                return

        # If we have no candles in cache, add all new candles
        if not self.candles[interval]:
            logger.debug(f"No existing candles for {interval}, adding all {len(new_candles_copy)} candles")
            self.add_candles(interval, new_candles_copy)
            return

        # Get the timestamp from the last cached candle
        last_cached_candle = self.candles[interval][-1]
        if not isinstance(last_cached_candle, dict):
            logger.warning(f"Last cached candle is not a dictionary for {interval}")
            last_cached_candle = {'timestamp': None}

        if 'timestamp' not in last_cached_candle:
            logger.warning(f"No timestamp in last cached candle for {interval}")
            last_cached_candle['timestamp'] = None

        last_cached_time = last_cached_candle['timestamp']
        logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")

        # If last_cached_time is None, add all candles
        if last_cached_time is None:
            logger.debug(f"No valid last cached timestamp, adding all {len(new_candles_copy)} candles for {interval}")
            self.add_candles(interval, new_candles_copy)
            return

        # Convert last_cached_time to datetime if needed
        if not isinstance(last_cached_time, (pd.Timestamp, datetime)):
            try:
                last_cached_time = pd.to_datetime(last_cached_time)
            except Exception as e:
                logger.error(f"Cannot convert last cached time to datetime: {str(e)}")
                # Add all candles as fallback
                self.add_candles(interval, new_candles_copy)
                return

        # Make a backup of current cache before filtering
        cache_backup = list(self.candles[interval])

        # Filter new candles that are after the last cached candle
        try:
            filtered_candles = new_candles_copy[new_candles_copy['timestamp'] > last_cached_time]

            if not filtered_candles.empty:
                logger.debug(f"Adding {len(filtered_candles)} new candles for {interval}")
                self.add_candles(interval, filtered_candles)
            else:
                # No new candles after last cached time, check for missing candles
                try:
                    # Get unique timestamps in cache
                    cached_timestamps = set()
                    for candle in self.candles[interval]:
                        if isinstance(candle, dict) and 'timestamp' in candle:
                            ts = candle['timestamp']
                            if isinstance(ts, (pd.Timestamp, datetime)):
                                cached_timestamps.add(ts)
                            else:
                                try:
                                    cached_timestamps.add(pd.to_datetime(ts))
                                except Exception:
                                    pass

                    # Find candles in new_candles that aren't in the cache
                    missing_candles = new_candles_copy[~new_candles_copy['timestamp'].isin(cached_timestamps)]

                    if not missing_candles.empty:
                        logger.info(f"Found {len(missing_candles)} missing candles for {interval}")
                        self.add_candles(interval, missing_candles)
                    else:
                        logger.debug(f"No new or missing candles to add for {interval}")
                except Exception as missing_error:
                    logger.error(f"Error checking for missing candles: {str(missing_error)}")
        except Exception as filter_error:
            logger.error(f"Error filtering candles by timestamp: {str(filter_error)}")
            # Restore from backup
            self.candles[interval] = deque(cache_backup, maxlen=self.candles[interval].maxlen)
            # Try adding all candles as fallback
            self.add_candles(interval, new_candles_copy)
    except Exception as e:
        logger.error(f"Unhandled error updating cache for {interval}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

def get_candles(self, timeframe: str, count: int = 500) -> pd.DataFrame:
    """
    Get candles for a specific timeframe. This is an alias for get_recent_candles
    to maintain compatibility with code that expects this method name.

    Args:
        timeframe: The timeframe interval ('1s', '1m', '1h', '1d')
        count: Maximum number of candles to return

    Returns:
        DataFrame containing the candles
    """
    try:
        logger.debug(f"Getting {count} candles for {timeframe} via get_candles()")
        return self.get_recent_candles(timeframe, count)
    except Exception as e:
        logger.error(f"Error in get_candles for {timeframe}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return pd.DataFrame()
candles from tick storage") + except Exception as candle_error: + logger.error(f"Error getting candles from tick storage: {str(candle_error)}") + logger.error(f"Using empty DataFrame to avoid chart restart") + df = pd.DataFrame() # If we don't have real-time data yet, use cached data if df.empty and cached_candles is not None and not cached_candles.empty: @@ -1128,419 +1500,300 @@ class RealTimeChart: logger.info(f"Using {len(df)} cached candles for main chart") # Get current price and stats using our enhanced methods - current_price = self.tick_storage.get_latest_price() - price_stats = self.tick_storage.get_price_stats() - time_stats = self.tick_storage.get_time_based_stats() + try: + current_price = self.tick_storage.get_latest_price() + price_stats = self.tick_storage.get_price_stats() + time_stats = self.tick_storage.get_time_based_stats() + candle_stats = self.tick_storage.get_candle_stats() + except Exception as stats_error: + logger.error(f"Error getting market stats: {str(stats_error)}") + # Provide fallback values to avoid chart restart + current_price = None + price_stats = {'count': 0, 'min': None, 'max': None, 'latest': None, 'age_seconds': 0} + time_stats = {'total_ticks': 0, 'periods': {}} + candle_stats = {} # Periodically save candles to disk if n % 60 == 0 or is_first_render: # Every 60 chart updates or on first render - self._save_candles_to_disk() + try: + self._save_candles_to_disk() + except Exception as save_error: + logger.error(f"Error saving candles: {str(save_error)}") logger.debug(f"Current price: {current_price}, Stats: {price_stats}") # Create subplot layout - don't include 1s since that's the main chart - fig = make_subplots( - rows=5, cols=1, - vertical_spacing=0.05, - subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1m OHLCV', '1h OHLCV', '1d OHLCV'), - row_heights=[0.2, 0.2, 0.2, 0.2, 0.2] - ) + try: + fig = make_subplots( + rows=5, cols=1, + vertical_spacing=0.05, + subplot_titles=(f'{self.symbol} Price 
({interval}s)', 'Volume', '1m OHLCV', '1h OHLCV', '1d OHLCV'), + row_heights=[0.2, 0.2, 0.2, 0.2, 0.2] + ) + except Exception as subplot_error: + logger.error(f"Error creating subplot layout: {str(subplot_error)}") + # Create a simpler layout if the complex one fails + fig = make_subplots( + rows=2, cols=1, + vertical_spacing=0.05, + subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume'), + row_heights=[0.7, 0.3] + ) # Process and add main chart data - if not df.empty and len(df) > 0: - # Limit how many candles we display for better performance - display_df = df - if len(df) > 500: - logger.debug(f"Limiting main chart display from {len(df)} to 500 candles") - display_df = df.tail(500) - - logger.debug(f"Displaying {len(display_df)} candles in main chart") - - # Convert timestamps to local time for display - display_df = display_df.copy() - try: - display_df['local_time'] = display_df['timestamp'].apply(convert_to_local_time) - logger.debug(f"Converted timestamps to local time ({local_timezone})") - except Exception as e: - logger.error(f"Error converting timestamps to local time: {str(e)}") - display_df['local_time'] = display_df['timestamp'] - - # Add candlestick chart - fig.add_trace( - go.Candlestick( - x=display_df['local_time'], # Use local time for display - open=display_df['open'], - high=display_df['high'], - low=display_df['low'], - close=display_df['close'], - name='Price', - increasing_line_color='#33CC33', # Green - decreasing_line_color='#FF4136' # Red - ), - row=1, col=1 - ) - - # Add volume bars - colors = ['#33CC33' if close >= open else '#FF4136' - for close, open in zip(display_df['close'], display_df['open'])] - - fig.add_trace( - go.Bar( - x=display_df['local_time'], # Use local time for display - y=display_df['volume'], - name='Volume', - marker_color=colors - ), - row=2, col=1 - ) - - # Add latest price line from the candlestick data - latest_price = display_df['close'].iloc[-1] - latest_time = display_df['local_time'].iloc[-1] - 
earliest_time = display_df['local_time'].iloc[0] - - fig.add_shape( - type="line", - x0=earliest_time, - y0=latest_price, - x1=latest_time, - y1=latest_price, - line=dict(color="yellow", width=1, dash="dash"), - row=1, col=1 - ) - - # Format the last update time in local timezone - try: - last_update_time = latest_time.strftime('%Y-%m-%d %H:%M:%S') - except: - last_update_time = datetime.now().astimezone(local_timezone).strftime('%Y-%m-%d %H:%M:%S') - - # Annotation for last candle close price - fig.add_annotation( - x=latest_time, - y=latest_price, - text=f"{latest_price:.2f} ({last_update_time})", - showarrow=False, - font=dict(size=14, color="yellow"), - xshift=50, - row=1, col=1 - ) - - # If we have a more recent price from ticks, add that too - if current_price and abs(current_price - latest_price) > 0.01: - # Add current price line - fig.add_shape( - type="line", - x0=earliest_time, - y0=current_price, - x1=latest_time, - y1=current_price, - line=dict(color="cyan", width=1, dash="dot"), - row=1, col=1 - ) - - # Format current time in local timezone - current_time_str = datetime.now().astimezone(local_timezone).strftime('%Y-%m-%d %H:%M:%S') - - # Add current price annotation - fig.add_annotation( - x=latest_time, - y=current_price, - text=f"Current: {current_price:.2f} ({current_time_str})", - showarrow=False, - font=dict(size=14, color="cyan"), - xshift=50, - yshift=20, - row=1, col=1 - ) - - # Update candle cache for all timeframes if we have new data + try: + # Main price chart if not df.empty: - try: - # Update the 1s cache with current df (if it's 1s data) - if interval == 1: - self.candle_cache.update_cache('1s', df) - self.ohlcv_cache['1s'] = self.candle_cache.get_recent_candles('1s', count=2000) - logger.debug(f"Updated 1s cache, now has {len(self.ohlcv_cache['1s'])} candles") - - # For other intervals, get fresh data from tick storage - for interval_key in ['1m', '1h', '1d']: - int_seconds = self._interval_to_seconds(interval_key) - new_candles = 
self.tick_storage.get_candles(interval_seconds=int_seconds) - if not new_candles.empty: - self.candle_cache.update_cache(interval_key, new_candles) - self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000) - logger.debug(f"Updated cache for {interval_key}, now has {len(self.ohlcv_cache[interval_key])} candles") - except Exception as e: - logger.error(f"Error updating candle caches: {str(e)}") - - # Add OHLCV subcharts for 1m, 1h, 1d (not 1s since it's the main chart) - timeframe_map = { - '1m': (3, '1 Minute'), - '1h': (4, '1 Hour'), - '1d': (5, '1 Day') - } - - for interval_key, (row_idx, label) in timeframe_map.items(): - ohlcv_df = self.ohlcv_cache.get(interval_key) - if ohlcv_df is not None and not ohlcv_df.empty: - # Limit to last 100 candles to avoid overcrowding - if len(ohlcv_df) > 100: - ohlcv_df = ohlcv_df.tail(100) - - # Convert to local time - ohlcv_display_df = ohlcv_df.copy() - try: - ohlcv_display_df['local_time'] = ohlcv_df['timestamp'].apply(convert_to_local_time) - except Exception as e: - logger.error(f"Error converting {interval_key} timestamps to local time: {str(e)}") - ohlcv_display_df['local_time'] = ohlcv_df['timestamp'] - - fig.add_trace( - go.Candlestick( - x=ohlcv_display_df['local_time'], - open=ohlcv_display_df['open'], - high=ohlcv_display_df['high'], - low=ohlcv_display_df['low'], - close=ohlcv_display_df['close'], - name=f'{label}', - increasing_line_color='#33CC33', - decreasing_line_color='#FF4136', - showlegend=False - ), - row=row_idx, col=1 - ) - - # Add latest price line - latest_timeframe_price = ohlcv_display_df['close'].iloc[-1] if len(ohlcv_display_df) > 0 else None - if latest_timeframe_price: - # Get first and last timestamps - earliest_tf_time = ohlcv_display_df['local_time'].iloc[0] - latest_tf_time = ohlcv_display_df['local_time'].iloc[-1] - - fig.add_shape( - type="line", - x0=earliest_tf_time, - y0=latest_timeframe_price, - x1=latest_tf_time, - y1=latest_timeframe_price, - 
line=dict(color="yellow", width=1, dash="dash"), - row=row_idx, col=1 - ) - - # Get formatted time - try: - update_time = latest_tf_time.strftime('%Y-%m-%d %H:%M') - except: - update_time = "" - - # Add price annotation - fig.add_annotation( - x=latest_tf_time, - y=latest_timeframe_price, - text=f"{latest_timeframe_price:.2f} ({update_time})", - showarrow=False, - font=dict(size=12, color="yellow"), - xshift=50, - row=row_idx, col=1 - ) - else: - # If no data, add a text annotation to the chart - logger.warning(f"No data to display for {self.symbol} - tick count: {len(self.tick_storage.ticks)}") - - # Try to display cached data for each timeframe even if main chart is empty - timeframe_map = { - '1m': (3, '1 Minute'), - '1h': (4, '1 Hour'), - '1d': (5, '1 Day') - } - - has_any_data = False - for interval_key, (row_idx, label) in timeframe_map.items(): - ohlcv_df = self.ohlcv_cache.get(interval_key) - if ohlcv_df is not None and not ohlcv_df.empty: - has_any_data = True - # Limit to last 100 candles - if len(ohlcv_df) > 100: - ohlcv_df = ohlcv_df.tail(100) - - # Convert to local time - ohlcv_display_df = ohlcv_df.copy() - try: - ohlcv_display_df['local_time'] = ohlcv_df['timestamp'].apply(convert_to_local_time) - except Exception as e: - logger.error(f"Error converting {interval_key} timestamps to local time: {str(e)}") - ohlcv_display_df['local_time'] = ohlcv_df['timestamp'] - - fig.add_trace( - go.Candlestick( - x=ohlcv_display_df['local_time'], - open=ohlcv_display_df['open'], - high=ohlcv_display_df['high'], - low=ohlcv_display_df['low'], - close=ohlcv_display_df['close'], - name=f'{label}', - increasing_line_color='#33CC33', - decreasing_line_color='#FF4136', - showlegend=False - ), - row=row_idx, col=1 - ) - - # Add latest price line - latest_timeframe_price = ohlcv_display_df['close'].iloc[-1] if len(ohlcv_display_df) > 0 else None - if latest_timeframe_price: - # Get first and last timestamps - earliest_tf_time = ohlcv_display_df['local_time'].iloc[0] - 
latest_tf_time = ohlcv_display_df['local_time'].iloc[-1] - - fig.add_shape( - type="line", - x0=earliest_tf_time, - y0=latest_timeframe_price, - x1=latest_tf_time, - y1=latest_timeframe_price, - line=dict(color="yellow", width=1, dash="dash"), - row=row_idx, col=1 - ) - - # Get formatted time - try: - update_time = latest_tf_time.strftime('%Y-%m-%d %H:%M') - except: - update_time = "" - - # Add price annotation - fig.add_annotation( - x=latest_tf_time, - y=latest_timeframe_price, - text=f"{latest_timeframe_price:.2f} ({update_time})", - showarrow=False, - font=dict(size=12, color="yellow"), - xshift=50, - row=row_idx, col=1 - ) - - # Add a message to the empty main chart - fig.add_annotation( - x=0.5, y=0.5, - text=f"Waiting for {self.symbol} real-time data...", - showarrow=False, - font=dict(size=20, color="white"), - xref="paper", yref="paper", - row=1, col=1 - ) - - if not has_any_data: - # If no data at all, show message - fig.add_annotation( - x=0.5, y=0.5, - text=f"No data available for {self.symbol}", - showarrow=False, - font=dict(size=20, color="white"), - xref="paper", yref="paper" + fig.add_trace( + go.Candlestick( + x=df.index, + open=df['open'], + high=df['high'], + low=df['low'], + close=df['close'], + name='OHLC' + ), + row=1, col=1 ) + + # Volume chart + fig.add_trace( + go.Bar( + x=df.index, + y=df['volume'], + name='Volume', + marker_color='rgba(0, 0, 255, 0.5)' + ), + row=2, col=1 + ) + except Exception as render_error: + logger.error(f"Error rendering main chart data: {str(render_error)}") - # Build info box text with all the statistics - info_lines = [f"{self.symbol}"] - - # Add current time in local timezone - current_local_time = datetime.now().astimezone(local_timezone) - info_lines.append(f"Time: {current_local_time.strftime('%Y-%m-%d %H:%M:%S')} ({tz_name})") - - # Add current price if available - if current_price: - info_lines.append(f"Current: {current_price:.2f} USDT") - - # Add price statistics if available - if price_stats['count'] > 
0: - # Format time range - age_text = f"{price_stats['age_seconds']:.1f}s" - if price_stats['age_seconds'] > 60: - minutes = int(price_stats['age_seconds'] / 60) - seconds = int(price_stats['age_seconds'] % 60) - age_text = f"{minutes}m {seconds}s" + # Add 1-minute candles in a separate subplot + try: + candles_1m = self.candle_cache.get_candles(timeframe='1m') + if candles_1m is not None and not candles_1m.empty: + logger.debug(f"Rendering 1m chart with {len(candles_1m)} candles") + # Candlestick chart for 1m + fig.add_trace( + go.Candlestick( + x=candles_1m.index, + open=candles_1m['open'], + high=candles_1m['high'], + low=candles_1m['low'], + close=candles_1m['close'], + name='1m' + ), + row=3, col=1 + ) + + # Volume bars for 1m - inline at the bottom of the chart + # Determine colors based on price movement + colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' + for open, close in zip(candles_1m['open'], candles_1m['close'])] + + # Add volume as bars at the bottom of the same subplot + # Scale down the volume to fit in the same chart + if 'volume' in candles_1m.columns: + # Get volume to 15% of the chart height + volume_data = candles_1m['volume'] + if len(volume_data) > 0: + y_range = max(candles_1m['high']) - min(candles_1m['low']) + if y_range > 0: + scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 + scaled_volume = volume_data * scale_factor + min_price = min(candles_1m['low']) + + fig.add_trace( + go.Bar( + x=candles_1m.index, + y=scaled_volume, + marker_color=colors, + name='1m Volume', + opacity=0.5, + yaxis='y3', + base=min(candles_1m['low']) - (y_range * 0.02), # Slightly below price + offset=0 + ), + row=3, col=1 + ) + + # Add a note about the scaled volume + fig.add_annotation( + x=candles_1m.index[0], + y=min_price, + text=f"Volume (scaled)", + showarrow=False, + font=dict(size=10, color="gray"), + xanchor="left", + yanchor="bottom", + row=3, col=1 + ) + except Exception as render_1m_error: + 
logger.error(f"Error rendering 1m chart: {str(render_1m_error)}") + + # Add 1-hour candles in a separate subplot + try: + candles_1h = self.candle_cache.get_candles(timeframe='1h') + if candles_1h is not None and not candles_1h.empty: + logger.debug(f"Rendering 1h chart with {len(candles_1h)} candles") + # Candlestick chart for 1h + fig.add_trace( + go.Candlestick( + x=candles_1h.index, + open=candles_1h['open'], + high=candles_1h['high'], + low=candles_1h['low'], + close=candles_1h['close'], + name='1h' + ), + row=4, col=1 + ) + + # Volume bars for 1h - inline at the bottom of the chart + # Determine colors based on price movement + colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' + for open, close in zip(candles_1h['open'], candles_1h['close'])] + + # Add volume as bars at the bottom of the same subplot + if 'volume' in candles_1h.columns: + # Get volume to 15% of the chart height + volume_data = candles_1h['volume'] + if len(volume_data) > 0: + y_range = max(candles_1h['high']) - min(candles_1h['low']) + if y_range > 0: + scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 + scaled_volume = volume_data * scale_factor + min_price = min(candles_1h['low']) + + fig.add_trace( + go.Bar( + x=candles_1h.index, + y=scaled_volume, + marker_color=colors, + name='1h Volume', + opacity=0.5, + yaxis='y4', + base=min(candles_1h['low']) - (y_range * 0.02), + offset=0 + ), + row=4, col=1 + ) + + # Add a note about the scaled volume + fig.add_annotation( + x=candles_1h.index[0], + y=min_price, + text=f"Volume (scaled)", + showarrow=False, + font=dict(size=10, color="gray"), + xanchor="left", + yanchor="bottom", + row=4, col=1 + ) + except Exception as render_1h_error: + logger.error(f"Error rendering 1h chart: {str(render_1h_error)}") + + # Add 1-day candles in a separate subplot + try: + candles_1d = self.candle_cache.get_candles(timeframe='1d') + if candles_1d is not None and not candles_1d.empty: + logger.debug(f"Rendering 1d 
chart with {len(candles_1d)} candles") + # Candlestick chart for 1d + fig.add_trace( + go.Candlestick( + x=candles_1d.index, + open=candles_1d['open'], + high=candles_1d['high'], + low=candles_1d['low'], + close=candles_1d['close'], + name='1d' + ), + row=5, col=1 + ) + + # Volume bars for 1d - inline at the bottom of the chart + # Determine colors based on price movement + colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' + for open, close in zip(candles_1d['open'], candles_1d['close'])] + + # Add volume as bars at the bottom of the same subplot + if 'volume' in candles_1d.columns: + # Get volume to 15% of the chart height + volume_data = candles_1d['volume'] + if len(volume_data) > 0: + y_range = max(candles_1d['high']) - min(candles_1d['low']) + if y_range > 0: + scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 + scaled_volume = volume_data * scale_factor + min_price = min(candles_1d['low']) + + fig.add_trace( + go.Bar( + x=candles_1d.index, + y=scaled_volume, + marker_color=colors, + name='1d Volume', + opacity=0.5, + yaxis='y5', + base=min(candles_1d['low']) - (y_range * 0.02), + offset=0 + ), + row=5, col=1 + ) + + # Add a note about the scaled volume + fig.add_annotation( + x=candles_1d.index[0], + y=min_price, + text=f"Volume (scaled)", + showarrow=False, + font=dict(size=10, color="gray"), + xanchor="left", + yanchor="bottom", + row=5, col=1 + ) + except Exception as render_1d_error: + logger.error(f"Error rendering 1d chart: {str(render_1d_error)}") + + # Update layout + try: + interval_text = { + 1: "1 second", + 5: "5 seconds", + 15: "15 seconds", + 30: "30 seconds", + 60: "1 minute" + }.get(interval, f"{interval}s") - # Add price range and change - if price_stats['min'] is not None and price_stats['max'] is not None: - price_range = f"Range: {price_stats['min']:.2f} - {price_stats['max']:.2f}" - info_lines.append(price_range) - - # Add tick count and time range - info_lines.append(f"Ticks: 
{price_stats['count']} in {age_text}") - - # Add candle count - candle_count = len(df) if not df.empty else 0 - info_lines.append(f"Candles: {candle_count} ({interval}s)") - - # Add time-based statistics - if time_stats and time_stats['periods']: - info_lines.append("Time-Based Stats:") - for period, stats in time_stats['periods'].items(): - if stats['tick_count'] > 0: - info_lines.append(f"{period}: {stats['tick_count']} ticks, {stats['ticks_per_second']:.2f}/s") - if stats['min_price'] is not None and stats['max_price'] is not None: - price_change = stats['last_price'] - stats['min_price'] - change_pct = (price_change / stats['min_price']) * 100 if stats['min_price'] > 0 else 0 - info_lines.append(f" Range: {stats['min_price']:.2f}-{stats['max_price']:.2f} ({change_pct:.2f}%)") - - # Add cache information - info_lines.append("Cached Candles:") - for interval_key, cache_df in self.ohlcv_cache.items(): - count = len(cache_df) if cache_df is not None else 0 - info_lines.append(f"{interval_key}: {count}") - - # Add info box to the chart - fig.add_annotation( - x=0.01, - y=0.99, - xref="paper", - yref="paper", - text="
".join(info_lines), - showarrow=False, - font=dict(size=12, color="white"), - align="left", - bgcolor="rgba(0,0,50,0.7)", - bordercolor="#3366CC", - borderwidth=2, - borderpad=5, - xanchor="left", - yanchor="top" - ) - - # Update layout with improved styling - interval_text = { - 1: "1 second", - 5: "5 seconds", - 15: "15 seconds", - 30: "30 seconds", - 60: "1 minute" - }.get(interval, f"{interval}s") - - fig.update_layout( - title_text=f"{self.symbol} Real-Time Data ({interval_text})", - title_x=0.5, # Center the title - xaxis_rangeslider_visible=False, - height=1800, # Increased height for taller display - template='plotly_dark', - paper_bgcolor='rgba(0,0,0,0)', - plot_bgcolor='rgba(25,25,50,1)', - font=dict(family="Arial, sans-serif", size=12, color="white"), - showlegend=True, - legend=dict( - yanchor="top", - y=0.99, - xanchor="left", - x=0.01 + fig.update_layout( + title_text=f"{self.symbol} Real-Time Data ({interval_text})", + title_x=0.5, # Center the title + xaxis_rangeslider_visible=False, + height=1800, # Increased height for taller display + template='plotly_dark', + paper_bgcolor='rgba(0,0,0,0)', + plot_bgcolor='rgba(25,25,50,1)', + font=dict(family="Arial, sans-serif", size=12, color="white"), + showlegend=True, + legend=dict( + yanchor="top", + y=0.99, + xanchor="left", + x=0.01 + ) ) - ) - # Update axes styling - fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') - fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') + # Update axes styling + fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') + fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') - return fig - + return fig + except Exception as layout_error: + logger.error(f"Error updating layout: {str(layout_error)}") + import traceback + logger.error(traceback.format_exc()) + return go.Figure() except Exception as e: logger.error(f"Error updating chart: {str(e)}") import traceback