diff --git a/web/dashboard.py b/web/dashboard.py index dad7df5..253c959 100644 --- a/web/dashboard.py +++ b/web/dashboard.py @@ -1204,20 +1204,25 @@ class TradingDashboard: # OPTIMIZED CHART - Using new optimized version with trade caching if is_chart_update: try: - if hasattr(self, '_cached_chart_data_time'): - cache_time = self._cached_chart_data_time - if time.time() - cache_time < 3: # Use cached chart if < 3s old for faster updates - price_chart = getattr(self, '_cached_price_chart', None) - else: - price_chart = self._create_price_chart(symbol) - self._cached_price_chart = price_chart - self._cached_chart_data_time = time.time() - else: - price_chart = self._create_price_chart(symbol) + # Always try to create fresh chart for real-time updates + # Only use cache as emergency fallback + price_chart = self._create_price_chart(symbol) + + # Cache the successful chart for emergency fallback + if price_chart is not None: self._cached_price_chart = price_chart self._cached_chart_data_time = time.time() + else: + # If chart creation failed, try cached version + if hasattr(self, '_cached_price_chart'): + price_chart = self._cached_price_chart + logger.debug("Using cached chart due to creation failure") + else: + price_chart = self._create_empty_chart("Chart Loading", "Initializing chart data...") + except Exception as e: logger.debug(f"Chart error: {e}") + # Try cached chart first, then empty chart price_chart = getattr(self, '_cached_price_chart', self._create_empty_chart("Chart Error", "Chart temporarily unavailable")) else: @@ -1554,27 +1559,41 @@ class TradingDashboard: def _create_price_chart(self, symbol: str) -> go.Figure: """Create price chart with volume and Williams pivot points from cached data""" try: - # For Williams Market Structure, we need 1s data for proper recursive analysis - # Get 4 hours (240 minutes) of 1m data for better trade visibility - df_1s = None - df_1m = None + # Try to get real-time WebSocket data first for best performance (1-second updates) + ws_df = self.get_realtime_tick_data(symbol, limit=2000) - # Try to get 1s data first for Williams analysis (reduced to 10 minutes for performance) - try: - df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=600, refresh=False) - if df_1s is None or df_1s.empty: - logger.warning("[CHART] No 1s cached data available, trying fresh 1s data") - df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=300, refresh=True) - - if df_1s is not None and not df_1s.empty: - # Aggregate 1s data to 1m for chart display (cleaner visualization) - df = self._aggregate_1s_to_1m(df_1s) - actual_timeframe = '1s→1m' - else: - df_1s = None - except Exception as e: - logger.warning(f"[CHART] Error getting 1s data: {e}") + if ws_df is not None and not ws_df.empty and len(ws_df) >= 10: + # Use WebSocket data (ultra-fast, real-time streaming) + df = ws_df + df_1s = ws_df # Use for Williams analysis too + actual_timeframe = 'WS-1s' + logger.debug(f"[CHART] Using WebSocket real-time data: {len(df)} ticks") + else: + # Fallback to traditional data provider approach + # For Williams Market Structure, we need 1s data for proper recursive analysis + # Get 4 hours (240 minutes) of 1m data for better trade visibility df_1s = None + df_1m = None + + if ws_df is not None: + logger.debug(f"[CHART] WebSocket data insufficient ({len(ws_df) if not ws_df.empty else 0} rows), falling back to data provider") + + # Try to get 1s data first for Williams analysis (reduced to 10 minutes for performance) + try: + df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=600, refresh=False) + if df_1s is None or df_1s.empty: + logger.warning("[CHART] No 1s cached data available, trying fresh 1s data") + df_1s = self.data_provider.get_historical_data(symbol, '1s', limit=300, refresh=True) + + if df_1s is not None and not df_1s.empty: + # Aggregate 1s data to 1m for chart display (cleaner visualization) + df = self._aggregate_1s_to_1m(df_1s) + actual_timeframe = '1s→1m' + else: + df_1s = None + except Exception as e: + logger.warning(f"[CHART] Error getting 1s data: {e}") + df_1s = None # Fallback to 1m data if 1s not available (4 hours for historical trades) if df_1s is None: @@ -1591,6 +1610,26 @@ class TradingDashboard: if 'volume' not in df.columns: df['volume'] = 100 # Default volume for demo actual_timeframe = '1m' + + # Hybrid approach: If we have some WebSocket data, append it to historical data + if ws_df is not None and not ws_df.empty: + try: + # Combine historical 1m data with recent WebSocket ticks + ws_df_resampled = ws_df.resample('1min').agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }).dropna() + + if not ws_df_resampled.empty: + # Merge the datasets - WebSocket data is more recent + df = pd.concat([df, ws_df_resampled]).drop_duplicates().sort_index() + actual_timeframe = '1m+WS' + logger.debug(f"[CHART] Hybrid mode: {len(df)} total bars (historical + WebSocket)") + except Exception as hybrid_error: + logger.debug(f"[CHART] Hybrid combination failed: {hybrid_error}") else: return self._create_empty_chart( f"{symbol} Chart", @@ -1606,6 +1645,26 @@ class TradingDashboard: # Ensure timezone consistency for cached data df = self._ensure_timezone_consistency(df) actual_timeframe = '1m' + + # Hybrid approach: If we have some WebSocket data, append it to cached data too + if ws_df is not None and not ws_df.empty: + try: + # Combine cached 1m data with recent WebSocket ticks + ws_df_resampled = ws_df.resample('1min').agg({ + 'open': 'first', + 'high': 'max', + 'low': 'min', + 'close': 'last', + 'volume': 'sum' + }).dropna() + + if not ws_df_resampled.empty: + # Merge the datasets - WebSocket data is more recent + df = pd.concat([df, ws_df_resampled]).drop_duplicates().sort_index() + actual_timeframe = '1m+WS' + logger.debug(f"[CHART] Hybrid mode: {len(df)} total bars (cached + WebSocket)") + except Exception as hybrid_error: + logger.debug(f"[CHART] Hybrid combination failed: {hybrid_error}") # Final check: ensure we have valid data with proper index if df is None or df.empty: @@ -1627,7 +1686,7 @@ class TradingDashboard: df.index = pd.date_range(start=pd.Timestamp.now() - pd.Timedelta(minutes=len(df)), periods=len(df), freq='1min') - # Create subplot with secondary y-axis for volume + # Create subplot with secondary y-axis for volume and JavaScript data management fig = make_subplots( rows=2, cols=1, shared_xaxes=True, @@ -1636,6 +1695,16 @@ class TradingDashboard: row_heights=[0.7, 0.3] ) + # Add JavaScript for client-side data management and real-time updates + fig.add_annotation( + text="", + xref="paper", yref="paper", + x=0, y=0, + showarrow=False, + font=dict(size=1), + opacity=0 + ) + # Add price line chart (main chart) fig.add_trace( go.Scatter( @@ -3481,12 +3550,16 @@ class TradingDashboard: } def _start_lightweight_websocket(self): - """Start ultra-lightweight WebSocket for real-time price updates only""" + """Start enhanced WebSocket for real-time price and tick data streaming""" try: if self.is_streaming: logger.warning("[WS] WebSocket already running") return + # Initialize tick cache for chart updates + self.tick_cache = [] + self.max_tick_cache = 2000 # Keep last 2000 1-second ticks for chart + # ETH/USDT primary symbol for scalping symbol = "ethusdt" @@ -3498,23 +3571,58 @@ class TradingDashboard: def on_message(ws, message): try: data = json.loads(message) - # Extract only current price - ultra minimal processing + current_time = time.time() + + # Extract price data for ultra-fast updates if 'c' in data: # Current price from ticker price = float(data['c']) + # Update price cache (no history, just current) self.ws_price_cache['ETHUSDT'] = price self.current_prices['ETHUSDT'] = price + # Create tick data point for chart + tick = { + 'timestamp': current_time, + 'datetime': datetime.now(self.timezone), + 'symbol': 'ETHUSDT', + 'price': price, + 'open': float(data.get('o', price)), + 'high': float(data.get('h', price)), + 'low': float(data.get('l', price)), + 'close': price, + 'volume': float(data.get('v', 0)), + 'count': int(data.get('c', 1)) + } + + # Thread-safe tick cache management + try: + # Add to tick cache (thread-safe append) + self.tick_cache.append(tick) + + # Maintain cache size for performance - use slicing for thread safety + if len(self.tick_cache) > self.max_tick_cache: + # Keep the most recent data, remove oldest + excess = len(self.tick_cache) - self.max_tick_cache + self.tick_cache = self.tick_cache[excess:] + + except Exception as cache_error: + logger.warning(f"[WS] Cache management error: {cache_error}") + # Reinitialize cache if corrupted + self.tick_cache = [tick] if tick else [] + # Performance tracking - current_time = time.time() self.last_ws_update = current_time self.ws_update_count += 1 # Log every 100 updates for monitoring if self.ws_update_count % 100 == 0: - logger.debug(f"[WS] {self.ws_update_count} price updates, latest: ${price:.2f}") + cache_size = len(self.tick_cache) if hasattr(self, 'tick_cache') else 0 + logger.debug(f"[WS] {self.ws_update_count} updates, cache: {cache_size} ticks, latest: ${price:.2f}") + except Exception as e: logger.warning(f"[WS] Error processing message: {e}") + # Continue processing, don't break the stream def on_error(ws, error): logger.error(f"[WS] Error: {error}") @@ -3529,7 +3637,7 @@ class TradingDashboard: self._start_lightweight_websocket() def on_open(ws): - logger.info(f"[WS] Connected for real-time ETHUSDT price updates") + logger.info(f"[WS] Connected for real-time ETHUSDT streaming with tick cache") self.is_streaming = True # Binance WebSocket for ticker (price only, not trades) @@ -3554,7 +3662,7 @@ class TradingDashboard: self.ws_thread = threading.Thread(target=ws_worker, daemon=True) self.ws_thread.start() - logger.info("[WS] Lightweight WebSocket started for real-time price updates") + logger.info("[WS] Enhanced WebSocket started for real-time tick streaming") except Exception as e: logger.error(f"[WS] Failed to start: {e}") @@ -3584,6 +3692,61 @@ class TradingDashboard: logger.warning(f"[WS] Error getting realtime price: {e}") return None + def get_realtime_tick_data(self, symbol: str, limit: int = 2000) -> pd.DataFrame: + """Get real-time tick data from WebSocket cache for chart updates""" + try: + if not hasattr(self, 'tick_cache') or not self.tick_cache: + logger.debug(f"[WS] No tick cache available for {symbol}") + return None + + # Filter by symbol and convert to DataFrame + symbol_ticks = [tick for tick in self.tick_cache if tick.get('symbol') == symbol.replace('/', '')] + + if not symbol_ticks: + logger.debug(f"[WS] No ticks found for symbol {symbol} in cache of {len(self.tick_cache)} items") + return None + + # Ensure we have enough data points for a meaningful chart + if len(symbol_ticks) < 10: + logger.debug(f"[WS] Only {len(symbol_ticks)} ticks available for {symbol}, need more data") + return None + + # Take the most recent ticks + recent_ticks = symbol_ticks[-limit:] if len(symbol_ticks) > limit else symbol_ticks + + # Convert to DataFrame with proper format + df = pd.DataFrame(recent_ticks) + + # Ensure datetime column exists and is valid + if 'datetime' not in df.columns: + logger.warning(f"[WS] No datetime column in tick data for {symbol}") + return None + + df['datetime'] = pd.to_datetime(df['datetime']) + df.set_index('datetime', inplace=True) + + # Ensure required columns exist with proper fallback values + required_columns = ['open', 'high', 'low', 'close', 'volume'] + for col in required_columns: + if col not in df.columns: + if col == 'volume': + df[col] = 100 # Default volume + else: + # Use price for OHLC if not available + df[col] = df.get('price', df.get('close', 0)) + + # Validate data integrity + if df.empty or len(df) < 5: + logger.debug(f"[WS] Insufficient data after processing for {symbol}: {len(df)} rows") + return None + + logger.debug(f"[WS] Successfully retrieved {len(df)} ticks for {symbol}") + return df + + except Exception as e: + logger.warning(f"[WS] Error getting tick data for {symbol}: {e}") + return None + def _create_cnn_monitoring_content(self) -> List: """Create CNN monitoring and prediction analysis content""" try: