diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index e0acd31..c982c6f 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -131,6 +131,9 @@ - Create compatibility layer for seamless integration with current data provider - Add data quality indicators and metadata in responses - Implement switching mechanism between live and replay modes + + + - Write integration tests with existing orchestrator code - _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_ diff --git a/COBY/connectors/__init__.py b/COBY/connectors/__init__.py index a14dd61..febaa8d 100644 --- a/COBY/connectors/__init__.py +++ b/COBY/connectors/__init__.py @@ -9,6 +9,10 @@ from .kraken_connector import KrakenConnector from .bybit_connector import BybitConnector from .okx_connector import OKXConnector from .huobi_connector import HuobiConnector +from .kucoin_connector import KuCoinConnector +from .gateio_connector import GateIOConnector +from .bitfinex_connector import BitfinexConnector +from .mexc_connector import MEXCConnector from .connection_manager import ConnectionManager from .circuit_breaker import CircuitBreaker @@ -20,6 +24,10 @@ __all__ = [ 'BybitConnector', 'OKXConnector', 'HuobiConnector', + 'KuCoinConnector', + 'GateIOConnector', + 'BitfinexConnector', + 'MEXCConnector', 'ConnectionManager', 'CircuitBreaker' ] \ No newline at end of file diff --git a/COBY/connectors/bitfinex_connector.py b/COBY/connectors/bitfinex_connector.py new file mode 100644 index 0000000..24d7812 --- /dev/null +++ b/COBY/connectors/bitfinex_connector.py @@ -0,0 +1,270 @@ +""" +Bitfinex exchange connector implementation. +Supports WebSocket connections to Bitfinex with proper channel subscription management. +""" + +import json +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class BitfinexConnector(BaseExchangeConnector): + """ + Bitfinex WebSocket connector implementation. + + Supports: + - Channel subscription management + - Order book streams + - Trade streams + - Symbol normalization + """ + + # Bitfinex WebSocket URLs + WEBSOCKET_URL = "wss://api-pub.bitfinex.com/ws/2" + API_URL = "https://api-pub.bitfinex.com" + + def __init__(self, api_key: str = None, api_secret: str = None): + """Initialize Bitfinex connector.""" + super().__init__("bitfinex", self.WEBSOCKET_URL) + + self.api_key = api_key + self.api_secret = api_secret + + # Bitfinex-specific message handlers + self.message_handlers.update({ + 'subscribed': self._handle_subscription_response, + 'unsubscribed': self._handle_unsubscription_response, + 'error': self._handle_error_message, + 'info': self._handle_info_message + }) + + # Channel management + self.channels = {} # channel_id -> channel_info + self.subscribed_symbols = set() + + logger.info("Bitfinex connector initialized") + + def _get_message_type(self, data) -> str: + """Determine message type from Bitfinex message data.""" + if isinstance(data, dict): + if 'event' in data: + return data['event'] + elif 'error' in data: + return 'error' + elif isinstance(data, list) and len(data) >= 2: + # Data message format: [CHANNEL_ID, data] + return 'data' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """Normalize symbol to Bitfinex format.""" + # Bitfinex uses 't' prefix for trading pairs + if symbol.upper() == 'BTCUSDT': + return 'tBTCUSD' + elif symbol.upper() == 'ETHUSDT': + return 'tETHUSD' + elif symbol.upper().endswith('USDT'): + base = symbol[:-4].upper() + return f"t{base}USD" + else: + # Generic conversion + normalized = symbol.upper().replace('-', '').replace('/', '') + return f"t{normalized}" if not normalized.startswith('t') else normalized + + def _denormalize_symbol(self, bitfinex_symbol: str) -> str: + """Convert Bitfinex symbol back to standard format.""" + if bitfinex_symbol.startswith('t'): + symbol = bitfinex_symbol[1:] # Remove 't' prefix + if symbol.endswith('USD'): + return symbol[:-3] + 'USDT' + return symbol + return bitfinex_symbol + + async def subscribe_orderbook(self, symbol: str) -> None: + """Subscribe to order book updates for a symbol.""" + try: + set_correlation_id() + bitfinex_symbol = self.normalize_symbol(symbol) + + subscription_msg = { + "event": "subscribe", + "channel": "book", + "symbol": bitfinex_symbol, + "prec": "P0", + "freq": "F0", + "len": "25" + } + + success = await self._send_message(subscription_msg) + if success: + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.subscribed_symbols.add(bitfinex_symbol) + logger.info(f"Subscribed to order book for {symbol} ({bitfinex_symbol}) on Bitfinex") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on Bitfinex") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """Subscribe to trade updates for a symbol.""" + try: + set_correlation_id() + bitfinex_symbol = self.normalize_symbol(symbol) + + subscription_msg = { + "event": "subscribe", + "channel": "trades", + "symbol": bitfinex_symbol + } + + success = await self._send_message(subscription_msg) + if success: + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.subscribed_symbols.add(bitfinex_symbol) + logger.info(f"Subscribed to trades for {symbol} ({bitfinex_symbol}) on Bitfinex") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on Bitfinex") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """Unsubscribe from order book updates.""" + # Implementation would find the channel ID and send unsubscribe message + pass + + async def unsubscribe_trades(self, symbol: str) -> None: + """Unsubscribe from trade updates.""" + # Implementation would find the channel ID and send unsubscribe message + pass + + async def get_symbols(self) -> List[str]: + """Get available symbols from Bitfinex.""" + try: + import aiohttp + + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.API_URL}/v1/symbols") as response: + if response.status == 200: + data = await response.json() + symbols = [self._denormalize_symbol(f"t{s.upper()}") for s in data] + logger.info(f"Retrieved {len(symbols)} symbols from Bitfinex") + return symbols + else: + logger.error(f"Failed to get symbols from Bitfinex: HTTP {response.status}") + return [] + except Exception as e: + logger.error(f"Error getting symbols from Bitfinex: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """Get order book snapshot from Bitfinex REST API.""" + try: + import aiohttp + + bitfinex_symbol = self.normalize_symbol(symbol) + url = f"{self.API_URL}/v2/book/{bitfinex_symbol}/P0" + params = {'len': min(depth, 100)} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + return self._parse_orderbook_snapshot(data, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: List, symbol: str) -> OrderBookSnapshot: + """Parse Bitfinex order book data.""" + try: + bids = [] + asks = [] + + for level in data: + price = float(level[0]) + count = int(level[1]) + amount = float(level[2]) + + if validate_price(price) and validate_volume(abs(amount)): + if amount > 0: + bids.append(PriceLevel(price=price, size=amount)) + else: + asks.append(PriceLevel(price=price, size=abs(amount))) + + return OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks + ) + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_subscription_response(self, data: Dict) -> None: + """Handle subscription response.""" + channel_id = data.get('chanId') + channel = data.get('channel') + symbol = data.get('symbol', '') + + if channel_id: + self.channels[channel_id] = { + 'channel': channel, + 'symbol': symbol + } + logger.info(f"Bitfinex subscription confirmed: {channel} for {symbol} (ID: {channel_id})") + + async def _handle_unsubscription_response(self, data: Dict) -> None: + """Handle unsubscription response.""" + channel_id = data.get('chanId') + if channel_id in self.channels: + del self.channels[channel_id] + logger.info(f"Bitfinex unsubscription confirmed for channel {channel_id}") + + async def _handle_error_message(self, data: Dict) -> None: + """Handle error message.""" + error_msg = data.get('msg', 'Unknown error') + error_code = data.get('code', 'unknown') + logger.error(f"Bitfinex error {error_code}: {error_msg}") + + async def _handle_info_message(self, data: Dict) -> None: + """Handle info message.""" + logger.info(f"Bitfinex info: {data}") + + def get_bitfinex_stats(self) -> Dict[str, Any]: + """Get Bitfinex-specific statistics.""" + base_stats = self.get_stats() + + bitfinex_stats = { + 'active_channels': len(self.channels), + 'subscribed_symbols': list(self.subscribed_symbols), + 'authenticated': bool(self.api_key and self.api_secret) + } + + base_stats.update(bitfinex_stats) + return base_stats \ No newline at end of file diff --git a/COBY/connectors/gateio_connector.py b/COBY/connectors/gateio_connector.py new file mode 100644 index 0000000..69ef582 --- /dev/null +++ b/COBY/connectors/gateio_connector.py @@ -0,0 +1,601 @@ +""" +Gate.io exchange connector implementation. +Supports WebSocket connections to Gate.io with their WebSocket v4 API. +""" + +import json +import hmac +import hashlib +import time +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class GateIOConnector(BaseExchangeConnector): + """ + Gate.io WebSocket connector implementation. + + Supports: + - WebSocket v4 API + - Order book streams + - Trade streams + - Symbol normalization + - Authentication for private channels + """ + + # Gate.io WebSocket URLs + WEBSOCKET_URL = "wss://api.gateio.ws/ws/v4/" + TESTNET_URL = "wss://fx-api-testnet.gateio.ws/ws/v4/" + API_URL = "https://api.gateio.ws" + + def __init__(self, use_testnet: bool = False, api_key: str = None, api_secret: str = None): + """ + Initialize Gate.io connector. + + Args: + use_testnet: Whether to use testnet environment + api_key: API key for authentication (optional) + api_secret: API secret for authentication (optional) + """ + websocket_url = self.TESTNET_URL if use_testnet else self.WEBSOCKET_URL + super().__init__("gateio", websocket_url) + + # Authentication credentials (optional) + self.api_key = api_key + self.api_secret = api_secret + self.use_testnet = use_testnet + + # Gate.io-specific message handlers + self.message_handlers.update({ + 'spot.order_book_update': self._handle_orderbook_update, + 'spot.trades': self._handle_trade_update, + 'spot.pong': self._handle_pong, + 'error': self._handle_error_message + }) + + # Subscription tracking + self.subscribed_channels = set() + self.request_id = 1 + + logger.info(f"Gate.io connector initialized ({'testnet' if use_testnet else 'mainnet'})") + + def _get_message_type(self, data: Dict) -> str: + """ + Determine message type from Gate.io message data. + + Args: + data: Parsed message data + + Returns: + str: Message type identifier + """ + # Gate.io v4 API message format + if 'method' in data: + return data['method'] # 'spot.order_book_update', 'spot.trades', etc. + elif 'error' in data: + return 'error' + elif 'result' in data: + return 'result' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to Gate.io format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: Gate.io symbol format (e.g., 'BTC_USDT') + """ + # Gate.io uses underscore-separated format + if symbol.upper() == 'BTCUSDT': + return 'BTC_USDT' + elif symbol.upper() == 'ETHUSDT': + return 'ETH_USDT' + elif symbol.upper().endswith('USDT'): + base = symbol[:-4].upper() + return f"{base}_USDT" + elif symbol.upper().endswith('USD'): + base = symbol[:-3].upper() + return f"{base}_USD" + else: + # Assume it's already in correct format or add underscore + if '_' not in symbol: + # Try to split common patterns + if len(symbol) >= 6: + # Assume last 4 chars are quote currency + base = symbol[:-4].upper() + quote = symbol[-4:].upper() + return f"{base}_{quote}" + else: + return symbol.upper() + else: + return symbol.upper() + + def _denormalize_symbol(self, gateio_symbol: str) -> str: + """ + Convert Gate.io symbol back to standard format. + + Args: + gateio_symbol: Gate.io symbol format (e.g., 'BTC_USDT') + + Returns: + str: Standard symbol format (e.g., 'BTCUSDT') + """ + if '_' in gateio_symbol: + return gateio_symbol.replace('_', '') + return gateio_symbol + + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + gateio_symbol = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "method": "spot.order_book", + "params": [gateio_symbol, 20, "0"], # symbol, limit, interval + "id": self.request_id + } + self.request_id += 1 + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.subscribed_channels.add(f"spot.order_book:{gateio_symbol}") + + logger.info(f"Subscribed to order book for {symbol} ({gateio_symbol}) on Gate.io") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on Gate.io") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + gateio_symbol = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "method": "spot.trades", + "params": [gateio_symbol], + "id": self.request_id + } + self.request_id += 1 + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.subscribed_channels.add(f"spot.trades:{gateio_symbol}") + + logger.info(f"Subscribed to trades for {symbol} ({gateio_symbol}) on Gate.io") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on Gate.io") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + gateio_symbol = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "method": "spot.unsubscribe", + "params": [f"spot.order_book", gateio_symbol], + "id": self.request_id + } + self.request_id += 1 + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_channels.discard(f"spot.order_book:{gateio_symbol}") + + logger.info(f"Unsubscribed from order book for {symbol} ({gateio_symbol}) on Gate.io") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on Gate.io") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + gateio_symbol = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "method": "spot.unsubscribe", + "params": ["spot.trades", gateio_symbol], + "id": self.request_id + } + self.request_id += 1 + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_channels.discard(f"spot.trades:{gateio_symbol}") + + logger.info(f"Unsubscribed from trades for {symbol} ({gateio_symbol}) on Gate.io") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on Gate.io") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols from Gate.io. + + Returns: + List[str]: List of available symbols in standard format + """ + try: + import aiohttp + + api_url = "https://fx-api-testnet.gateio.ws" if self.use_testnet else self.API_URL + + async with aiohttp.ClientSession() as session: + async with session.get(f"{api_url}/api/v4/spot/currency_pairs") as response: + if response.status == 200: + data = await response.json() + + symbols = [] + + for pair_info in data: + if pair_info.get('trade_status') == 'tradable': + pair_id = pair_info.get('id', '') + # Convert to standard format + standard_symbol = self._denormalize_symbol(pair_id) + symbols.append(standard_symbol) + + logger.info(f"Retrieved {len(symbols)} symbols from Gate.io") + return symbols + else: + logger.error(f"Failed to get symbols from Gate.io: HTTP {response.status}") + return [] + + except Exception as e: + logger.error(f"Error getting symbols from Gate.io: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot from Gate.io REST API. + + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + try: + import aiohttp + + gateio_symbol = self.normalize_symbol(symbol) + api_url = "https://fx-api-testnet.gateio.ws" if self.use_testnet else self.API_URL + + # Gate.io supports various depths + api_depth = min(depth, 100) + + url = f"{api_url}/api/v4/spot/order_book" + params = { + 'currency_pair': gateio_symbol, + 'limit': api_depth + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + return self._parse_orderbook_snapshot(data, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """ + Parse Gate.io order book data into OrderBookSnapshot. + + Args: + data: Raw Gate.io order book data + symbol: Trading symbol + + Returns: + OrderBookSnapshot: Parsed order book + """ + try: + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), # Gate.io doesn't provide timestamp in snapshot + bids=bids, + asks=asks, + sequence_id=data.get('id') + ) + + return orderbook + + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_orderbook_update(self, data: Dict) -> None: + """ + Handle order book update from Gate.io. + + Args: + data: Order book update data + """ + try: + set_correlation_id() + + params = data.get('params', []) + if len(params) < 2: + logger.warning("Invalid order book update format") + return + + # Gate.io format: [symbol, order_book_data] + gateio_symbol = params[0] + symbol = self._denormalize_symbol(gateio_symbol) + book_data = params[1] + + # Parse bids and asks + bids = [] + for bid_data in book_data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in book_data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(book_data.get('t', 0)) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=book_data.get('id') + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") + + async def _handle_trade_update(self, data: Dict) -> None: + """ + Handle trade update from Gate.io. + + Args: + data: Trade update data + """ + try: + set_correlation_id() + + params = data.get('params', []) + if len(params) < 2: + logger.warning("Invalid trade update format") + return + + # Gate.io format: [symbol, [trade_data]] + gateio_symbol = params[0] + symbol = self._denormalize_symbol(gateio_symbol) + trades_data = params[1] + + # Process each trade + for trade_data in trades_data: + price = float(trade_data.get('price', 0)) + amount = float(trade_data.get('amount', 0)) + + # Validate data + if not validate_price(price) or not validate_volume(amount): + logger.warning(f"Invalid trade data: price={price}, amount={amount}") + continue + + # Determine side (Gate.io uses 'side' field) + side = trade_data.get('side', 'unknown').lower() + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(trade_data.get('time', 0)), tz=timezone.utc), + price=price, + size=amount, + side=side, + trade_id=str(trade_data.get('id', '')) + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {side} {amount} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") + + async def _handle_pong(self, data: Dict) -> None: + """ + Handle pong response from Gate.io. + + Args: + data: Pong response data + """ + logger.debug("Received Gate.io pong") + + async def _handle_error_message(self, data: Dict) -> None: + """ + Handle error message from Gate.io. + + Args: + data: Error message data + """ + error_info = data.get('error', {}) + code = error_info.get('code', 'unknown') + message = error_info.get('message', 'Unknown error') + + logger.error(f"Gate.io error {code}: {message}") + + def _get_auth_signature(self, method: str, url: str, query_string: str, + payload: str, timestamp: str) -> str: + """ + Generate authentication signature for Gate.io. + + Args: + method: HTTP method + url: Request URL + query_string: Query string + payload: Request payload + timestamp: Request timestamp + + Returns: + str: Authentication signature + """ + if not self.api_key or not self.api_secret: + return "" + + try: + # Create signature string + message = f"{method}\n{url}\n{query_string}\n{hashlib.sha512(payload.encode()).hexdigest()}\n{timestamp}" + + # Generate signature + signature = hmac.new( + self.api_secret.encode('utf-8'), + message.encode('utf-8'), + hashlib.sha512 + ).hexdigest() + + return signature + + except Exception as e: + logger.error(f"Error generating auth signature: {e}") + return "" + + async def _send_ping(self) -> None: + """Send ping to keep connection alive.""" + try: + ping_msg = { + "method": "spot.ping", + "params": [], + "id": self.request_id + } + self.request_id += 1 + + await self._send_message(ping_msg) + logger.debug("Sent ping to Gate.io") + + except Exception as e: + logger.error(f"Error sending ping: {e}") + + def get_gateio_stats(self) -> Dict[str, Any]: + """Get Gate.io-specific statistics.""" + base_stats = self.get_stats() + + gateio_stats = { + 'subscribed_channels': list(self.subscribed_channels), + 'use_testnet': self.use_testnet, + 'authenticated': bool(self.api_key and self.api_secret), + 'next_request_id': self.request_id + } + + base_stats.update(gateio_stats) + return base_stats \ No newline at end of file diff --git a/COBY/connectors/kucoin_connector.py b/COBY/connectors/kucoin_connector.py new file mode 100644 index 0000000..f374c33 --- /dev/null +++ b/COBY/connectors/kucoin_connector.py @@ -0,0 +1,776 @@ +""" +KuCoin exchange connector implementation. +Supports WebSocket connections to KuCoin with proper token-based authentication. +""" + +import json +import hmac +import hashlib +import base64 +import time +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class KuCoinConnector(BaseExchangeConnector): + """ + KuCoin WebSocket connector implementation. + + Supports: + - Token-based authentication + - Order book streams + - Trade streams + - Symbol normalization + - Bullet connection protocol + """ + + # KuCoin API URLs + API_URL = "https://api.kucoin.com" + SANDBOX_API_URL = "https://openapi-sandbox.kucoin.com" + + def __init__(self, use_sandbox: bool = False, api_key: str = None, + api_secret: str = None, passphrase: str = None): + """ + Initialize KuCoin connector. + + Args: + use_sandbox: Whether to use sandbox environment + api_key: API key for authentication (optional) + api_secret: API secret for authentication (optional) + passphrase: API passphrase for authentication (optional) + """ + # KuCoin requires getting WebSocket URL from REST API + super().__init__("kucoin", "") # URL will be set after token retrieval + + # Authentication credentials (optional) + self.api_key = api_key + self.api_secret = api_secret + self.passphrase = passphrase + self.use_sandbox = use_sandbox + + # KuCoin-specific attributes + self.token = None + self.connect_id = None + self.ping_interval = 18000 # 18 seconds (KuCoin requirement) + self.ping_timeout = 10000 # 10 seconds + + # KuCoin-specific message handlers + self.message_handlers.update({ + 'message': self._handle_data_message, + 'welcome': self._handle_welcome_message, + 'ack': self._handle_ack_message, + 'error': self._handle_error_message, + 'pong': self._handle_pong_message + }) + + # Subscription tracking + self.subscribed_topics = set() + self.subscription_id = 1 + + logger.info(f"KuCoin connector initialized ({'sandbox' if use_sandbox else 'live'})") + + def _get_message_type(self, data: Dict) -> str: + """ + Determine message type from KuCoin message data. + + Args: + data: Parsed message data + + Returns: + str: Message type identifier + """ + # KuCoin message format + if 'type' in data: + return data['type'] # 'message', 'welcome', 'ack', 'error', 'pong' + elif 'subject' in data: + # Data message with subject + return 'message' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to KuCoin format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: KuCoin symbol format (e.g., 'BTC-USDT') + """ + # KuCoin uses dash-separated format + if symbol.upper() == 'BTCUSDT': + return 'BTC-USDT' + elif symbol.upper() == 'ETHUSDT': + return 'ETH-USDT' + elif symbol.upper().endswith('USDT'): + base = symbol[:-4].upper() + return f"{base}-USDT" + elif symbol.upper().endswith('USD'): + base = symbol[:-3].upper() + return f"{base}-USD" + else: + # Assume it's already in correct format or add dash + if '-' not in symbol: + # Try to split common patterns + if len(symbol) >= 6: + # Assume last 4 chars are quote currency + base = symbol[:-4].upper() + quote = symbol[-4:].upper() + return f"{base}-{quote}" + else: + return symbol.upper() + else: + return symbol.upper() + + def _denormalize_symbol(self, kucoin_symbol: str) -> str: + """ + Convert KuCoin symbol back to standard format. + + Args: + kucoin_symbol: KuCoin symbol format (e.g., 'BTC-USDT') + + Returns: + str: Standard symbol format (e.g., 'BTCUSDT') + """ + if '-' in kucoin_symbol: + return kucoin_symbol.replace('-', '') + return kucoin_symbol + + async def _get_websocket_token(self) -> Optional[Dict[str, Any]]: + """ + Get WebSocket connection token from KuCoin REST API. + + Returns: + Dict: Token information including WebSocket URL + """ + try: + import aiohttp + + api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL + endpoint = "/api/v1/bullet-public" + + # Use private endpoint if authenticated + if self.api_key and self.api_secret and self.passphrase: + endpoint = "/api/v1/bullet-private" + headers = self._get_auth_headers("POST", endpoint, "") + else: + headers = {} + + async with aiohttp.ClientSession() as session: + async with session.post(f"{api_url}{endpoint}", headers=headers) as response: + if response.status == 200: + data = await response.json() + + if data.get('code') != '200000': + logger.error(f"KuCoin token error: {data.get('msg')}") + return None + + return data.get('data') + else: + logger.error(f"Failed to get KuCoin token: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting KuCoin WebSocket token: {e}") + return None + + async def connect(self) -> bool: + """Override connect to get token first.""" + try: + # Get WebSocket token and URL + token_data = await self._get_websocket_token() + if not token_data: + logger.error("Failed to get KuCoin WebSocket token") + return False + + self.token = token_data.get('token') + servers = token_data.get('instanceServers', []) + + if not servers: + logger.error("No KuCoin WebSocket servers available") + return False + + # Use first available server + server = servers[0] + self.websocket_url = f"{server['endpoint']}?token={self.token}&connectId={int(time.time() * 1000)}" + self.ping_interval = server.get('pingInterval', 18000) + self.ping_timeout = server.get('pingTimeout', 10000) + + logger.info(f"KuCoin WebSocket URL: {server['endpoint']}") + + # Now connect using the base connector method + return await super().connect() + + except Exception as e: + logger.error(f"Error connecting to KuCoin: {e}") + return False + + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + kucoin_symbol = self.normalize_symbol(symbol) + topic = f"/market/level2:{kucoin_symbol}" + + # Create subscription message + subscription_msg = { + "id": str(self.subscription_id), + "type": "subscribe", + "topic": topic, + "privateChannel": False, + "response": True + } + self.subscription_id += 1 + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.subscribed_topics.add(topic) + + logger.info(f"Subscribed to order book for {symbol} ({kucoin_symbol}) on KuCoin") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on KuCoin") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + kucoin_symbol = self.normalize_symbol(symbol) + topic = f"/market/match:{kucoin_symbol}" + + # Create subscription message + subscription_msg = { + "id": str(self.subscription_id), + "type": "subscribe", + "topic": topic, + "privateChannel": False, + "response": True + } + self.subscription_id += 1 + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.subscribed_topics.add(topic) + + logger.info(f"Subscribed to trades for {symbol} ({kucoin_symbol}) on KuCoin") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on KuCoin") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + kucoin_symbol = self.normalize_symbol(symbol) + topic = f"/market/level2:{kucoin_symbol}" + + # Create unsubscription message + unsubscription_msg = { + "id": str(self.subscription_id), + "type": "unsubscribe", + "topic": topic, + "privateChannel": False, + "response": True + } + self.subscription_id += 1 + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_topics.discard(topic) + + logger.info(f"Unsubscribed from order book for {symbol} ({kucoin_symbol}) on KuCoin") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on KuCoin") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + kucoin_symbol = self.normalize_symbol(symbol) + topic = f"/market/match:{kucoin_symbol}" + + # Create unsubscription message + unsubscription_msg = { + "id": str(self.subscription_id), + "type": "unsubscribe", + "topic": topic, + "privateChannel": False, + "response": True + } + self.subscription_id += 1 + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_topics.discard(topic) + + logger.info(f"Unsubscribed from trades for {symbol} ({kucoin_symbol}) on KuCoin") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on KuCoin") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols from KuCoin. + + Returns: + List[str]: List of available symbols in standard format + """ + try: + import aiohttp + + api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL + + async with aiohttp.ClientSession() as session: + async with session.get(f"{api_url}/api/v1/symbols") as response: + if response.status == 200: + data = await response.json() + + if data.get('code') != '200000': + logger.error(f"KuCoin API error: {data.get('msg')}") + return [] + + symbols = [] + symbol_data = data.get('data', []) + + for symbol_info in symbol_data: + if symbol_info.get('enableTrading'): + symbol = symbol_info.get('symbol', '') + # Convert to standard format + standard_symbol = self._denormalize_symbol(symbol) + symbols.append(standard_symbol) + + logger.info(f"Retrieved {len(symbols)} symbols from KuCoin") + return symbols + else: + logger.error(f"Failed to get symbols from KuCoin: HTTP {response.status}") + return [] + + except Exception as e: + logger.error(f"Error getting symbols from KuCoin: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot from KuCoin REST API. + + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + try: + import aiohttp + + kucoin_symbol = self.normalize_symbol(symbol) + api_url = self.SANDBOX_API_URL if self.use_sandbox else self.API_URL + + url = f"{api_url}/api/v1/market/orderbook/level2_20" + params = {'symbol': kucoin_symbol} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + + if data.get('code') != '200000': + logger.error(f"KuCoin API error: {data.get('msg')}") + return None + + result = data.get('data', {}) + return self._parse_orderbook_snapshot(result, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """ + Parse KuCoin order book data into OrderBookSnapshot. + + Args: + data: Raw KuCoin order book data + symbol: Trading symbol + + Returns: + OrderBookSnapshot: Parsed order book + """ + try: + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(data.get('time', 0)) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=int(data.get('sequence', 0)) + ) + + return orderbook + + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_data_message(self, data: Dict) -> None: + """ + Handle data message from KuCoin. + + Args: + data: Data message + """ + try: + set_correlation_id() + + subject = data.get('subject', '') + topic = data.get('topic', '') + message_data = data.get('data', {}) + + if 'level2' in subject: + await self._handle_orderbook_update(data) + elif 'match' in subject: + await self._handle_trade_update(data) + else: + logger.debug(f"Unhandled KuCoin subject: {subject}") + + except Exception as e: + logger.error(f"Error handling data message: {e}") + + async def _handle_orderbook_update(self, data: Dict) -> None: + """ + Handle order book update from KuCoin. + + Args: + data: Order book update data + """ + try: + topic = data.get('topic', '') + if not topic: + logger.warning("Order book update missing topic") + return + + # Extract symbol from topic: /market/level2:BTC-USDT + parts = topic.split(':') + if len(parts) < 2: + logger.warning("Invalid order book topic format") + return + + kucoin_symbol = parts[1] + symbol = self._denormalize_symbol(kucoin_symbol) + + message_data = data.get('data', {}) + changes = message_data.get('changes', {}) + + # Parse bids and asks changes + bids = [] + for bid_data in changes.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in changes.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(message_data.get('time', 0)) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=int(message_data.get('sequenceEnd', 0)) + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") + + async def _handle_trade_update(self, data: Dict) -> None: + """ + Handle trade update from KuCoin. + + Args: + data: Trade update data + """ + try: + topic = data.get('topic', '') + if not topic: + logger.warning("Trade update missing topic") + return + + # Extract symbol from topic: /market/match:BTC-USDT + parts = topic.split(':') + if len(parts) < 2: + logger.warning("Invalid trade topic format") + return + + kucoin_symbol = parts[1] + symbol = self._denormalize_symbol(kucoin_symbol) + + message_data = data.get('data', {}) + + price = float(message_data.get('price', 0)) + size = float(message_data.get('size', 0)) + + # Validate data + if not validate_price(price) or not validate_volume(size): + logger.warning(f"Invalid trade data: price={price}, size={size}") + return + + # Determine side (KuCoin uses 'side' field) + side = message_data.get('side', 'unknown').lower() + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(message_data.get('time', 0)) / 1000, tz=timezone.utc), + price=price, + size=size, + side=side, + trade_id=str(message_data.get('tradeId', '')) + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") + + async def _handle_welcome_message(self, data: Dict) -> None: + """ + Handle welcome message from KuCoin. + + Args: + data: Welcome message data + """ + try: + connect_id = data.get('id') + if connect_id: + self.connect_id = connect_id + logger.info(f"KuCoin connection established with ID: {connect_id}") + + except Exception as e: + logger.error(f"Error handling welcome message: {e}") + + async def _handle_ack_message(self, data: Dict) -> None: + """ + Handle acknowledgment message from KuCoin. + + Args: + data: Ack message data + """ + try: + msg_id = data.get('id', '') + logger.debug(f"KuCoin ACK received for message ID: {msg_id}") + + except Exception as e: + logger.error(f"Error handling ack message: {e}") + + async def _handle_error_message(self, data: Dict) -> None: + """ + Handle error message from KuCoin. + + Args: + data: Error message data + """ + try: + code = data.get('code', 'unknown') + message = data.get('data', 'Unknown error') + + logger.error(f"KuCoin error {code}: {message}") + + except Exception as e: + logger.error(f"Error handling error message: {e}") + + async def _handle_pong_message(self, data: Dict) -> None: + """ + Handle pong message from KuCoin. + + Args: + data: Pong message data + """ + logger.debug("Received KuCoin pong") + + def _get_auth_headers(self, method: str, endpoint: str, body: str) -> Dict[str, str]: + """ + Generate authentication headers for KuCoin API. + + Args: + method: HTTP method + endpoint: API endpoint + body: Request body + + Returns: + Dict: Authentication headers + """ + if not all([self.api_key, self.api_secret, self.passphrase]): + return {} + + try: + timestamp = str(int(time.time() * 1000)) + + # Create signature string + str_to_sign = timestamp + method + endpoint + body + signature = base64.b64encode( + hmac.new( + self.api_secret.encode('utf-8'), + str_to_sign.encode('utf-8'), + hashlib.sha256 + ).digest() + ).decode('utf-8') + + # Create passphrase signature + passphrase_signature = base64.b64encode( + hmac.new( + self.api_secret.encode('utf-8'), + self.passphrase.encode('utf-8'), + hashlib.sha256 + ).digest() + ).decode('utf-8') + + return { + 'KC-API-SIGN': signature, + 'KC-API-TIMESTAMP': timestamp, + 'KC-API-KEY': self.api_key, + 'KC-API-PASSPHRASE': passphrase_signature, + 'KC-API-KEY-VERSION': '2', + 'Content-Type': 'application/json' + } + + except Exception as e: + logger.error(f"Error generating auth headers: {e}") + return {} + + async def _send_ping(self) -> None: + """Send ping to keep connection alive.""" + try: + ping_msg = { + "id": str(self.subscription_id), + "type": "ping" + } + self.subscription_id += 1 + + await self._send_message(ping_msg) + logger.debug("Sent ping to KuCoin") + + except Exception as e: + logger.error(f"Error sending ping: {e}") + + def get_kucoin_stats(self) -> Dict[str, Any]: + """Get KuCoin-specific statistics.""" + base_stats = self.get_stats() + + kucoin_stats = { + 'subscribed_topics': list(self.subscribed_topics), + 'use_sandbox': self.use_sandbox, + 'authenticated': bool(self.api_key and self.api_secret and self.passphrase), + 'connect_id': self.connect_id, + 'token_available': bool(self.token), + 'next_subscription_id': self.subscription_id + } + + base_stats.update(kucoin_stats) + return base_stats \ No newline at end of file diff --git a/COBY/connectors/mexc_connector.py b/COBY/connectors/mexc_connector.py new file mode 100644 index 0000000..b0396c3 --- /dev/null +++ b/COBY/connectors/mexc_connector.py @@ -0,0 +1,282 @@ +""" +MEXC exchange connector implementation. +Supports WebSocket connections to MEXC with their WebSocket streams. +""" + +import json +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class MEXCConnector(BaseExchangeConnector): + """ + MEXC WebSocket connector implementation. + + Supports: + - Order book streams + - Trade streams + - Symbol normalization + """ + + # MEXC WebSocket URLs + WEBSOCKET_URL = "wss://wbs.mexc.com/ws" + API_URL = "https://api.mexc.com" + + def __init__(self, api_key: str = None, api_secret: str = None): + """Initialize MEXC connector.""" + super().__init__("mexc", self.WEBSOCKET_URL) + + self.api_key = api_key + self.api_secret = api_secret + + # MEXC-specific message handlers + self.message_handlers.update({ + 'spot@public.deals.v3.api': self._handle_trade_update, + 'spot@public.increase.depth.v3.api': self._handle_orderbook_update, + 'spot@public.limit.depth.v3.api': self._handle_orderbook_snapshot, + 'pong': self._handle_pong + }) + + # Subscription tracking + self.subscribed_streams = set() + self.request_id = 1 + + logger.info("MEXC connector initialized") + + def _get_message_type(self, data: Dict) -> str: + """Determine message type from MEXC message data.""" + if 'c' in data: # Channel + return data['c'] + elif 'msg' in data: + return 'message' + elif 'pong' in data: + return 'pong' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """Normalize symbol to MEXC format.""" + # MEXC uses uppercase without separators (same as Binance) + normalized = symbol.upper().replace('-', '').replace('/', '') + + if not validate_symbol(normalized): + raise ValidationError(f"Invalid symbol format: {symbol}", "INVALID_SYMBOL") + + return normalized + + async def subscribe_orderbook(self, symbol: str) -> None: + """Subscribe to order book updates for a symbol.""" + try: + set_correlation_id() + mexc_symbol = self.normalize_symbol(symbol) + + subscription_msg = { + "method": "SUBSCRIPTION", + "params": [f"spot@public.limit.depth.v3.api@{mexc_symbol}@20"] + } + + success = await self._send_message(subscription_msg) + if success: + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.subscribed_streams.add(f"spot@public.limit.depth.v3.api@{mexc_symbol}@20") + logger.info(f"Subscribed to order book for {symbol} ({mexc_symbol}) on MEXC") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on MEXC") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """Subscribe to trade updates for a symbol.""" + try: + set_correlation_id() + mexc_symbol = self.normalize_symbol(symbol) + + subscription_msg = { + "method": "SUBSCRIPTION", + "params": [f"spot@public.deals.v3.api@{mexc_symbol}"] + } + + success = await self._send_message(subscription_msg) + if success: + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.subscribed_streams.add(f"spot@public.deals.v3.api@{mexc_symbol}") + logger.info(f"Subscribed to trades for {symbol} ({mexc_symbol}) on MEXC") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on MEXC") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """Unsubscribe from order book updates.""" + try: + mexc_symbol = self.normalize_symbol(symbol) + + unsubscription_msg = { + "method": "UNSUBSCRIPTION", + "params": [f"spot@public.limit.depth.v3.api@{mexc_symbol}@20"] + } + + success = await self._send_message(unsubscription_msg) + if success: + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_streams.discard(f"spot@public.limit.depth.v3.api@{mexc_symbol}@20") + logger.info(f"Unsubscribed from order book for {symbol} on MEXC") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """Unsubscribe from trade updates.""" + try: + mexc_symbol = self.normalize_symbol(symbol) + + unsubscription_msg = { + "method": "UNSUBSCRIPTION", + "params": [f"spot@public.deals.v3.api@{mexc_symbol}"] + } + + success = await self._send_message(unsubscription_msg) + if success: + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_streams.discard(f"spot@public.deals.v3.api@{mexc_symbol}") + logger.info(f"Unsubscribed from trades for {symbol} on MEXC") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """Get available symbols from MEXC.""" + try: + import aiohttp + + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.API_URL}/api/v3/exchangeInfo") as response: + if response.status == 200: + data = await response.json() + symbols = [ + symbol_info['symbol'] + for symbol_info in data.get('symbols', []) + if symbol_info.get('status') == 'TRADING' + ] + logger.info(f"Retrieved {len(symbols)} symbols from MEXC") + return symbols + else: + logger.error(f"Failed to get symbols from MEXC: HTTP {response.status}") + return [] + except Exception as e: + logger.error(f"Error getting symbols from MEXC: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """Get order book snapshot from MEXC REST API.""" + try: + import aiohttp + + mexc_symbol = self.normalize_symbol(symbol) + url = f"{self.API_URL}/api/v3/depth" + params = {'symbol': mexc_symbol, 'limit': min(depth, 5000)} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + return self._parse_orderbook_snapshot(data, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """Parse MEXC order book data.""" + try: + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + return OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks, + sequence_id=data.get('lastUpdateId') + ) + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_orderbook_update(self, data: Dict) -> None: + """Handle order book update from MEXC.""" + # Implementation would parse MEXC-specific order book update format + logger.debug("Received MEXC order book update") + + async def _handle_orderbook_snapshot(self, data: Dict) -> None: + """Handle order book snapshot from MEXC.""" + # Implementation would parse MEXC-specific order book snapshot format + logger.debug("Received MEXC order book snapshot") + + async def _handle_trade_update(self, data: Dict) -> None: + """Handle trade update from MEXC.""" + # Implementation would parse MEXC-specific trade format + logger.debug("Received MEXC trade update") + + async def _handle_pong(self, data: Dict) -> None: + """Handle pong response from MEXC.""" + logger.debug("Received MEXC pong") + + def get_mexc_stats(self) -> Dict[str, Any]: + """Get MEXC-specific statistics.""" + base_stats = self.get_stats() + + mexc_stats = { + 'subscribed_streams': list(self.subscribed_streams), + 'authenticated': bool(self.api_key and self.api_secret), + 'next_request_id': self.request_id + } + + base_stats.update(mexc_stats) + return base_stats \ No newline at end of file