diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index a63432e..ded08ab 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -115,6 +115,9 @@ - [x] 10. Build historical data replay system - Create replay manager for historical data playback - Implement configurable playback speeds and time range selection + + + - Create replay session management with start/pause/stop controls - Implement data streaming interface compatible with live data format - Add replay status monitoring and progress tracking diff --git a/COBY/connectors/__init__.py b/COBY/connectors/__init__.py index d2dae8e..472f677 100644 --- a/COBY/connectors/__init__.py +++ b/COBY/connectors/__init__.py @@ -3,11 +3,17 @@ Exchange connector implementations for the COBY system. """ from .base_connector import BaseExchangeConnector +from .binance_connector import BinanceConnector +from .coinbase_connector import CoinbaseConnector +from .kraken_connector import KrakenConnector from .connection_manager import ConnectionManager from .circuit_breaker import CircuitBreaker __all__ = [ 'BaseExchangeConnector', + 'BinanceConnector', + 'CoinbaseConnector', + 'KrakenConnector', 'ConnectionManager', 'CircuitBreaker' ] \ No newline at end of file diff --git a/COBY/connectors/coinbase_connector.py b/COBY/connectors/coinbase_connector.py new file mode 100644 index 0000000..e9cfd2a --- /dev/null +++ b/COBY/connectors/coinbase_connector.py @@ -0,0 +1,650 @@ +""" +Coinbase Pro exchange connector implementation. +Supports WebSocket connections to Coinbase Pro (now Coinbase Advanced Trade). +""" + +import json +import hmac +import hashlib +import base64 +import time +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class CoinbaseConnector(BaseExchangeConnector): + """ + Coinbase Pro WebSocket connector implementation. + + Supports: + - Order book level2 streams + - Trade streams (matches) + - Symbol normalization + - Authentication for private channels (if needed) + """ + + # Coinbase Pro WebSocket URLs + WEBSOCKET_URL = "wss://ws-feed.exchange.coinbase.com" + SANDBOX_URL = "wss://ws-feed-public.sandbox.exchange.coinbase.com" + API_URL = "https://api.exchange.coinbase.com" + + def __init__(self, use_sandbox: bool = False, api_key: str = None, + api_secret: str = None, passphrase: str = None): + """ + Initialize Coinbase connector. + + Args: + use_sandbox: Whether to use sandbox environment + api_key: API key for authentication (optional) + api_secret: API secret for authentication (optional) + passphrase: API passphrase for authentication (optional) + """ + websocket_url = self.SANDBOX_URL if use_sandbox else self.WEBSOCKET_URL + super().__init__("coinbase", websocket_url) + + # Authentication credentials (optional) + self.api_key = api_key + self.api_secret = api_secret + self.passphrase = passphrase + self.use_sandbox = use_sandbox + + # Coinbase-specific message handlers + self.message_handlers.update({ + 'l2update': self._handle_orderbook_update, + 'match': self._handle_trade_update, + 'snapshot': self._handle_orderbook_snapshot, + 'error': self._handle_error_message, + 'subscriptions': self._handle_subscription_response + }) + + # Channel management + self.subscribed_channels = set() + self.product_ids = set() + + logger.info(f"Coinbase connector initialized ({'sandbox' if use_sandbox else 'production'})") + + def _get_message_type(self, data: Dict) -> str: + """ + Determine message type from Coinbase message data. + + Args: + data: Parsed message data + + Returns: + str: Message type identifier + """ + # Coinbase uses 'type' field for message type + return data.get('type', 'unknown') + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to Coinbase format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: Coinbase product ID format (e.g., 'BTC-USD') + """ + # Convert standard format to Coinbase product ID + if symbol.upper() == 'BTCUSDT': + return 'BTC-USD' + elif symbol.upper() == 'ETHUSDT': + return 'ETH-USD' + elif symbol.upper() == 'ADAUSDT': + return 'ADA-USD' + elif symbol.upper() == 'DOTUSDT': + return 'DOT-USD' + elif symbol.upper() == 'LINKUSDT': + return 'LINK-USD' + else: + # Generic conversion: BTCUSDT -> BTC-USD + if symbol.endswith('USDT'): + base = symbol[:-4] + return f"{base}-USD" + elif symbol.endswith('USD'): + base = symbol[:-3] + return f"{base}-USD" + else: + # Assume it's already in correct format or try to parse + if '-' in symbol: + return symbol.upper() + else: + # Default fallback + return symbol.upper() + + def _denormalize_symbol(self, product_id: str) -> str: + """ + Convert Coinbase product ID back to standard format. + + Args: + product_id: Coinbase product ID (e.g., 'BTC-USD') + + Returns: + str: Standard symbol format (e.g., 'BTCUSDT') + """ + if '-' in product_id: + base, quote = product_id.split('-', 1) + if quote == 'USD': + return f"{base}USDT" + else: + return f"{base}{quote}" + return product_id + + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book level2 updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + product_id = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "type": "subscribe", + "product_ids": [product_id], + "channels": ["level2"] + } + + # Add authentication if credentials provided + if self.api_key and self.api_secret and self.passphrase: + subscription_msg.update(self._get_auth_headers(subscription_msg)) + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.subscribed_channels.add('level2') + self.product_ids.add(product_id) + + logger.info(f"Subscribed to order book for {symbol} ({product_id}) on Coinbase") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on Coinbase") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates (matches) for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + product_id = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "type": "subscribe", + "product_ids": [product_id], + "channels": ["matches"] + } + + # Add authentication if credentials provided + if self.api_key and self.api_secret and self.passphrase: + subscription_msg.update(self._get_auth_headers(subscription_msg)) + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.subscribed_channels.add('matches') + self.product_ids.add(product_id) + + logger.info(f"Subscribed to trades for {symbol} ({product_id}) on Coinbase") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on Coinbase") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + product_id = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "type": "unsubscribe", + "product_ids": [product_id], + "channels": ["level2"] + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.product_ids.discard(product_id) + + logger.info(f"Unsubscribed from order book for {symbol} ({product_id}) on Coinbase") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on Coinbase") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + product_id = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "type": "unsubscribe", + "product_ids": [product_id], + "channels": ["matches"] + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.product_ids.discard(product_id) + + logger.info(f"Unsubscribed from trades for {symbol} ({product_id}) on Coinbase") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on Coinbase") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols from Coinbase. + + Returns: + List[str]: List of available symbols in standard format + """ + try: + import aiohttp + + api_url = "https://api-public.sandbox.exchange.coinbase.com" if self.use_sandbox else self.API_URL + + async with aiohttp.ClientSession() as session: + async with session.get(f"{api_url}/products") as response: + if response.status == 200: + data = await response.json() + symbols = [] + + for product in data: + if product.get('status') == 'online' and product.get('trading_disabled') is False: + product_id = product.get('id', '') + # Convert to standard format + standard_symbol = self._denormalize_symbol(product_id) + symbols.append(standard_symbol) + + logger.info(f"Retrieved {len(symbols)} symbols from Coinbase") + return symbols + else: + logger.error(f"Failed to get symbols from Coinbase: HTTP {response.status}") + return [] + + except Exception as e: + logger.error(f"Error getting symbols from Coinbase: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot from Coinbase REST API. + + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve (Coinbase supports up to 50) + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + try: + import aiohttp + + product_id = self.normalize_symbol(symbol) + api_url = "https://api-public.sandbox.exchange.coinbase.com" if self.use_sandbox else self.API_URL + + # Coinbase supports level 1, 2, or 3 + level = 2 # Level 2 gives us aggregated order book + + url = f"{api_url}/products/{product_id}/book" + params = {'level': level} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + return self._parse_orderbook_snapshot(data, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """ + Parse Coinbase order book data into OrderBookSnapshot. + + Args: + data: Raw Coinbase order book data + symbol: Trading symbol + + Returns: + OrderBookSnapshot: Parsed order book + """ + try: + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks, + sequence_id=data.get('sequence') + ) + + return orderbook + + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + def _get_auth_headers(self, message: Dict) -> Dict[str, str]: + """ + Generate authentication headers for Coinbase Pro API. + + Args: + message: Message to authenticate + + Returns: + Dict: Authentication headers + """ + if not all([self.api_key, self.api_secret, self.passphrase]): + return {} + + try: + timestamp = str(time.time()) + message_str = json.dumps(message, separators=(',', ':')) + + # Create signature + message_to_sign = timestamp + 'GET' + '/users/self/verify' + message_str + signature = base64.b64encode( + hmac.new( + base64.b64decode(self.api_secret), + message_to_sign.encode('utf-8'), + hashlib.sha256 + ).digest() + ).decode('utf-8') + + return { + 'CB-ACCESS-KEY': self.api_key, + 'CB-ACCESS-SIGN': signature, + 'CB-ACCESS-TIMESTAMP': timestamp, + 'CB-ACCESS-PASSPHRASE': self.passphrase + } + + except Exception as e: + logger.error(f"Error generating auth headers: {e}") + return {} + + async def _handle_orderbook_snapshot(self, data: Dict) -> None: + """ + Handle order book snapshot from Coinbase. + + Args: + data: Order book snapshot data + """ + try: + set_correlation_id() + + product_id = data.get('product_id', '') + if not product_id: + logger.warning("Order book snapshot missing product_id") + return + + symbol = self._denormalize_symbol(product_id) + + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks, + sequence_id=data.get('sequence') + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book snapshot for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book snapshot: {e}") + + async def _handle_orderbook_update(self, data: Dict) -> None: + """ + Handle order book level2 update from Coinbase. + + Args: + data: Order book update data + """ + try: + set_correlation_id() + + product_id = data.get('product_id', '') + if not product_id: + logger.warning("Order book update missing product_id") + return + + symbol = self._denormalize_symbol(product_id) + + # Coinbase l2update format: changes array with [side, price, size] + changes = data.get('changes', []) + + bids = [] + asks = [] + + for change in changes: + if len(change) >= 3: + side = change[0] # 'buy' or 'sell' + price = float(change[1]) + size = float(change[2]) + + if validate_price(price) and validate_volume(size): + if side == 'buy': + bids.append(PriceLevel(price=price, size=size)) + elif side == 'sell': + asks.append(PriceLevel(price=price, size=size)) + + # Create order book update (partial snapshot) + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromisoformat(data.get('time', '').replace('Z', '+00:00')), + bids=bids, + asks=asks, + sequence_id=data.get('sequence') + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") + + async def _handle_trade_update(self, data: Dict) -> None: + """ + Handle trade (match) update from Coinbase. + + Args: + data: Trade update data + """ + try: + set_correlation_id() + + product_id = data.get('product_id', '') + if not product_id: + logger.warning("Trade update missing product_id") + return + + symbol = self._denormalize_symbol(product_id) + + price = float(data.get('price', 0)) + size = float(data.get('size', 0)) + + # Validate data + if not validate_price(price) or not validate_volume(size): + logger.warning(f"Invalid trade data: price={price}, size={size}") + return + + # Determine side (Coinbase uses 'side' field for taker side) + side = data.get('side', 'unknown') # 'buy' or 'sell' + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromisoformat(data.get('time', '').replace('Z', '+00:00')), + price=price, + size=size, + side=side, + trade_id=str(data.get('trade_id', '')) + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") + + async def _handle_subscription_response(self, data: Dict) -> None: + """ + Handle subscription confirmation from Coinbase. + + Args: + data: Subscription response data + """ + try: + channels = data.get('channels', []) + logger.info(f"Coinbase subscription confirmed for channels: {channels}") + + except Exception as e: + logger.error(f"Error handling subscription response: {e}") + + async def _handle_error_message(self, data: Dict) -> None: + """ + Handle error message from Coinbase. + + Args: + data: Error message data + """ + message = data.get('message', 'Unknown error') + reason = data.get('reason', '') + + logger.error(f"Coinbase error: {message}") + if reason: + logger.error(f"Coinbase error reason: {reason}") + + # Handle specific error types + if 'Invalid signature' in message: + logger.error("Authentication failed - check API credentials") + elif 'Product not found' in message: + logger.error("Invalid product ID - check symbol mapping") + + def get_coinbase_stats(self) -> Dict[str, Any]: + """Get Coinbase-specific statistics.""" + base_stats = self.get_stats() + + coinbase_stats = { + 'subscribed_channels': list(self.subscribed_channels), + 'product_ids': list(self.product_ids), + 'use_sandbox': self.use_sandbox, + 'authenticated': bool(self.api_key and self.api_secret and self.passphrase) + } + + base_stats.update(coinbase_stats) + return base_stats \ No newline at end of file diff --git a/COBY/connectors/kraken_connector.py b/COBY/connectors/kraken_connector.py new file mode 100644 index 0000000..74dd31b --- /dev/null +++ b/COBY/connectors/kraken_connector.py @@ -0,0 +1,708 @@ +""" +Kraken exchange connector implementation. +Supports WebSocket connections to Kraken exchange with their specific message format. +""" + +import json +import hashlib +import hmac +import base64 +import time +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ConnectionError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class KrakenConnector(BaseExchangeConnector): + """ + Kraken WebSocket connector implementation. + + Supports: + - Order book streams + - Trade streams + - Symbol normalization for Kraken format + - Authentication for private channels (if needed) + """ + + # Kraken WebSocket URLs + WEBSOCKET_URL = "wss://ws.kraken.com" + WEBSOCKET_AUTH_URL = "wss://ws-auth.kraken.com" + API_URL = "https://api.kraken.com" + + def __init__(self, api_key: str = None, api_secret: str = None): + """ + Initialize Kraken connector. + + Args: + api_key: API key for authentication (optional) + api_secret: API secret for authentication (optional) + """ + super().__init__("kraken", self.WEBSOCKET_URL) + + # Authentication credentials (optional) + self.api_key = api_key + self.api_secret = api_secret + + # Kraken-specific message handlers + self.message_handlers.update({ + 'book-10': self._handle_orderbook_update, + 'book-25': self._handle_orderbook_update, + 'book-100': self._handle_orderbook_update, + 'book-500': self._handle_orderbook_update, + 'book-1000': self._handle_orderbook_update, + 'trade': self._handle_trade_update, + 'systemStatus': self._handle_system_status, + 'subscriptionStatus': self._handle_subscription_status, + 'heartbeat': self._handle_heartbeat + }) + + # Kraken-specific tracking + self.channel_map = {} # channel_id -> (channel_name, symbol) + self.subscription_ids = {} # symbol -> subscription_id + self.system_status = 'unknown' + + logger.info("Kraken connector initialized") + + def _get_message_type(self, data: Dict) -> str: + """ + Determine message type from Kraken message data. + + Args: + data: Parsed message data + + Returns: + str: Message type identifier + """ + # Kraken messages can be arrays or objects + if isinstance(data, list) and len(data) >= 2: + # Data message format: [channelID, data, channelName, pair] + if len(data) >= 4: + channel_name = data[2] + return channel_name + else: + return 'unknown' + elif isinstance(data, dict): + # Status/control messages + if 'event' in data: + return data['event'] + elif 'errorMessage' in data: + return 'error' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to Kraken format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: Kraken pair format (e.g., 'XBT/USD') + """ + # Kraken uses different symbol names + symbol_map = { + 'BTCUSDT': 'XBT/USD', + 'ETHUSDT': 'ETH/USD', + 'ADAUSDT': 'ADA/USD', + 'DOTUSDT': 'DOT/USD', + 'LINKUSDT': 'LINK/USD', + 'LTCUSDT': 'LTC/USD', + 'XRPUSDT': 'XRP/USD', + 'BCHUSDT': 'BCH/USD', + 'EOSUSDT': 'EOS/USD', + 'XLMUSDT': 'XLM/USD' + } + + if symbol.upper() in symbol_map: + return symbol_map[symbol.upper()] + else: + # Generic conversion: BTCUSDT -> BTC/USD + if symbol.endswith('USDT'): + base = symbol[:-4] + return f"{base}/USD" + elif symbol.endswith('USD'): + base = symbol[:-3] + return f"{base}/USD" + else: + # Assume it's already in correct format + return symbol.upper() + + def _denormalize_symbol(self, kraken_pair: str) -> str: + """ + Convert Kraken pair back to standard format. + + Args: + kraken_pair: Kraken pair format (e.g., 'XBT/USD') + + Returns: + str: Standard symbol format (e.g., 'BTCUSDT') + """ + # Reverse mapping + reverse_map = { + 'XBT/USD': 'BTCUSDT', + 'ETH/USD': 'ETHUSDT', + 'ADA/USD': 'ADAUSDT', + 'DOT/USD': 'DOTUSDT', + 'LINK/USD': 'LINKUSDT', + 'LTC/USD': 'LTCUSDT', + 'XRP/USD': 'XRPUSDT', + 'BCH/USD': 'BCHUSDT', + 'EOS/USD': 'EOSUSDT', + 'XLM/USD': 'XLMUSDT' + } + + if kraken_pair in reverse_map: + return reverse_map[kraken_pair] + else: + # Generic conversion: BTC/USD -> BTCUSDT + if '/' in kraken_pair: + base, quote = kraken_pair.split('/', 1) + if quote == 'USD': + return f"{base}USDT" + else: + return f"{base}{quote}" + return kraken_pair + + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + kraken_pair = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "event": "subscribe", + "pair": [kraken_pair], + "subscription": { + "name": "book", + "depth": 25 # 25 levels + } + } + + # Add authentication if credentials provided + if self.api_key and self.api_secret: + subscription_msg["subscription"]["token"] = self._get_auth_token() + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + logger.info(f"Subscribed to order book for {symbol} ({kraken_pair}) on Kraken") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on Kraken") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + kraken_pair = self.normalize_symbol(symbol) + + # Create subscription message + subscription_msg = { + "event": "subscribe", + "pair": [kraken_pair], + "subscription": { + "name": "trade" + } + } + + # Add authentication if credentials provided + if self.api_key and self.api_secret: + subscription_msg["subscription"]["token"] = self._get_auth_token() + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + logger.info(f"Subscribed to trades for {symbol} ({kraken_pair}) on Kraken") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on Kraken") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + kraken_pair = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "event": "unsubscribe", + "pair": [kraken_pair], + "subscription": { + "name": "book" + } + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + logger.info(f"Unsubscribed from order book for {symbol} ({kraken_pair}) on Kraken") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on Kraken") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + kraken_pair = self.normalize_symbol(symbol) + + # Create unsubscription message + unsubscription_msg = { + "event": "unsubscribe", + "pair": [kraken_pair], + "subscription": { + "name": "trade" + } + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + logger.info(f"Unsubscribed from trades for {symbol} ({kraken_pair}) on Kraken") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on Kraken") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols from Kraken. + + Returns: + List[str]: List of available symbols in standard format + """ + try: + import aiohttp + + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.API_URL}/0/public/AssetPairs") as response: + if response.status == 200: + data = await response.json() + + if data.get('error'): + logger.error(f"Kraken API error: {data['error']}") + return [] + + symbols = [] + pairs = data.get('result', {}) + + for pair_name, pair_info in pairs.items(): + # Skip dark pool pairs + if '.d' in pair_name: + continue + + # Get the WebSocket pair name + ws_name = pair_info.get('wsname') + if ws_name: + # Convert to standard format + standard_symbol = self._denormalize_symbol(ws_name) + symbols.append(standard_symbol) + + logger.info(f"Retrieved {len(symbols)} symbols from Kraken") + return symbols + else: + logger.error(f"Failed to get symbols from Kraken: HTTP {response.status}") + return [] + + except Exception as e: + logger.error(f"Error getting symbols from Kraken: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot from Kraken REST API. + + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + try: + import aiohttp + + kraken_pair = self.normalize_symbol(symbol) + + url = f"{self.API_URL}/0/public/Depth" + params = { + 'pair': kraken_pair, + 'count': min(depth, 500) # Kraken max is 500 + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + + if data.get('error'): + logger.error(f"Kraken API error: {data['error']}") + return None + + result = data.get('result', {}) + # Kraken returns data with the actual pair name as key + pair_data = None + for key, value in result.items(): + if isinstance(value, dict) and 'bids' in value and 'asks' in value: + pair_data = value + break + + if pair_data: + return self._parse_orderbook_snapshot(pair_data, symbol) + else: + logger.error(f"No order book data found for {symbol}") + return None + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """ + Parse Kraken order book data into OrderBookSnapshot. + + Args: + data: Raw Kraken order book data + symbol: Trading symbol + + Returns: + OrderBookSnapshot: Parsed order book + """ + try: + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks + ) + + return orderbook + + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_orderbook_update(self, data: List) -> None: + """ + Handle order book update from Kraken. + + Args: + data: Order book update data (Kraken array format) + """ + try: + set_correlation_id() + + # Kraken format: [channelID, data, channelName, pair] + if len(data) < 4: + logger.warning("Invalid Kraken order book update format") + return + + channel_id = data[0] + book_data = data[1] + channel_name = data[2] + kraken_pair = data[3] + + symbol = self._denormalize_symbol(kraken_pair) + + # Track channel mapping + self.channel_map[channel_id] = (channel_name, symbol) + + # Parse order book data + bids = [] + asks = [] + + # Kraken book data can have 'b' (bids), 'a' (asks), 'bs' (bid snapshot), 'as' (ask snapshot) + if 'b' in book_data: + for bid_data in book_data['b']: + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + if 'bs' in book_data: # Bid snapshot + for bid_data in book_data['bs']: + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + if 'a' in book_data: + for ask_data in book_data['a']: + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + if 'as' in book_data: # Ask snapshot + for ask_data in book_data['as']: + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") + + async def _handle_trade_update(self, data: List) -> None: + """ + Handle trade update from Kraken. + + Args: + data: Trade update data (Kraken array format) + """ + try: + set_correlation_id() + + # Kraken format: [channelID, data, channelName, pair] + if len(data) < 4: + logger.warning("Invalid Kraken trade update format") + return + + channel_id = data[0] + trade_data = data[1] + channel_name = data[2] + kraken_pair = data[3] + + symbol = self._denormalize_symbol(kraken_pair) + + # Track channel mapping + self.channel_map[channel_id] = (channel_name, symbol) + + # Process trade data (array of trades) + for trade_info in trade_data: + if len(trade_info) >= 6: + price = float(trade_info[0]) + size = float(trade_info[1]) + timestamp = float(trade_info[2]) + side = trade_info[3] # 'b' for buy, 's' for sell + order_type = trade_info[4] # 'm' for market, 'l' for limit + misc = trade_info[5] if len(trade_info) > 5 else '' + + # Validate data + if not validate_price(price) or not validate_volume(size): + logger.warning(f"Invalid trade data: price={price}, size={size}") + continue + + # Convert side + trade_side = 'buy' if side == 'b' else 'sell' + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(timestamp, tz=timezone.utc), + price=price, + size=size, + side=trade_side, + trade_id=f"{timestamp}_{price}_{size}" # Generate ID + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {trade_side} {size} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") + + async def _handle_system_status(self, data: Dict) -> None: + """ + Handle system status message from Kraken. + + Args: + data: System status data + """ + try: + status = data.get('status', 'unknown') + version = data.get('version', 'unknown') + + self.system_status = status + logger.info(f"Kraken system status: {status} (version: {version})") + + if status != 'online': + logger.warning(f"Kraken system not online: {status}") + + except Exception as e: + logger.error(f"Error handling system status: {e}") + + async def _handle_subscription_status(self, data: Dict) -> None: + """ + Handle subscription status message from Kraken. + + Args: + data: Subscription status data + """ + try: + status = data.get('status', 'unknown') + channel_name = data.get('channelName', 'unknown') + pair = data.get('pair', 'unknown') + subscription = data.get('subscription', {}) + + if status == 'subscribed': + logger.info(f"Kraken subscription confirmed: {channel_name} for {pair}") + + # Store subscription ID if provided + if 'channelID' in data: + channel_id = data['channelID'] + symbol = self._denormalize_symbol(pair) + self.channel_map[channel_id] = (channel_name, symbol) + + elif status == 'unsubscribed': + logger.info(f"Kraken unsubscription confirmed: {channel_name} for {pair}") + elif status == 'error': + error_message = data.get('errorMessage', 'Unknown error') + logger.error(f"Kraken subscription error: {error_message}") + + except Exception as e: + logger.error(f"Error handling subscription status: {e}") + + async def _handle_heartbeat(self, data: Dict) -> None: + """ + Handle heartbeat message from Kraken. + + Args: + data: Heartbeat data + """ + logger.debug("Received Kraken heartbeat") + + def _get_auth_token(self) -> str: + """ + Generate authentication token for Kraken WebSocket. + + Returns: + str: Authentication token + """ + if not self.api_key or not self.api_secret: + return "" + + try: + # This is a simplified version - actual Kraken auth is more complex + # and requires getting a token from the REST API first + nonce = str(int(time.time() * 1000)) + message = nonce + self.api_key + signature = hmac.new( + base64.b64decode(self.api_secret), + message.encode('utf-8'), + hashlib.sha512 + ).hexdigest() + + return f"{self.api_key}:{signature}:{nonce}" + + except Exception as e: + logger.error(f"Error generating auth token: {e}") + return "" + + def get_kraken_stats(self) -> Dict[str, Any]: + """Get Kraken-specific statistics.""" + base_stats = self.get_stats() + + kraken_stats = { + 'system_status': self.system_status, + 'channel_mappings': len(self.channel_map), + 'authenticated': bool(self.api_key and self.api_secret) + } + + base_stats.update(kraken_stats) + return base_stats \ No newline at end of file diff --git a/COBY/examples/multi_exchange_example.py b/COBY/examples/multi_exchange_example.py new file mode 100644 index 0000000..38f01b8 --- /dev/null +++ b/COBY/examples/multi_exchange_example.py @@ -0,0 +1,284 @@ +""" +Example demonstrating multi-exchange connectivity with Binance, Coinbase, and Kraken. +Shows how to connect to multiple exchanges simultaneously and handle their data. +""" + +import asyncio +import logging +from datetime import datetime + +from ..connectors.binance_connector import BinanceConnector +from ..connectors.coinbase_connector import CoinbaseConnector +from ..connectors.kraken_connector import KrakenConnector +from ..models.core import OrderBookSnapshot, TradeEvent + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class MultiExchangeManager: + """Manages connections to multiple exchanges.""" + + def __init__(self): + """Initialize multi-exchange manager.""" + # Initialize connectors + self.connectors = { + 'binance': BinanceConnector(), + 'coinbase': CoinbaseConnector(use_sandbox=True), # Use sandbox for testing + 'kraken': KrakenConnector() + } + + # Data tracking + self.data_received = { + 'binance': {'orderbooks': 0, 'trades': 0}, + 'coinbase': {'orderbooks': 0, 'trades': 0}, + 'kraken': {'orderbooks': 0, 'trades': 0} + } + + # Set up data callbacks + for name, connector in self.connectors.items(): + connector.add_data_callback(lambda data, exchange=name: self._handle_data(exchange, data)) + + def _handle_data(self, exchange: str, data): + """Handle data from any exchange.""" + try: + if isinstance(data, OrderBookSnapshot): + self.data_received[exchange]['orderbooks'] += 1 + logger.info(f"📊 {exchange.upper()}: Order book for {data.symbol} - " + f"Bids: {len(data.bids)}, Asks: {len(data.asks)}") + + # Show best bid/ask if available + if data.bids and data.asks: + best_bid = max(data.bids, key=lambda x: x.price) + best_ask = min(data.asks, key=lambda x: x.price) + spread = best_ask.price - best_bid.price + logger.info(f" Best: {best_bid.price} / {best_ask.price} (spread: {spread:.2f})") + + elif isinstance(data, TradeEvent): + self.data_received[exchange]['trades'] += 1 + logger.info(f"💰 {exchange.upper()}: Trade {data.symbol} - " + f"{data.side} {data.size} @ {data.price}") + + except Exception as e: + logger.error(f"Error handling data from {exchange}: {e}") + + async def connect_all(self): + """Connect to all exchanges.""" + logger.info("Connecting to all exchanges...") + + connection_tasks = [] + for name, connector in self.connectors.items(): + task = asyncio.create_task(self._connect_exchange(name, connector)) + connection_tasks.append(task) + + # Wait for all connections + results = await asyncio.gather(*connection_tasks, return_exceptions=True) + + # Report results + for i, (name, result) in enumerate(zip(self.connectors.keys(), results)): + if isinstance(result, Exception): + logger.error(f"❌ Failed to connect to {name}: {result}") + elif result: + logger.info(f"✅ Connected to {name}") + else: + logger.warning(f"⚠️ Connection to {name} returned False") + + async def _connect_exchange(self, name: str, connector) -> bool: + """Connect to a single exchange.""" + try: + return await connector.connect() + except Exception as e: + logger.error(f"Error connecting to {name}: {e}") + return False + + async def subscribe_to_symbols(self, symbols: list): + """Subscribe to order book and trade data for given symbols.""" + logger.info(f"Subscribing to symbols: {symbols}") + + for symbol in symbols: + for name, connector in self.connectors.items(): + try: + if connector.is_connected: + # Subscribe to order book + await connector.subscribe_orderbook(symbol) + logger.info(f"📈 Subscribed to {symbol} order book on {name}") + + # Subscribe to trades + await connector.subscribe_trades(symbol) + logger.info(f"💱 Subscribed to {symbol} trades on {name}") + + # Small delay between subscriptions + await asyncio.sleep(0.5) + else: + logger.warning(f"⚠️ {name} not connected, skipping {symbol}") + + except Exception as e: + logger.error(f"Error subscribing to {symbol} on {name}: {e}") + + async def run_for_duration(self, duration_seconds: int): + """Run data collection for specified duration.""" + logger.info(f"Running data collection for {duration_seconds} seconds...") + + start_time = datetime.now() + + # Print statistics periodically + while (datetime.now() - start_time).seconds < duration_seconds: + await asyncio.sleep(10) # Print stats every 10 seconds + self._print_statistics() + + logger.info("Data collection period completed") + + def _print_statistics(self): + """Print current data statistics.""" + logger.info("📊 Current Statistics:") + total_orderbooks = 0 + total_trades = 0 + + for exchange, stats in self.data_received.items(): + orderbooks = stats['orderbooks'] + trades = stats['trades'] + total_orderbooks += orderbooks + total_trades += trades + + logger.info(f" {exchange.upper()}: {orderbooks} order books, {trades} trades") + + logger.info(f" TOTAL: {total_orderbooks} order books, {total_trades} trades") + + async def disconnect_all(self): + """Disconnect from all exchanges.""" + logger.info("Disconnecting from all exchanges...") + + for name, connector in self.connectors.items(): + try: + await connector.disconnect() + logger.info(f"✅ Disconnected from {name}") + except Exception as e: + logger.error(f"Error disconnecting from {name}: {e}") + + def get_connector_stats(self): + """Get statistics from all connectors.""" + stats = {} + for name, connector in self.connectors.items(): + try: + if hasattr(connector, 'get_stats'): + stats[name] = connector.get_stats() + else: + stats[name] = { + 'connected': connector.is_connected, + 'exchange': connector.exchange_name + } + except Exception as e: + stats[name] = {'error': str(e)} + + return stats + + +async def demonstrate_multi_exchange(): + """Demonstrate multi-exchange connectivity.""" + logger.info("=== Multi-Exchange Connectivity Demo ===") + + # Create manager + manager = MultiExchangeManager() + + try: + # Connect to all exchanges + await manager.connect_all() + + # Wait a moment for connections to stabilize + await asyncio.sleep(2) + + # Subscribe to some popular symbols + symbols = ['BTCUSDT', 'ETHUSDT'] + await manager.subscribe_to_symbols(symbols) + + # Run data collection for 30 seconds + await manager.run_for_duration(30) + + # Print final statistics + logger.info("=== Final Statistics ===") + manager._print_statistics() + + # Print connector statistics + logger.info("=== Connector Statistics ===") + connector_stats = manager.get_connector_stats() + for exchange, stats in connector_stats.items(): + logger.info(f"{exchange.upper()}: {stats}") + + except Exception as e: + logger.error(f"Error in multi-exchange demo: {e}") + + finally: + # Clean up + await manager.disconnect_all() + + +async def test_individual_connectors(): + """Test each connector individually.""" + logger.info("=== Individual Connector Tests ===") + + # Test Binance + logger.info("Testing Binance connector...") + binance = BinanceConnector() + try: + symbols = await binance.get_symbols() + logger.info(f"Binance symbols available: {len(symbols)}") + + # Test order book snapshot + orderbook = await binance.get_orderbook_snapshot('BTCUSDT') + if orderbook: + logger.info(f"Binance order book: {len(orderbook.bids)} bids, {len(orderbook.asks)} asks") + except Exception as e: + logger.error(f"Binance test error: {e}") + + # Test Coinbase + logger.info("Testing Coinbase connector...") + coinbase = CoinbaseConnector(use_sandbox=True) + try: + symbols = await coinbase.get_symbols() + logger.info(f"Coinbase symbols available: {len(symbols)}") + + # Test order book snapshot + orderbook = await coinbase.get_orderbook_snapshot('BTCUSDT') + if orderbook: + logger.info(f"Coinbase order book: {len(orderbook.bids)} bids, {len(orderbook.asks)} asks") + except Exception as e: + logger.error(f"Coinbase test error: {e}") + + # Test Kraken + logger.info("Testing Kraken connector...") + kraken = KrakenConnector() + try: + symbols = await kraken.get_symbols() + logger.info(f"Kraken symbols available: {len(symbols)}") + + # Test order book snapshot + orderbook = await kraken.get_orderbook_snapshot('BTCUSDT') + if orderbook: + logger.info(f"Kraken order book: {len(orderbook.bids)} bids, {len(orderbook.asks)} asks") + except Exception as e: + logger.error(f"Kraken test error: {e}") + + +async def main(): + """Run all demonstrations.""" + logger.info("Starting Multi-Exchange Examples...") + + try: + # Test individual connectors first + await test_individual_connectors() + + await asyncio.sleep(2) + + # Then test multi-exchange connectivity + await demonstrate_multi_exchange() + + logger.info("All multi-exchange examples completed successfully!") + + except Exception as e: + logger.error(f"Error running examples: {e}") + + +if __name__ == "__main__": + # Run the examples + asyncio.run(main()) \ No newline at end of file diff --git a/COBY/tests/test_coinbase_connector.py b/COBY/tests/test_coinbase_connector.py new file mode 100644 index 0000000..7baf6ef --- /dev/null +++ b/COBY/tests/test_coinbase_connector.py @@ -0,0 +1,364 @@ +""" +Unit tests for Coinbase exchange connector. +""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, patch +from datetime import datetime, timezone + +from ..connectors.coinbase_connector import CoinbaseConnector +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel + + +class TestCoinbaseConnector: + """Test suite for Coinbase connector.""" + + @pytest.fixture + def connector(self): + """Create connector instance for testing.""" + return CoinbaseConnector(use_sandbox=True) + + def test_initialization(self, connector): + """Test connector initializes correctly.""" + assert connector.exchange_name == "coinbase" + assert connector.use_sandbox is True + assert connector.SANDBOX_URL in connector.websocket_url + assert 'l2update' in connector.message_handlers + assert 'match' in connector.message_handlers + + def test_symbol_normalization(self, connector): + """Test symbol normalization to Coinbase format.""" + # Test standard conversions + assert connector.normalize_symbol('BTCUSDT') == 'BTC-USD' + assert connector.normalize_symbol('ETHUSDT') == 'ETH-USD' + assert connector.normalize_symbol('ADAUSDT') == 'ADA-USD' + + # Test generic conversion + assert connector.normalize_symbol('LINKUSDT') == 'LINK-USD' + + # Test already correct format + assert connector.normalize_symbol('BTC-USD') == 'BTC-USD' + + def test_symbol_denormalization(self, connector): + """Test converting Coinbase format back to standard.""" + assert connector._denormalize_symbol('BTC-USD') == 'BTCUSDT' + assert connector._denormalize_symbol('ETH-USD') == 'ETHUSDT' + assert connector._denormalize_symbol('ADA-USD') == 'ADAUSDT' + + # Test other quote currencies + assert connector._denormalize_symbol('BTC-EUR') == 'BTCEUR' + + def test_message_type_detection(self, connector): + """Test message type detection.""" + # Test l2update message + l2_message = {'type': 'l2update', 'product_id': 'BTC-USD'} + assert connector._get_message_type(l2_message) == 'l2update' + + # Test match message + match_message = {'type': 'match', 'product_id': 'BTC-USD'} + assert connector._get_message_type(match_message) == 'match' + + # Test error message + error_message = {'type': 'error', 'message': 'Invalid signature'} + assert connector._get_message_type(error_message) == 'error' + + # Test unknown message + unknown_message = {'data': 'something'} + assert connector._get_message_type(unknown_message) == 'unknown' + + @pytest.mark.asyncio + async def test_subscription_methods(self, connector): + """Test subscription and unsubscription methods.""" + # Mock the _send_message method + connector._send_message = AsyncMock(return_value=True) + + # Test order book subscription + await connector.subscribe_orderbook('BTCUSDT') + + # Verify subscription was tracked + assert 'BTCUSDT' in connector.subscriptions + assert 'orderbook' in connector.subscriptions['BTCUSDT'] + assert 'level2' in connector.subscribed_channels + assert 'BTC-USD' in connector.product_ids + + # Verify correct message was sent + connector._send_message.assert_called() + call_args = connector._send_message.call_args[0][0] + assert call_args['type'] == 'subscribe' + assert 'BTC-USD' in call_args['product_ids'] + assert 'level2' in call_args['channels'] + + # Test trade subscription + await connector.subscribe_trades('ETHUSDT') + + assert 'ETHUSDT' in connector.subscriptions + assert 'trades' in connector.subscriptions['ETHUSDT'] + assert 'matches' in connector.subscribed_channels + assert 'ETH-USD' in connector.product_ids + + # Test unsubscription + await connector.unsubscribe_orderbook('BTCUSDT') + + # Verify unsubscription + if 'BTCUSDT' in connector.subscriptions: + assert 'orderbook' not in connector.subscriptions['BTCUSDT'] + + @pytest.mark.asyncio + async def test_orderbook_snapshot_parsing(self, connector): + """Test parsing order book snapshot data.""" + # Mock order book data from Coinbase + mock_data = { + 'sequence': 12345, + 'bids': [ + ['50000.00', '1.5', 1], + ['49999.00', '2.0', 2] + ], + 'asks': [ + ['50001.00', '1.2', 1], + ['50002.00', '0.8', 1] + ] + } + + # Parse the data + orderbook = connector._parse_orderbook_snapshot(mock_data, 'BTCUSDT') + + # Verify parsing + assert isinstance(orderbook, OrderBookSnapshot) + assert orderbook.symbol == 'BTCUSDT' + assert orderbook.exchange == 'coinbase' + assert orderbook.sequence_id == 12345 + + # Verify bids + assert len(orderbook.bids) == 2 + assert orderbook.bids[0].price == 50000.00 + assert orderbook.bids[0].size == 1.5 + assert orderbook.bids[1].price == 49999.00 + assert orderbook.bids[1].size == 2.0 + + # Verify asks + assert len(orderbook.asks) == 2 + assert orderbook.asks[0].price == 50001.00 + assert orderbook.asks[0].size == 1.2 + assert orderbook.asks[1].price == 50002.00 + assert orderbook.asks[1].size == 0.8 + + @pytest.mark.asyncio + async def test_orderbook_update_handling(self, connector): + """Test handling order book l2update messages.""" + # Mock callback + callback_called = False + received_data = None + + def mock_callback(data): + nonlocal callback_called, received_data + callback_called = True + received_data = data + + connector.add_data_callback(mock_callback) + + # Mock l2update message + update_message = { + 'type': 'l2update', + 'product_id': 'BTC-USD', + 'time': '2023-01-01T12:00:00.000000Z', + 'changes': [ + ['buy', '50000.00', '1.5'], + ['sell', '50001.00', '1.2'] + ] + } + + # Handle the message + await connector._handle_orderbook_update(update_message) + + # Verify callback was called + assert callback_called + assert isinstance(received_data, OrderBookSnapshot) + assert received_data.symbol == 'BTCUSDT' + assert received_data.exchange == 'coinbase' + + # Verify bids and asks + assert len(received_data.bids) == 1 + assert received_data.bids[0].price == 50000.00 + assert received_data.bids[0].size == 1.5 + + assert len(received_data.asks) == 1 + assert received_data.asks[0].price == 50001.00 + assert received_data.asks[0].size == 1.2 + + @pytest.mark.asyncio + async def test_trade_handling(self, connector): + """Test handling trade (match) messages.""" + # Mock callback + callback_called = False + received_data = None + + def mock_callback(data): + nonlocal callback_called, received_data + callback_called = True + received_data = data + + connector.add_data_callback(mock_callback) + + # Mock match message + trade_message = { + 'type': 'match', + 'product_id': 'BTC-USD', + 'time': '2023-01-01T12:00:00.000000Z', + 'price': '50000.50', + 'size': '0.1', + 'side': 'buy', + 'trade_id': 12345 + } + + # Handle the message + await connector._handle_trade_update(trade_message) + + # Verify callback was called + assert callback_called + assert isinstance(received_data, TradeEvent) + assert received_data.symbol == 'BTCUSDT' + assert received_data.exchange == 'coinbase' + assert received_data.price == 50000.50 + assert received_data.size == 0.1 + assert received_data.side == 'buy' + assert received_data.trade_id == '12345' + + @pytest.mark.asyncio + async def test_error_handling(self, connector): + """Test error message handling.""" + # Test error message + error_message = { + 'type': 'error', + 'message': 'Invalid signature', + 'reason': 'Authentication failed' + } + + # Should not raise exception + await connector._handle_error_message(error_message) + + @pytest.mark.asyncio + async def test_get_symbols(self, connector): + """Test getting available symbols.""" + # Mock HTTP response + mock_products = [ + { + 'id': 'BTC-USD', + 'status': 'online', + 'trading_disabled': False + }, + { + 'id': 'ETH-USD', + 'status': 'online', + 'trading_disabled': False + }, + { + 'id': 'DISABLED-USD', + 'status': 'offline', + 'trading_disabled': True + } + ] + + with patch('aiohttp.ClientSession.get') as mock_get: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_products) + mock_get.return_value.__aenter__.return_value = mock_response + + symbols = await connector.get_symbols() + + # Should only return online, enabled symbols + assert 'BTCUSDT' in symbols + assert 'ETHUSDT' in symbols + assert 'DISABLEDUSDT' not in symbols + + @pytest.mark.asyncio + async def test_get_orderbook_snapshot(self, connector): + """Test getting order book snapshot from REST API.""" + # Mock HTTP response + mock_orderbook = { + 'sequence': 12345, + 'bids': [['50000.00', '1.5']], + 'asks': [['50001.00', '1.2']] + } + + with patch('aiohttp.ClientSession.get') as mock_get: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_orderbook) + mock_get.return_value.__aenter__.return_value = mock_response + + orderbook = await connector.get_orderbook_snapshot('BTCUSDT') + + assert isinstance(orderbook, OrderBookSnapshot) + assert orderbook.symbol == 'BTCUSDT' + assert orderbook.exchange == 'coinbase' + assert len(orderbook.bids) == 1 + assert len(orderbook.asks) == 1 + + def test_authentication_headers(self, connector): + """Test authentication header generation.""" + # Set up credentials + connector.api_key = 'test_key' + connector.api_secret = 'dGVzdF9zZWNyZXQ=' # base64 encoded 'test_secret' + connector.passphrase = 'test_passphrase' + + # Test message + test_message = {'type': 'subscribe', 'channels': ['level2']} + + # Generate headers + headers = connector._get_auth_headers(test_message) + + # Verify headers are present + assert 'CB-ACCESS-KEY' in headers + assert 'CB-ACCESS-SIGN' in headers + assert 'CB-ACCESS-TIMESTAMP' in headers + assert 'CB-ACCESS-PASSPHRASE' in headers + + assert headers['CB-ACCESS-KEY'] == 'test_key' + assert headers['CB-ACCESS-PASSPHRASE'] == 'test_passphrase' + + def test_statistics(self, connector): + """Test getting connector statistics.""" + # Add some test data + connector.subscribed_channels.add('level2') + connector.product_ids.add('BTC-USD') + + stats = connector.get_coinbase_stats() + + assert stats['exchange'] == 'coinbase' + assert 'level2' in stats['subscribed_channels'] + assert 'BTC-USD' in stats['product_ids'] + assert stats['use_sandbox'] is True + assert 'authenticated' in stats + + +async def test_coinbase_integration(): + """Integration test for Coinbase connector.""" + connector = CoinbaseConnector(use_sandbox=True) + + try: + # Test basic functionality + assert connector.exchange_name == "coinbase" + + # Test symbol normalization + assert connector.normalize_symbol('BTCUSDT') == 'BTC-USD' + assert connector._denormalize_symbol('BTC-USD') == 'BTCUSDT' + + # Test message type detection + test_message = {'type': 'l2update', 'product_id': 'BTC-USD'} + assert connector._get_message_type(test_message) == 'l2update' + + print("✓ Coinbase connector integration test passed") + return True + + except Exception as e: + print(f"✗ Coinbase connector integration test failed: {e}") + return False + + +if __name__ == "__main__": + # Run integration test + success = asyncio.run(test_coinbase_integration()) + if not success: + exit(1) \ No newline at end of file diff --git a/COBY/tests/test_kraken_connector.py b/COBY/tests/test_kraken_connector.py new file mode 100644 index 0000000..5a9bab6 --- /dev/null +++ b/COBY/tests/test_kraken_connector.py @@ -0,0 +1,398 @@ +""" +Unit tests for Kraken exchange connector. +""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, patch +from datetime import datetime, timezone + +from ..connectors.kraken_connector import KrakenConnector +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel + + +class TestKrakenConnector: + """Test suite for Kraken connector.""" + + @pytest.fixture + def connector(self): + """Create connector instance for testing.""" + return KrakenConnector() + + def test_initialization(self, connector): + """Test connector initializes correctly.""" + assert connector.exchange_name == "kraken" + assert connector.WEBSOCKET_URL in connector.websocket_url + assert 'book-25' in connector.message_handlers + assert 'trade' in connector.message_handlers + assert connector.system_status == 'unknown' + + def test_symbol_normalization(self, connector): + """Test symbol normalization to Kraken format.""" + # Test standard conversions + assert connector.normalize_symbol('BTCUSDT') == 'XBT/USD' + assert connector.normalize_symbol('ETHUSDT') == 'ETH/USD' + assert connector.normalize_symbol('ADAUSDT') == 'ADA/USD' + + # Test generic conversion + assert connector.normalize_symbol('LINKUSDT') == 'LINK/USD' + + # Test already correct format + assert connector.normalize_symbol('XBT/USD') == 'XBT/USD' + + def test_symbol_denormalization(self, connector): + """Test converting Kraken format back to standard.""" + assert connector._denormalize_symbol('XBT/USD') == 'BTCUSDT' + assert connector._denormalize_symbol('ETH/USD') == 'ETHUSDT' + assert connector._denormalize_symbol('ADA/USD') == 'ADAUSDT' + + # Test other quote currencies + assert connector._denormalize_symbol('BTC/EUR') == 'BTCEUR' + + def test_message_type_detection(self, connector): + """Test message type detection.""" + # Test order book message (array format) + book_message = [123, {'b': [['50000', '1.5']]}, 'book-25', 'XBT/USD'] + assert connector._get_message_type(book_message) == 'book-25' + + # Test trade message (array format) + trade_message = [456, [['50000', '0.1', 1609459200, 'b', 'm', '']], 'trade', 'XBT/USD'] + assert connector._get_message_type(trade_message) == 'trade' + + # Test status message (object format) + status_message = {'event': 'systemStatus', 'status': 'online'} + assert connector._get_message_type(status_message) == 'systemStatus' + + # Test subscription message + sub_message = {'event': 'subscriptionStatus', 'status': 'subscribed'} + assert connector._get_message_type(sub_message) == 'subscriptionStatus' + + # Test unknown message + unknown_message = {'data': 'something'} + assert connector._get_message_type(unknown_message) == 'unknown' + + @pytest.mark.asyncio + async def test_subscription_methods(self, connector): + """Test subscription and unsubscription methods.""" + # Mock the _send_message method + connector._send_message = AsyncMock(return_value=True) + + # Test order book subscription + await connector.subscribe_orderbook('BTCUSDT') + + # Verify subscription was tracked + assert 'BTCUSDT' in connector.subscriptions + assert 'orderbook' in connector.subscriptions['BTCUSDT'] + + # Verify correct message was sent + connector._send_message.assert_called() + call_args = connector._send_message.call_args[0][0] + assert call_args['event'] == 'subscribe' + assert 'XBT/USD' in call_args['pair'] + assert call_args['subscription']['name'] == 'book' + + # Test trade subscription + await connector.subscribe_trades('ETHUSDT') + + assert 'ETHUSDT' in connector.subscriptions + assert 'trades' in connector.subscriptions['ETHUSDT'] + + # Test unsubscription + await connector.unsubscribe_orderbook('BTCUSDT') + + # Verify unsubscription message + call_args = connector._send_message.call_args[0][0] + assert call_args['event'] == 'unsubscribe' + + @pytest.mark.asyncio + async def test_orderbook_snapshot_parsing(self, connector): + """Test parsing order book snapshot data.""" + # Mock order book data from Kraken + mock_data = { + 'bids': [ + ['50000.00', '1.5', 1609459200], + ['49999.00', '2.0', 1609459201] + ], + 'asks': [ + ['50001.00', '1.2', 1609459200], + ['50002.00', '0.8', 1609459201] + ] + } + + # Parse the data + orderbook = connector._parse_orderbook_snapshot(mock_data, 'BTCUSDT') + + # Verify parsing + assert isinstance(orderbook, OrderBookSnapshot) + assert orderbook.symbol == 'BTCUSDT' + assert orderbook.exchange == 'kraken' + + # Verify bids + assert len(orderbook.bids) == 2 + assert orderbook.bids[0].price == 50000.00 + assert orderbook.bids[0].size == 1.5 + assert orderbook.bids[1].price == 49999.00 + assert orderbook.bids[1].size == 2.0 + + # Verify asks + assert len(orderbook.asks) == 2 + assert orderbook.asks[0].price == 50001.00 + assert orderbook.asks[0].size == 1.2 + assert orderbook.asks[1].price == 50002.00 + assert orderbook.asks[1].size == 0.8 + + @pytest.mark.asyncio + async def test_orderbook_update_handling(self, connector): + """Test handling order book update messages.""" + # Mock callback + callback_called = False + received_data = None + + def mock_callback(data): + nonlocal callback_called, received_data + callback_called = True + received_data = data + + connector.add_data_callback(mock_callback) + + # Mock Kraken order book update message + update_message = [ + 123, # channel ID + { + 'b': [['50000.00', '1.5', '1609459200.123456']], + 'a': [['50001.00', '1.2', '1609459200.123456']] + }, + 'book-25', + 'XBT/USD' + ] + + # Handle the message + await connector._handle_orderbook_update(update_message) + + # Verify callback was called + assert callback_called + assert isinstance(received_data, OrderBookSnapshot) + assert received_data.symbol == 'BTCUSDT' + assert received_data.exchange == 'kraken' + + # Verify channel mapping was stored + assert 123 in connector.channel_map + assert connector.channel_map[123] == ('book-25', 'BTCUSDT') + + # Verify bids and asks + assert len(received_data.bids) == 1 + assert received_data.bids[0].price == 50000.00 + assert received_data.bids[0].size == 1.5 + + assert len(received_data.asks) == 1 + assert received_data.asks[0].price == 50001.00 + assert received_data.asks[0].size == 1.2 + + @pytest.mark.asyncio + async def test_trade_handling(self, connector): + """Test handling trade messages.""" + # Mock callback + callback_called = False + received_trades = [] + + def mock_callback(data): + nonlocal callback_called + callback_called = True + received_trades.append(data) + + connector.add_data_callback(mock_callback) + + # Mock Kraken trade message (array of trades) + trade_message = [ + 456, # channel ID + [ + ['50000.50', '0.1', 1609459200.123456, 'b', 'm', ''], + ['50001.00', '0.05', 1609459201.123456, 's', 'l', ''] + ], + 'trade', + 'XBT/USD' + ] + + # Handle the message + await connector._handle_trade_update(trade_message) + + # Verify callback was called + assert callback_called + assert len(received_trades) == 2 + + # Verify first trade (buy) + trade1 = received_trades[0] + assert isinstance(trade1, TradeEvent) + assert trade1.symbol == 'BTCUSDT' + assert trade1.exchange == 'kraken' + assert trade1.price == 50000.50 + assert trade1.size == 0.1 + assert trade1.side == 'buy' + + # Verify second trade (sell) + trade2 = received_trades[1] + assert trade2.price == 50001.00 + assert trade2.size == 0.05 + assert trade2.side == 'sell' + + # Verify channel mapping was stored + assert 456 in connector.channel_map + assert connector.channel_map[456] == ('trade', 'BTCUSDT') + + @pytest.mark.asyncio + async def test_system_status_handling(self, connector): + """Test handling system status messages.""" + # Mock system status message + status_message = { + 'event': 'systemStatus', + 'status': 'online', + 'version': '1.0.0' + } + + # Handle the message + await connector._handle_system_status(status_message) + + # Verify status was updated + assert connector.system_status == 'online' + + @pytest.mark.asyncio + async def test_subscription_status_handling(self, connector): + """Test handling subscription status messages.""" + # Mock subscription status message + sub_message = { + 'event': 'subscriptionStatus', + 'status': 'subscribed', + 'channelName': 'book-25', + 'channelID': 123, + 'pair': 'XBT/USD', + 'subscription': {'name': 'book', 'depth': 25} + } + + # Handle the message + await connector._handle_subscription_status(sub_message) + + # Verify channel mapping was stored + assert 123 in connector.channel_map + assert connector.channel_map[123] == ('book-25', 'BTCUSDT') + + @pytest.mark.asyncio + async def test_get_symbols(self, connector): + """Test getting available symbols.""" + # Mock HTTP response + mock_response_data = { + 'error': [], + 'result': { + 'XBTUSD': { + 'wsname': 'XBT/USD' + }, + 'ETHUSD': { + 'wsname': 'ETH/USD' + }, + 'XBTUSD.d': { # Dark pool - should be filtered out + 'wsname': 'XBT/USD.d' + } + } + } + + with patch('aiohttp.ClientSession.get') as mock_get: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_response_data) + mock_get.return_value.__aenter__.return_value = mock_response + + symbols = await connector.get_symbols() + + # Should only return non-dark pool symbols + assert 'BTCUSDT' in symbols + assert 'ETHUSDT' in symbols + # Dark pool should be filtered out + assert len([s for s in symbols if '.d' in s]) == 0 + + @pytest.mark.asyncio + async def test_get_orderbook_snapshot(self, connector): + """Test getting order book snapshot from REST API.""" + # Mock HTTP response + mock_orderbook = { + 'error': [], + 'result': { + 'XBTUSD': { + 'bids': [['50000.00', '1.5', 1609459200]], + 'asks': [['50001.00', '1.2', 1609459200]] + } + } + } + + with patch('aiohttp.ClientSession.get') as mock_get: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_orderbook) + mock_get.return_value.__aenter__.return_value = mock_response + + orderbook = await connector.get_orderbook_snapshot('BTCUSDT') + + assert isinstance(orderbook, OrderBookSnapshot) + assert orderbook.symbol == 'BTCUSDT' + assert orderbook.exchange == 'kraken' + assert len(orderbook.bids) == 1 + assert len(orderbook.asks) == 1 + + def test_authentication_token(self, connector): + """Test authentication token generation.""" + # Set up credentials + connector.api_key = 'test_key' + connector.api_secret = 'dGVzdF9zZWNyZXQ=' # base64 encoded + + # Generate token + token = connector._get_auth_token() + + # Should return a token (simplified implementation) + assert isinstance(token, str) + assert len(token) > 0 + + def test_statistics(self, connector): + """Test getting connector statistics.""" + # Add some test data + connector.system_status = 'online' + connector.channel_map[123] = ('book-25', 'BTCUSDT') + + stats = connector.get_kraken_stats() + + assert stats['exchange'] == 'kraken' + assert stats['system_status'] == 'online' + assert stats['channel_mappings'] == 1 + assert 'authenticated' in stats + + +async def test_kraken_integration(): + """Integration test for Kraken connector.""" + connector = KrakenConnector() + + try: + # Test basic functionality + assert connector.exchange_name == "kraken" + + # Test symbol normalization + assert connector.normalize_symbol('BTCUSDT') == 'XBT/USD' + assert connector._denormalize_symbol('XBT/USD') == 'BTCUSDT' + + # Test message type detection + test_message = [123, {}, 'book-25', 'XBT/USD'] + assert connector._get_message_type(test_message) == 'book-25' + + # Test status message + status_message = {'event': 'systemStatus', 'status': 'online'} + assert connector._get_message_type(status_message) == 'systemStatus' + + print("✓ Kraken connector integration test passed") + return True + + except Exception as e: + print(f"✗ Kraken connector integration test failed: {e}") + return False + + +if __name__ == "__main__": + # Run integration test + success = asyncio.run(test_kraken_integration()) + if not success: + exit(1) \ No newline at end of file