""" Memory usage monitoring and garbage collection optimization. """ import gc import sys import threading import tracemalloc from typing import Dict, List, Optional, Any, Tuple from collections import defaultdict, deque from datetime import datetime, timezone from dataclasses import dataclass try: from ..utils.logging import get_logger from ..utils.timing import get_current_timestamp except ImportError: from utils.logging import get_logger from utils.timing import get_current_timestamp # Import will be done lazily to avoid circular imports logger = get_logger(__name__) @dataclass class MemorySnapshot: """Memory usage snapshot""" timestamp: datetime total_memory_mb: float available_memory_mb: float process_memory_mb: float gc_collections: Dict[int, int] gc_objects: int tracemalloc_current_mb: Optional[float] = None tracemalloc_peak_mb: Optional[float] = None @dataclass class MemoryLeak: """Memory leak detection result""" object_type: str count_increase: int size_increase_mb: float growth_rate_per_hour: float severity: str # 'low', 'medium', 'high' class MemoryMonitor: """ Monitors memory usage, detects leaks, and optimizes garbage collection. Provides detailed memory analytics and automatic GC optimization. """ def __init__(self, enable_tracemalloc: bool = False, snapshot_interval: float = 60.0): """ Initialize memory monitor. Args: enable_tracemalloc: Whether to enable detailed memory tracing snapshot_interval: How often to take memory snapshots (seconds) """ self.enable_tracemalloc = enable_tracemalloc self.snapshot_interval = snapshot_interval # Memory tracking self.memory_snapshots: deque = deque(maxlen=1000) self.object_counts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100)) # GC optimization self.gc_stats: Dict[str, Any] = {} self.gc_thresholds = gc.get_threshold() self.auto_gc_enabled = True # Leak detection self.leak_detection_enabled = True self.detected_leaks: List[MemoryLeak] = [] # Monitoring control self._monitoring = False self._monitor_thread: Optional[threading.Thread] = None # Initialize tracemalloc if enabled if self.enable_tracemalloc and not tracemalloc.is_tracing(): tracemalloc.start() logger.info("Started tracemalloc for detailed memory tracking") logger.info(f"Memory monitor initialized (tracemalloc: {self.enable_tracemalloc})") def start_monitoring(self) -> None: """Start memory monitoring""" if self._monitoring: logger.warning("Memory monitoring already running") return self._monitoring = True self._monitor_thread = threading.Thread( target=self._monitoring_loop, name="MemoryMonitor", daemon=True ) self._monitor_thread.start() logger.info("Started memory monitoring") def stop_monitoring(self) -> None: """Stop memory monitoring""" if not self._monitoring: return self._monitoring = False if self._monitor_thread: self._monitor_thread.join(timeout=5.0) logger.info("Stopped memory monitoring") def _monitoring_loop(self) -> None: """Main monitoring loop""" import time while self._monitoring: try: # Take memory snapshot self._take_memory_snapshot() # Update object counts self._update_object_counts() # Check for memory leaks if self.leak_detection_enabled: self._check_for_leaks() # Optimize garbage collection if self.auto_gc_enabled: self._optimize_gc() # Periodic cleanup to prevent memory leaks in the monitor itself if hasattr(self, '_cleanup_counter'): self._cleanup_counter += 1 else: self._cleanup_counter = 1 if self._cleanup_counter % 10 == 0: # Every 10 cycles self._cleanup_monitor_data() time.sleep(self.snapshot_interval) except Exception as e: logger.error(f"Error in memory monitoring loop: {e}") time.sleep(self.snapshot_interval) def _take_memory_snapshot(self) -> None: """Take a memory usage snapshot""" try: import psutil # Get system memory info memory = psutil.virtual_memory() # Get process memory info process = psutil.Process() process_memory = process.memory_info() # Get GC stats gc_collections = {i: gc.get_count()[i] for i in range(3)} gc_objects = len(gc.get_objects()) # Get tracemalloc stats if enabled tracemalloc_current_mb = None tracemalloc_peak_mb = None if self.enable_tracemalloc and tracemalloc.is_tracing(): current, peak = tracemalloc.get_traced_memory() tracemalloc_current_mb = current / (1024 * 1024) tracemalloc_peak_mb = peak / (1024 * 1024) # Create snapshot snapshot = MemorySnapshot( timestamp=get_current_timestamp(), total_memory_mb=memory.total / (1024 * 1024), available_memory_mb=memory.available / (1024 * 1024), process_memory_mb=process_memory.rss / (1024 * 1024), gc_collections=gc_collections, gc_objects=gc_objects, tracemalloc_current_mb=tracemalloc_current_mb, tracemalloc_peak_mb=tracemalloc_peak_mb ) self.memory_snapshots.append(snapshot) # Update metrics try: from .metrics_collector import metrics_collector metrics_collector.set_gauge('memory_total_mb', snapshot.total_memory_mb) metrics_collector.set_gauge('memory_available_mb', snapshot.available_memory_mb) metrics_collector.set_gauge('memory_process_mb', snapshot.process_memory_mb) metrics_collector.set_gauge('memory_gc_objects', snapshot.gc_objects) if tracemalloc_current_mb is not None: metrics_collector.set_gauge('memory_tracemalloc_current_mb', tracemalloc_current_mb) metrics_collector.set_gauge('memory_tracemalloc_peak_mb', tracemalloc_peak_mb) except ImportError: pass # Metrics collector not available except Exception as e: logger.error(f"Error taking memory snapshot: {e}") def _update_object_counts(self) -> None: """Update object counts by type (limited to prevent memory leaks)""" try: # Only track specific object types to avoid creating too many objects tracked_types = { 'dict', 'list', 'tuple', 'str', 'function', 'type', 'SystemMetrics', 'MetricPoint', 'MemorySnapshot' } # Count only tracked object types object_counts = {} all_objects = gc.get_objects() for tracked_type in tracked_types: count = sum(1 for obj in all_objects if type(obj).__name__ == tracked_type) if count > 0: object_counts[tracked_type] = count # Store counts with timestamp (only for tracked types) timestamp = get_current_timestamp() for obj_type, count in object_counts.items(): self.object_counts[obj_type].append((timestamp, count)) # Clean up old entries to prevent memory growth for obj_type in list(self.object_counts.keys()): if len(self.object_counts[obj_type]) > 50: # Keep only last 50 entries # Remove oldest entries while len(self.object_counts[obj_type]) > 50: self.object_counts[obj_type].popleft() # Update metrics for common types (less frequently) try: from .metrics_collector import metrics_collector # Only update metrics every 5th call to reduce object creation if not hasattr(self, '_metrics_update_counter'): self._metrics_update_counter = 0 self._metrics_update_counter += 1 if self._metrics_update_counter % 5 == 0: for obj_type, count in object_counts.items(): metrics_collector.set_gauge(f'memory_objects_{obj_type}', count) except ImportError: pass # Metrics collector not available except Exception as e: logger.error(f"Error updating object counts: {e}") def _check_for_leaks(self) -> None: """Check for potential memory leaks (less aggressive)""" try: if len(self.memory_snapshots) < 20: # Need more data for reliable detection return # Only check every 10th call to reduce overhead if not hasattr(self, '_leak_check_counter'): self._leak_check_counter = 0 self._leak_check_counter += 1 if self._leak_check_counter % 10 != 0: return # Check for consistent memory growth over longer period recent_snapshots = list(self.memory_snapshots)[-20:] memory_values = [s.process_memory_mb for s in recent_snapshots] # More conservative growth detection if self._is_memory_growing(memory_values, threshold=20.0): # Increased threshold # Check object count growth potential_leaks = self._analyze_object_growth() for leak in potential_leaks: # Check if we already reported this leak recently existing_leak = next( (l for l in self.detected_leaks if l.object_type == leak.object_type), None ) if not existing_leak and leak.severity in ['medium', 'high']: self.detected_leaks.append(leak) logger.warning(f"Potential memory leak detected: {leak.object_type}") # Record leak detection try: from .metrics_collector import metrics_collector metrics_collector.increment_counter('memory_leaks_detected') except ImportError: pass # Clean up old leak reports (keep only last 10) if len(self.detected_leaks) > 10: self.detected_leaks = self.detected_leaks[-10:] except Exception as e: logger.error(f"Error checking for leaks: {e}") def _is_memory_growing(self, memory_values: List[float], threshold: float = 5.0) -> bool: """Check if memory is consistently growing""" if len(memory_values) < 5: return False # Check if memory increased by more than threshold MB growth = memory_values[-1] - memory_values[0] return growth > threshold def _analyze_object_growth(self) -> List[MemoryLeak]: """Analyze object count growth to identify potential leaks (more conservative)""" leaks = [] for obj_type, counts in self.object_counts.items(): if len(counts) < 20: # Need more data points continue # Get recent counts over longer period recent_counts = list(counts)[-20:] timestamps = [item[0] for item in recent_counts] count_values = [item[1] for item in recent_counts] # Check for sustained growth if len(count_values) >= 10: # Calculate growth over the period start_avg = sum(count_values[:5]) / 5 # Average of first 5 values end_avg = sum(count_values[-5:]) / 5 # Average of last 5 values growth = end_avg - start_avg time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours # More conservative thresholds if growth > 500 and time_diff > 0.5: # More than 500 objects growth over 30+ minutes growth_rate = growth / time_diff # Skip common types that naturally fluctuate if obj_type in ['dict', 'list', 'tuple', 'str']: continue # Determine severity with higher thresholds if growth_rate > 2000: severity = 'high' elif growth_rate > 500: severity = 'medium' else: severity = 'low' # Only report medium and high severity leaks if severity in ['medium', 'high']: leak = MemoryLeak( object_type=obj_type, count_increase=int(growth), size_increase_mb=growth * 0.001, # Rough estimate growth_rate_per_hour=growth_rate, severity=severity ) leaks.append(leak) return leaks def _optimize_gc(self) -> None: """Optimize garbage collection based on memory usage""" try: if not self.memory_snapshots: return latest_snapshot = self.memory_snapshots[-1] memory_usage_percent = ( (latest_snapshot.total_memory_mb - latest_snapshot.available_memory_mb) / latest_snapshot.total_memory_mb * 100 ) # Adjust GC thresholds based on memory pressure if memory_usage_percent > 85: # High memory pressure - more aggressive GC new_thresholds = (500, 10, 10) if gc.get_threshold() != new_thresholds: gc.set_threshold(*new_thresholds) logger.info("Enabled aggressive garbage collection due to high memory usage") # Force collection collected = gc.collect() metrics_collector.increment_counter('memory_gc_forced') logger.debug(f"Forced GC collected {collected} objects") elif memory_usage_percent < 50: # Low memory pressure - less aggressive GC new_thresholds = (1000, 20, 20) if gc.get_threshold() != new_thresholds: gc.set_threshold(*new_thresholds) logger.info("Reduced garbage collection frequency due to low memory usage") # Update GC stats self.gc_stats = { 'threshold': gc.get_threshold(), 'counts': gc.get_count(), 'collections': gc.get_stats() } except Exception as e: logger.error(f"Error optimizing GC: {e}") def _cleanup_monitor_data(self) -> None: """Clean up monitor data to prevent memory leaks""" try: # Limit memory snapshots if len(self.memory_snapshots) > 500: # Keep only the most recent 300 snapshots while len(self.memory_snapshots) > 300: self.memory_snapshots.popleft() # Clean up object counts for obj_type in list(self.object_counts.keys()): if len(self.object_counts[obj_type]) > 30: # Keep only the most recent 20 entries while len(self.object_counts[obj_type]) > 20: self.object_counts[obj_type].popleft() # Remove empty deques if len(self.object_counts[obj_type]) == 0: del self.object_counts[obj_type] # Limit detected leaks if len(self.detected_leaks) > 5: self.detected_leaks = self.detected_leaks[-5:] # Force a small garbage collection gc.collect() logger.debug("Cleaned up memory monitor data") except Exception as e: logger.error(f"Error cleaning up monitor data: {e}") def force_garbage_collection(self) -> Dict[str, int]: """Force garbage collection and return statistics""" try: # Get counts before collection before_counts = gc.get_count() before_objects = len(gc.get_objects()) # Force collection for all generations collected = [gc.collect(generation) for generation in range(3)] total_collected = sum(collected) # Get counts after collection after_counts = gc.get_count() after_objects = len(gc.get_objects()) # Update metrics try: from .metrics_collector import metrics_collector metrics_collector.increment_counter('memory_gc_manual') metrics_collector.set_gauge('memory_gc_objects_collected', total_collected) except ImportError: pass result = { 'total_collected': total_collected, 'by_generation': collected, 'objects_before': before_objects, 'objects_after': after_objects, 'objects_freed': before_objects - after_objects, 'counts_before': before_counts, 'counts_after': after_counts } logger.info(f"Manual GC collected {total_collected} objects, freed {result['objects_freed']} objects") return result except Exception as e: logger.error(f"Error during forced garbage collection: {e}") return {} def get_memory_usage_summary(self) -> Dict[str, Any]: """Get current memory usage summary""" if not self.memory_snapshots: return {} latest = self.memory_snapshots[-1] # Calculate memory usage percentage memory_usage_percent = ( (latest.total_memory_mb - latest.available_memory_mb) / latest.total_memory_mb * 100 ) return { 'timestamp': latest.timestamp.isoformat(), 'total_memory_mb': latest.total_memory_mb, 'available_memory_mb': latest.available_memory_mb, 'used_memory_mb': latest.total_memory_mb - latest.available_memory_mb, 'memory_usage_percent': memory_usage_percent, 'process_memory_mb': latest.process_memory_mb, 'gc_objects': latest.gc_objects, 'gc_collections': latest.gc_collections, 'tracemalloc_current_mb': latest.tracemalloc_current_mb, 'tracemalloc_peak_mb': latest.tracemalloc_peak_mb } def get_memory_trends(self, hours: int = 1) -> Dict[str, Any]: """Get memory usage trends over specified time period""" if not self.memory_snapshots: return {} from datetime import timedelta cutoff_time = get_current_timestamp() - timedelta(hours=hours) # Filter snapshots recent_snapshots = [ s for s in self.memory_snapshots if s.timestamp >= cutoff_time ] if len(recent_snapshots) < 2: return {'trend': 'insufficient_data'} # Calculate trends process_memory_values = [s.process_memory_mb for s in recent_snapshots] gc_object_values = [s.gc_objects for s in recent_snapshots] return { 'process_memory': { 'start_mb': process_memory_values[0], 'end_mb': process_memory_values[-1], 'change_mb': process_memory_values[-1] - process_memory_values[0], 'max_mb': max(process_memory_values), 'min_mb': min(process_memory_values), 'avg_mb': sum(process_memory_values) / len(process_memory_values) }, 'gc_objects': { 'start_count': gc_object_values[0], 'end_count': gc_object_values[-1], 'change_count': gc_object_values[-1] - gc_object_values[0], 'max_count': max(gc_object_values), 'min_count': min(gc_object_values), 'avg_count': sum(gc_object_values) / len(gc_object_values) }, 'sample_count': len(recent_snapshots), 'time_period_hours': hours } def get_top_memory_consumers(self, limit: int = 10) -> List[Dict[str, Any]]: """Get top memory consuming object types""" if not self.object_counts: return [] # Get latest counts latest_counts = {} for obj_type, counts in self.object_counts.items(): if counts: latest_counts[obj_type] = counts[-1][1] # Get count from (timestamp, count) tuple # Sort by count sorted_types = sorted( latest_counts.items(), key=lambda x: x[1], reverse=True ) return [ { 'object_type': obj_type, 'count': count, 'estimated_size_mb': count * 0.001 # Rough estimate } for obj_type, count in sorted_types[:limit] ] def get_detected_leaks(self) -> List[Dict[str, Any]]: """Get detected memory leaks""" return [ { 'object_type': leak.object_type, 'count_increase': leak.count_increase, 'size_increase_mb': leak.size_increase_mb, 'growth_rate_per_hour': leak.growth_rate_per_hour, 'severity': leak.severity } for leak in self.detected_leaks ] def get_tracemalloc_top(self, limit: int = 10) -> List[Dict[str, Any]]: """Get top memory allocations from tracemalloc""" if not self.enable_tracemalloc or not tracemalloc.is_tracing(): return [] try: snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') return [ { 'filename': stat.traceback.format()[0], 'size_mb': stat.size / (1024 * 1024), 'count': stat.count } for stat in top_stats[:limit] ] except Exception as e: logger.error(f"Error getting tracemalloc top: {e}") return [] def clear_leak_history(self) -> None: """Clear detected leak history""" self.detected_leaks.clear() logger.info("Cleared memory leak history") def get_gc_stats(self) -> Dict[str, Any]: """Get garbage collection statistics""" return { 'thresholds': gc.get_threshold(), 'counts': gc.get_count(), 'stats': gc.get_stats(), 'auto_gc_enabled': self.auto_gc_enabled, 'is_enabled': gc.isenabled() } def set_gc_thresholds(self, gen0: int, gen1: int, gen2: int) -> None: """Set garbage collection thresholds""" gc.set_threshold(gen0, gen1, gen2) logger.info(f"Set GC thresholds to ({gen0}, {gen1}, {gen2})") def enable_auto_gc_optimization(self, enabled: bool = True) -> None: """Enable or disable automatic GC optimization""" self.auto_gc_enabled = enabled logger.info(f"Auto GC optimization {'enabled' if enabled else 'disabled'}") def enable_leak_detection(self, enabled: bool = True) -> None: """Enable or disable memory leak detection""" self.leak_detection_enabled = enabled logger.info(f"Memory leak detection {'enabled' if enabled else 'disabled'}") def get_stats(self) -> Dict[str, Any]: """Get memory monitor statistics""" return { 'monitoring': self._monitoring, 'snapshot_interval': self.snapshot_interval, 'snapshots_count': len(self.memory_snapshots), 'object_types_tracked': len(self.object_counts), 'detected_leaks': len(self.detected_leaks), 'tracemalloc_enabled': self.enable_tracemalloc and tracemalloc.is_tracing(), 'auto_gc_enabled': self.auto_gc_enabled, 'leak_detection_enabled': self.leak_detection_enabled, 'gc_thresholds': gc.get_threshold() } # Global memory monitor instance memory_monitor = MemoryMonitor()