From de77b0afa8bcc7f6f77317422f143e79525f2fd1 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 4 Aug 2025 17:28:55 +0300 Subject: [PATCH] bucket aggregation --- .../multi-exchange-data-aggregation/tasks.md | 5 + COBY/connectors/binance_connector.py | 489 ++++++++++++++++++ COBY/examples/binance_example.py | 168 ++++++ COBY/processing/__init__.py | 15 + COBY/processing/anomaly_detector.py | 329 ++++++++++++ COBY/processing/data_processor.py | 378 ++++++++++++++ COBY/processing/metrics_calculator.py | 275 ++++++++++ COBY/processing/quality_checker.py | 288 +++++++++++ COBY/tests/test_binance_connector.py | 341 ++++++++++++ COBY/tests/test_data_processor.py | 304 +++++++++++ 10 files changed, 2592 insertions(+) create mode 100644 COBY/connectors/binance_connector.py create mode 100644 COBY/examples/binance_example.py create mode 100644 COBY/processing/__init__.py create mode 100644 COBY/processing/anomaly_detector.py create mode 100644 COBY/processing/data_processor.py create mode 100644 COBY/processing/metrics_calculator.py create mode 100644 COBY/processing/quality_checker.py create mode 100644 COBY/tests/test_binance_connector.py create mode 100644 COBY/tests/test_data_processor.py diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index a325052..d9ee10c 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -23,10 +23,15 @@ - _Requirements: 3.1, 3.2, 3.3, 3.4_ + + - [ ] 3. Create base exchange connector framework - Implement abstract base class for exchange WebSocket connectors - Create connection management with exponential backoff and circuit breaker patterns - Implement WebSocket message handling with proper error recovery + + + - Add connection status monitoring and health checks - _Requirements: 1.1, 1.3, 1.4, 8.5_ diff --git a/COBY/connectors/binance_connector.py b/COBY/connectors/binance_connector.py new file mode 100644 index 0000000..388a019 --- /dev/null +++ b/COBY/connectors/binance_connector.py @@ -0,0 +1,489 @@ +""" +Binance exchange connector implementation. +""" + +import json +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError +from ..utils.validation import validate_symbol, validate_price, validate_volume +from .base_connector import BaseExchangeConnector + +logger = get_logger(__name__) + + +class BinanceConnector(BaseExchangeConnector): + """ + Binance WebSocket connector implementation. + + Supports: + - Order book depth streams + - Trade streams + - Symbol normalization + - Real-time data processing + """ + + # Binance WebSocket URLs + WEBSOCKET_URL = "wss://stream.binance.com:9443/ws" + API_URL = "https://api.binance.com/api/v3" + + def __init__(self): + """Initialize Binance connector""" + super().__init__("binance", self.WEBSOCKET_URL) + + # Binance-specific message handlers + self.message_handlers.update({ + 'depthUpdate': self._handle_orderbook_update, + 'trade': self._handle_trade_update, + 'error': self._handle_error_message + }) + + # Stream management + self.active_streams: List[str] = [] + self.stream_id = 1 + + logger.info("Binance connector initialized") + + def _get_message_type(self, data: Dict) -> str: + """ + Determine message type from Binance message data. 
+ + Args: + data: Parsed message data + + Returns: + str: Message type identifier + """ + # Binance uses 'e' field for event type + if 'e' in data: + return data['e'] + + # Handle error messages + if 'error' in data: + return 'error' + + # Handle subscription confirmations + if 'result' in data and 'id' in data: + return 'subscription_response' + + return 'unknown' + + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to Binance format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: Binance symbol format (e.g., 'BTCUSDT') + """ + # Binance uses uppercase symbols without separators + normalized = symbol.upper().replace('-', '').replace('/', '') + + # Validate symbol format + if not validate_symbol(normalized): + raise ValidationError(f"Invalid symbol format: {symbol}", "INVALID_SYMBOL") + + return normalized + + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book depth updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + normalized_symbol = self.normalize_symbol(symbol) + stream_name = f"{normalized_symbol.lower()}@depth@100ms" + + # Create subscription message + subscription_msg = { + "method": "SUBSCRIBE", + "params": [stream_name], + "id": self.stream_id + } + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'orderbook' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('orderbook') + + self.active_streams.append(stream_name) + self.stream_id += 1 + + logger.info(f"Subscribed to order book for {symbol} on Binance") + else: + logger.error(f"Failed to subscribe to order book for {symbol} on Binance") + + except Exception as e: + logger.error(f"Error subscribing to order book for {symbol}: {e}") + raise + + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + set_correlation_id() + normalized_symbol = self.normalize_symbol(symbol) + stream_name = f"{normalized_symbol.lower()}@trade" + + # Create subscription message + subscription_msg = { + "method": "SUBSCRIBE", + "params": [stream_name], + "id": self.stream_id + } + + # Send subscription + success = await self._send_message(subscription_msg) + if success: + # Track subscription + if symbol not in self.subscriptions: + self.subscriptions[symbol] = [] + if 'trades' not in self.subscriptions[symbol]: + self.subscriptions[symbol].append('trades') + + self.active_streams.append(stream_name) + self.stream_id += 1 + + logger.info(f"Subscribed to trades for {symbol} on Binance") + else: + logger.error(f"Failed to subscribe to trades for {symbol} on Binance") + + except Exception as e: + logger.error(f"Error subscribing to trades for {symbol}: {e}") + raise + + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. 
+ + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + normalized_symbol = self.normalize_symbol(symbol) + stream_name = f"{normalized_symbol.lower()}@depth@100ms" + + # Create unsubscription message + unsubscription_msg = { + "method": "UNSUBSCRIBE", + "params": [stream_name], + "id": self.stream_id + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('orderbook') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + if stream_name in self.active_streams: + self.active_streams.remove(stream_name) + + self.stream_id += 1 + logger.info(f"Unsubscribed from order book for {symbol} on Binance") + else: + logger.error(f"Failed to unsubscribe from order book for {symbol} on Binance") + + except Exception as e: + logger.error(f"Error unsubscribing from order book for {symbol}: {e}") + raise + + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + try: + normalized_symbol = self.normalize_symbol(symbol) + stream_name = f"{normalized_symbol.lower()}@trade" + + # Create unsubscription message + unsubscription_msg = { + "method": "UNSUBSCRIBE", + "params": [stream_name], + "id": self.stream_id + } + + # Send unsubscription + success = await self._send_message(unsubscription_msg) + if success: + # Remove from tracking + if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]: + self.subscriptions[symbol].remove('trades') + if not self.subscriptions[symbol]: + del self.subscriptions[symbol] + + if stream_name in self.active_streams: + self.active_streams.remove(stream_name) + + self.stream_id += 1 + logger.info(f"Unsubscribed from trades for {symbol} on Binance") + else: + logger.error(f"Failed to unsubscribe from trades for {symbol} on Binance") + + except Exception as e: + logger.error(f"Error unsubscribing from trades for {symbol}: {e}") + raise + + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols from Binance. + + Returns: + List[str]: List of available symbols + """ + try: + import aiohttp + + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.API_URL}/exchangeInfo") as response: + if response.status == 200: + data = await response.json() + symbols = [ + symbol_info['symbol'] + for symbol_info in data.get('symbols', []) + if symbol_info.get('status') == 'TRADING' + ] + logger.info(f"Retrieved {len(symbols)} symbols from Binance") + return symbols + else: + logger.error(f"Failed to get symbols from Binance: HTTP {response.status}") + return [] + + except Exception as e: + logger.error(f"Error getting symbols from Binance: {e}") + return [] + + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot from Binance REST API. 
+ + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + try: + import aiohttp + + normalized_symbol = self.normalize_symbol(symbol) + + # Binance supports depths: 5, 10, 20, 50, 100, 500, 1000, 5000 + valid_depths = [5, 10, 20, 50, 100, 500, 1000, 5000] + api_depth = min(valid_depths, key=lambda x: abs(x - depth)) + + url = f"{self.API_URL}/depth" + params = { + 'symbol': normalized_symbol, + 'limit': api_depth + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + return self._parse_orderbook_snapshot(data, symbol) + else: + logger.error(f"Failed to get order book for {symbol}: HTTP {response.status}") + return None + + except Exception as e: + logger.error(f"Error getting order book snapshot for {symbol}: {e}") + return None + + def _parse_orderbook_snapshot(self, data: Dict, symbol: str) -> OrderBookSnapshot: + """ + Parse Binance order book data into OrderBookSnapshot. + + Args: + data: Raw Binance order book data + symbol: Trading symbol + + Returns: + OrderBookSnapshot: Parsed order book + """ + try: + # Parse bids and asks + bids = [] + for bid_data in data.get('bids', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('asks', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.now(timezone.utc), + bids=bids, + asks=asks, + sequence_id=data.get('lastUpdateId') + ) + + return orderbook + + except Exception as e: + logger.error(f"Error parsing order book snapshot: {e}") + raise ValidationError(f"Invalid order book data: {e}", "PARSE_ERROR") + + async def _handle_orderbook_update(self, data: Dict) -> None: + """ + Handle order book depth update from Binance. + + Args: + data: Order book update data + """ + try: + set_correlation_id() + + # Extract symbol from stream name + stream = data.get('s', '').upper() + if not stream: + logger.warning("Order book update missing symbol") + return + + # Parse bids and asks + bids = [] + for bid_data in data.get('b', []): + price = float(bid_data[0]) + size = float(bid_data[1]) + + if validate_price(price) and validate_volume(size): + bids.append(PriceLevel(price=price, size=size)) + + asks = [] + for ask_data in data.get('a', []): + price = float(ask_data[0]) + size = float(ask_data[1]) + + if validate_price(price) and validate_volume(size): + asks.append(PriceLevel(price=price, size=size)) + + # Create order book snapshot + orderbook = OrderBookSnapshot( + symbol=stream, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(data.get('E', 0) / 1000, tz=timezone.utc), + bids=bids, + asks=asks, + sequence_id=data.get('u') # Final update ID + ) + + # Notify callbacks + self._notify_data_callbacks(orderbook) + + logger.debug(f"Processed order book update for {stream}") + + except Exception as e: + logger.error(f"Error handling order book update: {e}") + + async def _handle_trade_update(self, data: Dict) -> None: + """ + Handle trade update from Binance. 
+ + Args: + data: Trade update data + """ + try: + set_correlation_id() + + # Extract trade data + symbol = data.get('s', '').upper() + if not symbol: + logger.warning("Trade update missing symbol") + return + + price = float(data.get('p', 0)) + size = float(data.get('q', 0)) + + # Validate data + if not validate_price(price) or not validate_volume(size): + logger.warning(f"Invalid trade data: price={price}, size={size}") + return + + # Determine side (Binance uses 'm' field - true if buyer is market maker) + is_buyer_maker = data.get('m', False) + side = 'sell' if is_buyer_maker else 'buy' + + # Create trade event + trade = TradeEvent( + symbol=symbol, + exchange=self.exchange_name, + timestamp=datetime.fromtimestamp(data.get('T', 0) / 1000, tz=timezone.utc), + price=price, + size=size, + side=side, + trade_id=str(data.get('t', '')) + ) + + # Notify callbacks + self._notify_data_callbacks(trade) + + logger.debug(f"Processed trade for {symbol}: {side} {size} @ {price}") + + except Exception as e: + logger.error(f"Error handling trade update: {e}") + + async def _handle_error_message(self, data: Dict) -> None: + """ + Handle error message from Binance. + + Args: + data: Error message data + """ + error_code = data.get('code', 'unknown') + error_msg = data.get('msg', 'Unknown error') + + logger.error(f"Binance error {error_code}: {error_msg}") + + # Handle specific error codes + if error_code == -1121: # Invalid symbol + logger.error("Invalid symbol error - check symbol format") + elif error_code == -1130: # Invalid listen key + logger.error("Invalid listen key - may need to reconnect") + + def get_binance_stats(self) -> Dict[str, Any]: + """Get Binance-specific statistics""" + base_stats = self.get_stats() + + binance_stats = { + 'active_streams': len(self.active_streams), + 'stream_list': self.active_streams.copy(), + 'next_stream_id': self.stream_id + } + + base_stats.update(binance_stats) + return base_stats \ No newline at end of file diff --git a/COBY/examples/binance_example.py b/COBY/examples/binance_example.py new file mode 100644 index 0000000..3bcd639 --- /dev/null +++ b/COBY/examples/binance_example.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +""" +Example usage of Binance connector. 
+""" + +import asyncio +import sys +from pathlib import Path + +# Add COBY to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from connectors.binance_connector import BinanceConnector +from utils.logging import setup_logging, get_logger +from models.core import OrderBookSnapshot, TradeEvent + +# Setup logging +setup_logging(level='INFO', console_output=True) +logger = get_logger(__name__) + + +class BinanceExample: + """Example Binance connector usage""" + + def __init__(self): + self.connector = BinanceConnector() + self.orderbook_count = 0 + self.trade_count = 0 + + # Add data callbacks + self.connector.add_data_callback(self.on_data_received) + self.connector.add_status_callback(self.on_status_changed) + + def on_data_received(self, data): + """Handle received data""" + if isinstance(data, OrderBookSnapshot): + self.orderbook_count += 1 + logger.info( + f"šŸ“Š Order Book {self.orderbook_count}: {data.symbol} - " + f"Mid: ${data.mid_price:.2f}, Spread: ${data.spread:.2f}, " + f"Bids: {len(data.bids)}, Asks: {len(data.asks)}" + ) + + elif isinstance(data, TradeEvent): + self.trade_count += 1 + logger.info( + f"šŸ’° Trade {self.trade_count}: {data.symbol} - " + f"{data.side.upper()} {data.size} @ ${data.price:.2f}" + ) + + def on_status_changed(self, exchange, status): + """Handle status changes""" + logger.info(f"šŸ”„ {exchange} status changed to: {status.value}") + + async def run_example(self): + """Run the example""" + try: + logger.info("šŸš€ Starting Binance connector example") + + # Connect to Binance + logger.info("šŸ”Œ Connecting to Binance...") + connected = await self.connector.connect() + + if not connected: + logger.error("āŒ Failed to connect to Binance") + return + + logger.info("āœ… Connected to Binance successfully") + + # Get available symbols + logger.info("šŸ“‹ Getting available symbols...") + symbols = await self.connector.get_symbols() + logger.info(f"šŸ“‹ Found {len(symbols)} trading symbols") + + # Show some popular symbols + popular_symbols = ['BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'BNBUSDT'] + available_popular = [s for s in popular_symbols if s in symbols] + logger.info(f"šŸ“‹ Popular symbols available: {available_popular}") + + # Get order book snapshot + if 'BTCUSDT' in symbols: + logger.info("šŸ“Š Getting BTC order book snapshot...") + orderbook = await self.connector.get_orderbook_snapshot('BTCUSDT', depth=10) + if orderbook: + logger.info( + f"šŸ“Š BTC Order Book: Mid=${orderbook.mid_price:.2f}, " + f"Spread=${orderbook.spread:.2f}" + ) + + # Subscribe to real-time data + logger.info("šŸ”” Subscribing to real-time data...") + + # Subscribe to BTC order book and trades + if 'BTCUSDT' in symbols: + await self.connector.subscribe_orderbook('BTCUSDT') + await self.connector.subscribe_trades('BTCUSDT') + logger.info("āœ… Subscribed to BTCUSDT order book and trades") + + # Subscribe to ETH order book + if 'ETHUSDT' in symbols: + await self.connector.subscribe_orderbook('ETHUSDT') + logger.info("āœ… Subscribed to ETHUSDT order book") + + # Let it run for a while + logger.info("ā³ Collecting data for 30 seconds...") + await asyncio.sleep(30) + + # Show statistics + stats = self.connector.get_binance_stats() + logger.info("šŸ“ˆ Final Statistics:") + logger.info(f" šŸ“Š Order books received: {self.orderbook_count}") + logger.info(f" šŸ’° Trades received: {self.trade_count}") + logger.info(f" šŸ“” Total messages: {stats['message_count']}") + logger.info(f" āŒ Errors: {stats['error_count']}") + logger.info(f" šŸ”— Active streams: {stats['active_streams']}") 
+ logger.info(f" šŸ“‹ Subscriptions: {list(stats['subscriptions'].keys())}") + + # Unsubscribe and disconnect + logger.info("šŸ”Œ Cleaning up...") + + if 'BTCUSDT' in self.connector.subscriptions: + await self.connector.unsubscribe_orderbook('BTCUSDT') + await self.connector.unsubscribe_trades('BTCUSDT') + + if 'ETHUSDT' in self.connector.subscriptions: + await self.connector.unsubscribe_orderbook('ETHUSDT') + + await self.connector.disconnect() + logger.info("āœ… Disconnected successfully") + + except KeyboardInterrupt: + logger.info("ā¹ļø Interrupted by user") + except Exception as e: + logger.error(f"āŒ Example failed: {e}") + finally: + # Ensure cleanup + try: + await self.connector.disconnect() + except: + pass + + +async def main(): + """Main function""" + example = BinanceExample() + await example.run_example() + + +if __name__ == "__main__": + print("Binance Connector Example") + print("=" * 25) + print("This example will:") + print("1. Connect to Binance WebSocket") + print("2. Get available trading symbols") + print("3. Subscribe to real-time order book and trade data") + print("4. Display received data for 30 seconds") + print("5. Show statistics and disconnect") + print() + print("Press Ctrl+C to stop early") + print("=" * 25) + + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nšŸ‘‹ Example stopped by user") + except Exception as e: + print(f"\nāŒ Example failed: {e}") + sys.exit(1) \ No newline at end of file diff --git a/COBY/processing/__init__.py b/COBY/processing/__init__.py new file mode 100644 index 0000000..3cbc5df --- /dev/null +++ b/COBY/processing/__init__.py @@ -0,0 +1,15 @@ +""" +Data processing and normalization components for the COBY system. +""" + +from .data_processor import StandardDataProcessor +from .quality_checker import DataQualityChecker +from .anomaly_detector import AnomalyDetector +from .metrics_calculator import MetricsCalculator + +__all__ = [ + 'StandardDataProcessor', + 'DataQualityChecker', + 'AnomalyDetector', + 'MetricsCalculator' +] \ No newline at end of file diff --git a/COBY/processing/anomaly_detector.py b/COBY/processing/anomaly_detector.py new file mode 100644 index 0000000..d3245a2 --- /dev/null +++ b/COBY/processing/anomaly_detector.py @@ -0,0 +1,329 @@ +""" +Anomaly detection for market data. +""" + +import statistics +from typing import Dict, List, Union, Optional, Deque +from collections import deque +from datetime import datetime, timedelta +from ..models.core import OrderBookSnapshot, TradeEvent +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp + +logger = get_logger(__name__) + + +class AnomalyDetector: + """ + Detects anomalies in market data using statistical methods. + + Detects: + - Price spikes and drops + - Volume anomalies + - Spread anomalies + - Frequency anomalies + """ + + def __init__(self, window_size: int = 100, z_score_threshold: float = 3.0): + """ + Initialize anomaly detector. 
+ + Args: + window_size: Size of rolling window for statistics + z_score_threshold: Z-score threshold for anomaly detection + """ + self.window_size = window_size + self.z_score_threshold = z_score_threshold + + # Rolling windows for statistics + self.price_windows: Dict[str, Deque[float]] = {} + self.volume_windows: Dict[str, Deque[float]] = {} + self.spread_windows: Dict[str, Deque[float]] = {} + self.timestamp_windows: Dict[str, Deque[datetime]] = {} + + logger.info(f"Anomaly detector initialized with window_size={window_size}, threshold={z_score_threshold}") + + def detect_orderbook_anomalies(self, orderbook: OrderBookSnapshot) -> List[str]: + """ + Detect anomalies in order book data. + + Args: + orderbook: Order book snapshot to analyze + + Returns: + List[str]: List of detected anomalies + """ + anomalies = [] + key = f"{orderbook.symbol}_{orderbook.exchange}" + + try: + # Price anomalies + if orderbook.mid_price: + price_anomalies = self._detect_price_anomalies(key, orderbook.mid_price) + anomalies.extend(price_anomalies) + + # Volume anomalies + total_volume = orderbook.bid_volume + orderbook.ask_volume + volume_anomalies = self._detect_volume_anomalies(key, total_volume) + anomalies.extend(volume_anomalies) + + # Spread anomalies + if orderbook.spread and orderbook.mid_price: + spread_pct = (orderbook.spread / orderbook.mid_price) * 100 + spread_anomalies = self._detect_spread_anomalies(key, spread_pct) + anomalies.extend(spread_anomalies) + + # Frequency anomalies + frequency_anomalies = self._detect_frequency_anomalies(key, orderbook.timestamp) + anomalies.extend(frequency_anomalies) + + # Update windows + self._update_windows(key, orderbook) + + except Exception as e: + logger.error(f"Error detecting order book anomalies: {e}") + anomalies.append(f"Anomaly detection error: {e}") + + if anomalies: + logger.warning(f"Anomalies detected in {orderbook.symbol}@{orderbook.exchange}: {anomalies}") + + return anomalies + + def detect_trade_anomalies(self, trade: TradeEvent) -> List[str]: + """ + Detect anomalies in trade data. 
+ + Args: + trade: Trade event to analyze + + Returns: + List[str]: List of detected anomalies + """ + anomalies = [] + key = f"{trade.symbol}_{trade.exchange}_trade" + + try: + # Price anomalies + price_anomalies = self._detect_price_anomalies(key, trade.price) + anomalies.extend(price_anomalies) + + # Volume anomalies + volume_anomalies = self._detect_volume_anomalies(key, trade.size) + anomalies.extend(volume_anomalies) + + # Update windows + self._update_trade_windows(key, trade) + + except Exception as e: + logger.error(f"Error detecting trade anomalies: {e}") + anomalies.append(f"Anomaly detection error: {e}") + + if anomalies: + logger.warning(f"Trade anomalies detected in {trade.symbol}@{trade.exchange}: {anomalies}") + + return anomalies + + def _detect_price_anomalies(self, key: str, price: float) -> List[str]: + """Detect price anomalies using z-score""" + anomalies = [] + + if key not in self.price_windows: + self.price_windows[key] = deque(maxlen=self.window_size) + return anomalies + + window = self.price_windows[key] + if len(window) < 10: # Need minimum data points + return anomalies + + try: + mean_price = statistics.mean(window) + std_price = statistics.stdev(window) + + if std_price > 0: + z_score = abs(price - mean_price) / std_price + + if z_score > self.z_score_threshold: + direction = "spike" if price > mean_price else "drop" + anomalies.append(f"Price {direction}: {price:.6f} (z-score: {z_score:.2f})") + + except statistics.StatisticsError: + pass # Not enough data or all values are the same + + return anomalies + + def _detect_volume_anomalies(self, key: str, volume: float) -> List[str]: + """Detect volume anomalies using z-score""" + anomalies = [] + + volume_key = f"{key}_volume" + if volume_key not in self.volume_windows: + self.volume_windows[volume_key] = deque(maxlen=self.window_size) + return anomalies + + window = self.volume_windows[volume_key] + if len(window) < 10: + return anomalies + + try: + mean_volume = statistics.mean(window) + std_volume = statistics.stdev(window) + + if std_volume > 0: + z_score = abs(volume - mean_volume) / std_volume + + if z_score > self.z_score_threshold: + direction = "spike" if volume > mean_volume else "drop" + anomalies.append(f"Volume {direction}: {volume:.6f} (z-score: {z_score:.2f})") + + except statistics.StatisticsError: + pass + + return anomalies + + def _detect_spread_anomalies(self, key: str, spread_pct: float) -> List[str]: + """Detect spread anomalies using z-score""" + anomalies = [] + + spread_key = f"{key}_spread" + if spread_key not in self.spread_windows: + self.spread_windows[spread_key] = deque(maxlen=self.window_size) + return anomalies + + window = self.spread_windows[spread_key] + if len(window) < 10: + return anomalies + + try: + mean_spread = statistics.mean(window) + std_spread = statistics.stdev(window) + + if std_spread > 0: + z_score = abs(spread_pct - mean_spread) / std_spread + + if z_score > self.z_score_threshold: + direction = "widening" if spread_pct > mean_spread else "tightening" + anomalies.append(f"Spread {direction}: {spread_pct:.4f}% (z-score: {z_score:.2f})") + + except statistics.StatisticsError: + pass + + return anomalies + + def _detect_frequency_anomalies(self, key: str, timestamp: datetime) -> List[str]: + """Detect frequency anomalies in data updates""" + anomalies = [] + + timestamp_key = f"{key}_timestamp" + if timestamp_key not in self.timestamp_windows: + self.timestamp_windows[timestamp_key] = deque(maxlen=self.window_size) + return anomalies + + window = self.timestamp_windows[timestamp_key]
+ if len(window) < 5: + return anomalies + + try: + # Calculate intervals between updates + intervals = [] + for i in range(1, len(window)): + interval = (window[i] - window[i-1]).total_seconds() + intervals.append(interval) + + if len(intervals) >= 5: + mean_interval = statistics.mean(intervals) + std_interval = statistics.stdev(intervals) + + # Check current interval + current_interval = (timestamp - window[-1]).total_seconds() + + if std_interval > 0: + z_score = abs(current_interval - mean_interval) / std_interval + + if z_score > self.z_score_threshold: + if current_interval > mean_interval: + anomalies.append(f"Update delay: {current_interval:.1f}s (expected: {mean_interval:.1f}s)") + else: + anomalies.append(f"Update burst: {current_interval:.1f}s (expected: {mean_interval:.1f}s)") + + except (statistics.StatisticsError, IndexError): + pass + + return anomalies + + def _update_windows(self, key: str, orderbook: OrderBookSnapshot) -> None: + """Update rolling windows with new data""" + # Update price window + if orderbook.mid_price: + if key not in self.price_windows: + self.price_windows[key] = deque(maxlen=self.window_size) + self.price_windows[key].append(orderbook.mid_price) + + # Update volume window + total_volume = orderbook.bid_volume + orderbook.ask_volume + volume_key = f"{key}_volume" + if volume_key not in self.volume_windows: + self.volume_windows[volume_key] = deque(maxlen=self.window_size) + self.volume_windows[volume_key].append(total_volume) + + # Update spread window + if orderbook.spread and orderbook.mid_price: + spread_pct = (orderbook.spread / orderbook.mid_price) * 100 + spread_key = f"{key}_spread" + if spread_key not in self.spread_windows: + self.spread_windows[spread_key] = deque(maxlen=self.window_size) + self.spread_windows[spread_key].append(spread_pct) + + # Update timestamp window + timestamp_key = f"{key}_timestamp" + if timestamp_key not in self.timestamp_windows: + self.timestamp_windows[timestamp_key] = deque(maxlen=self.window_size) + self.timestamp_windows[timestamp_key].append(orderbook.timestamp) + + def _update_trade_windows(self, key: str, trade: TradeEvent) -> None: + """Update rolling windows with trade data""" + # Update price window + if key not in self.price_windows: + self.price_windows[key] = deque(maxlen=self.window_size) + self.price_windows[key].append(trade.price) + + # Update volume window + volume_key = f"{key}_volume" + if volume_key not in self.volume_windows: + self.volume_windows[volume_key] = deque(maxlen=self.window_size) + self.volume_windows[volume_key].append(trade.size) + + def get_statistics(self) -> Dict[str, Dict[str, float]]: + """Get current statistics for all tracked symbols""" + stats = {} + + for key, window in self.price_windows.items(): + if len(window) >= 2: + try: + stats[key] = { + 'price_mean': statistics.mean(window), + 'price_std': statistics.stdev(window), + 'price_min': min(window), + 'price_max': max(window), + 'data_points': len(window) + } + except statistics.StatisticsError: + stats[key] = {'error': 'insufficient_data'} + + return stats + + def reset_windows(self, key: Optional[str] = None) -> None: + """Reset rolling windows for a specific key or all keys""" + if key: + # Reset specific key + self.price_windows.pop(key, None) + self.volume_windows.pop(f"{key}_volume", None) + self.spread_windows.pop(f"{key}_spread", None) + self.timestamp_windows.pop(f"{key}_timestamp", None) + else: + # Reset all windows + self.price_windows.clear() +
self.volume_windows.clear() + self.spread_windows.clear() + self.timestamp_windows.clear() + + logger.info(f"Reset anomaly detection windows for {key or 'all keys'}") \ No newline at end of file diff --git a/COBY/processing/data_processor.py b/COBY/processing/data_processor.py new file mode 100644 index 0000000..4074f75 --- /dev/null +++ b/COBY/processing/data_processor.py @@ -0,0 +1,378 @@ +""" +Main data processor implementation. +""" + +from typing import Dict, Union, List, Optional, Any +from ..interfaces.data_processor import DataProcessor +from ..models.core import OrderBookSnapshot, TradeEvent, OrderBookMetrics +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ValidationError, ProcessingError +from ..utils.timing import get_current_timestamp +from .quality_checker import DataQualityChecker +from .anomaly_detector import AnomalyDetector +from .metrics_calculator import MetricsCalculator + +logger = get_logger(__name__) + + +class StandardDataProcessor(DataProcessor): + """ + Standard implementation of data processor interface. + + Provides: + - Data normalization and validation + - Quality checking + - Anomaly detection + - Metrics calculation + - Data enrichment + """ + + def __init__(self): + """Initialize data processor with components""" + self.quality_checker = DataQualityChecker() + self.anomaly_detector = AnomalyDetector() + self.metrics_calculator = MetricsCalculator() + + # Processing statistics + self.processed_orderbooks = 0 + self.processed_trades = 0 + self.quality_failures = 0 + self.anomalies_detected = 0 + + logger.info("Standard data processor initialized") + + def normalize_orderbook(self, raw_data: Dict, exchange: str) -> OrderBookSnapshot: + """ + Normalize raw order book data to standard format. + + Args: + raw_data: Raw order book data from exchange + exchange: Exchange name + + Returns: + OrderBookSnapshot: Normalized order book data + """ + try: + set_correlation_id() + + # This is a generic implementation - specific exchanges would override + # For now, assume data is already in correct format + if isinstance(raw_data, OrderBookSnapshot): + return raw_data + + # If raw_data is a dict, try to construct OrderBookSnapshot + # This would be customized per exchange + raise NotImplementedError( + "normalize_orderbook should be implemented by exchange-specific processors" + ) + + except Exception as e: + logger.error(f"Error normalizing order book data: {e}") + raise ProcessingError(f"Normalization failed: {e}", "NORMALIZE_ERROR") + + def normalize_trade(self, raw_data: Dict, exchange: str) -> TradeEvent: + """ + Normalize raw trade data to standard format. + + Args: + raw_data: Raw trade data from exchange + exchange: Exchange name + + Returns: + TradeEvent: Normalized trade data + """ + try: + set_correlation_id() + + # This is a generic implementation - specific exchanges would override + if isinstance(raw_data, TradeEvent): + return raw_data + + # If raw_data is a dict, try to construct TradeEvent + # This would be customized per exchange + raise NotImplementedError( + "normalize_trade should be implemented by exchange-specific processors" + ) + + except Exception as e: + logger.error(f"Error normalizing trade data: {e}") + raise ProcessingError(f"Normalization failed: {e}", "NORMALIZE_ERROR") + + def validate_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> bool: + """ + Validate normalized data for quality and consistency. 
+ + Args: + data: Normalized data to validate + + Returns: + bool: True if data is valid, False otherwise + """ + try: + set_correlation_id() + + if isinstance(data, OrderBookSnapshot): + quality_score, issues = self.quality_checker.check_orderbook_quality(data) + self.processed_orderbooks += 1 + + if quality_score < 0.5: # Threshold for acceptable quality + self.quality_failures += 1 + logger.warning(f"Low quality order book data: score={quality_score:.2f}, issues={issues}") + return False + + return True + + elif isinstance(data, TradeEvent): + quality_score, issues = self.quality_checker.check_trade_quality(data) + self.processed_trades += 1 + + if quality_score < 0.5: + self.quality_failures += 1 + logger.warning(f"Low quality trade data: score={quality_score:.2f}, issues={issues}") + return False + + return True + + else: + logger.error(f"Unknown data type for validation: {type(data)}") + return False + + except Exception as e: + logger.error(f"Error validating data: {e}") + return False + + def calculate_metrics(self, orderbook: OrderBookSnapshot) -> OrderBookMetrics: + """ + Calculate metrics from order book data. + + Args: + orderbook: Order book snapshot + + Returns: + OrderBookMetrics: Calculated metrics + """ + try: + set_correlation_id() + return self.metrics_calculator.calculate_orderbook_metrics(orderbook) + + except Exception as e: + logger.error(f"Error calculating metrics: {e}") + raise ProcessingError(f"Metrics calculation failed: {e}", "METRICS_ERROR") + + def detect_anomalies(self, data: Union[OrderBookSnapshot, TradeEvent]) -> List[str]: + """ + Detect anomalies in the data. + + Args: + data: Data to analyze for anomalies + + Returns: + List[str]: List of detected anomaly descriptions + """ + try: + set_correlation_id() + + if isinstance(data, OrderBookSnapshot): + anomalies = self.anomaly_detector.detect_orderbook_anomalies(data) + elif isinstance(data, TradeEvent): + anomalies = self.anomaly_detector.detect_trade_anomalies(data) + else: + logger.error(f"Unknown data type for anomaly detection: {type(data)}") + return ["Unknown data type"] + + if anomalies: + self.anomalies_detected += len(anomalies) + + return anomalies + + except Exception as e: + logger.error(f"Error detecting anomalies: {e}") + return [f"Anomaly detection error: {e}"] + + def filter_data(self, data: Union[OrderBookSnapshot, TradeEvent], criteria: Dict) -> bool: + """ + Filter data based on criteria. 
+ + Args: + data: Data to filter + criteria: Filtering criteria + + Returns: + bool: True if data passes filter, False otherwise + """ + try: + set_correlation_id() + + # Symbol filter + if 'symbols' in criteria: + allowed_symbols = criteria['symbols'] + if data.symbol not in allowed_symbols: + return False + + # Exchange filter + if 'exchanges' in criteria: + allowed_exchanges = criteria['exchanges'] + if data.exchange not in allowed_exchanges: + return False + + # Quality filter + if 'min_quality' in criteria: + min_quality = criteria['min_quality'] + if isinstance(data, OrderBookSnapshot): + quality_score, _ = self.quality_checker.check_orderbook_quality(data) + elif isinstance(data, TradeEvent): + quality_score, _ = self.quality_checker.check_trade_quality(data) + else: + quality_score = 0.0 + + if quality_score < min_quality: + return False + + # Price range filter + if 'price_range' in criteria: + price_range = criteria['price_range'] + min_price, max_price = price_range + + if isinstance(data, OrderBookSnapshot): + price = data.mid_price + elif isinstance(data, TradeEvent): + price = data.price + else: + return False + + if price and (price < min_price or price > max_price): + return False + + # Volume filter for trades + if 'min_volume' in criteria and isinstance(data, TradeEvent): + min_volume = criteria['min_volume'] + if data.size < min_volume: + return False + + return True + + except Exception as e: + logger.error(f"Error filtering data: {e}") + return False + + def enrich_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> Dict: + """ + Enrich data with additional metadata. + + Args: + data: Data to enrich + + Returns: + Dict: Enriched data with metadata + """ + try: + set_correlation_id() + + enriched = { + 'original_data': data, + 'processing_timestamp': get_current_timestamp(), + 'processor_version': '1.0.0' + } + + # Add quality metrics + if isinstance(data, OrderBookSnapshot): + quality_score, quality_issues = self.quality_checker.check_orderbook_quality(data) + enriched['quality_score'] = quality_score + enriched['quality_issues'] = quality_issues + + # Add calculated metrics + try: + metrics = self.calculate_metrics(data) + enriched['metrics'] = { + 'mid_price': metrics.mid_price, + 'spread': metrics.spread, + 'spread_percentage': metrics.spread_percentage, + 'volume_imbalance': metrics.volume_imbalance, + 'depth_10': metrics.depth_10, + 'depth_50': metrics.depth_50 + } + except Exception as e: + enriched['metrics_error'] = str(e) + + # Add liquidity score + try: + liquidity_score = self.metrics_calculator.calculate_liquidity_score(data) + enriched['liquidity_score'] = liquidity_score + except Exception as e: + enriched['liquidity_error'] = str(e) + + elif isinstance(data, TradeEvent): + quality_score, quality_issues = self.quality_checker.check_trade_quality(data) + enriched['quality_score'] = quality_score + enriched['quality_issues'] = quality_issues + + # Add trade-specific enrichments + enriched['trade_value'] = data.price * data.size + enriched['side_numeric'] = 1 if data.side == 'buy' else -1 + + # Add anomaly detection results + anomalies = self.detect_anomalies(data) + enriched['anomalies'] = anomalies + enriched['anomaly_count'] = len(anomalies) + + return enriched + + except Exception as e: + logger.error(f"Error enriching data: {e}") + return { + 'original_data': data, + 'enrichment_error': str(e) + } + + def get_data_quality_score(self, data: Union[OrderBookSnapshot, TradeEvent]) -> float: + """ + Calculate data quality score. 
+ + Args: + data: Data to score + + Returns: + float: Quality score between 0.0 and 1.0 + """ + try: + set_correlation_id() + + if isinstance(data, OrderBookSnapshot): + quality_score, _ = self.quality_checker.check_orderbook_quality(data) + elif isinstance(data, TradeEvent): + quality_score, _ = self.quality_checker.check_trade_quality(data) + else: + logger.error(f"Unknown data type for quality scoring: {type(data)}") + return 0.0 + + return quality_score + + except Exception as e: + logger.error(f"Error calculating quality score: {e}") + return 0.0 + + def get_processing_stats(self) -> Dict[str, Any]: + """Get processing statistics""" + return { + 'processed_orderbooks': self.processed_orderbooks, + 'processed_trades': self.processed_trades, + 'quality_failures': self.quality_failures, + 'anomalies_detected': self.anomalies_detected, + 'quality_failure_rate': ( + self.quality_failures / max(1, self.processed_orderbooks + self.processed_trades) + ), + 'anomaly_rate': ( + self.anomalies_detected / max(1, self.processed_orderbooks + self.processed_trades) + ), + 'quality_checker_summary': self.quality_checker.get_quality_summary(), + 'anomaly_detector_stats': self.anomaly_detector.get_statistics() + } + + def reset_stats(self) -> None: + """Reset processing statistics""" + self.processed_orderbooks = 0 + self.processed_trades = 0 + self.quality_failures = 0 + self.anomalies_detected = 0 + + logger.info("Processing statistics reset") \ No newline at end of file diff --git a/COBY/processing/metrics_calculator.py b/COBY/processing/metrics_calculator.py new file mode 100644 index 0000000..c617618 --- /dev/null +++ b/COBY/processing/metrics_calculator.py @@ -0,0 +1,275 @@ +""" +Metrics calculation for order book analysis. +""" + +from typing import Dict, List, Optional +from ..models.core import OrderBookSnapshot, OrderBookMetrics, ImbalanceMetrics +from ..utils.logging import get_logger + +logger = get_logger(__name__) + + +class MetricsCalculator: + """ + Calculates various metrics from order book data. + + Metrics include: + - Basic metrics (mid price, spread, volumes) + - Imbalance metrics + - Depth metrics + - Liquidity metrics + """ + + def __init__(self): + """Initialize metrics calculator""" + logger.info("Metrics calculator initialized") + + def calculate_orderbook_metrics(self, orderbook: OrderBookSnapshot) -> OrderBookMetrics: + """ + Calculate comprehensive order book metrics. 
+ + Args: + orderbook: Order book snapshot + + Returns: + OrderBookMetrics: Calculated metrics + """ + try: + # Basic calculations + mid_price = self._calculate_mid_price(orderbook) + spread = self._calculate_spread(orderbook) + spread_percentage = (spread / mid_price * 100) if mid_price > 0 else 0.0 + + # Volume calculations + bid_volume = sum(level.size for level in orderbook.bids) + ask_volume = sum(level.size for level in orderbook.asks) + + # Imbalance calculation + total_volume = bid_volume + ask_volume + volume_imbalance = ((bid_volume - ask_volume) / total_volume) if total_volume > 0 else 0.0 + + # Depth calculations + depth_10 = self._calculate_depth(orderbook, 10) + depth_50 = self._calculate_depth(orderbook, 50) + + return OrderBookMetrics( + symbol=orderbook.symbol, + exchange=orderbook.exchange, + timestamp=orderbook.timestamp, + mid_price=mid_price, + spread=spread, + spread_percentage=spread_percentage, + bid_volume=bid_volume, + ask_volume=ask_volume, + volume_imbalance=volume_imbalance, + depth_10=depth_10, + depth_50=depth_50 + ) + + except Exception as e: + logger.error(f"Error calculating order book metrics: {e}") + raise + + def calculate_imbalance_metrics(self, orderbook: OrderBookSnapshot) -> ImbalanceMetrics: + """ + Calculate order book imbalance metrics. + + Args: + orderbook: Order book snapshot + + Returns: + ImbalanceMetrics: Calculated imbalance metrics + """ + try: + # Volume imbalance + bid_volume = sum(level.size for level in orderbook.bids) + ask_volume = sum(level.size for level in orderbook.asks) + total_volume = bid_volume + ask_volume + volume_imbalance = ((bid_volume - ask_volume) / total_volume) if total_volume > 0 else 0.0 + + # Price imbalance (weighted by volume) + price_imbalance = self._calculate_price_imbalance(orderbook) + + # Depth imbalance + depth_imbalance = self._calculate_depth_imbalance(orderbook) + + # Momentum score (simplified - would need historical data for full implementation) + momentum_score = volume_imbalance * 0.5 + price_imbalance * 0.3 + depth_imbalance * 0.2 + + return ImbalanceMetrics( + symbol=orderbook.symbol, + timestamp=orderbook.timestamp, + volume_imbalance=volume_imbalance, + price_imbalance=price_imbalance, + depth_imbalance=depth_imbalance, + momentum_score=momentum_score + ) + + except Exception as e: + logger.error(f"Error calculating imbalance metrics: {e}") + raise + + def _calculate_mid_price(self, orderbook: OrderBookSnapshot) -> float: + """Calculate mid price""" + if not orderbook.bids or not orderbook.asks: + return 0.0 + + best_bid = orderbook.bids[0].price + best_ask = orderbook.asks[0].price + + return (best_bid + best_ask) / 2.0 + + def _calculate_spread(self, orderbook: OrderBookSnapshot) -> float: + """Calculate bid-ask spread""" + if not orderbook.bids or not orderbook.asks: + return 0.0 + + best_bid = orderbook.bids[0].price + best_ask = orderbook.asks[0].price + + return best_ask - best_bid + + def _calculate_depth(self, orderbook: OrderBookSnapshot, levels: int) -> float: + """Calculate market depth for specified number of levels""" + bid_depth = sum( + level.size for level in orderbook.bids[:levels] + ) + ask_depth = sum( + level.size for level in orderbook.asks[:levels] + ) + + return bid_depth + ask_depth + + def _calculate_price_imbalance(self, orderbook: OrderBookSnapshot) -> float: + """Calculate price-weighted imbalance""" + if not orderbook.bids or not orderbook.asks: + return 0.0 + + # Calculate volume-weighted average prices for top levels + bid_vwap = self._calculate_vwap(orderbook.bids[:5])
+ ask_vwap = self._calculate_vwap(orderbook.asks[:5]) + + if bid_vwap == 0 or ask_vwap == 0: + return 0.0 + + mid_price = (bid_vwap + ask_vwap) / 2.0 + + # Normalize imbalance + price_imbalance = (bid_vwap - ask_vwap) / mid_price if mid_price > 0 else 0.0 + + return max(-1.0, min(1.0, price_imbalance)) + + def _calculate_depth_imbalance(self, orderbook: OrderBookSnapshot) -> float: + """Calculate depth imbalance across multiple levels""" + levels_to_check = [5, 10, 20] + imbalances = [] + + for levels in levels_to_check: + bid_depth = sum(level.size for level in orderbook.bids[:levels]) + ask_depth = sum(level.size for level in orderbook.asks[:levels]) + total_depth = bid_depth + ask_depth + + if total_depth > 0: + imbalance = (bid_depth - ask_depth) / total_depth + imbalances.append(imbalance) + + # Return the simple average of the per-depth imbalances + if imbalances: + return sum(imbalances) / len(imbalances) + + return 0.0 + + def _calculate_vwap(self, levels: List) -> float: + """Calculate volume-weighted average price for price levels""" + if not levels: + return 0.0 + + total_volume = sum(level.size for level in levels) + if total_volume == 0: + return 0.0 + + weighted_sum = sum(level.price * level.size for level in levels) + + return weighted_sum / total_volume + + def calculate_liquidity_score(self, orderbook: OrderBookSnapshot) -> float: + """ + Calculate liquidity score based on depth and spread. + + Args: + orderbook: Order book snapshot + + Returns: + float: Liquidity score (0.0 to 1.0) + """ + try: + if not orderbook.bids or not orderbook.asks: + return 0.0 + + # Spread component (lower spread = higher liquidity) + spread = self._calculate_spread(orderbook) + mid_price = self._calculate_mid_price(orderbook) + + if mid_price == 0: + return 0.0 + + spread_pct = (spread / mid_price) * 100 + spread_score = max(0.0, 1.0 - (spread_pct / 5.0)) # Normalize to 5% max spread + + # Depth component (higher depth = higher liquidity) + total_depth = self._calculate_depth(orderbook, 10) + depth_score = min(1.0, total_depth / 100.0) # Normalize to 100 units max depth + + # Volume balance component (more balanced = higher liquidity) + bid_volume = sum(level.size for level in orderbook.bids[:10]) + ask_volume = sum(level.size for level in orderbook.asks[:10]) + total_volume = bid_volume + ask_volume + + if total_volume > 0: + imbalance = abs(bid_volume - ask_volume) / total_volume + balance_score = 1.0 - imbalance + else: + balance_score = 0.0 + + # Weighted combination + liquidity_score = (spread_score * 0.4 + depth_score * 0.4 + balance_score * 0.2) + + return max(0.0, min(1.0, liquidity_score)) + + except Exception as e: + logger.error(f"Error calculating liquidity score: {e}") + return 0.0 + + def get_market_summary(self, orderbook: OrderBookSnapshot) -> Dict[str, float]: + """ + Get comprehensive market summary.
+ + Args: + orderbook: Order book snapshot + + Returns: + Dict[str, float]: Market summary metrics + """ + try: + metrics = self.calculate_orderbook_metrics(orderbook) + imbalance = self.calculate_imbalance_metrics(orderbook) + liquidity = self.calculate_liquidity_score(orderbook) + + return { + 'mid_price': metrics.mid_price, + 'spread': metrics.spread, + 'spread_percentage': metrics.spread_percentage, + 'bid_volume': metrics.bid_volume, + 'ask_volume': metrics.ask_volume, + 'volume_imbalance': metrics.volume_imbalance, + 'depth_10': metrics.depth_10, + 'depth_50': metrics.depth_50, + 'price_imbalance': imbalance.price_imbalance, + 'depth_imbalance': imbalance.depth_imbalance, + 'momentum_score': imbalance.momentum_score, + 'liquidity_score': liquidity + } + + except Exception as e: + logger.error(f"Error generating market summary: {e}") + return {} \ No newline at end of file diff --git a/COBY/processing/quality_checker.py b/COBY/processing/quality_checker.py new file mode 100644 index 0000000..619c6a6 --- /dev/null +++ b/COBY/processing/quality_checker.py @@ -0,0 +1,288 @@ +""" +Data quality checking and validation for market data. +""" + +from typing import Dict, List, Union, Optional, Tuple +from datetime import datetime, timezone +from ..models.core import OrderBookSnapshot, TradeEvent +from ..utils.logging import get_logger +from ..utils.validation import validate_price, validate_volume, validate_symbol +from ..utils.timing import get_current_timestamp + +logger = get_logger(__name__) + + +class DataQualityChecker: + """ + Comprehensive data quality checker for market data. + + Validates: + - Data structure integrity + - Price and volume ranges + - Timestamp consistency + - Cross-validation between related data points + """ + + def __init__(self): + """Initialize quality checker with default thresholds""" + # Quality thresholds + self.max_spread_percentage = 10.0 # Maximum spread as % of mid price + self.max_price_change_percentage = 50.0 # Maximum price change between updates + self.min_volume_threshold = 0.000001 # Minimum meaningful volume + self.max_timestamp_drift = 300 # Maximum seconds drift from current time + + # Price history for validation + self.price_history: Dict[str, float] = {} # "symbol_exchange" -> last mid price + + logger.info("Data quality checker initialized") + + def check_orderbook_quality(self, orderbook: OrderBookSnapshot) -> Tuple[float, List[str]]: + """ + Check order book data quality.
+ + Args: + orderbook: Order book snapshot to validate + + Returns: + Tuple[float, List[str]]: Quality score (0.0-1.0) and list of issues + """ + issues = [] + quality_score = 1.0 + + try: + # Basic structure validation + structure_issues = self._check_orderbook_structure(orderbook) + issues.extend(structure_issues) + quality_score -= len(structure_issues) * 0.1 + + # Price validation + price_issues = self._check_orderbook_prices(orderbook) + issues.extend(price_issues) + quality_score -= len(price_issues) * 0.15 + + # Volume validation + volume_issues = self._check_orderbook_volumes(orderbook) + issues.extend(volume_issues) + quality_score -= len(volume_issues) * 0.1 + + # Spread validation + spread_issues = self._check_orderbook_spread(orderbook) + issues.extend(spread_issues) + quality_score -= len(spread_issues) * 0.2 + + # Timestamp validation + timestamp_issues = self._check_timestamp(orderbook.timestamp) + issues.extend(timestamp_issues) + quality_score -= len(timestamp_issues) * 0.1 + + # Cross-validation with history + history_issues = self._check_price_history(orderbook) + issues.extend(history_issues) + quality_score -= len(history_issues) * 0.15 + + # Update price history + self._update_price_history(orderbook) + + except Exception as e: + logger.error(f"Error checking order book quality: {e}") + issues.append(f"Quality check error: {e}") + quality_score = 0.0 + + # Ensure score is within bounds + quality_score = max(0.0, min(1.0, quality_score)) + + if issues: + logger.debug(f"Order book quality issues for {orderbook.symbol}@{orderbook.exchange}: {issues}") + + return quality_score, issues + + def check_trade_quality(self, trade: TradeEvent) -> Tuple[float, List[str]]: + """ + Check trade data quality. + + Args: + trade: Trade event to validate + + Returns: + Tuple[float, List[str]]: Quality score (0.0-1.0) and list of issues + """ + issues = [] + quality_score = 1.0 + + try: + # Basic structure validation + if not validate_symbol(trade.symbol): + issues.append("Invalid symbol format") + + if not trade.exchange: + issues.append("Missing exchange") + + if not trade.trade_id: + issues.append("Missing trade ID") + + # Price validation + if not validate_price(trade.price): + issues.append(f"Invalid price: {trade.price}") + + # Volume validation + if not validate_volume(trade.size): + issues.append(f"Invalid size: {trade.size}") + + if trade.size < self.min_volume_threshold: + issues.append(f"Size below threshold: {trade.size}") + + # Side validation + if trade.side not in ['buy', 'sell']: + issues.append(f"Invalid side: {trade.side}") + + # Timestamp validation + timestamp_issues = self._check_timestamp(trade.timestamp) + issues.extend(timestamp_issues) + + # Calculate quality score + quality_score -= len(issues) * 0.2 + + except Exception as e: + logger.error(f"Error checking trade quality: {e}") + issues.append(f"Quality check error: {e}") + quality_score = 0.0 + + # Ensure score is within bounds + quality_score = max(0.0, min(1.0, quality_score)) + + if issues: + logger.debug(f"Trade quality issues for {trade.symbol}@{trade.exchange}: {issues}") + + return quality_score, issues + + def _check_orderbook_structure(self, orderbook: OrderBookSnapshot) -> List[str]: + """Check basic order book structure""" + issues = [] + + if not validate_symbol(orderbook.symbol): + issues.append("Invalid symbol format") + + if not orderbook.exchange: + issues.append("Missing exchange") + + if not orderbook.bids: + issues.append("No bid levels") + + if not orderbook.asks: + issues.append("No ask levels")
+ + return issues + + def _check_orderbook_prices(self, orderbook: OrderBookSnapshot) -> List[str]: + """Check order book price validity""" + issues = [] + + # Check bid prices (should be descending) + for i, bid in enumerate(orderbook.bids): + if not validate_price(bid.price): + issues.append(f"Invalid bid price at level {i}: {bid.price}") + + if i > 0 and bid.price >= orderbook.bids[i-1].price: + issues.append(f"Bid prices not descending at level {i}") + + # Check ask prices (should be ascending) + for i, ask in enumerate(orderbook.asks): + if not validate_price(ask.price): + issues.append(f"Invalid ask price at level {i}: {ask.price}") + + if i > 0 and ask.price <= orderbook.asks[i-1].price: + issues.append(f"Ask prices not ascending at level {i}") + + # Check bid-ask ordering + if orderbook.bids and orderbook.asks: + if orderbook.bids[0].price >= orderbook.asks[0].price: + issues.append("Best bid >= best ask (crossed book)") + + return issues + + def _check_orderbook_volumes(self, orderbook: OrderBookSnapshot) -> List[str]: + """Check order book volume validity""" + issues = [] + + # Check bid volumes + for i, bid in enumerate(orderbook.bids): + if not validate_volume(bid.size): + issues.append(f"Invalid bid volume at level {i}: {bid.size}") + + if bid.size < self.min_volume_threshold: + issues.append(f"Bid volume below threshold at level {i}: {bid.size}") + + # Check ask volumes + for i, ask in enumerate(orderbook.asks): + if not validate_volume(ask.size): + issues.append(f"Invalid ask volume at level {i}: {ask.size}") + + if ask.size < self.min_volume_threshold: + issues.append(f"Ask volume below threshold at level {i}: {ask.size}") + + return issues + + def _check_orderbook_spread(self, orderbook: OrderBookSnapshot) -> List[str]: + """Check order book spread validity""" + issues = [] + + if orderbook.mid_price and orderbook.spread: + spread_percentage = (orderbook.spread / orderbook.mid_price) * 100 + + if spread_percentage > self.max_spread_percentage: + issues.append(f"Spread too wide: {spread_percentage:.2f}%") + + if spread_percentage < 0: + issues.append(f"Negative spread: {spread_percentage:.2f}%") + + return issues + + def _check_timestamp(self, timestamp: datetime) -> List[str]: + """Check timestamp validity""" + issues = [] + + if not timestamp: + issues.append("Missing timestamp") + return issues + + # Check if timestamp is timezone-aware + if timestamp.tzinfo is None: + issues.append("Timestamp missing timezone info") + + # Check timestamp drift + current_time = get_current_timestamp() + time_diff = abs((timestamp - current_time).total_seconds()) + + if time_diff > self.max_timestamp_drift: + issues.append(f"Timestamp drift too large: {time_diff:.1f}s") + + return issues + + def _check_price_history(self, orderbook: OrderBookSnapshot) -> List[str]: + """Check price consistency with history""" + issues = [] + + key = f"{orderbook.symbol}_{orderbook.exchange}" + + if key in self.price_history and orderbook.mid_price: + last_price = self.price_history[key] + price_change = abs(orderbook.mid_price - last_price) / last_price * 100 + + if price_change > self.max_price_change_percentage: + issues.append(f"Large price change: {price_change:.2f}%") + + return issues + + def _update_price_history(self, orderbook: OrderBookSnapshot) -> None: + """Update price history for future validation""" + if orderbook.mid_price: + key = f"{orderbook.symbol}_{orderbook.exchange}" + self.price_history[key] = orderbook.mid_price + + def get_quality_summary(self) -> Dict[str, Union[int, float]]: + """Get
+        return {
+            'symbols_tracked': len(self.price_history),
+            'max_spread_percentage': self.max_spread_percentage,
+            'max_price_change_percentage': self.max_price_change_percentage,
+            'min_volume_threshold': self.min_volume_threshold,
+            'max_timestamp_drift': self.max_timestamp_drift
+        }
\ No newline at end of file
diff --git a/COBY/tests/test_binance_connector.py b/COBY/tests/test_binance_connector.py
new file mode 100644
index 0000000..44f4dcf
--- /dev/null
+++ b/COBY/tests/test_binance_connector.py
@@ -0,0 +1,341 @@
+"""
+Tests for Binance exchange connector.
+"""
+
+import pytest
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+from datetime import datetime, timezone
+
+from ..connectors.binance_connector import BinanceConnector
+from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
+
+
+@pytest.fixture
+def binance_connector():
+    """Create Binance connector for testing"""
+    return BinanceConnector()
+
+
+@pytest.fixture
+def sample_binance_orderbook_data():
+    """Sample Binance order book data"""
+    return {
+        "lastUpdateId": 1027024,
+        "bids": [
+            ["4.00000000", "431.00000000"],
+            ["3.99000000", "9.00000000"]
+        ],
+        "asks": [
+            ["4.00000200", "12.00000000"],
+            ["4.01000000", "18.00000000"]
+        ]
+    }
+
+
+@pytest.fixture
+def sample_binance_depth_update():
+    """Sample Binance depth update message"""
+    return {
+        "e": "depthUpdate",
+        "E": 1672515782136,
+        "s": "BTCUSDT",
+        "U": 157,
+        "u": 160,
+        "b": [
+            ["50000.00", "0.25"],
+            ["49999.00", "0.50"]
+        ],
+        "a": [
+            ["50001.00", "0.30"],
+            ["50002.00", "0.40"]
+        ]
+    }
+
+
+@pytest.fixture
+def sample_binance_trade_update():
+    """Sample Binance trade update message"""
+    return {
+        "e": "trade",
+        "E": 1672515782136,
+        "s": "BTCUSDT",
+        "t": 12345,
+        "p": "50000.50",
+        "q": "0.10",
+        "b": 88,
+        "a": 50,
+        "T": 1672515782134,
+        "m": False,
+        "M": True
+    }
+
+
+class TestBinanceConnector:
+    """Test cases for BinanceConnector"""
+
+    def test_initialization(self, binance_connector):
+        """Test connector initialization"""
+        assert binance_connector.exchange_name == "binance"
+        assert binance_connector.websocket_url == BinanceConnector.WEBSOCKET_URL
+        assert len(binance_connector.message_handlers) >= 3
+        assert binance_connector.stream_id == 1
+        assert binance_connector.active_streams == []
+
+    def test_normalize_symbol(self, binance_connector):
+        """Test symbol normalization"""
+        # Test standard format
+        assert binance_connector.normalize_symbol("BTCUSDT") == "BTCUSDT"
+
+        # Test with separators
+        assert binance_connector.normalize_symbol("BTC-USDT") == "BTCUSDT"
+        assert binance_connector.normalize_symbol("BTC/USDT") == "BTCUSDT"
+
+        # Test lowercase
+        assert binance_connector.normalize_symbol("btcusdt") == "BTCUSDT"
+
+        # Test invalid symbol
+        with pytest.raises(Exception):
+            binance_connector.normalize_symbol("")
+
+    def test_get_message_type(self, binance_connector):
+        """Test message type detection"""
+        # Test depth update
+        depth_msg = {"e": "depthUpdate", "s": "BTCUSDT"}
+        assert binance_connector._get_message_type(depth_msg) == "depthUpdate"
+
+        # Test trade update
+        trade_msg = {"e": "trade", "s": "BTCUSDT"}
+        assert binance_connector._get_message_type(trade_msg) == "trade"
+
+        # Test error message
+        error_msg = {"error": {"code": -1121, "msg": "Invalid symbol"}}
+        assert binance_connector._get_message_type(error_msg) == "error"
+
+        # Test unknown message
+        unknown_msg = {"data": "something"}
+        assert binance_connector._get_message_type(unknown_msg) == "unknown"
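The routing behaviour pinned down by `test_get_message_type` is a plain dict dispatch on Binance's event tag. The sketch below is an illustrative standalone version for the reader, not code from this patch; the `dispatch` helper and handler table are hypothetical:

```python
from typing import Any, Callable, Dict

def get_message_type(data: Dict[str, Any]) -> str:
    # Binance tags stream payloads with an 'e' event field
    if 'e' in data:
        return data['e']
    # Error payloads carry an 'error' object
    if 'error' in data:
        return 'error'
    # SUBSCRIBE/UNSUBSCRIBE acks echo the request id alongside 'result'
    if 'result' in data and 'id' in data:
        return 'subscription_response'
    return 'unknown'

def dispatch(data: Dict[str, Any],
             handlers: Dict[str, Callable[[Dict[str, Any]], None]]) -> None:
    # Unknown message types fall through to a no-op
    handlers.get(get_message_type(data), lambda _: None)(data)

# Example: route a depth update to its handler
dispatch({"e": "depthUpdate", "s": "BTCUSDT"},
         {"depthUpdate": lambda d: print(f"depth update for {d['s']}")})
```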
+    def test_parse_orderbook_snapshot(self, binance_connector, sample_binance_orderbook_data):
+        """Test order book snapshot parsing"""
+        orderbook = binance_connector._parse_orderbook_snapshot(
+            sample_binance_orderbook_data,
+            "BTCUSDT"
+        )
+
+        assert isinstance(orderbook, OrderBookSnapshot)
+        assert orderbook.symbol == "BTCUSDT"
+        assert orderbook.exchange == "binance"
+        assert len(orderbook.bids) == 2
+        assert len(orderbook.asks) == 2
+        assert orderbook.sequence_id == 1027024
+
+        # Check bid data
+        assert orderbook.bids[0].price == 4.0
+        assert orderbook.bids[0].size == 431.0
+
+        # Check ask data
+        assert orderbook.asks[0].price == 4.000002
+        assert orderbook.asks[0].size == 12.0
+
+    @pytest.mark.asyncio
+    async def test_handle_orderbook_update(self, binance_connector, sample_binance_depth_update):
+        """Test order book update handling"""
+        # Mock callback
+        callback_called = False
+        received_data = None
+
+        def mock_callback(data):
+            nonlocal callback_called, received_data
+            callback_called = True
+            received_data = data
+
+        binance_connector.add_data_callback(mock_callback)
+
+        # Handle update
+        await binance_connector._handle_orderbook_update(sample_binance_depth_update)
+
+        # Verify callback was called
+        assert callback_called
+        assert isinstance(received_data, OrderBookSnapshot)
+        assert received_data.symbol == "BTCUSDT"
+        assert received_data.exchange == "binance"
+        assert len(received_data.bids) == 2
+        assert len(received_data.asks) == 2
+
+    @pytest.mark.asyncio
+    async def test_handle_trade_update(self, binance_connector, sample_binance_trade_update):
+        """Test trade update handling"""
+        # Mock callback
+        callback_called = False
+        received_data = None
+
+        def mock_callback(data):
+            nonlocal callback_called, received_data
+            callback_called = True
+            received_data = data
+
+        binance_connector.add_data_callback(mock_callback)
+
+        # Handle update
+        await binance_connector._handle_trade_update(sample_binance_trade_update)
+
+        # Verify callback was called
+        assert callback_called
+        assert isinstance(received_data, TradeEvent)
+        assert received_data.symbol == "BTCUSDT"
+        assert received_data.exchange == "binance"
+        assert received_data.price == 50000.50
+        assert received_data.size == 0.10
+        assert received_data.side == "buy"  # m=False means buyer is not maker
+        assert received_data.trade_id == "12345"
+
+    @pytest.mark.asyncio
+    async def test_subscribe_orderbook(self, binance_connector):
+        """Test order book subscription"""
+        # Mock WebSocket send
+        binance_connector._send_message = AsyncMock(return_value=True)
+
+        # Subscribe
+        await binance_connector.subscribe_orderbook("BTCUSDT")
+
+        # Verify subscription was sent
+        binance_connector._send_message.assert_called_once()
+        call_args = binance_connector._send_message.call_args[0][0]
+
+        assert call_args["method"] == "SUBSCRIBE"
+        assert "btcusdt@depth@100ms" in call_args["params"]
+        assert call_args["id"] == 1
+
+        # Verify tracking
+        assert "BTCUSDT" in binance_connector.subscriptions
+        assert "orderbook" in binance_connector.subscriptions["BTCUSDT"]
+        assert "btcusdt@depth@100ms" in binance_connector.active_streams
+        assert binance_connector.stream_id == 2
+
+    @pytest.mark.asyncio
+    async def test_subscribe_trades(self, binance_connector):
+        """Test trade subscription"""
+        # Mock WebSocket send
+        binance_connector._send_message = AsyncMock(return_value=True)
+
+        # Subscribe
+        await binance_connector.subscribe_trades("ETHUSDT")
+
+        # Verify subscription was sent
+        binance_connector._send_message.assert_called_once()
+        call_args = binance_connector._send_message.call_args[0][0]
+
+        assert call_args["method"] == "SUBSCRIBE"
+        assert "ethusdt@trade" in call_args["params"]
+        assert call_args["id"] == 1
+
+        # Verify tracking
+        assert "ETHUSDT" in binance_connector.subscriptions
+        assert "trades" in binance_connector.subscriptions["ETHUSDT"]
+        assert "ethusdt@trade" in binance_connector.active_streams
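Both subscription tests assert the same JSON-RPC frame shape against the mocked `_send_message`. Below is a minimal sketch of that frame, assuming the stream-name suffixes the assertions show (`@depth@100ms` for books, `@trade` for trades); `build_subscribe_frame` is a hypothetical helper, not the connector's API:

```python
import json

def build_subscribe_frame(symbol: str, channel: str, request_id: int) -> str:
    # Stream names are the lower-case symbol plus a channel suffix
    suffix = "depth@100ms" if channel == "orderbook" else "trade"
    frame = {
        "method": "SUBSCRIBE",
        "params": [f"{symbol.lower()}@{suffix}"],
        "id": request_id,
    }
    return json.dumps(frame)

print(build_subscribe_frame("BTCUSDT", "orderbook", 1))
# {"method": "SUBSCRIBE", "params": ["btcusdt@depth@100ms"], "id": 1}
```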
+
+    @pytest.mark.asyncio
+    async def test_unsubscribe_orderbook(self, binance_connector):
+        """Test order book unsubscription"""
+        # Setup initial subscription
+        binance_connector.subscriptions["BTCUSDT"] = ["orderbook"]
+        binance_connector.active_streams.append("btcusdt@depth@100ms")
+
+        # Mock WebSocket send
+        binance_connector._send_message = AsyncMock(return_value=True)
+
+        # Unsubscribe
+        await binance_connector.unsubscribe_orderbook("BTCUSDT")
+
+        # Verify unsubscription was sent
+        binance_connector._send_message.assert_called_once()
+        call_args = binance_connector._send_message.call_args[0][0]
+
+        assert call_args["method"] == "UNSUBSCRIBE"
+        assert "btcusdt@depth@100ms" in call_args["params"]
+
+        # Verify tracking removal
+        assert "BTCUSDT" not in binance_connector.subscriptions
+        assert "btcusdt@depth@100ms" not in binance_connector.active_streams
+
+    @pytest.mark.asyncio
+    @patch('aiohttp.ClientSession.get')
+    async def test_get_symbols(self, mock_get, binance_connector):
+        """Test getting available symbols"""
+        # Mock API response
+        mock_response = AsyncMock()
+        mock_response.status = 200
+        mock_response.json = AsyncMock(return_value={
+            "symbols": [
+                {"symbol": "BTCUSDT", "status": "TRADING"},
+                {"symbol": "ETHUSDT", "status": "TRADING"},
+                {"symbol": "ADAUSDT", "status": "BREAK"}  # Should be filtered out
+            ]
+        })
+        mock_get.return_value.__aenter__.return_value = mock_response
+
+        # Get symbols
+        symbols = await binance_connector.get_symbols()
+
+        # Verify results
+        assert len(symbols) == 2
+        assert "BTCUSDT" in symbols
+        assert "ETHUSDT" in symbols
+        assert "ADAUSDT" not in symbols  # Filtered out due to status
+
+    @pytest.mark.asyncio
+    @patch('aiohttp.ClientSession.get')
+    async def test_get_orderbook_snapshot(self, mock_get, binance_connector, sample_binance_orderbook_data):
+        """Test getting order book snapshot"""
+        # Mock API response
+        mock_response = AsyncMock()
+        mock_response.status = 200
+        mock_response.json = AsyncMock(return_value=sample_binance_orderbook_data)
+        mock_get.return_value.__aenter__.return_value = mock_response
+
+        # Get order book snapshot
+        orderbook = await binance_connector.get_orderbook_snapshot("BTCUSDT", depth=20)
+
+        # Verify results
+        assert isinstance(orderbook, OrderBookSnapshot)
+        assert orderbook.symbol == "BTCUSDT"
+        assert orderbook.exchange == "binance"
+        assert len(orderbook.bids) == 2
+        assert len(orderbook.asks) == 2
+
+    def test_get_binance_stats(self, binance_connector):
+        """Test getting Binance-specific statistics"""
+        # Add some test data
+        binance_connector.active_streams = ["btcusdt@depth@100ms", "ethusdt@trade"]
+        binance_connector.stream_id = 5
+
+        stats = binance_connector.get_binance_stats()
+
+        # Verify Binance-specific stats
+        assert stats['active_streams'] == 2
+        assert len(stats['stream_list']) == 2
+        assert stats['next_stream_id'] == 5
+
+        # Verify base stats are included
+        assert 'exchange' in stats
+        assert 'connection_status' in stats
+        assert 'message_count' in stats
+
+
+if __name__ == "__main__":
+    # Run a simple test
+    async def simple_test():
+        connector = BinanceConnector()
+
+        # Test symbol normalization
+        normalized = connector.normalize_symbol("BTC-USDT")
+        print(f"Symbol normalization: BTC-USDT -> {normalized}")
+
+        # Test message type detection
+        msg_type = connector._get_message_type({"e": "depthUpdate"})
+        print(f"Message type detection: {msg_type}")
+
+        print("Simple Binance connector test completed")
+
+    asyncio.run(simple_test())
\ No newline at end of file
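The two REST tests above rely on a mocking idiom that is easy to misread: `aiohttp.ClientSession.get` returns an async context manager, so the fake response has to be wired to `__aenter__`, not to `get` itself. A self-contained sketch of the same idiom, with `fetch_status` standing in (hypothetically) for the connector's REST helper:

```python
import asyncio
from unittest.mock import AsyncMock, patch

import aiohttp

async def fetch_status(url: str) -> int:
    # Hypothetical stand-in for the connector's REST call
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status

async def main() -> None:
    with patch('aiohttp.ClientSession.get') as mock_get:
        mock_response = AsyncMock()
        mock_response.status = 200
        # session.get(...) yields the response via __aenter__, so the
        # fake response is attached there rather than to get() directly
        mock_get.return_value.__aenter__.return_value = mock_response
        assert await fetch_status("https://api.binance.com/api/v3/ping") == 200

asyncio.run(main())
```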
diff --git a/COBY/tests/test_data_processor.py b/COBY/tests/test_data_processor.py
new file mode 100644
index 0000000..47478b5
--- /dev/null
+++ b/COBY/tests/test_data_processor.py
@@ -0,0 +1,304 @@
+"""
+Tests for data processing components.
+"""
+
+import pytest
+from datetime import datetime, timezone
+from ..processing.data_processor import StandardDataProcessor
+from ..processing.quality_checker import DataQualityChecker
+from ..processing.anomaly_detector import AnomalyDetector
+from ..processing.metrics_calculator import MetricsCalculator
+from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel
+
+
+@pytest.fixture
+def data_processor():
+    """Create data processor for testing"""
+    return StandardDataProcessor()
+
+
+@pytest.fixture
+def quality_checker():
+    """Create quality checker for testing"""
+    return DataQualityChecker()
+
+
+@pytest.fixture
+def anomaly_detector():
+    """Create anomaly detector for testing"""
+    return AnomalyDetector()
+
+
+@pytest.fixture
+def metrics_calculator():
+    """Create metrics calculator for testing"""
+    return MetricsCalculator()
+
+
+@pytest.fixture
+def sample_orderbook():
+    """Create sample order book for testing"""
+    return OrderBookSnapshot(
+        symbol="BTCUSDT",
+        exchange="binance",
+        timestamp=datetime.now(timezone.utc),
+        bids=[
+            PriceLevel(price=50000.0, size=1.5),
+            PriceLevel(price=49999.0, size=2.0),
+            PriceLevel(price=49998.0, size=1.0)
+        ],
+        asks=[
+            PriceLevel(price=50001.0, size=1.0),
+            PriceLevel(price=50002.0, size=1.5),
+            PriceLevel(price=50003.0, size=2.0)
+        ]
+    )
+
+
+@pytest.fixture
+def sample_trade():
+    """Create sample trade for testing"""
+    return TradeEvent(
+        symbol="BTCUSDT",
+        exchange="binance",
+        timestamp=datetime.now(timezone.utc),
+        price=50000.5,
+        size=0.1,
+        side="buy",
+        trade_id="test_trade_123"
+    )
+
+
+class TestDataQualityChecker:
+    """Test cases for DataQualityChecker"""
+
+    def test_orderbook_quality_check(self, quality_checker, sample_orderbook):
+        """Test order book quality checking"""
+        quality_score, issues = quality_checker.check_orderbook_quality(sample_orderbook)
+
+        assert 0.0 <= quality_score <= 1.0
+        assert isinstance(issues, list)
+
+        # Good order book should have high quality score
+        assert quality_score > 0.8
+
+    def test_trade_quality_check(self, quality_checker, sample_trade):
+        """Test trade quality checking"""
+        quality_score, issues = quality_checker.check_trade_quality(sample_trade)
+
+        assert 0.0 <= quality_score <= 1.0
+        assert isinstance(issues, list)
+
+        # Good trade should have high quality score
+        assert quality_score > 0.8
+
+    def test_invalid_orderbook_detection(self, quality_checker):
+        """Test detection of invalid order book"""
+        # Create invalid order book with crossed spread
+        invalid_orderbook = OrderBookSnapshot(
+            symbol="BTCUSDT",
+            exchange="binance",
+            timestamp=datetime.now(timezone.utc),
+            bids=[PriceLevel(price=50002.0, size=1.0)],  # Bid higher than ask
+            asks=[PriceLevel(price=50001.0, size=1.0)]   # Ask lower than bid
+        )
+
+        quality_score, issues = quality_checker.check_orderbook_quality(invalid_orderbook)
+
+        assert quality_score < 0.8
+        assert any("crossed book" in issue.lower() for issue in issues)
+
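The crossed-book test implies a scoring model in which each detected issue deducts from a perfect 1.0. A minimal sketch under that assumption; the fixed `penalty` is illustrative, not the checker's actual weighting:

```python
from typing import List

def check_crossed_book(best_bid: float, best_ask: float) -> List[str]:
    # Mirrors the crossed-book rule the test expects to fire
    issues = []
    if best_bid >= best_ask:
        issues.append("Best bid >= best ask (crossed book)")
    return issues

def quality_score(issues: List[str], penalty: float = 0.25) -> float:
    # Start from a perfect score and deduct a fixed penalty per issue
    return max(0.0, 1.0 - penalty * len(issues))

issues = check_crossed_book(50002.0, 50001.0)  # bid above ask
print(quality_score(issues))                   # 0.75 -> fails a > 0.8 check
```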
+class TestAnomalyDetector:
+    """Test cases for AnomalyDetector"""
+
+    def test_orderbook_anomaly_detection(self, anomaly_detector, sample_orderbook):
+        """Test order book anomaly detection"""
+        # First few order books should not trigger anomalies
+        for _ in range(5):
+            anomalies = anomaly_detector.detect_orderbook_anomalies(sample_orderbook)
+            assert isinstance(anomalies, list)
+
+    def test_trade_anomaly_detection(self, anomaly_detector, sample_trade):
+        """Test trade anomaly detection"""
+        # First few trades should not trigger anomalies
+        for _ in range(5):
+            anomalies = anomaly_detector.detect_trade_anomalies(sample_trade)
+            assert isinstance(anomalies, list)
+
+    def test_price_spike_detection(self, anomaly_detector):
+        """Test price spike detection"""
+        # Create normal order books
+        for i in range(20):
+            normal_orderbook = OrderBookSnapshot(
+                symbol="BTCUSDT",
+                exchange="binance",
+                timestamp=datetime.now(timezone.utc),
+                bids=[PriceLevel(price=50000.0 + i, size=1.0)],
+                asks=[PriceLevel(price=50001.0 + i, size=1.0)]
+            )
+            anomaly_detector.detect_orderbook_anomalies(normal_orderbook)
+
+        # Create order book with price spike
+        spike_orderbook = OrderBookSnapshot(
+            symbol="BTCUSDT",
+            exchange="binance",
+            timestamp=datetime.now(timezone.utc),
+            bids=[PriceLevel(price=60000.0, size=1.0)],  # 20% spike
+            asks=[PriceLevel(price=60001.0, size=1.0)]
+        )
+
+        anomalies = anomaly_detector.detect_orderbook_anomalies(spike_orderbook)
+        assert len(anomalies) > 0
+        assert any("spike" in anomaly.lower() for anomaly in anomalies)
+
+
+class TestMetricsCalculator:
+    """Test cases for MetricsCalculator"""
+
+    def test_orderbook_metrics_calculation(self, metrics_calculator, sample_orderbook):
+        """Test order book metrics calculation"""
+        metrics = metrics_calculator.calculate_orderbook_metrics(sample_orderbook)
+
+        assert metrics.symbol == "BTCUSDT"
+        assert metrics.exchange == "binance"
+        assert metrics.mid_price == 50000.5  # (50000 + 50001) / 2
+        assert metrics.spread == 1.0  # 50001 - 50000
+        assert metrics.spread_percentage > 0
+        assert metrics.bid_volume == 4.5  # 1.5 + 2.0 + 1.0
+        assert metrics.ask_volume == 4.5  # 1.0 + 1.5 + 2.0
+        assert metrics.volume_imbalance == 0.0  # Equal volumes
+
+    def test_imbalance_metrics_calculation(self, metrics_calculator, sample_orderbook):
+        """Test imbalance metrics calculation"""
+        imbalance = metrics_calculator.calculate_imbalance_metrics(sample_orderbook)
+
+        assert imbalance.symbol == "BTCUSDT"
+        assert -1.0 <= imbalance.volume_imbalance <= 1.0
+        assert -1.0 <= imbalance.price_imbalance <= 1.0
+        assert -1.0 <= imbalance.depth_imbalance <= 1.0
+        assert -1.0 <= imbalance.momentum_score <= 1.0
+
+    def test_liquidity_score_calculation(self, metrics_calculator, sample_orderbook):
+        """Test liquidity score calculation"""
+        liquidity_score = metrics_calculator.calculate_liquidity_score(sample_orderbook)
+
+        assert 0.0 <= liquidity_score <= 1.0
+        assert liquidity_score > 0.5  # Good order book should have decent liquidity
+
+
+class TestStandardDataProcessor:
+    """Test cases for StandardDataProcessor"""
+
+    def test_data_validation(self, data_processor, sample_orderbook, sample_trade):
+        """Test data validation"""
+        # Valid data should pass validation
+        assert data_processor.validate_data(sample_orderbook) is True
+        assert data_processor.validate_data(sample_trade) is True
+
+    def test_metrics_calculation(self, data_processor, sample_orderbook):
+        """Test metrics calculation through processor"""
+        metrics = data_processor.calculate_metrics(sample_orderbook)
+
+        assert metrics.symbol == "BTCUSDT"
+        assert metrics.mid_price > 0
+        assert metrics.spread > 0
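The metric assertions above fix the arithmetic: mid price is the average of the best bid and ask, spread is their difference, and volume imbalance is plausibly `(bid_vol - ask_vol) / (bid_vol + ask_vol)`, which matches both the equal-volume case returning 0.0 and the [-1, 1] bounds. A sketch under those assumptions:

```python
from typing import Dict, List, Tuple

def book_metrics(bids: List[Tuple[float, float]],
                 asks: List[Tuple[float, float]]) -> Dict[str, float]:
    # Levels are (price, size); bids descending, asks ascending
    best_bid, best_ask = bids[0][0], asks[0][0]
    bid_vol = sum(size for _, size in bids)
    ask_vol = sum(size for _, size in asks)
    return {
        "mid_price": (best_bid + best_ask) / 2,
        "spread": best_ask - best_bid,
        "volume_imbalance": (bid_vol - ask_vol) / (bid_vol + ask_vol),
    }

# Matches the sample_orderbook fixture: mid 50000.5, spread 1.0, imbalance 0.0
print(book_metrics(
    [(50000.0, 1.5), (49999.0, 2.0), (49998.0, 1.0)],
    [(50001.0, 1.0), (50002.0, 1.5), (50003.0, 2.0)],
))
```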
metrics.symbol == "BTCUSDT" + assert metrics.mid_price > 0 + assert metrics.spread > 0 + + def test_anomaly_detection(self, data_processor, sample_orderbook, sample_trade): + """Test anomaly detection through processor""" + orderbook_anomalies = data_processor.detect_anomalies(sample_orderbook) + trade_anomalies = data_processor.detect_anomalies(sample_trade) + + assert isinstance(orderbook_anomalies, list) + assert isinstance(trade_anomalies, list) + + def test_data_filtering(self, data_processor, sample_orderbook, sample_trade): + """Test data filtering""" + # Test symbol filter + criteria = {'symbols': ['BTCUSDT']} + assert data_processor.filter_data(sample_orderbook, criteria) is True + assert data_processor.filter_data(sample_trade, criteria) is True + + criteria = {'symbols': ['ETHUSDT']} + assert data_processor.filter_data(sample_orderbook, criteria) is False + assert data_processor.filter_data(sample_trade, criteria) is False + + # Test price range filter + criteria = {'price_range': (40000, 60000)} + assert data_processor.filter_data(sample_orderbook, criteria) is True + assert data_processor.filter_data(sample_trade, criteria) is True + + criteria = {'price_range': (60000, 70000)} + assert data_processor.filter_data(sample_orderbook, criteria) is False + assert data_processor.filter_data(sample_trade, criteria) is False + + def test_data_enrichment(self, data_processor, sample_orderbook, sample_trade): + """Test data enrichment""" + orderbook_enriched = data_processor.enrich_data(sample_orderbook) + trade_enriched = data_processor.enrich_data(sample_trade) + + # Check enriched data structure + assert 'original_data' in orderbook_enriched + assert 'quality_score' in orderbook_enriched + assert 'anomalies' in orderbook_enriched + assert 'processing_timestamp' in orderbook_enriched + + assert 'original_data' in trade_enriched + assert 'quality_score' in trade_enriched + assert 'anomalies' in trade_enriched + assert 'trade_value' in trade_enriched + + def test_quality_score_calculation(self, data_processor, sample_orderbook, sample_trade): + """Test quality score calculation""" + orderbook_score = data_processor.get_data_quality_score(sample_orderbook) + trade_score = data_processor.get_data_quality_score(sample_trade) + + assert 0.0 <= orderbook_score <= 1.0 + assert 0.0 <= trade_score <= 1.0 + + # Good data should have high quality scores + assert orderbook_score > 0.8 + assert trade_score > 0.8 + + def test_processing_stats(self, data_processor, sample_orderbook, sample_trade): + """Test processing statistics""" + # Process some data + data_processor.validate_data(sample_orderbook) + data_processor.validate_data(sample_trade) + + stats = data_processor.get_processing_stats() + + assert 'processed_orderbooks' in stats + assert 'processed_trades' in stats + assert 'quality_failures' in stats + assert 'anomalies_detected' in stats + assert stats['processed_orderbooks'] >= 1 + assert stats['processed_trades'] >= 1 + + +if __name__ == "__main__": + # Run simple tests + processor = StandardDataProcessor() + + # Test with sample data + orderbook = OrderBookSnapshot( + symbol="BTCUSDT", + exchange="test", + timestamp=datetime.now(timezone.utc), + bids=[PriceLevel(price=50000.0, size=1.0)], + asks=[PriceLevel(price=50001.0, size=1.0)] + ) + + # Test validation + is_valid = processor.validate_data(orderbook) + print(f"Order book validation: {'PASSED' if is_valid else 'FAILED'}") + + # Test metrics + metrics = processor.calculate_metrics(orderbook) + print(f"Metrics calculation: 
+
+    # Test quality score
+    quality_score = processor.get_data_quality_score(orderbook)
+    print(f"Quality score: {quality_score:.2f}")
+
+    print("Simple data processor test completed")
\ No newline at end of file
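For `test_price_spike_detection` to pass, the detector presumably keeps a rolling window of recent mid prices and flags large deviations. Below is a rolling z-score sketch consistent with the test's 20-sample warm-up and 20% jump; the window size, warm-up length, and threshold are assumptions, not the AnomalyDetector's actual parameters:

```python
from collections import deque
from statistics import mean, stdev
from typing import Deque, List

def detect_price_spike(history: Deque[float], price: float,
                       z_threshold: float = 3.0) -> List[str]:
    # Flag the price if it sits more than z_threshold standard
    # deviations from the rolling mean, then record it
    anomalies = []
    if len(history) >= 10:
        mu, sigma = mean(history), stdev(history)
        if sigma > 0 and abs(price - mu) / sigma > z_threshold:
            anomalies.append(f"Price spike detected: {price} vs mean {mu:.2f}")
    history.append(price)
    return anomalies

history: Deque[float] = deque(maxlen=100)
for i in range(20):                          # warm-up, as in the test
    detect_price_spike(history, 50000.5 + i)
print(detect_price_spike(history, 60000.5))  # ~20% jump -> flagged
```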