#!/usr/bin/env python3
"""
Test Enhanced COB WebSocket Implementation

This script tests the enhanced COB WebSocket system to ensure:
1. WebSocket connections work properly
2. Fallback to REST API when WebSocket fails
3. Dashboard status updates are working
4. Clear error messages and warnings are displayed
"""

import asyncio
import logging
import sys
import time
import traceback
from datetime import datetime

# Setup logging
# Configure the root logger once for the whole script: INFO and above, with
# the timestamp, logger name and level in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Import the enhanced COB WebSocket
# The project module is imported inside try/except so that a failed import
# produces a readable message and a non-zero exit status instead of a bare
# traceback.
try:
    from core.enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
    print("✅ Enhanced COB WebSocket imported successfully")
except ImportError as e:
    print(f"❌ Failed to import Enhanced COB WebSocket: {e}")
    sys.exit(1)


async def test_dashboard_callback(status_data):
|
|
"""Test dashboard callback function"""
|
|
print(f"📊 Dashboard callback received: {status_data}")
|
|
|
|
async def test_cob_callback(symbol, cob_data):
|
|
"""Test COB data callback function"""
|
|
stats = cob_data.get('stats', {})
|
|
mid_price = stats.get('mid_price', 0)
|
|
bid_levels = len(cob_data.get('bids', []))
|
|
ask_levels = len(cob_data.get('asks', []))
|
|
source = cob_data.get('source', 'unknown')
|
|
|
|
print(f"📈 COB data for {symbol}: ${mid_price:.2f}, {bid_levels} bids, {ask_levels} asks (via {source})")
|
|
|
|
def _print_live_status(status):
    """Print the overall status line followed by one line per symbol.

    *status* is the mapping returned by ``cob_ws.get_status_summary()``; only
    its ``overall_status`` and ``symbols`` keys are read, and both are
    optional.
    """
    print(f"⏱️ Status: {status.get('overall_status', 'unknown')}")

    # Print symbol-specific status
    for symbol, symbol_status in status.get('symbols', {}).items():
        if symbol_status.get('connected', False):
            messages = symbol_status.get('messages_received', 0)
            print(f" {symbol}: ✅ Connected ({messages} messages)")
        elif symbol_status.get('rest_fallback_active', False):
            print(f" {symbol}: ⚠️ REST fallback active")
        else:
            error = symbol_status.get('last_error', 'Unknown error')
            print(f" {symbol}: ❌ Error - {error}")


def _print_final_status(status):
    """Print the end-of-run report: overall status plus details per symbol."""
    print(f"Final overall status: {status.get('overall_status', 'unknown')}")

    for symbol, symbol_status in status.get('symbols', {}).items():
        print(f" {symbol}:")
        print(f" Connected: {symbol_status.get('connected', False)}")
        print(f" Messages received: {symbol_status.get('messages_received', 0)}")
        print(f" REST fallback: {symbol_status.get('rest_fallback_active', False)}")
        last_error = symbol_status.get('last_error')
        if last_error:
            print(f" Last error: {last_error}")


async def main():
    """Run the five-step manual check of the enhanced COB WebSocket.

    The steps are: build the client, start it, poll and print its status
    every 5 seconds for 30 seconds, print a final status report, and stop it.
    A failure while building or starting the client ends the run early.
    Once the client has started, the stop step always runs, even if the
    monitoring phase is cancelled (which is how ``asyncio.run`` delivers
    Ctrl-C to the main task on Python 3.11+).
    """
    monitor_seconds = 30
    poll_seconds = 5

    print("🚀 Testing Enhanced COB WebSocket System")
    print("=" * 60)

    # Test 1: Initialize Enhanced COB WebSocket
    print("\n1. Initializing Enhanced COB WebSocket...")
    try:
        cob_ws = EnhancedCOBWebSocket(
            symbols=['BTC/USDT', 'ETH/USDT'],
            dashboard_callback=test_dashboard_callback,
        )

        # Add callbacks
        cob_ws.add_cob_callback(test_cob_callback)

        print("✅ Enhanced COB WebSocket initialized")
    except Exception as e:
        print(f"❌ Failed to initialize: {e}")
        return

    # Test 2: Start WebSocket connections
    print("\n2. Starting WebSocket connections...")
    try:
        await cob_ws.start()
        print("✅ WebSocket connections started")
    except Exception as e:
        print(f"❌ Failed to start connections: {e}")
        return

    try:
        # Test 3: Monitor connections for 30 seconds
        print(f"\n3. Monitoring connections for {monitor_seconds} seconds...")
        # A monotonic clock keeps the monitoring window correct even if the
        # system wall clock is adjusted while the test is running.
        deadline = time.monotonic() + monitor_seconds

        while time.monotonic() < deadline:
            try:
                # Get status summary
                _print_live_status(cob_ws.get_status_summary())

                await asyncio.sleep(poll_seconds)  # Check every 5 seconds

            except KeyboardInterrupt:
                print("\n⏹️ Test interrupted by user")
                break
            except Exception as e:
                print(f"❌ Error during monitoring: {e}")
                break

        # Test 4: Final status check
        print("\n4. Final status check...")
        try:
            _print_final_status(cob_ws.get_status_summary())
        except Exception as e:
            print(f"❌ Error getting final status: {e}")
    finally:
        # Test 5: Stop connections
        # Runs in ``finally`` so started connections are stopped even when
        # the steps above are cut short by cancellation.
        print("\n5. Stopping connections...")
        try:
            await cob_ws.stop()
            print("✅ Connections stopped successfully")
        except Exception as e:
            print(f"❌ Error stopping connections: {e}")

    print("\n" + "=" * 60)
    print("🏁 Enhanced COB WebSocket test completed")


if __name__ == "__main__":
    # Script entry point: run the async test and report how it ended.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n⏹️ Test interrupted")
    except Exception as e:
        print(f"❌ Test failed: {e}")
        traceback.print_exc()
        # Exit non-zero so shells and CI can detect the failure, in line with
        # the ``sys.exit(1)`` used when the project import fails.
        sys.exit(1)