From 240d2b78775f99f114158e8e8eb1e73f26c65716 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 28 Jul 2025 08:35:08 +0300 Subject: [PATCH] stats, standartized data provider --- core/orchestrator.py | 23 +++- core/standardized_data_provider.py | 202 +++++++++++++++++++++++++---- web/clean_dashboard.py | 53 +++----- 3 files changed, 220 insertions(+), 58 deletions(-) diff --git a/core/orchestrator.py b/core/orchestrator.py index a0543ce..e109168 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -2139,11 +2139,25 @@ class TradingOrchestrator: outcome_status = "✅ CORRECT" if was_correct else "❌ INCORRECT" + # Get model statistics for enhanced logging + model_stats = self.get_model_statistics(model_name) + current_loss = model_stats.current_loss if model_stats else None + best_loss = model_stats.best_loss if model_stats else None + avg_loss = model_stats.average_loss if model_stats else None + + # Enhanced logging with detailed information logger.info(f"Completed immediate training for {model_name} - {outcome_status}") - logger.info(f" Prediction: {predicted_action} ({predicted_confidence:.3f})") + logger.info(f" Prediction: {predicted_action} (confidence: {predicted_confidence:.3f})") logger.info(f" {price_outcome}") + logger.info(f" Reward: {reward:.4f} | Time: {time_diff_seconds:.1f}s") + logger.info(f" Loss: {current_loss:.4f} | Best: {best_loss:.4f} | Avg: {avg_loss:.4f}") logger.info(f" Outcome: {outcome_status}") + # Add performance summary + if model_name in self.model_performance: + perf = self.model_performance[model_name] + logger.info(f" Performance: {perf['accuracy']:.1%} ({perf['correct']}/{perf['total']})") + except Exception as e: logger.error(f"Error in immediate training for {model_name}: {e}") @@ -2238,6 +2252,13 @@ class TradingOrchestrator: f"{price_prediction_stats['accurate']}/{price_prediction_stats['total']} " f"({price_prediction_stats['avg_error']:.2f}% avg error)") + # Enhanced logging for training evaluation + logger.info(f"Training evaluation for {model_name}:") + logger.info(f" Action: {predicted_action} | Confidence: {prediction_confidence:.3f}") + logger.info(f" Price change: {price_change_pct:+.3f}% | Time: {time_diff_seconds:.1f}s") + logger.info(f" Reward: {reward:.4f} | Correct: {was_correct}") + logger.info(f" Accuracy: {self.model_performance[model_name]['accuracy']:.1%} ({self.model_performance[model_name]['correct']}/{self.model_performance[model_name]['total']})") + # Train the specific model based on sophisticated outcome await self._train_model_on_outcome(record, was_correct, price_change_pct, reward) diff --git a/core/standardized_data_provider.py b/core/standardized_data_provider.py index ca0c452..497cbbb 100644 --- a/core/standardized_data_provider.py +++ b/core/standardized_data_provider.py @@ -60,6 +60,13 @@ class StandardizedDataProvider(DataProvider): from datetime import timedelta self.live_price_cache_ttl = timedelta(milliseconds=500) + # Initialize WebSocket cache for dashboard compatibility + if not hasattr(self, 'ws_price_cache'): + self.ws_price_cache: Dict[str, float] = {} + + # Initialize orchestrator reference (for dashboard compatibility) + self.orchestrator = None + # COB provider integration self.cob_provider: Optional[MultiExchangeCOBProvider] = None self._initialize_cob_provider() @@ -488,32 +495,177 @@ class StandardizedDataProvider(DataProvider): return [] def get_live_price_from_api(self, symbol: str) -> Optional[float]: - """FORCE fetch live price from Binance API for low-latency updates""" - # Check cache first to avoid excessive API calls - if symbol in self.live_price_cache: - price, timestamp = self.live_price_cache[symbol] - if datetime.now() - timestamp < self.live_price_cache_ttl: - return price - + """ROBUST live price fetching with comprehensive fallbacks""" try: - import requests - binance_symbol = symbol.replace('/', '') - url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}" - response = requests.get(url, timeout=0.5) # Use a short timeout for low latency - response.raise_for_status() - data = response.json() - price = float(data['price']) + # 1. Check cache first to avoid excessive API calls + if symbol in self.live_price_cache: + price, timestamp = self.live_price_cache[symbol] + if datetime.now() - timestamp < self.live_price_cache_ttl: + logger.debug(f"Using cached price for {symbol}: ${price:.2f}") + return price - # Update cache and current prices - self.live_price_cache[symbol] = (price, datetime.now()) - self.current_prices[symbol] = price + # 2. Try direct Binance API call + try: + import requests + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}" + response = requests.get(url, timeout=0.5) # Use a short timeout for low latency + response.raise_for_status() + data = response.json() + price = float(data['price']) + + # Update cache and current prices + self.live_price_cache[symbol] = (price, datetime.now()) + self.current_prices[symbol] = price + + logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}") + return price + except requests.exceptions.RequestException as e: + logger.warning(f"Failed to get live price for {symbol} from API: {e}") + except Exception as e: + logger.warning(f"Unexpected error in API call for {symbol}: {e}") + + # 3. Fallback to current prices from parent + if hasattr(self, 'current_prices') and symbol in self.current_prices: + price = self.current_prices[symbol] + if price and price > 0: + logger.debug(f"Using current price for {symbol}: ${price:.2f}") + return price + + # 4. Try parent's get_current_price method + if hasattr(self, 'get_current_price'): + try: + price = self.get_current_price(symbol) + if price and price > 0: + self.current_prices[symbol] = price + logger.debug(f"Got current price for {symbol} from parent: ${price:.2f}") + return price + except Exception as e: + logger.debug(f"Parent get_current_price failed for {symbol}: {e}") + + # 5. Try historical data from multiple timeframes + for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability + try: + df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True) + if df is not None and not df.empty: + price = float(df['close'].iloc[-1]) + if price > 0: + self.current_prices[symbol] = price + logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}") + return price + except Exception as tf_error: + logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}") + continue + + # 6. Try WebSocket cache if available + ws_symbol = symbol.replace('/', '') + if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache: + price = self.ws_price_cache[ws_symbol] + if price and price > 0: + logger.debug(f"Using WebSocket cache for {symbol}: ${price:.2f}") + return price + + # 7. Try to get from orchestrator if available (for dashboard compatibility) + if hasattr(self, 'orchestrator') and self.orchestrator: + try: + if hasattr(self.orchestrator, 'data_provider'): + price = self.orchestrator.data_provider.get_current_price(symbol) + if price and price > 0: + self.current_prices[symbol] = price + logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}") + return price + except Exception as orch_error: + logger.debug(f"Failed to get price from orchestrator: {orch_error}") + + # 8. Last resort: try external API with longer timeout + try: + import requests + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}" + response = requests.get(url, timeout=2) # Longer timeout for last resort + if response.status_code == 200: + data = response.json() + price = float(data['price']) + if price > 0: + self.current_prices[symbol] = price + logger.warning(f"Got current price for {symbol} from external API (last resort): ${price:.2f}") + return price + except Exception as api_error: + logger.debug(f"External API failed: {api_error}") + + logger.warning(f"Could not get current price for {symbol} from any source") - logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}") - return price - except requests.exceptions.RequestException as e: - logger.warning(f"Failed to get live price for {symbol} from API: {e}") - # Fallback to last known current price - return self.current_prices.get(symbol) except Exception as e: - logger.error(f"Unexpected error getting live price for {symbol}: {e}") - return self.current_prices.get(symbol) \ No newline at end of file + logger.error(f"Error getting current price for {symbol}: {e}") + + # Return a fallback price if we have any cached data + if hasattr(self, 'current_prices') and symbol in self.current_prices and self.current_prices[symbol] > 0: + return self.current_prices[symbol] + + # Return None instead of hardcoded fallbacks - let the caller handle missing data + return None + + def get_current_price(self, symbol: str) -> Optional[float]: + """Get current price with robust fallbacks - enhanced version""" + try: + # 1. Try live price API first (our enhanced method) + price = self.get_live_price_from_api(symbol) + if price and price > 0: + return price + + # 2. Try parent's get_current_price method + if hasattr(super(), 'get_current_price'): + try: + price = super().get_current_price(symbol) + if price and price > 0: + return price + except Exception as e: + logger.debug(f"Parent get_current_price failed for {symbol}: {e}") + + # 3. Try current prices cache + if hasattr(self, 'current_prices') and symbol in self.current_prices: + price = self.current_prices[symbol] + if price and price > 0: + return price + + # 4. Try historical data from multiple timeframes + for timeframe in ['1m', '5m', '1h']: + try: + df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True) + if df is not None and not df.empty: + price = float(df['close'].iloc[-1]) + if price > 0: + self.current_prices[symbol] = price + return price + except Exception as tf_error: + logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}") + continue + + # 5. Try WebSocket cache if available + ws_symbol = symbol.replace('/', '') + if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache: + price = self.ws_price_cache[ws_symbol] + if price and price > 0: + return price + + logger.warning(f"Could not get current price for {symbol} from any source") + return None + + except Exception as e: + logger.error(f"Error getting current price for {symbol}: {e}") + return None + + def update_ws_price_cache(self, symbol: str, price: float): + """Update WebSocket price cache for dashboard compatibility""" + try: + ws_symbol = symbol.replace('/', '') + self.ws_price_cache[ws_symbol] = price + # Also update current prices for consistency + self.current_prices[symbol] = price + logger.debug(f"Updated WS cache for {symbol}: ${price:.2f}") + except Exception as e: + logger.error(f"Error updating WS cache for {symbol}: {e}") + + def set_orchestrator(self, orchestrator): + """Set orchestrator reference for dashboard compatibility""" + self.orchestrator = orchestrator \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 14c476a..3f59bda 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -1076,7 +1076,7 @@ class CleanTradingDashboard: return [html.I(className="fas fa-save me-1"), "Store All Models"] def _get_current_price(self, symbol: str) -> Optional[float]: - """Get current price for symbol - ENHANCED with better fallbacks""" + """Get current price for symbol - ONLY using our data providers""" try: # Try WebSocket cache first ws_symbol = symbol.replace('/', '') @@ -1099,6 +1099,16 @@ class CleanTradingDashboard: except Exception as dp_error: logger.debug(f"Data provider get_current_price failed: {dp_error}") + # Try data provider get_live_price_from_api method (our standardized method) + if hasattr(self.data_provider, 'get_live_price_from_api'): + try: + price = self.data_provider.get_live_price_from_api(symbol) + if price and price > 0: + self.current_prices[symbol] = price + return price + except Exception as live_error: + logger.debug(f"Data provider get_live_price_from_api failed: {live_error}") + # Fallback to dashboard current prices if symbol in self.current_prices and self.current_prices[symbol] > 0: return self.current_prices[symbol] @@ -1127,34 +1137,18 @@ class CleanTradingDashboard: self.current_prices[symbol] = price logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}") return price + + # Try orchestrator's live price method + if hasattr(self.orchestrator.data_provider, 'get_live_price_from_api'): + price = self.orchestrator.data_provider.get_live_price_from_api(symbol) + if price and price > 0: + self.current_prices[symbol] = price + logger.debug(f"Got current price for {symbol} from orchestrator live API: ${price:.2f}") + return price except Exception as orch_error: logger.debug(f"Failed to get price from orchestrator: {orch_error}") - # Try external API as last resort - try: - import requests - if symbol == 'ETH/USDT': - response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2) - if response.status_code == 200: - data = response.json() - price = float(data['price']) - if price > 0: - self.current_prices[symbol] = price - logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}") - return price - elif symbol == 'BTC/USDT': - response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2) - if response.status_code == 200: - data = response.json() - price = float(data['price']) - if price > 0: - self.current_prices[symbol] = price - logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}") - return price - except Exception as api_error: - logger.debug(f"External API failed: {api_error}") - - logger.warning(f"Could not get current price for {symbol} from any source") + logger.warning(f"Could not get current price for {symbol} from any data provider source") except Exception as e: logger.error(f"Error getting current price for {symbol}: {e}") @@ -1163,12 +1157,7 @@ class CleanTradingDashboard: if symbol in self.current_prices and self.current_prices[symbol] > 0: return self.current_prices[symbol] - # Return a reasonable fallback based on current market conditions - if symbol == 'ETH/USDT': - return 3385.0 # Current market price fallback - elif symbol == 'BTC/USDT': - return 119500.0 # Current market price fallback - + # Return None instead of hardcoded fallbacks - let the UI handle missing data return None def _create_price_chart(self, symbol: str) -> go.Figure: