Compare commits
7 Commits
b4076241c9
...
9219b78241
Author | SHA1 | Date | |
---|---|---|---|
9219b78241 | |||
7c508ab536 | |||
1084b7f5b5 | |||
619e39ac9b | |||
f5416c4f1e | |||
240d2b7877 | |||
6efaa27c33 |
@ -163,7 +163,7 @@ class COBIntegration:
|
||||
|
||||
if symbol:
|
||||
self.websocket_status[symbol] = status
|
||||
logger.info(f"🔌 WebSocket status for {symbol}: {status} - {message}")
|
||||
logger.info(f"WebSocket status for {symbol}: {status} - {message}")
|
||||
|
||||
# Notify dashboard callbacks about status change
|
||||
status_update = {
|
||||
|
@ -4931,4 +4931,35 @@ class DataProvider:
|
||||
try:
|
||||
callback(symbol, data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in bucketed COB callback: {e}")
|
||||
logger.error(f"Error in bucketed COB callback: {e}")
|
||||
|
||||
def get_live_price_from_api(self, symbol: str) -> Optional[float]:
|
||||
"""FORCE fetch live price from Binance API for low-latency updates"""
|
||||
# Check cache first to avoid excessive API calls
|
||||
if symbol in self.live_price_cache:
|
||||
price, timestamp = self.live_price_cache[symbol]
|
||||
if datetime.now() - timestamp < self.live_price_cache_ttl:
|
||||
return price
|
||||
|
||||
try:
|
||||
import requests
|
||||
binance_symbol = symbol.replace('/', '')
|
||||
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
|
||||
response = requests.get(url, timeout=0.5) # Use a short timeout for low latency
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
|
||||
# Update cache and current prices
|
||||
self.live_price_cache[symbol] = (price, datetime.now())
|
||||
self.current_prices[symbol] = price
|
||||
|
||||
logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}")
|
||||
return price
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to get live price for {symbol} from API: {e}")
|
||||
# Fallback to last known current price
|
||||
return self.current_prices.get(symbol)
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error getting live price for {symbol}: {e}")
|
||||
return self.current_prices.get(symbol)
|
File diff suppressed because it is too large
Load Diff
@ -1035,70 +1035,17 @@ class TradingOrchestrator:
|
||||
logger.debug(f"Error capturing DQN prediction: {e}")
|
||||
|
||||
def _get_current_price(self, symbol: str) -> Optional[float]:
|
||||
"""Get current price for a symbol - ENHANCED with better fallbacks"""
|
||||
"""Get current price for a symbol - using dedicated live price API"""
|
||||
try:
|
||||
# Try data provider current prices first
|
||||
if hasattr(self.data_provider, 'current_prices') and symbol in self.data_provider.current_prices:
|
||||
price = self.data_provider.current_prices[symbol]
|
||||
if price and price > 0:
|
||||
return price
|
||||
|
||||
# Try data provider get_current_price method
|
||||
if hasattr(self.data_provider, 'get_current_price'):
|
||||
try:
|
||||
price = self.data_provider.get_current_price(symbol)
|
||||
if price and price > 0:
|
||||
return price
|
||||
except Exception as dp_error:
|
||||
logger.debug(f"Data provider get_current_price failed: {dp_error}")
|
||||
|
||||
# Get fresh price from data provider - try multiple timeframes
|
||||
for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability
|
||||
try:
|
||||
df = self.data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True)
|
||||
if df is not None and not df.empty:
|
||||
price = float(df['close'].iloc[-1])
|
||||
if price > 0:
|
||||
logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}")
|
||||
return price
|
||||
except Exception as tf_error:
|
||||
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
|
||||
continue
|
||||
|
||||
# Try external API as last resort
|
||||
try:
|
||||
import requests
|
||||
if symbol == 'ETH/USDT':
|
||||
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
if price > 0:
|
||||
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
|
||||
return price
|
||||
elif symbol == 'BTC/USDT':
|
||||
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
if price > 0:
|
||||
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
|
||||
return price
|
||||
except Exception as api_error:
|
||||
logger.debug(f"External API failed: {api_error}")
|
||||
|
||||
logger.warning(f"Could not get current price for {symbol} from any source")
|
||||
|
||||
# Use the new low-latency live price method from data provider
|
||||
if hasattr(self.data_provider, 'get_live_price_from_api'):
|
||||
return self.data_provider.get_live_price_from_api(symbol)
|
||||
else:
|
||||
# Fallback to old method if not available
|
||||
return self.data_provider.get_current_price(symbol)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting current price for {symbol}: {e}")
|
||||
|
||||
# Return a reasonable fallback based on current market conditions
|
||||
if symbol == 'ETH/USDT':
|
||||
return 3385.0 # Current market price fallback
|
||||
elif symbol == 'BTC/USDT':
|
||||
return 119500.0 # Current market price fallback
|
||||
|
||||
return None
|
||||
return None
|
||||
|
||||
async def _generate_fallback_prediction(self, symbol: str, current_price: float) -> Optional[Prediction]:
|
||||
"""Generate a basic momentum-based fallback prediction when no models are available"""
|
||||
@ -1257,10 +1204,12 @@ class TradingOrchestrator:
|
||||
if not self.realtime_processing:
|
||||
return
|
||||
try:
|
||||
# This is where you would feed the state to the DQN model for prediction
|
||||
# or store them for training. For now, we just log and store the latest.
|
||||
# self.latest_cob_state[symbol] = cob_data['state']
|
||||
# logger.debug(f"COB DQN state updated for {symbol}: {cob_data['state'][:5]}...")
|
||||
# Store the COB state for DQN model access
|
||||
if 'state' in cob_data and cob_data['state'] is not None:
|
||||
self.latest_cob_state[symbol] = cob_data['state']
|
||||
logger.debug(f"COB DQN state updated for {symbol}: shape {np.array(cob_data['state']).shape}")
|
||||
else:
|
||||
logger.warning(f"COB data for {symbol} missing 'state' field: {list(cob_data.keys())}")
|
||||
|
||||
# If training is enabled, add to training data
|
||||
if self.training_enabled and self.enhanced_training_system:
|
||||
@ -1284,8 +1233,11 @@ class TradingOrchestrator:
|
||||
logger.debug(f"Invalidated data provider cache for {symbol} due to COB update")
|
||||
|
||||
# Update dashboard
|
||||
if self.dashboard and hasattr(self.dashboard, 'update_cob_data'):
|
||||
self.dashboard.update_cob_data(symbol, cob_data)
|
||||
if self.dashboard and hasattr(self.dashboard, 'update_cob_data_from_orchestrator'):
|
||||
self.dashboard.update_cob_data_from_orchestrator(symbol, cob_data)
|
||||
logger.debug(f"📊 Sent COB data for {symbol} to dashboard")
|
||||
else:
|
||||
logger.debug(f"📊 No dashboard connected to receive COB data for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in _on_cob_dashboard_data for {symbol}: {e}")
|
||||
@ -1624,10 +1576,12 @@ class TradingOrchestrator:
|
||||
# Log statistics periodically (every 10 inferences)
|
||||
stats = self.model_statistics[model_name]
|
||||
if stats.total_inferences % 10 == 0:
|
||||
last_prediction_str = stats.last_prediction if stats.last_prediction is not None else "None"
|
||||
last_confidence_str = f"{stats.last_confidence:.3f}" if stats.last_confidence is not None else "N/A"
|
||||
logger.debug(f"Model {model_name} stats: {stats.total_inferences} inferences, "
|
||||
f"{stats.inference_rate_per_minute:.1f}/min, "
|
||||
f"avg: {stats.average_inference_time_ms:.1f}ms, "
|
||||
f"last: {stats.last_prediction} ({stats.last_confidence:.3f})")
|
||||
f"last: {last_prediction_str} ({last_confidence_str})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating statistics for {model_name}: {e}")
|
||||
@ -2192,11 +2146,39 @@ class TradingOrchestrator:
|
||||
|
||||
outcome_status = "✅ CORRECT" if was_correct else "❌ INCORRECT"
|
||||
|
||||
# Get model statistics for enhanced logging
|
||||
model_stats = self.get_model_statistics(model_name)
|
||||
current_loss = model_stats.current_loss if model_stats else None
|
||||
best_loss = model_stats.best_loss if model_stats else None
|
||||
avg_loss = model_stats.average_loss if model_stats else None
|
||||
|
||||
# Calculate reward for logging
|
||||
reward, _ = self._calculate_sophisticated_reward(
|
||||
predicted_action,
|
||||
predicted_confidence,
|
||||
actual_price_change_pct,
|
||||
time_diff_seconds / 60, # Convert to minutes
|
||||
has_price_prediction=predicted_price is not None
|
||||
)
|
||||
|
||||
# Enhanced logging with detailed information
|
||||
logger.info(f"Completed immediate training for {model_name} - {outcome_status}")
|
||||
logger.info(f" Prediction: {predicted_action} ({predicted_confidence:.3f})")
|
||||
logger.info(f" Prediction: {predicted_action} (confidence: {predicted_confidence:.3f})")
|
||||
logger.info(f" {price_outcome}")
|
||||
logger.info(f" Reward: {reward:.4f} | Time: {time_diff_seconds:.1f}s")
|
||||
|
||||
# Safe formatting for loss values
|
||||
current_loss_str = f"{current_loss:.4f}" if current_loss is not None else "N/A"
|
||||
best_loss_str = f"{best_loss:.4f}" if best_loss is not None else "N/A"
|
||||
avg_loss_str = f"{avg_loss:.4f}" if avg_loss is not None else "N/A"
|
||||
logger.info(f" Loss: {current_loss_str} | Best: {best_loss_str} | Avg: {avg_loss_str}")
|
||||
logger.info(f" Outcome: {outcome_status}")
|
||||
|
||||
# Add performance summary
|
||||
if model_name in self.model_performance:
|
||||
perf = self.model_performance[model_name]
|
||||
logger.info(f" Performance: {perf['accuracy']:.1%} ({perf['correct']}/{perf['total']})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in immediate training for {model_name}: {e}")
|
||||
|
||||
@ -2291,6 +2273,13 @@ class TradingOrchestrator:
|
||||
f"{price_prediction_stats['accurate']}/{price_prediction_stats['total']} "
|
||||
f"({price_prediction_stats['avg_error']:.2f}% avg error)")
|
||||
|
||||
# Enhanced logging for training evaluation
|
||||
logger.info(f"Training evaluation for {model_name}:")
|
||||
logger.info(f" Action: {predicted_action} | Confidence: {prediction_confidence:.3f}")
|
||||
logger.info(f" Price change: {price_change_pct:+.3f}% | Time: {time_diff_seconds:.1f}s")
|
||||
logger.info(f" Reward: {reward:.4f} | Correct: {was_correct}")
|
||||
logger.info(f" Accuracy: {self.model_performance[model_name]['accuracy']:.1%} ({self.model_performance[model_name]['correct']}/{self.model_performance[model_name]['total']})")
|
||||
|
||||
# Train the specific model based on sophisticated outcome
|
||||
await self._train_model_on_outcome(record, was_correct, price_change_pct, reward)
|
||||
|
||||
@ -2304,7 +2293,7 @@ class TradingOrchestrator:
|
||||
'evaluated_at': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
price_pred_info = f"predicted: ${predicted_price:.2f}" if predicted_price is not None else "no price prediction"
|
||||
price_pred_info = f"inference: ${inference_price:.2f}" if inference_price is not None else "no inference price"
|
||||
logger.debug(f"Evaluated {model_name} prediction: {'✓' if was_correct else '✗'} "
|
||||
f"({prediction['action']}, {price_change_pct:.2f}% change, "
|
||||
f"confidence: {prediction_confidence:.3f}, {price_pred_info}, reward: {reward:.3f})")
|
||||
|
@ -597,7 +597,7 @@ class RealtimeRLCOBTrader:
|
||||
for symbol in self.symbols:
|
||||
await self._process_signals(symbol)
|
||||
|
||||
await asyncio.sleep(0.1) # Process signals every 100ms
|
||||
await asyncio.sleep(0.5) # Process signals every 500ms to reduce load
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in signal processing loop: {e}")
|
||||
|
@ -53,6 +53,20 @@ class StandardizedDataProvider(DataProvider):
|
||||
self.cob_data_cache[symbol] = None
|
||||
self.cob_imbalance_history[symbol] = deque(maxlen=300) # 5 minutes of 1s data
|
||||
|
||||
# Ensure live price cache exists (in case parent didn't initialize it)
|
||||
if not hasattr(self, 'live_price_cache'):
|
||||
self.live_price_cache: Dict[str, Tuple[float, datetime]] = {}
|
||||
if not hasattr(self, 'live_price_cache_ttl'):
|
||||
from datetime import timedelta
|
||||
self.live_price_cache_ttl = timedelta(milliseconds=500)
|
||||
|
||||
# Initialize WebSocket cache for dashboard compatibility
|
||||
if not hasattr(self, 'ws_price_cache'):
|
||||
self.ws_price_cache: Dict[str, float] = {}
|
||||
|
||||
# Initialize orchestrator reference (for dashboard compatibility)
|
||||
self.orchestrator = None
|
||||
|
||||
# COB provider integration
|
||||
self.cob_provider: Optional[MultiExchangeCOBProvider] = None
|
||||
self._initialize_cob_provider()
|
||||
@ -476,10 +490,182 @@ class StandardizedDataProvider(DataProvider):
|
||||
else:
|
||||
logger.warning(f"No 'close' column found in OHLCV data for {symbol}")
|
||||
return []
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting recent prices for {symbol}: {e}")
|
||||
return []
|
||||
|
||||
def get_live_price_from_api(self, symbol: str) -> Optional[float]:
|
||||
"""ROBUST live price fetching with comprehensive fallbacks"""
|
||||
try:
|
||||
# 1. Check cache first to avoid excessive API calls
|
||||
if symbol in self.live_price_cache:
|
||||
price, timestamp = self.live_price_cache[symbol]
|
||||
if datetime.now() - timestamp < self.live_price_cache_ttl:
|
||||
logger.debug(f"Using cached price for {symbol}: ${price:.2f}")
|
||||
return price
|
||||
|
||||
# 2. Try direct Binance API call
|
||||
try:
|
||||
import requests
|
||||
binance_symbol = symbol.replace('/', '')
|
||||
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
|
||||
response = requests.get(url, timeout=0.5) # Use a short timeout for low latency
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
|
||||
# Update cache and current prices
|
||||
self.live_price_cache[symbol] = (price, datetime.now())
|
||||
self.current_prices[symbol] = price
|
||||
|
||||
logger.info(f"LIVE PRICE for {symbol}: ${price:.2f}")
|
||||
return price
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Failed to get live price for {symbol} from API: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Unexpected error in API call for {symbol}: {e}")
|
||||
|
||||
# 3. Fallback to current prices from parent
|
||||
if hasattr(self, 'current_prices') and symbol in self.current_prices:
|
||||
price = self.current_prices[symbol]
|
||||
if price and price > 0:
|
||||
logger.debug(f"Using current price for {symbol}: ${price:.2f}")
|
||||
return price
|
||||
|
||||
# 4. Try parent's get_current_price method
|
||||
if hasattr(self, 'get_current_price'):
|
||||
try:
|
||||
price = self.get_current_price(symbol)
|
||||
if price and price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from parent: ${price:.2f}")
|
||||
return price
|
||||
except Exception as e:
|
||||
logger.debug(f"Parent get_current_price failed for {symbol}: {e}")
|
||||
|
||||
# 5. Try historical data from multiple timeframes
|
||||
for timeframe in ['1m', '5m', '1h']: # Start with 1m for better reliability
|
||||
try:
|
||||
df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True)
|
||||
if df is not None and not df.empty:
|
||||
price = float(df['close'].iloc[-1])
|
||||
if price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from {timeframe}: ${price:.2f}")
|
||||
return price
|
||||
except Exception as tf_error:
|
||||
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
|
||||
continue
|
||||
|
||||
# 6. Try WebSocket cache if available
|
||||
ws_symbol = symbol.replace('/', '')
|
||||
if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache:
|
||||
price = self.ws_price_cache[ws_symbol]
|
||||
if price and price > 0:
|
||||
logger.debug(f"Using WebSocket cache for {symbol}: ${price:.2f}")
|
||||
return price
|
||||
|
||||
# 7. Try to get from orchestrator if available (for dashboard compatibility)
|
||||
if hasattr(self, 'orchestrator') and self.orchestrator:
|
||||
try:
|
||||
if hasattr(self.orchestrator, 'data_provider'):
|
||||
price = self.orchestrator.data_provider.get_current_price(symbol)
|
||||
if price and price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}")
|
||||
return price
|
||||
except Exception as orch_error:
|
||||
logger.debug(f"Failed to get price from orchestrator: {orch_error}")
|
||||
|
||||
# 8. Last resort: try external API with longer timeout
|
||||
try:
|
||||
import requests
|
||||
binance_symbol = symbol.replace('/', '')
|
||||
url = f"https://api.binance.com/api/v3/ticker/price?symbol={binance_symbol}"
|
||||
response = requests.get(url, timeout=2) # Longer timeout for last resort
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
if price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.warning(f"Got current price for {symbol} from external API (last resort): ${price:.2f}")
|
||||
return price
|
||||
except Exception as api_error:
|
||||
logger.debug(f"External API failed: {api_error}")
|
||||
|
||||
logger.warning(f"Could not get current price for {symbol} from any source")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping real-time processing: {e}")
|
||||
logger.error(f"Error getting current price for {symbol}: {e}")
|
||||
|
||||
# Return a fallback price if we have any cached data
|
||||
if hasattr(self, 'current_prices') and symbol in self.current_prices and self.current_prices[symbol] > 0:
|
||||
return self.current_prices[symbol]
|
||||
|
||||
# Return None instead of hardcoded fallbacks - let the caller handle missing data
|
||||
return None
|
||||
|
||||
def get_current_price(self, symbol: str) -> Optional[float]:
|
||||
"""Get current price with robust fallbacks - enhanced version"""
|
||||
try:
|
||||
# 1. Try live price API first (our enhanced method)
|
||||
price = self.get_live_price_from_api(symbol)
|
||||
if price and price > 0:
|
||||
return price
|
||||
|
||||
# 2. Try parent's get_current_price method
|
||||
if hasattr(super(), 'get_current_price'):
|
||||
try:
|
||||
price = super().get_current_price(symbol)
|
||||
if price and price > 0:
|
||||
return price
|
||||
except Exception as e:
|
||||
logger.debug(f"Parent get_current_price failed for {symbol}: {e}")
|
||||
|
||||
# 3. Try current prices cache
|
||||
if hasattr(self, 'current_prices') and symbol in self.current_prices:
|
||||
price = self.current_prices[symbol]
|
||||
if price and price > 0:
|
||||
return price
|
||||
|
||||
# 4. Try historical data from multiple timeframes
|
||||
for timeframe in ['1m', '5m', '1h']:
|
||||
try:
|
||||
df = self.get_historical_data(symbol, timeframe, limit=1, refresh=True)
|
||||
if df is not None and not df.empty:
|
||||
price = float(df['close'].iloc[-1])
|
||||
if price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
return price
|
||||
except Exception as tf_error:
|
||||
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
|
||||
continue
|
||||
|
||||
# 5. Try WebSocket cache if available
|
||||
ws_symbol = symbol.replace('/', '')
|
||||
if hasattr(self, 'ws_price_cache') and ws_symbol in self.ws_price_cache:
|
||||
price = self.ws_price_cache[ws_symbol]
|
||||
if price and price > 0:
|
||||
return price
|
||||
|
||||
logger.warning(f"Could not get current price for {symbol} from any source")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting current price for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def update_ws_price_cache(self, symbol: str, price: float):
|
||||
"""Update WebSocket price cache for dashboard compatibility"""
|
||||
try:
|
||||
ws_symbol = symbol.replace('/', '')
|
||||
self.ws_price_cache[ws_symbol] = price
|
||||
# Also update current prices for consistency
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Updated WS cache for {symbol}: ${price:.2f}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating WS cache for {symbol}: {e}")
|
||||
|
||||
def set_orchestrator(self, orchestrator):
|
||||
"""Set orchestrator reference for dashboard compatibility"""
|
||||
self.orchestrator = orchestrator
|
@ -141,6 +141,31 @@ class CleanTradingDashboard:
|
||||
self.orchestrator.set_trading_executor(self.trading_executor)
|
||||
logger.info("Trading executor connected to orchestrator for signal execution")
|
||||
|
||||
# Connect dashboard to orchestrator for COB data updates
|
||||
if hasattr(self.orchestrator, 'set_dashboard'):
|
||||
self.orchestrator.set_dashboard(self)
|
||||
logger.info("✅ Dashboard connected to orchestrator for COB data updates")
|
||||
|
||||
# Start orchestrator's real-time processing to ensure COB data flows
|
||||
if hasattr(self.orchestrator, 'start_continuous_trading'):
|
||||
try:
|
||||
# Start in background thread to avoid blocking dashboard startup
|
||||
import threading
|
||||
def start_orchestrator_trading():
|
||||
try:
|
||||
import asyncio
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self.orchestrator.start_continuous_trading())
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting orchestrator trading: {e}")
|
||||
|
||||
trading_thread = threading.Thread(target=start_orchestrator_trading, daemon=True)
|
||||
trading_thread.start()
|
||||
logger.info("✅ Started orchestrator real-time processing for COB data")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start orchestrator trading: {e}")
|
||||
|
||||
# Initialize enhanced training system for predictions
|
||||
self.training_system = None
|
||||
self._initialize_enhanced_training_system()
|
||||
@ -202,8 +227,20 @@ class CleanTradingDashboard:
|
||||
|
||||
# COB data cache - enhanced with price buckets and memory system
|
||||
self.cob_cache: dict = {
|
||||
'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0},
|
||||
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
|
||||
'ETH/USDT': {
|
||||
'last_update': 0,
|
||||
'data': None,
|
||||
'updates_count': 0,
|
||||
'update_times': [],
|
||||
'update_rate': 0.0
|
||||
},
|
||||
'BTC/USDT': {
|
||||
'last_update': 0,
|
||||
'data': None,
|
||||
'updates_count': 0,
|
||||
'update_times': [],
|
||||
'update_rate': 0.0
|
||||
}
|
||||
}
|
||||
self.latest_cob_data: dict = {} # Cache for COB integration data
|
||||
self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display)
|
||||
@ -292,6 +329,49 @@ class CleanTradingDashboard:
|
||||
def _on_cob_data_update(self, symbol: str, cob_data: dict):
|
||||
"""Handle COB data updates from data provider"""
|
||||
try:
|
||||
|
||||
# Also update the COB cache for status display
|
||||
if not hasattr(self, 'cob_cache'):
|
||||
self.cob_cache = {}
|
||||
|
||||
if symbol not in self.cob_cache:
|
||||
self.cob_cache[symbol] = {
|
||||
'last_update': 0,
|
||||
'data': None,
|
||||
'updates_count': 0,
|
||||
'update_times': [], # Track recent update times for rate calculation
|
||||
'update_rate': 0.0
|
||||
}
|
||||
|
||||
# Update cache
|
||||
current_time = time.time()
|
||||
self.cob_cache[symbol]['data'] = cob_data
|
||||
self.cob_cache[symbol]['last_update'] = current_time
|
||||
self.cob_cache[symbol]['updates_count'] += 1
|
||||
self.cob_cache[symbol]['websocket_status'] = 'connected'
|
||||
self.cob_cache[symbol]['source'] = 'data_provider'
|
||||
|
||||
# Track update times for rate calculation (keep last 60 seconds)
|
||||
self.cob_cache[symbol]['update_times'].append(current_time)
|
||||
# Remove updates older than 60 seconds
|
||||
cutoff_time = current_time - 60
|
||||
self.cob_cache[symbol]['update_times'] = [
|
||||
t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time
|
||||
]
|
||||
|
||||
# Calculate update rate (updates per second)
|
||||
if len(self.cob_cache[symbol]['update_times']) > 1:
|
||||
time_span = current_time - self.cob_cache[symbol]['update_times'][0]
|
||||
if time_span > 0:
|
||||
self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span
|
||||
else:
|
||||
self.cob_cache[symbol]['update_rate'] = 0.0
|
||||
else:
|
||||
self.cob_cache[symbol]['update_rate'] = 0.0
|
||||
|
||||
logger.debug(f"Updated COB cache for {symbol} from data provider (updates: {self.cob_cache[symbol]['updates_count']})")
|
||||
|
||||
# Continue with existing logic
|
||||
# Update latest COB data cache
|
||||
if not hasattr(self, 'latest_cob_data'):
|
||||
self.latest_cob_data = {}
|
||||
@ -332,7 +412,6 @@ class CleanTradingDashboard:
|
||||
if not hasattr(self, 'cob_last_update'):
|
||||
self.cob_last_update = {}
|
||||
|
||||
import time
|
||||
self.cob_last_update[symbol] = time.time()
|
||||
|
||||
# Update current price from COB data
|
||||
@ -767,21 +846,22 @@ class CleanTradingDashboard:
|
||||
if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode:
|
||||
mexc_status = "LIVE+SYNC" # Indicate live trading with position sync
|
||||
|
||||
# COB WebSocket status
|
||||
# COB WebSocket status with update rate
|
||||
cob_status = self.get_cob_websocket_status()
|
||||
overall_status = cob_status.get('overall_status', 'unknown')
|
||||
warning_message = cob_status.get('warning_message')
|
||||
update_rate = cob_status.get('update_rate', 0.0)
|
||||
|
||||
if overall_status == 'all_connected':
|
||||
cob_status_str = "Connected"
|
||||
cob_status_str = f"Connected ({update_rate:.1f}/s)"
|
||||
elif overall_status == 'partial_fallback':
|
||||
cob_status_str = "Fallback"
|
||||
cob_status_str = f"Fallback ({update_rate:.1f}/s)"
|
||||
elif overall_status == 'degraded':
|
||||
cob_status_str = "Degraded"
|
||||
cob_status_str = f"Degraded ({update_rate:.1f}/s)"
|
||||
elif overall_status == 'unavailable':
|
||||
cob_status_str = "N/A"
|
||||
else:
|
||||
cob_status_str = "Error"
|
||||
cob_status_str = f"Error ({update_rate:.1f}/s)"
|
||||
|
||||
return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, cob_status_str, mexc_status
|
||||
|
||||
@ -1076,7 +1156,7 @@ class CleanTradingDashboard:
|
||||
return [html.I(className="fas fa-save me-1"), "Store All Models"]
|
||||
|
||||
def _get_current_price(self, symbol: str) -> Optional[float]:
|
||||
"""Get current price for symbol - ENHANCED with better fallbacks"""
|
||||
"""Get current price for symbol - ONLY using our data providers"""
|
||||
try:
|
||||
# Try WebSocket cache first
|
||||
ws_symbol = symbol.replace('/', '')
|
||||
@ -1099,6 +1179,16 @@ class CleanTradingDashboard:
|
||||
except Exception as dp_error:
|
||||
logger.debug(f"Data provider get_current_price failed: {dp_error}")
|
||||
|
||||
# Try data provider get_live_price_from_api method (our standardized method)
|
||||
if hasattr(self.data_provider, 'get_live_price_from_api'):
|
||||
try:
|
||||
price = self.data_provider.get_live_price_from_api(symbol)
|
||||
if price and price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
return price
|
||||
except Exception as live_error:
|
||||
logger.debug(f"Data provider get_live_price_from_api failed: {live_error}")
|
||||
|
||||
# Fallback to dashboard current prices
|
||||
if symbol in self.current_prices and self.current_prices[symbol] > 0:
|
||||
return self.current_prices[symbol]
|
||||
@ -1127,34 +1217,18 @@ class CleanTradingDashboard:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from orchestrator: ${price:.2f}")
|
||||
return price
|
||||
|
||||
# Try orchestrator's live price method
|
||||
if hasattr(self.orchestrator.data_provider, 'get_live_price_from_api'):
|
||||
price = self.orchestrator.data_provider.get_live_price_from_api(symbol)
|
||||
if price and price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from orchestrator live API: ${price:.2f}")
|
||||
return price
|
||||
except Exception as orch_error:
|
||||
logger.debug(f"Failed to get price from orchestrator: {orch_error}")
|
||||
|
||||
# Try external API as last resort
|
||||
try:
|
||||
import requests
|
||||
if symbol == 'ETH/USDT':
|
||||
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
if price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
|
||||
return price
|
||||
elif symbol == 'BTC/USDT':
|
||||
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
price = float(data['price'])
|
||||
if price > 0:
|
||||
self.current_prices[symbol] = price
|
||||
logger.debug(f"Got current price for {symbol} from Binance API: ${price:.2f}")
|
||||
return price
|
||||
except Exception as api_error:
|
||||
logger.debug(f"External API failed: {api_error}")
|
||||
|
||||
logger.warning(f"Could not get current price for {symbol} from any source")
|
||||
logger.warning(f"Could not get current price for {symbol} from any data provider source")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting current price for {symbol}: {e}")
|
||||
@ -1163,12 +1237,7 @@ class CleanTradingDashboard:
|
||||
if symbol in self.current_prices and self.current_prices[symbol] > 0:
|
||||
return self.current_prices[symbol]
|
||||
|
||||
# Return a reasonable fallback based on current market conditions
|
||||
if symbol == 'ETH/USDT':
|
||||
return 3385.0 # Current market price fallback
|
||||
elif symbol == 'BTC/USDT':
|
||||
return 119500.0 # Current market price fallback
|
||||
|
||||
# Return None instead of hardcoded fallbacks - let the UI handle missing data
|
||||
return None
|
||||
|
||||
def _create_price_chart(self, symbol: str) -> go.Figure:
|
||||
@ -6509,6 +6578,67 @@ class CleanTradingDashboard:
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error initializing Enhanced COB Integration: {e}")
|
||||
|
||||
def update_cob_data_from_orchestrator(self, symbol: str, cob_data: Dict):
|
||||
"""Update COB cache from orchestrator data - called by orchestrator"""
|
||||
try:
|
||||
# Initialize cache if needed
|
||||
if not hasattr(self, 'cob_cache'):
|
||||
self.cob_cache = {}
|
||||
|
||||
if symbol not in self.cob_cache:
|
||||
self.cob_cache[symbol] = {
|
||||
'last_update': 0,
|
||||
'data': None,
|
||||
'updates_count': 0,
|
||||
'update_times': [], # Track recent update times for rate calculation
|
||||
'update_rate': 0.0
|
||||
}
|
||||
|
||||
# Update cache with orchestrator data
|
||||
current_time = time.time()
|
||||
self.cob_cache[symbol]['data'] = cob_data
|
||||
self.cob_cache[symbol]['last_update'] = current_time
|
||||
self.cob_cache[symbol]['updates_count'] += 1
|
||||
|
||||
# Track update times for rate calculation (keep last 60 seconds)
|
||||
self.cob_cache[symbol]['update_times'].append(current_time)
|
||||
# Remove updates older than 60 seconds
|
||||
cutoff_time = current_time - 60
|
||||
self.cob_cache[symbol]['update_times'] = [
|
||||
t for t in self.cob_cache[symbol]['update_times'] if t > cutoff_time
|
||||
]
|
||||
|
||||
# Calculate update rate (updates per second)
|
||||
if len(self.cob_cache[symbol]['update_times']) > 1:
|
||||
time_span = current_time - self.cob_cache[symbol]['update_times'][0]
|
||||
if time_span > 0:
|
||||
self.cob_cache[symbol]['update_rate'] = len(self.cob_cache[symbol]['update_times']) / time_span
|
||||
else:
|
||||
self.cob_cache[symbol]['update_rate'] = 0.0
|
||||
else:
|
||||
self.cob_cache[symbol]['update_rate'] = 0.0
|
||||
|
||||
# Set WebSocket status based on data source
|
||||
if isinstance(cob_data, dict) and 'stats' in cob_data:
|
||||
source = cob_data['stats'].get('source', 'unknown')
|
||||
if 'websocket' in source.lower():
|
||||
self.cob_cache[symbol]['websocket_status'] = 'connected'
|
||||
self.cob_cache[symbol]['source'] = source
|
||||
elif 'rest' in source.lower() or 'fallback' in source.lower():
|
||||
self.cob_cache[symbol]['websocket_status'] = 'fallback'
|
||||
self.cob_cache[symbol]['source'] = source
|
||||
else:
|
||||
self.cob_cache[symbol]['websocket_status'] = 'unknown'
|
||||
self.cob_cache[symbol]['source'] = source
|
||||
else:
|
||||
self.cob_cache[symbol]['websocket_status'] = 'connected'
|
||||
self.cob_cache[symbol]['source'] = 'orchestrator'
|
||||
|
||||
logger.debug(f"Updated COB cache for {symbol} from orchestrator: {self.cob_cache[symbol]['websocket_status']} (updates: {self.cob_cache[symbol]['updates_count']})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating COB cache from orchestrator for {symbol}: {e}")
|
||||
|
||||
def _on_enhanced_cob_update(self, symbol: str, data: Dict):
|
||||
"""Handle enhanced COB updates with WebSocket status"""
|
||||
try:
|
||||
@ -6524,7 +6654,13 @@ class CleanTradingDashboard:
|
||||
|
||||
# Update COB cache with status
|
||||
if symbol not in self.cob_cache:
|
||||
self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0}
|
||||
self.cob_cache[symbol] = {
|
||||
'last_update': 0,
|
||||
'data': None,
|
||||
'updates_count': 0,
|
||||
'update_times': [],
|
||||
'update_rate': 0.0
|
||||
}
|
||||
|
||||
self.cob_cache[symbol]['websocket_status'] = status
|
||||
self.cob_cache[symbol]['websocket_message'] = message
|
||||
@ -6620,11 +6756,15 @@ class CleanTradingDashboard:
|
||||
status_summary['overall_status'] = 'error'
|
||||
status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down'
|
||||
|
||||
# Set last update time
|
||||
# Set last update time and calculate overall update rate
|
||||
last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()]
|
||||
if last_updates and max(last_updates) > 0:
|
||||
status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat()
|
||||
|
||||
# Calculate overall update rate (sum of all symbols)
|
||||
total_update_rate = sum(cache.get('update_rate', 0.0) for cache in self.cob_cache.values())
|
||||
status_summary['update_rate'] = total_update_rate
|
||||
|
||||
return status_summary
|
||||
|
||||
except Exception as e:
|
||||
|
@ -256,11 +256,12 @@ class DashboardLayoutManager:
|
||||
return html.Div([
|
||||
html.Div([
|
||||
html.Div([
|
||||
# Chart header with manual trading buttons
|
||||
# Chart header with manual trading buttons and live price
|
||||
html.Div([
|
||||
html.H6([
|
||||
html.I(className="fas fa-chart-candlestick me-2"),
|
||||
"Live 1m Price Chart (3h) + 1s Mini Chart (5min) - Updated Every Second"
|
||||
"Live Price (WebSocket): ",
|
||||
html.Span(id="current-price", className="ms-2 text-primary", style={"fontWeight": "bold", "fontSize": "1.2em"})
|
||||
], className="card-title mb-0"),
|
||||
html.Div([
|
||||
html.Button([
|
||||
|
Reference in New Issue
Block a user