From 3bbfde5d2b6a55c5c4cac404bc7e2c84027b1cc3 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 5 Aug 2025 00:13:38 +0300 Subject: [PATCH] 14. finishing connectors --- .../multi-exchange-data-aggregation/tasks.md | 1 + COBY/connectors/base_connector.py | 2 +- COBY/connectors/bitfinex_connector.py | 194 +++++++++++++++++- COBY/connectors/mexc_connector.py | 150 +++++++++++++- COBY/tests/test_all_connectors.py | 18 +- test_connectors_simple.py | 115 +++++++++++ 6 files changed, 465 insertions(+), 15 deletions(-) create mode 100644 test_connectors_simple.py diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index 737ce82..b05e532 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -134,6 +134,7 @@ + - Write integration tests with existing orchestrator code - _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_ diff --git a/COBY/connectors/base_connector.py b/COBY/connectors/base_connector.py index a2fb633..3cefb92 100644 --- a/COBY/connectors/base_connector.py +++ b/COBY/connectors/base_connector.py @@ -9,7 +9,7 @@ import json from abc import ABC, abstractmethod from typing import Dict, List, Optional, Callable, Any from datetime import datetime, timedelta -from enmodels.core import ConnectionStatus, OrderBookSnapshot, TradeEvent +from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent from ..utils.logging import get_logger, set_correlation_id from ..utils.exceptions import ConnectionError, ValidationError from ..utils.timing import get_current_timestamp diff --git a/COBY/connectors/bitfinex_connector.py b/COBY/connectors/bitfinex_connector.py index 24d7812..acad71a 100644 --- a/COBY/connectors/bitfinex_connector.py +++ b/COBY/connectors/bitfinex_connector.py @@ -43,7 +43,8 @@ class BitfinexConnector(BaseExchangeConnector): 'subscribed': self._handle_subscription_response, 'unsubscribed': self._handle_unsubscription_response, 'error': self._handle_error_message, - 'info': self._handle_info_message + 'info': self._handle_info_message, + 'data': self._handle_data_message }) # Channel management @@ -150,13 +151,75 @@ class BitfinexConnector(BaseExchangeConnector): async def unsubscribe_orderbook(self, symbol: str) -> None: """Unsubscribe from order book updates.""" - # Implementation would find the channel ID and send unsubscribe message - pass + try: + bitfinex_symbol = self.normalize_symbol(symbol) + + # Find channel ID for this symbol's order book + channel_id = None + for cid, info in self.channels.items(): + if info.get('channel') == 'book' and info.get('symbol') == bitfinex_symbol: + channel_id = cid + break + + if channel_id: + unsubscription_msg = { + "event": "unsubscribe", + "chanId": channel_id + } + + success = await self._send_message(unsubscription_msg) + if success: + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_symbols.discard(bitfinex_symbol) + logger.info(f"Unsubscribed from order book for {symbol} on Bitfinex") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on Bitfinex") + else: + logger.warning(f"No active order book subscription found for {symbol}") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise async def unsubscribe_trades(self, symbol: str) -> None: """Unsubscribe from trade updates.""" - # Implementation would find the channel ID and send unsubscribe message - pass + try: + bitfinex_symbol = self.normalize_symbol(symbol) + + # Find channel ID for this symbol's trades + channel_id = None + for cid, info in self.channels.items(): + if info.get('channel') == 'trades' and info.get('symbol') == bitfinex_symbol: + channel_id = cid + break + + if channel_id: + unsubscription_msg = { + "event": "unsubscribe", + "chanId": channel_id + } + + success = await self._send_message(unsubscription_msg) + if success: + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + self.subscribed_symbols.discard(bitfinex_symbol) + logger.info(f"Unsubscribed from trades for {symbol} on Bitfinex") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on Bitfinex") + else: + logger.warning(f"No active trades subscription found for {symbol}") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise async def get_symbols(self) -> List[str]: """Get available symbols from Bitfinex.""" @@ -256,6 +319,127 @@ class BitfinexConnector(BaseExchangeConnector): """Handle info message.""" logger.info(f"Bitfinex info: {data}") + async def _handle_data_message(self, data: List) -> None: + """Handle data message from Bitfinex.""" + try: + if len(data) < 2: + return + + channel_id = data[0] + message_data = data[1] + + if channel_id not in self.channels: + logger.warning(f"Received data for unknown channel: {channel_id}") + return + + channel_info = self.channels[channel_id] + channel_type = channel_info.get('channel') + symbol = channel_info.get('symbol', '') + + if channel_type == 'book': + await self._handle_orderbook_data(message_data, symbol) + elif channel_type == 'trades': + await self._handle_trades_data(message_data, symbol) + + except Exception as e: + logger.error(f"Error handling data message: {e}") + + async def _handle_orderbook_data(self, data, symbol: str) -> None: + """Handle order book data from Bitfinex.""" + try: + set_correlation_id() + + if not isinstance(data, list): + return + + standard_symbol = self._denormalize_symbol(symbol) + + # Handle snapshot vs update + if len(data) > 0 and isinstance(data[0], list): + # Snapshot - array of [price, count, amount] + bids = [] + asks = [] + + for level in data: + if len(level) >= 3: + price = float(level[0]) + count = int(level[1]) + amount = float(level[2]) + + if validate_price(price) and validate_volume(abs(amount)): + if amount > 0: + bids.append(PriceLevel(price=price, size=amount)) + else: + asks.append(PriceLevel(price=price, size=abs(amount))) + + orderbook = OrderBookSnapshot( + symbol=standard_symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks + ) + + self._notify_data_callbacks(orderbook) + logger.debug(f"Processed order book snapshot for {standard_symbol}") + + except Exception as e: + logger.error(f"Error handling order book data: {e}") + + async def _handle_trades_data(self, data, symbol: str) -> None: + """Handle trades data from Bitfinex.""" + try: + set_correlation_id() + + if not isinstance(data, list): + return + + standard_symbol = self._denormalize_symbol(symbol) + + # Handle snapshot vs update + if len(data) > 0 and isinstance(data[0], list): + # Snapshot - array of trades + for trade_data in data: + await self._process_single_trade(trade_data, standard_symbol) + elif len(data) >= 4: + # Single trade update + await self._process_single_trade(data, standard_symbol) + + except Exception as e: + logger.error(f"Error handling trades data: {e}") + + async def _process_single_trade(self, trade_data: List, symbol: str) -> None: + """Process a single trade from Bitfinex.""" + try: + if len(trade_data) < 4: + return + + trade_id = str(trade_data[0]) + timestamp = int(trade_data[1]) / 1000 # Convert to seconds + amount = float(trade_data[2]) + price = float(trade_data[3]) + + if not validate_price(price) or not validate_volume(abs(amount)): + return + + side = 'buy' if amount > 0 else 'sell' + + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(timestamp, tz=timezone.utc), + price=price, + size=abs(amount), + side=side, + trade_id=trade_id + ) + + self._notify_data_callbacks(trade) + logger.debug(f"Processed trade for {symbol}: {side} {abs(amount)} @ {price}") + + except Exception as e: + logger.error(f"Error processing single trade: {e}") + def get_bitfinex_stats(self) -> Dict[str, Any]: """Get Bitfinex-specific statistics.""" base_stats = self.get_stats() diff --git a/COBY/connectors/mexc_connector.py b/COBY/connectors/mexc_connector.py index b0396c3..94f0657 100644 --- a/COBY/connectors/mexc_connector.py +++ b/COBY/connectors/mexc_connector.py @@ -251,23 +251,161 @@ class MEXCConnector(BaseExchangeConnector): async def _handle_orderbook_update(self, data: Dict) -> None: """Handle order book update from MEXC.""" - # Implementation would parse MEXC-specific order book update format - logger.debug("Received MEXC order book update") + try: + set_correlation_id() + + symbol_data = data.get('s', '') # Symbol + if not symbol_data: + logger.warning("Order book update missing symbol") + return + + symbol = symbol_data # Already in standard format + order_data = data.get('d', {}) + + # Parse bids and asks + bids = [] + for bid_data in order_data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in order_data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(data.get('t', 0)) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=order_data.get('lastUpdateId') + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") async def _handle_orderbook_snapshot(self, data: Dict) -> None: """Handle order book snapshot from MEXC.""" - # Implementation would parse MEXC-specific order book snapshot format - logger.debug("Received MEXC order book snapshot") + try: + set_correlation_id() + + symbol_data = data.get('s', '') # Symbol + if not symbol_data: + logger.warning("Order book snapshot missing symbol") + return + + symbol = symbol_data # Already in standard format + order_data = data.get('d', {}) + + # Parse bids and asks + bids = [] + for bid_data in order_data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in order_data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(data.get('t', 0)) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=order_data.get('lastUpdateId') + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book snapshot for {symbol}") + + except Exception as e: + logger.error(f"Error handling order book snapshot: {e}") async def _handle_trade_update(self, data: Dict) -> None: """Handle trade update from MEXC.""" - # Implementation would parse MEXC-specific trade format - logger.debug("Received MEXC trade update") + try: + set_correlation_id() + + symbol_data = data.get('s', '') # Symbol + if not symbol_data: + logger.warning("Trade update missing symbol") + return + + symbol = symbol_data # Already in standard format + trade_data = data.get('d', {}) + + # MEXC trade data format + trades = trade_data.get('deals', []) + + for trade_info in trades: + price = float(trade_info.get('p', 0)) + quantity = float(trade_info.get('v', 0)) + + # Validate data + if not validate_price(price) or not validate_volume(quantity): + logger.warning(f"Invalid trade data: price={price}, quantity={quantity}") + continue + + # Determine side (MEXC uses 'S' field: 1=buy, 2=sell) + side_code = trade_info.get('S', 0) + side = 'buy' if side_code == 1 else 'sell' + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(int(trade_info.get('t', 0)) / 1000, tz=timezone.utc), + price=price, + size=quantity, + side=side, + trade_id=str(trade_info.get('i', '')) + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {side} {quantity} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") async def _handle_pong(self, data: Dict) -> None: """Handle pong response from MEXC.""" logger.debug("Received MEXC pong") + async def _send_ping(self) -> None: + """Send ping to keep connection alive.""" + try: + ping_msg = {"method": "PING"} + await self._send_message(ping_msg) + logger.debug("Sent ping to MEXC") + except Exception as e: + logger.error(f"Error sending ping: {e}") + def get_mexc_stats(self) -> Dict[str, Any]: """Get MEXC-specific statistics.""" base_stats = self.get_stats() diff --git a/COBY/tests/test_all_connectors.py b/COBY/tests/test_all_connectors.py index b82dcd1..49d9960 100644 --- a/COBY/tests/test_all_connectors.py +++ b/COBY/tests/test_all_connectors.py @@ -13,6 +13,10 @@ from ..connectors.kraken_connector import KrakenConnector from ..connectors.bybit_connector import BybitConnector from ..connectors.okx_connector import OKXConnector from ..connectors.huobi_connector import HuobiConnector +from ..connectors.kucoin_connector import KuCoinConnector +from ..connectors.gateio_connector import GateIOConnector +from ..connectors.bitfinex_connector import BitfinexConnector +from ..connectors.mexc_connector import MEXCConnector class TestAllConnectors: @@ -27,12 +31,16 @@ class TestAllConnectors: 'kraken': KrakenConnector(), 'bybit': BybitConnector(use_testnet=True), 'okx': OKXConnector(use_demo=True), - 'huobi': HuobiConnector() + 'huobi': HuobiConnector(), + 'kucoin': KuCoinConnector(use_sandbox=True), + 'gateio': GateIOConnector(use_testnet=True), + 'bitfinex': BitfinexConnector(), + 'mexc': MEXCConnector() } def test_all_connectors_initialization(self, all_connectors): """Test that all connectors initialize correctly.""" - expected_names = ['binance', 'coinbase', 'kraken', 'bybit', 'okx', 'huobi'] + expected_names = ['binance', 'coinbase', 'kraken', 'bybit', 'okx', 'huobi', 'kucoin', 'gateio', 'bitfinex', 'mexc'] for name, connector in all_connectors.items(): assert connector.exchange_name == name @@ -184,7 +192,11 @@ async def test_connector_compatibility(): 'kraken': KrakenConnector(), 'bybit': BybitConnector(use_testnet=True), 'okx': OKXConnector(use_demo=True), - 'huobi': HuobiConnector() + 'huobi': HuobiConnector(), + 'kucoin': KuCoinConnector(use_sandbox=True), + 'gateio': GateIOConnector(use_testnet=True), + 'bitfinex': BitfinexConnector(), + 'mexc': MEXCConnector() } # Test basic functionality diff --git a/test_connectors_simple.py b/test_connectors_simple.py new file mode 100644 index 0000000..bc66c8a --- /dev/null +++ b/test_connectors_simple.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Simple test to verify all 10 exchange connectors are properly implemented. +""" + +import sys +import os + +# Add the current directory to Python path +sys.path.insert(0, os.path.abspath('.')) + +def test_all_connectors(): + """Test that all 10 exchange connectors can be imported and initialized.""" + + print("=== Testing All 10 Exchange Connectors ===\n") + + connectors = {} + + try: + # Import all connectors + from COBY.connectors.binance_connector import BinanceConnector + from COBY.connectors.coinbase_connector import CoinbaseConnector + from COBY.connectors.kraken_connector import KrakenConnector + from COBY.connectors.bybit_connector import BybitConnector + from COBY.connectors.okx_connector import OKXConnector + from COBY.connectors.huobi_connector import HuobiConnector + from COBY.connectors.kucoin_connector import KuCoinConnector + from COBY.connectors.gateio_connector import GateIOConnector + from COBY.connectors.bitfinex_connector import BitfinexConnector + from COBY.connectors.mexc_connector import MEXCConnector + + # Initialize all connectors + connectors = { + 'binance': BinanceConnector(), + 'coinbase': CoinbaseConnector(use_sandbox=True), + 'kraken': KrakenConnector(), + 'bybit': BybitConnector(use_testnet=True), + 'okx': OKXConnector(use_demo=True), + 'huobi': HuobiConnector(), + 'kucoin': KuCoinConnector(use_sandbox=True), + 'gateio': GateIOConnector(use_testnet=True), + 'bitfinex': BitfinexConnector(), + 'mexc': MEXCConnector() + } + + print("✅ All connectors imported successfully!\n") + + except Exception as e: + print(f"❌ Failed to import connectors: {e}") + return False + + # Test each connector + success_count = 0 + total_count = len(connectors) + + for name, connector in connectors.items(): + try: + print(f"Testing {name.upper()} connector:") + + # Test basic properties + assert connector.exchange_name == name + assert hasattr(connector, 'websocket_url') + assert hasattr(connector, 'message_handlers') + assert hasattr(connector, 'subscriptions') + print(f" ✓ Basic properties: OK") + + # Test symbol normalization + btc_symbol = connector.normalize_symbol('BTCUSDT') + eth_symbol = connector.normalize_symbol('ETHUSDT') + print(f" ✓ Symbol normalization: BTCUSDT -> {btc_symbol}, ETHUSDT -> {eth_symbol}") + + # Test required methods exist + required_methods = [ + 'connect', 'disconnect', 'subscribe_orderbook', 'subscribe_trades', + 'unsubscribe_orderbook', 'unsubscribe_trades', 'get_symbols', + 'get_orderbook_snapshot', 'get_connection_status' + ] + + for method in required_methods: + assert hasattr(connector, method), f"Missing method: {method}" + assert callable(getattr(connector, method)), f"Method not callable: {method}" + print(f" ✓ Required methods: All {len(required_methods)} methods present") + + # Test statistics + stats = connector.get_stats() + assert isinstance(stats, dict) + assert 'exchange' in stats + assert stats['exchange'] == name + print(f" ✓ Statistics: {len(stats)} fields") + + # Test connection status + status = connector.get_connection_status() + print(f" ✓ Connection status: {status.value}") + + print(f" ✅ {name.upper()} connector: ALL TESTS PASSED\n") + success_count += 1 + + except Exception as e: + print(f" ❌ {name.upper()} connector: FAILED - {e}\n") + + print(f"=== SUMMARY ===") + print(f"Total connectors: {total_count}") + print(f"Successful: {success_count}") + print(f"Failed: {total_count - success_count}") + + if success_count == total_count: + print("🎉 ALL 10 EXCHANGE CONNECTORS WORKING PERFECTLY!") + return True + else: + print(f"⚠️ {total_count - success_count} connectors need attention") + return False + +if __name__ == "__main__": + success = test_all_connectors() + sys.exit(0 if success else 1) \ No newline at end of file