gogo2/COBY/monitoring/memory_monitor.py
Dobromir Popov bf4d43f6f7 debugging web ui
2025-08-05 15:58:51 +03:00

"""
Memory usage monitoring and garbage collection optimization.
"""
import gc
import sys
import threading
import tracemalloc
from typing import Dict, List, Optional, Any, Tuple
from collections import defaultdict, deque
from datetime import datetime, timezone
from dataclasses import dataclass
try:
    from ..utils.logging import get_logger
    from ..utils.timing import get_current_timestamp
except ImportError:
    from utils.logging import get_logger
    from utils.timing import get_current_timestamp
# Import will be done lazily to avoid circular imports
logger = get_logger(__name__)

@dataclass
class MemorySnapshot:
    """Memory usage snapshot"""
    timestamp: datetime
    total_memory_mb: float
    available_memory_mb: float
    process_memory_mb: float
    gc_collections: Dict[int, int]
    gc_objects: int
    tracemalloc_current_mb: Optional[float] = None
    tracemalloc_peak_mb: Optional[float] = None

@dataclass
class MemoryLeak:
    """Memory leak detection result"""
    object_type: str
    count_increase: int
    size_increase_mb: float
    growth_rate_per_hour: float
    severity: str  # 'low', 'medium', 'high'

class MemoryMonitor:
    """
    Monitors memory usage, detects leaks, and optimizes garbage collection.
    Provides detailed memory analytics and automatic GC optimization.
    """

    def __init__(self, enable_tracemalloc: bool = False, snapshot_interval: float = 60.0):
        """
        Initialize memory monitor.

        Args:
            enable_tracemalloc: Whether to enable detailed memory tracing
            snapshot_interval: How often to take memory snapshots (seconds)
        """
        self.enable_tracemalloc = enable_tracemalloc
        self.snapshot_interval = snapshot_interval

        # Memory tracking
        self.memory_snapshots: deque = deque(maxlen=1000)
        self.object_counts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100))

        # GC optimization
        self.gc_stats: Dict[str, Any] = {}
        self.gc_thresholds = gc.get_threshold()
        self.auto_gc_enabled = True

        # Leak detection
        self.leak_detection_enabled = True
        self.detected_leaks: List[MemoryLeak] = []

        # Monitoring control
        self._monitoring = False
        self._monitor_thread: Optional[threading.Thread] = None

        # Initialize tracemalloc if enabled
        if self.enable_tracemalloc and not tracemalloc.is_tracing():
            tracemalloc.start()
            logger.info("Started tracemalloc for detailed memory tracking")

        logger.info(f"Memory monitor initialized (tracemalloc: {self.enable_tracemalloc})")

    def start_monitoring(self) -> None:
        """Start memory monitoring"""
        if self._monitoring:
            logger.warning("Memory monitoring already running")
            return

        self._monitoring = True
        self._monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            name="MemoryMonitor",
            daemon=True
        )
        self._monitor_thread.start()
        logger.info("Started memory monitoring")

    def stop_monitoring(self) -> None:
        """Stop memory monitoring"""
        if not self._monitoring:
            return

        self._monitoring = False
        if self._monitor_thread:
            self._monitor_thread.join(timeout=5.0)
        logger.info("Stopped memory monitoring")

    def _monitoring_loop(self) -> None:
        """Main monitoring loop"""
        import time

        while self._monitoring:
            try:
                # Take memory snapshot
                self._take_memory_snapshot()

                # Update object counts
                self._update_object_counts()

                # Check for memory leaks
                if self.leak_detection_enabled:
                    self._check_for_leaks()

                # Optimize garbage collection
                if self.auto_gc_enabled:
                    self._optimize_gc()

                # Periodic cleanup to prevent memory leaks in the monitor itself
                if hasattr(self, '_cleanup_counter'):
                    self._cleanup_counter += 1
                else:
                    self._cleanup_counter = 1

                if self._cleanup_counter % 10 == 0:  # Every 10 cycles
                    self._cleanup_monitor_data()

                time.sleep(self.snapshot_interval)
            except Exception as e:
                logger.error(f"Error in memory monitoring loop: {e}")
                time.sleep(self.snapshot_interval)

    def _take_memory_snapshot(self) -> None:
        """Take a memory usage snapshot"""
        try:
            import psutil

            # Get system memory info
            memory = psutil.virtual_memory()

            # Get process memory info
            process = psutil.Process()
            process_memory = process.memory_info()

            # Get GC stats
            gc_collections = {i: gc.get_count()[i] for i in range(3)}
            gc_objects = len(gc.get_objects())

            # Get tracemalloc stats if enabled
            tracemalloc_current_mb = None
            tracemalloc_peak_mb = None
            if self.enable_tracemalloc and tracemalloc.is_tracing():
                current, peak = tracemalloc.get_traced_memory()
                tracemalloc_current_mb = current / (1024 * 1024)
                tracemalloc_peak_mb = peak / (1024 * 1024)

            # Create snapshot
            snapshot = MemorySnapshot(
                timestamp=get_current_timestamp(),
                total_memory_mb=memory.total / (1024 * 1024),
                available_memory_mb=memory.available / (1024 * 1024),
                process_memory_mb=process_memory.rss / (1024 * 1024),
                gc_collections=gc_collections,
                gc_objects=gc_objects,
                tracemalloc_current_mb=tracemalloc_current_mb,
                tracemalloc_peak_mb=tracemalloc_peak_mb
            )
            self.memory_snapshots.append(snapshot)

            # Update metrics
            try:
                from .metrics_collector import metrics_collector
                metrics_collector.set_gauge('memory_total_mb', snapshot.total_memory_mb)
                metrics_collector.set_gauge('memory_available_mb', snapshot.available_memory_mb)
                metrics_collector.set_gauge('memory_process_mb', snapshot.process_memory_mb)
                metrics_collector.set_gauge('memory_gc_objects', snapshot.gc_objects)
                if tracemalloc_current_mb is not None:
                    metrics_collector.set_gauge('memory_tracemalloc_current_mb', tracemalloc_current_mb)
                    metrics_collector.set_gauge('memory_tracemalloc_peak_mb', tracemalloc_peak_mb)
            except ImportError:
                pass  # Metrics collector not available
        except Exception as e:
            logger.error(f"Error taking memory snapshot: {e}")

    def _update_object_counts(self) -> None:
        """Update object counts by type (limited to prevent memory leaks)"""
        try:
            # Only track specific object types to avoid creating too many objects
            tracked_types = {
                'dict', 'list', 'tuple', 'str', 'function', 'type',
                'SystemMetrics', 'MetricPoint', 'MemorySnapshot'
            }

            # Count only tracked object types
            object_counts = {}
            all_objects = gc.get_objects()
            for tracked_type in tracked_types:
                count = sum(1 for obj in all_objects if type(obj).__name__ == tracked_type)
                if count > 0:
                    object_counts[tracked_type] = count

            # Store counts with timestamp (only for tracked types)
            timestamp = get_current_timestamp()
            for obj_type, count in object_counts.items():
                self.object_counts[obj_type].append((timestamp, count))

            # Clean up old entries to prevent memory growth
            for obj_type in list(self.object_counts.keys()):
                if len(self.object_counts[obj_type]) > 50:  # Keep only last 50 entries
                    # Remove oldest entries
                    while len(self.object_counts[obj_type]) > 50:
                        self.object_counts[obj_type].popleft()

            # Update metrics for common types (less frequently)
            try:
                from .metrics_collector import metrics_collector
                # Only update metrics every 5th call to reduce object creation
                if not hasattr(self, '_metrics_update_counter'):
                    self._metrics_update_counter = 0
                self._metrics_update_counter += 1

                if self._metrics_update_counter % 5 == 0:
                    for obj_type, count in object_counts.items():
                        metrics_collector.set_gauge(f'memory_objects_{obj_type}', count)
            except ImportError:
                pass  # Metrics collector not available
        except Exception as e:
            logger.error(f"Error updating object counts: {e}")

    def _check_for_leaks(self) -> None:
        """Check for potential memory leaks (less aggressive)"""
        try:
            if len(self.memory_snapshots) < 20:  # Need more data for reliable detection
                return

            # Only check every 10th call to reduce overhead
            if not hasattr(self, '_leak_check_counter'):
                self._leak_check_counter = 0
            self._leak_check_counter += 1

            if self._leak_check_counter % 10 != 0:
                return

            # Check for consistent memory growth over longer period
            recent_snapshots = list(self.memory_snapshots)[-20:]
            memory_values = [s.process_memory_mb for s in recent_snapshots]

            # More conservative growth detection
            if self._is_memory_growing(memory_values, threshold=20.0):  # Increased threshold
                # Check object count growth
                potential_leaks = self._analyze_object_growth()

                for leak in potential_leaks:
                    # Check if we already reported this leak recently
                    existing_leak = next(
                        (l for l in self.detected_leaks if l.object_type == leak.object_type),
                        None
                    )

                    if not existing_leak and leak.severity in ['medium', 'high']:
                        self.detected_leaks.append(leak)
                        logger.warning(f"Potential memory leak detected: {leak.object_type}")

                        # Record leak detection
                        try:
                            from .metrics_collector import metrics_collector
                            metrics_collector.increment_counter('memory_leaks_detected')
                        except ImportError:
                            pass

            # Clean up old leak reports (keep only last 10)
            if len(self.detected_leaks) > 10:
                self.detected_leaks = self.detected_leaks[-10:]
        except Exception as e:
            logger.error(f"Error checking for leaks: {e}")

    def _is_memory_growing(self, memory_values: List[float], threshold: float = 5.0) -> bool:
        """Check if memory is consistently growing"""
        if len(memory_values) < 5:
            return False

        # Check if memory increased by more than threshold MB
        growth = memory_values[-1] - memory_values[0]
        return growth > threshold

    def _analyze_object_growth(self) -> List[MemoryLeak]:
        """Analyze object count growth to identify potential leaks (more conservative)"""
        leaks = []

        for obj_type, counts in self.object_counts.items():
            if len(counts) < 20:  # Need more data points
                continue

            # Get recent counts over longer period
            recent_counts = list(counts)[-20:]
            timestamps = [item[0] for item in recent_counts]
            count_values = [item[1] for item in recent_counts]

            # Check for sustained growth
            if len(count_values) >= 10:
                # Calculate growth over the period
                start_avg = sum(count_values[:5]) / 5   # Average of first 5 values
                end_avg = sum(count_values[-5:]) / 5    # Average of last 5 values
                growth = end_avg - start_avg
                time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600  # hours

                # More conservative thresholds
                if growth > 500 and time_diff > 0.5:  # More than 500 objects growth over 30+ minutes
                    growth_rate = growth / time_diff

                    # Skip common types that naturally fluctuate
                    if obj_type in ['dict', 'list', 'tuple', 'str']:
                        continue

                    # Determine severity with higher thresholds
                    if growth_rate > 2000:
                        severity = 'high'
                    elif growth_rate > 500:
                        severity = 'medium'
                    else:
                        severity = 'low'

                    # Only report medium and high severity leaks
                    if severity in ['medium', 'high']:
                        leak = MemoryLeak(
                            object_type=obj_type,
                            count_increase=int(growth),
                            size_increase_mb=growth * 0.001,  # Rough estimate
                            growth_rate_per_hour=growth_rate,
                            severity=severity
                        )
                        leaks.append(leak)

        return leaks

    def _optimize_gc(self) -> None:
        """Optimize garbage collection based on memory usage"""
        try:
            if not self.memory_snapshots:
                return

            latest_snapshot = self.memory_snapshots[-1]
            memory_usage_percent = (
                (latest_snapshot.total_memory_mb - latest_snapshot.available_memory_mb) /
                latest_snapshot.total_memory_mb * 100
            )

            # Adjust GC thresholds based on memory pressure
            if memory_usage_percent > 85:
                # High memory pressure - more aggressive GC
                new_thresholds = (500, 10, 10)
                if gc.get_threshold() != new_thresholds:
                    gc.set_threshold(*new_thresholds)
                    logger.info("Enabled aggressive garbage collection due to high memory usage")

                # Force collection
                collected = gc.collect()
                try:
                    from .metrics_collector import metrics_collector
                    metrics_collector.increment_counter('memory_gc_forced')
                except ImportError:
                    pass  # Metrics collector not available
                logger.debug(f"Forced GC collected {collected} objects")
            elif memory_usage_percent < 50:
                # Low memory pressure - less aggressive GC
                new_thresholds = (1000, 20, 20)
                if gc.get_threshold() != new_thresholds:
                    gc.set_threshold(*new_thresholds)
                    logger.info("Reduced garbage collection frequency due to low memory usage")

            # Update GC stats
            self.gc_stats = {
                'threshold': gc.get_threshold(),
                'counts': gc.get_count(),
                'collections': gc.get_stats()
            }
        except Exception as e:
            logger.error(f"Error optimizing GC: {e}")

    def _cleanup_monitor_data(self) -> None:
        """Clean up monitor data to prevent memory leaks"""
        try:
            # Limit memory snapshots
            if len(self.memory_snapshots) > 500:
                # Keep only the most recent 300 snapshots
                while len(self.memory_snapshots) > 300:
                    self.memory_snapshots.popleft()

            # Clean up object counts
            for obj_type in list(self.object_counts.keys()):
                if len(self.object_counts[obj_type]) > 30:
                    # Keep only the most recent 20 entries
                    while len(self.object_counts[obj_type]) > 20:
                        self.object_counts[obj_type].popleft()

                # Remove empty deques
                if len(self.object_counts[obj_type]) == 0:
                    del self.object_counts[obj_type]

            # Limit detected leaks
            if len(self.detected_leaks) > 5:
                self.detected_leaks = self.detected_leaks[-5:]

            # Force a small garbage collection
            gc.collect()

            logger.debug("Cleaned up memory monitor data")
        except Exception as e:
            logger.error(f"Error cleaning up monitor data: {e}")

    def force_garbage_collection(self) -> Dict[str, Any]:
        """Force garbage collection and return statistics"""
        try:
            # Get counts before collection
            before_counts = gc.get_count()
            before_objects = len(gc.get_objects())

            # Force collection for all generations
            collected = [gc.collect(generation) for generation in range(3)]
            total_collected = sum(collected)

            # Get counts after collection
            after_counts = gc.get_count()
            after_objects = len(gc.get_objects())

            # Update metrics
            try:
                from .metrics_collector import metrics_collector
                metrics_collector.increment_counter('memory_gc_manual')
                metrics_collector.set_gauge('memory_gc_objects_collected', total_collected)
            except ImportError:
                pass

            result = {
                'total_collected': total_collected,
                'by_generation': collected,
                'objects_before': before_objects,
                'objects_after': after_objects,
                'objects_freed': before_objects - after_objects,
                'counts_before': before_counts,
                'counts_after': after_counts
            }

            logger.info(f"Manual GC collected {total_collected} objects, freed {result['objects_freed']} objects")
            return result
        except Exception as e:
            logger.error(f"Error during forced garbage collection: {e}")
            return {}

    def get_memory_usage_summary(self) -> Dict[str, Any]:
        """Get current memory usage summary"""
        if not self.memory_snapshots:
            return {}

        latest = self.memory_snapshots[-1]

        # Calculate memory usage percentage
        memory_usage_percent = (
            (latest.total_memory_mb - latest.available_memory_mb) /
            latest.total_memory_mb * 100
        )

        return {
            'timestamp': latest.timestamp.isoformat(),
            'total_memory_mb': latest.total_memory_mb,
            'available_memory_mb': latest.available_memory_mb,
            'used_memory_mb': latest.total_memory_mb - latest.available_memory_mb,
            'memory_usage_percent': memory_usage_percent,
            'process_memory_mb': latest.process_memory_mb,
            'gc_objects': latest.gc_objects,
            'gc_collections': latest.gc_collections,
            'tracemalloc_current_mb': latest.tracemalloc_current_mb,
            'tracemalloc_peak_mb': latest.tracemalloc_peak_mb
        }

    def get_memory_trends(self, hours: int = 1) -> Dict[str, Any]:
        """Get memory usage trends over specified time period"""
        if not self.memory_snapshots:
            return {}

        from datetime import timedelta
        cutoff_time = get_current_timestamp() - timedelta(hours=hours)

        # Filter snapshots
        recent_snapshots = [
            s for s in self.memory_snapshots
            if s.timestamp >= cutoff_time
        ]

        if len(recent_snapshots) < 2:
            return {'trend': 'insufficient_data'}

        # Calculate trends
        process_memory_values = [s.process_memory_mb for s in recent_snapshots]
        gc_object_values = [s.gc_objects for s in recent_snapshots]

        return {
            'process_memory': {
                'start_mb': process_memory_values[0],
                'end_mb': process_memory_values[-1],
                'change_mb': process_memory_values[-1] - process_memory_values[0],
                'max_mb': max(process_memory_values),
                'min_mb': min(process_memory_values),
                'avg_mb': sum(process_memory_values) / len(process_memory_values)
            },
            'gc_objects': {
                'start_count': gc_object_values[0],
                'end_count': gc_object_values[-1],
                'change_count': gc_object_values[-1] - gc_object_values[0],
                'max_count': max(gc_object_values),
                'min_count': min(gc_object_values),
                'avg_count': sum(gc_object_values) / len(gc_object_values)
            },
            'sample_count': len(recent_snapshots),
            'time_period_hours': hours
        }

    def get_top_memory_consumers(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top memory consuming object types"""
        if not self.object_counts:
            return []

        # Get latest counts
        latest_counts = {}
        for obj_type, counts in self.object_counts.items():
            if counts:
                latest_counts[obj_type] = counts[-1][1]  # Get count from (timestamp, count) tuple

        # Sort by count
        sorted_types = sorted(
            latest_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )

        return [
            {
                'object_type': obj_type,
                'count': count,
                'estimated_size_mb': count * 0.001  # Rough estimate
            }
            for obj_type, count in sorted_types[:limit]
        ]

    def get_detected_leaks(self) -> List[Dict[str, Any]]:
        """Get detected memory leaks"""
        return [
            {
                'object_type': leak.object_type,
                'count_increase': leak.count_increase,
                'size_increase_mb': leak.size_increase_mb,
                'growth_rate_per_hour': leak.growth_rate_per_hour,
                'severity': leak.severity
            }
            for leak in self.detected_leaks
        ]

    def get_tracemalloc_top(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top memory allocations from tracemalloc"""
        if not self.enable_tracemalloc or not tracemalloc.is_tracing():
            return []

        try:
            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics('lineno')

            return [
                {
                    'filename': stat.traceback.format()[0],
                    'size_mb': stat.size / (1024 * 1024),
                    'count': stat.count
                }
                for stat in top_stats[:limit]
            ]
        except Exception as e:
            logger.error(f"Error getting tracemalloc top: {e}")
            return []

    def clear_leak_history(self) -> None:
        """Clear detected leak history"""
        self.detected_leaks.clear()
        logger.info("Cleared memory leak history")

    def get_gc_stats(self) -> Dict[str, Any]:
        """Get garbage collection statistics"""
        return {
            'thresholds': gc.get_threshold(),
            'counts': gc.get_count(),
            'stats': gc.get_stats(),
            'auto_gc_enabled': self.auto_gc_enabled,
            'is_enabled': gc.isenabled()
        }

    def set_gc_thresholds(self, gen0: int, gen1: int, gen2: int) -> None:
        """Set garbage collection thresholds"""
        gc.set_threshold(gen0, gen1, gen2)
        logger.info(f"Set GC thresholds to ({gen0}, {gen1}, {gen2})")

    def enable_auto_gc_optimization(self, enabled: bool = True) -> None:
        """Enable or disable automatic GC optimization"""
        self.auto_gc_enabled = enabled
        logger.info(f"Auto GC optimization {'enabled' if enabled else 'disabled'}")

    def enable_leak_detection(self, enabled: bool = True) -> None:
        """Enable or disable memory leak detection"""
        self.leak_detection_enabled = enabled
        logger.info(f"Memory leak detection {'enabled' if enabled else 'disabled'}")

    def get_stats(self) -> Dict[str, Any]:
        """Get memory monitor statistics"""
        return {
            'monitoring': self._monitoring,
            'snapshot_interval': self.snapshot_interval,
            'snapshots_count': len(self.memory_snapshots),
            'object_types_tracked': len(self.object_counts),
            'detected_leaks': len(self.detected_leaks),
            'tracemalloc_enabled': self.enable_tracemalloc and tracemalloc.is_tracing(),
            'auto_gc_enabled': self.auto_gc_enabled,
            'leak_detection_enabled': self.leak_detection_enabled,
            'gc_thresholds': gc.get_threshold()
        }

# Global memory monitor instance
memory_monitor = MemoryMonitor()
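

# --- Usage sketch (illustrative only, not part of the original module) ------
# A minimal example of how the monitor's public API might be driven, assuming
# psutil is installed so snapshots can be taken. The __main__ guard and the
# sleep duration are hypothetical choices for a quick standalone check.
if __name__ == "__main__":
    import time

    memory_monitor.start_monitoring()
    try:
        # The background loop takes its first snapshot at the start of the
        # first cycle, so a short wait is enough to see initial data.
        time.sleep(2)
        print(memory_monitor.get_memory_usage_summary())
        print(memory_monitor.force_garbage_collection())
        print(memory_monitor.get_stats())
    finally:
        memory_monitor.stop_monitoring()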