"""MEXC exchange REST API interface."""

import hashlib
import hmac
import logging
import time
from decimal import Decimal
from typing import Dict, Any, List, Optional
from urllib.parse import urlencode

import requests

from .exchange_interface import ExchangeInterface

# Module-level logger, named after this module so its output can be
# filtered or configured independently of other loggers.
logger = logging.getLogger(__name__)


class MEXCInterface(ExchangeInterface):
    """MEXC Exchange API Interface.

    Wraps the MEXC ``api/v3`` REST endpoints used by this module: public
    market data (ping, server time, tickers) and signed account/order calls.
    Signed calls require both an API key and an API secret; without them only
    the public endpoints can be used.
    """

    # Seconds allowed per HTTP call. ``requests`` applies no timeout unless one
    # is passed, so without this a stalled connection would block forever.
    REQUEST_TIMEOUT = 10

    # ``recvWindow`` value attached to signed requests when the caller gives none.
    DEFAULT_RECV_WINDOW = 5000

    def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
                 test_mode: bool = True):
        """Initialize MEXC exchange interface.

        Args:
            api_key: MEXC API key
            api_secret: MEXC API secret
            test_mode: If True, use test/sandbox environment (Note: MEXC doesn't have a true sandbox)
        """
        # The base class receives the credentials; the methods below read them
        # back as ``self.api_key`` / ``self.api_secret``.
        super().__init__(api_key, api_secret, test_mode)
        self.base_url = "https://api.mexc.com"
        self.api_version = "api/v3"

    def connect(self) -> bool:
        """Connect to MEXC API.

        Without credentials the public server-time endpoint is probed
        (read-only mode); with credentials the signed account endpoint is
        probed instead.

        Returns:
            bool: True if the probe request succeeded, False otherwise.
        """
        if not self.api_key or not self.api_secret:
            logger.warning("MEXC API credentials not provided. Running in read-only mode.")
            try:
                # Test public API connection by getting server time (ping)
                self.get_server_time()
                logger.info("Successfully connected to MEXC API in read-only mode")
                return True
            except Exception as e:
                logger.error(f"Failed to connect to MEXC API in read-only mode: {e}")
                return False

        try:
            # Test connection by getting account info
            self.get_account_info()
            logger.info("Successfully connected to MEXC API with authentication")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to MEXC API: {e}")
            return False

    def _endpoint_url(self, endpoint: str) -> str:
        """Build the absolute URL for an endpoint path under the API version prefix."""
        return f"{self.base_url}/{self.api_version}/{endpoint}"

    @staticmethod
    def _format_decimal(value: Any) -> str:
        """Render a number as a string without scientific notation.

        ``str()`` of a small or large float yields forms such as ``'1e-05'``;
        those are expanded to plain decimal notation (``'0.00001'``). Every
        other value is returned exactly as ``str(value)`` would render it.
        NOTE(review): assumes the order endpoint wants plain decimal strings -
        confirm against the MEXC API documentation.
        """
        text = str(value)
        if isinstance(value, float) and 'e' in text.lower():
            text = format(Decimal(text), 'f')
        return text

    def _generate_signature(self, params: Dict[str, Any]) -> str:
        """Generate signature for authenticated requests.

        Args:
            params: Request parameters to sign (without the signature itself).

        Returns:
            str: Hex-encoded HMAC-SHA256 of the URL-encoded parameters, keyed
            with the API secret.
        """
        # Sort parameters by key for consistent signature generation
        query_string = urlencode(sorted(params.items()))
        return hmac.new(
            self.api_secret.encode('utf-8'),
            query_string.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    def _send_public_request(self, method: str, endpoint: str,
                             params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send public request to MEXC API.

        Args:
            method: HTTP method; 'GET' sends ``params`` as a query string, any
                other value is sent as a POST with a JSON body.
            endpoint: Endpoint path relative to the API version prefix.
            params: Optional request parameters.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: Any transport, HTTP-status or JSON decoding error is
                logged and re-raised unchanged.
        """
        url = self._endpoint_url(endpoint)

        try:
            if method.upper() == 'GET':
                response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            else:
                response = requests.post(url, json=params, timeout=self.REQUEST_TIMEOUT)

            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error in public request to {endpoint}: {e}")
            raise

    def _send_private_request(self, method: str, endpoint: str,
                              params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send private/authenticated request to MEXC API.

        Args:
            method: HTTP method: 'GET', 'POST' or 'DELETE' (case-insensitive).
            endpoint: Endpoint path relative to the API version prefix.
            params: Optional request parameters. The dict is not modified.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If credentials are missing or ``method`` is unsupported.
            Exception: Any transport, HTTP-status or JSON decoding error is
                logged and re-raised unchanged.
        """
        if not self.api_key or not self.api_secret:
            raise ValueError("API key and secret are required for private requests")

        # Work on a copy: adding timestamp/signature to the caller's dict would
        # leak a stale signature into the next request built from that dict.
        request_params = dict(params) if params else {}

        # Add timestamp and recvWindow as required by MEXC
        request_params['timestamp'] = int(time.time() * 1000)
        request_params.setdefault('recvWindow', self.DEFAULT_RECV_WINDOW)

        # The signature is computed over the key-sorted parameters, so transmit
        # them in that same order with the signature appended last; otherwise
        # the string on the wire differs from the string that was signed.
        # NOTE(review): assumes MEXC verifies the signature against the
        # parameter string as received - confirm against the API docs.
        signed_params = dict(sorted(request_params.items()))
        signature = self._generate_signature(signed_params)
        signed_params['signature'] = signature

        # Set headers as required by MEXC documentation
        headers = {
            'X-MEXC-APIKEY': self.api_key,
            'Content-Type': 'application/x-www-form-urlencoded',
        }

        url = self._endpoint_url(endpoint)
        http_method = method.upper()

        try:
            if http_method == 'GET':
                # For GET requests, send parameters as query string
                response = requests.get(url, params=signed_params, headers=headers,
                                        timeout=self.REQUEST_TIMEOUT)
            elif http_method == 'POST':
                # For POST requests, send as form data in request body per MEXC documentation
                response = requests.post(url, data=signed_params, headers=headers,
                                         timeout=self.REQUEST_TIMEOUT)
            elif http_method == 'DELETE':
                # For DELETE requests, send parameters as query string
                response = requests.delete(url, params=signed_params, headers=headers,
                                           timeout=self.REQUEST_TIMEOUT)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error in private request to {endpoint}: {e}")
            # HTTP errors raised by requests carry the response; log it so the
            # exchange's error payload is visible.
            if hasattr(e, 'response') and e.response is not None:
                logger.error(f"Response status: {e.response.status_code}")
                logger.error(f"Response content: {e.response.text}")
            raise

    def get_server_time(self) -> Dict[str, Any]:
        """Get server time (ping test)."""
        return self._send_public_request('GET', 'time')

    def ping(self) -> Dict[str, Any]:
        """Test connectivity to the Rest API."""
        return self._send_public_request('GET', 'ping')

    def get_account_info(self) -> Dict[str, Any]:
        """Get account information.

        Raises:
            ValueError: If no API credentials are configured.
        """
        params = {'recvWindow': self.DEFAULT_RECV_WINDOW}
        return self._send_private_request('GET', 'account', params)

    def get_balance(self, asset: str) -> float:
        """Get balance of a specific asset.

        Args:
            asset: Asset symbol (e.g., 'BTC', 'USDT')

        Returns:
            float: Available balance of the asset. 0.0 is returned both when
            the asset is not listed and when the request fails (the failure is
            logged, not raised).
        """
        try:
            account_info = self.get_account_info()
            for balance in account_info.get('balances', []):
                if balance['asset'] == asset:
                    return float(balance['free'])

            # Asset not found
            return 0.0
        except Exception as e:
            logger.error(f"Error getting balance for {asset}: {e}")
            return 0.0

    @staticmethod
    def _select_ticker(response: Any, mexc_symbol: str) -> Optional[Dict[str, Any]]:
        """Pick the ticker entry for ``mexc_symbol`` from a dict or list response.

        A dict response is taken as the ticker itself; a list response is
        searched for the entry whose 'symbol' matches. Returns None otherwise.
        """
        if isinstance(response, dict):
            return response
        if isinstance(response, list):
            for entry in response:
                if entry.get('symbol') == mexc_symbol:
                    return entry
        return None

    @staticmethod
    def _normalize_ticker(symbol: str, ticker: Dict[str, Any],
                          current_time: int) -> Optional[Dict[str, Any]]:
        """Convert a raw ticker payload to the standardized ticker dict.

        The payload shape is recognised by the keys it carries ('price',
        'lastPrice' or 'bidPrice'). Returns None for an unrecognised shape.
        """
        if 'price' in ticker:
            # ticker/price endpoint
            price = float(ticker['price'])
            return {
                'symbol': symbol,
                'bid': price,  # Use price as fallback
                'ask': price,  # Use price as fallback
                'last': price,
                'volume': 0,  # Not available in price endpoint
                'timestamp': current_time,
            }
        if 'lastPrice' in ticker:
            # ticker/24hr endpoint
            return {
                'symbol': symbol,
                'bid': float(ticker.get('bidPrice', ticker.get('lastPrice', 0))),
                'ask': float(ticker.get('askPrice', ticker.get('lastPrice', 0))),
                'last': float(ticker.get('lastPrice', 0)),
                'volume': float(ticker.get('volume', ticker.get('quoteVolume', 0))),
                'timestamp': int(ticker.get('closeTime', current_time)),
            }
        if 'bidPrice' in ticker:
            # ticker/bookTicker endpoint
            return {
                'symbol': symbol,
                'bid': float(ticker.get('bidPrice', 0)),
                'ask': float(ticker.get('askPrice', 0)),
                'last': float(ticker.get('bidPrice', 0)),  # Use bid as fallback for last
                'volume': 0,  # Not available in book ticker
                'timestamp': current_time,
            }
        return None

    def get_ticker(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Get current ticker data for a symbol.

        The price, 24hr and book-ticker endpoints are tried in that order; the
        first one that yields a positive last price wins.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDT')

        Returns:
            dict: Ticker data with keys 'symbol', 'bid', 'ask', 'last',
            'volume' and 'timestamp', or None if every endpoint failed.
        """
        mexc_symbol = symbol.replace('/', '')

        # Use official MEXC API endpoints from documentation
        endpoints_to_try = [
            ('ticker/price', {'symbol': mexc_symbol}),  # Symbol Price Ticker
            ('ticker/24hr', {'symbol': mexc_symbol}),  # 24hr Ticker Price Change Statistics
            ('ticker/bookTicker', {'symbol': mexc_symbol}),  # Symbol Order Book Ticker
        ]

        for endpoint, params in endpoints_to_try:
            try:
                logger.debug(f"Trying MEXC endpoint: {endpoint} for {mexc_symbol}")
                response = self._send_public_request('GET', endpoint, params)
                if not response:
                    continue

                ticker = self._select_ticker(response, mexc_symbol)
                if ticker is None:
                    continue

                # Convert to standardized format based on MEXC API response
                current_time = int(time.time() * 1000)
                result = self._normalize_ticker(symbol, ticker, current_time)
                if result is None:
                    continue

                # Validate we have a valid price; otherwise fall through to
                # the next endpoint.
                if result['last'] > 0:
                    logger.info(f"✅ MEXC: Got ticker from {endpoint} for {symbol}: ${result['last']:.2f}")
                    return result
            except Exception as e:
                # Best effort: one failing endpoint must not prevent trying the others.
                logger.warning(f"MEXC endpoint {endpoint} failed for {symbol}: {e}")
                continue

        # All endpoints failed
        logger.error(f"❌ MEXC: All ticker endpoints failed for {symbol}")
        return None

    def place_order(self, symbol: str, side: str, order_type: str,
                    quantity: float, price: Optional[float] = None) -> Dict[str, Any]:
        """Place an order on the exchange.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDT')
            side: Order side ('BUY' or 'SELL')
            order_type: Order type ('MARKET', 'LIMIT', etc.)
            quantity: Order quantity
            price: Order price (for limit orders)

        Returns:
            dict: Order information including order ID

        Raises:
            ValueError: If a LIMIT order is requested without a price, or if
                no API credentials are configured.
            Exception: Any request error is logged and re-raised.
        """
        mexc_symbol = symbol.replace('/', '')
        normalized_type = order_type.upper()

        # Prepare order parameters according to MEXC API
        params = {
            'symbol': mexc_symbol,
            'side': side.upper(),
            'type': normalized_type,
            # MEXC expects string format; avoid '1e-05'-style output for small sizes
            'quantity': self._format_decimal(quantity),
            'recvWindow': self.DEFAULT_RECV_WINDOW,
        }

        # Add price and timeInForce for limit orders
        if normalized_type == 'LIMIT':
            if price is None:
                raise ValueError("Price is required for LIMIT orders")
            params['price'] = self._format_decimal(price)
            params['timeInForce'] = 'GTC'  # Good Till Cancelled

        try:
            logger.info(f"MEXC: Placing {side} {order_type} order for {symbol}: {quantity} @ {price}")
            order_result = self._send_private_request('POST', 'order', params)
            logger.info(f"MEXC: Order placed successfully: {order_result.get('orderId', 'N/A')}")
            return order_result
        except Exception as e:
            logger.error(f"MEXC: Error placing {side} {order_type} order for {symbol}: {e}")
            raise

    def cancel_order(self, symbol: str, order_id: str) -> bool:
        """Cancel an existing order.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDT')
            order_id: ID of the order to cancel

        Returns:
            bool: True if cancellation successful, False otherwise
        """
        params = {
            'symbol': symbol.replace('/', ''),
            'orderId': order_id,
        }

        try:
            # Only success/failure matters here; the response body is not used.
            self._send_private_request('DELETE', 'order', params)
            return True
        except Exception as e:
            logger.error(f"Error cancelling order {order_id} for {symbol}: {e}")
            return False

    def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]:
        """Get status of an existing order.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDT')
            order_id: ID of the order

        Returns:
            dict: Order status information

        Raises:
            Exception: Any request error is logged and re-raised.
        """
        params = {
            'symbol': symbol.replace('/', ''),
            'orderId': order_id,
        }

        try:
            return self._send_private_request('GET', 'order', params)
        except Exception as e:
            logger.error(f"Error getting order status for {order_id} on {symbol}: {e}")
            raise

    def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all open orders, optionally filtered by symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDT'), or None for all symbols

        Returns:
            list: List of open orders; an empty list if the request fails
            (the failure is logged, not raised).
        """
        params = {}
        if symbol:
            params['symbol'] = symbol.replace('/', '')

        try:
            return self._send_private_request('GET', 'openOrders', params)
        except Exception as e:
            logger.error(f"Error getting open orders: {e}")
            return []