import logging import time import asyncio import json import websockets from typing import Dict, Any, List, Optional, Callable import requests import hmac import hashlib from urllib.parse import urlencode from datetime import datetime from threading import Thread, Lock from collections import deque from .exchange_interface import ExchangeInterface logger = logging.getLogger(__name__) class MEXCInterface(ExchangeInterface): """MEXC Exchange API Interface with WebSocket support""" def __init__(self, api_key: str = None, api_secret: str = None, test_mode: bool = True): """Initialize MEXC exchange interface. Args: api_key: MEXC API key api_secret: MEXC API secret test_mode: If True, use test/sandbox environment (Note: MEXC doesn't have a true sandbox) """ super().__init__(api_key, api_secret, test_mode) self.base_url = "https://api.mexc.com" self.api_version = "api/v3" # WebSocket configuration self.ws_base_url = "wss://wbs.mexc.com/ws" self.websocket_tasks = {} self.is_streaming = False self.stream_lock = Lock() self.tick_callbacks = [] self.ticker_callbacks = [] # Data buffers for reliability self.recent_ticks = {} # {symbol: deque} self.current_prices = {} # {symbol: price} self.buffer_size = 1000 def add_tick_callback(self, callback: Callable[[Dict[str, Any]], None]): """Add callback for real-time tick data""" self.tick_callbacks.append(callback) logger.info(f"Added MEXC tick callback: {len(self.tick_callbacks)} total") def add_ticker_callback(self, callback: Callable[[Dict[str, Any]], None]): """Add callback for real-time ticker data""" self.ticker_callbacks.append(callback) logger.info(f"Added MEXC ticker callback: {len(self.ticker_callbacks)} total") def _notify_tick_callbacks(self, tick_data: Dict[str, Any]): """Notify all tick callbacks with new data""" for callback in self.tick_callbacks: try: callback(tick_data) except Exception as e: logger.error(f"Error in MEXC tick callback: {e}") def _notify_ticker_callbacks(self, ticker_data: Dict[str, Any]): """Notify all ticker callbacks with new data""" for callback in self.ticker_callbacks: try: callback(ticker_data) except Exception as e: logger.error(f"Error in MEXC ticker callback: {e}") async def start_websocket_streams(self, symbols: List[str], stream_types: List[str] = None): """Start WebSocket streams for multiple symbols Args: symbols: List of symbols in 'BTC/USDT' format stream_types: List of stream types ['trade', 'ticker', 'depth'] (default: ['trade', 'ticker']) """ if stream_types is None: stream_types = ['trade', 'ticker'] self.is_streaming = True logger.info(f"Starting MEXC WebSocket streams for {symbols} with types {stream_types}") # Initialize buffers for symbols for symbol in symbols: mexc_symbol = symbol.replace('/', '').upper() self.recent_ticks[mexc_symbol] = deque(maxlen=self.buffer_size) # Start streams for each symbol and stream type combination for symbol in symbols: for stream_type in stream_types: task = asyncio.create_task(self._websocket_stream(symbol, stream_type)) task_key = f"{symbol}_{stream_type}" self.websocket_tasks[task_key] = task async def stop_websocket_streams(self): """Stop all WebSocket streams""" logger.info("Stopping MEXC WebSocket streams") self.is_streaming = False # Cancel all tasks for task_key, task in self.websocket_tasks.items(): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass self.websocket_tasks.clear() async def _websocket_stream(self, symbol: str, stream_type: str): """Individual WebSocket stream for a symbol and stream type""" mexc_symbol = symbol.replace('/', '').upper() # MEXC WebSocket stream naming convention if stream_type == 'trade': stream_name = f"{mexc_symbol}@trade" elif stream_type == 'ticker': stream_name = f"{mexc_symbol}@ticker" elif stream_type == 'depth': stream_name = f"{mexc_symbol}@depth" else: logger.error(f"Unsupported MEXC stream type: {stream_type}") return url = f"{self.ws_base_url}" while self.is_streaming: try: logger.info(f"Connecting to MEXC WebSocket: {stream_name}") async with websockets.connect(url) as websocket: # Subscribe to the stream subscribe_msg = { "method": "SUBSCRIPTION", "params": [stream_name] } await websocket.send(json.dumps(subscribe_msg)) logger.info(f"Subscribed to MEXC stream: {stream_name}") async for message in websocket: if not self.is_streaming: break try: await self._process_websocket_message(mexc_symbol, stream_type, message) except Exception as e: logger.warning(f"Error processing MEXC message for {stream_name}: {e}") except Exception as e: logger.error(f"MEXC WebSocket error for {stream_name}: {e}") if self.is_streaming: logger.info(f"Reconnecting MEXC WebSocket for {stream_name} in 5 seconds...") await asyncio.sleep(5) async def _process_websocket_message(self, symbol: str, stream_type: str, message: str): """Process incoming WebSocket message""" try: data = json.loads(message) # Handle subscription confirmation if data.get('id') is not None: logger.info(f"MEXC WebSocket subscription confirmed for {symbol} {stream_type}") return # Process data based on stream type if stream_type == 'trade' and 'data' in data: await self._process_trade_data(symbol, data['data']) elif stream_type == 'ticker' and 'data' in data: await self._process_ticker_data(symbol, data['data']) elif stream_type == 'depth' and 'data' in data: await self._process_depth_data(symbol, data['data']) except Exception as e: logger.error(f"Error processing MEXC WebSocket message: {e}") async def _process_trade_data(self, symbol: str, trade_data: Dict[str, Any]): """Process trade data from WebSocket""" try: # MEXC trade data format price = float(trade_data.get('p', 0)) quantity = float(trade_data.get('q', 0)) timestamp = datetime.fromtimestamp(int(trade_data.get('t', 0)) / 1000) is_buyer_maker = trade_data.get('m', False) trade_id = trade_data.get('i', '') # Create standardized tick tick = { 'symbol': symbol, 'timestamp': timestamp, 'price': price, 'volume': price * quantity, # Volume in quote currency 'quantity': quantity, 'side': 'sell' if is_buyer_maker else 'buy', 'trade_id': str(trade_id), 'is_buyer_maker': is_buyer_maker, 'exchange': 'MEXC', 'raw_data': trade_data } # Update buffers self.recent_ticks[symbol].append(tick) self.current_prices[symbol] = price # Notify callbacks self._notify_tick_callbacks(tick) except Exception as e: logger.error(f"Error processing MEXC trade data: {e}") async def _process_ticker_data(self, symbol: str, ticker_data: Dict[str, Any]): """Process ticker data from WebSocket""" try: # MEXC ticker data format ticker = { 'symbol': symbol, 'timestamp': datetime.now(), 'price': float(ticker_data.get('c', 0)), # Current price 'bid': float(ticker_data.get('b', 0)), # Best bid 'ask': float(ticker_data.get('a', 0)), # Best ask 'volume': float(ticker_data.get('v', 0)), # Volume 'high': float(ticker_data.get('h', 0)), # 24h high 'low': float(ticker_data.get('l', 0)), # 24h low 'change': float(ticker_data.get('P', 0)), # Price change % 'exchange': 'MEXC', 'raw_data': ticker_data } # Update current price self.current_prices[symbol] = ticker['price'] # Notify callbacks self._notify_ticker_callbacks(ticker) except Exception as e: logger.error(f"Error processing MEXC ticker data: {e}") async def _process_depth_data(self, symbol: str, depth_data: Dict[str, Any]): """Process order book depth data from WebSocket""" try: # Process depth data if needed for future features logger.debug(f"MEXC depth data received for {symbol}") except Exception as e: logger.error(f"Error processing MEXC depth data: {e}") def get_current_price(self, symbol: str) -> Optional[float]: """Get current price for a symbol from WebSocket data or REST API fallback""" mexc_symbol = symbol.replace('/', '').upper() # Try from WebSocket data first if mexc_symbol in self.current_prices: return self.current_prices[mexc_symbol] # Fallback to REST API try: ticker = self.get_ticker(symbol) if ticker and 'price' in ticker: return float(ticker['price']) except Exception as e: logger.warning(f"Failed to get current price for {symbol} from MEXC: {e}") return None def get_recent_ticks(self, symbol: str, count: int = 100) -> List[Dict[str, Any]]: """Get recent ticks for a symbol""" mexc_symbol = symbol.replace('/', '').upper() if mexc_symbol in self.recent_ticks: return list(self.recent_ticks[mexc_symbol])[-count:] return [] def connect(self) -> bool: """Connect to MEXC API.""" if not self.api_key or not self.api_secret: logger.warning("MEXC API credentials not provided. Running in read-only mode.") try: # Test public API connection by getting server time (ping) self.get_server_time() logger.info("Successfully connected to MEXC API in read-only mode") return True except Exception as e: logger.error(f"Failed to connect to MEXC API in read-only mode: {str(e)}") return False try: # Test connection by getting account info self.get_account_info() logger.info("Successfully connected to MEXC API with authentication") return True except Exception as e: logger.error(f"Failed to connect to MEXC API: {str(e)}") return False def _generate_signature(self, params: Dict[str, Any]) -> str: """Generate signature for authenticated requests.""" # Sort parameters by key for consistent signature generation sorted_params = sorted(params.items()) query_string = urlencode(sorted_params) signature = hmac.new( self.api_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256 ).hexdigest() return signature def _send_public_request(self, method: str, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send public request to MEXC API.""" url = f"{self.base_url}/{self.api_version}/{endpoint}" try: if method.upper() == 'GET': response = requests.get(url, params=params) else: response = requests.post(url, json=params) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error in public request to {endpoint}: {str(e)}") raise def _send_private_request(self, method: str, endpoint: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Send private/authenticated request to MEXC API.""" if not self.api_key or not self.api_secret: raise ValueError("API key and secret are required for private requests") if params is None: params = {} # Add timestamp and recvWindow as required by MEXC # Use server time for better synchronization try: server_time_response = self._send_public_request('GET', 'time') params['timestamp'] = server_time_response['serverTime'] except: params['timestamp'] = int(time.time() * 1000) if 'recvWindow' not in params: params['recvWindow'] = 10000 # Increased receive window # Generate signature using the correct MEXC format signature = self._generate_signature(params) params['signature'] = signature # Set headers as required by MEXC documentation headers = { 'X-MEXC-APIKEY': self.api_key } url = f"{self.base_url}/{self.api_version}/{endpoint}" try: if method.upper() == 'GET': # For GET requests with authentication, signature goes in query string response = requests.get(url, params=params, headers=headers) elif method.upper() == 'POST': # For POST requests, send as form data in request body headers['Content-Type'] = 'application/x-www-form-urlencoded' response = requests.post(url, data=params, headers=headers) elif method.upper() == 'DELETE': # For DELETE requests, send parameters as query string response = requests.delete(url, params=params, headers=headers) else: raise ValueError(f"Unsupported HTTP method: {method}") response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error in private request to {endpoint}: {str(e)}") if hasattr(e, 'response') and e.response is not None: logger.error(f"Response status: {e.response.status_code}") logger.error(f"Response content: {e.response.text}") raise def get_server_time(self) -> Dict[str, Any]: """Get server time (ping test).""" return self._send_public_request('GET', 'time') def ping(self) -> Dict[str, Any]: """Test connectivity to the Rest API.""" return self._send_public_request('GET', 'ping') def get_account_info(self) -> Dict[str, Any]: """Get account information.""" params = {'recvWindow': 5000} return self._send_private_request('GET', 'account', params) def get_balance(self, asset: str) -> float: """Get balance of a specific asset. Args: asset: Asset symbol (e.g., 'BTC', 'USDT') Returns: float: Available balance of the asset """ try: params = {'recvWindow': 5000} account_info = self._send_private_request('GET', 'account', params) balances = account_info.get('balances', []) for balance in balances: if balance['asset'] == asset: return float(balance['free']) # Asset not found return 0.0 except Exception as e: logger.error(f"Error getting balance for {asset}: {str(e)}") return 0.0 def get_ticker(self, symbol: str) -> Dict[str, Any]: """Get current ticker data for a symbol. Args: symbol: Trading symbol (e.g., 'BTC/USDT') Returns: dict: Ticker data including price information """ mexc_symbol = symbol.replace('/', '') # Use official MEXC API endpoints from documentation endpoints_to_try = [ ('ticker/price', {'symbol': mexc_symbol}), # Symbol Price Ticker ('ticker/24hr', {'symbol': mexc_symbol}), # 24hr Ticker Price Change Statistics ('ticker/bookTicker', {'symbol': mexc_symbol}), # Symbol Order Book Ticker ] for endpoint, params in endpoints_to_try: try: logger.debug(f"Trying MEXC endpoint: {endpoint} for {mexc_symbol}") response = self._send_public_request('GET', endpoint, params) if not response: continue # Handle the response based on structure if isinstance(response, dict): ticker = response elif isinstance(response, list) and len(response) > 0: # Find the specific symbol in list response ticker = None for t in response: if t.get('symbol') == mexc_symbol: ticker = t break if ticker is None: continue else: continue # Convert to standardized format based on MEXC API response current_time = int(time.time() * 1000) # Handle different response formats from different endpoints if 'price' in ticker: # ticker/price endpoint price = float(ticker['price']) result = { 'symbol': symbol, 'bid': price, # Use price as fallback 'ask': price, # Use price as fallback 'last': price, 'volume': 0, # Not available in price endpoint 'timestamp': current_time } elif 'lastPrice' in ticker: # ticker/24hr endpoint result = { 'symbol': symbol, 'bid': float(ticker.get('bidPrice', ticker.get('lastPrice', 0))), 'ask': float(ticker.get('askPrice', ticker.get('lastPrice', 0))), 'last': float(ticker.get('lastPrice', 0)), 'volume': float(ticker.get('volume', ticker.get('quoteVolume', 0))), 'timestamp': int(ticker.get('closeTime', current_time)) } elif 'bidPrice' in ticker: # ticker/bookTicker endpoint result = { 'symbol': symbol, 'bid': float(ticker.get('bidPrice', 0)), 'ask': float(ticker.get('askPrice', 0)), 'last': float(ticker.get('bidPrice', 0)), # Use bid as fallback for last 'volume': 0, # Not available in book ticker 'timestamp': current_time } else: continue # Validate we have a valid price if result['last'] > 0: logger.info(f"✅ MEXC: Got ticker from {endpoint} for {symbol}: ${result['last']:.2f}") return result except Exception as e: logger.warning(f"MEXC endpoint {endpoint} failed for {symbol}: {e}") continue # All endpoints failed logger.error(f"❌ MEXC: All ticker endpoints failed for {symbol}") return None def place_order(self, symbol: str, side: str, order_type: str, quantity: float, price: float = None) -> Dict[str, Any]: """Place an order on the exchange. Args: symbol: Trading symbol (e.g., 'BTC/USDT') side: Order side ('BUY' or 'SELL') order_type: Order type ('MARKET', 'LIMIT', etc.) quantity: Order quantity price: Order price (for limit orders) Returns: dict: Order information including order ID """ mexc_symbol = symbol.replace('/', '') # Prepare order parameters according to MEXC API params = { 'symbol': mexc_symbol, 'side': side.upper(), 'type': order_type.upper(), 'quantity': str(quantity), # MEXC expects string format 'recvWindow': 5000 } # Add price and timeInForce for limit orders if order_type.upper() == 'LIMIT': if price is None: raise ValueError("Price is required for LIMIT orders") params['price'] = str(price) params['timeInForce'] = 'GTC' # Good Till Cancelled try: logger.info(f"MEXC: Placing {side} {order_type} order for {symbol}: {quantity} @ {price}") order_result = self._send_private_request('POST', 'order', params) logger.info(f"MEXC: Order placed successfully: {order_result.get('orderId', 'N/A')}") return order_result except Exception as e: logger.error(f"MEXC: Error placing {side} {order_type} order for {symbol}: {str(e)}") raise def cancel_order(self, symbol: str, order_id: str) -> bool: """Cancel an existing order. Args: symbol: Trading symbol (e.g., 'BTC/USDT') order_id: ID of the order to cancel Returns: bool: True if cancellation successful, False otherwise """ mexc_symbol = symbol.replace('/', '') params = { 'symbol': mexc_symbol, 'orderId': order_id } try: cancel_result = self._send_private_request('DELETE', 'order', params) return True except Exception as e: logger.error(f"Error cancelling order {order_id} for {symbol}: {str(e)}") return False def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]: """Get status of an existing order. Args: symbol: Trading symbol (e.g., 'BTC/USDT') order_id: ID of the order Returns: dict: Order status information """ mexc_symbol = symbol.replace('/', '') params = { 'symbol': mexc_symbol, 'orderId': order_id } try: order_info = self._send_private_request('GET', 'order', params) return order_info except Exception as e: logger.error(f"Error getting order status for {order_id} on {symbol}: {str(e)}") raise def get_open_orders(self, symbol: str = None) -> List[Dict[str, Any]]: """Get all open orders, optionally filtered by symbol. Args: symbol: Trading symbol (e.g., 'BTC/USDT'), or None for all symbols Returns: list: List of open orders """ params = {} if symbol: params['symbol'] = symbol.replace('/', '') try: open_orders = self._send_private_request('GET', 'openOrders', params) return open_orders except Exception as e: logger.error(f"Error getting open orders: {str(e)}") return []