From b9d2a36e50c9f9e8a62099d664c6a0630abd9c76 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 1 Apr 2025 15:10:50 +0300 Subject: [PATCH] better sticks --- realtime.py | 1972 +++++++++++++------------------------ train_rl_with_realtime.py | 110 ++- 2 files changed, 804 insertions(+), 1278 deletions(-) diff --git a/realtime.py b/realtime.py index aac8b05..a9cc3d5 100644 --- a/realtime.py +++ b/realtime.py @@ -20,6 +20,8 @@ from datetime import datetime, timedelta import pytz import tzlocal import threading +import random +import dash_bootstrap_components as dbc # Configure logging with more detailed format logging.basicConfig( @@ -215,86 +217,97 @@ def convert_to_local_time(timestamp): return timestamp class TradeTickStorage: - """Store and manage raw trade ticks for display and candle formation""" - def __init__(self, symbol: str = None, max_age_seconds: int = 3600): # 1 hour by default, up from 30 min + """Storage for trade ticks with a maximum age limit""" + + def __init__(self, symbol: str = None, max_age_seconds: int = 3600, use_sample_data: bool = True, log_no_ticks_warning: bool = False): # 1 hour by default, up from 30 min + """Initialize the tick storage + + Args: + symbol: Trading symbol + max_age_seconds: Maximum age for ticks to be stored + use_sample_data: If True, generate sample ticks when no real ticks available + log_no_ticks_warning: If True, log a warning when no ticks are available + """ + self.symbol = symbol self.ticks = [] - self.symbol = symbol or "unknown" # Use symbol name for cache files self.max_age_seconds = max_age_seconds self.last_cleanup_time = time.time() - self.last_cache_time = time.time() - # Adjust cleanup interval based on max_age_seconds (more time = less frequent cleanup) - self.cleanup_interval = min(max(10, max_age_seconds // 60), 60) # Between 10-60 seconds - self.cache_interval = 60 # Cache ticks every 60 seconds - self.last_tick = None + self.cleanup_interval = 60 # run cleanup every 60 seconds self.cache_dir = "cache" - os.makedirs(self.cache_dir, exist_ok=True) - logger.info(f"Initialized TradeTickStorage for {self.symbol} with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds") + self.use_sample_data = use_sample_data + self.log_no_ticks_warning = log_no_ticks_warning + self.last_sample_price = 83000.0 # Starting price for sample data (for BTC) + self.last_sample_time = time.time() * 1000 # Starting time for sample data + self.last_tick_time = 0 # Initialize last_tick_time attribute + self.tick_count = 0 # Initialize tick_count attribute - # Try to load cached ticks on startup + # Create cache directory if it doesn't exist + if not os.path.exists(self.cache_dir): + os.makedirs(self.cache_dir) + + # Try to load cached ticks self._load_cached_ticks() + logger.info(f"Initialized TradeTickStorage for {symbol} with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds") + def add_tick(self, tick: Dict): - """Add a new trade tick to storage""" - # Check for duplicate timestamps - if self.ticks and tick['timestamp'] == self.ticks[-1]['timestamp']: - # For duplicate timestamps, update volume instead of adding new tick - self.ticks[-1]['volume'] += tick.get('volume', 0) - # Update price to the newest one - self.ticks[-1]['price'] = tick['price'] - logger.debug(f"Merged tick with duplicate timestamp: {tick['timestamp']}") - else: - # No duplicate, add as new tick - self.ticks.append(tick) - self.last_tick = tick - logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}") + """Add a tick to storage - # Only clean up periodically rather than on every tick - current_time = time.time() - if current_time - self.last_cleanup_time > self.cleanup_interval: - self._cleanup() - self.last_cleanup_time = current_time + Args: + tick: Tick data dict with fields: + price: The price + volume: The volume + timestamp: Timestamp in milliseconds + """ + if not tick: + return - # Cache ticks periodically - if current_time - self.last_cache_time > self.cache_interval: - self._cache_ticks() - self.last_cache_time = current_time + # Check if we need to generate a timestamp + if 'timestamp' not in tick: + tick['timestamp'] = int(time.time() * 1000) # Current time in ms + # Ensure timestamp is an integer (milliseconds since epoch) + if not isinstance(tick['timestamp'], int): + try: + # Try to convert from float or string + tick['timestamp'] = int(float(tick['timestamp'])) + except (ValueError, TypeError): + # If conversion fails, use current time + tick['timestamp'] = int(time.time() * 1000) + + # Set default volume if not present + if 'volume' not in tick: + tick['volume'] = 0.01 # Default small volume + + # Add tick to storage with a copy to avoid mutation + self.ticks.append(tick.copy()) + + # Keep track of latest tick for stats + self.last_tick_time = max(self.last_tick_time, tick['timestamp']) + + # Cache every 100 ticks to avoid data loss + self.tick_count += 1 + if self.tick_count % 100 == 0: + self._cache_ticks() + + # Periodically clean up old ticks + if self.tick_count % 1000 == 0: + self._cleanup() + def _cleanup(self): """Remove ticks older than max_age_seconds""" - now = int(time.time() * 1000) # Current time in ms + # Get current time in milliseconds + now = int(time.time() * 1000) + + # Remove old ticks cutoff = now - (self.max_age_seconds * 1000) - old_count = len(self.ticks) + original_count = len(self.ticks) self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff] - removed = old_count - len(self.ticks) + removed = original_count - len(self.ticks) + if removed > 0: logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}") - def _cache_ticks(self): - """Cache recent ticks to disk""" - if not self.ticks: - return - - # Get ticks from last 10 minutes - now = int(time.time() * 1000) # Current time in ms - cutoff = now - (600 * 1000) # 10 minutes in ms - recent_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff] - - if not recent_ticks: - logger.debug("No recent ticks to cache") - return - - # Create symbol-specific filename - symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower() - cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv") - - # Save to disk - try: - tick_df = pd.DataFrame(recent_ticks) - tick_df.to_csv(cache_file, index=False) - logger.info(f"Cached {len(recent_ticks)} recent ticks for {self.symbol} to {cache_file}") - except Exception as e: - logger.error(f"Error caching ticks: {str(e)}") - def _load_cached_ticks(self): """Load cached ticks from disk on startup""" # Create symbol-specific filename @@ -327,14 +340,36 @@ class TradeTickStorage: import traceback logger.error(traceback.format_exc()) + def _cache_ticks(self): + """Cache recent ticks to disk""" + if not self.ticks: + return + + # Get ticks from last 10 minutes + now = int(time.time() * 1000) # Current time in ms + cutoff = now - (600 * 1000) # 10 minutes in ms + recent_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff] + + if not recent_ticks: + logger.debug("No recent ticks to cache") + return + + # Create symbol-specific filename + symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower() + cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv") + + # Save to disk + try: + tick_df = pd.DataFrame(recent_ticks) + tick_df.to_csv(cache_file, index=False) + logger.info(f"Cached {len(recent_ticks)} recent ticks for {self.symbol} to {cache_file}") + except Exception as e: + logger.error(f"Error caching ticks: {str(e)}") + def get_latest_price(self) -> Optional[float]: """Get the latest price from the most recent tick""" - if self.last_tick: - return self.last_tick.get('price') - elif self.ticks: - # If last_tick not available but ticks exist, use the last tick - self.last_tick = self.ticks[-1] - return self.last_tick.get('price') + if self.ticks: + return self.ticks[-1].get('price') return None def get_price_stats(self) -> Dict: @@ -369,7 +404,13 @@ class TradeTickStorage: # Ensure we have fresh data self._cleanup() - df = pd.DataFrame(self.ticks) + # Create a new list from ticks to avoid modifying the original data + ticks_data = self.ticks.copy() + + # Ensure we have the latest ticks at the end of the DataFrame + ticks_data.sort(key=lambda x: x['timestamp']) + + df = pd.DataFrame(ticks_data) if not df.empty: logger.debug(f"Converting timestamps for {len(df)} ticks") # Ensure timestamp column exists @@ -395,233 +436,270 @@ class TradeTickStorage: return df def get_candles(self, interval_seconds: int = 1, start_time_ms: int = None, end_time_ms: int = None) -> pd.DataFrame: - """Convert ticks to OHLCV candles at specified interval with optional time range filtering + """Generate candlestick data from ticks Args: - interval_seconds: The interval in seconds for each candle - start_time_ms: Optional start time in milliseconds for filtering - end_time_ms: Optional end time in milliseconds for filtering - + interval_seconds: Interval in seconds for each candle + start_time_ms: Start time in milliseconds + end_time_ms: End time in milliseconds + Returns: - DataFrame with OHLCV candles + DataFrame with candlestick data """ - try: - if not self.ticks: + # Get filtered ticks + ticks = self.get_ticks_from_time(start_time_ms, end_time_ms) + + if not ticks: + if self.use_sample_data: + # Generate multiple sample ticks to create several candles + current_time = int(time.time() * 1000) + sample_ticks = [] + + # Generate ticks for the past 10 intervals + for i in range(20): + # Base price with some trend + base_price = self.last_sample_price * (1 + 0.0001 * (10 - i)) + + # Add some randomness to the price + random_factor = random.uniform(-0.002, 0.002) # Small random change + tick_price = base_price * (1 + random_factor) + + # Create timestamp with appropriate offset + tick_time = current_time - (i * interval_seconds * 1000 // 2) + + sample_tick = { + 'price': tick_price, + 'volume': random.uniform(0.01, 0.5), + 'timestamp': tick_time, + 'is_sample': True + } + + sample_ticks.append(sample_tick) + + # Update the last sample values + self.last_sample_price = sample_ticks[0]['price'] + self.last_sample_time = sample_ticks[0]['timestamp'] + + # Add the sample ticks in chronological order + for tick in sorted(sample_ticks, key=lambda x: x['timestamp']): + self.add_tick(tick) + + # Try again with the new ticks + ticks = self.get_ticks_from_time(start_time_ms, end_time_ms) + + if not ticks and self.log_no_ticks_warning: + logger.warning("Still no ticks available after adding sample data") + elif self.log_no_ticks_warning: logger.warning("No ticks available for candle formation") - return pd.DataFrame() - - # Ensure ticks are up to date - try: - self._cleanup() - except Exception as cleanup_error: - logger.error(f"Error cleaning up ticks: {str(cleanup_error)}") - # Continue with existing ticks - - # Get ticks from specified time range if provided - if start_time_ms is not None or end_time_ms is not None: - logger.debug(f"Filtering ticks for time range from {start_time_ms} to {end_time_ms}") - try: - filtered_ticks = self.get_ticks_from_time(start_time_ms, end_time_ms) - if not filtered_ticks: - logger.warning("No ticks in the specified time range") - return pd.DataFrame() - df = pd.DataFrame(filtered_ticks) - except Exception as filter_error: - logger.error(f"Error filtering ticks by time: {str(filter_error)}") - return pd.DataFrame() + return pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) else: - # Use all available ticks + return pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) + + # Ensure ticks are up to date + try: + self._cleanup() + except Exception as cleanup_error: + logger.error(f"Error cleaning up ticks: {str(cleanup_error)}") + + df = pd.DataFrame(ticks) + if df.empty: + logger.warning("Tick DataFrame is empty after filtering/conversion") + return pd.DataFrame() + + logger.info(f"Preparing to create candles from {len(df)} ticks with {interval_seconds}s interval") + + # First, ensure all required columns exist + required_columns = ['timestamp', 'price', 'volume'] + for col in required_columns: + if col not in df.columns: + logger.error(f"Required column '{col}' missing from tick data") + return pd.DataFrame() + + # Make sure DataFrame has no duplicated timestamps before setting index + try: + if 'timestamp' in df.columns: + # Check for duplicate timestamps + duplicate_count = df['timestamp'].duplicated().sum() + if duplicate_count > 0: + logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping the last occurrence") + # Keep the last occurrence of each timestamp + df = df.drop_duplicates(subset='timestamp', keep='last') + + # Convert timestamp to datetime if it's not already + if not pd.api.types.is_datetime64_any_dtype(df['timestamp']): + logger.debug("Converting timestamp to datetime") + # Try multiple approaches to convert timestamps try: - df = self.get_ticks_as_df() - except Exception as df_error: - logger.error(f"Error getting ticks as DataFrame: {str(df_error)}") - return pd.DataFrame() - - if df.empty: - logger.warning("Tick DataFrame is empty after filtering/conversion") - return pd.DataFrame() - - logger.info(f"Preparing to create candles from {len(df)} ticks with {interval_seconds}s interval") - - # First, ensure all required columns exist - required_columns = ['timestamp', 'price', 'volume'] - for col in required_columns: - if col not in df.columns: - logger.error(f"Required column '{col}' missing from tick data") - return pd.DataFrame() - - # Make sure DataFrame has no duplicated timestamps before setting index - try: - if 'timestamp' in df.columns: - # Check for duplicate timestamps - duplicate_count = df['timestamp'].duplicated().sum() - if duplicate_count > 0: - logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping the last occurrence") - # Keep the last occurrence of each timestamp - df = df.drop_duplicates(subset='timestamp', keep='last') - - # Use timestamp column for resampling - df = df.set_index('timestamp') - - # Make sure index is sorted (required for resampling) - if not df.index.is_monotonic_increasing: - logger.warning("Timestamp index is not monotonically increasing, sorting...") - df = df.sort_index() - except Exception as prep_error: - logger.error(f"Error preprocessing DataFrame for resampling: {str(prep_error)}") - import traceback - logger.error(traceback.format_exc()) - return pd.DataFrame() - - # Create interval string for resampling - use 's' instead of deprecated 'S' - interval_str = f'{interval_seconds}s' - - # Resample to create OHLCV candles with multiple fallback options - logger.debug(f"Resampling with interval: {interval_str}") - - candles = None - - # First attempt - individual column resampling - try: - # Check that price column exists and has enough data - if 'price' not in df.columns: - raise ValueError("Price column missing from DataFrame") - - if len(df) < 2: - logger.warning("Not enough data points for resampling, using direct data") - # For single data point, create a single candle - if len(df) == 1: - price_val = df['price'].iloc[0] - volume_val = df['volume'].iloc[0] if 'volume' in df.columns else 0 - timestamp_val = df.index[0] - - candles = pd.DataFrame({ - 'timestamp': [timestamp_val], - 'open': [price_val], - 'high': [price_val], - 'low': [price_val], - 'close': [price_val], - 'volume': [volume_val] - }) - return candles + # First, try to convert from milliseconds (integer timestamps) + if pd.api.types.is_integer_dtype(df['timestamp']): + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') else: - # No data - return pd.DataFrame() - - # Resample and aggregate each column separately - open_df = df['price'].resample(interval_str).first() - high_df = df['price'].resample(interval_str).max() - low_df = df['price'].resample(interval_str).min() - close_df = df['price'].resample(interval_str).last() - volume_df = df['volume'].resample(interval_str).sum() - - # Check for length mismatches before combining - expected_length = len(open_df) - if (len(high_df) != expected_length or - len(low_df) != expected_length or - len(close_df) != expected_length or - len(volume_df) != expected_length): - logger.warning("Length mismatch in resampled columns, falling back to alternative method") - raise ValueError("Length mismatch") - - # Combine into a single DataFrame - candles = pd.DataFrame({ - 'open': open_df, - 'high': high_df, - 'low': low_df, - 'close': close_df, - 'volume': volume_df - }) - logger.debug(f"Successfully created {len(candles)} candles with individual column resampling") - except Exception as resample_error: - logger.error(f"Error in individual column resampling: {str(resample_error)}") - - # Second attempt - built-in agg method - try: - logger.debug("Trying fallback resampling method with agg()") - candles = df.resample(interval_str).agg({ - 'price': ['first', 'max', 'min', 'last'], - 'volume': 'sum' - }) - # Flatten MultiIndex columns - candles.columns = ['open', 'high', 'low', 'close', 'volume'] - logger.debug(f"Successfully created {len(candles)} candles with agg() method") - except Exception as agg_error: - logger.error(f"Error in agg() resampling: {str(agg_error)}") - - # Third attempt - manual candle construction + # Otherwise try standard conversion + df['timestamp'] = pd.to_datetime(df['timestamp']) + except Exception as conv_error: + logger.error(f"Error converting timestamps to datetime: {str(conv_error)}") + # Try a fallback approach try: - logger.debug("Trying manual candle construction method") - resampler = df.resample(interval_str) - candle_data = [] - - for name, group in resampler: - if not group.empty: - candle = { - 'timestamp': name, - 'open': group['price'].iloc[0], - 'high': group['price'].max(), - 'low': group['price'].min(), - 'close': group['price'].iloc[-1], - 'volume': group['volume'].sum() if 'volume' in group.columns else 0 - } - candle_data.append(candle) - - if candle_data: - candles = pd.DataFrame(candle_data) - logger.debug(f"Successfully created {len(candles)} candles with manual method") - else: - logger.warning("No candles created with manual method") - return pd.DataFrame() - except Exception as manual_error: - logger.error(f"Error in manual candle construction: {str(manual_error)}") - import traceback - logger.error(traceback.format_exc()) + # Fallback for integer timestamps + if df['timestamp'].iloc[0] > 1000000000000: # Check if milliseconds timestamp + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') + else: # Otherwise assume seconds + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s') + except Exception as fallback_error: + logger.error(f"Fallback timestamp conversion failed: {str(fallback_error)}") return pd.DataFrame() - - # Ensure the result isn't empty - if candles is None or candles.empty: - logger.warning("No candles were created after all resampling attempts") - return pd.DataFrame() - # Reset index to get timestamp as column - try: - candles = candles.reset_index() - except Exception as reset_error: - logger.error(f"Error resetting index: {str(reset_error)}") - # Try to create a new DataFrame with the timestamp index as a column - try: - timestamp_col = candles.index.to_list() - candles_dict = candles.to_dict('list') - candles_dict['timestamp'] = timestamp_col - candles = pd.DataFrame(candles_dict) - except Exception as fallback_error: - logger.error(f"Error in fallback index reset: {str(fallback_error)}") - return pd.DataFrame() - - # Ensure no NaN values - try: - nan_count_before = candles.isna().sum().sum() - if nan_count_before > 0: - logger.warning(f"Found {nan_count_before} NaN values in candles, dropping them") - - candles = candles.dropna() - except Exception as nan_error: - logger.error(f"Error handling NaN values: {str(nan_error)}") - # Try to fill NaN values instead of dropping - try: - candles = candles.fillna(method='ffill').fillna(method='bfill') - except: - pass - - logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks") - return candles - - except Exception as e: - logger.error(f"Unhandled error in candle formation: {str(e)}") + # Use timestamp column for resampling + df = df.set_index('timestamp') + except Exception as prep_error: + logger.error(f"Error preprocessing DataFrame for resampling: {str(prep_error)}") import traceback logger.error(traceback.format_exc()) return pd.DataFrame() + + # Create interval string for resampling - use 's' instead of deprecated 'S' + interval_str = f'{interval_seconds}s' + + # Resample to create OHLCV candles with multiple fallback options + logger.debug(f"Resampling with interval: {interval_str}") + + candles = None + + # First attempt - individual column resampling + try: + # Check that price column exists and has enough data + if 'price' not in df.columns: + raise ValueError("Price column missing from DataFrame") + + if len(df) < 2: + logger.warning("Not enough data points for resampling, using direct data") + # For single data point, create a single candle + if len(df) == 1: + price_val = df['price'].iloc[0] + volume_val = df['volume'].iloc[0] if 'volume' in df.columns else 0 + timestamp_val = df.index[0] + + candles = pd.DataFrame({ + 'timestamp': [timestamp_val], + 'open': [price_val], + 'high': [price_val], + 'low': [price_val], + 'close': [price_val], + 'volume': [volume_val] + }) + return candles + else: + # No data + return pd.DataFrame() + + # Resample and aggregate each column separately + open_df = df['price'].resample(interval_str).first() + high_df = df['price'].resample(interval_str).max() + low_df = df['price'].resample(interval_str).min() + close_df = df['price'].resample(interval_str).last() + volume_df = df['volume'].resample(interval_str).sum() + + # Check for length mismatches before combining + expected_length = len(open_df) + if (len(high_df) != expected_length or + len(low_df) != expected_length or + len(close_df) != expected_length or + len(volume_df) != expected_length): + logger.warning("Length mismatch in resampled columns, falling back to alternative method") + raise ValueError("Length mismatch") + + # Combine into a single DataFrame + candles = pd.DataFrame({ + 'open': open_df, + 'high': high_df, + 'low': low_df, + 'close': close_df, + 'volume': volume_df + }) + logger.debug(f"Successfully created {len(candles)} candles with individual column resampling") + except Exception as resample_error: + logger.error(f"Error in individual column resampling: {str(resample_error)}") + + # Second attempt - built-in agg method + try: + logger.debug("Trying fallback resampling method with agg()") + candles = df.resample(interval_str).agg({ + 'price': ['first', 'max', 'min', 'last'], + 'volume': 'sum' + }) + # Flatten MultiIndex columns + candles.columns = ['open', 'high', 'low', 'close', 'volume'] + logger.debug(f"Successfully created {len(candles)} candles with agg() method") + except Exception as agg_error: + logger.error(f"Error in agg() resampling: {str(agg_error)}") + + # Third attempt - manual candle construction + try: + logger.debug("Trying manual candle construction method") + resampler = df.resample(interval_str) + candle_data = [] + + for name, group in resampler: + if not group.empty: + candle = { + 'timestamp': name, + 'open': group['price'].iloc[0], + 'high': group['price'].max(), + 'low': group['price'].min(), + 'close': group['price'].iloc[-1], + 'volume': group['volume'].sum() if 'volume' in group.columns else 0 + } + candle_data.append(candle) + + if candle_data: + candles = pd.DataFrame(candle_data) + logger.debug(f"Successfully created {len(candles)} candles with manual method") + else: + logger.warning("No candles created with manual method") + return pd.DataFrame() + except Exception as manual_error: + logger.error(f"Error in manual candle construction: {str(manual_error)}") + import traceback + logger.error(traceback.format_exc()) + return pd.DataFrame() + + # Ensure the result isn't empty + if candles is None or candles.empty: + logger.warning("No candles were created after all resampling attempts") + return pd.DataFrame() + + # Reset index to get timestamp as column + try: + candles = candles.reset_index() + except Exception as reset_error: + logger.error(f"Error resetting index: {str(reset_error)}") + # Try to create a new DataFrame with the timestamp index as a column + try: + timestamp_col = candles.index.to_list() + candles_dict = candles.to_dict('list') + candles_dict['timestamp'] = timestamp_col + candles = pd.DataFrame(candles_dict) + except Exception as fallback_error: + logger.error(f"Error in fallback index reset: {str(fallback_error)}") + return pd.DataFrame() + + # Ensure no NaN values + try: + nan_count_before = candles.isna().sum().sum() + if nan_count_before > 0: + logger.warning(f"Found {nan_count_before} NaN values in candles, dropping them") + + candles = candles.dropna() + except Exception as nan_error: + logger.error(f"Error handling NaN values: {str(nan_error)}") + # Try to fill NaN values instead of dropping + try: + candles = candles.fillna(method='ffill').fillna(method='bfill') + except: + pass + + logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks") + return candles def get_candle_stats(self) -> Dict: """Get statistics about cached candles for different intervals""" @@ -1272,202 +1350,161 @@ class CandleCache: return pd.DataFrame() class RealTimeChart: - """Real-time chart using Dash and Plotly with WebSocket data feed""" + """Real-time chart using Dash and Plotly""" - def __init__(self, symbol: str): - """Initialize the chart with necessary components""" + def __init__(self, symbol="BTC/USDT", use_sample_data=False, log_no_ticks_warning=True): + """Initialize a new RealTimeChart + + Args: + symbol: Trading pair symbol (e.g., BTC/USDT) + use_sample_data: Whether to use sample data when no real data is available + log_no_ticks_warning: Whether to log warnings when no ticks are available + """ self.symbol = symbol - # Create a multi-page Dash app instead of a simple Dash app - self.app = dash.Dash(__name__, - suppress_callback_exceptions=True, - meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}]) - self.candlestick_data = CandlestickData() - self.tick_storage = TradeTickStorage(symbol=symbol, max_age_seconds=3600) # Store 1 hour of ticks - self.ohlcv_cache = { # Cache for different intervals + self.use_sample_data = use_sample_data + + # Initialize variables for trading info display + self.current_signal = 'HOLD' + self.signal_time = datetime.now() + self.current_position = 0.0 + self.session_balance = 100.0 # Start with $100 balance + self.session_pnl = 0.0 + + # Initialize NN signals and trades lists + self.nn_signals = [] + self.trades = [] + + # Use existing timezone variable instead of trying to detect again + logger.info(f"Using timezone: {tz_name}") + + # Initialize tick storage + logger.info(f"Initializing RealTimeChart for {symbol}") + self.tick_storage = TradeTickStorage( + symbol=symbol, + max_age_seconds=3600, # Keep ticks for 1 hour + use_sample_data=use_sample_data, + log_no_ticks_warning=log_no_ticks_warning + ) + + # Initialize candlestick data for backward compatibility + self.candlestick_data = CandlestickData(max_length=5000) + + # Initialize candle cache + self.candle_cache = CandleCache(max_candles=5000) + + # Initialize OHLCV cache dictionaries for different timeframes + self.ohlcv_cache = { '1s': None, + '5s': None, + '15s': None, + '60s': None, + '300s': None, + '900s': None, + '3600s': None, '1m': None, + '5m': None, + '15m': None, '1h': None, '1d': None } - self.candle_cache = CandleCache(max_candles=5000) # Increase max candles to 5000 for better history - self.historical_data = BinanceHistoricalData() # For fetching historical data - self.last_cache_save_time = time.time() # Track last time we saved cache to disk - self.first_render = True # Flag to track first render - # Storage for NN signals - self.nn_signals = [] + # Historical data handler + self.historical_data = BinanceHistoricalData() - logger.info(f"Initializing RealTimeChart for {symbol}") + # Flag for first render to force data loading + self.first_render = True - # Load historical data for longer timeframes at startup - self._load_historical_data() + # Last time candles were saved to disk + self.last_cache_save_time = time.time() - # Setup the multi-page layout + # Initialize Dash app + self.app = dash.Dash( + __name__, + external_stylesheets=[dbc.themes.DARKLY], + suppress_callback_exceptions=True, + meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}] + ) + + # Set up layout and callbacks self._setup_app_layout() - - # Save the loaded cache immediately to ensure we have the data saved - self._save_candles_to_disk(force=True) def _setup_app_layout(self): - # Button style + """Set up the app layout and callbacks""" + # Define styling for interval buttons button_style = { - 'background-color': '#4CAF50', + 'backgroundColor': '#2C2C2C', 'color': 'white', + 'border': 'none', 'padding': '10px 15px', 'margin': '5px', - 'border': 'none', - 'border-radius': '5px', - 'font-size': '14px', + 'borderRadius': '5px', 'cursor': 'pointer', - 'transition': 'background-color 0.3s', - 'font-weight': 'bold' + 'fontWeight': 'bold' } active_button_style = { **button_style, - 'background-color': '#2E7D32', - 'box-shadow': '0 0 5px #2E7D32' + 'backgroundColor': '#4CAF50', + 'boxShadow': '0 2px 4px rgba(0,0,0,0.5)' } - nav_button_style = { - 'background-color': '#3f51b5', - 'color': 'white', - 'padding': '10px 15px', - 'margin': '5px', - 'border': 'none', - 'border-radius': '5px', - 'font-size': '14px', - 'cursor': 'pointer', - 'font-weight': 'bold' - } - - # Content div for the current page - content_div = html.Div(id='page-content') + # Create tab layout + self.app.layout = dbc.Tabs([ + dbc.Tab(self._get_chart_layout(button_style, active_button_style), label="Chart", tab_id="chart-tab"), + # No longer need ticks tab as it's causing errors + ], id="tabs") - # Initialize the layout with navigation - self.app.layout = html.Div([ - # Header with symbol and title - html.Div([ - html.H1(f"{self.symbol} Real-Time Data", style={ - 'textAlign': 'center', - 'color': '#FFFFFF', - 'fontFamily': 'Arial, sans-serif', - 'margin': '10px', - 'textShadow': '2px 2px 4px #000000' - }), - - # Navigation bar - html.Div([ - dcc.Link( - html.Button('Price Chart', style=nav_button_style), - href=f'/{self.symbol.replace("/", "-")}/chart', - id='chart-link' - ), - dcc.Link( - html.Button('Raw Ticks', style=nav_button_style), - href=f'/{self.symbol.replace("/", "-")}/ticks', - id='ticks-link' - ), - ], style={ - 'display': 'flex', - 'justifyContent': 'center', - 'margin': '10px' - }), - ], style={ - 'backgroundColor': '#1E1E1E', - 'padding': '10px', - 'borderRadius': '5px', - 'marginBottom': '10px', - 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' - }), - - # URL Bar - dcc.Location(id='url', refresh=False), - - # Content div will be populated based on the URL - content_div, - - # Interval component for periodic updates - dcc.Interval( - id='interval-component', - interval=500, # Update every 500ms for smoother display - n_intervals=0 - ) - ], style={ - 'backgroundColor': '#121212', - 'padding': '20px', - 'minHeight': '100vh', - 'fontFamily': 'Arial, sans-serif' - }) - - # Register URL callback to update page content - @self.app.callback( - Output('page-content', 'children'), - [Input('url', 'pathname')] - ) - def display_page(pathname): - if pathname is None: - pathname = f'/{self.symbol.replace("/", "-")}/chart' - - symbol_path = self.symbol.replace("/", "-") - - if pathname == f'/{symbol_path}/chart' or pathname == f'/' or pathname == f'/{symbol_path}': - return self._get_chart_layout(button_style, active_button_style) - elif pathname == f'/{symbol_path}/ticks': - return self._get_ticks_layout() - else: - return html.Div([ - html.H1('404 - Page Not Found', style={'textAlign': 'center', 'color': 'white'}) - ]) - - # Register callback to update the interval selection from button clicks + # Set up callbacks self._setup_interval_callback(button_style, active_button_style) - - # Register callback to update the chart data self._setup_chart_callback() - - # Register callback to update the ticks data - self._setup_ticks_callback() + # We've removed the ticks callback, so don't call it + # self._setup_ticks_callback() def _get_chart_layout(self, button_style, active_button_style): # Chart page layout return html.Div([ - # Interval selection buttons + # Chart title and interval buttons html.Div([ - html.Div("Candlestick Interval:", style={ - 'color': '#FFFFFF', - 'marginRight': '10px', - 'fontSize': '16px', - 'fontWeight': 'bold' + html.H2(f"{self.symbol} Real-Time Chart", style={ + 'textAlign': 'center', + 'color': '#FFFFFF', + 'marginBottom': '10px' }), - html.Button('1s', id='btn-1s', n_clicks=0, style=active_button_style), - html.Button('5s', id='btn-5s', n_clicks=0, style=button_style), - html.Button('15s', id='btn-15s', n_clicks=0, style=button_style), - html.Button('30s', id='btn-30s', n_clicks=0, style=button_style), - html.Button('1m', id='btn-1m', n_clicks=0, style=button_style), - ], style={ - 'display': 'flex', - 'alignItems': 'center', - 'justifyContent': 'center', - 'margin': '15px', - 'backgroundColor': '#2C2C2C', - 'padding': '10px', - 'borderRadius': '5px' - }), - - # Store for current interval - dcc.Store(id='interval-store', data={'interval': 1}), - - # Main chart - dcc.Graph( - id='live-chart', - style={ - 'height': '180vh', - 'border': '1px solid #444444', - 'borderRadius': '5px', - 'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)' - } - ), + + # Store interval data + dcc.Store(id='interval-store', data={'interval': 1}), + + # Interval selection buttons + html.Div([ + html.Button('1s', id='btn-1s', n_clicks=0, style=active_button_style), + html.Button('5s', id='btn-5s', n_clicks=0, style=button_style), + html.Button('15s', id='btn-15s', n_clicks=0, style=button_style), + html.Button('30s', id='btn-30s', n_clicks=0, style=button_style), + html.Button('1m', id='btn-1m', n_clicks=0, style=button_style), + ], style={ + 'display': 'flex', + 'justifyContent': 'center', + 'marginBottom': '15px' + }), + + # Interval component for updates - set to refresh every 500ms + dcc.Interval( + id='interval-component', + interval=500, # Refresh every 500ms for better real-time updates + n_intervals=0 + ), + + # Main chart + dcc.Graph(id='live-chart', style={'height': '75vh'}), + + # Chart acknowledgment + html.Div("Real-time trading chart with ML signals", style={ + 'textAlign': 'center', + 'color': '#AAAAAA', + 'fontSize': '12px', + 'marginTop': '5px' + }) + ]) ]) def _get_ticks_layout(self): @@ -1622,392 +1659,111 @@ class RealTimeChart: def update_chart(n, interval_data): try: interval = interval_data.get('interval', 1) - logger.debug(f"Updating chart for {self.symbol} with interval {interval}s") + logger.debug(f"Updating chart with interval {interval}") - # First render flag - used to force cache loading - is_first_render = self.first_render - if is_first_render: - logger.info("First render - ensuring cached data is loaded") - self.first_render = False - - # Check if we have cached data for the requested interval - cached_candles = None - if interval == 1 and self.ohlcv_cache['1s'] is not None and not self.ohlcv_cache['1s'].empty: - cached_candles = self.ohlcv_cache['1s'] - logger.info(f"We have {len(cached_candles)} cached 1s candles available") - elif interval == 60 and self.ohlcv_cache['1m'] is not None and not self.ohlcv_cache['1m'].empty: - cached_candles = self.ohlcv_cache['1m'] - logger.info(f"We have {len(cached_candles)} cached 1m candles available") - - # Get candlesticks from tick storage for real-time data + # Get candlesticks data try: df = self.tick_storage.get_candles(interval_seconds=interval) - logger.debug(f"Got {len(df) if not df.empty else 0} candles from tick storage") - except Exception as candle_error: - logger.error(f"Error getting candles from tick storage: {str(candle_error)}") - logger.error(f"Using empty DataFrame to avoid chart restart") + if df.empty and self.ohlcv_cache[f'{interval}s' if interval < 60 else '1m'] is not None: + df = self.ohlcv_cache[f'{interval}s' if interval < 60 else '1m'] + except Exception as e: + logger.error(f"Error getting candles: {str(e)}") df = pd.DataFrame() - # If we don't have real-time data yet, use cached data - if df.empty and cached_candles is not None and not cached_candles.empty: - df = cached_candles - logger.info(f"Using {len(df)} cached candles for main chart") + # Create basic layout + fig = make_subplots( + rows=2, cols=1, + vertical_spacing=0.08, + subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume'), + row_heights=[0.8, 0.2] + ) - # Get current price and stats using our enhanced methods - try: - current_price = self.tick_storage.get_latest_price() - price_stats = self.tick_storage.get_price_stats() - time_stats = self.tick_storage.get_time_based_stats() - candle_stats = self.tick_storage.get_candle_stats() - except Exception as stats_error: - logger.error(f"Error getting market stats: {str(stats_error)}") - # Provide fallback values to avoid chart restart - current_price = None - price_stats = {'count': 0, 'min': None, 'max': None, 'latest': None, 'age_seconds': 0} - time_stats = {'total_ticks': 0, 'periods': {}} - candle_stats = {} - - # Periodically save candles to disk - if n % 60 == 0 or is_first_render: # Every 60 chart updates or on first render - try: - self._save_candles_to_disk() - except Exception as save_error: - logger.error(f"Error saving candles: {str(save_error)}") - - logger.debug(f"Current price: {current_price}, Stats: {price_stats}") - - # Create subplot layout - don't include 1s since that's the main chart - try: - fig = make_subplots( - rows=5, cols=1, - vertical_spacing=0.05, - subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1m OHLCV', '1h OHLCV', '1d OHLCV'), - row_heights=[0.2, 0.2, 0.2, 0.2, 0.2] + # Add candlestick chart + if not df.empty and 'open' in df.columns: + # Candlestick chart + fig.add_trace( + go.Candlestick( + x=df.index, + open=df['open'], + high=df['high'], + low=df['low'], + close=df['close'], + name='OHLC' + ), + row=1, col=1 ) - except Exception as subplot_error: - logger.error(f"Error creating subplot layout: {str(subplot_error)}") - # Create a simpler layout if the complex one fails - fig = make_subplots( - rows=2, cols=1, - vertical_spacing=0.05, - subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume'), - row_heights=[0.7, 0.3] + + # Calculate y-axis range with padding + low_min = df['low'].min() + high_max = df['high'].max() + price_range = high_max - low_min + y_min = low_min - (price_range * 0.05) # 5% padding below + y_max = high_max + (price_range * 0.05) # 5% padding above + + # Set y-axis range to ensure candles are visible + fig.update_yaxes(range=[y_min, y_max], row=1, col=1) + + # Volume bars + colors = ['rgba(0,255,0,0.7)' if close >= open else 'rgba(255,0,0,0.7)' + for open, close in zip(df['open'], df['close'])] + + fig.add_trace( + go.Bar( + x=df.index, + y=df['volume'], + name='Volume', + marker_color=colors + ), + row=2, col=1 ) - - # Process and add main chart data - try: - # Main price chart - if not df.empty: - fig.add_trace( - go.Candlestick( - x=df.index, - open=df['open'], - high=df['high'], - low=df['low'], - close=df['close'], - name='OHLC' - ), - row=1, col=1 - ) - - # Volume chart - use red/green colors based on price movement - colors = ['rgba(0,255,0,0.7)' if close >= open else 'rgba(255,0,0,0.7)' - for open, close in zip(df['open'], df['close'])] - - fig.add_trace( - go.Bar( - x=df.index, - y=df['volume'], - name='Volume', - marker_color=colors - ), - row=2, col=1 - ) - except Exception as render_error: - logger.error(f"Error rendering main chart data: {str(render_error)}") - - # Add 1-minute candles in a separate subplot - try: - candles_1m = self.candle_cache.get_candles(timeframe='1m') - if candles_1m is not None and not candles_1m.empty: - logger.debug(f"Rendering 1m chart with {len(candles_1m)} candles") - # Candlestick chart for 1m - fig.add_trace( - go.Candlestick( - x=candles_1m.index, - open=candles_1m['open'], - high=candles_1m['high'], - low=candles_1m['low'], - close=candles_1m['close'], - name='1m' - ), - row=3, col=1 - ) - - # Volume bars for 1m - inline at the bottom of the chart - # Determine colors based on price movement - colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' - for open, close in zip(candles_1m['open'], candles_1m['close'])] - - # Add volume as bars at the bottom of the same subplot - # Scale down the volume to fit in the same chart - if 'volume' in candles_1m.columns: - # Get volume to 15% of the chart height - volume_data = candles_1m['volume'] - if len(volume_data) > 0: - y_range = max(candles_1m['high']) - min(candles_1m['low']) - if y_range > 0: - scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 - scaled_volume = volume_data * scale_factor - min_price = min(candles_1m['low']) - - fig.add_trace( - go.Bar( - x=candles_1m.index, - y=scaled_volume, - marker_color=colors, - name='1m Volume', - opacity=0.5, - yaxis='y3', - base=min(candles_1m['low']) - (y_range * 0.02), # Slightly below price - offset=0 - ), - row=3, col=1 - ) - - # Add a note about the scaled volume - fig.add_annotation( - x=candles_1m.index[0], - y=min_price, - text=f"Volume (scaled)", - showarrow=False, - font=dict(size=10, color="gray"), - xanchor="left", - yanchor="bottom", - row=3, col=1 - ) - except Exception as render_1m_error: - logger.error(f"Error rendering 1m chart: {str(render_1m_error)}") - - # Add 1-hour candles in a separate subplot - try: - candles_1h = self.candle_cache.get_candles(timeframe='1h') - if candles_1h is not None and not candles_1h.empty: - logger.debug(f"Rendering 1h chart with {len(candles_1h)} candles") - # Candlestick chart for 1h - fig.add_trace( - go.Candlestick( - x=candles_1h.index, - open=candles_1h['open'], - high=candles_1h['high'], - low=candles_1h['low'], - close=candles_1h['close'], - name='1h' - ), - row=4, col=1 - ) - - # Volume bars for 1h - inline at the bottom of the chart - # Determine colors based on price movement - colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' - for open, close in zip(candles_1h['open'], candles_1h['close'])] - - # Add volume as bars at the bottom of the same subplot - if 'volume' in candles_1h.columns: - # Get volume to 15% of the chart height - volume_data = candles_1h['volume'] - if len(volume_data) > 0: - y_range = max(candles_1h['high']) - min(candles_1h['low']) - if y_range > 0: - scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 - scaled_volume = volume_data * scale_factor - min_price = min(candles_1h['low']) - - fig.add_trace( - go.Bar( - x=candles_1h.index, - y=scaled_volume, - marker_color=colors, - name='1h Volume', - opacity=0.5, - yaxis='y4', - base=min(candles_1h['low']) - (y_range * 0.02), - offset=0 - ), - row=4, col=1 - ) - - # Add a note about the scaled volume - fig.add_annotation( - x=candles_1h.index[0], - y=min_price, - text=f"Volume (scaled)", - showarrow=False, - font=dict(size=10, color="gray"), - xanchor="left", - yanchor="bottom", - row=4, col=1 - ) - except Exception as render_1h_error: - logger.error(f"Error rendering 1h chart: {str(render_1h_error)}") - - # Add 1-day candles in a separate subplot - try: - candles_1d = self.candle_cache.get_candles(timeframe='1d') - if candles_1d is not None and not candles_1d.empty: - logger.debug(f"Rendering 1d chart with {len(candles_1d)} candles") - # Candlestick chart for 1d - fig.add_trace( - go.Candlestick( - x=candles_1d.index, - open=candles_1d['open'], - high=candles_1d['high'], - low=candles_1d['low'], - close=candles_1d['close'], - name='1d' - ), - row=5, col=1 - ) - - # Volume bars for 1d - inline at the bottom of the chart - # Determine colors based on price movement - colors = ['rgba(0,255,0,0.5)' if close >= open else 'rgba(255,0,0,0.5)' - for open, close in zip(candles_1d['open'], candles_1d['close'])] - - # Add volume as bars at the bottom of the same subplot - if 'volume' in candles_1d.columns: - # Get volume to 15% of the chart height - volume_data = candles_1d['volume'] - if len(volume_data) > 0: - y_range = max(candles_1d['high']) - min(candles_1d['low']) - if y_range > 0: - scale_factor = 0.15 * y_range / max(volume_data) if max(volume_data) > 0 else 1 - scaled_volume = volume_data * scale_factor - min_price = min(candles_1d['low']) - - fig.add_trace( - go.Bar( - x=candles_1d.index, - y=scaled_volume, - marker_color=colors, - name='1d Volume', - opacity=0.5, - yaxis='y5', - base=min(candles_1d['low']) - (y_range * 0.02), - offset=0 - ), - row=5, col=1 - ) - - # Add a note about the scaled volume - fig.add_annotation( - x=candles_1d.index[0], - y=min_price, - text=f"Volume (scaled)", - showarrow=False, - font=dict(size=10, color="gray"), - xanchor="left", - yanchor="bottom", - row=5, col=1 - ) - except Exception as render_1d_error: - logger.error(f"Error rendering 1d chart: {str(render_1d_error)}") - + + # Add trading info annotation if available + if hasattr(self, 'current_signal') and self.current_signal: + signal_color = "#33DD33" if self.current_signal == "BUY" else "#FF4444" if self.current_signal == "SELL" else "#BBBBBB" + + # Format position value + position_text = f"{self.current_position:.4f}" if self.current_position < 0.01 else f"{self.current_position:.2f}" + + # Create trading info text + info_text = ( + f"Signal: {self.current_signal} | " + f"Position: {position_text} | " + f"Balance: ${self.session_balance:.2f} | " + f"PnL: {self.session_pnl:.4f}" + ) + + # Add annotation + fig.add_annotation( + x=0.5, y=1.05, + xref="paper", yref="paper", + text=info_text, + showarrow=False, + font=dict(size=14, color="white"), + bgcolor="rgba(50,50,50,0.6)", + borderwidth=1, + borderpad=6, + align="center" + ) + # Update layout - try: - interval_text = { - 1: "1 second", - 5: "5 seconds", - 15: "15 seconds", - 30: "30 seconds", - 60: "1 minute" - }.get(interval, f"{interval}s") - - fig.update_layout( - title_text=f"{self.symbol} Real-Time Data ({interval_text})", - title_x=0.5, # Center the title - xaxis_rangeslider_visible=False, - height=1800, # Increased height for taller display - template='plotly_dark', - paper_bgcolor='rgba(0,0,0,0)', - plot_bgcolor='rgba(25,25,50,1)', - font=dict(family="Arial, sans-serif", size=12, color="white"), - showlegend=True, - legend=dict( - yanchor="top", - y=0.99, - xanchor="left", - x=0.01 - ) - ) - - # Update axes styling - fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') - fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') - - # Add neural network signals as annotations if available - if hasattr(self, 'nn_signals') and self.nn_signals: - for signal in self.nn_signals: - try: - # Skip HOLD signals for clarity - if signal['type'] == 'HOLD': - continue - - # Check if this signal's timestamp is within the visible range - signal_ts = signal['timestamp'] - - # Only add annotations for signals in the visible time range - if df is not None and not df.empty: - if signal_ts < df.index.min() or signal_ts > df.index.max(): - continue - - # Set color and symbol based on signal type - if signal['type'] == 'BUY': - color = 'green' - symbol = '▲' # Up triangle - y_position = df['low'].min() * 0.98 # Below the candles - else: # SELL - color = 'red' - symbol = '▼' # Down triangle - y_position = df['high'].max() * 1.02 # Above the candles - - # Add probability if available - text = f"{signal['type']}" - if signal['probability'] is not None: - text += f" ({signal['probability']:.2f})" - - # Add annotation - fig.add_annotation( - x=signal_ts, - y=y_position, - text=text, - showarrow=True, - arrowhead=2, - arrowsize=1, - arrowwidth=2, - arrowcolor=color, - font=dict(size=12, color=color), - bgcolor='rgba(255, 255, 255, 0.8)', - bordercolor=color, - borderwidth=1, - borderpad=4, - row=1, col=1 - ) - except Exception as e: - logger.error(f"Error adding NN signal annotation: {str(e)}") - - return fig - except Exception as layout_error: - logger.error(f"Error updating layout: {str(layout_error)}") - import traceback - logger.error(traceback.format_exc()) - return go.Figure() + fig.update_layout( + title_text=f"{self.symbol} Real-Time Data", + title_x=0.5, + xaxis_rangeslider_visible=False, + height=700, + template='plotly_dark', + paper_bgcolor='rgba(0,0,0,0)', + plot_bgcolor='rgba(25,25,50,1)' + ) + + # Update axes styling + fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') + fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)') + + return fig + except Exception as e: logger.error(f"Error updating chart: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - - # Create a minimal figure with error message fig = go.Figure() fig.add_annotation( x=0.5, y=0.5, @@ -2016,441 +1772,8 @@ class RealTimeChart: font=dict(size=14, color="red"), xref="paper", yref="paper" ) - fig.update_layout( - height=800, - template='plotly_dark', - paper_bgcolor='rgba(0,0,0,0)', - plot_bgcolor='rgba(25,25,50,1)' - ) return fig - def _setup_ticks_callback(self): - # Callbacks to update the ticks data display - - # Callback to update stats cards - @self.app.callback( - Output('tick-stats-cards', 'children'), - [Input('interval-component', 'n_intervals'), - Input('refresh-ticks-btn', 'n_clicks'), - Input('time-window-dropdown', 'value')] - ) - def update_tick_stats(n_intervals, n_clicks, time_window): - # Get time window in seconds - window_seconds = time_window if time_window else 300 - - # Calculate time range for filtering - now = int(time.time() * 1000) - start_time = now - (window_seconds * 1000) - - # Get filtered ticks - filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) - - if not filtered_ticks: - return [html.Div("No tick data available in the selected time window.", - style={'color': 'white', 'textAlign': 'center', 'margin': '20px'})] - - # Calculate stats - tick_count = len(filtered_ticks) - - prices = [tick['price'] for tick in filtered_ticks] - min_price = min(prices) if prices else 0 - max_price = max(prices) if prices else 0 - avg_price = sum(prices) / len(prices) if prices else 0 - price_range = max_price - min_price - - volumes = [tick.get('volume', 0) for tick in filtered_ticks] - total_volume = sum(volumes) - avg_volume = total_volume / len(volumes) if volumes else 0 - - # Calculate additional volume stats - max_volume = max(volumes) if volumes else 0 - min_volume = min(volumes) if volumes else 0 - median_volume = sorted(volumes)[len(volumes)//2] if volumes else 0 - - # Calculate trade value in USDT - trade_values = [p * v for p, v in zip(prices, volumes)] - total_value = sum(trade_values) - avg_trade_value = total_value / len(trade_values) if trade_values else 0 - - first_timestamp = filtered_ticks[0]['timestamp'] - last_timestamp = filtered_ticks[-1]['timestamp'] - time_span_ms = last_timestamp - first_timestamp - time_span_seconds = time_span_ms / 1000 - ticks_per_second = tick_count / time_span_seconds if time_span_seconds > 0 else 0 - - # Create stat cards - card_style = { - 'backgroundColor': '#1E2130', - 'borderRadius': '5px', - 'padding': '15px', - 'marginBottom': '10px', - 'width': '23%', - 'boxShadow': '0 2px 4px rgba(0,0,0,0.2)', - 'color': 'white', - 'display': 'flex', - 'flexDirection': 'column', - 'alignItems': 'center', - 'justifyContent': 'center' - } - - mobile_card_style = { - **card_style, - 'width': '100%', - 'marginBottom': '10px' - } - - value_style = { - 'fontSize': '24px', - 'fontWeight': 'bold', - 'color': '#4CAF50', - 'marginTop': '5px' - } - - label_style = { - 'fontSize': '14px', - 'color': '#AAAAAA' - } - - return [ - html.Div([ - html.Div("Tick Count", style=label_style), - html.Div(f"{tick_count:,}", style=value_style), - html.Div(f"{ticks_per_second:.2f} ticks/sec", style=label_style) - ], style=card_style), - - html.Div([ - html.Div("Price Range", style=label_style), - html.Div(f"{min_price:.2f} - {max_price:.2f}", style=value_style), - html.Div(f"Range: {price_range:.2f}", style=label_style) - ], style=card_style), - - html.Div([ - html.Div("Average Price", style=label_style), - html.Div(f"{avg_price:.2f}", style=value_style), - html.Div("USDT", style=label_style) - ], style=card_style), - - html.Div([ - html.Div("Total Volume", style=label_style), - html.Div(f"{total_volume:.8f}", style=value_style), - html.Div(f"Avg: {avg_volume:.8f}", style=label_style) - ], style=card_style), - - html.Div([ - html.Div("Max Volume", style=label_style), - html.Div(f"{max_volume:.8f}", style=value_style), - html.Div("Median: {:.8f}".format(median_volume), style=label_style) - ], style=card_style), - - html.Div([ - html.Div("Total Value", style=label_style), - html.Div(f"{total_value:.2f}", style=value_style), - html.Div(f"Avg: {avg_trade_value:.2f}", style=label_style) - ], style=card_style) - ] - - # Callback to update ticks table - @self.app.callback( - Output('ticks-table-container', 'children'), - [Input('interval-component', 'n_intervals'), - Input('refresh-ticks-btn', 'n_clicks'), - Input('time-window-dropdown', 'value')] - ) - def update_ticks_table(n_intervals, n_clicks, time_window): - # Get time window in seconds - window_seconds = time_window if time_window else 300 - - # Calculate time range for filtering - now = int(time.time() * 1000) - start_time = now - (window_seconds * 1000) - - # Get filtered ticks - filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) - - if not filtered_ticks: - return html.Div("No tick data available in the selected time window.", - style={'color': 'white', 'textAlign': 'center', 'margin': '20px'}) - - # Convert to readable format - formatted_ticks = [] - for tick in filtered_ticks: - timestamp_dt = datetime.fromtimestamp(tick['timestamp'] / 1000) - formatted_time = timestamp_dt.strftime('%H:%M:%S.%f')[:-3] # Format as HH:MM:SS.mmm - - formatted_ticks.append({ - 'time': formatted_time, - 'price': f"{tick['price']:.2f}", - 'volume': f"{tick['volume']:.8f}", - 'value': f"{tick['price'] * tick['volume']:.2f}" - }) - - # Display only the most recent ticks - display_limit = 100 - limited_ticks = formatted_ticks[-display_limit:] if len(formatted_ticks) > display_limit else formatted_ticks - limited_ticks.reverse() # Show most recent at the top - - # Create table - table_style = { - 'width': '100%', - 'borderCollapse': 'collapse', - 'color': 'white', - 'fontFamily': 'monospace' - } - - header_style = { - 'backgroundColor': '#1A1A1A', - 'fontWeight': 'bold', - 'padding': '8px', - 'textAlign': 'left', - 'borderBottom': '2px solid #444' - } - - cell_style = { - 'padding': '6px', - 'borderBottom': '1px solid #333', - 'textAlign': 'right' - } - - time_cell_style = { - **cell_style, - 'textAlign': 'left' - } - - # Create table header - header = html.Tr([ - html.Th("Time", style=header_style), - html.Th("Price", style=header_style), - html.Th("Volume", style=header_style), - html.Th("Value (USDT)", style=header_style) - ]) - - # Create table rows - rows = [] - for i, tick in enumerate(limited_ticks): - row_style = {'backgroundColor': '#1E1E1E'} if i % 2 == 0 else {'backgroundColor': '#252525'} - - rows.append(html.Tr([ - html.Td(tick['time'], style={**time_cell_style, **row_style}), - html.Td(tick['price'], style={**cell_style, **row_style}), - html.Td(tick['volume'], style={**cell_style, **row_style}), - html.Td(tick['value'], style={**cell_style, **row_style}) - ])) - - return [ - html.Div([ - html.H3(f"Latest {len(limited_ticks)} Ticks (from {len(filtered_ticks)} total)", - style={'color': 'white', 'marginBottom': '10px', 'textAlign': 'center'}), - html.Table([html.Thead(header), html.Tbody(rows)], style=table_style) - ]) - ] - - # Callback to update price chart - @self.app.callback( - Output('tick-price-chart', 'figure'), - [Input('interval-component', 'n_intervals'), - Input('refresh-ticks-btn', 'n_clicks'), - Input('time-window-dropdown', 'value')] - ) - def update_price_chart(n_intervals, n_clicks, time_window): - # Get time window in seconds - window_seconds = time_window if time_window else 300 - - # Calculate time range for filtering - now = int(time.time() * 1000) - start_time = now - (window_seconds * 1000) - - # Get filtered ticks - filtered_ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time) - - # Create figure with 2 subplots - price and volume - fig = make_subplots( - rows=2, cols=1, - shared_xaxes=True, - vertical_spacing=0.03, - row_heights=[0.7, 0.3], - subplot_titles=(f"Price Movement (Last {window_seconds // 60} minutes)", "Volume") - ) - - if not filtered_ticks: - fig.add_annotation( - x=0.5, y=0.5, - text="No tick data available in the selected time window.", - showarrow=False, - font=dict(size=14, color="white"), - xref="paper", yref="paper" - ) - else: - # Convert timestamps to datetime for better x-axis display - timestamps = [datetime.fromtimestamp(tick['timestamp'] / 1000) for tick in filtered_ticks] - prices = [tick['price'] for tick in filtered_ticks] - volumes = [tick.get('volume', 0) for tick in filtered_ticks] - - # Add price scatter plot - fig.add_trace( - go.Scatter( - x=timestamps, - y=prices, - mode='lines', - name='Price', - line=dict(color='#4CAF50', width=1.5) - ), - row=1, col=1 - ) - - # Create a volume profile on the right side of the price chart - if len(prices) > 5: # Only create profile if we have enough data - # Group prices into bins - price_min = min(prices) - price_max = max(prices) - # Create approximately 20 bins based on price range - bin_size = max(0.01, (price_max - price_min) / 20) - - # Create a dictionary to hold volume by price level - volume_by_price = {} - - # Group volumes by price bins - for p, v in zip(prices, volumes): - bin_key = round(p / bin_size) * bin_size - if bin_key in volume_by_price: - volume_by_price[bin_key] += v - else: - volume_by_price[bin_key] = v - - # Sort by price level - sorted_bins = sorted(volume_by_price.items()) - profile_prices = [p for p, _ in sorted_bins] - profile_volumes = [v for _, v in sorted_bins] - - # Add separate volume profile trace - fig.add_trace( - go.Bar( - y=profile_prices, - x=profile_volumes, - orientation='h', - name='Volume Profile', - marker=dict( - color=['#33CC33' if p <= latest_price else '#FF4136' for p in profile_prices], - opacity=0.5 - ), - showlegend=True, - hovertemplate='Price: %{y:.2f}
Volume: %{x:.8f}' - ), - row=1, col=1 - ) - - # Add a line marking the latest price - fig.add_shape( - type="line", - y0=latest_price, y1=latest_price, - x0=0, x1=max(profile_volumes) * 1.1, - line=dict(color="yellow", width=1, dash="dash"), - row=1, col=1 - ) - - # Add volume bars in separate subplot - # Color volume bars green for price increases, red for decreases - if len(timestamps) > 1: - # Compare each price with the previous to determine color - colors = [] - for i in range(len(prices)): - if i == 0: - colors.append('#33CC33') # Default to green for first tick - else: - if prices[i] >= prices[i-1]: - colors.append('#33CC33') # Green for price increase/same - else: - colors.append('#FF4136') # Red for price decrease - else: - colors = ['#33CC33'] # Default green if only one tick - - fig.add_trace( - go.Bar( - x=timestamps, - y=volumes, - name='Volume', - marker=dict(color=colors) - ), - row=2, col=1 - ) - - # Compute stats for annotations - latest_price = prices[-1] if prices else 0 - total_volume = sum(volumes) - max_volume = max(volumes) if volumes else 0 - avg_volume = total_volume / len(volumes) if volumes else 0 - - # Add annotations for latest price - fig.add_annotation( - x=timestamps[-1] if timestamps else 0, - y=latest_price, - text=f"{latest_price:.2f}", - showarrow=True, - arrowhead=1, - arrowsize=1, - arrowwidth=2, - arrowcolor="#4CAF50", - font=dict(size=12, color="#4CAF50"), - xshift=50, - row=1, col=1 - ) - - # Add annotations for volume stats - fig.add_annotation( - x=timestamps[-1] if timestamps else 0, - y=max_volume, - text=f"Max: {max_volume:.8f}", - showarrow=False, - font=dict(size=10, color="rgba(128, 128, 255, 1)"), - xshift=50, - row=2, col=1 - ) - - # Update layout - fig.update_layout( - title_text=f"{self.symbol} Tick Data", - title_x=0.5, - template='plotly_dark', - paper_bgcolor='rgba(0,0,0,0)', - plot_bgcolor='rgba(25,25,50,1)', - height=600, # Increased height for better visualization - margin=dict(l=40, r=40, t=80, b=40), - legend=dict( - orientation="h", - yanchor="bottom", - y=1.02, - xanchor="right", - x=1 - ), - hovermode="x unified" # Show all data points at the same x-coordinate - ) - - # Update x-axis to be shared - fig.update_xaxes( - showgrid=True, - gridwidth=1, - gridcolor='rgba(128,128,128,0.2)', - rangeslider_visible=False - ) - - # Update y-axes - fig.update_yaxes( - title_text="Price (USDT)", - showgrid=True, - gridwidth=1, - gridcolor='rgba(128,128,128,0.2)', - row=1, col=1 - ) - - fig.update_yaxes( - title_text="Volume", - showgrid=True, - gridwidth=1, - gridcolor='rgba(128,128,128,0.2)', - row=2, col=1 - ) - - return fig - def _interval_to_seconds(self, interval_key: str) -> int: """Convert interval key to seconds""" mapping = { @@ -2583,23 +1906,48 @@ class RealTimeChart: await asyncio.sleep(5) def run(self, host='localhost', port=8050): - try: - logger.info(f"Starting Dash server for {self.symbol} on {host}:{port}") - self.app.run(debug=False, host=host, port=port) - except Exception as e: - logger.error(f"Error running Dash server on port {port}: {str(e)}") + """Run the Dash app + + Args: + host: Hostname to run on + port: Port to run on + """ + logger.info(f"Starting Dash app on {host}:{port}") + + # Ensure interval component is created + if not hasattr(self, 'app') or not self.app.layout: + logger.error("App layout not initialized properly") + return - # Try an alternative port if the primary one is in use - if "Address already in use" in str(e): - alt_port = port + 100 - logger.warning(f"Port {port} is busy, trying alternative port {alt_port}") - try: - self.app.run(debug=False, host=host, port=alt_port) - except Exception as alt_e: - logger.error(f"Error running Dash server on alternative port {alt_port}: {str(alt_e)}") - else: - # Re-raise other exceptions - raise + # If interval-component is not in the layout, add it + if 'interval-component' not in str(self.app.layout): + logger.warning("Interval component not found in layout, adding it") + self.app.layout.children.append( + dcc.Interval( + id='interval-component', + interval=500, # 500ms for real-time updates + n_intervals=0 + ) + ) + + # Start websocket connection in a separate thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.websocket_thread = threading.Thread(target=lambda: asyncio.run(self.start_websocket())) + self.websocket_thread.daemon = True + self.websocket_thread.start() + + # Ensure historical data is loaded before starting + self._load_historical_data() + + try: + self.app.run(host=host, port=port, debug=False) + except Exception as e: + logger.error(f"Error running Dash app: {str(e)}") + finally: + # Ensure resources are cleaned up + self._save_candles_to_disk(force=True) + logger.info("Dash app stopped") def _load_historical_data(self): """Load historical data for all timeframes from Binance API and local cache""" @@ -2814,6 +2162,94 @@ class RealTimeChart: self.nn_signals = self.nn_signals[-50:] logger.info(f"Added NN signal: {signal_type} at {timestamp}") + + def add_trade(self, price, timestamp, pnl=None, amount=0.1, action=None, type=None): + """Add a trade to be displayed on the chart + + Args: + price: The price at which the trade was executed + timestamp: The timestamp for the trade + pnl: Optional profit and loss value for the trade + amount: Amount traded + action: The type of trade (BUY or SELL) - alternative to type parameter + type: The type of trade (BUY or SELL) - alternative to action parameter + """ + # Handle both action and type parameters for backward compatibility + trade_type = type or action + + # Default to BUY if trade_type is None or not specified + if trade_type is None: + logger.warning(f"Trade type not specified in add_trade call, defaulting to BUY. Price: {price}, Timestamp: {timestamp}") + trade_type = "BUY" + + if isinstance(trade_type, int): + trade_type = "BUY" if trade_type == 0 else "SELL" + + # Ensure trade_type is uppercase if it's a string + if isinstance(trade_type, str): + trade_type = trade_type.upper() + + if trade_type not in ['BUY', 'SELL']: + logger.warning(f"Invalid trade type: {trade_type} (value type: {type(trade_type).__name__}), defaulting to BUY. Price: {price}, Timestamp: {timestamp}") + trade_type = "BUY" + + # Convert timestamp to datetime if it's not already + if not isinstance(timestamp, datetime): + try: + if isinstance(timestamp, str): + timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) + elif isinstance(timestamp, (int, float)): + timestamp = datetime.fromtimestamp(timestamp / 1000.0) + except Exception as e: + logger.error(f"Error converting timestamp for trade: {str(e)}") + timestamp = datetime.now() + + # Initialize trades list if it doesn't exist + if not hasattr(self, 'trades'): + self.trades = [] + + # Add the trade to our list + self.trades.append({ + 'type': trade_type, + 'price': price, + 'timestamp': timestamp, + 'amount': amount, + 'pnl': pnl, + 'added': datetime.now() + }) + + # Only keep the most recent 50 trades + if len(self.trades) > 50: + self.trades = self.trades[-50:] + + logger.info(f"Added trade: {trade_type} {amount} at price {price} at time {timestamp} with PnL: {pnl}") + + def update_trading_info(self, signal=None, position=None, balance=None, pnl=None): + """Update the current trading information to be displayed on the chart + + Args: + signal: Current signal (BUY, SELL, HOLD) + position: Current position size + balance: Current session balance + pnl: Current session PnL + """ + if signal is not None: + if signal in ['BUY', 'SELL', 'HOLD']: + self.current_signal = signal + self.signal_time = datetime.now() + else: + logger.warning(f"Invalid signal type: {signal}") + + if position is not None: + self.current_position = position + + if balance is not None: + self.session_balance = balance + + if pnl is not None: + self.session_pnl = pnl + + logger.debug(f"Updated trading info: Signal={self.current_signal}, Position={self.current_position}, Balance=${self.session_balance:.2f}, PnL={self.session_pnl:.4f}") async def main(): global charts # Make charts globally accessible for NN integration diff --git a/train_rl_with_realtime.py b/train_rl_with_realtime.py index 0fa52a3..ecb8945 100644 --- a/train_rl_with_realtime.py +++ b/train_rl_with_realtime.py @@ -107,6 +107,12 @@ class RLTrainingIntegrator: self.trade_count = 0 self.win_count = 0 + # Add session-wide PnL tracking + self.session_pnl = 0.0 + self.session_trades = 0 + self.session_wins = 0 + self.session_balance = 100.0 # Start with $100 balance + # Track current position state self.in_position = False self.entry_price = None @@ -151,7 +157,28 @@ class RLTrainingIntegrator: # Calculate profit if we have entry data pnl = None if self.entry_price is not None: - pnl = (price - self.entry_price) / self.entry_price + # Calculate percentage change + pnl_pct = (price - self.entry_price) / self.entry_price + + # Cap extreme PnL values to more realistic levels (-90% to +100%) + pnl_pct = max(min(pnl_pct, 1.0), -0.9) + + # Apply to current balance + trade_amount = self.session_balance * 0.1 # Use 10% of balance per trade + trade_profit = trade_amount * pnl_pct + self.session_balance += trade_profit + + # Ensure session balance doesn't go below $1 + self.session_balance = max(self.session_balance, 1.0) + + # For normalized display in charts and logs + pnl = pnl_pct + + # Update session-wide PnL + self.session_pnl += pnl + self.session_trades += 1 + if pnl > 0: + self.session_wins += 1 # Log the complete trade on the chart if self.chart: @@ -162,10 +189,12 @@ class RLTrainingIntegrator: # Record the trade with PnL if hasattr(self.chart, 'add_trade'): self.chart.add_trade( - action=action_str, price=price, timestamp=timestamp, - pnl=pnl + pnl=pnl, + amount=0.1, + action=action_str, + type=action_str # Add explicit type ) # Update trade counts @@ -185,6 +214,21 @@ class RLTrainingIntegrator: 'reward': reward, 'timestamp': timestamp.isoformat() }) + else: + # Hold action + action_str = "HOLD" + timestamp = datetime.now() + + # Update chart trading info + if self.chart and hasattr(self.chart, 'update_trading_info'): + # Determine current position size (0.1 if in position, 0 if not) + position_size = 0.1 if self.in_position else 0.0 + self.chart.update_trading_info( + signal=action_str, + position=position_size, + balance=self.session_balance, + pnl=self.session_pnl + ) # Track reward for all actions (including hold) self.reward_history.append(reward) @@ -205,6 +249,23 @@ class RLTrainingIntegrator: logger.info(f" Win rate: {info['win_rate']:.4f}") logger.info(f" Trades: {info['trades']}") + # Log session-wide PnL + session_win_rate = self.session_wins / self.session_trades if self.session_trades > 0 else 0 + logger.info(f" Session Balance: ${self.session_balance:.2f}") + logger.info(f" Session Total PnL: {self.session_pnl:.4f}") + logger.info(f" Session Win Rate: {session_win_rate:.4f}") + logger.info(f" Session Trades: {self.session_trades}") + + # Update chart trading info with final episode information + if self.chart and hasattr(self.chart, 'update_trading_info'): + # Reset position since we're between episodes + self.chart.update_trading_info( + signal="HOLD", + position=0.0, + balance=self.session_balance, + pnl=self.session_pnl + ) + # Reset position state for new episode self.in_position = False self.entry_price = None @@ -441,8 +502,8 @@ async def start_realtime_chart(symbol="BTC/USDT", port=8050): try: logger.info(f"Initializing RealTimeChart for {symbol}") - # Create the chart - chart = RealTimeChart(symbol) + # Create the chart with sample data enabled and no-ticks warnings disabled + chart = RealTimeChart(symbol, use_sample_data=True, log_no_ticks_warning=False) # Start the WebSocket connection in a separate thread # The _start_websocket_thread method already handles this correctly @@ -466,10 +527,18 @@ async def start_realtime_chart(symbol="BTC/USDT", port=8050): logger.error(traceback.format_exc()) raise -def run_training_thread(chart, num_episodes=5000, max_steps=2000): - """Run the RL training in a separate thread""" +def run_training_thread(chart): + """Start the RL training in a separate thread""" integrator = RLTrainingIntegrator(chart) - thread = Thread(target=lambda: integrator.start_training(num_episodes, max_steps)) + + def training_thread_func(): + try: + # Use a small number of episodes to test termination handling + integrator.start_training(num_episodes=2, max_steps=500) + except Exception as e: + logger.error(f"Error in training thread: {str(e)}") + + thread = threading.Thread(target=training_thread_func) thread.daemon = True thread.start() logger.info("Started RL training thread") @@ -490,8 +559,17 @@ def test_signals(chart): # Add a test SELL signal chart.add_nn_signal("SELL", datetime.now(), 0.85) - # Add a test trade - chart.add_trade("BUY", 50000.0, datetime.now(), 0.05) + # Add a test trade if the method exists + if hasattr(chart, 'add_trade'): + chart.add_trade( + price=83000.0, + timestamp=datetime.now(), + pnl=0.05, + action="BUY", + type="BUY" # Add explicit type + ) + else: + logger.warning("RealTimeChart has no add_trade method - skipping test trade") async def main(): """Main function that coordinates the realtime chart and RL training""" @@ -520,6 +598,18 @@ async def main(): except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: + # Log final PnL summary + if hasattr(integrator, 'session_pnl'): + session_win_rate = integrator.session_wins / integrator.session_trades if integrator.session_trades > 0 else 0 + logger.info("=" * 50) + logger.info("FINAL SESSION SUMMARY") + logger.info("=" * 50) + logger.info(f"Final Session Balance: ${integrator.session_balance:.2f}") + logger.info(f"Total Session PnL: {integrator.session_pnl:.4f}") + logger.info(f"Total Session Win Rate: {session_win_rate:.4f} ({integrator.session_wins}/{integrator.session_trades})") + logger.info(f"Total Session Trades: {integrator.session_trades}") + logger.info("=" * 50) + # Clean up if realtime_websocket_task: realtime_websocket_task.cancel()