"""Live integration tests for the MEXC spot WebSocket stream.

These tests open a real network connection, so they need internet access
and the third-party ``websockets`` package.
"""

import asyncio
import json
import logging
import unittest
from typing import Optional, Dict

import websockets

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class TestMEXCWebSocket(unittest.IsolatedAsyncioTestCase):
    """Subscribe to MEXC topics and check the shape of the pushed data.

    The base class must be ``IsolatedAsyncioTestCase``: a plain ``TestCase``
    calls an ``async def`` test, receives a coroutine object that is never
    awaited, and reports the test as passed without running any of it.
    """

    # NOTE(review): the endpoint, topic names and the {"op": "sub"} message
    # schema are taken as-is from the original tests and cannot be verified
    # from this file -- confirm against MEXC's current WebSocket API docs.
    WS_URI = "wss://stream.mexc.com/ws"
    SYMBOL = "ethusdt"

    # Each test reads this many messages before asserting on what it saw.
    MESSAGES_TO_READ = 5
    # Seconds to wait for a single message before failing the test.
    RECV_TIMEOUT = 10

    async def _recv_or_fail(self, ws, timeout_message):
        """Return the next raw message from *ws*.

        Fails the current test with *timeout_message* if nothing arrives
        within ``RECV_TIMEOUT`` seconds.
        """
        try:
            return await asyncio.wait_for(ws.recv(), timeout=self.RECV_TIMEOUT)
        except asyncio.TimeoutError:
            self.fail(timeout_message)

    @staticmethod
    def _channel_payload(message, channel):
        """Return ``message['data']`` if *message* belongs to *channel*.

        Returns an empty list for non-dict messages, messages from another
        channel, and messages without a ``data`` key, so callers only need
        a truthiness check.
        """
        if not isinstance(message, dict):
            return []
        if message.get('channel') != channel:
            return []
        return message.get('data', [])

    async def test_websocket_connection(self):
        """Test basic WebSocket connection and subscription

        Subscribes to the deals topic, reads ``MESSAGES_TO_READ`` messages,
        checks that every trade push carries the expected keys, requires at
        least one trade push, then unsubscribes.
        """
        topic = f"spot.deals.{self.SYMBOL}"

        async with websockets.connect(self.WS_URI) as ws:
            await ws.send(json.dumps({"op": "sub", "id": "test1", "topic": topic}))

            trades_received = 0
            for _ in range(self.MESSAGES_TO_READ):
                raw = await self._recv_or_fail(
                    ws, "Timeout waiting for WebSocket messages"
                )
                logger.info("Received message: %s...", raw[:200])

                trades = self._channel_payload(json.loads(raw), 'spot.deals')
                if not trades:
                    # Subscription acks, pings and other channels are ignored.
                    continue

                trades_received += 1
                trade = trades[0]
                logger.info("Received trade data: %s", trade)

                # Verify trade data structure
                self.assertIn('t', trade)  # timestamp
                self.assertIn('p', trade)  # price
                self.assertIn('v', trade)  # volume
                self.assertIn('S', trade)  # side

            # Verify we received some trades
            self.assertGreater(trades_received, 0, "No trades received")

            # Test unsubscribe
            await ws.send(json.dumps({"op": "unsub", "id": "test1", "topic": topic}))

    async def test_kline_subscription(self):
        """Test subscription to kline (candlestick) data

        Subscribes to 1-minute klines, reads ``MESSAGES_TO_READ`` messages,
        checks that the first entry of every kline push has six elements,
        and requires at least one kline push.
        """
        topic = f"spot.klines.{self.SYMBOL}_1m"

        async with websockets.connect(self.WS_URI) as ws:
            await ws.send(json.dumps({"op": "sub", "id": "test2", "topic": topic}))

            klines_received = 0
            for _ in range(self.MESSAGES_TO_READ):
                raw = await self._recv_or_fail(ws, "Timeout waiting for kline data")
                logger.info("Received kline message: %s...", raw[:200])

                kline_data = self._channel_payload(json.loads(raw), 'spot.klines')
                if not kline_data:
                    continue

                klines_received += 1
                kline = kline_data[0]
                logger.info("Received kline data: %s", kline)

                # Verify kline data structure (should be an array)
                self.assertEqual(len(kline), 6)  # Should have 6 elements

            self.assertGreater(klines_received, 0, "No kline data received")


def run_tests():
    """Run the WebSocket tests

    The runner is driven synchronously: ``IsolatedAsyncioTestCase`` starts
    its own event loop for each test, so wrapping the runner in
    ``asyncio.run`` is unnecessary and would clash with that loop.

    Returns:
        The ``unittest.TestResult`` produced by the run, so callers can
        inspect ``wasSuccessful()``.
    """
    suite = unittest.TestSuite()
    suite.addTest(TestMEXCWebSocket('test_websocket_connection'))
    suite.addTest(TestMEXCWebSocket('test_kline_subscription'))

    runner = unittest.TextTestRunner(verbosity=2)
    return runner.run(suite)


if __name__ == "__main__":
    run_tests()