""" End-to-end latency tracking for data processing pipeline. """ import time import threading from typing import Dict, List, Optional, Any, ContextManager from collections import defaultdict, deque from datetime import datetime, timezone from dataclasses import dataclass from contextlib import contextmanager from ..utils.logging import get_logger, set_correlation_id from ..utils.timing import get_current_timestamp # Import will be done lazily to avoid circular imports logger = get_logger(__name__) @dataclass class LatencyMeasurement: """Individual latency measurement""" operation: str start_time: float end_time: float duration_ms: float correlation_id: Optional[str] = None metadata: Dict[str, Any] = None @property def duration_seconds(self) -> float: """Get duration in seconds""" return self.duration_ms / 1000.0 class LatencyTracker: """ Tracks end-to-end latency for various operations in the system. Provides context managers for easy latency measurement and comprehensive latency analytics. """ def __init__(self, max_measurements: int = 10000): """ Initialize latency tracker. Args: max_measurements: Maximum number of measurements to keep in memory """ self.max_measurements = max_measurements # Latency storage self.measurements: Dict[str, deque] = defaultdict( lambda: deque(maxlen=max_measurements) ) # Active measurements (for tracking ongoing operations) self.active_measurements: Dict[str, Dict[str, float]] = defaultdict(dict) # Thread safety self._lock = threading.RLock() # Statistics self.total_measurements = 0 logger.info(f"Latency tracker initialized with max {max_measurements} measurements") @contextmanager def measure(self, operation: str, correlation_id: str = None, metadata: Dict[str, Any] = None) -> ContextManager[None]: """ Context manager for measuring operation latency. Args: operation: Name of the operation being measured correlation_id: Optional correlation ID for tracking metadata: Optional metadata to store with measurement Usage: with latency_tracker.measure('data_processing'): # Your code here process_data() """ start_time = time.perf_counter() measurement_id = f"{operation}_{start_time}_{threading.get_ident()}" # Store active measurement with self._lock: self.active_measurements[operation][measurement_id] = start_time try: yield finally: end_time = time.perf_counter() duration_ms = (end_time - start_time) * 1000 # Create measurement measurement = LatencyMeasurement( operation=operation, start_time=start_time, end_time=end_time, duration_ms=duration_ms, correlation_id=correlation_id, metadata=metadata or {} ) # Store measurement with self._lock: self.measurements[operation].append(measurement) self.active_measurements[operation].pop(measurement_id, None) self.total_measurements += 1 # Record in metrics collector try: from .metrics_collector import metrics_collector metrics_collector.observe_histogram( f"{operation}_latency_ms", duration_ms, labels={'operation': operation} ) except ImportError: pass # Metrics collector not available logger.debug(f"Measured {operation}: {duration_ms:.2f}ms") def start_measurement(self, operation: str, measurement_id: str = None, correlation_id: str = None) -> str: """ Start a manual latency measurement. 
    def start_measurement(self, operation: str, measurement_id: Optional[str] = None,
                          correlation_id: Optional[str] = None) -> str:
        """
        Start a manual latency measurement.

        Args:
            operation: Name of the operation
            measurement_id: Optional custom measurement ID
            correlation_id: Optional correlation ID

        Returns:
            str: Measurement ID for ending the measurement
        """
        start_time = time.perf_counter()

        if measurement_id is None:
            measurement_id = f"{operation}_{start_time}_{threading.get_ident()}"

        with self._lock:
            self.active_measurements[operation][measurement_id] = (start_time, correlation_id)

        logger.debug(f"Started measurement {measurement_id} for {operation}")
        return measurement_id

    def end_measurement(self, operation: str, measurement_id: str,
                        metadata: Optional[Dict[str, Any]] = None) -> Optional[LatencyMeasurement]:
        """
        End a manual latency measurement.

        Args:
            operation: Name of the operation
            measurement_id: Measurement ID from start_measurement
            metadata: Optional metadata to store

        Returns:
            LatencyMeasurement: The completed measurement, or None if not found
        """
        end_time = time.perf_counter()

        with self._lock:
            entry = self.active_measurements[operation].pop(measurement_id, None)

        if entry is None:
            logger.warning(f"No active measurement found: {measurement_id}")
            return None

        # Carry the correlation ID from start_measurement through to the record
        start_time, correlation_id = entry
        duration_ms = (end_time - start_time) * 1000

        measurement = LatencyMeasurement(
            operation=operation,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
            correlation_id=correlation_id,
            metadata=metadata or {},
        )

        # Store the measurement
        with self._lock:
            self.measurements[operation].append(measurement)
            self.total_measurements += 1

        # Record in the metrics collector (lazy import avoids a circular import)
        try:
            from .metrics_collector import metrics_collector
            metrics_collector.observe_histogram(
                f"{operation}_latency_ms",
                duration_ms,
                labels={'operation': operation},
            )
        except ImportError:
            pass  # Metrics collector not available

        logger.debug(f"Completed measurement {measurement_id}: {duration_ms:.2f}ms")
        return measurement

    def get_latency_stats(self, operation: str) -> Dict[str, float]:
        """
        Get latency statistics for an operation.

        Args:
            operation: Operation name

        Returns:
            Dict: Latency statistics
        """
        with self._lock:
            measurements = list(self.measurements[operation])

        if not measurements:
            return {
                'count': 0,
                'avg_ms': 0.0,
                'min_ms': 0.0,
                'max_ms': 0.0,
                'p50_ms': 0.0,
                'p95_ms': 0.0,
                'p99_ms': 0.0,
            }

        durations = sorted(m.duration_ms for m in measurements)
        count = len(durations)

        # Nearest-rank percentiles; min() keeps the index in range for small samples
        def percentile(p: float) -> float:
            return durations[min(int(p * count), count - 1)]

        return {
            'count': count,
            'avg_ms': sum(durations) / count,
            'min_ms': durations[0],
            'max_ms': durations[-1],
            'p50_ms': percentile(0.50),
            'p95_ms': percentile(0.95),
            'p99_ms': percentile(0.99),
        }

    def get_all_latency_stats(self) -> Dict[str, Dict[str, float]]:
        """Get latency statistics for all operations."""
        with self._lock:
            operations = list(self.measurements.keys())

        return {
            operation: self.get_latency_stats(operation)
            for operation in operations
        }

    def get_recent_measurements(self, operation: str,
                                limit: int = 100) -> List[LatencyMeasurement]:
        """
        Get recent measurements for an operation.

        Args:
            operation: Operation name
            limit: Maximum number of measurements to return

        Returns:
            List[LatencyMeasurement]: Recent measurements
        """
        with self._lock:
            measurements = list(self.measurements[operation])

        return measurements[-limit:]
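    # Usage sketch for the manual API above, useful when start and end happen
    # in different callbacks. The operation name and metadata are illustrative:
    #
    #     mid = latency_tracker.start_measurement('db_write', correlation_id='req-123')
    #     ...  # work completes elsewhere, possibly on another code path
    #     latency_tracker.end_measurement('db_write', mid, metadata={'rows': 42})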
    def get_slow_operations(self, threshold_ms: float = 100.0) -> List[Dict[str, Any]]:
        """
        Get operations whose average latency exceeds a threshold.

        Args:
            threshold_ms: Latency threshold in milliseconds

        Returns:
            List: Slow operations with their stats
        """
        # Snapshot the keys under the lock before computing per-operation stats
        with self._lock:
            operations = list(self.measurements.keys())

        slow_operations = []
        for operation in operations:
            stats = self.get_latency_stats(operation)
            if stats['avg_ms'] > threshold_ms:
                slow_operations.append({
                    'operation': operation,
                    'avg_latency_ms': stats['avg_ms'],
                    'p95_latency_ms': stats['p95_ms'],
                    'count': stats['count'],
                })

        # Sort by average latency (descending)
        slow_operations.sort(key=lambda x: x['avg_latency_ms'], reverse=True)
        return slow_operations

    def get_latency_trends(self, operation: str, window_size: int = 100) -> Dict[str, Any]:
        """
        Get latency trends for an operation.

        Args:
            operation: Operation name
            window_size: Number of recent measurements to analyze

        Returns:
            Dict: Trend analysis
        """
        recent_measurements = self.get_recent_measurements(operation, window_size)

        if len(recent_measurements) < 2:
            return {'trend': 'insufficient_data'}

        # Split the window into two halves and compare their averages
        mid_point = len(recent_measurements) // 2
        first_half = recent_measurements[:mid_point]
        second_half = recent_measurements[mid_point:]

        first_avg = sum(m.duration_ms for m in first_half) / len(first_half)
        second_avg = sum(m.duration_ms for m in second_half) / len(second_half)

        # Guard against division by zero for sub-resolution timings
        if first_avg == 0:
            return {'trend': 'insufficient_data'}

        change_percent = ((second_avg - first_avg) / first_avg) * 100

        if abs(change_percent) < 5:
            trend = 'stable'
        elif change_percent > 0:
            trend = 'increasing'
        else:
            trend = 'decreasing'

        return {
            'trend': trend,
            'change_percent': change_percent,
            'first_half_avg_ms': first_avg,
            'second_half_avg_ms': second_avg,
            'sample_size': len(recent_measurements),
        }

    def get_active_measurements_count(self) -> Dict[str, int]:
        """Get the count of currently active measurements by operation."""
        with self._lock:
            return {
                operation: len(measurements)
                for operation, measurements in self.active_measurements.items()
                if measurements
            }

    def get_latency_distribution(self, operation: str,
                                 bucket_size_ms: float = 10.0) -> Dict[str, int]:
        """
        Get the latency distribution in fixed-width buckets.

        Args:
            operation: Operation name
            bucket_size_ms: Size of each bucket in milliseconds

        Returns:
            Dict: Latency distribution buckets
        """
        with self._lock:
            measurements = list(self.measurements[operation])

        if not measurements:
            return {}

        # Assign each measurement to its bucket
        distribution: Dict[str, int] = defaultdict(int)
        for measurement in measurements:
            bucket = int(measurement.duration_ms // bucket_size_ms) * bucket_size_ms
            bucket_label = f"{bucket:.0f}-{bucket + bucket_size_ms:.0f}ms"
            distribution[bucket_label] += 1

        return dict(distribution)

    def export_measurements(self, operation: Optional[str] = None,
                            format: str = 'json') -> List[Dict[str, Any]]:
        """
        Export measurements for analysis.

        Args:
            operation: Specific operation to export (None for all)
            format: Export format hint; currently only plain dicts are
                produced, and serialization (JSON, CSV, ...) is left to the
                caller

        Returns:
            List: Exported measurement data
        """
        exported_data = []

        with self._lock:
            operations = [operation] if operation else list(self.measurements.keys())

        for op in operations:
            with self._lock:
                measurements = list(self.measurements[op])

            for measurement in measurements:
                exported_data.append({
                    'operation': measurement.operation,
                    'duration_ms': measurement.duration_ms,
                    'start_time': measurement.start_time,
                    'end_time': measurement.end_time,
                    'correlation_id': measurement.correlation_id,
                    'metadata': measurement.metadata,
                })

        return exported_data
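    # Usage sketch: exporting measurements for offline analysis. pandas is an
    # assumed downstream dependency here, not a requirement of this module:
    #
    #     rows = latency_tracker.export_measurements('data_processing')
    #     import pandas as pd
    #     pd.DataFrame(rows).to_csv('latency.csv', index=False)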
    def clear_measurements(self, operation: Optional[str] = None) -> None:
        """
        Clear measurements for one operation, or for all operations.

        Args:
            operation: Specific operation to clear (None for all)
        """
        with self._lock:
            if operation:
                self.measurements[operation].clear()
                logger.info(f"Cleared measurements for operation: {operation}")
            else:
                self.measurements.clear()
                # total_measurements is a lifetime counter; reset only on a full clear
                self.total_measurements = 0
                logger.info("Cleared all measurements")

    def get_performance_impact(self, operation: str) -> Dict[str, Any]:
        """
        Analyze the performance impact of an operation.

        Args:
            operation: Operation name

        Returns:
            Dict: Performance impact analysis
        """
        stats = self.get_latency_stats(operation)
        trends = self.get_latency_trends(operation)

        # Classify impact by average latency
        avg_latency = stats['avg_ms']
        if avg_latency < 10:
            impact_level = 'low'
        elif avg_latency < 100:
            impact_level = 'medium'
        else:
            impact_level = 'high'

        # Flag sustained degradation: upward trend with a >20% latency increase
        degradation = (
            trends.get('trend') == 'increasing'
            and trends.get('change_percent', 0) > 20
        )

        return {
            'operation': operation,
            'impact_level': impact_level,
            'avg_latency_ms': avg_latency,
            'p95_latency_ms': stats['p95_ms'],
            'measurement_count': stats['count'],
            'trend': trends.get('trend', 'unknown'),
            'performance_degradation': degradation,
            'recommendations': self._get_performance_recommendations(stats, trends),
        }

    def _get_performance_recommendations(self, stats: Dict[str, float],
                                         trends: Dict[str, Any]) -> List[str]:
        """Get performance recommendations based on stats and trends."""
        recommendations = []

        if stats['avg_ms'] > 100:
            recommendations.append(
                "Consider optimizing this operation - average latency is high"
            )

        if stats['p95_ms'] > stats['avg_ms'] * 3:
            recommendations.append(
                "High latency variance detected - investigate outliers"
            )

        if trends.get('trend') == 'increasing':
            recommendations.append(
                "Latency is trending upward - monitor for performance degradation"
            )

        if stats['count'] < 10:
            recommendations.append(
                "Insufficient data for reliable analysis - collect more measurements"
            )

        return recommendations

    def get_stats(self) -> Dict[str, Any]:
        """Get latency tracker statistics."""
        with self._lock:
            return {
                'total_measurements': self.total_measurements,
                'operations_tracked': len(self.measurements),
                'active_measurements': sum(
                    len(m) for m in self.active_measurements.values()
                ),
                'max_measurements': self.max_measurements,
                'operations': list(self.measurements.keys()),
            }


# Global latency tracker instance
latency_tracker = LatencyTracker()
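
# Minimal self-contained demo. Because of the relative imports above, run it as
# a module within the package (the package path below is an assumption):
#
#     python -m mypipeline.monitoring.latency_tracker
if __name__ == "__main__":
    import random

    tracker = LatencyTracker(max_measurements=1000)

    # Record a handful of short, randomized measurements
    for _ in range(25):
        with tracker.measure("demo_sleep", metadata={"source": "demo"}):
            time.sleep(random.uniform(0.001, 0.005))

    print(tracker.get_latency_stats("demo_sleep"))
    print(tracker.get_latency_trends("demo_sleep"))
    print(tracker.get_latency_distribution("demo_sleep", bucket_size_ms=1.0))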