diff --git a/core/data_provider.py b/core/data_provider.py index e8ab67d..1ebbd02 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -4931,4 +4931,35 @@ class DataProvider: try: callback(symbol, data) except Exception as e: - logger.error(f"Error in bucketed COB callback: {e}") \ No newline at end of file + logger.error(f"Error in bucketed COB callback: {e}") + + def get_live_price_from_api(self, symbol: str) -> Optional[float]: + """FORCE fetch live price from Binance API for low-latency updates""" + # Check cache first to avoid excessive API calls + if symbol in self.live_price_cache: + price, timestamp = self.live_price_cache[symbol] + if datetime.now() - timestamp < self.live_price_cache_ttl: + return price + + try: + import requests + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}" + response = requests.get(url, timeout=0.5) # Use a short timeout for low latency + response.raise_for_status() + data = response.json() + price = float(data['price']) + + # Update cache and current prices + self.live_price_cache[symbol] = (price, datetime.now()) + self.current_prices[symbol] = price + + logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}") + return price + except requests.exceptions.RequestException as e: + logger.warning(f"Failed to get live price for {symbol} from API: {e}") + # Fallback to last known current price + return self.current_prices.get(symbol) + except Exception as e: + logger.error(f"Unexpected error getting live price for {symbol}: {e}") + return self.current_prices.get(symbol) \ No newline at end of file diff --git a/core/orchestrator.py b/core/orchestrator.py index 8a68fbc..a0543ce 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -1035,70 +1035,17 @@ class TradingOrchestrator: logger.debug(f"Error capturing DQN prediction: {e}") def _get_current_price(self, symbol: str) -> Optional[float]: - """Get current price for a symbol - ENHANCED with better fallbacks""" + """Get current price for a symbol - using dedicated live price API""" try: - # Try data provider current prices first - if hasattr(self.data_provider, 'current_prices') and symbol in self.data_provider.current_prices: - price = self.data_provider.current_prices[symbol] - if price and price > 0: - return price - - # Try data provider get_current_price method - if hasattr(self.data_provider, 'get_current_price'): - try: - price = self.data_provider.get_current_price(symbol) - if price and price > 0: - return price - except Exception as dp_error: - logger.debug(f"Data provider get_current_price failed: {dp_error}") - - # Get fresh price from data provider - try multiple timeframes - for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability - try: - df = self.data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True) - if df is not None and not df.empty: - price = float(df['close'].iloc[-1]) - if price > 0: - logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}") - return price - except Exception as tf_error: - logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}") - continue - - # Try external API as last resort - try: - import requests - if symbol == 'ETH/USDT': - response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2) - if response.status_code == 200: - data = response.json() - price = float(data['price']) - if price > 0: - logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}") - return price - elif symbol == 'BTC/USDT': - response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2) - if response.status_code == 200: - data = response.json() - price = float(data['price']) - if price > 0: - logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}") - return price - except Exception as api_error: - logger.debug(f"External API failed: {api_error}") - - logger.warning(f"Could not get current price for {symbol} from any source") - + # Use the new low-latency live price method from data provider + if hasattr(self.data_provider, 'get_live_price_from_api'): + return self.data_provider.get_live_price_from_api(symbol) + else: + # Fallback to old method if not available + return self.data_provider.get_current_price(symbol) except Exception as e: logger.error(f"Error getting current price for {symbol}: {e}") - - # Return a reasonable fallback based on current market conditions - if symbol == 'ETH/USDT': - return 3385.0 # Current market price fallback - elif symbol == 'BTC/USDT': - return 119500.0 # Current market price fallback - - return None + return None async def _generate_fallback_prediction(self, symbol: str, current_price: float) -> Optional[Prediction]: """Generate a basic momentum-based fallback prediction when no models are available""" @@ -2304,7 +2251,7 @@ class TradingOrchestrator: 'evaluated_at': datetime.now().isoformat() } - price_pred_info = f"predicted: ${predicted_price:.2f}" if predicted_price is not None else "no price prediction" + price_pred_info = f"inference: ${inference_price:.2f}" if inference_price is not None else "no inference price" logger.debug(f"Evaluated {model_name} prediction: {'✓' if was_correct else '✗'} " f"({prediction['action']}, {price_change_pct:.2f}% change, " f"confidence: {prediction_confidence:.3f}, {price_pred_info}, reward: {reward:.3f})") diff --git a/core/standardized_data_provider.py b/core/standardized_data_provider.py index 0ae71ca..ca0c452 100644 --- a/core/standardized_data_provider.py +++ b/core/standardized_data_provider.py @@ -53,6 +53,13 @@ class StandardizedDataProvider(DataProvider): self.cob_data_cache[symbol] = None self.cob_imbalance_history[symbol] = deque(maxlen=300) # 5 minutes of 1s data + # Ensure live price cache exists (in case parent didn't initialize it) + if not hasattr(self, 'live_price_cache'): + self.live_price_cache: Dict[str, Tuple[float, datetime]] = {} + if not hasattr(self, 'live_price_cache_ttl'): + from datetime import timedelta + self.live_price_cache_ttl = timedelta(milliseconds=500) + # COB provider integration self.cob_provider: Optional[MultiExchangeCOBProvider] = None self._initialize_cob_provider() @@ -476,10 +483,37 @@ class StandardizedDataProvider(DataProvider): else: logger.warning(f"No 'close' column found in OHLCV data for {symbol}") return [] - except Exception as e: logger.error(f"Error getting recent prices for {symbol}: {e}") return [] + + def get_live_price_from_api(self, symbol: str) -> Optional[float]: + """FORCE fetch live price from Binance API for low-latency updates""" + # Check cache first to avoid excessive API calls + if symbol in self.live_price_cache: + price, timestamp = self.live_price_cache[symbol] + if datetime.now() - timestamp < self.live_price_cache_ttl: + return price + + try: + import requests + binance_symbol = symbol.replace('/', '') + url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}" + response = requests.get(url, timeout=0.5) # Use a short timeout for low latency + response.raise_for_status() + data = response.json() + price = float(data['price']) + # Update cache and current prices + self.live_price_cache[symbol] = (price, datetime.now()) + self.current_prices[symbol] = price + + logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}") + return price + except requests.exceptions.RequestException as e: + logger.warning(f"Failed to get live price for {symbol} from API: {e}") + # Fallback to last known current price + return self.current_prices.get(symbol) except Exception as e: - logger.error(f"Error stopping real-time processing: {e}") \ No newline at end of file + logger.error(f"Unexpected error getting live price for {symbol}: {e}") + return self.current_prices.get(symbol) \ No newline at end of file