diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index b05e532..13f7d06 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -135,9 +135,14 @@ + - Write integration tests with existing orchestrator code - _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_ + + + + - [x] 12. Add additional exchange connectors (Coinbase, Kraken) - Implement Coinbase Pro WebSocket connector with proper authentication - Create Kraken WebSocket connector with their specific message format diff --git a/COBY/aggregation/aggregation_engine.py b/COBY/aggregation/aggregation_engine.py index b3e87c4..304afcd 100644 --- a/COBY/aggregation/aggregation_engine.py +++ b/COBY/aggregation/aggregation_engine.py @@ -143,9 +143,9 @@ class StandardAggregationEngine(AggregationEngine): except Exception as e: logger.error(f"Error aggregating across exchanges: {e}") - raise AggregationError(f"Cross-exchange aggregation failed: {e}", "CONSOLIDATION_ERROR") - - def calculate_volume_weighted_price(self, orderbooks: List[OrderBookSnapshot]) -> float: + raise AggregationError(f"Cross-exchange aggregation failed: {e}", "CONSOLIDATION_ERROR") + + def calculate_volume_weighted_price(self, orderbooks: List[OrderBookSnapshot]) -> float: """ Calculate volume-weighted average price across exchanges. diff --git a/COBY/aggregation/price_bucketer.py b/COBY/aggregation/price_bucketer.py index 962e45b..ff4382a 100644 --- a/COBY/aggregation/price_bucketer.py +++ b/COBY/aggregation/price_bucketer.py @@ -118,6 +118,18 @@ class PriceBucketer: logger.debug(f"Aggregated {len(bucket_list)} bucket sets") return aggregated + def get_bucket_price(self, price: float) -> float: + """ + Get the bucket price for a given price. + + Args: + price: Original price + + Returns: + float: Bucket price (rounded to bucket boundaries) + """ + return math.floor(price / self.bucket_size) * self.bucket_size + def get_bucket_range(self, center_price: float, depth: int) -> Tuple[float, float]: """ Get price range for buckets around a center price. diff --git a/COBY/config.py b/COBY/config.py index 55b2edc..75bfe4b 100644 --- a/COBY/config.py +++ b/COBY/config.py @@ -111,11 +111,9 @@ class Config: log_dir = Path(self.logging.file_path).parent log_dir.mkdir(parents=True, exist_ok=True) - # Validate bucket sizes - if self.aggregation.btc_bucket_size <= 0: - raise ValueError("BTC bucket size must be positive") - if self.aggregation.eth_bucket_size <= 0: - raise ValueError("ETH bucket size must be positive") + # Validate bucket size + if self.aggregation.bucket_size <= 0: + raise ValueError("Bucket size must be positive") def get_bucket_size(self, symbol: str = None) -> float: """Get bucket size (now universal $1 for all symbols)""" diff --git a/COBY/connectors/base_connector.py b/COBY/connectors/base_connector.py index 3cefb92..e8f4268 100644 --- a/COBY/connectors/base_connector.py +++ b/COBY/connectors/base_connector.py @@ -10,6 +10,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional, Callable, Any from datetime import datetime, timedelta from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent +from ..interfaces.exchange_connector import ExchangeConnector from ..utils.logging import get_logger, set_correlation_id from ..utils.exceptions import ConnectionError, ValidationError from ..utils.timing import get_current_timestamp diff --git a/COBY/processing/quality_checker.py b/COBY/processing/quality_checker.py index 619c6a6..92c6fab 100644 --- a/COBY/processing/quality_checker.py +++ b/COBY/processing/quality_checker.py @@ -94,8 +94,9 @@ class DataQualityChecker: if issues: logger.debug(f"Order book quality issues for {orderbook.symbol}@{orderbook.exchange}: {issues}") - return quality_score, issues de -f check_trade_quality(self, trade: TradeEvent) -> Tuple[float, List[str]]: + return quality_score, issues + + def check_trade_quality(self, trade: TradeEvent) -> Tuple[float, List[str]]: """ Check trade data quality. @@ -197,8 +198,9 @@ f check_trade_quality(self, trade: TradeEvent) -> Tuple[float, List[str]]: if orderbook.bids[0].price >= orderbook.asks[0].price: issues.append("Best bid >= best ask (crossed book)") - return issues def - _check_orderbook_volumes(self, orderbook: OrderBookSnapshot) -> List[str]: + return issues + + def _check_orderbook_volumes(self, orderbook: OrderBookSnapshot) -> List[str]: """Check order book volume validity""" issues = [] diff --git a/test_connectors_simple.py b/test_connectors_simple.py deleted file mode 100644 index bc66c8a..0000000 --- a/test_connectors_simple.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/env python3 -""" -Simple test to verify all 10 exchange connectors are properly implemented. -""" - -import sys -import os - -# Add the current directory to Python path -sys.path.insert(0, os.path.abspath('.')) - -def test_all_connectors(): - """Test that all 10 exchange connectors can be imported and initialized.""" - - print("=== Testing All 10 Exchange Connectors ===\n") - - connectors = {} - - try: - # Import all connectors - from COBY.connectors.binance_connector import BinanceConnector - from COBY.connectors.coinbase_connector import CoinbaseConnector - from COBY.connectors.kraken_connector import KrakenConnector - from COBY.connectors.bybit_connector import BybitConnector - from COBY.connectors.okx_connector import OKXConnector - from COBY.connectors.huobi_connector import HuobiConnector - from COBY.connectors.kucoin_connector import KuCoinConnector - from COBY.connectors.gateio_connector import GateIOConnector - from COBY.connectors.bitfinex_connector import BitfinexConnector - from COBY.connectors.mexc_connector import MEXCConnector - - # Initialize all connectors - connectors = { - 'binance': BinanceConnector(), - 'coinbase': CoinbaseConnector(use_sandbox=True), - 'kraken': KrakenConnector(), - 'bybit': BybitConnector(use_testnet=True), - 'okx': OKXConnector(use_demo=True), - 'huobi': HuobiConnector(), - 'kucoin': KuCoinConnector(use_sandbox=True), - 'gateio': GateIOConnector(use_testnet=True), - 'bitfinex': BitfinexConnector(), - 'mexc': MEXCConnector() - } - - print("✅ All connectors imported successfully!\n") - - except Exception as e: - print(f"❌ Failed to import connectors: {e}") - return False - - # Test each connector - success_count = 0 - total_count = len(connectors) - - for name, connector in connectors.items(): - try: - print(f"Testing {name.upper()} connector:") - - # Test basic properties - assert connector.exchange_name == name - assert hasattr(connector, 'websocket_url') - assert hasattr(connector, 'message_handlers') - assert hasattr(connector, 'subscriptions') - print(f" ✓ Basic properties: OK") - - # Test symbol normalization - btc_symbol = connector.normalize_symbol('BTCUSDT') - eth_symbol = connector.normalize_symbol('ETHUSDT') - print(f" ✓ Symbol normalization: BTCUSDT -> {btc_symbol}, ETHUSDT -> {eth_symbol}") - - # Test required methods exist - required_methods = [ - 'connect', 'disconnect', 'subscribe_orderbook', 'subscribe_trades', - 'unsubscribe_orderbook', 'unsubscribe_trades', 'get_symbols', - 'get_orderbook_snapshot', 'get_connection_status' - ] - - for method in required_methods: - assert hasattr(connector, method), f"Missing method: {method}" - assert callable(getattr(connector, method)), f"Method not callable: {method}" - print(f" ✓ Required methods: All {len(required_methods)} methods present") - - # Test statistics - stats = connector.get_stats() - assert isinstance(stats, dict) - assert 'exchange' in stats - assert stats['exchange'] == name - print(f" ✓ Statistics: {len(stats)} fields") - - # Test connection status - status = connector.get_connection_status() - print(f" ✓ Connection status: {status.value}") - - print(f" ✅ {name.upper()} connector: ALL TESTS PASSED\n") - success_count += 1 - - except Exception as e: - print(f" ❌ {name.upper()} connector: FAILED - {e}\n") - - print(f"=== SUMMARY ===") - print(f"Total connectors: {total_count}") - print(f"Successful: {success_count}") - print(f"Failed: {total_count - success_count}") - - if success_count == total_count: - print("🎉 ALL 10 EXCHANGE CONNECTORS WORKING PERFECTLY!") - return True - else: - print(f"⚠️ {total_count - success_count} connectors need attention") - return False - -if __name__ == "__main__": - success = test_all_connectors() - sys.exit(0 if success else 1) \ No newline at end of file