Binance (completed previously)

 Coinbase Pro (completed in this task)
 Kraken (completed in this task)
This commit is contained in:
Dobromir Popov
2025-08-04 23:21:21 +03:00
parent 68a556e09c
commit 4170553cf3
7 changed files with 2413 additions and 0 deletions

View File

@ -3,11 +3,17 @@ Exchange connector implementations for the COBY system.
"""
from .base_connector import BaseExchangeConnector
from .binance_connector import BinanceConnector
from .coinbase_connector import CoinbaseConnector
from .kraken_connector import KrakenConnector
from .connection_manager import ConnectionManager
from .circuit_breaker import CircuitBreaker
__all__ = [
'BaseExchangeConnector',
'BinanceConnector',
'CoinbaseConnector',
'KrakenConnector',
'ConnectionManager',
'CircuitBreaker'
]

View File

@ -0,0 +1,650 @@
"""
Coinbase Pro exchange connector implementation.
Supports WebSocket connections to Coinbase Pro (now Coinbase Advanced Trade).
"""
import json
import hmac
import hashlib
import base64
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class CoinbaseConnector(BaseExchangeConnector):
"""
Coinbase Pro WebSocket connector implementation.
Supports:
- Order book level2 streams
- Trade streams (matches)
- Symbol normalization
- Authentication for private channels (if needed)
"""
# Coinbase Pro WebSocket URLs
WEBSOCKET_URL = "wss://ws-feed.exchange.coinbase.com"
SANDBOX_URL = "wss://ws-feed-public.sandbox.exchange.coinbase.com"
API_URL = "https://api.exchange.coinbase.com"
def __init__(self, use_sandbox: bool = False, api_key: str = None,
api_secret: str = None, passphrase: str = None):
"""
Initialize Coinbase connector.
Args:
use_sandbox: Whether to use sandbox environment
api_key: API key for authentication (optional)
api_secret: API secret for authentication (optional)
passphrase: API passphrase for authentication (optional)
"""
websocket_url = self.SANDBOX_URL if use_sandbox else self.WEBSOCKET_URL
super().__init__("coinbase", websocket_url)
# Authentication credentials (optional)
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.use_sandbox = use_sandbox
# Coinbase-specific message handlers
self.message_handlers.update({
'l2update': self._handle_orderbook_update,
'match': self._handle_trade_update,
'snapshot': self._handle_orderbook_snapshot,
'error': self._handle_error_message,
'subscriptions': self._handle_subscription_response
})
# Channel management
self.subscribed_channels = set()
self.product_ids = set()
logger.info(f"Coinbase connector initialized ({'sandbox' if use_sandbox else 'production'})")
def _get_message_type(self, data: Dict) -> str:
"""
Determine message type from Coinbase message data.
Args:
data: Parsed message data
Returns:
str: Message type identifier
"""
# Coinbase uses 'type' field for message type
return data.get('type', 'unknown')
def normalize_symbol(self, symbol: str) -> str:
"""
Normalize symbol to Coinbase format.
Args:
symbol: Standard symbol format (e.g., 'BTCUSDT')
Returns:
str: Coinbase product ID format (e.g., 'BTC-USD')
"""
# Convert standard format to Coinbase product ID
if symbol.upper() == 'BTCUSDT':
return 'BTC-USD'
elif symbol.upper() == 'ETHUSDT':
return 'ETH-USD'
elif symbol.upper() == 'ADAUSDT':
return 'ADA-USD'
elif symbol.upper() == 'DOTUSDT':
return 'DOT-USD'
elif symbol.upper() == 'LINKUSDT':
return 'LINK-USD'
else:
# Generic conversion: BTCUSDT -> BTC-USD
if symbol.endswith('USDT'):
base = symbol[:-4]
return f"{base}-USD"
elif symbol.endswith('USD'):
base = symbol[:-3]
return f"{base}-USD"
else:
# Assume it's already in correct format or try to parse
if '-' in symbol:
return symbol.upper()
else:
# Default fallback
return symbol.upper()
def _denormalize_symbol(self, product_id: str) -> str:
"""
Convert Coinbase product ID back to standard format.
Args:
product_id: Coinbase product ID (e.g., 'BTC-USD')
Returns:
str: Standard symbol format (e.g., 'BTCUSDT')
"""
if '-' in product_id:
base, quote = product_id.split('-', 1)
if quote == 'USD':
return f"{base}USDT"
else:
return f"{base}{quote}"
return product_id
async def subscribe_orderbook(self, symbol: str) -> None:
"""
Subscribe to order book level2 updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
product_id = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"type": "subscribe",
"product_ids": [product_id],
"channels": ["level2"]
}
# Add authentication if credentials provided
if self.api_key and self.api_secret and self.passphrase:
subscription_msg.update(self._get_auth_headers(subscription_msg))
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
self.subscribed_channels.add('level2')
self.product_ids.add(product_id)
logger.info(f"Subscribed to order book for {symbol} ({product_id}) on Coinbase")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on Coinbase")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""
Subscribe to trade updates (matches) for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
product_id = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"type": "subscribe",
"product_ids": [product_id],
"channels": ["matches"]
}
# Add authentication if credentials provided
if self.api_key and self.api_secret and self.passphrase:
subscription_msg.update(self._get_auth_headers(subscription_msg))
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
self.subscribed_channels.add('matches')
self.product_ids.add(product_id)
logger.info(f"Subscribed to trades for {symbol} ({product_id}) on Coinbase")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on Coinbase")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""
Unsubscribe from order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
product_id = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"type": "unsubscribe",
"product_ids": [product_id],
"channels": ["level2"]
}
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('orderbook')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.product_ids.discard(product_id)
logger.info(f"Unsubscribed from order book for {symbol} ({product_id}) on Coinbase")
else:
logger.error(f"Failed to unsubscribe from order book for {symbol} on Coinbase")
except Exception as e:
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
raise
async def unsubscribe_trades(self, symbol: str) -> None:
"""
Unsubscribe from trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
product_id = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"type": "unsubscribe",
"product_ids": [product_id],
"channels": ["matches"]
}
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('trades')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
self.product_ids.discard(product_id)
logger.info(f"Unsubscribed from trades for {symbol} ({product_id}) on Coinbase")
else:
logger.error(f"Failed to unsubscribe from trades for {symbol} on Coinbase")
except Exception as e:
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
raise
async def get_symbols(self) -> List[str]:
"""
Get list of available trading symbols from Coinbase.
Returns:
List[str]: List of available symbols in standard format
"""
try:
import aiohttp
api_url = "https://api-public.sandbox.exchange.coinbase.com" if self.use_sandbox else self.API_URL
async with aiohttp.ClientSession() as session:
async with session.get(f"{api_url}/products") as response:
if response.status == 200:
data = await response.json()
symbols = []
for product in data:
if product.get('status') == 'online' and product.get('trading_disabled') is False:
product_id = product.get('id', '')
# Convert to standard format
standard_symbol = self._denormalize_symbol(product_id)
symbols.append(standard_symbol)
logger.info(f"Retrieved {len(symbols)} symbols from Coinbase")
return symbols
else:
logger.error(f"Failed to get symbols from Coinbase: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from Coinbase: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""
Get current order book snapshot from Coinbase REST API.
Args:
symbol: Trading symbol
depth: Number of price levels to retrieve (Coinbase supports up to 50)
Returns:
OrderBookSnapshot: Current order book or None if unavailable
"""
try:
import aiohttp
product_id = self.normalize_symbol(symbol)
api_url = "https://api-public.sandbox.exchange.coinbase.com" if self.use_sandbox else self.API_URL
# Coinbase supports level 1, 2, or 3
level = 2 # Level 2 gives us aggregated order book
url = f"{api_url}/products/{product_id}/book"
params = {'level': level}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self._parse_orderbook_snapshot(data, symbol)
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
"""
Parse Coinbase order book data into OrderBookSnapshot.
Args:
data: Raw Coinbase order book data
symbol: Trading symbol
Returns:
OrderBookSnapshot: Parsed order book
"""
try:
# Parse bids and asks
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks,
sequence_id=data.get('sequence')
)
return orderbook
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
def _get_auth_headers(self, message: Dict) -> Dict[str, str]:
"""
Generate authentication headers for Coinbase Pro API.
Args:
message: Message to authenticate
Returns:
Dict: Authentication headers
"""
if not all([self.api_key, self.api_secret, self.passphrase]):
return {}
try:
timestamp = str(time.time())
message_str = json.dumps(message, separators=(',', ':'))
# Create signature
message_to_sign = timestamp + 'GET' + '/users/self/verify' + message_str
signature = base64.b64encode(
hmac.new(
base64.b64decode(self.api_secret),
message_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
).decode('utf-8')
return {
'CB-ACCESS-KEY': self.api_key,
'CB-ACCESS-SIGN': signature,
'CB-ACCESS-TIMESTAMP': timestamp,
'CB-ACCESS-PASSPHRASE': self.passphrase
}
except Exception as e:
logger.error(f"Error generating auth headers: {e}")
return {}
async def _handle_orderbook_snapshot(self, data: Dict) -> None:
"""
Handle order book snapshot from Coinbase.
Args:
data: Order book snapshot data
"""
try:
set_correlation_id()
product_id = data.get('product_id', '')
if not product_id:
logger.warning("Order book snapshot missing product_id")
return
symbol = self._denormalize_symbol(product_id)
# Parse bids and asks
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks,
sequence_id=data.get('sequence')
)
# Notify callbacks
self._notify_data_callbacks(orderbook)
logger.debug(f"Processed order book snapshot for {symbol}")
except Exception as e:
logger.error(f"Error handling order book snapshot: {e}")
async def _handle_orderbook_update(self, data: Dict) -> None:
"""
Handle order book level2 update from Coinbase.
Args:
data: Order book update data
"""
try:
set_correlation_id()
product_id = data.get('product_id', '')
if not product_id:
logger.warning("Order book update missing product_id")
return
symbol = self._denormalize_symbol(product_id)
# Coinbase l2update format: changes array with [side, price, size]
changes = data.get('changes', [])
bids = []
asks = []
for change in changes:
if len(change) >= 3:
side = change[0] # 'buy' or 'sell'
price = float(change[1])
size = float(change[2])
if validate_price(price) and validate_volume(size):
if side == 'buy':
bids.append(PriceLevel(price=price, size=size))
elif side == 'sell':
asks.append(PriceLevel(price=price, size=size))
# Create order book update (partial snapshot)
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromisoformat(data.get('time', '').replace('Z', '+00:00')),
bids=bids,
asks=asks,
sequence_id=data.get('sequence')
)
# Notify callbacks
self._notify_data_callbacks(orderbook)
logger.debug(f"Processed order book update for {symbol}")
except Exception as e:
logger.error(f"Error handling order book update: {e}")
async def _handle_trade_update(self, data: Dict) -> None:
"""
Handle trade (match) update from Coinbase.
Args:
data: Trade update data
"""
try:
set_correlation_id()
product_id = data.get('product_id', '')
if not product_id:
logger.warning("Trade update missing product_id")
return
symbol = self._denormalize_symbol(product_id)
price = float(data.get('price', 0))
size = float(data.get('size', 0))
# Validate data
if not validate_price(price) or not validate_volume(size):
logger.warning(f"Invalid trade data: price={price}, size={size}")
return
# Determine side (Coinbase uses 'side' field for taker side)
side = data.get('side', 'unknown') # 'buy' or 'sell'
# Create trade event
trade = TradeEvent(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromisoformat(data.get('time', '').replace('Z', '+00:00')),
price=price,
size=size,
side=side,
trade_id=str(data.get('trade_id', ''))
)
# Notify callbacks
self._notify_data_callbacks(trade)
logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}")
except Exception as e:
logger.error(f"Error handling trade update: {e}")
async def _handle_subscription_response(self, data: Dict) -> None:
"""
Handle subscription confirmation from Coinbase.
Args:
data: Subscription response data
"""
try:
channels = data.get('channels', [])
logger.info(f"Coinbase subscription confirmed for channels: {channels}")
except Exception as e:
logger.error(f"Error handling subscription response: {e}")
async def _handle_error_message(self, data: Dict) -> None:
"""
Handle error message from Coinbase.
Args:
data: Error message data
"""
message = data.get('message', 'Unknown error')
reason = data.get('reason', '')
logger.error(f"Coinbase error: {message}")
if reason:
logger.error(f"Coinbase error reason: {reason}")
# Handle specific error types
if 'Invalid signature' in message:
logger.error("Authentication failed - check API credentials")
elif 'Product not found' in message:
logger.error("Invalid product ID - check symbol mapping")
def get_coinbase_stats(self) -> Dict[str, Any]:
"""Get Coinbase-specific statistics."""
base_stats = self.get_stats()
coinbase_stats = {
'subscribed_channels': list(self.subscribed_channels),
'product_ids': list(self.product_ids),
'use_sandbox': self.use_sandbox,
'authenticated': bool(self.api_key and self.api_secret and self.passphrase)
}
base_stats.update(coinbase_stats)
return base_stats

View File

@ -0,0 +1,708 @@
"""
Kraken exchange connector implementation.
Supports WebSocket connections to Kraken exchange with their specific message format.
"""
import json
import hashlib
import hmac
import base64
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
from ..utils.logging import get_logger, set_correlation_id
from ..utils.exceptions import ValidationError, ConnectionError
from ..utils.validation import validate_symbol, validate_price, validate_volume
from .base_connector import BaseExchangeConnector
logger = get_logger(__name__)
class KrakenConnector(BaseExchangeConnector):
"""
Kraken WebSocket connector implementation.
Supports:
- Order book streams
- Trade streams
- Symbol normalization for Kraken format
- Authentication for private channels (if needed)
"""
# Kraken WebSocket URLs
WEBSOCKET_URL = "wss://ws.kraken.com"
WEBSOCKET_AUTH_URL = "wss://ws-auth.kraken.com"
API_URL = "https://api.kraken.com"
def __init__(self, api_key: str = None, api_secret: str = None):
"""
Initialize Kraken connector.
Args:
api_key: API key for authentication (optional)
api_secret: API secret for authentication (optional)
"""
super().__init__("kraken", self.WEBSOCKET_URL)
# Authentication credentials (optional)
self.api_key = api_key
self.api_secret = api_secret
# Kraken-specific message handlers
self.message_handlers.update({
'book-10': self._handle_orderbook_update,
'book-25': self._handle_orderbook_update,
'book-100': self._handle_orderbook_update,
'book-500': self._handle_orderbook_update,
'book-1000': self._handle_orderbook_update,
'trade': self._handle_trade_update,
'systemStatus': self._handle_system_status,
'subscriptionStatus': self._handle_subscription_status,
'heartbeat': self._handle_heartbeat
})
# Kraken-specific tracking
self.channel_map = {} # channel_id -> (channel_name, symbol)
self.subscription_ids = {} # symbol -> subscription_id
self.system_status = 'unknown'
logger.info("Kraken connector initialized")
def _get_message_type(self, data: Dict) -> str:
"""
Determine message type from Kraken message data.
Args:
data: Parsed message data
Returns:
str: Message type identifier
"""
# Kraken messages can be arrays or objects
if isinstance(data, list) and len(data) >= 2:
# Data message format: [channelID, data, channelName, pair]
if len(data) >= 4:
channel_name = data[2]
return channel_name
else:
return 'unknown'
elif isinstance(data, dict):
# Status/control messages
if 'event' in data:
return data['event']
elif 'errorMessage' in data:
return 'error'
return 'unknown'
def normalize_symbol(self, symbol: str) -> str:
"""
Normalize symbol to Kraken format.
Args:
symbol: Standard symbol format (e.g., 'BTCUSDT')
Returns:
str: Kraken pair format (e.g., 'XBT/USD')
"""
# Kraken uses different symbol names
symbol_map = {
'BTCUSDT': 'XBT/USD',
'ETHUSDT': 'ETH/USD',
'ADAUSDT': 'ADA/USD',
'DOTUSDT': 'DOT/USD',
'LINKUSDT': 'LINK/USD',
'LTCUSDT': 'LTC/USD',
'XRPUSDT': 'XRP/USD',
'BCHUSDT': 'BCH/USD',
'EOSUSDT': 'EOS/USD',
'XLMUSDT': 'XLM/USD'
}
if symbol.upper() in symbol_map:
return symbol_map[symbol.upper()]
else:
# Generic conversion: BTCUSDT -> BTC/USD
if symbol.endswith('USDT'):
base = symbol[:-4]
return f"{base}/USD"
elif symbol.endswith('USD'):
base = symbol[:-3]
return f"{base}/USD"
else:
# Assume it's already in correct format
return symbol.upper()
def _denormalize_symbol(self, kraken_pair: str) -> str:
"""
Convert Kraken pair back to standard format.
Args:
kraken_pair: Kraken pair format (e.g., 'XBT/USD')
Returns:
str: Standard symbol format (e.g., 'BTCUSDT')
"""
# Reverse mapping
reverse_map = {
'XBT/USD': 'BTCUSDT',
'ETH/USD': 'ETHUSDT',
'ADA/USD': 'ADAUSDT',
'DOT/USD': 'DOTUSDT',
'LINK/USD': 'LINKUSDT',
'LTC/USD': 'LTCUSDT',
'XRP/USD': 'XRPUSDT',
'BCH/USD': 'BCHUSDT',
'EOS/USD': 'EOSUSDT',
'XLM/USD': 'XLMUSDT'
}
if kraken_pair in reverse_map:
return reverse_map[kraken_pair]
else:
# Generic conversion: BTC/USD -> BTCUSDT
if '/' in kraken_pair:
base, quote = kraken_pair.split('/', 1)
if quote == 'USD':
return f"{base}USDT"
else:
return f"{base}{quote}"
return kraken_pair
async def subscribe_orderbook(self, symbol: str) -> None:
"""
Subscribe to order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
kraken_pair = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"event": "subscribe",
"pair": [kraken_pair],
"subscription": {
"name": "book",
"depth": 25 # 25 levels
}
}
# Add authentication if credentials provided
if self.api_key and self.api_secret:
subscription_msg["subscription"]["token"] = self._get_auth_token()
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'orderbook' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('orderbook')
logger.info(f"Subscribed to order book for {symbol} ({kraken_pair}) on Kraken")
else:
logger.error(f"Failed to subscribe to order book for {symbol} on Kraken")
except Exception as e:
logger.error(f"Error subscribing to order book for {symbol}: {e}")
raise
async def subscribe_trades(self, symbol: str) -> None:
"""
Subscribe to trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
set_correlation_id()
kraken_pair = self.normalize_symbol(symbol)
# Create subscription message
subscription_msg = {
"event": "subscribe",
"pair": [kraken_pair],
"subscription": {
"name": "trade"
}
}
# Add authentication if credentials provided
if self.api_key and self.api_secret:
subscription_msg["subscription"]["token"] = self._get_auth_token()
# Send subscription
success = await self._send_message(subscription_msg)
if success:
# Track subscription
if symbol not in self.subscriptions:
self.subscriptions[symbol] = []
if 'trades' not in self.subscriptions[symbol]:
self.subscriptions[symbol].append('trades')
logger.info(f"Subscribed to trades for {symbol} ({kraken_pair}) on Kraken")
else:
logger.error(f"Failed to subscribe to trades for {symbol} on Kraken")
except Exception as e:
logger.error(f"Error subscribing to trades for {symbol}: {e}")
raise
async def unsubscribe_orderbook(self, symbol: str) -> None:
"""
Unsubscribe from order book updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
kraken_pair = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"event": "unsubscribe",
"pair": [kraken_pair],
"subscription": {
"name": "book"
}
}
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('orderbook')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
logger.info(f"Unsubscribed from order book for {symbol} ({kraken_pair}) on Kraken")
else:
logger.error(f"Failed to unsubscribe from order book for {symbol} on Kraken")
except Exception as e:
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
raise
async def unsubscribe_trades(self, symbol: str) -> None:
"""
Unsubscribe from trade updates for a symbol.
Args:
symbol: Trading symbol (e.g., 'BTCUSDT')
"""
try:
kraken_pair = self.normalize_symbol(symbol)
# Create unsubscription message
unsubscription_msg = {
"event": "unsubscribe",
"pair": [kraken_pair],
"subscription": {
"name": "trade"
}
}
# Send unsubscription
success = await self._send_message(unsubscription_msg)
if success:
# Remove from tracking
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
self.subscriptions[symbol].remove('trades')
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
logger.info(f"Unsubscribed from trades for {symbol} ({kraken_pair}) on Kraken")
else:
logger.error(f"Failed to unsubscribe from trades for {symbol} on Kraken")
except Exception as e:
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
raise
async def get_symbols(self) -> List[str]:
"""
Get list of available trading symbols from Kraken.
Returns:
List[str]: List of available symbols in standard format
"""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.API_URL}/0/public/AssetPairs") as response:
if response.status == 200:
data = await response.json()
if data.get('error'):
logger.error(f"Kraken API error: {data['error']}")
return []
symbols = []
pairs = data.get('result', {})
for pair_name, pair_info in pairs.items():
# Skip dark pool pairs
if '.d' in pair_name:
continue
# Get the WebSocket pair name
ws_name = pair_info.get('wsname')
if ws_name:
# Convert to standard format
standard_symbol = self._denormalize_symbol(ws_name)
symbols.append(standard_symbol)
logger.info(f"Retrieved {len(symbols)} symbols from Kraken")
return symbols
else:
logger.error(f"Failed to get symbols from Kraken: HTTP {response.status}")
return []
except Exception as e:
logger.error(f"Error getting symbols from Kraken: {e}")
return []
async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]:
"""
Get current order book snapshot from Kraken REST API.
Args:
symbol: Trading symbol
depth: Number of price levels to retrieve
Returns:
OrderBookSnapshot: Current order book or None if unavailable
"""
try:
import aiohttp
kraken_pair = self.normalize_symbol(symbol)
url = f"{self.API_URL}/0/public/Depth"
params = {
'pair': kraken_pair,
'count': min(depth, 500) # Kraken max is 500
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
if data.get('error'):
logger.error(f"Kraken API error: {data['error']}")
return None
result = data.get('result', {})
# Kraken returns data with the actual pair name as key
pair_data = None
for key, value in result.items():
if isinstance(value, dict) and 'bids' in value and 'asks' in value:
pair_data = value
break
if pair_data:
return self._parse_orderbook_snapshot(pair_data, symbol)
else:
logger.error(f"No order book data found for {symbol}")
return None
else:
logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}")
return None
except Exception as e:
logger.error(f"Error getting order book snapshot for {symbol}: {e}")
return None
def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot:
"""
Parse Kraken order book data into OrderBookSnapshot.
Args:
data: Raw Kraken order book data
symbol: Trading symbol
Returns:
OrderBookSnapshot: Parsed order book
"""
try:
# Parse bids and asks
bids = []
for bid_data in data.get('bids', []):
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
asks = []
for ask_data in data.get('asks', []):
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks
)
return orderbook
except Exception as e:
logger.error(f"Error parsing order book snapshot: {e}")
raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR")
async def _handle_orderbook_update(self, data: List) -> None:
"""
Handle order book update from Kraken.
Args:
data: Order book update data (Kraken array format)
"""
try:
set_correlation_id()
# Kraken format: [channelID, data, channelName, pair]
if len(data) < 4:
logger.warning("Invalid Kraken order book update format")
return
channel_id = data[0]
book_data = data[1]
channel_name = data[2]
kraken_pair = data[3]
symbol = self._denormalize_symbol(kraken_pair)
# Track channel mapping
self.channel_map[channel_id] = (channel_name, symbol)
# Parse order book data
bids = []
asks = []
# Kraken book data can have 'b' (bids), 'a' (asks), 'bs' (bid snapshot), 'as' (ask snapshot)
if 'b' in book_data:
for bid_data in book_data['b']:
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
if 'bs' in book_data: # Bid snapshot
for bid_data in book_data['bs']:
price = float(bid_data[0])
size = float(bid_data[1])
if validate_price(price) and validate_volume(size):
bids.append(PriceLevel(price=price, size=size))
if 'a' in book_data:
for ask_data in book_data['a']:
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
if 'as' in book_data: # Ask snapshot
for ask_data in book_data['as']:
price = float(ask_data[0])
size = float(ask_data[1])
if validate_price(price) and validate_volume(size):
asks.append(PriceLevel(price=price, size=size))
# Create order book snapshot
orderbook = OrderBookSnapshot(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.now(timezone.utc),
bids=bids,
asks=asks
)
# Notify callbacks
self._notify_data_callbacks(orderbook)
logger.debug(f"Processed order book update for {symbol}")
except Exception as e:
logger.error(f"Error handling order book update: {e}")
async def _handle_trade_update(self, data: List) -> None:
"""
Handle trade update from Kraken.
Args:
data: Trade update data (Kraken array format)
"""
try:
set_correlation_id()
# Kraken format: [channelID, data, channelName, pair]
if len(data) < 4:
logger.warning("Invalid Kraken trade update format")
return
channel_id = data[0]
trade_data = data[1]
channel_name = data[2]
kraken_pair = data[3]
symbol = self._denormalize_symbol(kraken_pair)
# Track channel mapping
self.channel_map[channel_id] = (channel_name, symbol)
# Process trade data (array of trades)
for trade_info in trade_data:
if len(trade_info) >= 6:
price = float(trade_info[0])
size = float(trade_info[1])
timestamp = float(trade_info[2])
side = trade_info[3] # 'b' for buy, 's' for sell
order_type = trade_info[4] # 'm' for market, 'l' for limit
misc = trade_info[5] if len(trade_info) > 5 else ''
# Validate data
if not validate_price(price) or not validate_volume(size):
logger.warning(f"Invalid trade data: price={price}, size={size}")
continue
# Convert side
trade_side = 'buy' if side == 'b' else 'sell'
# Create trade event
trade = TradeEvent(
symbol=symbol,
exchange=self.exchange_name,
timestamp=datetime.fromtimestamp(timestamp, tz=timezone.utc),
price=price,
size=size,
side=trade_side,
trade_id=f"{timestamp}_{price}_{size}" # Generate ID
)
# Notify callbacks
self._notify_data_callbacks(trade)
logger.debug(f"Processed trade for {symbol}: {trade_side} {size} @ {price}")
except Exception as e:
logger.error(f"Error handling trade update: {e}")
async def _handle_system_status(self, data: Dict) -> None:
"""
Handle system status message from Kraken.
Args:
data: System status data
"""
try:
status = data.get('status', 'unknown')
version = data.get('version', 'unknown')
self.system_status = status
logger.info(f"Kraken system status: {status} (version: {version})")
if status != 'online':
logger.warning(f"Kraken system not online: {status}")
except Exception as e:
logger.error(f"Error handling system status: {e}")
async def _handle_subscription_status(self, data: Dict) -> None:
"""
Handle subscription status message from Kraken.
Args:
data: Subscription status data
"""
try:
status = data.get('status', 'unknown')
channel_name = data.get('channelName', 'unknown')
pair = data.get('pair', 'unknown')
subscription = data.get('subscription', {})
if status == 'subscribed':
logger.info(f"Kraken subscription confirmed: {channel_name} for {pair}")
# Store subscription ID if provided
if 'channelID' in data:
channel_id = data['channelID']
symbol = self._denormalize_symbol(pair)
self.channel_map[channel_id] = (channel_name, symbol)
elif status == 'unsubscribed':
logger.info(f"Kraken unsubscription confirmed: {channel_name} for {pair}")
elif status == 'error':
error_message = data.get('errorMessage', 'Unknown error')
logger.error(f"Kraken subscription error: {error_message}")
except Exception as e:
logger.error(f"Error handling subscription status: {e}")
async def _handle_heartbeat(self, data: Dict) -> None:
"""
Handle heartbeat message from Kraken.
Args:
data: Heartbeat data
"""
logger.debug("Received Kraken heartbeat")
def _get_auth_token(self) -> str:
"""
Generate authentication token for Kraken WebSocket.
Returns:
str: Authentication token
"""
if not self.api_key or not self.api_secret:
return ""
try:
# This is a simplified version - actual Kraken auth is more complex
# and requires getting a token from the REST API first
nonce = str(int(time.time() * 1000))
message = nonce + self.api_key
signature = hmac.new(
base64.b64decode(self.api_secret),
message.encode('utf-8'),
hashlib.sha512
).hexdigest()
return f"{self.api_key}:{signature}:{nonce}"
except Exception as e:
logger.error(f"Error generating auth token: {e}")
return ""
def get_kraken_stats(self) -> Dict[str, Any]:
"""Get Kraken-specific statistics."""
base_stats = self.get_stats()
kraken_stats = {
'system_status': self.system_status,
'channel_mappings': len(self.channel_map),
'authenticated': bool(self.api_key and self.api_secret)
}
base_stats.update(kraken_stats)
return base_stats