"""
OKX exchange connector implementation.

Supports WebSocket connections to OKX with their V5 API WebSocket streams.
"""

import json
import hmac
import hashlib
import base64
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone

from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector

logger = get_logger(__name__)


class OKXConnector(BaseExchangeConnector):
    """
    OKX WebSocket connector implementation.

    Supports:
    - V5 API WebSocket streams
    - Order book streams
    - Trade streams
    - Symbol normalization
    - Authentication for private channels
    """

    # OKX WebSocket URLs
    WEBSOCKET_URL = "wss://ws.okx.com:8443/ws/v5/public"
    WEBSOCKET_PRIVATE_URL = "wss://ws.okx.com:8443/ws/v5/private"
    DEMO_WEBSOCKET_URL = "wss://wspap.okx.com:8443/ws/v5/public?brokerId=9999"
    API_URL = "https://www.okx.com"

    def __init__(self, use_demo: bool = False, api_key: Optional[str] = None,
                 api_secret: Optional[str] = None, passphrase: Optional[str] = None):
        """
        Initialize OKX connector.

        Args:
            use_demo: Whether to use demo environment
            api_key: API key for authentication (optional)
            api_secret: API secret for authentication (optional)
            passphrase: API passphrase for authentication (optional)
        """
        websocket_url = self.DEMO_WEBSOCKET_URL if use_demo else self.WEBSOCKET_URL
        super().__init__("okx", websocket_url)

        # Authentication credentials (optional)
        self.api_key = api_key
        self.api_secret = api_secret
        self.passphrase = passphrase
        self.use_demo = use_demo

        # OKX-specific message handlers, keyed by the value returned from
        # _get_message_type().
        self.message_handlers.update({
            'books': self._handle_orderbook_update,
            'trades': self._handle_trade_update,
            'error': self._handle_error_message,
            'subscribe': self._handle_subscription_response,
            'unsubscribe': self._handle_subscription_response
        })

        # Subscription tracking: entries look like "books:BTC-USDT"
        self.subscribed_channels = set()

        logger.info(f"OKX connector initialized ({'demo' if use_demo else 'live'})")

    def _get_message_type(self, data: Dict) -> str:
        """
        Determine message type from OKX message data.

        Args:
            data: Parsed message data

        Returns:
            str: The 'event' value if present, else the channel name of a
                data message (one carrying both 'arg' and 'data'), else the
                'op' value, else 'unknown'.
        """
        # OKX V5 API message format
        if 'event' in data:
            return data['event']  # 'subscribe', 'unsubscribe', 'error'
        elif 'arg' in data and 'data' in data:
            # Data message
            channel = data['arg'].get('channel', '')
            return channel
        elif 'op' in data:
            return data['op']  # 'ping', 'pong'

        return 'unknown'

    def normalize_symbol(self, symbol: str) -> str:
        """
        Normalize symbol to OKX format.

        Args:
            symbol: Standard symbol format (e.g., 'BTCUSDT'). A symbol that
                already contains a dash is only upper-cased.

        Returns:
            str: OKX symbol format (e.g., 'BTC-USDT')
        """
        upper = symbol.upper()

        # Already dash-separated: must be checked first, otherwise the
        # suffix rules below would turn 'BTC-USDT' into 'BTC--USDT'.
        if '-' in upper:
            return upper

        # Known quote currencies; 'USDT' is tried before 'USD'. A non-empty
        # base is required so a bare 'USDT' does not become '-USDT'.
        for quote in ('USDT', 'USD'):
            if upper.endswith(quote) and len(upper) > len(quote):
                return f"{upper[:-len(quote)]}-{quote}"

        # Fallback heuristic: assume the last 4 chars are the quote currency.
        # NOTE(review): this mis-splits pairs with a 3-letter quote
        # (e.g. 'ETHBTC' -> 'ET-HBTC'); callers needing those should pass the
        # dash-separated form.
        if len(upper) >= 6:
            return f"{upper[:-4]}-{upper[-4:]}"

        return upper

    def _denormalize_symbol(self, okx_symbol: str) -> str:
        """
        Convert OKX symbol back to standard format.

        Args:
            okx_symbol: OKX symbol format (e.g., 'BTC-USDT')

        Returns:
            str: Standard symbol format (e.g., 'BTCUSDT')
        """
        # str.replace is a no-op when there is no dash.
        return okx_symbol.replace('-', '')

    @staticmethod
    def _build_channel_request(op: str, channel: str, okx_symbol: str) -> Dict[str, Any]:
        """Build a subscribe/unsubscribe request for one channel and instrument."""
        return {
            "op": op,
            "args": [
                {
                    "channel": channel,
                    "instId": okx_symbol
                }
            ]
        }

    async def subscribe_orderbook(self, symbol: str) -> None:
        """
        Subscribe to order book updates for a symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTCUSDT')

        Raises:
            Exception: Any error raised while building or sending the request
                is logged and re-raised.
        """
        try:
            set_correlation_id()
            okx_symbol = self.normalize_symbol(symbol)

            # Create and send subscription message
            subscription_msg = self._build_channel_request("subscribe", "books", okx_symbol)
            success = await self._send_message(subscription_msg)

            if success:
                # Track subscription
                if symbol not in self.subscriptions:
                    self.subscriptions[symbol] = []
                if 'orderbook' not in self.subscriptions[symbol]:
                    self.subscriptions[symbol].append('orderbook')

                self.subscribed_channels.add(f"books:{okx_symbol}")

                logger.info(f"Subscribed to order book for {symbol} ({okx_symbol}) on OKX")
            else:
                logger.error(f"Failed to subscribe to order book for {symbol} on OKX")

        except Exception as e:
            logger.error(f"Error subscribing to order book for {symbol}: {e}")
            raise

    async def subscribe_trades(self, symbol: str) -> None:
        """
        Subscribe to trade updates for a symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTCUSDT')

        Raises:
            Exception: Any error raised while building or sending the request
                is logged and re-raised.
        """
        try:
            set_correlation_id()
            okx_symbol = self.normalize_symbol(symbol)

            # Create and send subscription message
            subscription_msg = self._build_channel_request("subscribe", "trades", okx_symbol)
            success = await self._send_message(subscription_msg)

            if success:
                # Track subscription
                if symbol not in self.subscriptions:
                    self.subscriptions[symbol] = []
                if 'trades' not in self.subscriptions[symbol]:
                    self.subscriptions[symbol].append('trades')

                self.subscribed_channels.add(f"trades:{okx_symbol}")

                logger.info(f"Subscribed to trades for {symbol} ({okx_symbol}) on OKX")
            else:
                logger.error(f"Failed to subscribe to trades for {symbol} on OKX")

        except Exception as e:
            logger.error(f"Error subscribing to trades for {symbol}: {e}")
            raise

    async def unsubscribe_orderbook(self, symbol: str) -> None:
        """
        Unsubscribe from order book updates for a symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTCUSDT')

        Raises:
            Exception: Any error raised while building or sending the request
                is logged and re-raised.
        """
        try:
            okx_symbol = self.normalize_symbol(symbol)

            # Create and send unsubscription message
            unsubscription_msg = self._build_channel_request("unsubscribe", "books", okx_symbol)
            success = await self._send_message(unsubscription_msg)

            if success:
                # Remove from tracking; drop the symbol once nothing is left
                if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
                    self.subscriptions[symbol].remove('orderbook')
                    if not self.subscriptions[symbol]:
                        del self.subscriptions[symbol]

                self.subscribed_channels.discard(f"books:{okx_symbol}")

                logger.info(f"Unsubscribed from order book for {symbol} ({okx_symbol}) on OKX")
            else:
                logger.error(f"Failed to unsubscribe from order book for {symbol} on OKX")

        except Exception as e:
            logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
            raise

    async def unsubscribe_trades(self, symbol: str) -> None:
        """
        Unsubscribe from trade updates for a symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTCUSDT')

        Raises:
            Exception: Any error raised while building or sending the request
                is logged and re-raised.
        """
        try:
            okx_symbol = self.normalize_symbol(symbol)

            # Create and send unsubscription message
            unsubscription_msg = self._build_channel_request("unsubscribe", "trades", okx_symbol)
            success = await self._send_message(unsubscription_msg)

            if success:
                # Remove from tracking; drop the symbol once nothing is left
                if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
                    self.subscriptions[symbol].remove('trades')
                    if not self.subscriptions[symbol]:
                        del self.subscriptions[symbol]

                self.subscribed_channels.discard(f"trades:{okx_symbol}")

                logger.info(f"Unsubscribed from trades for {symbol} ({okx_symbol}) on OKX")
            else:
                logger.error(f"Failed to unsubscribe from trades for {symbol} on OKX")

        except Exception as e:
            logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
            raise

    async def get_symbols(self) -> List[str]:
        """
        Get list of available trading symbols from OKX.

        Queries the public SPOT instruments endpoint and keeps instruments
        whose state is 'live'.

        Returns:
            List[str]: List of available symbols in standard format; an empty
                list on any HTTP, API, or unexpected error (errors are logged,
                not raised).
        """
        try:
            import aiohttp

            url = f"{self.API_URL}/api/v5/public/instruments"

            async with aiohttp.ClientSession() as session:
                async with session.get(url, params={"instType": "SPOT"}) as response:
                    if response.status != 200:
                        logger.error(f"Failed to get symbols from OKX: HTTP {response.status}")
                        return []

                    data = await response.json()

                    # The payload carries its own status code; '0' is success.
                    if data.get('code') != '0':
                        logger.error(f"OKX API error: {data.get('msg')}")
                        return []

                    # Convert every live instrument to standard format
                    symbols = [
                        self._denormalize_symbol(instrument.get('instId', ''))
                        for instrument in data.get('data', [])
                        if instrument.get('state') == 'live'
                    ]

                    logger.info(f"Retrieved {len(symbols)} symbols from OKX")
                    return symbols

        except Exception as e:
            logger.error(f"Error getting symbols from OKX: {e}")
            return []

    async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
        """
        Get current order book snapshot from OKX REST API.

        Args:
            symbol: Trading symbol
            depth: Number of price levels to retrieve (clamped to 1..400)

        Returns:
            OrderBookSnapshot: Current order book or None if unavailable
                (errors are logged, not raised)
        """
        try:
            import aiohttp

            okx_symbol = self.normalize_symbol(symbol)

            # OKX supports depths up to 400; also guard against non-positive
            # values so an invalid size is never sent.
            api_depth = max(1, min(depth, 400))

            url = f"{self.API_URL}/api/v5/market/books"
            params = {
                'instId': okx_symbol,
                'sz': api_depth
            }

            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
                    if response.status != 200:
                        logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
                        return None

                    data = await response.json()

                    if data.get('code') != '0':
                        logger.error(f"OKX API error: {data.get('msg')}")
                        return None

                    result_data = data.get('data', [])
                    if not result_data:
                        return None

                    return self._parse_orderbook_snapshot(result_data[0], symbol)

        except Exception as e:
            logger.error(f"Error getting order book snapshot for {symbol}: {e}")
            return None

    @staticmethod
    def _parse_okx_levels(raw_levels: List[Any]) -> List[PriceLevel]:
        """Convert raw [price, size, ...] entries into validated PriceLevel objects."""
        levels = []
        for entry in raw_levels:
            price = float(entry[0])
            size = float(entry[1])
            # Entries failing validation are skipped rather than raising.
            if validate_price(price) and validate_volume(size):
                levels.append(PriceLevel(price=price, size=size))
        return levels

    @staticmethod
    def _okx_timestamp(raw_ts: Any) -> datetime:
        """Convert a millisecond epoch value (str or int) into an aware UTC datetime."""
        return datetime.fromtimestamp(int(raw_ts) / 1000, tz=timezone.utc)

    def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
        """
        Parse OKX order book data into OrderBookSnapshot.

        Args:
            data: Raw OKX order book data ('bids', 'asks', 'ts', 'seqId')
            symbol: Trading symbol

        Returns:
            OrderBookSnapshot: Parsed order book

        Raises:
            ValidationError: If the payload cannot be parsed.
        """
        try:
            # Parse bids and asks
            bids = self._parse_okx_levels(data.get('bids', []))
            asks = self._parse_okx_levels(data.get('asks', []))

            # Create order book snapshot
            orderbook = OrderBookSnapshot(
                symbol=symbol,
                exchange=self.exchange_name,
                timestamp=self._okx_timestamp(data.get('ts', 0)),
                bids=bids,
                asks=asks,
                sequence_id=int(data.get('seqId', 0))
            )

            return orderbook

        except Exception as e:
            logger.error(f"Error parsing order book snapshot: {e}")
            raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") from e

    async def _handle_orderbook_update(self, data: Dict) -> None:
        """
        Handle order book update from OKX.

        Each item in data['data'] is turned into an OrderBookSnapshot and
        passed to the data callbacks. Errors are logged and swallowed.

        Args:
            data: Order book update data
        """
        try:
            set_correlation_id()

            # Extract symbol from arg
            arg = data.get('arg', {})
            okx_symbol = arg.get('instId', '')
            if not okx_symbol:
                logger.warning("Order book update missing instId")
                return

            symbol = self._denormalize_symbol(okx_symbol)

            # Process each data item
            for book_data in data.get('data', []):
                # Parse bids and asks
                bids = self._parse_okx_levels(book_data.get('bids', []))
                asks = self._parse_okx_levels(book_data.get('asks', []))

                # Create order book snapshot
                orderbook = OrderBookSnapshot(
                    symbol=symbol,
                    exchange=self.exchange_name,
                    timestamp=self._okx_timestamp(book_data.get('ts', 0)),
                    bids=bids,
                    asks=asks,
                    sequence_id=int(book_data.get('seqId', 0))
                )

                # Notify callbacks
                self._notify_data_callbacks(orderbook)

                logger.debug(f"Processed order book update for {symbol}")

        except Exception as e:
            logger.error(f"Error handling order book update: {e}")

    async def _handle_trade_update(self, data: Dict) -> None:
        """
        Handle trade update from OKX.

        Each valid item in data['data'] is turned into a TradeEvent and passed
        to the data callbacks; items with an invalid price or size are logged
        and skipped. Errors are logged and swallowed.

        Args:
            data: Trade update data
        """
        try:
            set_correlation_id()

            # Extract symbol from arg
            arg = data.get('arg', {})
            okx_symbol = arg.get('instId', '')
            if not okx_symbol:
                logger.warning("Trade update missing instId")
                return

            symbol = self._denormalize_symbol(okx_symbol)

            # Process each trade
            for trade_data in data.get('data', []):
                price = float(trade_data.get('px', 0))
                size = float(trade_data.get('sz', 0))

                # Validate data
                if not validate_price(price) or not validate_volume(size):
                    logger.warning(f"Invalid trade data: price={price}, size={size}")
                    continue

                # Determine side (OKX uses 'side' field)
                side = trade_data.get('side', 'unknown').lower()

                # Create trade event
                trade = TradeEvent(
                    symbol=symbol,
                    exchange=self.exchange_name,
                    timestamp=self._okx_timestamp(trade_data.get('ts', 0)),
                    price=price,
                    size=size,
                    side=side,
                    trade_id=str(trade_data.get('tradeId', ''))
                )

                # Notify callbacks
                self._notify_data_callbacks(trade)

                logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}")

        except Exception as e:
            logger.error(f"Error handling trade update: {e}")

    async def _handle_subscription_response(self, data: Dict) -> None:
        """
        Handle subscription response from OKX.

        Only logs the outcome; subscription tracking is updated by the
        subscribe/unsubscribe methods themselves.

        Args:
            data: Subscription response data
        """
        try:
            event = data.get('event', '')
            arg = data.get('arg', {})
            channel = arg.get('channel', '')
            inst_id = arg.get('instId', '')

            if event == 'subscribe':
                logger.info(f"OKX subscription confirmed: {channel} for {inst_id}")
            elif event == 'unsubscribe':
                logger.info(f"OKX unsubscription confirmed: {channel} for {inst_id}")
            elif event == 'error':
                error_msg = data.get('msg', 'Unknown error')
                logger.error(f"OKX subscription error: {error_msg}")

        except Exception as e:
            logger.error(f"Error handling subscription response: {e}")

    async def _handle_error_message(self, data: Dict) -> None:
        """
        Handle error message from OKX.

        Args:
            data: Error message data ('code' and 'msg' are read)
        """
        error_code = data.get('code', 'unknown')
        error_msg = data.get('msg', 'Unknown error')

        logger.error(f"OKX error {error_code}: {error_msg}")

        # Handle specific error codes
        if error_code == '60012':
            logger.error("Invalid request - check parameters")
        elif error_code == '60013':
            logger.error("Invalid channel - check channel name")

    def _get_auth_headers(self, timestamp: str, method: str = "GET",
                          request_path: str = "/users/self/verify") -> Dict[str, str]:
        """
        Generate authentication headers for OKX API.

        The signature is Base64(HMAC-SHA256(timestamp + METHOD + request_path))
        keyed with the API secret. The passphrase is sent as-is: OKX V5 expects
        the plain passphrase chosen when the API key was created, not a signed
        value.

        Args:
            timestamp: Current timestamp
            method: HTTP method (upper-cased before signing)
            request_path: Request path

        Returns:
            Dict: Authentication headers, or an empty dict when credentials
                are missing or signing fails.
        """
        if not all([self.api_key, self.api_secret, self.passphrase]):
            return {}

        try:
            # Create signature
            message = timestamp + method.upper() + request_path
            signature = base64.b64encode(
                hmac.new(
                    self.api_secret.encode('utf-8'),
                    message.encode('utf-8'),
                    hashlib.sha256
                ).digest()
            ).decode('utf-8')

            return {
                'OK-ACCESS-KEY': self.api_key,
                'OK-ACCESS-SIGN': signature,
                'OK-ACCESS-TIMESTAMP': timestamp,
                'OK-ACCESS-PASSPHRASE': self.passphrase
            }

        except Exception as e:
            logger.error(f"Error generating auth headers: {e}")
            return {}

    async def _send_ping(self) -> None:
        """Send ping to keep connection alive."""
        try:
            # NOTE(review): OKX documents the keepalive as the plain text
            # frame 'ping' rather than a JSON object; confirm how
            # _send_message serializes this payload.
            ping_msg = {"op": "ping"}
            await self._send_message(ping_msg)
            logger.debug("Sent ping to OKX")

        except Exception as e:
            logger.error(f"Error sending ping: {e}")

    def get_okx_stats(self) -> Dict[str, Any]:
        """
        Get OKX-specific statistics.

        Returns:
            Dict: The base connector statistics extended with
                'subscribed_channels', 'use_demo' and 'authenticated'.
        """
        # Copy so the dict returned by the base class is never mutated.
        stats = dict(self.get_stats())
        stats.update({
            'subscribed_channels': list(self.subscribed_channels),
            'use_demo': self.use_demo,
            'authenticated': bool(self.api_key and self.api_secret and self.passphrase)
        })
        return stats