stability

This commit is contained in:
Dobromir Popov
2025-08-08 17:32:09 +03:00
parent 78b96c10af
commit f8d3e1c999
4 changed files with 35 additions and 14 deletions

View File

@ -75,6 +75,11 @@ class COBIntegration:
self.liquidity_alerts[symbol] = [] self.liquidity_alerts[symbol] = []
self.arbitrage_opportunities[symbol] = [] self.arbitrage_opportunities[symbol] = []
# Idempotence and task tracking
self._started: bool = False
self._analysis_task: Optional[asyncio.Task] = None
self._signal_task: Optional[asyncio.Task] = None
logger.info("COB Integration initialized with Enhanced WebSocket support") logger.info("COB Integration initialized with Enhanced WebSocket support")
logger.info(f"Symbols: {self.symbols}") logger.info(f"Symbols: {self.symbols}")
@ -82,6 +87,11 @@ class COBIntegration:
"""Start COB integration with Enhanced WebSocket""" """Start COB integration with Enhanced WebSocket"""
logger.info(" Starting COB Integration with Enhanced WebSocket") logger.info(" Starting COB Integration with Enhanced WebSocket")
# Prevent duplicate start
if self._started:
logger.warning("COB Integration already started - skipping duplicate start")
return
# Initialize Enhanced WebSocket first # Initialize Enhanced WebSocket first
try: try:
self.enhanced_websocket = EnhancedCOBWebSocket( self.enhanced_websocket = EnhancedCOBWebSocket(
@ -106,11 +116,14 @@ class COBIntegration:
# Set cob_provider to None to indicate we're using Enhanced WebSocket only # Set cob_provider to None to indicate we're using Enhanced WebSocket only
self.cob_provider = None self.cob_provider = None
# Start analysis threads # Start analysis threads (store tasks; avoid duplicates)
asyncio.create_task(self._continuous_cob_analysis()) if self._analysis_task is None or self._analysis_task.done():
asyncio.create_task(self._continuous_signal_generation()) self._analysis_task = asyncio.create_task(self._continuous_cob_analysis())
if self._signal_task is None or self._signal_task.done():
self._signal_task = asyncio.create_task(self._continuous_signal_generation())
logger.info(" COB Integration started successfully with Enhanced WebSocket") logger.info(" COB Integration started successfully with Enhanced WebSocket")
self._started = True
async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict): async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict):
"""Handle COB updates from Enhanced WebSocket""" """Handle COB updates from Enhanced WebSocket"""
@ -854,9 +867,7 @@ class COBIntegration:
def _initialize_cob_integration(self): def _initialize_cob_integration(self):
"""Initialize COB integration with high-frequency data handling""" """Initialize COB integration with high-frequency data handling"""
logger.info("Initializing COB integration...") logger.info("Initializing COB integration...")
if not COB_INTEGRATION_AVAILABLE: # Proceed without external feature flag; availability handled by exceptions
logger.warning("COB integration not available - skipping initialization")
return
try: try:
if not hasattr(self.orchestrator, 'cob_integration') or self.orchestrator.cob_integration is None: if not hasattr(self.orchestrator, 'cob_integration') or self.orchestrator.cob_integration is None:

View File

@ -176,6 +176,11 @@ class EnhancedCOBWebSocket:
"""Start COB WebSocket connections""" """Start COB WebSocket connections"""
logger.info("Starting Enhanced COB WebSocket system") logger.info("Starting Enhanced COB WebSocket system")
# Idempotence guard: avoid spawning duplicate tasks if already started
if any(task for task in self.websocket_tasks.values() if task and not task.done()):
logger.warning("Enhanced COB WebSocket already started - skipping duplicate start")
return
# Initialize REST session for fallback # Initialize REST session for fallback
await self._init_rest_session() await self._init_rest_session()
@ -386,9 +391,12 @@ class EnhancedCOBWebSocket:
await self._start_rest_fallback(symbol) await self._start_rest_fallback(symbol)
return return
# Cancel existing task if running # If a task exists and is still running, do not spawn another
if symbol in self.websocket_tasks and not self.websocket_tasks[symbol].done(): if symbol in self.websocket_tasks:
self.websocket_tasks[symbol].cancel() existing = self.websocket_tasks[symbol]
if existing and not existing.done():
logger.warning(f"WebSocket task already running for {symbol} - skipping restart")
return
# Start new WebSocket task # Start new WebSocket task
self.websocket_tasks[symbol] = asyncio.create_task( self.websocket_tasks[symbol] = asyncio.create_task(

View File

@ -3406,7 +3406,7 @@ class TradingOrchestrator:
"""Evaluate prediction outcome and train model""" """Evaluate prediction outcome and train model"""
try: try:
model_name = record["model_name"] model_name = record["model_name"]
prediction = record["prediction"] prediction = record.get("prediction") or {}
timestamp = record["timestamp"] timestamp = record["timestamp"]
# Convert timestamp string back to datetime if needed # Convert timestamp string back to datetime if needed
@ -3451,10 +3451,8 @@ class TradingOrchestrator:
return return
# Enhanced reward system based on prediction confidence and price movement magnitude # Enhanced reward system based on prediction confidence and price movement magnitude
predicted_action = prediction["action"] predicted_action = prediction.get("action", "HOLD")
prediction_confidence = prediction.get( prediction_confidence = prediction.get("confidence", 0.5)
"confidence", 0.5
) # Default to 0.5 if missing
# Calculate sophisticated reward based on multiple factors # Calculate sophisticated reward based on multiple factors
current_pnl = self._get_current_position_pnl(symbol) current_pnl = self._get_current_position_pnl(symbol)

View File

@ -1,4 +1,5 @@
import logging import logging
import os
from core.config import setup_logging, get_config from core.config import setup_logging, get_config
from core.trading_executor import TradingExecutor from core.trading_executor import TradingExecutor
from core.orchestrator import TradingOrchestrator from core.orchestrator import TradingOrchestrator
@ -6,6 +7,9 @@ from core.standardized_data_provider import StandardizedDataProvider
from web.clean_dashboard import CleanTradingDashboard from web.clean_dashboard import CleanTradingDashboard
def main(): def main():
# Mitigate OpenMP duplicate runtime crash on Windows by allowing duplicates
# This avoids hard crashes from multiple linked OpenMP runtimes.
os.environ.setdefault('KMP_DUPLICATE_LIB_OK', 'TRUE')
setup_logging() setup_logging()
cfg = get_config() cfg = get_config()