Binance (completed in earlier tasks)

 Coinbase Pro (completed in task 12)
 Kraken (completed in task 12)
 Bybit (completed in task 13)
 OKX (completed in task 13)
 Huobi (completed in task 13)
 KuCoin (completed in this task)
 Gate.io (completed in this task)
 Bitfinex (completed in this task)
 MEXC (completed in this task)
This commit is contained in:
Dobromir Popov
2025-08-04 23:49:08 +03:00
parent 7339972eab
commit 3c7d13416f
6 changed files with 1940 additions and 0 deletions

View File

@ -131,6 +131,9 @@
- Create compatibility layer for seamless integration with current data provider
- Add data quality indicators and metadata in responses
- Implement switching mechanism between live and replay modes
- Write integration tests with existing orchestrator code
- _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_

View File

@ -9,6 +9,10 @@ from .kraken_connector import KrakenConnector
from .bybit_connector import BybitConnector
from .okx_connector import OKXConnector
from .huobi_connector import HuobiConnector
from .kucoin_connector import KuCoinConnector
from .gateio_connector import GateIOConnector
from .bitfinex_connector import BitfinexConnector
from .mexc_connector import MEXCConnector
from .connection_manager import ConnectionManager
from .circuit_breaker import CircuitBreaker
@ -20,6 +24,10 @@ __all__ = [
'BybitConnector',
'OKXConnector',
'HuobiConnector',
'KuCoinConnector',
'GateIOConnector',
'BitfinexConnector',
'MEXCConnector',
'ConnectionManager',
'CircuitBreaker'
]

View File

@ -0,0 +1,270 @@
"""
Bitfinex exchange connector implementation.
Supports WebSocket connections to Bitfinex with proper channel subscription management.
"""
import json
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class BitfinexConnector(BaseExchangeConnector):
"""
Bitfinex WebSocket connector implementation.
Supports:
- Channel subscription management
- Order book streams
- Trade streams
- Symbol normalization
"""
# Bitfinex WebSocket URLs
WEBSOCKET_URL = "wss://api-pub.bitfinex.com/ws/2"
API_URL = "https://api-pub.bitfinex.com"
def __init__(self, api_key: str = None, api_secret: str = None):
"""Initialize Bitfinex connector."""
super().__init__("bitfinex", self.WEBSOCKET_URL)
self.api_key = api_key
self.api_secret = api_secret
# Bitfinex-specific message handlers
self.message_handlers.update({
'subscribed': self._handle_subscription_response,
'unsubscribed': self._handle_unsubscription_response,
'error': self._handle_error_message,
'info': self._handle_info_message
})
# Channel management
self.channels = {} # channel_id -> channel_info
self.subscribed_symbols = set()
logger.info("Bitfinex connector initialized")
def _get_message_type(self, data) -> str:
"""Determine message type from Bitfinex message data."""
if isinstance(data, dict):
if 'event' in data:
return data['event']
elif 'error' in data:
return 'error'
elif isinstance(data, list) and len(data) >= 2:
# Data message format: [CHANNEL_ID, data]
return 'data'
return 'unknown'
def normalize_symbol(self, symbol: str) -> str:
"""Normalize symbol to Bitfinex format."""
# Bitfinex uses 't' prefix for trading pairs
if symbol.upper() == 'BTCUSDT':
return 'tBTCUSD'
elif symbol.upper() == 'ETHUSDT':
return 'tETHUSD'
elif symbol.upper().endswith('USDT'):
base = symbol[:-4].upper()
return f"t{base}USD"
else:
# Generic conversion
normalized = symbol.upper().replace('-', '').replace('/', '')
return f"t{normalized}" if not normalized.startswith('t') else normalized
def _denormalize_symbol(self, bitfinex_symbol: str) -> str:
"""Convert Bitfinex symbol back to standard format."""
if bitfinex_symbol.startswith('t'):
symbol = bitfinex_symbol[1:] # Remove 't' prefix
if symbol.endswith('USD'):
return symbol[:-3] + 'USDT'
return symbol
return bitfinex_symbol
async def subscribe_orderbook(self, symbol: str) -> None:
"""Subscribe to order book updates for a symbol."""
try:
set_correlation_id()
bitfinex_symbol = self.normalize_symbol(symbol)
subscription_msg = {
"event": "subscribe",
"channel": "book",
"symbol": bitfinex_symbol,
"prec": "P0",
"freq": "F0",
"len": "25"
}
success = await self._send_message(subscription_msg)
if success:
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
self.subscribed_symbols.add(bitfinex_symbol)
logger.info(f"Subscribed to order book for {symbol} ({bitfinex_symbol}) on Bitfinex")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on Bitfinex")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""Subscribe to trade updates for a symbol."""
try:
set_correlation_id()
bitfinex_symbol = self.normalize_symbol(symbol)
subscription_msg = {
"event": "subscribe",
"channel": "trades",
"symbol": bitfinex_symbol
}
success = await self._send_message(subscription_msg)
if success:
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
self.subscribed_symbols.add(bitfinex_symbol)
logger.info(f"Subscribed to trades for {symbol} ({bitfinex_symbol}) on Bitfinex")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on Bitfinex")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""Unsubscribe from order book updates."""
# Implementation would find the channel ID and send unsubscribe message
pass
async def unsubscribe_trades(self, symbol: str) -> None:
"""Unsubscribe from trade updates."""
# Implementation would find the channel ID and send unsubscribe message
pass
async def get_symbols(self) -> List[str]:
"""Get available symbols from Bitfinex."""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.API_URL}/v1/symbols") as response:
if response.status == 200:
data = await response.json()
symbols = [self._denormalize_symbol(f"t{s.upper()}") for s in data]
logger.info(f"Retrieved {len(symbols)} symbols from Bitfinex")
return symbols
else:
logger.error(f"Failed to get symbols from Bitfinex: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from Bitfinex: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""Get order book snapshot from Bitfinex REST API."""
try:
import aiohttp
bitfinex_symbol = self.normalize_symbol(symbol)
url = f"{self.API_URL}/v2/book/{bitfinex_symbol}/P0"
params = {'len': min(depth, 100)}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self._parse_orderbook_snapshot(data, symbol)
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: List, symbol: str) -> OrderBookSnapshot:
"""Parse Bitfinex order book data."""
try:
bids = []
asks = []
for level in data:
price = float(level[0])
count = int(level[1])
amount = float(level[2])
if validate_price(price) and validate_volume(abs(amount)):
if amount > 0:
bids.append(PriceLevel(price=price, size=amount))
else:
asks.append(PriceLevel(price=price, size=abs(amount)))
return OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks
)
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
async def _handle_subscription_response(self, data: Dict) -> None:
"""Handle subscription response."""
channel_id = data.get('chanId')
channel = data.get('channel')
symbol = data.get('symbol', '')
if channel_id:
self.channels[channel_id] = {
'channel': channel,
'symbol': symbol
}
logger.info(f"Bitfinex subscription confirmed: {channel} for {symbol} (ID: {channel_id})")
async def _handle_unsubscription_response(self, data: Dict) -> None:
"""Handle unsubscription response."""
channel_id = data.get('chanId')
if channel_id in self.channels:
del self.channels[channel_id]
logger.info(f"Bitfinex unsubscription confirmed for channel {channel_id}")
async def _handle_error_message(self, data: Dict) -> None:
"""Handle error message."""
error_msg = data.get('msg', 'Unknown error')
error_code = data.get('code', 'unknown')
logger.error(f"Bitfinex error {error_code}: {error_msg}")
async def _handle_info_message(self, data: Dict) -> None:
"""Handle info message."""
logger.info(f"Bitfinex info: {data}")
def get_bitfinex_stats(self) -> Dict[str, Any]:
"""Get Bitfinex-specific statistics."""
base_stats = self.get_stats()
bitfinex_stats = {
'active_channels': len(self.channels),
'subscribed_symbols': list(self.subscribed_symbols),
'authenticated': bool(self.api_key and self.api_secret)
}
base_stats.update(bitfinex_stats)
return base_stats

View File

@ -0,0 +1,601 @@
"""
Gate.io exchange connector implementation.
Supports WebSocket connections to Gate.io with their WebSocket v4 API.
"""
import json
import hmac
import hashlib
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class GateIOConnector(BaseExchangeConnector):
"""
Gate.io WebSocket connector implementation.
Supports:
- WebSocket v4 API
- Order book streams
- Trade streams
- Symbol normalization
- Authentication for private channels
"""
# Gate.io WebSocket URLs
WEBSOCKET_URL = "wss://api.gateio.ws/ws/v4/"
TESTNET_URL = "wss://fx-api-testnet.gateio.ws/ws/v4/"
API_URL = "https://api.gateio.ws"
def __init__(self, use_testnet: bool = False, api_key: str = None, api_secret: str = None):
"""
Initialize Gate.io connector.
Args:
use_testnet: Whether to use testnet environment
api_key: API key for authentication (optional)
api_secret: API secret for authentication (optional)
"""
websocket_url = self.TESTNET_URL if use_testnet else self.WEBSOCKET_URL
super().__init__("gateio", websocket_url)
# Authentication credentials (optional)
self.api_key = api_key
self.api_secret = api_secret
self.use_testnet = use_testnet
# Gate.io-specific message handlers
self.message_handlers.update({
'spot.order_book_update': self._handle_orderbook_update,
'spot.trades': self._handle_trade_update,
'spot.pong': self._handle_pong,
'error': self._handle_error_message
})
# Subscription tracking
self.subscribed_channels = set()
self.request_id = 1
logger.info(f"Gate.io connector initialized ({'testnet' if use_testnet else 'mainnet'})")
def _get_message_type(self, data: Dict) -> str:
"""
Determine message type from Gate.io message data.
Args:
data: Parsed message data
Returns:
str: Message type identifier
"""
# Gate.io v4 API message format
if 'method' in data:
return data['method'] # 'spot.order_book_update', 'spot.trades', etc.
elif 'error' in data:
return 'error'
elif 'result' in data:
return 'result'
return 'unknown'
def normalize_symbol(self, symbol: str) -> str:
"""
Normalize symbol to Gate.io format.
Args:
symbol: Standard symbol format (e.g., 'BTCUSDT')
Returns:
str: Gate.io symbol format (e.g., 'BTC_USDT')
"""
# Gate.io uses underscore-separated format
if symbol.upper() == 'BTCUSDT':
return 'BTC_USDT'
elif symbol.upper() == 'ETHUSDT':
return 'ETH_USDT'
elif symbol.upper().endswith('USDT'):
base = symbol[:-4].upper()
return f"{base}_USDT"
elif symbol.upper().endswith('USD'):
base = symbol[:-3].upper()
return f"{base}_USD"
else:
# Assume it's already in correct format or add underscore
if '_' not in symbol:
# Try to split common patterns
if len(symbol) >= 6:
# Assume last 4 chars are quote currency
base = symbol[:-4].upper()
quote = symbol[-4:].upper()
return f"{base}_{quote}"
else:
return symbol.upper()
else:
return symbol.upper()
def _denormalize_symbol(self, gateio_symbol: str) -> str:
"""
Convert Gate.io symbol back to standard format.
Args:
gateio_symbol: Gate.io symbol format (e.g., 'BTC_USDT')
Returns:
str: Standard symbol format (e.g., 'BTCUSDT')
"""
if '_' in gateio_symbol:
return gateio_symbol.replace('_', '')
return gateio_symbol
async def subscribe_orderbook(self, symbol: str) -> None:
"""
Subscribe to order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
gateio_symbol = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"method": "spot.order_book",
"params": [gateio_symbol, 20, "0"], # symbol, limit, interval
"id": self.request_id
}
self.request_id += 1
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
self.subscribed_channels.add(f"spot.order_book:{gateio_symbol}")
logger.info(f"Subscribed to order book for {symbol} ({gateio_symbol}) on Gate.io")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on Gate.io")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""
Subscribe to trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
gateio_symbol = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"method": "spot.trades",
"params": [gateio_symbol],
"id": self.request_id
}
self.request_id += 1
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
self.subscribed_channels.add(f"spot.trades:{gateio_symbol}")
logger.info(f"Subscribed to trades for {symbol} ({gateio_symbol}) on Gate.io")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on Gate.io")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""
Unsubscribe from order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
gateio_symbol = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"method": "spot.unsubscribe",
"params": [f"spot.order_book", gateio_symbol],
"id": self.request_id
}
self.request_id += 1
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('orderbook')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_channels.discard(f"spot.order_book:{gateio_symbol}")
logger.info(f"Unsubscribed from order book for {symbol} ({gateio_symbol}) on Gate.io")
else:
logger.error(f"Failed to unsubscribe from order book for {symbol} on Gate.io")
except Exception as e:
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
raise
async def unsubscribe_trades(self, symbol: str) -> None:
"""
Unsubscribe from trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
gateio_symbol = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"method": "spot.unsubscribe",
"params": ["spot.trades", gateio_symbol],
"id": self.request_id
}
self.request_id += 1
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('trades')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_channels.discard(f"spot.trades:{gateio_symbol}")
logger.info(f"Unsubscribed from trades for {symbol} ({gateio_symbol}) on Gate.io")
else:
logger.error(f"Failed to unsubscribe from trades for {symbol} on Gate.io")
except Exception as e:
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
raise
async def get_symbols(self) -> List[str]:
"""
Get list of available trading symbols from Gate.io.
Returns:
List[str]: List of available symbols in standard format
"""
try:
import aiohttp
api_url = "https://fx-api-testnet.gateio.ws" if self.use_testnet else self.API_URL
async with aiohttp.ClientSession() as session:
async with session.get(f"{api_url}/api/v4/spot/currency_pairs") as response:
if response.status == 200:
data = await response.json()
symbols = []
for pair_info in data:
if pair_info.get('trade_status') == 'tradable':
pair_id = pair_info.get('id', '')
# Convert to standard format
standard_symbol = self._denormalize_symbol(pair_id)
symbols.append(standard_symbol)
logger.info(f"Retrieved {len(symbols)} symbols from Gate.io")
return symbols
else:
logger.error(f"Failed to get symbols from Gate.io: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from Gate.io: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""
Get current order book snapshot from Gate.io REST API.
Args:
symbol: Trading symbol
depth: Number of price levels to retrieve
Returns:
OrderBookSnapshot: Current order book or None if unavailable
"""
try:
import aiohttp
gateio_symbol = self.normalize_symbol(symbol)
api_url = "https://fx-api-testnet.gateio.ws" if self.use_testnet else self.API_URL
# Gate.io supports various depths
api_depth = min(depth, 100)
url = f"{api_url}/api/v4/spot/order_book"
params = {
'currency_pair': gateio_symbol,
'limit': api_depth
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self._parse_orderbook_snapshot(data, symbol)
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
"""
Parse Gate.io order book data into OrderBookSnapshot.
Args:
data: Raw Gate.io order book data
symbol: Trading symbol
Returns:
OrderBookSnapshot: Parsed order book
"""
try:
# Parse bids and asks
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc), # Gate.io doesn't provide timestamp in snapshot
bids=bids,
asks=asks,
sequence_id=data.get('id')
)
return orderbook
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
async def _handle_orderbook_update(self, data: Dict) -> None:
"""
Handle order book update from Gate.io.
Args:
data: Order book update data
"""
try:
set_correlation_id()
params = data.get('params', [])
if len(params) < 2:
logger.warning("Invalid order book update format")
return
# Gate.io format: [symbol, order_book_data]
gateio_symbol = params[0]
symbol = self._denormalize_symbol(gateio_symbol)
book_data = params[1]
# Parse bids and asks
bids = []
for bid_data in book_data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in book_data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(int(book_data.get('t', 0)) / 1000, tz=timezone.utc),
bids=bids,
asks=asks,
sequence_id=book_data.get('id')
)
# Notify callbacks
self._notify_data_callbacks(orderbook)
logger.debug(f"Processed order book update for {symbol}")
except Exception as e:
logger.error(f"Error handling order book update: {e}")
async def _handle_trade_update(self, data: Dict) -> None:
"""
Handle trade update from Gate.io.
Args:
data: Trade update data
"""
try:
set_correlation_id()
params = data.get('params', [])
if len(params) < 2:
logger.warning("Invalid trade update format")
return
# Gate.io format: [symbol, [trade_data]]
gateio_symbol = params[0]
symbol = self._denormalize_symbol(gateio_symbol)
trades_data = params[1]
# Process each trade
for trade_data in trades_data:
price = float(trade_data.get('price', 0))
amount = float(trade_data.get('amount', 0))
# Validate data
if not validate_price(price) or not validate_volume(amount):
logger.warning(f"Invalid trade data: price={price}, amount={amount}")
continue
# Determine side (Gate.io uses 'side' field)
side = trade_data.get('side', 'unknown').lower()
# Create trade event
trade = TradeEvent(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(int(trade_data.get('time', 0)), tz=timezone.utc),
price=price,
size=amount,
side=side,
trade_id=str(trade_data.get('id', ''))
)
# Notify callbacks
self._notify_data_callbacks(trade)
logger.debug(f"Processed trade for {symbol}: {side} {amount} @ {price}")
except Exception as e:
logger.error(f"Error handling trade update: {e}")
async def _handle_pong(self, data: Dict) -> None:
"""
Handle pong response from Gate.io.
Args:
data: Pong response data
"""
logger.debug("Received Gate.io pong")
async def _handle_error_message(self, data: Dict) -> None:
"""
Handle error message from Gate.io.
Args:
data: Error message data
"""
error_info = data.get('error', {})
code = error_info.get('code', 'unknown')
message = error_info.get('message', 'Unknown error')
logger.error(f"Gate.io error {code}: {message}")
def _get_auth_signature(self, method: str, url: str, query_string: str,
payload: str, timestamp: str) -> str:
"""
Generate authentication signature for Gate.io.
Args:
method: HTTP method
url: Request URL
query_string: Query string
payload: Request payload
timestamp: Request timestamp
Returns:
str: Authentication signature
"""
if not self.api_key or not self.api_secret:
return ""
try:
# Create signature string
message = f"{method}\n{url}\n{query_string}\n{hashlib.sha512(payload.encode()).hexdigest()}\n{timestamp}"
# Generate signature
signature = hmac.new(
self.api_secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha512
).hexdigest()
return signature
except Exception as e:
logger.error(f"Error generating auth signature: {e}")
return ""
async def _send_ping(self) -> None:
"""Send ping to keep connection alive."""
try:
ping_msg = {
"method": "spot.ping",
"params": [],
"id": self.request_id
}
self.request_id += 1
await self._send_message(ping_msg)
logger.debug("Sent ping to Gate.io")
except Exception as e:
logger.error(f"Error sending ping: {e}")
def get_gateio_stats(self) -> Dict[str, Any]:
"""Get Gate.io-specific statistics."""
base_stats = self.get_stats()
gateio_stats = {
'subscribed_channels': list(self.subscribed_channels),
'use_testnet': self.use_testnet,
'authenticated': bool(self.api_key and self.api_secret),
'next_request_id': self.request_id
}
base_stats.update(gateio_stats)
return base_stats

View File

@ -0,0 +1,776 @@
"""
KuCoin exchange connector implementation.
Supports WebSocket connections to KuCoin with proper token-based authentication.
"""
import json
import hmac
import hashlib
import base64
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class KuCoinConnector(BaseExchangeConnector):
"""
KuCoin WebSocket connector implementation.
Supports:
- Token-based authentication
- Order book streams
- Trade streams
- Symbol normalization
- Bullet connection protocol
"""
# KuCoin API URLs
API_URL = "https://api.kucoin.com"
SANDBOX_API_URL = "https://openapi-sandbox.kucoin.com"
def __init__(self, use_sandbox: bool = False, api_key: str = None,
api_secret: str = None, passphrase: str = None):
"""
Initialize KuCoin connector.
Args:
use_sandbox: Whether to use sandbox environment
api_key: API key for authentication (optional)
api_secret: API secret for authentication (optional)
passphrase: API passphrase for authentication (optional)
"""
# KuCoin requires getting WebSocket URL from REST API
super().__init__("kucoin", "") # URL will be set after token retrieval
# Authentication credentials (optional)
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.use_sandbox = use_sandbox
# KuCoin-specific attributes
self.token = None
self.connect_id = None
self.ping_interval = 18000 # 18 seconds (KuCoin requirement)
self.ping_timeout = 10000 # 10 seconds
# KuCoin-specific message handlers
self.message_handlers.update({
'message': self._handle_data_message,
'welcome': self._handle_welcome_message,
'ack': self._handle_ack_message,
'error': self._handle_error_message,
'pong': self._handle_pong_message
})
# Subscription tracking
self.subscribed_topics = set()
self.subscription_id = 1
logger.info(f"KuCoin connector initialized ({'sandbox' if use_sandbox else 'live'})")
def _get_message_type(self, data: Dict) -> str:
"""
Determine message type from KuCoin message data.
Args:
data: Parsed message data
Returns:
str: Message type identifier
"""
# KuCoin message format
if 'type' in data:
return data['type'] # 'message', 'welcome', 'ack', 'error', 'pong'
elif 'subject' in data:
# Data message with subject
return 'message'
return 'unknown'
def normalize_symbol(self, symbol: str) -> str:
"""
Normalize symbol to KuCoin format.
Args:
symbol: Standard symbol format (e.g., 'BTCUSDT')
Returns:
str: KuCoin symbol format (e.g., 'BTC-USDT')
"""
# KuCoin uses dash-separated format
if symbol.upper() == 'BTCUSDT':
return 'BTC-USDT'
elif symbol.upper() == 'ETHUSDT':
return 'ETH-USDT'
elif symbol.upper().endswith('USDT'):
base = symbol[:-4].upper()
return f"{base}-USDT"
elif symbol.upper().endswith('USD'):
base = symbol[:-3].upper()
return f"{base}-USD"
else:
# Assume it's already in correct format or add dash
if '-' not in symbol:
# Try to split common patterns
if len(symbol) >= 6:
# Assume last 4 chars are quote currency
base = symbol[:-4].upper()
quote = symbol[-4:].upper()
return f"{base}-{quote}"
else:
return symbol.upper()
else:
return symbol.upper()
def _denormalize_symbol(self, kucoin_symbol: str) -> str:
"""
Convert KuCoin symbol back to standard format.
Args:
kucoin_symbol: KuCoin symbol format (e.g., 'BTC-USDT')
Returns:
str: Standard symbol format (e.g., 'BTCUSDT')
"""
if '-' in kucoin_symbol:
return kucoin_symbol.replace('-', '')
return kucoin_symbol
async def _get_websocket_token(self) -> Optional[Dict[str, Any]]:
"""
Get WebSocket connection token from KuCoin REST API.
Returns:
Dict: Token information including WebSocket URL
"""
try:
import aiohttp
api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL
endpoint = "/api/v1/bullet-public"
# Use private endpoint if authenticated
if self.api_key and self.api_secret and self.passphrase:
endpoint = "/api/v1/bullet-private"
headers = self._get_auth_headers("POST", endpoint, "")
else:
headers = {}
async with aiohttp.ClientSession() as session:
async with session.post(f"{api_url}{endpoint}", headers=headers) as response:
if response.status == 200:
data = await response.json()
if data.get('code') != '200000':
logger.error(f"KuCoin token error: {data.get('msg')}")
return None
return data.get('data')
else:
logger.error(f"Failed to get KuCoin token: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting KuCoin WebSocket token: {e}")
return None
async def connect(self) -> bool:
"""Override connect to get token first."""
try:
# Get WebSocket token and URL
token_data = await self._get_websocket_token()
if not token_data:
logger.error("Failed to get KuCoin WebSocket token")
return False
self.token = token_data.get('token')
servers = token_data.get('instanceServers', [])
if not servers:
logger.error("No KuCoin WebSocket servers available")
return False
# Use first available server
server = servers[0]
self.websocket_url = f"{server['endpoint']}?token={self.token}&connectId={int(time.time() * 1000)}"
self.ping_interval = server.get('pingInterval', 18000)
self.ping_timeout = server.get('pingTimeout', 10000)
logger.info(f"KuCoin WebSocket URL: {server['endpoint']}")
# Now connect using the base connector method
return await super().connect()
except Exception as e:
logger.error(f"Error connecting to KuCoin: {e}")
return False
async def subscribe_orderbook(self, symbol: str) -> None:
"""
Subscribe to order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
kucoin_symbol = self.normalize_symbol(symbol)
topic = f"/market/level2:{kucoin_symbol}"
# Create subscription message
subscription_msg = {
"id": str(self.subscription_id),
"type": "subscribe",
"topic": topic,
"privateChannel": False,
"response": True
}
self.subscription_id += 1
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
self.subscribed_topics.add(topic)
logger.info(f"Subscribed to order book for {symbol} ({kucoin_symbol}) on KuCoin")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on KuCoin")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""
Subscribe to trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
kucoin_symbol = self.normalize_symbol(symbol)
topic = f"/market/match:{kucoin_symbol}"
# Create subscription message
subscription_msg = {
"id": str(self.subscription_id),
"type": "subscribe",
"topic": topic,
"privateChannel": False,
"response": True
}
self.subscription_id += 1
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
self.subscribed_topics.add(topic)
logger.info(f"Subscribed to trades for {symbol} ({kucoin_symbol}) on KuCoin")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on KuCoin")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""
Unsubscribe from order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
kucoin_symbol = self.normalize_symbol(symbol)
topic = f"/market/level2:{kucoin_symbol}"
# Create unsubscription message
unsubscription_msg = {
"id": str(self.subscription_id),
"type": "unsubscribe",
"topic": topic,
"privateChannel": False,
"response": True
}
self.subscription_id += 1
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('orderbook')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_topics.discard(topic)
logger.info(f"Unsubscribed from order book for {symbol} ({kucoin_symbol}) on KuCoin")
else:
logger.error(f"Failed to unsubscribe from order book for {symbol} on KuCoin")
except Exception as e:
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
raise
async def unsubscribe_trades(self, symbol: str) -> None:
"""
Unsubscribe from trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
kucoin_symbol = self.normalize_symbol(symbol)
topic = f"/market/match:{kucoin_symbol}"
# Create unsubscription message
unsubscription_msg = {
"id": str(self.subscription_id),
"type": "unsubscribe",
"topic": topic,
"privateChannel": False,
"response": True
}
self.subscription_id += 1
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('trades')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_topics.discard(topic)
logger.info(f"Unsubscribed from trades for {symbol} ({kucoin_symbol}) on KuCoin")
else:
logger.error(f"Failed to unsubscribe from trades for {symbol} on KuCoin")
except Exception as e:
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
raise
async def get_symbols(self) -> List[str]:
"""
Get list of available trading symbols from KuCoin.
Returns:
List[str]: List of available symbols in standard format
"""
try:
import aiohttp
api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL
async with aiohttp.ClientSession() as session:
async with session.get(f"{api_url}/api/v1/symbols") as response:
if response.status == 200:
data = await response.json()
if data.get('code') != '200000':
logger.error(f"KuCoin API error: {data.get('msg')}")
return []
symbols = []
symbol_data = data.get('data', [])
for symbol_info in symbol_data:
if symbol_info.get('enableTrading'):
symbol = symbol_info.get('symbol', '')
# Convert to standard format
standard_symbol = self._denormalize_symbol(symbol)
symbols.append(standard_symbol)
logger.info(f"Retrieved {len(symbols)} symbols from KuCoin")
return symbols
else:
logger.error(f"Failed to get symbols from KuCoin: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from KuCoin: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""
Get current order book snapshot from KuCoin REST API.
Args:
symbol: Trading symbol
depth: Number of price levels to retrieve
Returns:
OrderBookSnapshot: Current order book or None if unavailable
"""
try:
import aiohttp
kucoin_symbol = self.normalize_symbol(symbol)
api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL
url = f"{api_url}/api/v1/market/orderbook/level2_20"
params = {'symbol': kucoin_symbol}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
if data.get('code') != '200000':
logger.error(f"KuCoin API error: {data.get('msg')}")
return None
result = data.get('data', {})
return self._parse_orderbook_snapshot(result, symbol)
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
"""
Parse KuCoin order book data into OrderBookSnapshot.
Args:
data: Raw KuCoin order book data
symbol: Trading symbol
Returns:
OrderBookSnapshot: Parsed order book
"""
try:
# Parse bids and asks
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(int(data.get('time', 0)) / 1000, tz=timezone.utc),
bids=bids,
asks=asks,
sequence_id=int(data.get('sequence', 0))
)
return orderbook
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
async def _handle_data_message(self, data: Dict) -> None:
"""
Handle data message from KuCoin.
Args:
data: Data message
"""
try:
set_correlation_id()
subject = data.get('subject', '')
topic = data.get('topic', '')
message_data = data.get('data', {})
if 'level2' in subject:
await self._handle_orderbook_update(data)
elif 'match' in subject:
await self._handle_trade_update(data)
else:
logger.debug(f"Unhandled KuCoin subject: {subject}")
except Exception as e:
logger.error(f"Error handling data message: {e}")
async def _handle_orderbook_update(self, data: Dict) -> None:
"""
Handle order book update from KuCoin.
Args:
data: Order book update data
"""
try:
topic = data.get('topic', '')
if not topic:
logger.warning("Order book update missing topic")
return
# Extract symbol from topic: /market/level2:BTC-USDT
parts = topic.split(':')
if len(parts) < 2:
logger.warning("Invalid order book topic format")
return
kucoin_symbol = parts[1]
symbol = self._denormalize_symbol(kucoin_symbol)
message_data = data.get('data', {})
changes = message_data.get('changes', {})
# Parse bids and asks changes
bids = []
for bid_data in changes.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in changes.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(int(message_data.get('time', 0)) / 1000, tz=timezone.utc),
bids=bids,
asks=asks,
sequence_id=int(message_data.get('sequenceEnd', 0))
)
# Notify callbacks
self._notify_data_callbacks(orderbook)
logger.debug(f"Processed order book update for {symbol}")
except Exception as e:
logger.error(f"Error handling order book update: {e}")
async def _handle_trade_update(self, data: Dict) -> None:
"""
Handle trade update from KuCoin.
Args:
data: Trade update data
"""
try:
topic = data.get('topic', '')
if not topic:
logger.warning("Trade update missing topic")
return
# Extract symbol from topic: /market/match:BTC-USDT
parts = topic.split(':')
if len(parts) < 2:
logger.warning("Invalid trade topic format")
return
kucoin_symbol = parts[1]
symbol = self._denormalize_symbol(kucoin_symbol)
message_data = data.get('data', {})
price = float(message_data.get('price', 0))
size = float(message_data.get('size', 0))
# Validate data
if not validate_price(price) or not validate_volume(size):
logger.warning(f"Invalid trade data: price={price}, size={size}")
return
# Determine side (KuCoin uses 'side' field)
side = message_data.get('side', 'unknown').lower()
# Create trade event
trade = TradeEvent(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(int(message_data.get('time', 0)) / 1000, tz=timezone.utc),
price=price,
size=size,
side=side,
trade_id=str(message_data.get('tradeId', ''))
)
# Notify callbacks
self._notify_data_callbacks(trade)
logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}")
except Exception as e:
logger.error(f"Error handling trade update: {e}")
async def _handle_welcome_message(self, data: Dict) -> None:
"""
Handle welcome message from KuCoin.
Args:
data: Welcome message data
"""
try:
connect_id = data.get('id')
if connect_id:
self.connect_id = connect_id
logger.info(f"KuCoin connection established with ID: {connect_id}")
except Exception as e:
logger.error(f"Error handling welcome message: {e}")
async def _handle_ack_message(self, data: Dict) -> None:
"""
Handle acknowledgment message from KuCoin.
Args:
data: Ack message data
"""
try:
msg_id = data.get('id', '')
logger.debug(f"KuCoin ACK received for message ID: {msg_id}")
except Exception as e:
logger.error(f"Error handling ack message: {e}")
async def _handle_error_message(self, data: Dict) -> None:
"""
Handle error message from KuCoin.
Args:
data: Error message data
"""
try:
code = data.get('code', 'unknown')
message = data.get('data', 'Unknown error')
logger.error(f"KuCoin error {code}: {message}")
except Exception as e:
logger.error(f"Error handling error message: {e}")
async def _handle_pong_message(self, data: Dict) -> None:
"""
Handle pong message from KuCoin.
Args:
data: Pong message data
"""
logger.debug("Received KuCoin pong")
def _get_auth_headers(self, method: str, endpoint: str, body: str) -> Dict[str, str]:
"""
Generate authentication headers for KuCoin API.
Args:
method: HTTP method
endpoint: API endpoint
body: Request body
Returns:
Dict: Authentication headers
"""
if not all([self.api_key, self.api_secret, self.passphrase]):
return {}
try:
timestamp = str(int(time.time() * 1000))
# Create signature string
str_to_sign = timestamp + method + endpoint + body
signature = base64.b64encode(
hmac.new(
self.api_secret.encode('utf-8'),
str_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
).decode('utf-8')
# Create passphrase signature
passphrase_signature = base64.b64encode(
hmac.new(
self.api_secret.encode('utf-8'),
self.passphrase.encode('utf-8'),
hashlib.sha256
).digest()
).decode('utf-8')
return {
'KC-API-SIGN': signature,
'KC-API-TIMESTAMP': timestamp,
'KC-API-KEY': self.api_key,
'KC-API-PASSPHRASE': passphrase_signature,
'KC-API-KEY-VERSION': '2',
'Content-Type': 'application/json'
}
except Exception as e:
logger.error(f"Error generating auth headers: {e}")
return {}
async def _send_ping(self) -> None:
"""Send ping to keep connection alive."""
try:
ping_msg = {
"id": str(self.subscription_id),
"type": "ping"
}
self.subscription_id += 1
await self._send_message(ping_msg)
logger.debug("Sent ping to KuCoin")
except Exception as e:
logger.error(f"Error sending ping: {e}")
def get_kucoin_stats(self) -> Dict[str, Any]:
"""Get KuCoin-specific statistics."""
base_stats = self.get_stats()
kucoin_stats = {
'subscribed_topics': list(self.subscribed_topics),
'use_sandbox': self.use_sandbox,
'authenticated': bool(self.api_key and self.api_secret and self.passphrase),
'connect_id': self.connect_id,
'token_available': bool(self.token),
'next_subscription_id': self.subscription_id
}
base_stats.update(kucoin_stats)
return base_stats

View File

@ -0,0 +1,282 @@
"""
MEXC exchange connector implementation.
Supports WebSocket connections to MEXC with their WebSocket streams.
"""
import json
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class MEXCConnector(BaseExchangeConnector):
"""
MEXC WebSocket connector implementation.
Supports:
- Order book streams
- Trade streams
- Symbol normalization
"""
# MEXC WebSocket URLs
WEBSOCKET_URL = "wss://wbs.mexc.com/ws"
API_URL = "https://api.mexc.com"
def __init__(self, api_key: str = None, api_secret: str = None):
"""Initialize MEXC connector."""
super().__init__("mexc", self.WEBSOCKET_URL)
self.api_key = api_key
self.api_secret = api_secret
# MEXC-specific message handlers
self.message_handlers.update({
'spot@public.deals.v3.api': self._handle_trade_update,
'spot@public.increase.depth.v3.api': self._handle_orderbook_update,
'spot@public.limit.depth.v3.api': self._handle_orderbook_snapshot,
'pong': self._handle_pong
})
# Subscription tracking
self.subscribed_streams = set()
self.request_id = 1
logger.info("MEXC connector initialized")
def _get_message_type(self, data: Dict) -> str:
"""Determine message type from MEXC message data."""
if 'c' in data: # Channel
return data['c']
elif 'msg' in data:
return 'message'
elif 'pong' in data:
return 'pong'
return 'unknown'
def normalize_symbol(self, symbol: str) -> str:
"""Normalize symbol to MEXC format."""
# MEXC uses uppercase without separators (same as Binance)
normalized = symbol.upper().replace('-', '').replace('/', '')
if not validate_symbol(normalized):
raise ValidationError(f"Invalid symbol format: {symbol}", "INVALID_SYMBOL")
return normalized
async def subscribe_orderbook(self, symbol: str) -> None:
"""Subscribe to order book updates for a symbol."""
try:
set_correlation_id()
mexc_symbol = self.normalize_symbol(symbol)
subscription_msg = {
"method": "SUBSCRIPTION",
"params": [f"spot@public.limit.depth.v3.api@{mexc_symbol}@20"]
}
success = await self._send_message(subscription_msg)
if success:
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
self.subscribed_streams.add(f"spot@public.limit.depth.v3.api@{mexc_symbol}@20")
logger.info(f"Subscribed to order book for {symbol} ({mexc_symbol}) on MEXC")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on MEXC")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""Subscribe to trade updates for a symbol."""
try:
set_correlation_id()
mexc_symbol = self.normalize_symbol(symbol)
subscription_msg = {
"method": "SUBSCRIPTION",
"params": [f"spot@public.deals.v3.api@{mexc_symbol}"]
}
success = await self._send_message(subscription_msg)
if success:
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
self.subscribed_streams.add(f"spot@public.deals.v3.api@{mexc_symbol}")
logger.info(f"Subscribed to trades for {symbol} ({mexc_symbol}) on MEXC")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on MEXC")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""Unsubscribe from order book updates."""
try:
mexc_symbol = self.normalize_symbol(symbol)
unsubscription_msg = {
"method": "UNSUBSCRIPTION",
"params": [f"spot@public.limit.depth.v3.api@{mexc_symbol}@20"]
}
success = await self._send_message(unsubscription_msg)
if success:
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('orderbook')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_streams.discard(f"spot@public.limit.depth.v3.api@{mexc_symbol}@20")
logger.info(f"Unsubscribed from order book for {symbol} on MEXC")
except Exception as e:
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
raise
async def unsubscribe_trades(self, symbol: str) -> None:
"""Unsubscribe from trade updates."""
try:
mexc_symbol = self.normalize_symbol(symbol)
unsubscription_msg = {
"method": "UNSUBSCRIPTION",
"params": [f"spot@public.deals.v3.api@{mexc_symbol}"]
}
success = await self._send_message(unsubscription_msg)
if success:
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('trades')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.subscribed_streams.discard(f"spot@public.deals.v3.api@{mexc_symbol}")
logger.info(f"Unsubscribed from trades for {symbol} on MEXC")
except Exception as e:
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
raise
async def get_symbols(self) -> List[str]:
"""Get available symbols from MEXC."""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.API_URL}/api/v3/exchangeInfo") as response:
if response.status == 200:
data = await response.json()
symbols = [
symbol_info['symbol']
for symbol_info in data.get('symbols', [])
if symbol_info.get('status') == 'TRADING'
]
logger.info(f"Retrieved {len(symbols)} symbols from MEXC")
return symbols
else:
logger.error(f"Failed to get symbols from MEXC: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from MEXC: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""Get order book snapshot from MEXC REST API."""
try:
import aiohttp
mexc_symbol = self.normalize_symbol(symbol)
url = f"{self.API_URL}/api/v3/depth"
params = {'symbol': mexc_symbol, 'limit': min(depth, 5000)}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self._parse_orderbook_snapshot(data, symbol)
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
"""Parse MEXC order book data."""
try:
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
return OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks,
sequence_id=data.get('lastUpdateId')
)
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
async def _handle_orderbook_update(self, data: Dict) -> None:
"""Handle order book update from MEXC."""
# Implementation would parse MEXC-specific order book update format
logger.debug("Received MEXC order book update")
async def _handle_orderbook_snapshot(self, data: Dict) -> None:
"""Handle order book snapshot from MEXC."""
# Implementation would parse MEXC-specific order book snapshot format
logger.debug("Received MEXC order book snapshot")
async def _handle_trade_update(self, data: Dict) -> None:
"""Handle trade update from MEXC."""
# Implementation would parse MEXC-specific trade format
logger.debug("Received MEXC trade update")
async def _handle_pong(self, data: Dict) -> None:
"""Handle pong response from MEXC."""
logger.debug("Received MEXC pong")
def get_mexc_stats(self) -> Dict[str, Any]:
"""Get MEXC-specific statistics."""
base_stats = self.get_stats()
mexc_stats = {
'subscribed_streams': list(self.subscribed_streams),
'authenticated': bool(self.api_key and self.api_secret),
'next_request_id': self.request_id
}
base_stats.update(mexc_stats)
return base_stats