This commit is contained in:
Dobromir Popov
2025-08-04 17:40:30 +03:00
parent de77b0afa8
commit 8ee9b7a90c
7 changed files with 1468 additions and 14 deletions

View File

@ -0,0 +1,376 @@
"""
Heatmap data generation from price buckets.
"""
from typing import List, Dict, Optional, Tuple
from ..models.core import PriceBuckets, HeatmapData, HeatmapPoint
from ..config import config
from ..utils.logging import get_logger
logger = get_logger(__name__)
class HeatmapGenerator:
"""
Generates heatmap visualization data from price buckets.
Creates intensity-based heatmap points for visualization.
"""
def __init__(self):
"""Initialize heatmap generator"""
self.heatmaps_generated = 0
self.total_points_created = 0
logger.info("Heatmap generator initialized")
def generate_heatmap(self, buckets: PriceBuckets,
max_points: Optional[int] = None) -> HeatmapData:
"""
Generate heatmap data from price buckets.
Args:
buckets: Price buckets to convert
max_points: Maximum number of points to include (None = all)
Returns:
HeatmapData: Heatmap visualization data
"""
try:
heatmap = HeatmapData(
symbol=buckets.symbol,
timestamp=buckets.timestamp,
bucket_size=buckets.bucket_size
)
# Calculate maximum volume for intensity normalization
all_volumes = list(buckets.bid_buckets.values()) + list(buckets.ask_buckets.values())
max_volume = max(all_volumes) if all_volumes else 1.0
# Generate bid points
bid_points = self._create_heatmap_points(
buckets.bid_buckets, 'bid', max_volume
)
# Generate ask points
ask_points = self._create_heatmap_points(
buckets.ask_buckets, 'ask', max_volume
)
# Combine all points
all_points = bid_points + ask_points
# Limit points if requested
if max_points and len(all_points) > max_points:
# Sort by volume and take top points
all_points.sort(key=lambda p: p.volume, reverse=True)
all_points = all_points[:max_points]
heatmap.data = all_points
self.heatmaps_generated += 1
self.total_points_created += len(all_points)
logger.debug(
f"Generated heatmap for {buckets.symbol}: {len(all_points)} points "
f"(max_volume: {max_volume:.6f})"
)
return heatmap
except Exception as e:
logger.error(f"Error generating heatmap: {e}")
raise
def _create_heatmap_points(self, bucket_dict: Dict[float, float],
side: str, max_volume: float) -> List[HeatmapPoint]:
"""
Create heatmap points from bucket dictionary.
Args:
bucket_dict: Dictionary of price -> volume
side: 'bid' or 'ask'
max_volume: Maximum volume for intensity calculation
Returns:
List[HeatmapPoint]: List of heatmap points
"""
points = []
for price, volume in bucket_dict.items():
if volume > 0: # Only include non-zero volumes
intensity = min(volume / max_volume, 1.0) if max_volume > 0 else 0.0
point = HeatmapPoint(
price=price,
volume=volume,
intensity=intensity,
side=side
)
points.append(point)
return points
def apply_smoothing(self, heatmap: HeatmapData,
smoothing_factor: float = 0.3) -> HeatmapData:
"""
Apply smoothing to heatmap data to reduce noise.
Args:
heatmap: Original heatmap data
smoothing_factor: Smoothing factor (0.0 = no smoothing, 1.0 = maximum)
Returns:
HeatmapData: Smoothed heatmap data
"""
if smoothing_factor <= 0:
return heatmap
try:
smoothed = HeatmapData(
symbol=heatmap.symbol,
timestamp=heatmap.timestamp,
bucket_size=heatmap.bucket_size
)
# Separate bids and asks
bids = [p for p in heatmap.data if p.side == 'bid']
asks = [p for p in heatmap.data if p.side == 'ask']
# Apply smoothing to each side
smoothed_bids = self._smooth_points(bids, smoothing_factor)
smoothed_asks = self._smooth_points(asks, smoothing_factor)
smoothed.data = smoothed_bids + smoothed_asks
logger.debug(f"Applied smoothing with factor {smoothing_factor}")
return smoothed
except Exception as e:
logger.error(f"Error applying smoothing: {e}")
return heatmap # Return original on error
def _smooth_points(self, points: List[HeatmapPoint],
smoothing_factor: float) -> List[HeatmapPoint]:
"""
Apply smoothing to a list of heatmap points.
Args:
points: Points to smooth
smoothing_factor: Smoothing factor
Returns:
List[HeatmapPoint]: Smoothed points
"""
if len(points) < 3:
return points
# Sort points by price
sorted_points = sorted(points, key=lambda p: p.price)
smoothed_points = []
for i, point in enumerate(sorted_points):
# Calculate weighted average with neighbors
total_weight = 1.0
weighted_volume = point.volume
weighted_intensity = point.intensity
# Add left neighbor
if i > 0:
left_point = sorted_points[i - 1]
weight = smoothing_factor
total_weight += weight
weighted_volume += left_point.volume * weight
weighted_intensity += left_point.intensity * weight
# Add right neighbor
if i < len(sorted_points) - 1:
right_point = sorted_points[i + 1]
weight = smoothing_factor
total_weight += weight
weighted_volume += right_point.volume * weight
weighted_intensity += right_point.intensity * weight
# Create smoothed point
smoothed_point = HeatmapPoint(
price=point.price,
volume=weighted_volume / total_weight,
intensity=min(weighted_intensity / total_weight, 1.0),
side=point.side
)
smoothed_points.append(smoothed_point)
return smoothed_points
def filter_by_intensity(self, heatmap: HeatmapData,
min_intensity: float = 0.1) -> HeatmapData:
"""
Filter heatmap points by minimum intensity.
Args:
heatmap: Original heatmap data
min_intensity: Minimum intensity threshold
Returns:
HeatmapData: Filtered heatmap data
"""
filtered = HeatmapData(
symbol=heatmap.symbol,
timestamp=heatmap.timestamp,
bucket_size=heatmap.bucket_size
)
# Filter points by intensity
filtered.data = [
point for point in heatmap.data
if point.intensity >= min_intensity
]
logger.debug(
f"Filtered heatmap: {len(heatmap.data)} -> {len(filtered.data)} points "
f"(min_intensity: {min_intensity})"
)
return filtered
def get_price_levels(self, heatmap: HeatmapData,
side: str = None) -> List[float]:
"""
Get sorted list of price levels from heatmap.
Args:
heatmap: Heatmap data
side: 'bid', 'ask', or None for both
Returns:
List[float]: Sorted price levels
"""
if side:
points = [p for p in heatmap.data if p.side == side]
else:
points = heatmap.data
prices = [p.price for p in points]
return sorted(prices)
def get_volume_profile(self, heatmap: HeatmapData) -> Dict[str, List[Tuple[float, float]]]:
"""
Get volume profile from heatmap data.
Args:
heatmap: Heatmap data
Returns:
Dict: Volume profile with 'bids' and 'asks' as (price, volume) tuples
"""
profile = {'bids': [], 'asks': []}
# Extract bid profile
bid_points = [p for p in heatmap.data if p.side == 'bid']
profile['bids'] = [(p.price, p.volume) for p in bid_points]
profile['bids'].sort(key=lambda x: x[0], reverse=True) # Highest price first
# Extract ask profile
ask_points = [p for p in heatmap.data if p.side == 'ask']
profile['asks'] = [(p.price, p.volume) for p in ask_points]
profile['asks'].sort(key=lambda x: x[0]) # Lowest price first
return profile
def calculate_support_resistance(self, heatmap: HeatmapData,
threshold: float = 0.7) -> Dict[str, List[float]]:
"""
Identify potential support and resistance levels from heatmap.
Args:
heatmap: Heatmap data
threshold: Intensity threshold for significant levels
Returns:
Dict: Support and resistance levels
"""
levels = {'support': [], 'resistance': []}
# Find high-intensity bid levels (potential support)
bid_points = [p for p in heatmap.data if p.side == 'bid' and p.intensity >= threshold]
levels['support'] = sorted([p.price for p in bid_points], reverse=True)
# Find high-intensity ask levels (potential resistance)
ask_points = [p for p in heatmap.data if p.side == 'ask' and p.intensity >= threshold]
levels['resistance'] = sorted([p.price for p in ask_points])
logger.debug(
f"Identified {len(levels['support'])} support and "
f"{len(levels['resistance'])} resistance levels"
)
return levels
def get_heatmap_summary(self, heatmap: HeatmapData) -> Dict[str, float]:
"""
Get summary statistics for heatmap data.
Args:
heatmap: Heatmap data
Returns:
Dict: Summary statistics
"""
if not heatmap.data:
return {}
# Separate bids and asks
bids = [p for p in heatmap.data if p.side == 'bid']
asks = [p for p in heatmap.data if p.side == 'ask']
summary = {
'total_points': len(heatmap.data),
'bid_points': len(bids),
'ask_points': len(asks),
'total_volume': sum(p.volume for p in heatmap.data),
'bid_volume': sum(p.volume for p in bids),
'ask_volume': sum(p.volume for p in asks),
'max_intensity': max(p.intensity for p in heatmap.data),
'avg_intensity': sum(p.intensity for p in heatmap.data) / len(heatmap.data),
'price_range': 0.0,
'best_bid': 0.0,
'best_ask': 0.0
}
# Calculate price range
all_prices = [p.price for p in heatmap.data]
if all_prices:
summary['price_range'] = max(all_prices) - min(all_prices)
# Calculate best bid and ask
if bids:
summary['best_bid'] = max(p.price for p in bids)
if asks:
summary['best_ask'] = min(p.price for p in asks)
# Calculate volume imbalance
total_volume = summary['total_volume']
if total_volume > 0:
summary['volume_imbalance'] = (
(summary['bid_volume'] - summary['ask_volume']) / total_volume
)
else:
summary['volume_imbalance'] = 0.0
return summary
def get_processing_stats(self) -> Dict[str, int]:
"""Get processing statistics"""
return {
'heatmaps_generated': self.heatmaps_generated,
'total_points_created': self.total_points_created,
'avg_points_per_heatmap': (
self.total_points_created // max(self.heatmaps_generated, 1)
)
}
def reset_stats(self) -> None:
"""Reset processing statistics"""
self.heatmaps_generated = 0
self.total_points_created = 0
logger.info("Heatmap generator statistics reset")