From 8ee9b7a90c29651cec8a6a7485bded5d4f9a4f3f Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 4 Aug 2025 17:40:30 +0300 Subject: [PATCH] wip --- .../multi-exchange-data-aggregation/tasks.md | 3 + COBY/aggregation/__init__.py | 15 + COBY/aggregation/aggregation_engine.py | 338 +++++++++++++++ COBY/aggregation/cross_exchange_aggregator.py | 390 ++++++++++++++++++ COBY/aggregation/heatmap_generator.py | 376 +++++++++++++++++ COBY/aggregation/price_bucketer.py | 341 +++++++++++++++ COBY/config.py | 19 +- 7 files changed, 1468 insertions(+), 14 deletions(-) create mode 100644 COBY/aggregation/__init__.py create mode 100644 COBY/aggregation/aggregation_engine.py create mode 100644 COBY/aggregation/cross_exchange_aggregator.py create mode 100644 COBY/aggregation/heatmap_generator.py create mode 100644 COBY/aggregation/price_bucketer.py diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index d9ee10c..385a3cd 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -37,6 +37,9 @@ - [ ] 4. Implement Binance exchange connector - Create Binance-specific WebSocket connector extending the base framework + + + - Implement order book depth stream subscription and processing - Add trade stream subscription for volume analysis - Implement data normalization from Binance format to standard format diff --git a/COBY/aggregation/__init__.py b/COBY/aggregation/__init__.py new file mode 100644 index 0000000..285d184 --- /dev/null +++ b/COBY/aggregation/__init__.py @@ -0,0 +1,15 @@ +""" +Data aggregation components for the COBY system. +""" + +from .aggregation_engine import StandardAggregationEngine +from .price_bucketer import PriceBucketer +from .heatmap_generator import HeatmapGenerator +from .cross_exchange_aggregator import CrossExchangeAggregator + +__all__ = [ + 'StandardAggregationEngine', + 'PriceBucketer', + 'HeatmapGenerator', + 'CrossExchangeAggregator' +] \ No newline at end of file diff --git a/COBY/aggregation/aggregation_engine.py b/COBY/aggregation/aggregation_engine.py new file mode 100644 index 0000000..b3e87c4 --- /dev/null +++ b/COBY/aggregation/aggregation_engine.py @@ -0,0 +1,338 @@ +""" +Main aggregation engine implementation. +""" + +from typing import Dict, List +from ..interfaces.aggregation_engine import AggregationEngine +from ..models.core import ( + OrderBookSnapshot, PriceBuckets, HeatmapData, + ImbalanceMetrics, ConsolidatedOrderBook +) +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import AggregationError +from .price_bucketer import PriceBucketer +from .heatmap_generator import HeatmapGenerator +from .cross_exchange_aggregator import CrossExchangeAggregator +from ..processing.metrics_calculator import MetricsCalculator + +logger = get_logger(__name__) + + +class StandardAggregationEngine(AggregationEngine): + """ + Standard implementation of aggregation engine interface. 
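+
+    Composes PriceBucketer, HeatmapGenerator, CrossExchangeAggregator and
+    MetricsCalculator behind the AggregationEngine interface.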
+ + Provides: + - Price bucket creation with $1 USD buckets + - Heatmap generation + - Cross-exchange aggregation + - Imbalance calculations + - Support/resistance detection + """ + + def __init__(self): + """Initialize aggregation engine with components""" + self.price_bucketer = PriceBucketer() + self.heatmap_generator = HeatmapGenerator() + self.cross_exchange_aggregator = CrossExchangeAggregator() + self.metrics_calculator = MetricsCalculator() + + # Processing statistics + self.buckets_created = 0 + self.heatmaps_generated = 0 + self.consolidations_performed = 0 + + logger.info("Standard aggregation engine initialized") + + def create_price_buckets(self, orderbook: OrderBookSnapshot, + bucket_size: float = None) -> PriceBuckets: + """ + Convert order book data to price buckets. + + Args: + orderbook: Order book snapshot + bucket_size: Size of each price bucket (uses $1 default) + + Returns: + PriceBuckets: Aggregated price bucket data + """ + try: + set_correlation_id() + + # Use provided bucket size or default $1 + if bucket_size: + bucketer = PriceBucketer(bucket_size) + else: + bucketer = self.price_bucketer + + buckets = bucketer.create_price_buckets(orderbook) + self.buckets_created += 1 + + logger.debug(f"Created price buckets for {orderbook.symbol}@{orderbook.exchange}") + return buckets + + except Exception as e: + logger.error(f"Error creating price buckets: {e}") + raise AggregationError(f"Price bucket creation failed: {e}", "BUCKET_ERROR") + + def update_heatmap(self, symbol: str, buckets: PriceBuckets) -> HeatmapData: + """ + Update heatmap data with new price buckets. + + Args: + symbol: Trading symbol + buckets: Price bucket data + + Returns: + HeatmapData: Updated heatmap visualization data + """ + try: + set_correlation_id() + + heatmap = self.heatmap_generator.generate_heatmap(buckets) + self.heatmaps_generated += 1 + + logger.debug(f"Generated heatmap for {symbol}: {len(heatmap.data)} points") + return heatmap + + except Exception as e: + logger.error(f"Error updating heatmap: {e}") + raise AggregationError(f"Heatmap update failed: {e}", "HEATMAP_ERROR") + + def calculate_imbalances(self, orderbook: OrderBookSnapshot) -> ImbalanceMetrics: + """ + Calculate order book imbalance metrics. + + Args: + orderbook: Order book snapshot + + Returns: + ImbalanceMetrics: Calculated imbalance metrics + """ + try: + set_correlation_id() + + return self.metrics_calculator.calculate_imbalance_metrics(orderbook) + + except Exception as e: + logger.error(f"Error calculating imbalances: {e}") + raise AggregationError(f"Imbalance calculation failed: {e}", "IMBALANCE_ERROR") + + def aggregate_across_exchanges(self, symbol: str, + orderbooks: List[OrderBookSnapshot]) -> ConsolidatedOrderBook: + """ + Aggregate order book data from multiple exchanges. 
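+
+        Delegates to CrossExchangeAggregator: each exchange's volume is
+        scaled by its configured weight before price levels are merged
+        into common $1 buckets.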
+ + Args: + symbol: Trading symbol + orderbooks: List of order book snapshots from different exchanges + + Returns: + ConsolidatedOrderBook: Consolidated order book data + """ + try: + set_correlation_id() + + consolidated = self.cross_exchange_aggregator.aggregate_across_exchanges( + symbol, orderbooks + ) + self.consolidations_performed += 1 + + logger.debug(f"Consolidated {len(orderbooks)} order books for {symbol}") + return consolidated + + except Exception as e: + logger.error(f"Error aggregating across exchanges: {e}") + raise AggregationError(f"Cross-exchange aggregation failed: {e}", "CONSOLIDATION_ERROR") + + def calculate_volume_weighted_price(self, orderbooks: List[OrderBookSnapshot]) -> float: + """ + Calculate volume-weighted average price across exchanges. + + Args: + orderbooks: List of order book snapshots + + Returns: + float: Volume-weighted average price + """ + try: + set_correlation_id() + + return self.cross_exchange_aggregator._calculate_weighted_mid_price(orderbooks) + + except Exception as e: + logger.error(f"Error calculating volume weighted price: {e}") + raise AggregationError(f"VWAP calculation failed: {e}", "VWAP_ERROR") + + def get_market_depth(self, orderbook: OrderBookSnapshot, + depth_levels: List[float]) -> Dict[float, Dict[str, float]]: + """ + Calculate market depth at different price levels. + + Args: + orderbook: Order book snapshot + depth_levels: List of depth percentages (e.g., [0.1, 0.5, 1.0]) + + Returns: + Dict: Market depth data {level: {'bid_volume': x, 'ask_volume': y}} + """ + try: + set_correlation_id() + + depth_data = {} + + if not orderbook.mid_price: + return depth_data + + for level_pct in depth_levels: + # Calculate price range for this depth level + price_range = orderbook.mid_price * (level_pct / 100.0) + min_bid_price = orderbook.mid_price - price_range + max_ask_price = orderbook.mid_price + price_range + + # Calculate volumes within this range + bid_volume = sum( + bid.size for bid in orderbook.bids + if bid.price >= min_bid_price + ) + + ask_volume = sum( + ask.size for ask in orderbook.asks + if ask.price <= max_ask_price + ) + + depth_data[level_pct] = { + 'bid_volume': bid_volume, + 'ask_volume': ask_volume, + 'total_volume': bid_volume + ask_volume + } + + logger.debug(f"Calculated market depth for {len(depth_levels)} levels") + return depth_data + + except Exception as e: + logger.error(f"Error calculating market depth: {e}") + return {} + + def smooth_heatmap(self, heatmap: HeatmapData, smoothing_factor: float) -> HeatmapData: + """ + Apply smoothing to heatmap data to reduce noise. + + Args: + heatmap: Raw heatmap data + smoothing_factor: Smoothing factor (0.0 to 1.0) + + Returns: + HeatmapData: Smoothed heatmap data + """ + try: + set_correlation_id() + + return self.heatmap_generator.apply_smoothing(heatmap, smoothing_factor) + + except Exception as e: + logger.error(f"Error smoothing heatmap: {e}") + return heatmap # Return original on error + + def calculate_liquidity_score(self, orderbook: OrderBookSnapshot) -> float: + """ + Calculate liquidity score for an order book. + + Args: + orderbook: Order book snapshot + + Returns: + float: Liquidity score (0.0 to 1.0) + """ + try: + set_correlation_id() + + return self.metrics_calculator.calculate_liquidity_score(orderbook) + + except Exception as e: + logger.error(f"Error calculating liquidity score: {e}") + return 0.0 + + def detect_support_resistance(self, heatmap: HeatmapData) -> Dict[str, List[float]]: + """ + Detect support and resistance levels from heatmap data. 
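+
+        Support levels come from high-intensity bid buckets and resistance
+        levels from high-intensity ask buckets, as scored by the
+        HeatmapGenerator (intensity >= 0.7 by default).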
+ + Args: + heatmap: Heatmap data + + Returns: + Dict: {'support': [prices], 'resistance': [prices]} + """ + try: + set_correlation_id() + + return self.heatmap_generator.calculate_support_resistance(heatmap) + + except Exception as e: + logger.error(f"Error detecting support/resistance: {e}") + return {'support': [], 'resistance': []} + + def create_consolidated_heatmap(self, symbol: str, + orderbooks: List[OrderBookSnapshot]) -> HeatmapData: + """ + Create consolidated heatmap from multiple exchanges. + + Args: + symbol: Trading symbol + orderbooks: List of order book snapshots + + Returns: + HeatmapData: Consolidated heatmap data + """ + try: + set_correlation_id() + + return self.cross_exchange_aggregator.create_consolidated_heatmap( + symbol, orderbooks + ) + + except Exception as e: + logger.error(f"Error creating consolidated heatmap: {e}") + raise AggregationError(f"Consolidated heatmap creation failed: {e}", "CONSOLIDATED_HEATMAP_ERROR") + + def detect_arbitrage_opportunities(self, orderbooks: List[OrderBookSnapshot]) -> List[Dict]: + """ + Detect arbitrage opportunities between exchanges. + + Args: + orderbooks: List of order book snapshots + + Returns: + List[Dict]: Arbitrage opportunities + """ + try: + set_correlation_id() + + return self.cross_exchange_aggregator.detect_arbitrage_opportunities(orderbooks) + + except Exception as e: + logger.error(f"Error detecting arbitrage opportunities: {e}") + return [] + + def get_processing_stats(self) -> Dict[str, any]: + """Get processing statistics""" + return { + 'buckets_created': self.buckets_created, + 'heatmaps_generated': self.heatmaps_generated, + 'consolidations_performed': self.consolidations_performed, + 'price_bucketer_stats': self.price_bucketer.get_processing_stats(), + 'heatmap_generator_stats': self.heatmap_generator.get_processing_stats(), + 'cross_exchange_stats': self.cross_exchange_aggregator.get_processing_stats() + } + + def reset_stats(self) -> None: + """Reset processing statistics""" + self.buckets_created = 0 + self.heatmaps_generated = 0 + self.consolidations_performed = 0 + + self.price_bucketer.reset_stats() + self.heatmap_generator.reset_stats() + self.cross_exchange_aggregator.reset_stats() + + logger.info("Aggregation engine statistics reset") \ No newline at end of file diff --git a/COBY/aggregation/cross_exchange_aggregator.py b/COBY/aggregation/cross_exchange_aggregator.py new file mode 100644 index 0000000..a77132d --- /dev/null +++ b/COBY/aggregation/cross_exchange_aggregator.py @@ -0,0 +1,390 @@ +""" +Cross-exchange data aggregation and consolidation. +""" + +from typing import List, Dict, Optional +from collections import defaultdict +from datetime import datetime +from ..models.core import ( + OrderBookSnapshot, ConsolidatedOrderBook, PriceLevel, + PriceBuckets, HeatmapData, HeatmapPoint +) +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp +from .price_bucketer import PriceBucketer +from .heatmap_generator import HeatmapGenerator + +logger = get_logger(__name__) + + +class CrossExchangeAggregator: + """ + Aggregates data across multiple exchanges. + + Provides consolidated order books and cross-exchange heatmaps. 
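+
+    Volumes are scaled by per-exchange weights (1.0 for Binance down to 0.4
+    for MEXC; unknown exchanges default to 0.5), adjustable at runtime via
+    update_exchange_weights().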
+ """ + + def __init__(self): + """Initialize cross-exchange aggregator""" + self.price_bucketer = PriceBucketer() + self.heatmap_generator = HeatmapGenerator() + + # Exchange weights for aggregation + self.exchange_weights = { + 'binance': 1.0, + 'coinbase': 0.9, + 'kraken': 0.8, + 'bybit': 0.7, + 'okx': 0.7, + 'huobi': 0.6, + 'kucoin': 0.6, + 'gateio': 0.5, + 'bitfinex': 0.5, + 'mexc': 0.4 + } + + # Statistics + self.consolidations_performed = 0 + self.exchanges_processed = set() + + logger.info("Cross-exchange aggregator initialized") + + def aggregate_across_exchanges(self, symbol: str, + orderbooks: List[OrderBookSnapshot]) -> ConsolidatedOrderBook: + """ + Aggregate order book data from multiple exchanges. + + Args: + symbol: Trading symbol + orderbooks: List of order book snapshots from different exchanges + + Returns: + ConsolidatedOrderBook: Consolidated order book data + """ + if not orderbooks: + raise ValueError("Cannot aggregate empty orderbook list") + + try: + # Track exchanges + exchanges = [ob.exchange for ob in orderbooks] + self.exchanges_processed.update(exchanges) + + # Calculate weighted mid price + weighted_mid_price = self._calculate_weighted_mid_price(orderbooks) + + # Consolidate bids and asks + consolidated_bids = self._consolidate_price_levels( + [ob.bids for ob in orderbooks], + [ob.exchange for ob in orderbooks], + 'bid' + ) + + consolidated_asks = self._consolidate_price_levels( + [ob.asks for ob in orderbooks], + [ob.exchange for ob in orderbooks], + 'ask' + ) + + # Calculate total volumes + total_bid_volume = sum(level.size for level in consolidated_bids) + total_ask_volume = sum(level.size for level in consolidated_asks) + + # Create consolidated order book + consolidated = ConsolidatedOrderBook( + symbol=symbol, + timestamp=get_current_timestamp(), + exchanges=exchanges, + bids=consolidated_bids, + asks=consolidated_asks, + weighted_mid_price=weighted_mid_price, + total_bid_volume=total_bid_volume, + total_ask_volume=total_ask_volume, + exchange_weights={ex: self.exchange_weights.get(ex, 0.5) for ex in exchanges} + ) + + self.consolidations_performed += 1 + + logger.debug( + f"Consolidated {len(orderbooks)} order books for {symbol}: " + f"{len(consolidated_bids)} bids, {len(consolidated_asks)} asks" + ) + + return consolidated + + except Exception as e: + logger.error(f"Error aggregating across exchanges: {e}") + raise + + def create_consolidated_heatmap(self, symbol: str, + orderbooks: List[OrderBookSnapshot]) -> HeatmapData: + """ + Create consolidated heatmap from multiple exchanges. 
+ + Args: + symbol: Trading symbol + orderbooks: List of order book snapshots + + Returns: + HeatmapData: Consolidated heatmap data + """ + try: + # Create price buckets for each exchange + all_buckets = [] + for orderbook in orderbooks: + buckets = self.price_bucketer.create_price_buckets(orderbook) + all_buckets.append(buckets) + + # Aggregate all buckets + if len(all_buckets) == 1: + consolidated_buckets = all_buckets[0] + else: + consolidated_buckets = self.price_bucketer.aggregate_buckets(all_buckets) + + # Generate heatmap from consolidated buckets + heatmap = self.heatmap_generator.generate_heatmap(consolidated_buckets) + + # Add exchange metadata to heatmap points + self._add_exchange_metadata(heatmap, orderbooks) + + logger.debug(f"Created consolidated heatmap for {symbol} from {len(orderbooks)} exchanges") + return heatmap + + except Exception as e: + logger.error(f"Error creating consolidated heatmap: {e}") + raise + + def _calculate_weighted_mid_price(self, orderbooks: List[OrderBookSnapshot]) -> float: + """Calculate volume-weighted mid price across exchanges""" + total_weight = 0.0 + weighted_sum = 0.0 + + for orderbook in orderbooks: + if orderbook.mid_price: + # Use total volume as weight + volume_weight = orderbook.bid_volume + orderbook.ask_volume + exchange_weight = self.exchange_weights.get(orderbook.exchange, 0.5) + + # Combined weight + weight = volume_weight * exchange_weight + + weighted_sum += orderbook.mid_price * weight + total_weight += weight + + return weighted_sum / total_weight if total_weight > 0 else 0.0 + + def _consolidate_price_levels(self, level_lists: List[List[PriceLevel]], + exchanges: List[str], side: str) -> List[PriceLevel]: + """Consolidate price levels from multiple exchanges""" + # Group levels by price bucket + price_groups = defaultdict(lambda: {'size': 0.0, 'count': 0, 'exchanges': set()}) + + for levels, exchange in zip(level_lists, exchanges): + exchange_weight = self.exchange_weights.get(exchange, 0.5) + + for level in levels: + # Round price to bucket + bucket_price = self.price_bucketer.get_bucket_price(level.price) + + # Add weighted volume + weighted_size = level.size * exchange_weight + price_groups[bucket_price]['size'] += weighted_size + price_groups[bucket_price]['count'] += level.count or 1 + price_groups[bucket_price]['exchanges'].add(exchange) + + # Create consolidated price levels + consolidated_levels = [] + for price, data in price_groups.items(): + if data['size'] > 0: # Only include non-zero volumes + level = PriceLevel( + price=price, + size=data['size'], + count=data['count'] + ) + consolidated_levels.append(level) + + # Sort levels appropriately + if side == 'bid': + consolidated_levels.sort(key=lambda x: x.price, reverse=True) + else: + consolidated_levels.sort(key=lambda x: x.price) + + return consolidated_levels + + def _add_exchange_metadata(self, heatmap: HeatmapData, + orderbooks: List[OrderBookSnapshot]) -> None: + """Add exchange metadata to heatmap points""" + # Create exchange mapping by price bucket + exchange_map = defaultdict(set) + + for orderbook in orderbooks: + # Map bid prices to exchanges + for bid in orderbook.bids: + bucket_price = self.price_bucketer.get_bucket_price(bid.price) + exchange_map[bucket_price].add(orderbook.exchange) + + # Map ask prices to exchanges + for ask in orderbook.asks: + bucket_price = self.price_bucketer.get_bucket_price(ask.price) + exchange_map[bucket_price].add(orderbook.exchange) + + # Add exchange information to heatmap points + for point in heatmap.data: + 
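+            # Snap the point's price onto the shared bucket grid so it can
+            # be matched against the exchange_map built above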
bucket_price = self.price_bucketer.get_bucket_price(point.price) + # Store exchange info in a custom attribute (would need to extend HeatmapPoint) + # For now, we'll log it + exchanges_at_price = exchange_map.get(bucket_price, set()) + if len(exchanges_at_price) > 1: + logger.debug(f"Price {point.price} has data from {len(exchanges_at_price)} exchanges") + + def calculate_exchange_dominance(self, orderbooks: List[OrderBookSnapshot]) -> Dict[str, float]: + """ + Calculate which exchanges dominate at different price levels. + + Args: + orderbooks: List of order book snapshots + + Returns: + Dict[str, float]: Exchange dominance scores + """ + exchange_volumes = defaultdict(float) + total_volume = 0.0 + + for orderbook in orderbooks: + volume = orderbook.bid_volume + orderbook.ask_volume + exchange_volumes[orderbook.exchange] += volume + total_volume += volume + + # Calculate dominance percentages + dominance = {} + for exchange, volume in exchange_volumes.items(): + dominance[exchange] = (volume / total_volume * 100) if total_volume > 0 else 0.0 + + return dominance + + def detect_arbitrage_opportunities(self, orderbooks: List[OrderBookSnapshot], + min_spread_pct: float = 0.1) -> List[Dict]: + """ + Detect potential arbitrage opportunities between exchanges. + + Args: + orderbooks: List of order book snapshots + min_spread_pct: Minimum spread percentage to consider + + Returns: + List[Dict]: Arbitrage opportunities + """ + opportunities = [] + + if len(orderbooks) < 2: + return opportunities + + try: + # Find best bid and ask across exchanges + best_bids = [] + best_asks = [] + + for orderbook in orderbooks: + if orderbook.bids and orderbook.asks: + best_bids.append({ + 'exchange': orderbook.exchange, + 'price': orderbook.bids[0].price, + 'size': orderbook.bids[0].size + }) + best_asks.append({ + 'exchange': orderbook.exchange, + 'price': orderbook.asks[0].price, + 'size': orderbook.asks[0].size + }) + + # Sort to find best opportunities + best_bids.sort(key=lambda x: x['price'], reverse=True) + best_asks.sort(key=lambda x: x['price']) + + # Check for arbitrage opportunities + for bid in best_bids: + for ask in best_asks: + if bid['exchange'] != ask['exchange'] and bid['price'] > ask['price']: + spread = bid['price'] - ask['price'] + spread_pct = (spread / ask['price']) * 100 + + if spread_pct >= min_spread_pct: + opportunities.append({ + 'buy_exchange': ask['exchange'], + 'sell_exchange': bid['exchange'], + 'buy_price': ask['price'], + 'sell_price': bid['price'], + 'spread': spread, + 'spread_percentage': spread_pct, + 'max_size': min(bid['size'], ask['size']) + }) + + # Sort by spread percentage + opportunities.sort(key=lambda x: x['spread_percentage'], reverse=True) + + if opportunities: + logger.info(f"Found {len(opportunities)} arbitrage opportunities") + + return opportunities + + except Exception as e: + logger.error(f"Error detecting arbitrage opportunities: {e}") + return [] + + def get_exchange_correlation(self, orderbooks: List[OrderBookSnapshot]) -> Dict[str, Dict[str, float]]: + """ + Calculate price correlation between exchanges. 
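+
+        Single snapshots cannot yield a true correlation, so the relative
+        mid-price difference is used as a proxy: mids of 50000 and 50050
+        differ by ~0.1%, which maps to max(0.0, 1.0 - 0.1 / 10.0) = 0.99,
+        and differences of 10% or more score 0.0.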
+ + Args: + orderbooks: List of order book snapshots + + Returns: + Dict: Correlation matrix between exchanges + """ + correlations = {} + + # Extract mid prices by exchange + exchange_prices = {} + for orderbook in orderbooks: + if orderbook.mid_price: + exchange_prices[orderbook.exchange] = orderbook.mid_price + + # Calculate simple correlation (would need historical data for proper correlation) + exchanges = list(exchange_prices.keys()) + for i, exchange1 in enumerate(exchanges): + correlations[exchange1] = {} + for j, exchange2 in enumerate(exchanges): + if i == j: + correlations[exchange1][exchange2] = 1.0 + else: + # Simple price difference as correlation proxy + price1 = exchange_prices[exchange1] + price2 = exchange_prices[exchange2] + diff_pct = abs(price1 - price2) / max(price1, price2) * 100 + # Convert to correlation-like score (lower difference = higher correlation) + correlation = max(0.0, 1.0 - (diff_pct / 10.0)) + correlations[exchange1][exchange2] = correlation + + return correlations + + def get_processing_stats(self) -> Dict[str, int]: + """Get processing statistics""" + return { + 'consolidations_performed': self.consolidations_performed, + 'unique_exchanges_processed': len(self.exchanges_processed), + 'exchanges_processed': list(self.exchanges_processed), + 'bucketer_stats': self.price_bucketer.get_processing_stats(), + 'heatmap_stats': self.heatmap_generator.get_processing_stats() + } + + def update_exchange_weights(self, new_weights: Dict[str, float]) -> None: + """Update exchange weights for aggregation""" + self.exchange_weights.update(new_weights) + logger.info(f"Updated exchange weights: {new_weights}") + + def reset_stats(self) -> None: + """Reset processing statistics""" + self.consolidations_performed = 0 + self.exchanges_processed.clear() + self.price_bucketer.reset_stats() + self.heatmap_generator.reset_stats() + logger.info("Cross-exchange aggregator statistics reset") \ No newline at end of file diff --git a/COBY/aggregation/heatmap_generator.py b/COBY/aggregation/heatmap_generator.py new file mode 100644 index 0000000..3a2bc98 --- /dev/null +++ b/COBY/aggregation/heatmap_generator.py @@ -0,0 +1,376 @@ +""" +Heatmap data generation from price buckets. +""" + +from typing import List, Dict, Optional, Tuple +from ..models.core import PriceBuckets, HeatmapData, HeatmapPoint +from ..config import config +from ..utils.logging import get_logger + +logger = get_logger(__name__) + + +class HeatmapGenerator: + """ + Generates heatmap visualization data from price buckets. + + Creates intensity-based heatmap points for visualization. + """ + + def __init__(self): + """Initialize heatmap generator""" + self.heatmaps_generated = 0 + self.total_points_created = 0 + + logger.info("Heatmap generator initialized") + + def generate_heatmap(self, buckets: PriceBuckets, + max_points: Optional[int] = None) -> HeatmapData: + """ + Generate heatmap data from price buckets. 
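+
+        Intensity is normalised volume: each bucket's volume divided by the
+        largest bucket volume across both sides, so the deepest bucket
+        renders at 1.0 and everything else scales linearly against it.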
+ + Args: + buckets: Price buckets to convert + max_points: Maximum number of points to include (None = all) + + Returns: + HeatmapData: Heatmap visualization data + """ + try: + heatmap = HeatmapData( + symbol=buckets.symbol, + timestamp=buckets.timestamp, + bucket_size=buckets.bucket_size + ) + + # Calculate maximum volume for intensity normalization + all_volumes = list(buckets.bid_buckets.values()) + list(buckets.ask_buckets.values()) + max_volume = max(all_volumes) if all_volumes else 1.0 + + # Generate bid points + bid_points = self._create_heatmap_points( + buckets.bid_buckets, 'bid', max_volume + ) + + # Generate ask points + ask_points = self._create_heatmap_points( + buckets.ask_buckets, 'ask', max_volume + ) + + # Combine all points + all_points = bid_points + ask_points + + # Limit points if requested + if max_points and len(all_points) > max_points: + # Sort by volume and take top points + all_points.sort(key=lambda p: p.volume, reverse=True) + all_points = all_points[:max_points] + + heatmap.data = all_points + + self.heatmaps_generated += 1 + self.total_points_created += len(all_points) + + logger.debug( + f"Generated heatmap for {buckets.symbol}: {len(all_points)} points " + f"(max_volume: {max_volume:.6f})" + ) + + return heatmap + + except Exception as e: + logger.error(f"Error generating heatmap: {e}") + raise + + def _create_heatmap_points(self, bucket_dict: Dict[float, float], + side: str, max_volume: float) -> List[HeatmapPoint]: + """ + Create heatmap points from bucket dictionary. + + Args: + bucket_dict: Dictionary of price -> volume + side: 'bid' or 'ask' + max_volume: Maximum volume for intensity calculation + + Returns: + List[HeatmapPoint]: List of heatmap points + """ + points = [] + + for price, volume in bucket_dict.items(): + if volume > 0: # Only include non-zero volumes + intensity = min(volume / max_volume, 1.0) if max_volume > 0 else 0.0 + + point = HeatmapPoint( + price=price, + volume=volume, + intensity=intensity, + side=side + ) + points.append(point) + + return points + + def apply_smoothing(self, heatmap: HeatmapData, + smoothing_factor: float = 0.3) -> HeatmapData: + """ + Apply smoothing to heatmap data to reduce noise. + + Args: + heatmap: Original heatmap data + smoothing_factor: Smoothing factor (0.0 = no smoothing, 1.0 = maximum) + + Returns: + HeatmapData: Smoothed heatmap data + """ + if smoothing_factor <= 0: + return heatmap + + try: + smoothed = HeatmapData( + symbol=heatmap.symbol, + timestamp=heatmap.timestamp, + bucket_size=heatmap.bucket_size + ) + + # Separate bids and asks + bids = [p for p in heatmap.data if p.side == 'bid'] + asks = [p for p in heatmap.data if p.side == 'ask'] + + # Apply smoothing to each side + smoothed_bids = self._smooth_points(bids, smoothing_factor) + smoothed_asks = self._smooth_points(asks, smoothing_factor) + + smoothed.data = smoothed_bids + smoothed_asks + + logger.debug(f"Applied smoothing with factor {smoothing_factor}") + return smoothed + + except Exception as e: + logger.error(f"Error applying smoothing: {e}") + return heatmap # Return original on error + + def _smooth_points(self, points: List[HeatmapPoint], + smoothing_factor: float) -> List[HeatmapPoint]: + """ + Apply smoothing to a list of heatmap points. 
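+
+        Each point becomes a weighted average with its immediate price
+        neighbours: the point keeps weight 1.0 and each neighbour gets
+        weight `smoothing_factor`, i.e. a small triangular kernel.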
+ + Args: + points: Points to smooth + smoothing_factor: Smoothing factor + + Returns: + List[HeatmapPoint]: Smoothed points + """ + if len(points) < 3: + return points + + # Sort points by price + sorted_points = sorted(points, key=lambda p: p.price) + smoothed_points = [] + + for i, point in enumerate(sorted_points): + # Calculate weighted average with neighbors + total_weight = 1.0 + weighted_volume = point.volume + weighted_intensity = point.intensity + + # Add left neighbor + if i > 0: + left_point = sorted_points[i - 1] + weight = smoothing_factor + total_weight += weight + weighted_volume += left_point.volume * weight + weighted_intensity += left_point.intensity * weight + + # Add right neighbor + if i < len(sorted_points) - 1: + right_point = sorted_points[i + 1] + weight = smoothing_factor + total_weight += weight + weighted_volume += right_point.volume * weight + weighted_intensity += right_point.intensity * weight + + # Create smoothed point + smoothed_point = HeatmapPoint( + price=point.price, + volume=weighted_volume / total_weight, + intensity=min(weighted_intensity / total_weight, 1.0), + side=point.side + ) + smoothed_points.append(smoothed_point) + + return smoothed_points + + def filter_by_intensity(self, heatmap: HeatmapData, + min_intensity: float = 0.1) -> HeatmapData: + """ + Filter heatmap points by minimum intensity. + + Args: + heatmap: Original heatmap data + min_intensity: Minimum intensity threshold + + Returns: + HeatmapData: Filtered heatmap data + """ + filtered = HeatmapData( + symbol=heatmap.symbol, + timestamp=heatmap.timestamp, + bucket_size=heatmap.bucket_size + ) + + # Filter points by intensity + filtered.data = [ + point for point in heatmap.data + if point.intensity >= min_intensity + ] + + logger.debug( + f"Filtered heatmap: {len(heatmap.data)} -> {len(filtered.data)} points " + f"(min_intensity: {min_intensity})" + ) + + return filtered + + def get_price_levels(self, heatmap: HeatmapData, + side: str = None) -> List[float]: + """ + Get sorted list of price levels from heatmap. + + Args: + heatmap: Heatmap data + side: 'bid', 'ask', or None for both + + Returns: + List[float]: Sorted price levels + """ + if side: + points = [p for p in heatmap.data if p.side == side] + else: + points = heatmap.data + + prices = [p.price for p in points] + return sorted(prices) + + def get_volume_profile(self, heatmap: HeatmapData) -> Dict[str, List[Tuple[float, float]]]: + """ + Get volume profile from heatmap data. + + Args: + heatmap: Heatmap data + + Returns: + Dict: Volume profile with 'bids' and 'asks' as (price, volume) tuples + """ + profile = {'bids': [], 'asks': []} + + # Extract bid profile + bid_points = [p for p in heatmap.data if p.side == 'bid'] + profile['bids'] = [(p.price, p.volume) for p in bid_points] + profile['bids'].sort(key=lambda x: x[0], reverse=True) # Highest price first + + # Extract ask profile + ask_points = [p for p in heatmap.data if p.side == 'ask'] + profile['asks'] = [(p.price, p.volume) for p in ask_points] + profile['asks'].sort(key=lambda x: x[0]) # Lowest price first + + return profile + + def calculate_support_resistance(self, heatmap: HeatmapData, + threshold: float = 0.7) -> Dict[str, List[float]]: + """ + Identify potential support and resistance levels from heatmap. 
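+
+        Example (illustrative; `generator` and `heatmap` are assumed to be
+        a HeatmapGenerator instance and the output of generate_heatmap()):
+            >>> levels = generator.calculate_support_resistance(heatmap, threshold=0.8)
+            >>> levels['support'][:3]  # strongest bid prices, descending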
+ + Args: + heatmap: Heatmap data + threshold: Intensity threshold for significant levels + + Returns: + Dict: Support and resistance levels + """ + levels = {'support': [], 'resistance': []} + + # Find high-intensity bid levels (potential support) + bid_points = [p for p in heatmap.data if p.side == 'bid' and p.intensity >= threshold] + levels['support'] = sorted([p.price for p in bid_points], reverse=True) + + # Find high-intensity ask levels (potential resistance) + ask_points = [p for p in heatmap.data if p.side == 'ask' and p.intensity >= threshold] + levels['resistance'] = sorted([p.price for p in ask_points]) + + logger.debug( + f"Identified {len(levels['support'])} support and " + f"{len(levels['resistance'])} resistance levels" + ) + + return levels + + def get_heatmap_summary(self, heatmap: HeatmapData) -> Dict[str, float]: + """ + Get summary statistics for heatmap data. + + Args: + heatmap: Heatmap data + + Returns: + Dict: Summary statistics + """ + if not heatmap.data: + return {} + + # Separate bids and asks + bids = [p for p in heatmap.data if p.side == 'bid'] + asks = [p for p in heatmap.data if p.side == 'ask'] + + summary = { + 'total_points': len(heatmap.data), + 'bid_points': len(bids), + 'ask_points': len(asks), + 'total_volume': sum(p.volume for p in heatmap.data), + 'bid_volume': sum(p.volume for p in bids), + 'ask_volume': sum(p.volume for p in asks), + 'max_intensity': max(p.intensity for p in heatmap.data), + 'avg_intensity': sum(p.intensity for p in heatmap.data) / len(heatmap.data), + 'price_range': 0.0, + 'best_bid': 0.0, + 'best_ask': 0.0 + } + + # Calculate price range + all_prices = [p.price for p in heatmap.data] + if all_prices: + summary['price_range'] = max(all_prices) - min(all_prices) + + # Calculate best bid and ask + if bids: + summary['best_bid'] = max(p.price for p in bids) + if asks: + summary['best_ask'] = min(p.price for p in asks) + + # Calculate volume imbalance + total_volume = summary['total_volume'] + if total_volume > 0: + summary['volume_imbalance'] = ( + (summary['bid_volume'] - summary['ask_volume']) / total_volume + ) + else: + summary['volume_imbalance'] = 0.0 + + return summary + + def get_processing_stats(self) -> Dict[str, int]: + """Get processing statistics""" + return { + 'heatmaps_generated': self.heatmaps_generated, + 'total_points_created': self.total_points_created, + 'avg_points_per_heatmap': ( + self.total_points_created // max(self.heatmaps_generated, 1) + ) + } + + def reset_stats(self) -> None: + """Reset processing statistics""" + self.heatmaps_generated = 0 + self.total_points_created = 0 + logger.info("Heatmap generator statistics reset") \ No newline at end of file diff --git a/COBY/aggregation/price_bucketer.py b/COBY/aggregation/price_bucketer.py new file mode 100644 index 0000000..962e45b --- /dev/null +++ b/COBY/aggregation/price_bucketer.py @@ -0,0 +1,341 @@ +""" +Price bucketing system for order book aggregation. +""" + +import math +from typing import Dict, List, Tuple, Optional +from collections import defaultdict +from ..models.core import OrderBookSnapshot, PriceBuckets, PriceLevel +from ..config import config +from ..utils.logging import get_logger +from ..utils.validation import validate_price, validate_volume + +logger = get_logger(__name__) + + +class PriceBucketer: + """ + Converts order book data into price buckets for heatmap visualization. + + Uses universal $1 USD buckets for all symbols to simplify logic. + """ + + def __init__(self, bucket_size: float = None): + """ + Initialize price bucketer. 
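+
+        With the default $1 buckets, nearby prices such as 50000.2 and
+        50000.4 aggregate into the same bucket for every symbol, BTC
+        included.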
+ + Args: + bucket_size: Size of price buckets in USD (defaults to config value) + """ + self.bucket_size = bucket_size or config.get_bucket_size() + + # Statistics + self.buckets_created = 0 + self.total_volume_processed = 0.0 + + logger.info(f"Price bucketer initialized with ${self.bucket_size} buckets") + + def create_price_buckets(self, orderbook: OrderBookSnapshot) -> PriceBuckets: + """ + Convert order book data to price buckets. + + Args: + orderbook: Order book snapshot + + Returns: + PriceBuckets: Aggregated price bucket data + """ + try: + # Create price buckets object + buckets = PriceBuckets( + symbol=orderbook.symbol, + timestamp=orderbook.timestamp, + bucket_size=self.bucket_size + ) + + # Process bids (aggregate into buckets) + for bid in orderbook.bids: + if validate_price(bid.price) and validate_volume(bid.size): + buckets.add_bid(bid.price, bid.size) + self.total_volume_processed += bid.size + + # Process asks (aggregate into buckets) + for ask in orderbook.asks: + if validate_price(ask.price) and validate_volume(ask.size): + buckets.add_ask(ask.price, ask.size) + self.total_volume_processed += ask.size + + self.buckets_created += 1 + + logger.debug( + f"Created price buckets for {orderbook.symbol}: " + f"{len(buckets.bid_buckets)} bid buckets, {len(buckets.ask_buckets)} ask buckets" + ) + + return buckets + + except Exception as e: + logger.error(f"Error creating price buckets: {e}") + raise + + def aggregate_buckets(self, bucket_list: List[PriceBuckets]) -> PriceBuckets: + """ + Aggregate multiple price buckets into a single bucket set. + + Args: + bucket_list: List of price buckets to aggregate + + Returns: + PriceBuckets: Aggregated buckets + """ + if not bucket_list: + raise ValueError("Cannot aggregate empty bucket list") + + # Use first bucket as template + first_bucket = bucket_list[0] + aggregated = PriceBuckets( + symbol=first_bucket.symbol, + timestamp=first_bucket.timestamp, + bucket_size=self.bucket_size + ) + + # Aggregate all bid buckets + for buckets in bucket_list: + for price, volume in buckets.bid_buckets.items(): + bucket_price = aggregated.get_bucket_price(price) + aggregated.bid_buckets[bucket_price] = ( + aggregated.bid_buckets.get(bucket_price, 0) + volume + ) + + # Aggregate all ask buckets + for buckets in bucket_list: + for price, volume in buckets.ask_buckets.items(): + bucket_price = aggregated.get_bucket_price(price) + aggregated.ask_buckets[bucket_price] = ( + aggregated.ask_buckets.get(bucket_price, 0) + volume + ) + + logger.debug(f"Aggregated {len(bucket_list)} bucket sets") + return aggregated + + def get_bucket_range(self, center_price: float, depth: int) -> Tuple[float, float]: + """ + Get price range for buckets around a center price. + + Args: + center_price: Center price for the range + depth: Number of buckets on each side + + Returns: + Tuple[float, float]: (min_price, max_price) + """ + half_range = depth * self.bucket_size + min_price = center_price - half_range + max_price = center_price + half_range + + return (max(0, min_price), max_price) + + def filter_buckets_by_range(self, buckets: PriceBuckets, + min_price: float, max_price: float) -> PriceBuckets: + """ + Filter buckets to only include those within a price range. 
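+
+        Typically paired with get_bucket_range(), e.g. to keep roughly 50
+        buckets on each side of the mid price (names here are illustrative):
+            >>> lo, hi = bucketer.get_bucket_range(mid_price, depth=50)
+            >>> visible = bucketer.filter_buckets_by_range(buckets, lo, hi)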
+ + Args: + buckets: Original price buckets + min_price: Minimum price to include + max_price: Maximum price to include + + Returns: + PriceBuckets: Filtered buckets + """ + filtered = PriceBuckets( + symbol=buckets.symbol, + timestamp=buckets.timestamp, + bucket_size=buckets.bucket_size + ) + + # Filter bid buckets + for price, volume in buckets.bid_buckets.items(): + if min_price <= price <= max_price: + filtered.bid_buckets[price] = volume + + # Filter ask buckets + for price, volume in buckets.ask_buckets.items(): + if min_price <= price <= max_price: + filtered.ask_buckets[price] = volume + + return filtered + + def get_top_buckets(self, buckets: PriceBuckets, count: int) -> PriceBuckets: + """ + Get top N buckets by volume. + + Args: + buckets: Original price buckets + count: Number of top buckets to return + + Returns: + PriceBuckets: Top buckets by volume + """ + top_buckets = PriceBuckets( + symbol=buckets.symbol, + timestamp=buckets.timestamp, + bucket_size=buckets.bucket_size + ) + + # Get top bid buckets + top_bids = sorted( + buckets.bid_buckets.items(), + key=lambda x: x[1], # Sort by volume + reverse=True + )[:count] + + for price, volume in top_bids: + top_buckets.bid_buckets[price] = volume + + # Get top ask buckets + top_asks = sorted( + buckets.ask_buckets.items(), + key=lambda x: x[1], # Sort by volume + reverse=True + )[:count] + + for price, volume in top_asks: + top_buckets.ask_buckets[price] = volume + + return top_buckets + + def calculate_bucket_statistics(self, buckets: PriceBuckets) -> Dict[str, float]: + """ + Calculate statistics for price buckets. + + Args: + buckets: Price buckets to analyze + + Returns: + Dict[str, float]: Bucket statistics + """ + stats = { + 'total_bid_buckets': len(buckets.bid_buckets), + 'total_ask_buckets': len(buckets.ask_buckets), + 'total_bid_volume': sum(buckets.bid_buckets.values()), + 'total_ask_volume': sum(buckets.ask_buckets.values()), + 'bid_price_range': 0.0, + 'ask_price_range': 0.0, + 'max_bid_volume': 0.0, + 'max_ask_volume': 0.0, + 'avg_bid_volume': 0.0, + 'avg_ask_volume': 0.0 + } + + # Calculate bid statistics + if buckets.bid_buckets: + bid_prices = list(buckets.bid_buckets.keys()) + bid_volumes = list(buckets.bid_buckets.values()) + + stats['bid_price_range'] = max(bid_prices) - min(bid_prices) + stats['max_bid_volume'] = max(bid_volumes) + stats['avg_bid_volume'] = sum(bid_volumes) / len(bid_volumes) + + # Calculate ask statistics + if buckets.ask_buckets: + ask_prices = list(buckets.ask_buckets.keys()) + ask_volumes = list(buckets.ask_buckets.values()) + + stats['ask_price_range'] = max(ask_prices) - min(ask_prices) + stats['max_ask_volume'] = max(ask_volumes) + stats['avg_ask_volume'] = sum(ask_volumes) / len(ask_volumes) + + # Calculate combined statistics + stats['total_volume'] = stats['total_bid_volume'] + stats['total_ask_volume'] + stats['volume_imbalance'] = ( + (stats['total_bid_volume'] - stats['total_ask_volume']) / + max(stats['total_volume'], 1e-10) + ) + + return stats + + def merge_adjacent_buckets(self, buckets: PriceBuckets, merge_factor: int = 2) -> PriceBuckets: + """ + Merge adjacent buckets to create larger bucket sizes. 
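+
+        With merge_factor=2, adjacent $1 buckets collapse into $2 buckets,
+        e.g. for zoomed-out heatmap views.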
+
+        Args:
+            buckets: Original price buckets
+            merge_factor: Number of adjacent buckets to merge
+
+        Returns:
+            PriceBuckets: Merged buckets with larger bucket size
+        """
+        merged = PriceBuckets(
+            symbol=buckets.symbol,
+            timestamp=buckets.timestamp,
+            bucket_size=buckets.bucket_size * merge_factor
+        )
+
+        # Merge bid buckets
+        bid_groups = defaultdict(float)
+        for price, volume in buckets.bid_buckets.items():
+            # Calculate new bucket price
+            new_bucket_price = merged.get_bucket_price(price)
+            bid_groups[new_bucket_price] += volume
+
+        merged.bid_buckets = dict(bid_groups)
+
+        # Merge ask buckets
+        ask_groups = defaultdict(float)
+        for price, volume in buckets.ask_buckets.items():
+            # Calculate new bucket price
+            new_bucket_price = merged.get_bucket_price(price)
+            ask_groups[new_bucket_price] += volume
+
+        merged.ask_buckets = dict(ask_groups)
+
+        logger.debug(f"Merged buckets with factor {merge_factor}")
+        return merged
+
+    def get_bucket_depth_profile(self, buckets: PriceBuckets,
+                                center_price: float) -> Dict[str, List[Tuple[float, float]]]:
+        """
+        Get depth profile showing volume at different distances from center price.
+
+        Args:
+            buckets: Price buckets
+            center_price: Center price for depth calculation
+
+        Returns:
+            Dict: Depth profile with 'bids' and 'asks' lists of (distance, volume) tuples
+        """
+        profile = {'bids': [], 'asks': []}
+
+        # Calculate bid depth profile
+        for price, volume in buckets.bid_buckets.items():
+            distance = abs(center_price - price)
+            profile['bids'].append((distance, volume))
+
+        # Calculate ask depth profile
+        for price, volume in buckets.ask_buckets.items():
+            distance = abs(price - center_price)
+            profile['asks'].append((distance, volume))
+
+        # Sort by distance
+        profile['bids'].sort(key=lambda x: x[0])
+        profile['asks'].sort(key=lambda x: x[0])
+
+        return profile
+
+    def get_bucket_price(self, price: float) -> float:
+        """
+        Snap a price to its bucket boundary.
+
+        CrossExchangeAggregator calls this on single prices; assumed to
+        mirror PriceBuckets.get_bucket_price by flooring to a multiple
+        of the bucket size.
+        """
+        return math.floor(price / self.bucket_size) * self.bucket_size
+
+    def get_processing_stats(self) -> Dict[str, float]:
+        """Get processing statistics"""
+        return {
+            'bucket_size': self.bucket_size,
+            'buckets_created': self.buckets_created,
+            'total_volume_processed': self.total_volume_processed,
+            'avg_volume_per_bucket': (
+                self.total_volume_processed / max(self.buckets_created, 1)
+            )
+        }
+
+    def reset_stats(self) -> None:
+        """Reset processing statistics"""
+        self.buckets_created = 0
+        self.total_volume_processed = 0.0
+        logger.info("Price bucketer statistics reset")
\ No newline at end of file
diff --git a/COBY/config.py b/COBY/config.py
index 6febf1c..55b2edc 100644
--- a/COBY/config.py
+++ b/COBY/config.py
@@ -51,9 +51,7 @@ class ExchangeConfig:
 @dataclass
 class AggregationConfig:
     """Data aggregation configuration"""
-    btc_bucket_size: float = float(os.getenv('BTC_BUCKET_SIZE', '10.0'))  # $10 USD buckets
-    eth_bucket_size: float = float(os.getenv('ETH_BUCKET_SIZE', '1.0'))  # $1 USD buckets
-    default_bucket_size: float = float(os.getenv('DEFAULT_BUCKET_SIZE', '1.0'))
+    bucket_size: float = float(os.getenv('BUCKET_SIZE', '1.0'))  # $1 USD buckets for all symbols
     heatmap_depth: int = int(os.getenv('HEATMAP_DEPTH', '50'))  # Number of price levels
     update_frequency: float = float(os.getenv('UPDATE_FREQUENCY', '0.5'))  # Seconds
     volume_threshold: float = float(os.getenv('VOLUME_THRESHOLD', '0.01'))  # Minimum volume
@@ -119,15 +117,9 @@ class Config:
-        if self.aggregation.eth_bucket_size <= 0:
-            raise ValueError("ETH bucket size must be positive")
+        if self.aggregation.bucket_size <= 0:
+            raise ValueError("Bucket size must be positive")
 
-    def get_bucket_size(self, symbol: str) -> float:
-        """Get bucket size for a specific symbol"""
-        symbol_upper = symbol.upper()
-        if 'BTC' in symbol_upper:
-            return self.aggregation.btc_bucket_size
-        elif 'ETH' in symbol_upper:
-            return self.aggregation.eth_bucket_size
-        else:
-            return self.aggregation.default_bucket_size
+    def get_bucket_size(self, symbol: str = None) -> float:
+        """Get bucket size (universal $1 buckets; symbol is accepted for compatibility and ignored)"""
+        return self.aggregation.bucket_size
 
     def get_database_url(self) -> str:
         """Get database connection URL"""
@@ -158,8 +150,7 @@ class Config:
             'symbols': self.exchanges.symbols,
         },
         'aggregation': {
-            'btc_bucket_size': self.aggregation.btc_bucket_size,
-            'eth_bucket_size': self.aggregation.eth_bucket_size,
+            'bucket_size': self.aggregation.bucket_size,
             'heatmap_depth': self.aggregation.heatmap_depth,
         },
         'api': {