From f8d3e1c999da9113183d23f07b8226baedb1bb75 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 8 Aug 2025 17:32:09 +0300 Subject: [PATCH] stability --- core/cob_integration.py | 23 +++++++++++++++++------ core/enhanced_cob_websocket.py | 14 +++++++++++--- core/orchestrator.py | 8 +++----- run_clean_dashboard.py | 4 ++++ 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/core/cob_integration.py b/core/cob_integration.py index 6daa7b3..bee5d63 100644 --- a/core/cob_integration.py +++ b/core/cob_integration.py @@ -74,6 +74,11 @@ class COBIntegration: self.cob_signals[symbol] = [] self.liquidity_alerts[symbol] = [] self.arbitrage_opportunities[symbol] = [] + + # Idempotence and task tracking + self._started: bool = False + self._analysis_task: Optional[asyncio.Task] = None + self._signal_task: Optional[asyncio.Task] = None logger.info("COB Integration initialized with Enhanced WebSocket support") logger.info(f"Symbols: {self.symbols}") @@ -81,6 +86,11 @@ class COBIntegration: async def start(self): """Start COB integration with Enhanced WebSocket""" logger.info(" Starting COB Integration with Enhanced WebSocket") + + # Prevent duplicate start + if self._started: + logger.warning("COB Integration already started - skipping duplicate start") + return # Initialize Enhanced WebSocket first try: @@ -106,11 +116,14 @@ class COBIntegration: # Set cob_provider to None to indicate we're using Enhanced WebSocket only self.cob_provider = None - # Start analysis threads - asyncio.create_task(self._continuous_cob_analysis()) - asyncio.create_task(self._continuous_signal_generation()) + # Start analysis threads (store tasks; avoid duplicates) + if self._analysis_task is None or self._analysis_task.done(): + self._analysis_task = asyncio.create_task(self._continuous_cob_analysis()) + if self._signal_task is None or self._signal_task.done(): + self._signal_task = asyncio.create_task(self._continuous_signal_generation()) logger.info(" COB Integration started successfully with Enhanced WebSocket") + self._started = True async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict): """Handle COB updates from Enhanced WebSocket""" @@ -854,9 +867,7 @@ class COBIntegration: def _initialize_cob_integration(self): """Initialize COB integration with high-frequency data handling""" logger.info("Initializing COB integration...") - if not COB_INTEGRATION_AVAILABLE: - logger.warning("COB integration not available - skipping initialization") - return + # Proceed without external feature flag; availability handled by exceptions try: if not hasattr(self.orchestrator, 'cob_integration') or self.orchestrator.cob_integration is None: diff --git a/core/enhanced_cob_websocket.py b/core/enhanced_cob_websocket.py index 07dbf85..5ec0c9a 100644 --- a/core/enhanced_cob_websocket.py +++ b/core/enhanced_cob_websocket.py @@ -175,6 +175,11 @@ class EnhancedCOBWebSocket: async def start(self): """Start COB WebSocket connections""" logger.info("Starting Enhanced COB WebSocket system") + + # Idempotence guard: avoid spawning duplicate tasks if already started + if any(task for task in self.websocket_tasks.values() if task and not task.done()): + logger.warning("Enhanced COB WebSocket already started - skipping duplicate start") + return # Initialize REST session for fallback await self._init_rest_session() @@ -386,9 +391,12 @@ class EnhancedCOBWebSocket: await self._start_rest_fallback(symbol) return - # Cancel existing task if running - if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): - self.websocket_tasks[symbol].cancel() + # If a task exists and is still running, do not spawn another + if symbol in self.websocket_tasks: + existing = self.websocket_tasks[symbol] + if existing and not existing.done(): + logger.warning(f"WebSocket task already running for {symbol} - skipping restart") + return # Start new WebSocket task self.websocket_tasks[symbol] = asyncio.create_task( diff --git a/core/orchestrator.py b/core/orchestrator.py index ea0e683..0bb3cef 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -3406,7 +3406,7 @@ class TradingOrchestrator: """Evaluate prediction outcome and train model""" try: model_name = record["model_name"] - prediction = record["prediction"] + prediction = record.get("prediction") or {} timestamp = record["timestamp"] # Convert timestamp string back to datetime if needed @@ -3451,10 +3451,8 @@ class TradingOrchestrator: return # Enhanced reward system based on prediction confidence and price movement magnitude - predicted_action = prediction["action"] - prediction_confidence = prediction.get( - "confidence", 0.5 - ) # Default to 0.5 if missing + predicted_action = prediction.get("action", "HOLD") + prediction_confidence = prediction.get("confidence", 0.5) # Calculate sophisticated reward based on multiple factors current_pnl = self._get_current_position_pnl(symbol) diff --git a/run_clean_dashboard.py b/run_clean_dashboard.py index d1b38b3..edf7c7d 100644 --- a/run_clean_dashboard.py +++ b/run_clean_dashboard.py @@ -1,4 +1,5 @@ import logging +import os from core.config import setup_logging, get_config from core.trading_executor import TradingExecutor from core.orchestrator import TradingOrchestrator @@ -6,6 +7,9 @@ from core.standardized_data_provider import StandardizedDataProvider from web.clean_dashboard import CleanTradingDashboard def main(): + # Mitigate OpenMP duplicate runtime crash on Windows by allowing duplicates + # This avoids hard crashes from multiple linked OpenMP runtimes. + os.environ.setdefault('KMP_DUPLICATE_LIB_OK', 'TRUE') setup_logging() cfg = get_config()