#!/usr/bin/env python3
"""
Test Enhanced Data Provider WebSocket Integration

This script tests the integration between the Enhanced COB WebSocket and the
Data Provider.

Run it directly; it starts real-time streaming on a ``DataProvider``, polls
the latest COB data for a fixed period, prints what it saw, and stops the
streaming again.
"""

import asyncio
import logging
import sys
import time
from datetime import datetime

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Import the enhanced data provider
try:
    from core.data_provider import DataProvider
    print("✅ Enhanced Data Provider imported successfully")
except ImportError as e:
    print(f"❌ Failed to import Enhanced Data Provider: {e}")
    sys.exit(1)

# Defaults used when the test coroutine is called without arguments.
DEFAULT_SYMBOLS = ('ETH/USDT', 'BTC/USDT')
DEFAULT_TIMEFRAMES = ('1m', '1h')
DEFAULT_MONITOR_SECONDS = 30
DEFAULT_POLL_INTERVAL = 2


def _report_websocket_status(data_provider):
    """Print the overall and per-symbol status from get_cob_websocket_status()."""
    status = data_provider.get_cob_websocket_status()
    overall_status = status.get('overall_status', 'unknown')
    print(f"Overall WebSocket status: {overall_status}")

    for symbol, symbol_status in status.get('symbols', {}).items():
        connected = symbol_status.get('connected', False)
        messages = symbol_status.get('messages_received', 0)
        fallback = symbol_status.get('rest_fallback_active', False)

        if connected:
            print(f" {symbol}: ✅ Connected ({messages} messages)")
        elif fallback:
            print(f" {symbol}: ⚠️ REST fallback active")
        else:
            print(f" {symbol}: ❌ Disconnected")


async def _monitor_cob_data(data_provider, data_received, monitor_seconds,
                            poll_interval):
    """Poll get_latest_cob_data() per symbol and count non-empty results.

    ``data_received`` maps symbol -> counter and is updated in place, so the
    counts gathered so far survive even if this coroutine is cancelled.
    Polling stops after ``monitor_seconds`` or on the first error.
    """
    # Monotonic clock: the elapsed-time check must not be affected by
    # wall-clock adjustments while the test is running.
    start_time = time.monotonic()

    while time.monotonic() - start_time < monitor_seconds:
        try:
            for symbol in data_received:
                cob_data = data_provider.get_latest_cob_data(symbol)
                if not cob_data:
                    continue

                data_received[symbol] += 1
                # Print the 1st, 11th, 21st, ... result to keep output short.
                if data_received[symbol] % 10 == 1:
                    bids = len(cob_data.get('bids', []))
                    asks = len(cob_data.get('asks', []))
                    source = cob_data.get('source', 'unknown')
                    mid_price = cob_data.get('stats', {}).get('mid_price', 0)
                    print(f" 📊 {symbol}: ${mid_price:.2f}, {bids} bids, "
                          f"{asks} asks (via {source})")

            await asyncio.sleep(poll_interval)  # Pause between polls
        except KeyboardInterrupt:
            print("\n⏹️ Test interrupted by user")
            break
        except Exception as e:
            print(f"❌ Error monitoring COB data: {e}")
            break


def _report_final_status(data_provider, data_received):
    """Print per-symbol update counts and the final overall WebSocket status."""
    for symbol, count in data_received.items():
        if count > 0:
            print(f" {symbol}: ✅ Received {count} COB updates")
        else:
            print(f" {symbol}: ❌ No COB data received")

    # Check overall WebSocket status again
    final_status = data_provider.get_cob_websocket_status()
    print(f"Final WebSocket status: {final_status.get('overall_status', 'unknown')}")


async def test_enhanced_websocket_integration(
    symbols=DEFAULT_SYMBOLS,
    timeframes=DEFAULT_TIMEFRAMES,
    monitor_seconds=DEFAULT_MONITOR_SECONDS,
    poll_interval=DEFAULT_POLL_INTERVAL,
):
    """Test the enhanced WebSocket integration with data provider.

    Args:
        symbols: Symbols passed to ``DataProvider`` and polled for COB data.
        timeframes: Timeframes passed to ``DataProvider``.
        monitor_seconds: How long to poll for COB data.
        poll_interval: Seconds to sleep between polling rounds.

    Returns:
        None. Progress and the final verdict are printed to stdout. The
        coroutine returns early if the provider cannot be created or the
        streaming cannot be started.
    """
    print("🚀 Testing Enhanced WebSocket Integration with Data Provider")
    print("=" * 70)

    symbols = list(symbols)
    timeframes = list(timeframes)

    # Test 1: Initialize Data Provider
    print("\n1. Initializing Data Provider...")
    try:
        data_provider = DataProvider(symbols=symbols, timeframes=timeframes)
        print("✅ Data Provider initialized")
    except Exception as e:
        print(f"❌ Failed to initialize Data Provider: {e}")
        return

    # Test 2: Start Enhanced WebSocket Streaming
    print("\n2. Starting Enhanced WebSocket streaming...")
    try:
        await data_provider.start_real_time_streaming()
        print("✅ Enhanced WebSocket streaming started")
    except Exception as e:
        print(f"❌ Failed to start WebSocket streaming: {e}")
        return

    data_received = {symbol: 0 for symbol in symbols}

    # From here on the stream is running. The finally-clause guarantees it is
    # stopped even when the task is cancelled (which is how asyncio.run
    # delivers Ctrl-C), since cancellation bypasses ``except Exception``.
    try:
        # Test 3: Check WebSocket Status
        print("\n3. Checking WebSocket status...")
        try:
            _report_websocket_status(data_provider)
        except Exception as e:
            print(f"❌ Error checking WebSocket status: {e}")

        # Test 4: Monitor COB Data
        print(f"\n4. Monitoring COB data for {monitor_seconds} seconds...")
        await _monitor_cob_data(
            data_provider, data_received, monitor_seconds, poll_interval
        )

        # Test 5: Final Status Check
        print("\n5. Final status check...")
        try:
            _report_final_status(data_provider, data_received)
        except Exception as e:
            print(f"❌ Error in final status check: {e}")
    finally:
        # Test 6: Stop WebSocket Streaming
        print("\n6. Stopping WebSocket streaming...")
        try:
            await data_provider.stop_real_time_streaming()
            print("✅ WebSocket streaming stopped")
        except Exception as e:
            print(f"❌ Error stopping WebSocket streaming: {e}")

    print("\n" + "=" * 70)
    print("🏁 Enhanced WebSocket Integration Test Completed")

    # Summary
    total_updates = sum(data_received.values())
    if total_updates > 0:
        print(f"✅ SUCCESS: Received {total_updates} total COB updates")
        print("🎉 Enhanced WebSocket integration is working!")
    else:
        print("❌ FAILURE: No COB data received")
        print("⚠️ Enhanced WebSocket integration needs investigation")


if __name__ == "__main__":
    try:
        asyncio.run(test_enhanced_websocket_integration())
    except KeyboardInterrupt:
        print("\n⏹️ Test interrupted")
    except Exception as e:
        print(f"❌ Test failed: {e}")
        import traceback
        traceback.print_exc()