""" Heatmap data generation from price buckets. """ from typing import List, Dict, Optional, Tuple from ..models.core import PriceBuckets, HeatmapData, HeatmapPoint from ..config import config from ..utils.logging import get_logger logger = get_logger(__name__) class HeatmapGenerator: """ Generates heatmap visualization data from price buckets. Creates intensity-based heatmap points for visualization. """ def __init__(self): """Initialize heatmap generator""" self.heatmaps_generated = 0 self.total_points_created = 0 logger.info("Heatmap generator initialized") def generate_heatmap(self, buckets: PriceBuckets, max_points: Optional[int] = None) -> HeatmapData: """ Generate heatmap data from price buckets. Args: buckets: Price buckets to convert max_points: Maximum number of points to include (None = all) Returns: HeatmapData: Heatmap visualization data """ try: heatmap = HeatmapData( symbol=buckets.symbol, timestamp=buckets.timestamp, bucket_size=buckets.bucket_size ) # Calculate maximum volume for intensity normalization all_volumes = list(buckets.bid_buckets.values()) + list(buckets.ask_buckets.values()) max_volume = max(all_volumes) if all_volumes else 1.0 # Generate bid points bid_points = self._create_heatmap_points( buckets.bid_buckets, 'bid', max_volume ) # Generate ask points ask_points = self._create_heatmap_points( buckets.ask_buckets, 'ask', max_volume ) # Combine all points all_points = bid_points + ask_points # Limit points if requested if max_points and len(all_points) > max_points: # Sort by volume and take top points all_points.sort(key=lambda p: p.volume, reverse=True) all_points = all_points[:max_points] heatmap.data = all_points self.heatmaps_generated += 1 self.total_points_created += len(all_points) logger.debug( f"Generated heatmap for {buckets.symbol}: {len(all_points)} points " f"(max_volume: {max_volume:.6f})" ) return heatmap except Exception as e: logger.error(f"Error generating heatmap: {e}") raise def _create_heatmap_points(self, bucket_dict: Dict[float, float], side: str, max_volume: float) -> List[HeatmapPoint]: """ Create heatmap points from bucket dictionary. Args: bucket_dict: Dictionary of price -> volume side: 'bid' or 'ask' max_volume: Maximum volume for intensity calculation Returns: List[HeatmapPoint]: List of heatmap points """ points = [] for price, volume in bucket_dict.items(): if volume > 0: # Only include non-zero volumes intensity = min(volume / max_volume, 1.0) if max_volume > 0 else 0.0 point = HeatmapPoint( price=price, volume=volume, intensity=intensity, side=side ) points.append(point) return points def apply_smoothing(self, heatmap: HeatmapData, smoothing_factor: float = 0.3) -> HeatmapData: """ Apply smoothing to heatmap data to reduce noise. Args: heatmap: Original heatmap data smoothing_factor: Smoothing factor (0.0 = no smoothing, 1.0 = maximum) Returns: HeatmapData: Smoothed heatmap data """ if smoothing_factor <= 0: return heatmap try: smoothed = HeatmapData( symbol=heatmap.symbol, timestamp=heatmap.timestamp, bucket_size=heatmap.bucket_size ) # Separate bids and asks bids = [p for p in heatmap.data if p.side == 'bid'] asks = [p for p in heatmap.data if p.side == 'ask'] # Apply smoothing to each side smoothed_bids = self._smooth_points(bids, smoothing_factor) smoothed_asks = self._smooth_points(asks, smoothing_factor) smoothed.data = smoothed_bids + smoothed_asks logger.debug(f"Applied smoothing with factor {smoothing_factor}") return smoothed except Exception as e: logger.error(f"Error applying smoothing: {e}") return heatmap # Return original on error def _smooth_points(self, points: List[HeatmapPoint], smoothing_factor: float) -> List[HeatmapPoint]: """ Apply smoothing to a list of heatmap points. Args: points: Points to smooth smoothing_factor: Smoothing factor Returns: List[HeatmapPoint]: Smoothed points """ if len(points) < 3: return points # Sort points by price sorted_points = sorted(points, key=lambda p: p.price) smoothed_points = [] for i, point in enumerate(sorted_points): # Calculate weighted average with neighbors total_weight = 1.0 weighted_volume = point.volume weighted_intensity = point.intensity # Add left neighbor if i > 0: left_point = sorted_points[i - 1] weight = smoothing_factor total_weight += weight weighted_volume += left_point.volume * weight weighted_intensity += left_point.intensity * weight # Add right neighbor if i < len(sorted_points) - 1: right_point = sorted_points[i + 1] weight = smoothing_factor total_weight += weight weighted_volume += right_point.volume * weight weighted_intensity += right_point.intensity * weight # Create smoothed point smoothed_point = HeatmapPoint( price=point.price, volume=weighted_volume / total_weight, intensity=min(weighted_intensity / total_weight, 1.0), side=point.side ) smoothed_points.append(smoothed_point) return smoothed_points def filter_by_intensity(self, heatmap: HeatmapData, min_intensity: float = 0.1) -> HeatmapData: """ Filter heatmap points by minimum intensity. Args: heatmap: Original heatmap data min_intensity: Minimum intensity threshold Returns: HeatmapData: Filtered heatmap data """ filtered = HeatmapData( symbol=heatmap.symbol, timestamp=heatmap.timestamp, bucket_size=heatmap.bucket_size ) # Filter points by intensity filtered.data = [ point for point in heatmap.data if point.intensity >= min_intensity ] logger.debug( f"Filtered heatmap: {len(heatmap.data)} -> {len(filtered.data)} points " f"(min_intensity: {min_intensity})" ) return filtered def get_price_levels(self, heatmap: HeatmapData, side: str = None) -> List[float]: """ Get sorted list of price levels from heatmap. Args: heatmap: Heatmap data side: 'bid', 'ask', or None for both Returns: List[float]: Sorted price levels """ if side: points = [p for p in heatmap.data if p.side == side] else: points = heatmap.data prices = [p.price for p in points] return sorted(prices) def get_volume_profile(self, heatmap: HeatmapData) -> Dict[str, List[Tuple[float, float]]]: """ Get volume profile from heatmap data. Args: heatmap: Heatmap data Returns: Dict: Volume profile with 'bids' and 'asks' as (price, volume) tuples """ profile = {'bids': [], 'asks': []} # Extract bid profile bid_points = [p for p in heatmap.data if p.side == 'bid'] profile['bids'] = [(p.price, p.volume) for p in bid_points] profile['bids'].sort(key=lambda x: x[0], reverse=True) # Highest price first # Extract ask profile ask_points = [p for p in heatmap.data if p.side == 'ask'] profile['asks'] = [(p.price, p.volume) for p in ask_points] profile['asks'].sort(key=lambda x: x[0]) # Lowest price first return profile def calculate_support_resistance(self, heatmap: HeatmapData, threshold: float = 0.7) -> Dict[str, List[float]]: """ Identify potential support and resistance levels from heatmap. Args: heatmap: Heatmap data threshold: Intensity threshold for significant levels Returns: Dict: Support and resistance levels """ levels = {'support': [], 'resistance': []} # Find high-intensity bid levels (potential support) bid_points = [p for p in heatmap.data if p.side == 'bid' and p.intensity >= threshold] levels['support'] = sorted([p.price for p in bid_points], reverse=True) # Find high-intensity ask levels (potential resistance) ask_points = [p for p in heatmap.data if p.side == 'ask' and p.intensity >= threshold] levels['resistance'] = sorted([p.price for p in ask_points]) logger.debug( f"Identified {len(levels['support'])} support and " f"{len(levels['resistance'])} resistance levels" ) return levels def get_heatmap_summary(self, heatmap: HeatmapData) -> Dict[str, float]: """ Get summary statistics for heatmap data. Args: heatmap: Heatmap data Returns: Dict: Summary statistics """ if not heatmap.data: return {} # Separate bids and asks bids = [p for p in heatmap.data if p.side == 'bid'] asks = [p for p in heatmap.data if p.side == 'ask'] summary = { 'total_points': len(heatmap.data), 'bid_points': len(bids), 'ask_points': len(asks), 'total_volume': sum(p.volume for p in heatmap.data), 'bid_volume': sum(p.volume for p in bids), 'ask_volume': sum(p.volume for p in asks), 'max_intensity': max(p.intensity for p in heatmap.data), 'avg_intensity': sum(p.intensity for p in heatmap.data) / len(heatmap.data), 'price_range': 0.0, 'best_bid': 0.0, 'best_ask': 0.0 } # Calculate price range all_prices = [p.price for p in heatmap.data] if all_prices: summary['price_range'] = max(all_prices) - min(all_prices) # Calculate best bid and ask if bids: summary['best_bid'] = max(p.price for p in bids) if asks: summary['best_ask'] = min(p.price for p in asks) # Calculate volume imbalance total_volume = summary['total_volume'] if total_volume > 0: summary['volume_imbalance'] = ( (summary['bid_volume'] - summary['ask_volume']) / total_volume ) else: summary['volume_imbalance'] = 0.0 return summary def get_processing_stats(self) -> Dict[str, int]: """Get processing statistics""" return { 'heatmaps_generated': self.heatmaps_generated, 'total_points_created': self.total_points_created, 'avg_points_per_heatmap': ( self.total_points_created // max(self.heatmaps_generated, 1) ) } def reset_stats(self) -> None: """Reset processing statistics""" self.heatmaps_generated = 0 self.total_points_created = 0 logger.info("Heatmap generator statistics reset")