#!/usr/bin/env python3 """ Test COB WebSocket Only Integration This script tests that COB integration works with Enhanced WebSocket only, without falling back to REST API calls. """ import asyncio import time from datetime import datetime from typing import Dict from core.cob_integration import COBIntegration async def test_cob_websocket_only(): """Test COB integration with WebSocket only""" print("=== Testing COB WebSocket Only Integration ===") # Initialize COB integration print("1. Initializing COB integration...") symbols = ['ETH/USDT', 'BTC/USDT'] cob_integration = COBIntegration(symbols=symbols) # Track updates update_count = 0 last_update_time = None def dashboard_callback(symbol: str, data: Dict): nonlocal update_count, last_update_time update_count += 1 last_update_time = datetime.now() if update_count <= 5: # Show first 5 updates data_type = data.get('type', 'unknown') if data_type == 'cob_update': stats = data.get('data', {}).get('stats', {}) mid_price = stats.get('mid_price', 0) spread_bps = stats.get('spread_bps', 0) source = stats.get('source', 'unknown') print(f" Update #{update_count}: {symbol} - Price: ${mid_price:.2f}, Spread: {spread_bps:.1f}bps, Source: {source}") elif data_type == 'websocket_status': status_data = data.get('data', {}) status = status_data.get('status', 'unknown') print(f" Status #{update_count}: {symbol} - WebSocket: {status}") # Add dashboard callback cob_integration.add_dashboard_callback(dashboard_callback) # Start COB integration print("2. Starting COB integration...") try: # Start in background start_task = asyncio.create_task(cob_integration.start()) # Wait for initialization await asyncio.sleep(3) # Check if COB provider is disabled print("3. Checking COB provider status:") if cob_integration.cob_provider is None: print(" ✅ COB provider is disabled (using Enhanced WebSocket only)") else: print(" ❌ COB provider is still active (may cause REST API fallback)") # Check Enhanced WebSocket status print("4. Checking Enhanced WebSocket status:") if cob_integration.enhanced_websocket: print(" ✅ Enhanced WebSocket is initialized") # Check WebSocket status for each symbol websocket_status = cob_integration.get_websocket_status() for symbol, status in websocket_status.items(): print(f" {symbol}: {status}") else: print(" ❌ Enhanced WebSocket is not initialized") # Monitor updates for a few seconds print("5. Monitoring COB updates...") initial_count = update_count monitor_start = time.time() # Wait for updates await asyncio.sleep(5) monitor_duration = time.time() - monitor_start updates_received = update_count - initial_count update_rate = updates_received / monitor_duration print(f" Received {updates_received} updates in {monitor_duration:.1f}s") print(f" Update rate: {update_rate:.1f} updates/second") if update_rate >= 8: # Should be around 10 updates/second print(" ✅ Update rate is excellent (8+ updates/second)") elif update_rate >= 5: print(" ✅ Update rate is good (5+ updates/second)") elif update_rate >= 1: print(" ⚠️ Update rate is low (1+ updates/second)") else: print(" ❌ Update rate is too low (<1 update/second)") # Check data quality print("6. Data quality check:") if last_update_time: time_since_last = (datetime.now() - last_update_time).total_seconds() if time_since_last < 1: print(f" ✅ Recent data (last update {time_since_last:.1f}s ago)") else: print(f" ⚠️ Stale data (last update {time_since_last:.1f}s ago)") else: print(" ❌ No updates received") # Stop the integration print("7. Stopping COB integration...") await cob_integration.stop() # Cancel the start task start_task.cancel() try: await start_task except asyncio.CancelledError: pass except Exception as e: print(f" ❌ Error during COB integration test: {e}") print(f"\n✅ COB WebSocket only test completed!") print(f"Total updates received: {update_count}") print("Enhanced WebSocket is now the sole data source (no REST API fallback)") if __name__ == "__main__": asyncio.run(test_cob_websocket_only())