7 Commits

Author SHA1 Message Date
9219b78241 UI 2025-07-28 11:44:01 +03:00
7c508ab536 cob 2025-07-28 11:12:42 +03:00
1084b7f5b5 cob buffered 2025-07-28 10:31:24 +03:00
619e39ac9b binance WS api enhanced 2025-07-28 10:26:47 +03:00
f5416c4f1e cob update fix 2025-07-28 09:46:49 +03:00
240d2b7877 stats, standartized data provider 2025-07-28 08:35:08 +03:00
6efaa27c33 dix price ccalls 2025-07-28 00:14:03 +03:00
8 changed files with 1628 additions and 193 deletions

View File

@ -163,7 +163,7 @@ class COBIntegration:
if symbol: if symbol:
self.websocket_status[symbol] = status self.websocket_status[symbol] = status
logger.info(f"🔌 WebSocket status for {symbol}: {status} - {message}") logger.info(f"WebSocket status for {symbol}: {status} - {message}")
# Notify dashboard callbacks about status change # Notify dashboard callbacks about status change
status_update = { status_update = {

View File

@ -4931,4 +4931,35 @@ class DataProvider:
try: try:
callback(symbol, data) callback(symbol, data)
except Exception as e: except Exception as e:
logger.error(f"Error in bucketed COB callback: {e}") logger.error(f"Error in bucketed COB callback: {e}")
def get_live_price_from_api(self, symbol: str) -> Optional[float]:
"""FORCE fetch live price from Binance API for low-latency updates"""
# Check cache first to avoid excessive API calls
if symbol in self.live_price_cache:
price, timestamp = self.live_price_cache[symbol]
if datetime.now() - timestamp < self.live_price_cache_ttl:
return price
try:
import requests
binance_symbol = symbol.replace('/', '')
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
response = requests.get(url, timeout=0.5) # Use a short timeout for low latency
response.raise_for_status()
data = response.json()
price = float(data['price'])
# Update cache and current prices
self.live_price_cache[symbol] = (price, datetime.now())
self.current_prices[symbol] = price
logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}")
return price
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to get live price for {symbol} from API: {e}")
# Fallback to last known current price
return self.current_prices.get(symbol)
except Exception as e:
logger.error(f"Unexpected error getting live price for {symbol}: {e}")
return self.current_prices.get(symbol)

File diff suppressed because it is too large Load Diff

View File

@ -1035,70 +1035,17 @@ class TradingOrchestrator:
logger.debug(f"Error capturing DQN prediction: {e}") logger.debug(f"Error capturing DQN prediction: {e}")
def _get_current_price(self, symbol: str) -> Optional[float]: def _get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for a symbol - ENHANCED with better fallbacks""" """Get current price for a symbol - using dedicated live price API"""
try: try:
# Try data provider current prices first # Use the new low-latency live price method from data provider
if hasattr(self.data_provider, 'current_prices') and symbol in self.data_provider.current_prices: if hasattr(self.data_provider, 'get_live_price_from_api'):
price = self.data_provider.current_prices[symbol] return self.data_provider.get_live_price_from_api(symbol)
if price and price > 0: else:
return price # Fallback to old method if not available
return self.data_provider.get_current_price(symbol)
# Try data provider get_current_price method
if hasattr(self.data_provider, 'get_current_price'):
try:
price = self.data_provider.get_current_price(symbol)
if price and price > 0:
return price
except Exception as dp_error:
logger.debug(f"Data provider get_current_price failed: {dp_error}")
# Get fresh price from data provider - try multiple timeframes
for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability
try:
df = self.data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True)
if df is not None and not df.empty:
price = float(df['close'].iloc[-1])
if price > 0:
logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}")
return price
except Exception as tf_error:
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
continue
# Try external API as last resort
try:
import requests
if symbol == 'ETH/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
return price
elif symbol == 'BTC/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
return price
except Exception as api_error:
logger.debug(f"External API failed: {api_error}")
logger.warning(f"Could not get current price for {symbol} from any source")
except Exception as e: except Exception as e:
logger.error(f"Error getting current price for {symbol}: {e}") logger.error(f"Error getting current price for {symbol}: {e}")
return None
# Return a reasonable fallback based on current market conditions
if symbol == 'ETH/USDT':
return 3385.0 # Current market price fallback
elif symbol == 'BTC/USDT':
return 119500.0 # Current market price fallback
return None
async def _generate_fallback_prediction(self, symbol: str, current_price: float) -> Optional[Prediction]: async def _generate_fallback_prediction(self, symbol: str, current_price: float) -> Optional[Prediction]:
"""Generate a basic momentum-based fallback prediction when no models are available""" """Generate a basic momentum-based fallback prediction when no models are available"""
@ -1257,10 +1204,12 @@ class TradingOrchestrator:
if not self.realtime_processing: if not self.realtime_processing:
return return
try: try:
# This is where you would feed the state to the DQN model for prediction # Store the COB state for DQN model access
# or store them for training. For now, we just log and store the latest. if 'state' in cob_data and cob_data['state'] is not None:
# self.latest_cob_state[symbol] = cob_data['state'] self.latest_cob_state[symbol] = cob_data['state']
# logger.debug(f"COB DQN state updated for {symbol}: {cob_data['state'][:5]}...") logger.debug(f"COB DQN state updated for {symbol}: shape {np.array(cob_data['state']).shape}")
else:
logger.warning(f"COB data for {symbol} missing 'state' field: {list(cob_data.keys())}")
# If training is enabled, add to training data # If training is enabled, add to training data
if self.training_enabled and self.enhanced_training_system: if self.training_enabled and self.enhanced_training_system:
@ -1284,8 +1233,11 @@ class TradingOrchestrator:
logger.debug(f"Invalidated data provider cache for {symbol} due to COB update") logger.debug(f"Invalidated data provider cache for {symbol} due to COB update")
# Update dashboard # Update dashboard
if self.dashboard and hasattr(self.dashboard, 'update_cob_data'): if self.dashboard and hasattr(self.dashboard, 'update_cob_data_from_orchestrator'):
self.dashboard.update_cob_data(symbol, cob_data) self.dashboard.update_cob_data_from_orchestrator(symbol, cob_data)
logger.debug(f"📊 Sent COB data for {symbol} to dashboard")
else:
logger.debug(f"📊 No dashboard connected to receive COB data for {symbol}")
except Exception as e: except Exception as e:
logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}") logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}")
@ -1624,10 +1576,12 @@ class TradingOrchestrator:
# Log statistics periodically (every 10 inferences) # Log statistics periodically (every 10 inferences)
stats = self.model_statistics[model_name] stats = self.model_statistics[model_name]
if stats.total_inferences % 10 == 0: if stats.total_inferences % 10 == 0:
last_prediction_str = stats.last_prediction if stats.last_prediction is not None else "None"
last_confidence_str = f"{stats.last_confidence:.3f}" if stats.last_confidence is not None else "N/A"
logger.debug(f"Model {model_name} stats: {stats.total_inferences} inferences, " logger.debug(f"Model {model_name} stats: {stats.total_inferences} inferences, "
f"{stats.inference_rate_per_minute:.1f}/min, " f"{stats.inference_rate_per_minute:.1f}/min, "
f"avg: {stats.average_inference_time_ms:.1f}ms, " f"avg: {stats.average_inference_time_ms:.1f}ms, "
f"last: {stats.last_prediction} ({stats.last_confidence:.3f})") f"last: {last_prediction_str} ({last_confidence_str})")
except Exception as e: except Exception as e:
logger.error(f"Error updating statistics for {model_name}: {e}") logger.error(f"Error updating statistics for {model_name}: {e}")
@ -2192,11 +2146,39 @@ class TradingOrchestrator:
outcome_status = "✅ CORRECT" if was_correct else "❌ INCORRECT" outcome_status = "✅ CORRECT" if was_correct else "❌ INCORRECT"
# Get model statistics for enhanced logging
model_stats = self.get_model_statistics(model_name)
current_loss = model_stats.current_loss if model_stats else None
best_loss = model_stats.best_loss if model_stats else None
avg_loss = model_stats.average_loss if model_stats else None
# Calculate reward for logging
reward, _ = self._calculate_sophisticated_reward(
predicted_action,
predicted_confidence,
actual_price_change_pct,
time_diff_seconds / 60, # Convert to minutes
has_price_prediction=predicted_price is not None
)
# Enhanced logging with detailed information
logger.info(f"Completed immediate training for {model_name} - {outcome_status}") logger.info(f"Completed immediate training for {model_name} - {outcome_status}")
logger.info(f" Prediction: {predicted_action} ({predicted_confidence:.3f})") logger.info(f" Prediction: {predicted_action} (confidence: {predicted_confidence:.3f})")
logger.info(f" {price_outcome}") logger.info(f" {price_outcome}")
logger.info(f" Reward: {reward:.4f} | Time: {time_diff_seconds:.1f}s")
# Safe formatting for loss values
current_loss_str = f"{current_loss:.4f}" if current_loss is not None else "N/A"
best_loss_str = f"{best_loss:.4f}" if best_loss is not None else "N/A"
avg_loss_str = f"{avg_loss:.4f}" if avg_loss is not None else "N/A"
logger.info(f" Loss: {current_loss_str} | Best: {best_loss_str} | Avg: {avg_loss_str}")
logger.info(f" Outcome: {outcome_status}") logger.info(f" Outcome: {outcome_status}")
# Add performance summary
if model_name in self.model_performance:
perf = self.model_performance[model_name]
logger.info(f" Performance: {perf['accuracy']:.1%} ({perf['correct']}/{perf['total']})")
except Exception as e: except Exception as e:
logger.error(f"Error in immediate training for {model_name}: {e}") logger.error(f"Error in immediate training for {model_name}: {e}")
@ -2291,6 +2273,13 @@ class TradingOrchestrator:
f"{price_prediction_stats['accurate']}/{price_prediction_stats['total']} " f"{price_prediction_stats['accurate']}/{price_prediction_stats['total']} "
f"({price_prediction_stats['avg_error']:.2f}% avg error)") f"({price_prediction_stats['avg_error']:.2f}% avg error)")
# Enhanced logging for training evaluation
logger.info(f"Training evaluation for {model_name}:")
logger.info(f" Action: {predicted_action} | Confidence: {prediction_confidence:.3f}")
logger.info(f" Price change: {price_change_pct:+.3f}% | Time: {time_diff_seconds:.1f}s")
logger.info(f" Reward: {reward:.4f} | Correct: {was_correct}")
logger.info(f" Accuracy: {self.model_performance[model_name]['accuracy']:.1%} ({self.model_performance[model_name]['correct']}/{self.model_performance[model_name]['total']})")
# Train the specific model based on sophisticated outcome # Train the specific model based on sophisticated outcome
await self._train_model_on_outcome(record, was_correct, price_change_pct, reward) await self._train_model_on_outcome(record, was_correct, price_change_pct, reward)
@ -2304,7 +2293,7 @@ class TradingOrchestrator:
'evaluated_at': datetime.now().isoformat() 'evaluated_at': datetime.now().isoformat()
} }
price_pred_info = f"predicted: ${predicted_price:.2f}" if predicted_price is not None else "no price prediction" price_pred_info = f"inference: ${inference_price:.2f}" if inference_price is not None else "no inference price"
logger.debug(f"Evaluated {model_name} prediction: {'' if was_correct else ''} " logger.debug(f"Evaluated {model_name} prediction: {'' if was_correct else ''} "
f"({prediction['action']}, {price_change_pct:.2f}% change, " f"({prediction['action']}, {price_change_pct:.2f}% change, "
f"confidence: {prediction_confidence:.3f}, {price_pred_info}, reward: {reward:.3f})") f"confidence: {prediction_confidence:.3f}, {price_pred_info}, reward: {reward:.3f})")

View File

@ -597,7 +597,7 @@ class RealtimeRLCOBTrader:
for symbol in self.symbols: for symbol in self.symbols:
await self._process_signals(symbol) await self._process_signals(symbol)
await asyncio.sleep(0.1) # Process signals every 100ms await asyncio.sleep(0.5) # Process signals every 500ms to reduce load
except Exception as e: except Exception as e:
logger.error(f"Error in signal processing loop: {e}") logger.error(f"Error in signal processing loop: {e}")

View File

@ -53,6 +53,20 @@ class StandardizedDataProvider(DataProvider):
self.cob_data_cache[symbol] = None self.cob_data_cache[symbol] = None
self.cob_imbalance_history[symbol] = deque(maxlen=300) # 5 minutes of 1s data self.cob_imbalance_history[symbol] = deque(maxlen=300) # 5 minutes of 1s data
# Ensure live price cache exists (in case parent didn't initialize it)
if not hasattr(self, 'live_price_cache'):
self.live_price_cache: Dict[str, Tuple[float, datetime]] = {}
if not hasattr(self, 'live_price_cache_ttl'):
from datetime import timedelta
self.live_price_cache_ttl = timedelta(milliseconds=500)
# Initialize WebSocket cache for dashboard compatibility
if not hasattr(self, 'ws_price_cache'):
self.ws_price_cache: Dict[str, float] = {}
# Initialize orchestrator reference (for dashboard compatibility)
self.orchestrator = None
# COB provider integration # COB provider integration
self.cob_provider: Optional[MultiExchangeCOBProvider] = None self.cob_provider: Optional[MultiExchangeCOBProvider] = None
self._initialize_cob_provider() self._initialize_cob_provider()
@ -476,10 +490,182 @@ class StandardizedDataProvider(DataProvider):
else: else:
logger.warning(f"No 'close' column found in OHLCV data for {symbol}") logger.warning(f"No 'close' column found in OHLCV data for {symbol}")
return [] return []
except Exception as e: except Exception as e:
logger.error(f"Error getting recent prices for {symbol}: {e}") logger.error(f"Error getting recent prices for {symbol}: {e}")
return [] return []
def get_live_price_from_api(self, symbol: str) -> Optional[float]:
"""ROBUST live price fetching with comprehensive fallbacks"""
try:
# 1. Check cache first to avoid excessive API calls
if symbol in self.live_price_cache:
price, timestamp = self.live_price_cache[symbol]
if datetime.now() - timestamp < self.live_price_cache_ttl:
logger.debug(f"Using cached price for {symbol}: ${price:.2f}")
return price
# 2. Try direct Binance API call
try:
import requests
binance_symbol = symbol.replace('/', '')
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
response = requests.get(url, timeout=0.5) # Use a short timeout for low latency
response.raise_for_status()
data = response.json()
price = float(data['price'])
# Update cache and current prices
self.live_price_cache[symbol] = (price, datetime.now())
self.current_prices[symbol] = price
logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}")
return price
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to get live price for {symbol} from API: {e}")
except Exception as e:
logger.warning(f"Unexpected error in API call for {symbol}: {e}")
# 3. Fallback to current prices from parent
if hasattr(self, 'current_prices') and symbol in self.current_prices:
price = self.current_prices[symbol]
if price and price > 0:
logger.debug(f"Using current price for {symbol}: ${price:.2f}")
return price
# 4. Try parent's get_current_price method
if hasattr(self, 'get_current_price'):
try:
price = self.get_current_price(symbol)
if price and price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from parent: ${price:.2f}")
return price
except Exception as e:
logger.debug(f"Parent get_current_price failed for {symbol}: {e}")
# 5. Try historical data from multiple timeframes
for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability
try:
df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True)
if df is not None and not df.empty:
price = float(df['close'].iloc[-1])
if price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}")
return price
except Exception as tf_error:
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
continue
# 6. Try WebSocket cache if available
ws_symbol = symbol.replace('/', '')
if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache:
price = self.ws_price_cache[ws_symbol]
if price and price > 0:
logger.debug(f"Using WebSocket cache for {symbol}: ${price:.2f}")
return price
# 7. Try to get from orchestrator if available (for dashboard compatibility)
if hasattr(self, 'orchestrator') and self.orchestrator:
try:
if hasattr(self.orchestrator, 'data_provider'):
price = self.orchestrator.data_provider.get_current_price(symbol)
if price and price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}")
return price
except Exception as orch_error:
logger.debug(f"Failed to get price from orchestrator: {orch_error}")
# 8. Last resort: try external API with longer timeout
try:
import requests
binance_symbol = symbol.replace('/', '')
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
response = requests.get(url, timeout=2) # Longer timeout for last resort
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
self.current_prices[symbol] = price
logger.warning(f"Got current price for {symbol} from external API (last resort): ${price:.2f}")
return price
except Exception as api_error:
logger.debug(f"External API failed: {api_error}")
logger.warning(f"Could not get current price for {symbol} from any source")
except Exception as e: except Exception as e:
logger.error(f"Error stopping real-time processing: {e}") logger.error(f"Error getting current price for {symbol}: {e}")
# Return a fallback price if we have any cached data
if hasattr(self, 'current_prices') and symbol in self.current_prices and self.current_prices[symbol] > 0:
return self.current_prices[symbol]
# Return None instead of hardcoded fallbacks - let the caller handle missing data
return None
def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price with robust fallbacks - enhanced version"""
try:
# 1. Try live price API first (our enhanced method)
price = self.get_live_price_from_api(symbol)
if price and price > 0:
return price
# 2. Try parent's get_current_price method
if hasattr(super(), 'get_current_price'):
try:
price = super().get_current_price(symbol)
if price and price > 0:
return price
except Exception as e:
logger.debug(f"Parent get_current_price failed for {symbol}: {e}")
# 3. Try current prices cache
if hasattr(self, 'current_prices') and symbol in self.current_prices:
price = self.current_prices[symbol]
if price and price > 0:
return price
# 4. Try historical data from multiple timeframes
for timeframe in ['1m', '5m', '1h']:
try:
df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True)
if df is not None and not df.empty:
price = float(df['close'].iloc[-1])
if price > 0:
self.current_prices[symbol] = price
return price
except Exception as tf_error:
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
continue
# 5. Try WebSocket cache if available
ws_symbol = symbol.replace('/', '')
if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache:
price = self.ws_price_cache[ws_symbol]
if price and price > 0:
return price
logger.warning(f"Could not get current price for {symbol} from any source")
return None
except Exception as e:
logger.error(f"Error getting current price for {symbol}: {e}")
return None
def update_ws_price_cache(self, symbol: str, price: float):
"""Update WebSocket price cache for dashboard compatibility"""
try:
ws_symbol = symbol.replace('/', '')
self.ws_price_cache[ws_symbol] = price
# Also update current prices for consistency
self.current_prices[symbol] = price
logger.debug(f"Updated WS cache for {symbol}: ${price:.2f}")
except Exception as e:
logger.error(f"Error updating WS cache for {symbol}: {e}")
def set_orchestrator(self, orchestrator):
"""Set orchestrator reference for dashboard compatibility"""
self.orchestrator = orchestrator

View File

@ -141,6 +141,31 @@ class CleanTradingDashboard:
self.orchestrator.set_trading_executor(self.trading_executor) self.orchestrator.set_trading_executor(self.trading_executor)
logger.info("Trading executor connected to orchestrator for signal execution") logger.info("Trading executor connected to orchestrator for signal execution")
# Connect dashboard to orchestrator for COB data updates
if hasattr(self.orchestrator, 'set_dashboard'):
self.orchestrator.set_dashboard(self)
logger.info("✅ Dashboard connected to orchestrator for COB data updates")
# Start orchestrator's real-time processing to ensure COB data flows
if hasattr(self.orchestrator, 'start_continuous_trading'):
try:
# Start in background thread to avoid blocking dashboard startup
import threading
def start_orchestrator_trading():
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.orchestrator.start_continuous_trading())
except Exception as e:
logger.error(f"Error starting orchestrator trading: {e}")
trading_thread = threading.Thread(target=start_orchestrator_trading, daemon=True)
trading_thread.start()
logger.info("✅ Started orchestrator real-time processing for COB data")
except Exception as e:
logger.error(f"Failed to start orchestrator trading: {e}")
# Initialize enhanced training system for predictions # Initialize enhanced training system for predictions
self.training_system = None self.training_system = None
self._initialize_enhanced_training_system() self._initialize_enhanced_training_system()
@ -202,8 +227,20 @@ class CleanTradingDashboard:
# COB data cache - enhanced with price buckets and memory system # COB data cache - enhanced with price buckets and memory system
self.cob_cache: dict = { self.cob_cache: dict = {
'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}, 'ETH/USDT': {
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0} 'last_update': 0,
'data': None,
'updates_count': 0,
'update_times': [],
'update_rate': 0.0
},
'BTC/USDT': {
'last_update': 0,
'data': None,
'updates_count': 0,
'update_times': [],
'update_rate': 0.0
}
} }
self.latest_cob_data: dict = {} # Cache for COB integration data self.latest_cob_data: dict = {} # Cache for COB integration data
self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display) self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display)
@ -292,6 +329,49 @@ class CleanTradingDashboard:
def _on_cob_data_update(self, symbol: str, cob_data: dict): def _on_cob_data_update(self, symbol: str, cob_data: dict):
"""Handle COB data updates from data provider""" """Handle COB data updates from data provider"""
try: try:
# Also update the COB cache for status display
if not hasattr(self, 'cob_cache'):
self.cob_cache = {}
if symbol not in self.cob_cache:
self.cob_cache[symbol] = {
'last_update': 0,
'data': None,
'updates_count': 0,
'update_times': [], # Track recent update times for rate calculation
'update_rate': 0.0
}
# Update cache
current_time = time.time()
self.cob_cache[symbol]['data'] = cob_data
self.cob_cache[symbol]['last_update'] = current_time
self.cob_cache[symbol]['updates_count'] += 1
self.cob_cache[symbol]['websocket_status'] = 'connected'
self.cob_cache[symbol]['source'] = 'data_provider'
# Track update times for rate calculation (keep last 60 seconds)
self.cob_cache[symbol]['update_times'].append(current_time)
# Remove updates older than 60 seconds
cutoff_time = current_time - 60
self.cob_cache[symbol]['update_times'] = [
t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time
]
# Calculate update rate (updates per second)
if len(self.cob_cache[symbol]['update_times']) > 1:
time_span = current_time - self.cob_cache[symbol]['update_times'][0]
if time_span > 0:
self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span
else:
self.cob_cache[symbol]['update_rate'] = 0.0
else:
self.cob_cache[symbol]['update_rate'] = 0.0
logger.debug(f"Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})")
# Continue with existing logic
# Update latest COB data cache # Update latest COB data cache
if not hasattr(self, 'latest_cob_data'): if not hasattr(self, 'latest_cob_data'):
self.latest_cob_data = {} self.latest_cob_data = {}
@ -332,7 +412,6 @@ class CleanTradingDashboard:
if not hasattr(self, 'cob_last_update'): if not hasattr(self, 'cob_last_update'):
self.cob_last_update = {} self.cob_last_update = {}
import time
self.cob_last_update[symbol] = time.time() self.cob_last_update[symbol] = time.time()
# Update current price from COB data # Update current price from COB data
@ -767,21 +846,22 @@ class CleanTradingDashboard:
if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode: if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode:
mexc_status = "LIVE+SYNC" # Indicate live trading with position sync mexc_status = "LIVE+SYNC" # Indicate live trading with position sync
# COB WebSocket status # COB WebSocket status with update rate
cob_status = self.get_cob_websocket_status() cob_status = self.get_cob_websocket_status()
overall_status = cob_status.get('overall_status', 'unknown') overall_status = cob_status.get('overall_status', 'unknown')
warning_message = cob_status.get('warning_message') warning_message = cob_status.get('warning_message')
update_rate = cob_status.get('update_rate', 0.0)
if overall_status == 'all_connected': if overall_status == 'all_connected':
cob_status_str = "Connected" cob_status_str = f"Connected ({update_rate:.1f}/s)"
elif overall_status == 'partial_fallback': elif overall_status == 'partial_fallback':
cob_status_str = "Fallback" cob_status_str = f"Fallback ({update_rate:.1f}/s)"
elif overall_status == 'degraded': elif overall_status == 'degraded':
cob_status_str = "Degraded" cob_status_str = f"Degraded ({update_rate:.1f}/s)"
elif overall_status == 'unavailable': elif overall_status == 'unavailable':
cob_status_str = "N/A" cob_status_str = "N/A"
else: else:
cob_status_str = "Error" cob_status_str = f"Error ({update_rate:.1f}/s)"
return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, cob_status_str, mexc_status return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, cob_status_str, mexc_status
@ -1076,7 +1156,7 @@ class CleanTradingDashboard:
return [html.I(className="fas fa-save me-1"), "Store All Models"] return [html.I(className="fas fa-save me-1"), "Store All Models"]
def _get_current_price(self, symbol: str) -> Optional[float]: def _get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for symbol - ENHANCED with better fallbacks""" """Get current price for symbol - ONLY using our data providers"""
try: try:
# Try WebSocket cache first # Try WebSocket cache first
ws_symbol = symbol.replace('/', '') ws_symbol = symbol.replace('/', '')
@ -1099,6 +1179,16 @@ class CleanTradingDashboard:
except Exception as dp_error: except Exception as dp_error:
logger.debug(f"Data provider get_current_price failed: {dp_error}") logger.debug(f"Data provider get_current_price failed: {dp_error}")
# Try data provider get_live_price_from_api method (our standardized method)
if hasattr(self.data_provider, 'get_live_price_from_api'):
try:
price = self.data_provider.get_live_price_from_api(symbol)
if price and price > 0:
self.current_prices[symbol] = price
return price
except Exception as live_error:
logger.debug(f"Data provider get_live_price_from_api failed: {live_error}")
# Fallback to dashboard current prices # Fallback to dashboard current prices
if symbol in self.current_prices and self.current_prices[symbol] > 0: if symbol in self.current_prices and self.current_prices[symbol] > 0:
return self.current_prices[symbol] return self.current_prices[symbol]
@ -1127,34 +1217,18 @@ class CleanTradingDashboard:
self.current_prices[symbol] = price self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}") logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}")
return price return price
# Try orchestrator's live price method
if hasattr(self.orchestrator.data_provider, 'get_live_price_from_api'):
price = self.orchestrator.data_provider.get_live_price_from_api(symbol)
if price and price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from orchestrator live API: ${price:.2f}")
return price
except Exception as orch_error: except Exception as orch_error:
logger.debug(f"Failed to get price from orchestrator: {orch_error}") logger.debug(f"Failed to get price from orchestrator: {orch_error}")
# Try external API as last resort logger.warning(f"Could not get current price for {symbol} from any data provider source")
try:
import requests
if symbol == 'ETH/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
return price
elif symbol == 'BTC/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
self.current_prices[symbol] = price
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
return price
except Exception as api_error:
logger.debug(f"External API failed: {api_error}")
logger.warning(f"Could not get current price for {symbol} from any source")
except Exception as e: except Exception as e:
logger.error(f"Error getting current price for {symbol}: {e}") logger.error(f"Error getting current price for {symbol}: {e}")
@ -1163,12 +1237,7 @@ class CleanTradingDashboard:
if symbol in self.current_prices and self.current_prices[symbol] > 0: if symbol in self.current_prices and self.current_prices[symbol] > 0:
return self.current_prices[symbol] return self.current_prices[symbol]
# Return a reasonable fallback based on current market conditions # Return None instead of hardcoded fallbacks - let the UI handle missing data
if symbol == 'ETH/USDT':
return 3385.0 # Current market price fallback
elif symbol == 'BTC/USDT':
return 119500.0 # Current market price fallback
return None return None
def _create_price_chart(self, symbol: str) -> go.Figure: def _create_price_chart(self, symbol: str) -> go.Figure:
@ -6509,6 +6578,67 @@ class CleanTradingDashboard:
except Exception as e: except Exception as e:
logger.error(f"❌ Error initializing Enhanced COB Integration: {e}") logger.error(f"❌ Error initializing Enhanced COB Integration: {e}")
def update_cob_data_from_orchestrator(self, symbol: str, cob_data: Dict):
"""Update COB cache from orchestrator data - called by orchestrator"""
try:
# Initialize cache if needed
if not hasattr(self, 'cob_cache'):
self.cob_cache = {}
if symbol not in self.cob_cache:
self.cob_cache[symbol] = {
'last_update': 0,
'data': None,
'updates_count': 0,
'update_times': [], # Track recent update times for rate calculation
'update_rate': 0.0
}
# Update cache with orchestrator data
current_time = time.time()
self.cob_cache[symbol]['data'] = cob_data
self.cob_cache[symbol]['last_update'] = current_time
self.cob_cache[symbol]['updates_count'] += 1
# Track update times for rate calculation (keep last 60 seconds)
self.cob_cache[symbol]['update_times'].append(current_time)
# Remove updates older than 60 seconds
cutoff_time = current_time - 60
self.cob_cache[symbol]['update_times'] = [
t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time
]
# Calculate update rate (updates per second)
if len(self.cob_cache[symbol]['update_times']) > 1:
time_span = current_time - self.cob_cache[symbol]['update_times'][0]
if time_span > 0:
self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span
else:
self.cob_cache[symbol]['update_rate'] = 0.0
else:
self.cob_cache[symbol]['update_rate'] = 0.0
# Set WebSocket status based on data source
if isinstance(cob_data, dict) and 'stats' in cob_data:
source = cob_data['stats'].get('source', 'unknown')
if 'websocket' in source.lower():
self.cob_cache[symbol]['websocket_status'] = 'connected'
self.cob_cache[symbol]['source'] = source
elif 'rest' in source.lower() or 'fallback' in source.lower():
self.cob_cache[symbol]['websocket_status'] = 'fallback'
self.cob_cache[symbol]['source'] = source
else:
self.cob_cache[symbol]['websocket_status'] = 'unknown'
self.cob_cache[symbol]['source'] = source
else:
self.cob_cache[symbol]['websocket_status'] = 'connected'
self.cob_cache[symbol]['source'] = 'orchestrator'
logger.debug(f"Updated COB cache for {symbol} from orchestrator: {self.cob_cache[symbol]['websocket_status']} (updates: {self.cob_cache[symbol]['updates_count']})")
except Exception as e:
logger.error(f"Error updating COB cache from orchestrator for {symbol}: {e}")
def _on_enhanced_cob_update(self, symbol: str, data: Dict): def _on_enhanced_cob_update(self, symbol: str, data: Dict):
"""Handle enhanced COB updates with WebSocket status""" """Handle enhanced COB updates with WebSocket status"""
try: try:
@ -6524,7 +6654,13 @@ class CleanTradingDashboard:
# Update COB cache with status # Update COB cache with status
if symbol not in self.cob_cache: if symbol not in self.cob_cache:
self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0} self.cob_cache[symbol] = {
'last_update': 0,
'data': None,
'updates_count': 0,
'update_times': [],
'update_rate': 0.0
}
self.cob_cache[symbol]['websocket_status'] = status self.cob_cache[symbol]['websocket_status'] = status
self.cob_cache[symbol]['websocket_message'] = message self.cob_cache[symbol]['websocket_message'] = message
@ -6620,11 +6756,15 @@ class CleanTradingDashboard:
status_summary['overall_status'] = 'error' status_summary['overall_status'] = 'error'
status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down' status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down'
# Set last update time # Set last update time and calculate overall update rate
last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()] last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()]
if last_updates and max(last_updates) > 0: if last_updates and max(last_updates) > 0:
status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat() status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat()
# Calculate overall update rate (sum of all symbols)
total_update_rate = sum(cache.get('update_rate', 0.0) for cache in self.cob_cache.values())
status_summary['update_rate'] = total_update_rate
return status_summary return status_summary
except Exception as e: except Exception as e:

View File

@ -256,11 +256,12 @@ class DashboardLayoutManager:
return html.Div([ return html.Div([
html.Div([ html.Div([
html.Div([ html.Div([
# Chart header with manual trading buttons # Chart header with manual trading buttons and live price
html.Div([ html.Div([
html.H6([ html.H6([
html.I(className="fas fa-chart-candlestick me-2"), html.I(className="fas fa-chart-candlestick me-2"),
"Live 1m Price Chart (3h) + 1s Mini Chart (5min) - Updated Every Second" "Live Price (WebSocket): ",
html.Span(id="current-price", className="ms-2 text-primary", style={"fontWeight": "bold", "fontSize": "1.2em"})
], className="card-title mb-0"), ], className="card-title mb-0"),
html.Div([ html.Div([
html.Button([ html.Button([