diff --git a/core/data_provider.py b/core/data_provider.py index 9ec350a..1f8fdca 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -188,7 +188,7 @@ class DataProvider: if self.cache_enabled: cached_data = self._load_from_cache(symbol, timeframe) if cached_data is not None and len(cached_data) >= limit * 0.8: - logger.info(f"Using cached data for {symbol} {timeframe}") + # logger.info(f"Using cached data for {symbol} {timeframe}") return cached_data.tail(limit) # Check if we need to preload 300s of data for first load diff --git a/web/dashboard.py b/web/dashboard.py index 991dc2b..1abdda7 100644 --- a/web/dashboard.py +++ b/web/dashboard.py @@ -290,22 +290,20 @@ class TradingDashboard: self.adaptive_learner = AdaptiveThresholdLearner(initial_threshold=0.30) logger.info("[ADAPTIVE] Adaptive threshold learning enabled - will adjust based on trade outcomes") - # Real-time tick data infrastructure - self.tick_cache = deque(maxlen=54000) # 15 minutes * 60 seconds * 60 ticks/second = 54000 ticks - self.one_second_bars = deque(maxlen=900) # 15 minutes of 1-second bars - self.current_second_data = { - 'timestamp': None, - 'open': None, - 'high': None, - 'low': None, - 'close': None, - 'volume': 0, - 'tick_count': 0 - } + # Lightweight WebSocket implementation for real-time scalping data + self.ws_price_cache = {} # Just current prices, no tick history self.ws_connection = None self.ws_thread = None self.is_streaming = False + # Performance-focused: only track essentials + self.last_ws_update = 0 + self.ws_update_count = 0 + + # Compatibility stubs for removed tick infrastructure + self.tick_cache = [] # Empty list for compatibility + self.one_second_bars = [] # Empty list for compatibility + # Enhanced RL Training System - Train on closed trades with comprehensive data self.rl_training_enabled = True # Force enable Enhanced RL training (bypass import issues) @@ -467,9 +465,9 @@ class TradingDashboard: def _initialize_streaming(self): """Initialize unified data streaming and WebSocket fallback""" try: - # Start WebSocket first (non-blocking) - self._start_websocket_stream() - logger.info("WebSocket streaming initialized") + # Start lightweight WebSocket for real-time price updates + self._start_lightweight_websocket() + logger.info("Lightweight WebSocket streaming initialized") if ENHANCED_RL_AVAILABLE: # Start unified data stream in background @@ -490,8 +488,8 @@ class TradingDashboard: except Exception as e: logger.error(f"Error initializing streaming: {e}") - # Ensure WebSocket is started as fallback - self._start_websocket_stream() + # Ensure lightweight WebSocket is started as fallback + self._start_lightweight_websocket() def _start_enhanced_training_data_collection(self): """Start enhanced training data collection using unified stream""" @@ -990,7 +988,14 @@ class TradingDashboard: ) def update_dashboard(n_intervals): """Update all dashboard components with trading signals""" + start_time = time.time() # Performance monitoring try: + # Periodic cleanup to prevent memory leaks + if n_intervals % 60 == 0: # Every 60 seconds + self._cleanup_old_data() + + # Lightweight update every 10 intervals to reduce load + is_lightweight_update = (n_intervals % 10 != 0) # Get current prices with improved fallback handling symbol = self.config.symbols[0] if self.config.symbols else "ETH/USDT" current_price = None @@ -998,12 +1003,11 @@ class TradingDashboard: data_source = "UNKNOWN" try: - # First try WebSocket current price (lowest latency) - ws_symbol = symbol.replace('/', '') # Convert ETH/USDT to ETHUSDT for WebSocket - if ws_symbol in self.current_prices and self.current_prices[ws_symbol] > 0: - current_price = self.current_prices[ws_symbol] - data_source = "WEBSOCKET" - logger.debug(f"[WS_PRICE] Using WebSocket price for {symbol}: ${current_price:.2f}") + # First try real-time WebSocket price (sub-second latency) + current_price = self.get_realtime_price(symbol) + if current_price: + data_source = "WEBSOCKET_RT" + logger.debug(f"[WS_RT] Using real-time WebSocket price for {symbol}: ${current_price:.2f}") else: # Try cached data first (faster than API calls) cached_data = self.data_provider.get_historical_data(symbol, '1m', limit=1, refresh=False) @@ -1032,30 +1036,30 @@ class TradingDashboard: current_price = None data_source = "ERROR" - # Get chart data - ONLY REAL DATA + # Get chart data - ONLY REAL DATA (optimized for performance) chart_data = None try: - # First try WebSocket 1s bars - chart_data = self.get_one_second_bars(count=50) - if not chart_data.empty: - logger.debug(f"[CHART] Using WebSocket 1s bars: {len(chart_data)} bars") - else: - # Try cached data only - chart_data = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=False) + if not is_lightweight_update: # Only refresh charts every 10 seconds + # Use cached data only (limited to 30 bars for performance) + chart_data = self.data_provider.get_historical_data(symbol, '1m', limit=30, refresh=False) if chart_data is not None and not chart_data.empty: logger.debug(f"[CHART] Using cached 1m data: {len(chart_data)} bars") else: - # NO SYNTHETIC DATA - Wait for real data - logger.warning("[CHART] No real chart data available - waiting for data provider") + # Wait for real data - no synthetic data + logger.debug("[CHART] No chart data available - waiting for data provider") chart_data = None + else: + # Use cached chart data for lightweight updates + chart_data = getattr(self, '_cached_chart_data', None) except Exception as e: logger.warning(f"[CHART_ERROR] Error getting chart data: {e}") chart_data = None - # Generate trading signals based on model decisions - NO FREQUENCY LIMITS + # Generate trading signals based on model decisions - OPTIMIZED try: - if current_price and chart_data is not None and not chart_data.empty and len(chart_data) >= 5: - # Model decides when to act - check every update for signals + # Only generate signals every few intervals to reduce CPU load + if not is_lightweight_update and current_price and chart_data is not None and not chart_data.empty and len(chart_data) >= 5: + # Model decides when to act - check for signals but not every single second signal = self._generate_trading_signal(symbol, current_price, chart_data) if signal: # Add to signals list (all signals, regardless of execution) @@ -1181,12 +1185,20 @@ class TradingDashboard: else: mexc_status = "OFFLINE" - # Create charts with error handling - NO SYNTHETIC DATA + # Create charts with error handling - OPTIMIZED try: - if current_price and chart_data is not None and not chart_data.empty: - price_chart = self._create_price_chart(symbol) + if not is_lightweight_update: # Only recreate chart every 10 seconds + if current_price and chart_data is not None and not chart_data.empty: + price_chart = self._create_price_chart(symbol) + self._cached_chart_data = chart_data # Cache for lightweight updates + self._cached_price_chart = price_chart # Cache chart + else: + price_chart = self._create_empty_chart("Price Chart", "Waiting for real market data...") + self._cached_price_chart = price_chart else: - price_chart = self._create_empty_chart("Price Chart", "Waiting for real market data...") + # Use cached chart for lightweight updates + price_chart = getattr(self, '_cached_price_chart', + self._create_empty_chart("Price Chart", "Loading...")) except Exception as e: logger.warning(f"Price chart error: {e}") price_chart = self._create_empty_chart("Price Chart", "Error loading chart - waiting for data") @@ -1405,18 +1417,17 @@ class TradingDashboard: return fig def _create_price_chart(self, symbol: str) -> go.Figure: - """Create enhanced 1-second price chart with volume and Williams pivot points from WebSocket stream""" + """Create price chart with volume and Williams pivot points from cached data""" try: - # Get 1-second bars from WebSocket stream - df = self.get_one_second_bars(count=300) # Last 5 minutes of 1s bars + # Use cached data from data provider (optimized for performance) + df = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=False) - # If no WebSocket data, fall back to data provider - if df.empty: - logger.warning("[CHART] No WebSocket data, falling back to data provider") + if df is None or df.empty: + logger.warning("[CHART] No cached data available, trying fresh data") try: - df = self.data_provider.get_historical_data(symbol, '1m', limit=50, refresh=True) + df = self.data_provider.get_historical_data(symbol, '1m', limit=30, refresh=True) if df is not None and not df.empty: - # Ensure timezone consistency for fallback data + # Ensure timezone consistency for fresh data df = self._ensure_timezone_consistency(df) # Add volume column if missing if 'volume' not in df.columns: @@ -1424,20 +1435,20 @@ class TradingDashboard: actual_timeframe = '1m' else: return self._create_empty_chart( - f"{symbol} 1s Chart", - f"No data available for {symbol}\nStarting WebSocket stream..." + f"{symbol} Chart", + f"No data available for {symbol}\nWaiting for data provider..." ) except Exception as e: - logger.warning(f"[ERROR] Error getting fallback data: {e}") + logger.warning(f"[ERROR] Error getting fresh data: {e}") return self._create_empty_chart( - f"{symbol} 1s Chart", + f"{symbol} Chart", f"Chart Error: {str(e)}" ) else: - # Ensure timezone consistency for WebSocket data + # Ensure timezone consistency for cached data df = self._ensure_timezone_consistency(df) - actual_timeframe = '1s' - logger.debug(f"[CHART] Using {len(df)} 1s bars from WebSocket stream in {self.timezone}") + actual_timeframe = '1m' + logger.debug(f"[CHART] Using {len(df)} 1m bars from cached data in {self.timezone}") # Create subplot with secondary y-axis for volume fig = make_subplots( @@ -3234,187 +3245,109 @@ class TradingDashboard: 'details': [html.P(f"Error: {str(e)}", className="text-danger")] } - def _start_websocket_stream(self): - """Start WebSocket connection for real-time tick data""" + def _start_lightweight_websocket(self): + """Start ultra-lightweight WebSocket for real-time price updates only""" try: - if not WEBSOCKET_AVAILABLE: - logger.warning("[WEBSOCKET] websocket-client not available. Using data provider fallback.") - self.is_streaming = False + if self.is_streaming: + logger.warning("[WS] WebSocket already running") return - symbol = self.config.symbols[0] if self.config.symbols else "ETHUSDT" + # ETH/USDT primary symbol for scalping + symbol = "ethusdt" + + def ws_worker(): + try: + import websocket + import json + + def on_message(ws, message): + try: + data = json.loads(message) + # Extract only current price - ultra minimal processing + if 'c' in data: # Current price from ticker + price = float(data['c']) + # Update price cache (no history, just current) + self.ws_price_cache['ETHUSDT'] = price + self.current_prices['ETHUSDT'] = price + + # Performance tracking + current_time = time.time() + self.last_ws_update = current_time + self.ws_update_count += 1 + + # Log every 100 updates for monitoring + if self.ws_update_count % 100 == 0: + logger.debug(f"[WS] {self.ws_update_count} price updates, latest: ${price:.2f}") + except Exception as e: + logger.warning(f"[WS] Error processing message: {e}") + + def on_error(ws, error): + logger.error(f"[WS] Error: {error}") + self.is_streaming = False + + def on_close(ws, close_status_code, close_msg): + logger.warning(f"[WS] Connection closed: {close_status_code}") + self.is_streaming = False + # Auto-reconnect after 5 seconds + time.sleep(5) + if not self.is_streaming: + self._start_lightweight_websocket() + + def on_open(ws): + logger.info(f"[WS] Connected for real-time ETHUSDT price updates") + self.is_streaming = True + + # Binance WebSocket for ticker (price only, not trades) + ws_url = f"wss://stream.binance.com:9443/ws/{symbol}@ticker" + + self.ws_connection = websocket.WebSocketApp( + ws_url, + on_message=on_message, + on_error=on_error, + on_close=on_close, + on_open=on_open + ) + + # Run WebSocket (blocking) + self.ws_connection.run_forever() + + except Exception as e: + logger.error(f"[WS] Worker error: {e}") + self.is_streaming = False # Start WebSocket in background thread - self.ws_thread = threading.Thread(target=self._websocket_worker, args=(symbol,), daemon=True) + self.ws_thread = threading.Thread(target=ws_worker, daemon=True) self.ws_thread.start() - logger.info(f"[WEBSOCKET] Starting real-time tick stream for {symbol}") + logger.info("[WS] Lightweight WebSocket started for real-time price updates") except Exception as e: - logger.error(f"Error starting WebSocket stream: {e}") + logger.error(f"[WS] Failed to start: {e}") self.is_streaming = False - - def _websocket_worker(self, symbol: str): - """WebSocket worker thread for continuous tick data streaming""" + + def stop_streaming(self): + """Stop WebSocket streaming""" try: - # Use Binance WebSocket for real-time tick data - ws_url = f"wss://stream.binance.com:9443/ws/{symbol.lower().replace('/', '')}@ticker" - - def on_message(ws, message): - try: - data = json.loads(message) - self._process_tick_data(data) - except Exception as e: - logger.warning(f"Error processing WebSocket message: {e}") - - def on_error(ws, error): - logger.error(f"WebSocket error: {error}") - self.is_streaming = False - - def on_close(ws, close_status_code, close_msg): - logger.warning("WebSocket connection closed") - self.is_streaming = False - # Attempt to reconnect after 5 seconds - time.sleep(5) - if not self.is_streaming: - self._websocket_worker(symbol) - - def on_open(ws): - logger.info("[WEBSOCKET] Connected to Binance stream") - self.is_streaming = True - - # Create WebSocket connection - self.ws_connection = websocket.WebSocketApp( - ws_url, - on_message=on_message, - on_error=on_error, - on_close=on_close, - on_open=on_open - ) - - # Run WebSocket (this blocks) - self.ws_connection.run_forever() - - except Exception as e: - logger.error(f"WebSocket worker error: {e}") self.is_streaming = False - - def _process_tick_data(self, tick_data: Dict): - """Process incoming tick data and update 1-second bars with consistent timezone""" - try: - # Extract price and volume from Binance ticker data - price = float(tick_data.get('c', 0)) # Current price - volume = float(tick_data.get('v', 0)) # 24h volume - # Use configured timezone instead of UTC for consistency - timestamp = self._now_local() - - # Add to tick cache with consistent timezone - tick = { - 'timestamp': timestamp, - 'price': price, - 'volume': volume, - 'bid': float(tick_data.get('b', price)), # Best bid - 'ask': float(tick_data.get('a', price)), # Best ask - 'high_24h': float(tick_data.get('h', price)), - 'low_24h': float(tick_data.get('l', price)) - } - - self.tick_cache.append(tick) - - # Update current second bar using local timezone - current_second = timestamp.replace(microsecond=0) - - if self.current_second_data['timestamp'] != current_second: - # New second - finalize previous bar and start new one - if self.current_second_data['timestamp'] is not None: - self._finalize_second_bar() - - # Start new second bar - self.current_second_data = { - 'timestamp': current_second, - 'open': price, - 'high': price, - 'low': price, - 'close': price, - 'volume': 0, - 'tick_count': 1 - } - else: - # Update current second bar - self.current_second_data['high'] = max(self.current_second_data['high'], price) - self.current_second_data['low'] = min(self.current_second_data['low'], price) - self.current_second_data['close'] = price - self.current_second_data['tick_count'] += 1 - - # Update current price for dashboard - self.current_prices[tick_data.get('s', 'ETHUSDT')] = price - - logger.debug(f"[TICK] Processed tick at {timestamp.strftime('%H:%M:%S')} {self.timezone.zone}: ${price:.2f}") - + if self.ws_connection: + self.ws_connection.close() + logger.info("[WS] Streaming stopped") except Exception as e: - logger.warning(f"Error processing tick data: {e}") - - def _finalize_second_bar(self): - """Finalize the current second bar and add to bars cache""" + logger.error(f"[WS] Error stopping: {e}") + + def get_realtime_price(self, symbol: str) -> float: + """Get real-time price from WebSocket cache (faster than API)""" try: - if self.current_second_data['timestamp'] is not None: - bar = { - 'timestamp': self.current_second_data['timestamp'], - 'open': self.current_second_data['open'], - 'high': self.current_second_data['high'], - 'low': self.current_second_data['low'], - 'close': self.current_second_data['close'], - 'volume': self.current_second_data['volume'], - 'tick_count': self.current_second_data['tick_count'] - } - - self.one_second_bars.append(bar) - - # Log every 10 seconds for monitoring - if len(self.one_second_bars) % 10 == 0: - logger.debug(f"[BARS] Generated {len(self.one_second_bars)} 1s bars, latest: ${bar['close']:.2f}") - + # Try WebSocket cache first (sub-second latency) + ws_price = self.ws_price_cache.get(symbol.replace('/', '')) + if ws_price: + return ws_price + + # Fallback to current_prices (from data provider) + return self.current_prices.get(symbol.replace('/', '')) except Exception as e: - logger.warning(f"Error finalizing second bar: {e}") - - def get_tick_cache_for_training(self, minutes: int = 15) -> List[Dict]: - """Get tick cache data for model training""" - try: - cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=minutes) - recent_ticks = [ - tick for tick in self.tick_cache - if tick['timestamp'] >= cutoff_time - ] - return recent_ticks - except Exception as e: - logger.error(f"Error getting tick cache: {e}") - return [] - - def get_one_second_bars(self, count: int = 300) -> pd.DataFrame: - """Get recent 1-second bars as DataFrame with consistent timezone""" - try: - if len(self.one_second_bars) == 0: - return pd.DataFrame() - - # Get recent bars - recent_bars = list(self.one_second_bars)[-count:] - - # Convert to DataFrame - df = pd.DataFrame(recent_bars) - if not df.empty: - df.set_index('timestamp', inplace=True) - df.sort_index(inplace=True) - - # Ensure timezone consistency - df = self._ensure_timezone_consistency(df) - - logger.debug(f"[BARS] Retrieved {len(df)} 1s bars in {self.timezone.zone}") - - return df - - except Exception as e: - logger.error(f"Error getting 1-second bars: {e}") - return pd.DataFrame() + logger.warning(f"[WS] Error getting realtime price: {e}") + return None def _create_cnn_monitoring_content(self) -> List: """Create CNN monitoring and prediction analysis content""" @@ -3596,33 +3529,77 @@ class TradingDashboard: logger.error(f"Error creating model performance table: {e}") return html.P(f"Error creating performance table: {str(e)}", className="text-danger") + def _cleanup_old_data(self): + """Clean up old data to prevent memory leaks and performance degradation""" + try: + cleanup_start = time.time() + + # Clean up recent decisions - keep only last 100 + if len(self.recent_decisions) > 100: + self.recent_decisions = self.recent_decisions[-100:] + + # Clean up recent signals - keep only last 50 + if len(self.recent_signals) > 50: + self.recent_signals = self.recent_signals[-50:] + + # Clean up session trades - keep only last 200 + if len(self.session_trades) > 200: + self.session_trades = self.session_trades[-200:] + + # Clean up closed trades - keep only last 100 in memory, rest in file + if len(self.closed_trades) > 100: + self.closed_trades = self.closed_trades[-100:] + + # Clean up current prices - remove old symbols not in config + current_symbols = set(self.config.symbols) if self.config.symbols else {'ETHUSDT'} + symbols_to_remove = [] + for symbol in self.current_prices: + if symbol not in current_symbols: + symbols_to_remove.append(symbol) + for symbol in symbols_to_remove: + del self.current_prices[symbol] + + # Clean up RL training queue - keep only last 500 + if len(self.rl_training_queue) > 500: + # Convert to list, slice, then back to deque + old_queue = list(self.rl_training_queue) + self.rl_training_queue.clear() + self.rl_training_queue.extend(old_queue[-500:]) + + # Tick infrastructure removed - no cleanup needed + + cleanup_time = (time.time() - cleanup_start) * 1000 + logger.info(f"[CLEANUP] Data cleanup completed in {cleanup_time:.1f}ms - " + f"Decisions: {len(self.recent_decisions)}, " + f"Signals: {len(self.recent_signals)}, " + f"Trades: {len(self.session_trades)}, " + f"Closed: {len(self.closed_trades)}") + + except Exception as e: + logger.error(f"Error during data cleanup: {e}") + def _create_training_metrics(self) -> List: """Create comprehensive model training metrics display with enhanced RL integration""" try: training_items = [] # Enhanced Training Data Streaming Status - tick_cache_size = len(self.tick_cache) - bars_cache_size = len(self.one_second_bars) + ws_updates = getattr(self, 'ws_update_count', 0) enhanced_data_available = self.training_data_available and self.enhanced_rl_training_enabled training_items.append( html.Div([ html.H6([ html.I(className="fas fa-database me-2 text-info"), - "Enhanced Training Data Stream" + "Real-Time Data & Training Stream" ], className="mb-2"), html.Div([ html.Small([ - html.Strong("Tick Cache: "), - html.Span(f"{tick_cache_size:,} ticks", className="text-success" if tick_cache_size > 1000 else "text-warning") + html.Strong("WebSocket Updates: "), + html.Span(f"{ws_updates:,} price updates", className="text-success" if ws_updates > 100 else "text-warning") ], className="d-block"), html.Small([ - html.Strong("1s Bars: "), - html.Span(f"{bars_cache_size} bars", className="text-success" if bars_cache_size > 100 else "text-warning") - ], className="d-block"), - html.Small([ - html.Strong("Stream: "), + html.Strong("Stream Status: "), html.Span("LIVE" if self.is_streaming else "OFFLINE", className="text-success" if self.is_streaming else "text-danger") ], className="d-block"), @@ -3632,9 +3609,14 @@ class TradingDashboard: className="text-success" if self.enhanced_rl_training_enabled else "text-warning") ], className="d-block"), html.Small([ - html.Strong("Comprehensive Data: "), + html.Strong("Training Data: "), html.Span("AVAILABLE" if enhanced_data_available else "WAITING", className="text-success" if enhanced_data_available else "text-warning") + ], className="d-block"), + html.Small([ + html.Strong("Cached Data: "), + html.Span("READY" if len(self.current_prices) > 0 else "LOADING", + className="text-success" if len(self.current_prices) > 0 else "text-warning") ], className="d-block") ]) ], className="mb-3 p-2 border border-info rounded") @@ -4500,12 +4482,9 @@ class TradingDashboard: logger.warning(f"Error updating training metrics: {e}") def get_tick_cache_for_training(self) -> List[Dict]: - """Get tick cache data for external training systems""" - try: - return list(self.tick_cache) - except Exception as e: - logger.error(f"Error getting tick cache for training: {e}") - return [] + """Get tick cache data for external training systems - removed for performance optimization""" + logger.debug("Tick cache removed for performance - using cached OHLCV data for training instead") + return [] # Empty since we removed tick infrastructure def start_continuous_training(self): """Start continuous training in background thread""" @@ -5243,16 +5222,13 @@ class TradingDashboard: try: # Get data for this timeframe if timeframe == '1s': - # For 1s data, use our tick aggregation - df = self.get_one_second_bars(count=60) # Last 60 seconds - if df.empty: - # Fallback to 1m data - df = self.data_provider.get_historical_data( - symbol=target_symbol, - timeframe='1m', - limit=50, - refresh=False # Use cache to prevent excessive API calls - ) + # For 1s data, fallback directly to 1m data (no tick aggregation) + df = self.data_provider.get_historical_data( + symbol=target_symbol, + timeframe='1m', + limit=50, + refresh=False # Use cache to prevent excessive API calls + ) else: # Get historical data for other timeframes df = self.data_provider.get_historical_data( @@ -5279,17 +5255,15 @@ class TradingDashboard: # BTC: 1 timeframe * 50 = 50 features # Total expected: 200 features (150 ETH + 50 BTC) - # Add BTC 1s data if we're processing ETH (for correlation analysis) + # Add BTC 1m data if we're processing ETH (for correlation analysis) if symbol.startswith('ETH'): try: - btc_1s_df = self.get_one_second_bars(count=60, symbol='BTC/USDT') - if btc_1s_df.empty: - btc_1s_df = self.data_provider.get_historical_data( - symbol='BTC/USDT', - timeframe='1m', - limit=50, - refresh=False # Use cache to prevent excessive API calls - ) + btc_1s_df = self.data_provider.get_historical_data( + symbol='BTC/USDT', + timeframe='1m', + limit=50, + refresh=False # Use cache to prevent excessive API calls + ) if btc_1s_df is not None and not btc_1s_df.empty and len(btc_1s_df) >= 10: btc_features = self._extract_timeframe_features(btc_1s_df, '1s_btc')