Files
gogo2/test_enhanced_data_provider_websocket.py
2025-07-22 15:44:59 +03:00

149 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""
Test Enhanced Data Provider WebSocket Integration
This script tests the integration between the Enhanced COB WebSocket and the Data Provider.
"""
import asyncio
import logging
import sys
import time
from datetime import datetime
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Import the enhanced data provider
try:
from core.data_provider import DataProvider
print("✅ Enhanced Data Provider imported successfully")
except ImportError as e:
print(f"❌ Failed to import Enhanced Data Provider: {e}")
sys.exit(1)
async def test_enhanced_websocket_integration():
"""Test the enhanced WebSocket integration with data provider"""
print("🚀 Testing Enhanced WebSocket Integration with Data Provider")
print("=" * 70)
# Test 1: Initialize Data Provider
print("\n1. Initializing Data Provider...")
try:
data_provider = DataProvider(
symbols=['ETH/USDT', 'BTC/USDT'],
timeframes=['1m', '1h']
)
print("✅ Data Provider initialized")
except Exception as e:
print(f"❌ Failed to initialize Data Provider: {e}")
return
# Test 2: Start Enhanced WebSocket Streaming
print("\n2. Starting Enhanced WebSocket streaming...")
try:
await data_provider.start_real_time_streaming()
print("✅ Enhanced WebSocket streaming started")
except Exception as e:
print(f"❌ Failed to start WebSocket streaming: {e}")
return
# Test 3: Check WebSocket Status
print("\n3. Checking WebSocket status...")
try:
status = data_provider.get_cob_websocket_status()
overall_status = status.get('overall_status', 'unknown')
print(f"Overall WebSocket status: {overall_status}")
for symbol, symbol_status in status.get('symbols', {}).items():
connected = symbol_status.get('connected', False)
messages = symbol_status.get('messages_received', 0)
fallback = symbol_status.get('rest_fallback_active', False)
if connected:
print(f" {symbol}: ✅ Connected ({messages} messages)")
elif fallback:
print(f" {symbol}: ⚠️ REST fallback active")
else:
print(f" {symbol}: ❌ Disconnected")
except Exception as e:
print(f"❌ Error checking WebSocket status: {e}")
# Test 4: Monitor COB Data for 30 seconds
print("\n4. Monitoring COB data for 30 seconds...")
start_time = time.time()
data_received = {'ETH/USDT': 0, 'BTC/USDT': 0}
while time.time() - start_time < 30:
try:
for symbol in ['ETH/USDT', 'BTC/USDT']:
cob_data = data_provider.get_latest_cob_data(symbol)
if cob_data:
data_received[symbol] += 1
if data_received[symbol] % 10 == 1: # Print every 10th update
bids = len(cob_data.get('bids', []))
asks = len(cob_data.get('asks', []))
source = cob_data.get('source', 'unknown')
mid_price = cob_data.get('stats', {}).get('mid_price', 0)
print(f" 📊 {symbol}: ${mid_price:.2f}, {bids} bids, {asks} asks (via {source})")
await asyncio.sleep(2) # Check every 2 seconds
except KeyboardInterrupt:
print("\n⏹️ Test interrupted by user")
break
except Exception as e:
print(f"❌ Error monitoring COB data: {e}")
break
# Test 5: Final Status Check
print("\n5. Final status check...")
try:
for symbol in ['ETH/USDT', 'BTC/USDT']:
count = data_received[symbol]
if count > 0:
print(f" {symbol}: ✅ Received {count} COB updates")
else:
print(f" {symbol}: ❌ No COB data received")
# Check overall WebSocket status again
final_status = data_provider.get_cob_websocket_status()
print(f"Final WebSocket status: {final_status.get('overall_status', 'unknown')}")
except Exception as e:
print(f"❌ Error in final status check: {e}")
# Test 6: Stop WebSocket Streaming
print("\n6. Stopping WebSocket streaming...")
try:
await data_provider.stop_real_time_streaming()
print("✅ WebSocket streaming stopped")
except Exception as e:
print(f"❌ Error stopping WebSocket streaming: {e}")
print("\n" + "=" * 70)
print("🏁 Enhanced WebSocket Integration Test Completed")
# Summary
total_updates = sum(data_received.values())
if total_updates > 0:
print(f"✅ SUCCESS: Received {total_updates} total COB updates")
print("🎉 Enhanced WebSocket integration is working!")
else:
print("❌ FAILURE: No COB data received")
print("⚠️ Enhanced WebSocket integration needs investigation")
if __name__ == "__main__":
try:
asyncio.run(test_enhanced_websocket_integration())
except KeyboardInterrupt:
print("\n⏹️ Test interrupted")
except Exception as e:
print(f"❌ Test failed: {e}")
import traceback
traceback.print_exc()