refactoring cob ws

This commit is contained in:
Dobromir Popov
2025-07-20 21:23:27 +03:00
parent 330f0de053
commit 0838a828ce

View File

@ -224,7 +224,7 @@ class CleanTradingDashboard:
logger.debug("Universal Data Adapter ready for orchestrator data access")
# Initialize COB integration with enhanced WebSocket
self._initialize_enhanced_cob_integration()
self._initialize_cob_integration() # Use the working COB integration method
# Start signal generation loop to ensure continuous trading signals
self._start_signal_generation_loop()
@ -5379,6 +5379,170 @@ class CleanTradingDashboard:
logger.warning("Falling back to direct data provider COB collection")
self._start_simple_cob_collection()
def _initialize_enhanced_cob_integration(self):
"""Initialize enhanced COB integration with WebSocket status monitoring"""
try:
if not COB_INTEGRATION_AVAILABLE:
logger.warning("⚠️ COB integration not available - WebSocket status will show as unavailable")
return
logger.info("🚀 Initializing Enhanced COB Integration with WebSocket monitoring")
# Initialize COB integration
self.cob_integration = COBIntegration(
data_provider=self.data_provider,
symbols=['ETH/USDT', 'BTC/USDT']
)
# Add dashboard callback for COB data
self.cob_integration.add_dashboard_callback(self._on_enhanced_cob_update)
# Start COB integration in background thread
def start_cob_integration():
try:
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.cob_integration.start())
loop.run_forever()
except Exception as e:
logger.error(f"❌ Error in COB integration thread: {e}")
cob_thread = threading.Thread(target=start_cob_integration, daemon=True)
cob_thread.start()
logger.info("✅ Enhanced COB Integration started with WebSocket monitoring")
except Exception as e:
logger.error(f"❌ Error initializing Enhanced COB Integration: {e}")
def _on_enhanced_cob_update(self, symbol: str, data: Dict):
"""Handle enhanced COB updates with WebSocket status"""
try:
# Update COB data cache
self.latest_cob_data[symbol] = data
# Extract WebSocket status if available
if isinstance(data, dict) and 'type' in data:
if data['type'] == 'websocket_status':
status_data = data.get('data', {})
status = status_data.get('status', 'unknown')
message = status_data.get('message', '')
# Update COB cache with status
if symbol not in self.cob_cache:
self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0}
self.cob_cache[symbol]['websocket_status'] = status
self.cob_cache[symbol]['websocket_message'] = message
self.cob_cache[symbol]['last_status_update'] = time.time()
logger.info(f"🔌 COB WebSocket status for {symbol}: {status} - {message}")
elif data['type'] == 'cob_update':
# Regular COB data update
cob_data = data.get('data', {})
stats = cob_data.get('stats', {})
# Update cache
self.cob_cache[symbol]['data'] = cob_data
self.cob_cache[symbol]['last_update'] = time.time()
self.cob_cache[symbol]['updates_count'] += 1
# Update WebSocket status from stats
websocket_status = stats.get('websocket_status', 'unknown')
source = stats.get('source', 'unknown')
self.cob_cache[symbol]['websocket_status'] = websocket_status
self.cob_cache[symbol]['source'] = source
logger.debug(f"📊 Enhanced COB update for {symbol}: {websocket_status} via {source}")
except Exception as e:
logger.error(f"❌ Error handling enhanced COB update for {symbol}: {e}")
def get_cob_websocket_status(self) -> Dict[str, Any]:
"""Get COB WebSocket status for dashboard display"""
try:
status_summary = {
'overall_status': 'unknown',
'symbols': {},
'last_update': None,
'warning_message': None
}
if not COB_INTEGRATION_AVAILABLE:
status_summary['overall_status'] = 'unavailable'
status_summary['warning_message'] = 'COB integration not available'
return status_summary
connected_count = 0
fallback_count = 0
error_count = 0
for symbol in ['ETH/USDT', 'BTC/USDT']:
symbol_status = {
'status': 'unknown',
'message': 'No data',
'last_update': None,
'source': 'unknown'
}
if symbol in self.cob_cache:
cache_data = self.cob_cache[symbol]
ws_status = cache_data.get('websocket_status', 'unknown')
source = cache_data.get('source', 'unknown')
last_update = cache_data.get('last_update', 0)
symbol_status['status'] = ws_status
symbol_status['source'] = source
symbol_status['last_update'] = datetime.fromtimestamp(last_update).isoformat() if last_update > 0 else None
# Determine status category
if ws_status == 'connected':
connected_count += 1
symbol_status['message'] = 'WebSocket connected'
elif ws_status == 'fallback' or source == 'rest_fallback':
fallback_count += 1
symbol_status['message'] = 'Using REST API fallback'
else:
error_count += 1
symbol_status['message'] = cache_data.get('websocket_message', 'Connection error')
status_summary['symbols'][symbol] = symbol_status
# Determine overall status
total_symbols = len(['ETH/USDT', 'BTC/USDT'])
if connected_count == total_symbols:
status_summary['overall_status'] = 'all_connected'
status_summary['warning_message'] = None
elif connected_count + fallback_count == total_symbols:
status_summary['overall_status'] = 'partial_fallback'
status_summary['warning_message'] = f'⚠️ {fallback_count} symbol(s) using REST fallback - WebSocket connection failed'
elif fallback_count > 0:
status_summary['overall_status'] = 'degraded'
status_summary['warning_message'] = f'⚠️ COB WebSocket degraded - {error_count} error(s), {fallback_count} fallback(s)'
else:
status_summary['overall_status'] = 'error'
status_summary['warning_message'] = '❌ COB WebSocket failed - All connections down'
# Set last update time
last_updates = [cache.get('last_update', 0) for cache in self.cob_cache.values()]
if last_updates and max(last_updates) > 0:
status_summary['last_update'] = datetime.fromtimestamp(max(last_updates)).isoformat()
return status_summary
except Exception as e:
logger.error(f"❌ Error getting COB WebSocket status: {e}")
return {
'overall_status': 'error',
'warning_message': f'Error getting status: {e}',
'symbols': {},
'last_update': None
}
def _start_simple_cob_collection(self):
"""Start COB data collection using the centralized data provider"""
try: