diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index 13f7d06..9f475c6 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -147,6 +147,10 @@ - Implement Coinbase Pro WebSocket connector with proper authentication - Create Kraken WebSocket connector with their specific message format - Add exchange-specific data normalization for both exchanges + + + + - Implement proper error handling for each exchange's quirks - Write unit tests for both new exchange connectors - _Requirements: 1.1, 1.2, 1.4_ diff --git a/COBY/monitoring/__init__.py b/COBY/monitoring/__init__.py new file mode 100644 index 0000000..25a68a6 --- /dev/null +++ b/COBY/monitoring/__init__.py @@ -0,0 +1,17 @@ +""" +Performance monitoring and optimization module. +""" + +from .metrics_collector import MetricsCollector +from .performance_monitor import PerformanceMonitor +from .memory_monitor import MemoryMonitor +from .latency_tracker import LatencyTracker +from .alert_manager import AlertManager + +__all__ = [ + 'MetricsCollector', + 'PerformanceMonitor', + 'MemoryMonitor', + 'LatencyTracker', + 'AlertManager' +] \ No newline at end of file diff --git a/COBY/monitoring/alert_manager.py b/COBY/monitoring/alert_manager.py new file mode 100644 index 0000000..0fe80df --- /dev/null +++ b/COBY/monitoring/alert_manager.py @@ -0,0 +1,671 @@ +""" +Alert management system for performance degradation and system issues. +""" + +import smtplib +import json +from typing import Dict, List, Optional, Any, Callable +from collections import defaultdict, deque +from datetime import datetime, timezone, timedelta +from dataclasses import dataclass, field +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from enum import Enum + +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp + +logger = get_logger(__name__) + + +class AlertSeverity(Enum): + """Alert severity levels""" + INFO = "info" + WARNING = "warning" + CRITICAL = "critical" + + +class AlertStatus(Enum): + """Alert status""" + ACTIVE = "active" + RESOLVED = "resolved" + ACKNOWLEDGED = "acknowledged" + SUPPRESSED = "suppressed" + + +@dataclass +class Alert: + """Alert definition""" + id: str + name: str + description: str + severity: AlertSeverity + metric_name: str + threshold: float + comparison: str # 'gt', 'lt', 'eq', 'ne' + duration_seconds: int + status: AlertStatus = AlertStatus.ACTIVE + triggered_at: Optional[datetime] = None + resolved_at: Optional[datetime] = None + acknowledged_at: Optional[datetime] = None + acknowledged_by: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert alert to dictionary""" + return { + 'id': self.id, + 'name': self.name, + 'description': self.description, + 'severity': self.severity.value, + 'metric_name': self.metric_name, + 'threshold': self.threshold, + 'comparison': self.comparison, + 'duration_seconds': self.duration_seconds, + 'status': self.status.value, + 'triggered_at': self.triggered_at.isoformat() if self.triggered_at else None, + 'resolved_at': self.resolved_at.isoformat() if self.resolved_at else None, + 'acknowledged_at': self.acknowledged_at.isoformat() if self.acknowledged_at else None, + 'acknowledged_by': self.acknowledged_by, + 'metadata': self.metadata + } + + +@dataclass +class AlertRule: + """Alert rule configuration""" + name: str + 
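+    # A triggered Alert copies metric_name, threshold, comparison, duration_seconds and severity from this rule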
metric_name: str + threshold: float + comparison: str + duration_seconds: int + severity: AlertSeverity + description: str = "" + enabled: bool = True + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class NotificationChannel: + """Notification channel configuration""" + name: str + type: str # 'email', 'webhook', 'slack' + config: Dict[str, Any] + enabled: bool = True + severity_filter: List[AlertSeverity] = field(default_factory=list) + + +class AlertManager: + """ + Manages alerts, notifications, and alert lifecycle. + + Provides comprehensive alerting with multiple notification channels + and alert suppression capabilities. + """ + + def __init__(self): + """Initialize alert manager""" + # Alert storage + self.alert_rules: Dict[str, AlertRule] = {} + self.active_alerts: Dict[str, Alert] = {} + self.alert_history: deque = deque(maxlen=10000) + + # Notification channels + self.notification_channels: Dict[str, NotificationChannel] = {} + + # Alert state tracking + self.metric_values: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100)) + self.alert_triggers: Dict[str, datetime] = {} + + # Suppression rules + self.suppression_rules: Dict[str, Dict[str, Any]] = {} + + # Callbacks + self.alert_callbacks: List[Callable[[Alert], None]] = [] + + # Statistics + self.alerts_triggered = 0 + self.alerts_resolved = 0 + self.notifications_sent = 0 + + logger.info("Alert manager initialized") + + def add_alert_rule(self, rule: AlertRule) -> None: + """Add an alert rule""" + self.alert_rules[rule.name] = rule + logger.info(f"Added alert rule: {rule.name}") + + def remove_alert_rule(self, rule_name: str) -> None: + """Remove an alert rule""" + if rule_name in self.alert_rules: + del self.alert_rules[rule_name] + # Also remove any active alerts for this rule + alerts_to_remove = [ + alert_id for alert_id, alert in self.active_alerts.items() + if alert.name == rule_name + ] + for alert_id in alerts_to_remove: + del self.active_alerts[alert_id] + logger.info(f"Removed alert rule: {rule_name}") + + def add_notification_channel(self, channel: NotificationChannel) -> None: + """Add a notification channel""" + self.notification_channels[channel.name] = channel + logger.info(f"Added notification channel: {channel.name} ({channel.type})") + + def remove_notification_channel(self, channel_name: str) -> None: + """Remove a notification channel""" + if channel_name in self.notification_channels: + del self.notification_channels[channel_name] + logger.info(f"Removed notification channel: {channel_name}") + + def update_metric_value(self, metric_name: str, value: float) -> None: + """Update metric value and check alerts""" + timestamp = get_current_timestamp() + self.metric_values[metric_name].append((timestamp, value)) + + # Check all alert rules for this metric + for rule_name, rule in self.alert_rules.items(): + if rule.metric_name == metric_name and rule.enabled: + self._check_alert_rule(rule, value, timestamp) + + def _check_alert_rule(self, rule: AlertRule, value: float, timestamp: datetime) -> None: + """Check if an alert rule should be triggered""" + try: + # Check if condition is met + condition_met = self._evaluate_condition(rule.comparison, value, rule.threshold) + + alert_id = f"{rule.name}_{rule.metric_name}" + + if condition_met: + # Check if we need to wait for duration + if alert_id not in self.alert_triggers: + self.alert_triggers[alert_id] = timestamp + return + + # Check if duration has passed + trigger_time = self.alert_triggers[alert_id] + if (timestamp - 
trigger_time).total_seconds() >= rule.duration_seconds: + # Trigger alert if not already active + if alert_id not in self.active_alerts: + self._trigger_alert(rule, value, timestamp) + else: + # Condition not met - clear trigger time and resolve alert if active + self.alert_triggers.pop(alert_id, None) + if alert_id in self.active_alerts: + self._resolve_alert(alert_id, timestamp) + + except Exception as e: + logger.error(f"Error checking alert rule {rule.name}: {e}") + + def _evaluate_condition(self, comparison: str, value: float, threshold: float) -> bool: + """Evaluate alert condition""" + if comparison == 'gt': + return value > threshold + elif comparison == 'lt': + return value < threshold + elif comparison == 'eq': + return abs(value - threshold) < 0.001 + elif comparison == 'ne': + return abs(value - threshold) >= 0.001 + elif comparison == 'gte': + return value >= threshold + elif comparison == 'lte': + return value <= threshold + else: + logger.warning(f"Unknown comparison operator: {comparison}") + return False + + def _trigger_alert(self, rule: AlertRule, value: float, timestamp: datetime) -> None: + """Trigger an alert""" + try: + alert_id = f"{rule.name}_{rule.metric_name}" + + # Create alert + alert = Alert( + id=alert_id, + name=rule.name, + description=rule.description or f"{rule.metric_name} {rule.comparison} {rule.threshold}", + severity=rule.severity, + metric_name=rule.metric_name, + threshold=rule.threshold, + comparison=rule.comparison, + duration_seconds=rule.duration_seconds, + triggered_at=timestamp, + metadata={ + 'current_value': value, + 'rule_metadata': rule.metadata + } + ) + + # Check suppression rules + if self._is_suppressed(alert): + alert.status = AlertStatus.SUPPRESSED + logger.info(f"Alert suppressed: {alert.name}") + return + + # Store alert + self.active_alerts[alert_id] = alert + self.alert_history.append(alert) + self.alerts_triggered += 1 + + logger.warning(f"Alert triggered: {alert.name} - {alert.description}") + + # Send notifications + self._send_notifications(alert) + + # Call callbacks + for callback in self.alert_callbacks: + try: + callback(alert) + except Exception as e: + logger.error(f"Error in alert callback: {e}") + + except Exception as e: + logger.error(f"Error triggering alert: {e}") + + def _resolve_alert(self, alert_id: str, timestamp: datetime) -> None: + """Resolve an alert""" + try: + if alert_id in self.active_alerts: + alert = self.active_alerts[alert_id] + alert.status = AlertStatus.RESOLVED + alert.resolved_at = timestamp + + # Move to history and remove from active + self.alert_history.append(alert) + del self.active_alerts[alert_id] + self.alerts_resolved += 1 + + logger.info(f"Alert resolved: {alert.name}") + + # Send resolution notifications + self._send_resolution_notifications(alert) + + except Exception as e: + logger.error(f"Error resolving alert {alert_id}: {e}") + + def _is_suppressed(self, alert: Alert) -> bool: + """Check if alert should be suppressed""" + for rule_name, rule in self.suppression_rules.items(): + try: + # Check if suppression rule applies + if self._matches_suppression_rule(alert, rule): + return True + except Exception as e: + logger.error(f"Error checking suppression rule {rule_name}: {e}") + + return False + + def _matches_suppression_rule(self, alert: Alert, rule: Dict[str, Any]) -> bool: + """Check if alert matches suppression rule""" + # Check alert name pattern + if 'alert_pattern' in rule: + import re + if not re.match(rule['alert_pattern'], alert.name): + return False + + # Check severity 
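+        # rule['severity'] lists matching severity strings, e.g. ['warning', 'critical']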
+ if 'severity' in rule: + if alert.severity.value not in rule['severity']: + return False + + # Check time window + if 'time_window' in rule: + start_time = datetime.fromisoformat(rule['time_window']['start']) + end_time = datetime.fromisoformat(rule['time_window']['end']) + current_time = get_current_timestamp() + + if not (start_time <= current_time <= end_time): + return False + + return True + + def _send_notifications(self, alert: Alert) -> None: + """Send notifications for an alert""" + for channel_name, channel in self.notification_channels.items(): + try: + if not channel.enabled: + continue + + # Check severity filter + if channel.severity_filter and alert.severity not in channel.severity_filter: + continue + + # Send notification based on channel type + if channel.type == 'email': + self._send_email_notification(alert, channel) + elif channel.type == 'webhook': + self._send_webhook_notification(alert, channel) + elif channel.type == 'slack': + self._send_slack_notification(alert, channel) + else: + logger.warning(f"Unknown notification channel type: {channel.type}") + + self.notifications_sent += 1 + + except Exception as e: + logger.error(f"Error sending notification via {channel_name}: {e}") + + def _send_resolution_notifications(self, alert: Alert) -> None: + """Send resolution notifications""" + for channel_name, channel in self.notification_channels.items(): + try: + if not channel.enabled: + continue + + # Send resolution notification + if channel.type == 'email': + self._send_email_resolution(alert, channel) + elif channel.type == 'webhook': + self._send_webhook_resolution(alert, channel) + elif channel.type == 'slack': + self._send_slack_resolution(alert, channel) + + except Exception as e: + logger.error(f"Error sending resolution notification via {channel_name}: {e}") + + def _send_email_notification(self, alert: Alert, channel: NotificationChannel) -> None: + """Send email notification""" + try: + config = channel.config + + # Create message + msg = MIMEMultipart() + msg['From'] = config['from_email'] + msg['To'] = ', '.join(config['to_emails']) + msg['Subject'] = f"[{alert.severity.value.upper()}] {alert.name}" + + # Create body + body = f""" +Alert: {alert.name} +Severity: {alert.severity.value.upper()} +Description: {alert.description} +Metric: {alert.metric_name} +Current Value: {alert.metadata.get('current_value', 'N/A')} +Threshold: {alert.threshold} +Triggered At: {alert.triggered_at.isoformat() if alert.triggered_at else 'N/A'} + +Alert ID: {alert.id} + """ + + msg.attach(MIMEText(body, 'plain')) + + # Send email + with smtplib.SMTP(config['smtp_server'], config['smtp_port']) as server: + if config.get('use_tls', True): + server.starttls() + if 'username' in config and 'password' in config: + server.login(config['username'], config['password']) + server.send_message(msg) + + logger.info(f"Email notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error sending email notification: {e}") + + def _send_webhook_notification(self, alert: Alert, channel: NotificationChannel) -> None: + """Send webhook notification""" + try: + import requests + + config = channel.config + payload = { + 'alert': alert.to_dict(), + 'type': 'alert_triggered' + } + + response = requests.post( + config['url'], + json=payload, + headers=config.get('headers', {}), + timeout=config.get('timeout', 10) + ) + response.raise_for_status() + + logger.info(f"Webhook notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error 
sending webhook notification: {e}") + + def _send_slack_notification(self, alert: Alert, channel: NotificationChannel) -> None: + """Send Slack notification""" + try: + import requests + + config = channel.config + + # Create Slack message + color = { + AlertSeverity.INFO: 'good', + AlertSeverity.WARNING: 'warning', + AlertSeverity.CRITICAL: 'danger' + }.get(alert.severity, 'warning') + + payload = { + 'channel': config['channel'], + 'username': config.get('username', 'AlertBot'), + 'attachments': [{ + 'color': color, + 'title': f"{alert.severity.value.upper()}: {alert.name}", + 'text': alert.description, + 'fields': [ + {'title': 'Metric', 'value': alert.metric_name, 'short': True}, + {'title': 'Current Value', 'value': str(alert.metadata.get('current_value', 'N/A')), 'short': True}, + {'title': 'Threshold', 'value': str(alert.threshold), 'short': True}, + {'title': 'Triggered At', 'value': alert.triggered_at.isoformat() if alert.triggered_at else 'N/A', 'short': True} + ], + 'footer': f"Alert ID: {alert.id}" + }] + } + + response = requests.post( + config['webhook_url'], + json=payload, + timeout=10 + ) + response.raise_for_status() + + logger.info(f"Slack notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error sending Slack notification: {e}") + + def _send_email_resolution(self, alert: Alert, channel: NotificationChannel) -> None: + """Send email resolution notification""" + try: + config = channel.config + + # Create message + msg = MIMEMultipart() + msg['From'] = config['from_email'] + msg['To'] = ', '.join(config['to_emails']) + msg['Subject'] = f"[RESOLVED] {alert.name}" + + # Create body + duration = "" + if alert.triggered_at and alert.resolved_at: + duration = str(alert.resolved_at - alert.triggered_at) + + body = f""" +Alert RESOLVED: {alert.name} +Severity: {alert.severity.value.upper()} +Description: {alert.description} +Metric: {alert.metric_name} +Threshold: {alert.threshold} +Triggered At: {alert.triggered_at.isoformat() if alert.triggered_at else 'N/A'} +Resolved At: {alert.resolved_at.isoformat() if alert.resolved_at else 'N/A'} +Duration: {duration} + +Alert ID: {alert.id} + """ + + msg.attach(MIMEText(body, 'plain')) + + # Send email + with smtplib.SMTP(config['smtp_server'], config['smtp_port']) as server: + if config.get('use_tls', True): + server.starttls() + if 'username' in config and 'password' in config: + server.login(config['username'], config['password']) + server.send_message(msg) + + logger.info(f"Email resolution notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error sending email resolution notification: {e}") + + def _send_webhook_resolution(self, alert: Alert, channel: NotificationChannel) -> None: + """Send webhook resolution notification""" + try: + import requests + + config = channel.config + payload = { + 'alert': alert.to_dict(), + 'type': 'alert_resolved' + } + + response = requests.post( + config['url'], + json=payload, + headers=config.get('headers', {}), + timeout=config.get('timeout', 10) + ) + response.raise_for_status() + + logger.info(f"Webhook resolution notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error sending webhook resolution notification: {e}") + + def _send_slack_resolution(self, alert: Alert, channel: NotificationChannel) -> None: + """Send Slack resolution notification""" + try: + import requests + + config = channel.config + + duration = "" + if alert.triggered_at and alert.resolved_at: + duration = 
str(alert.resolved_at - alert.triggered_at) + + payload = { + 'channel': config['channel'], + 'username': config.get('username', 'AlertBot'), + 'attachments': [{ + 'color': 'good', + 'title': f"RESOLVED: {alert.name}", + 'text': f"Alert has been resolved: {alert.description}", + 'fields': [ + {'title': 'Duration', 'value': duration, 'short': True}, + {'title': 'Resolved At', 'value': alert.resolved_at.isoformat() if alert.resolved_at else 'N/A', 'short': True} + ], + 'footer': f"Alert ID: {alert.id}" + }] + } + + response = requests.post( + config['webhook_url'], + json=payload, + timeout=10 + ) + response.raise_for_status() + + logger.info(f"Slack resolution notification sent for alert: {alert.name}") + + except Exception as e: + logger.error(f"Error sending Slack resolution notification: {e}") + + def acknowledge_alert(self, alert_id: str, acknowledged_by: str) -> bool: + """Acknowledge an alert""" + if alert_id in self.active_alerts: + alert = self.active_alerts[alert_id] + alert.status = AlertStatus.ACKNOWLEDGED + alert.acknowledged_at = get_current_timestamp() + alert.acknowledged_by = acknowledged_by + + logger.info(f"Alert acknowledged by {acknowledged_by}: {alert.name}") + return True + + return False + + def suppress_alert(self, alert_id: str) -> bool: + """Suppress an alert""" + if alert_id in self.active_alerts: + alert = self.active_alerts[alert_id] + alert.status = AlertStatus.SUPPRESSED + + logger.info(f"Alert suppressed: {alert.name}") + return True + + return False + + def add_suppression_rule(self, name: str, rule: Dict[str, Any]) -> None: + """Add alert suppression rule""" + self.suppression_rules[name] = rule + logger.info(f"Added suppression rule: {name}") + + def remove_suppression_rule(self, name: str) -> None: + """Remove alert suppression rule""" + if name in self.suppression_rules: + del self.suppression_rules[name] + logger.info(f"Removed suppression rule: {name}") + + def get_active_alerts(self, severity: AlertSeverity = None) -> List[Alert]: + """Get active alerts, optionally filtered by severity""" + alerts = list(self.active_alerts.values()) + + if severity: + alerts = [alert for alert in alerts if alert.severity == severity] + + return sorted(alerts, key=lambda x: x.triggered_at or datetime.min, reverse=True) + + def get_alert_history(self, limit: int = 100, severity: AlertSeverity = None) -> List[Alert]: + """Get alert history""" + alerts = list(self.alert_history) + + if severity: + alerts = [alert for alert in alerts if alert.severity == severity] + + return sorted(alerts, key=lambda x: x.triggered_at or datetime.min, reverse=True)[:limit] + + def get_alert_summary(self) -> Dict[str, Any]: + """Get alert summary statistics""" + active_by_severity = defaultdict(int) + for alert in self.active_alerts.values(): + active_by_severity[alert.severity.value] += 1 + + return { + 'active_alerts': len(self.active_alerts), + 'active_by_severity': dict(active_by_severity), + 'total_triggered': self.alerts_triggered, + 'total_resolved': self.alerts_resolved, + 'notifications_sent': self.notifications_sent, + 'alert_rules': len(self.alert_rules), + 'notification_channels': len(self.notification_channels), + 'suppression_rules': len(self.suppression_rules) + } + + def register_callback(self, callback: Callable[[Alert], None]) -> None: + """Register alert callback""" + self.alert_callbacks.append(callback) + logger.info(f"Registered alert callback: {callback.__name__}") + + def get_stats(self) -> Dict[str, Any]: + """Get alert manager statistics""" + return { + 
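+            # len() entries reflect current state; alerts_triggered/resolved and notifications_sent are lifetime totals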
'alert_rules': len(self.alert_rules), + 'active_alerts': len(self.active_alerts), + 'alert_history_count': len(self.alert_history), + 'notification_channels': len(self.notification_channels), + 'suppression_rules': len(self.suppression_rules), + 'alerts_triggered': self.alerts_triggered, + 'alerts_resolved': self.alerts_resolved, + 'notifications_sent': self.notifications_sent, + 'registered_callbacks': len(self.alert_callbacks) + } + + +# Global alert manager instance +alert_manager = AlertManager() \ No newline at end of file diff --git a/COBY/monitoring/latency_tracker.py b/COBY/monitoring/latency_tracker.py new file mode 100644 index 0000000..1c17301 --- /dev/null +++ b/COBY/monitoring/latency_tracker.py @@ -0,0 +1,497 @@ +""" +End-to-end latency tracking for data processing pipeline. +""" + +import time +import threading +from typing import Dict, List, Optional, Any, ContextManager +from collections import defaultdict, deque +from datetime import datetime, timezone +from dataclasses import dataclass +from contextlib import contextmanager + +from ..utils.logging import get_logger, set_correlation_id +from ..utils.timing import get_current_timestamp +# Import will be done lazily to avoid circular imports + +logger = get_logger(__name__) + + +@dataclass +class LatencyMeasurement: + """Individual latency measurement""" + operation: str + start_time: float + end_time: float + duration_ms: float + correlation_id: Optional[str] = None + metadata: Dict[str, Any] = None + + @property + def duration_seconds(self) -> float: + """Get duration in seconds""" + return self.duration_ms / 1000.0 + + +class LatencyTracker: + """ + Tracks end-to-end latency for various operations in the system. + + Provides context managers for easy latency measurement and + comprehensive latency analytics. + """ + + def __init__(self, max_measurements: int = 10000): + """ + Initialize latency tracker. + + Args: + max_measurements: Maximum number of measurements to keep in memory + """ + self.max_measurements = max_measurements + + # Latency storage + self.measurements: Dict[str, deque] = defaultdict( + lambda: deque(maxlen=max_measurements) + ) + + # Active measurements (for tracking ongoing operations) + self.active_measurements: Dict[str, Dict[str, float]] = defaultdict(dict) + + # Thread safety + self._lock = threading.RLock() + + # Statistics + self.total_measurements = 0 + + logger.info(f"Latency tracker initialized with max {max_measurements} measurements") + + @contextmanager + def measure(self, operation: str, correlation_id: str = None, + metadata: Dict[str, Any] = None) -> ContextManager[None]: + """ + Context manager for measuring operation latency. 
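+        Durations are measured with time.perf_counter() and recorded in milliseconds.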
+ + Args: + operation: Name of the operation being measured + correlation_id: Optional correlation ID for tracking + metadata: Optional metadata to store with measurement + + Usage: + with latency_tracker.measure('data_processing'): + # Your code here + process_data() + """ + start_time = time.perf_counter() + measurement_id = f"{operation}_{start_time}_{threading.get_ident()}" + + # Store active measurement + with self._lock: + self.active_measurements[operation][measurement_id] = start_time + + try: + yield + finally: + end_time = time.perf_counter() + duration_ms = (end_time - start_time) * 1000 + + # Create measurement + measurement = LatencyMeasurement( + operation=operation, + start_time=start_time, + end_time=end_time, + duration_ms=duration_ms, + correlation_id=correlation_id, + metadata=metadata or {} + ) + + # Store measurement + with self._lock: + self.measurements[operation].append(measurement) + self.active_measurements[operation].pop(measurement_id, None) + self.total_measurements += 1 + + # Record in metrics collector + try: + from .metrics_collector import metrics_collector + metrics_collector.observe_histogram( + f"{operation}_latency_ms", + duration_ms, + labels={'operation': operation} + ) + except ImportError: + pass # Metrics collector not available + + logger.debug(f"Measured {operation}: {duration_ms:.2f}ms") + + def start_measurement(self, operation: str, measurement_id: str = None, + correlation_id: str = None) -> str: + """ + Start a manual latency measurement. + + Args: + operation: Name of the operation + measurement_id: Optional custom measurement ID + correlation_id: Optional correlation ID + + Returns: + str: Measurement ID for ending the measurement + """ + start_time = time.perf_counter() + + if measurement_id is None: + measurement_id = f"{operation}_{start_time}_{threading.get_ident()}" + + with self._lock: + self.active_measurements[operation][measurement_id] = start_time + + logger.debug(f"Started measurement {measurement_id} for {operation}") + return measurement_id + + def end_measurement(self, operation: str, measurement_id: str, + metadata: Dict[str, Any] = None) -> Optional[LatencyMeasurement]: + """ + End a manual latency measurement. + + Args: + operation: Name of the operation + measurement_id: Measurement ID from start_measurement + metadata: Optional metadata to store + + Returns: + LatencyMeasurement: The completed measurement, or None if not found + """ + end_time = time.perf_counter() + + with self._lock: + start_time = self.active_measurements[operation].pop(measurement_id, None) + + if start_time is None: + logger.warning(f"No active measurement found: {measurement_id}") + return None + + duration_ms = (end_time - start_time) * 1000 + + # Create measurement + measurement = LatencyMeasurement( + operation=operation, + start_time=start_time, + end_time=end_time, + duration_ms=duration_ms, + metadata=metadata or {} + ) + + # Store measurement + with self._lock: + self.measurements[operation].append(measurement) + self.total_measurements += 1 + + # Record in metrics collector + try: + from .metrics_collector import metrics_collector + metrics_collector.observe_histogram( + f"{operation}_latency_ms", + duration_ms, + labels={'operation': operation} + ) + except ImportError: + pass # Metrics collector not available + + logger.debug(f"Completed measurement {measurement_id}: {duration_ms:.2f}ms") + return measurement + + def get_latency_stats(self, operation: str) -> Dict[str, float]: + """ + Get latency statistics for an operation. 
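+        Percentiles (p50/p95/p99) are computed over the in-memory window of recent measurements.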
+ + Args: + operation: Operation name + + Returns: + Dict: Latency statistics + """ + with self._lock: + measurements = list(self.measurements[operation]) + + if not measurements: + return { + 'count': 0, + 'avg_ms': 0.0, + 'min_ms': 0.0, + 'max_ms': 0.0, + 'p50_ms': 0.0, + 'p95_ms': 0.0, + 'p99_ms': 0.0 + } + + durations = [m.duration_ms for m in measurements] + durations.sort() + + count = len(durations) + avg_ms = sum(durations) / count + min_ms = durations[0] + max_ms = durations[-1] + + # Calculate percentiles + p50_ms = durations[int(0.50 * count)] + p95_ms = durations[int(0.95 * count)] + p99_ms = durations[int(0.99 * count)] + + return { + 'count': count, + 'avg_ms': avg_ms, + 'min_ms': min_ms, + 'max_ms': max_ms, + 'p50_ms': p50_ms, + 'p95_ms': p95_ms, + 'p99_ms': p99_ms + } + + def get_all_latency_stats(self) -> Dict[str, Dict[str, float]]: + """Get latency statistics for all operations""" + with self._lock: + operations = list(self.measurements.keys()) + + return { + operation: self.get_latency_stats(operation) + for operation in operations + } + + def get_recent_measurements(self, operation: str, limit: int = 100) -> List[LatencyMeasurement]: + """ + Get recent measurements for an operation. + + Args: + operation: Operation name + limit: Maximum number of measurements to return + + Returns: + List[LatencyMeasurement]: Recent measurements + """ + with self._lock: + measurements = list(self.measurements[operation]) + + return measurements[-limit:] + + def get_slow_operations(self, threshold_ms: float = 100.0) -> List[Dict[str, Any]]: + """ + Get operations that are slower than threshold. + + Args: + threshold_ms: Latency threshold in milliseconds + + Returns: + List: Slow operations with their stats + """ + slow_operations = [] + + for operation in self.measurements.keys(): + stats = self.get_latency_stats(operation) + if stats['avg_ms'] > threshold_ms: + slow_operations.append({ + 'operation': operation, + 'avg_latency_ms': stats['avg_ms'], + 'p95_latency_ms': stats['p95_ms'], + 'count': stats['count'] + }) + + # Sort by average latency (descending) + slow_operations.sort(key=lambda x: x['avg_latency_ms'], reverse=True) + return slow_operations + + def get_latency_trends(self, operation: str, window_size: int = 100) -> Dict[str, Any]: + """ + Get latency trends for an operation. 
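+        The window is split in half and the average latency of each half is compared; changes under 5% are reported as 'stable'.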
+ + Args: + operation: Operation name + window_size: Number of recent measurements to analyze + + Returns: + Dict: Trend analysis + """ + recent_measurements = self.get_recent_measurements(operation, window_size) + + if len(recent_measurements) < 2: + return {'trend': 'insufficient_data'} + + # Split into two halves for trend analysis + mid_point = len(recent_measurements) // 2 + first_half = recent_measurements[:mid_point] + second_half = recent_measurements[mid_point:] + + first_avg = sum(m.duration_ms for m in first_half) / len(first_half) + second_avg = sum(m.duration_ms for m in second_half) / len(second_half) + + # Calculate trend + change_percent = ((second_avg - first_avg) / first_avg) * 100 + + if abs(change_percent) < 5: + trend = 'stable' + elif change_percent > 0: + trend = 'increasing' + else: + trend = 'decreasing' + + return { + 'trend': trend, + 'change_percent': change_percent, + 'first_half_avg_ms': first_avg, + 'second_half_avg_ms': second_avg, + 'sample_size': len(recent_measurements) + } + + def get_active_measurements_count(self) -> Dict[str, int]: + """Get count of currently active measurements by operation""" + with self._lock: + return { + operation: len(measurements) + for operation, measurements in self.active_measurements.items() + if measurements + } + + def get_latency_distribution(self, operation: str, bucket_size_ms: float = 10.0) -> Dict[str, int]: + """ + Get latency distribution in buckets. + + Args: + operation: Operation name + bucket_size_ms: Size of each bucket in milliseconds + + Returns: + Dict: Latency distribution buckets + """ + with self._lock: + measurements = list(self.measurements[operation]) + + if not measurements: + return {} + + # Create buckets + distribution = defaultdict(int) + + for measurement in measurements: + bucket = int(measurement.duration_ms // bucket_size_ms) * bucket_size_ms + bucket_label = f"{bucket:.0f}-{bucket + bucket_size_ms:.0f}ms" + distribution[bucket_label] += 1 + + return dict(distribution) + + def export_measurements(self, operation: str = None, + format: str = 'json') -> List[Dict[str, Any]]: + """ + Export measurements for analysis. + + Args: + operation: Specific operation to export (None for all) + format: Export format ('json', 'csv') + + Returns: + List: Exported measurement data + """ + exported_data = [] + + operations = [operation] if operation else list(self.measurements.keys()) + + for op in operations: + with self._lock: + measurements = list(self.measurements[op]) + + for measurement in measurements: + data = { + 'operation': measurement.operation, + 'duration_ms': measurement.duration_ms, + 'start_time': measurement.start_time, + 'end_time': measurement.end_time, + 'correlation_id': measurement.correlation_id, + 'metadata': measurement.metadata + } + exported_data.append(data) + + return exported_data + + def clear_measurements(self, operation: str = None) -> None: + """ + Clear measurements for an operation or all operations. + + Args: + operation: Specific operation to clear (None for all) + """ + with self._lock: + if operation: + self.measurements[operation].clear() + logger.info(f"Cleared measurements for operation: {operation}") + else: + self.measurements.clear() + self.total_measurements = 0 + logger.info("Cleared all measurements") + + def get_performance_impact(self, operation: str) -> Dict[str, Any]: + """ + Analyze performance impact of an operation. 
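+        Impact level is derived from average latency: under 10ms is 'low', under 100ms 'medium', otherwise 'high'.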
+ + Args: + operation: Operation name + + Returns: + Dict: Performance impact analysis + """ + stats = self.get_latency_stats(operation) + trends = self.get_latency_trends(operation) + + # Determine impact level + avg_latency = stats['avg_ms'] + if avg_latency < 10: + impact_level = 'low' + elif avg_latency < 100: + impact_level = 'medium' + else: + impact_level = 'high' + + # Check for performance degradation + degradation = trends.get('trend') == 'increasing' and trends.get('change_percent', 0) > 20 + + return { + 'operation': operation, + 'impact_level': impact_level, + 'avg_latency_ms': avg_latency, + 'p95_latency_ms': stats['p95_ms'], + 'measurement_count': stats['count'], + 'trend': trends.get('trend', 'unknown'), + 'performance_degradation': degradation, + 'recommendations': self._get_performance_recommendations(stats, trends) + } + + def _get_performance_recommendations(self, stats: Dict[str, float], + trends: Dict[str, Any]) -> List[str]: + """Get performance recommendations based on stats and trends""" + recommendations = [] + + if stats['avg_ms'] > 100: + recommendations.append("Consider optimizing this operation - average latency is high") + + if stats['p95_ms'] > stats['avg_ms'] * 3: + recommendations.append("High latency variance detected - investigate outliers") + + if trends.get('trend') == 'increasing': + recommendations.append("Latency is trending upward - monitor for performance degradation") + + if stats['count'] < 10: + recommendations.append("Insufficient data for reliable analysis - collect more measurements") + + return recommendations + + def get_stats(self) -> Dict[str, Any]: + """Get latency tracker statistics""" + with self._lock: + return { + 'total_measurements': self.total_measurements, + 'operations_tracked': len(self.measurements), + 'active_measurements': sum(len(m) for m in self.active_measurements.values()), + 'max_measurements': self.max_measurements, + 'operations': list(self.measurements.keys()) + } + + +# Global latency tracker instance +latency_tracker = LatencyTracker() \ No newline at end of file diff --git a/COBY/monitoring/memory_monitor.py b/COBY/monitoring/memory_monitor.py new file mode 100644 index 0000000..d245c89 --- /dev/null +++ b/COBY/monitoring/memory_monitor.py @@ -0,0 +1,561 @@ +""" +Memory usage monitoring and garbage collection optimization. +""" + +import gc +import sys +import threading +import tracemalloc +from typing import Dict, List, Optional, Any, Tuple +from collections import defaultdict, deque +from datetime import datetime, timezone +from dataclasses import dataclass + +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp +# Import will be done lazily to avoid circular imports + +logger = get_logger(__name__) + + +@dataclass +class MemorySnapshot: + """Memory usage snapshot""" + timestamp: datetime + total_memory_mb: float + available_memory_mb: float + process_memory_mb: float + gc_collections: Dict[int, int] + gc_objects: int + tracemalloc_current_mb: Optional[float] = None + tracemalloc_peak_mb: Optional[float] = None + + +@dataclass +class MemoryLeak: + """Memory leak detection result""" + object_type: str + count_increase: int + size_increase_mb: float + growth_rate_per_hour: float + severity: str # 'low', 'medium', 'high' + + +class MemoryMonitor: + """ + Monitors memory usage, detects leaks, and optimizes garbage collection. + + Provides detailed memory analytics and automatic GC optimization. 
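+    Snapshots are taken every snapshot_interval seconds on a background daemon thread.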
+ """ + + def __init__(self, enable_tracemalloc: bool = True, snapshot_interval: float = 30.0): + """ + Initialize memory monitor. + + Args: + enable_tracemalloc: Whether to enable detailed memory tracing + snapshot_interval: How often to take memory snapshots (seconds) + """ + self.enable_tracemalloc = enable_tracemalloc + self.snapshot_interval = snapshot_interval + + # Memory tracking + self.memory_snapshots: deque = deque(maxlen=1000) + self.object_counts: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100)) + + # GC optimization + self.gc_stats: Dict[str, Any] = {} + self.gc_thresholds = gc.get_threshold() + self.auto_gc_enabled = True + + # Leak detection + self.leak_detection_enabled = True + self.detected_leaks: List[MemoryLeak] = [] + + # Monitoring control + self._monitoring = False + self._monitor_thread: Optional[threading.Thread] = None + + # Initialize tracemalloc if enabled + if self.enable_tracemalloc and not tracemalloc.is_tracing(): + tracemalloc.start() + logger.info("Started tracemalloc for detailed memory tracking") + + logger.info(f"Memory monitor initialized (tracemalloc: {self.enable_tracemalloc})") + + def start_monitoring(self) -> None: + """Start memory monitoring""" + if self._monitoring: + logger.warning("Memory monitoring already running") + return + + self._monitoring = True + self._monitor_thread = threading.Thread( + target=self._monitoring_loop, + name="MemoryMonitor", + daemon=True + ) + self._monitor_thread.start() + logger.info("Started memory monitoring") + + def stop_monitoring(self) -> None: + """Stop memory monitoring""" + if not self._monitoring: + return + + self._monitoring = False + if self._monitor_thread: + self._monitor_thread.join(timeout=5.0) + logger.info("Stopped memory monitoring") + + def _monitoring_loop(self) -> None: + """Main monitoring loop""" + import time + + while self._monitoring: + try: + # Take memory snapshot + self._take_memory_snapshot() + + # Update object counts + self._update_object_counts() + + # Check for memory leaks + if self.leak_detection_enabled: + self._check_for_leaks() + + # Optimize garbage collection + if self.auto_gc_enabled: + self._optimize_gc() + + time.sleep(self.snapshot_interval) + + except Exception as e: + logger.error(f"Error in memory monitoring loop: {e}") + time.sleep(self.snapshot_interval) + + def _take_memory_snapshot(self) -> None: + """Take a memory usage snapshot""" + try: + import psutil + + # Get system memory info + memory = psutil.virtual_memory() + + # Get process memory info + process = psutil.Process() + process_memory = process.memory_info() + + # Get GC stats + gc_collections = {i: gc.get_count()[i] for i in range(3)} + gc_objects = len(gc.get_objects()) + + # Get tracemalloc stats if enabled + tracemalloc_current_mb = None + tracemalloc_peak_mb = None + + if self.enable_tracemalloc and tracemalloc.is_tracing(): + current, peak = tracemalloc.get_traced_memory() + tracemalloc_current_mb = current / (1024 * 1024) + tracemalloc_peak_mb = peak / (1024 * 1024) + + # Create snapshot + snapshot = MemorySnapshot( + timestamp=get_current_timestamp(), + total_memory_mb=memory.total / (1024 * 1024), + available_memory_mb=memory.available / (1024 * 1024), + process_memory_mb=process_memory.rss / (1024 * 1024), + gc_collections=gc_collections, + gc_objects=gc_objects, + tracemalloc_current_mb=tracemalloc_current_mb, + tracemalloc_peak_mb=tracemalloc_peak_mb + ) + + self.memory_snapshots.append(snapshot) + + # Update metrics + try: + from .metrics_collector import metrics_collector + 
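+                # Lazy import avoids a circular dependency with the metrics collector module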
metrics_collector.set_gauge('memory_total_mb', snapshot.total_memory_mb) + metrics_collector.set_gauge('memory_available_mb', snapshot.available_memory_mb) + metrics_collector.set_gauge('memory_process_mb', snapshot.process_memory_mb) + metrics_collector.set_gauge('memory_gc_objects', snapshot.gc_objects) + + if tracemalloc_current_mb is not None: + metrics_collector.set_gauge('memory_tracemalloc_current_mb', tracemalloc_current_mb) + metrics_collector.set_gauge('memory_tracemalloc_peak_mb', tracemalloc_peak_mb) + except ImportError: + pass # Metrics collector not available + + except Exception as e: + logger.error(f"Error taking memory snapshot: {e}") + + def _update_object_counts(self) -> None: + """Update object counts by type""" + try: + # Count objects by type + object_counts = defaultdict(int) + + for obj in gc.get_objects(): + obj_type = type(obj).__name__ + object_counts[obj_type] += 1 + + # Store counts with timestamp + timestamp = get_current_timestamp() + for obj_type, count in object_counts.items(): + self.object_counts[obj_type].append((timestamp, count)) + + # Update metrics for common types + try: + from .metrics_collector import metrics_collector + common_types = ['dict', 'list', 'tuple', 'str', 'function', 'type'] + for obj_type in common_types: + if obj_type in object_counts: + metrics_collector.set_gauge( + f'memory_objects_{obj_type}', + object_counts[obj_type] + ) + except ImportError: + pass # Metrics collector not available + + except Exception as e: + logger.error(f"Error updating object counts: {e}") + + def _check_for_leaks(self) -> None: + """Check for potential memory leaks""" + try: + if len(self.memory_snapshots) < 10: + return # Need more data + + # Check for consistent memory growth + recent_snapshots = list(self.memory_snapshots)[-10:] + memory_values = [s.process_memory_mb for s in recent_snapshots] + + # Simple linear regression to detect growth trend + if self._is_memory_growing(memory_values): + # Check object count growth + potential_leaks = self._analyze_object_growth() + + for leak in potential_leaks: + if leak not in self.detected_leaks: + self.detected_leaks.append(leak) + logger.warning(f"Potential memory leak detected: {leak.object_type}") + + # Record leak detection + try: + from .metrics_collector import metrics_collector + metrics_collector.increment_counter('memory_leaks_detected') + except ImportError: + pass + + except Exception as e: + logger.error(f"Error checking for leaks: {e}") + + def _is_memory_growing(self, memory_values: List[float], threshold: float = 5.0) -> bool: + """Check if memory is consistently growing""" + if len(memory_values) < 5: + return False + + # Check if memory increased by more than threshold MB + growth = memory_values[-1] - memory_values[0] + return growth > threshold + + def _analyze_object_growth(self) -> List[MemoryLeak]: + """Analyze object count growth to identify potential leaks""" + leaks = [] + + for obj_type, counts in self.object_counts.items(): + if len(counts) < 10: + continue + + # Get recent counts + recent_counts = list(counts)[-10:] + timestamps = [item[0] for item in recent_counts] + count_values = [item[1] for item in recent_counts] + + # Check for growth + if len(count_values) >= 2: + growth = count_values[-1] - count_values[0] + time_diff = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours + + if growth > 100 and time_diff > 0: # More than 100 objects growth + growth_rate = growth / time_diff + + # Determine severity + if growth_rate > 1000: + severity = 'high' + elif 
growth_rate > 100: + severity = 'medium' + else: + severity = 'low' + + leak = MemoryLeak( + object_type=obj_type, + count_increase=growth, + size_increase_mb=growth * 0.001, # Rough estimate + growth_rate_per_hour=growth_rate, + severity=severity + ) + leaks.append(leak) + + return leaks + + def _optimize_gc(self) -> None: + """Optimize garbage collection based on memory usage""" + try: + if not self.memory_snapshots: + return + + latest_snapshot = self.memory_snapshots[-1] + memory_usage_percent = ( + (latest_snapshot.total_memory_mb - latest_snapshot.available_memory_mb) / + latest_snapshot.total_memory_mb * 100 + ) + + # Adjust GC thresholds based on memory pressure + if memory_usage_percent > 85: + # High memory pressure - more aggressive GC + new_thresholds = (500, 10, 10) + if gc.get_threshold() != new_thresholds: + gc.set_threshold(*new_thresholds) + logger.info("Enabled aggressive garbage collection due to high memory usage") + + # Force collection + collected = gc.collect() + metrics_collector.increment_counter('memory_gc_forced') + logger.debug(f"Forced GC collected {collected} objects") + + elif memory_usage_percent < 50: + # Low memory pressure - less aggressive GC + new_thresholds = (1000, 20, 20) + if gc.get_threshold() != new_thresholds: + gc.set_threshold(*new_thresholds) + logger.info("Reduced garbage collection frequency due to low memory usage") + + # Update GC stats + self.gc_stats = { + 'threshold': gc.get_threshold(), + 'counts': gc.get_count(), + 'collections': gc.get_stats() + } + + except Exception as e: + logger.error(f"Error optimizing GC: {e}") + + def force_garbage_collection(self) -> Dict[str, int]: + """Force garbage collection and return statistics""" + try: + # Get counts before collection + before_counts = gc.get_count() + before_objects = len(gc.get_objects()) + + # Force collection for all generations + collected = [gc.collect(generation) for generation in range(3)] + total_collected = sum(collected) + + # Get counts after collection + after_counts = gc.get_count() + after_objects = len(gc.get_objects()) + + # Update metrics + try: + from .metrics_collector import metrics_collector + metrics_collector.increment_counter('memory_gc_manual') + metrics_collector.set_gauge('memory_gc_objects_collected', total_collected) + except ImportError: + pass + + result = { + 'total_collected': total_collected, + 'by_generation': collected, + 'objects_before': before_objects, + 'objects_after': after_objects, + 'objects_freed': before_objects - after_objects, + 'counts_before': before_counts, + 'counts_after': after_counts + } + + logger.info(f"Manual GC collected {total_collected} objects, freed {result['objects_freed']} objects") + return result + + except Exception as e: + logger.error(f"Error during forced garbage collection: {e}") + return {} + + def get_memory_usage_summary(self) -> Dict[str, Any]: + """Get current memory usage summary""" + if not self.memory_snapshots: + return {} + + latest = self.memory_snapshots[-1] + + # Calculate memory usage percentage + memory_usage_percent = ( + (latest.total_memory_mb - latest.available_memory_mb) / + latest.total_memory_mb * 100 + ) + + return { + 'timestamp': latest.timestamp.isoformat(), + 'total_memory_mb': latest.total_memory_mb, + 'available_memory_mb': latest.available_memory_mb, + 'used_memory_mb': latest.total_memory_mb - latest.available_memory_mb, + 'memory_usage_percent': memory_usage_percent, + 'process_memory_mb': latest.process_memory_mb, + 'gc_objects': latest.gc_objects, + 'gc_collections': 
latest.gc_collections, + 'tracemalloc_current_mb': latest.tracemalloc_current_mb, + 'tracemalloc_peak_mb': latest.tracemalloc_peak_mb + } + + def get_memory_trends(self, hours: int = 1) -> Dict[str, Any]: + """Get memory usage trends over specified time period""" + if not self.memory_snapshots: + return {} + + from datetime import timedelta + cutoff_time = get_current_timestamp() - timedelta(hours=hours) + + # Filter snapshots + recent_snapshots = [ + s for s in self.memory_snapshots + if s.timestamp >= cutoff_time + ] + + if len(recent_snapshots) < 2: + return {'trend': 'insufficient_data'} + + # Calculate trends + process_memory_values = [s.process_memory_mb for s in recent_snapshots] + gc_object_values = [s.gc_objects for s in recent_snapshots] + + return { + 'process_memory': { + 'start_mb': process_memory_values[0], + 'end_mb': process_memory_values[-1], + 'change_mb': process_memory_values[-1] - process_memory_values[0], + 'max_mb': max(process_memory_values), + 'min_mb': min(process_memory_values), + 'avg_mb': sum(process_memory_values) / len(process_memory_values) + }, + 'gc_objects': { + 'start_count': gc_object_values[0], + 'end_count': gc_object_values[-1], + 'change_count': gc_object_values[-1] - gc_object_values[0], + 'max_count': max(gc_object_values), + 'min_count': min(gc_object_values), + 'avg_count': sum(gc_object_values) / len(gc_object_values) + }, + 'sample_count': len(recent_snapshots), + 'time_period_hours': hours + } + + def get_top_memory_consumers(self, limit: int = 10) -> List[Dict[str, Any]]: + """Get top memory consuming object types""" + if not self.object_counts: + return [] + + # Get latest counts + latest_counts = {} + for obj_type, counts in self.object_counts.items(): + if counts: + latest_counts[obj_type] = counts[-1][1] # Get count from (timestamp, count) tuple + + # Sort by count + sorted_types = sorted( + latest_counts.items(), + key=lambda x: x[1], + reverse=True + ) + + return [ + { + 'object_type': obj_type, + 'count': count, + 'estimated_size_mb': count * 0.001 # Rough estimate + } + for obj_type, count in sorted_types[:limit] + ] + + def get_detected_leaks(self) -> List[Dict[str, Any]]: + """Get detected memory leaks""" + return [ + { + 'object_type': leak.object_type, + 'count_increase': leak.count_increase, + 'size_increase_mb': leak.size_increase_mb, + 'growth_rate_per_hour': leak.growth_rate_per_hour, + 'severity': leak.severity + } + for leak in self.detected_leaks + ] + + def get_tracemalloc_top(self, limit: int = 10) -> List[Dict[str, Any]]: + """Get top memory allocations from tracemalloc""" + if not self.enable_tracemalloc or not tracemalloc.is_tracing(): + return [] + + try: + snapshot = tracemalloc.take_snapshot() + top_stats = snapshot.statistics('lineno') + + return [ + { + 'filename': stat.traceback.format()[0], + 'size_mb': stat.size / (1024 * 1024), + 'count': stat.count + } + for stat in top_stats[:limit] + ] + + except Exception as e: + logger.error(f"Error getting tracemalloc top: {e}") + return [] + + def clear_leak_history(self) -> None: + """Clear detected leak history""" + self.detected_leaks.clear() + logger.info("Cleared memory leak history") + + def get_gc_stats(self) -> Dict[str, Any]: + """Get garbage collection statistics""" + return { + 'thresholds': gc.get_threshold(), + 'counts': gc.get_count(), + 'stats': gc.get_stats(), + 'auto_gc_enabled': self.auto_gc_enabled, + 'is_enabled': gc.isenabled() + } + + def set_gc_thresholds(self, gen0: int, gen1: int, gen2: int) -> None: + """Set garbage collection thresholds""" + 
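+        # Per gc.set_threshold(): gen0 is an object allocation count, gen1/gen2 count collections of the younger generation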
gc.set_threshold(gen0, gen1, gen2) + logger.info(f"Set GC thresholds to ({gen0}, {gen1}, {gen2})") + + def enable_auto_gc_optimization(self, enabled: bool = True) -> None: + """Enable or disable automatic GC optimization""" + self.auto_gc_enabled = enabled + logger.info(f"Auto GC optimization {'enabled' if enabled else 'disabled'}") + + def enable_leak_detection(self, enabled: bool = True) -> None: + """Enable or disable memory leak detection""" + self.leak_detection_enabled = enabled + logger.info(f"Memory leak detection {'enabled' if enabled else 'disabled'}") + + def get_stats(self) -> Dict[str, Any]: + """Get memory monitor statistics""" + return { + 'monitoring': self._monitoring, + 'snapshot_interval': self.snapshot_interval, + 'snapshots_count': len(self.memory_snapshots), + 'object_types_tracked': len(self.object_counts), + 'detected_leaks': len(self.detected_leaks), + 'tracemalloc_enabled': self.enable_tracemalloc and tracemalloc.is_tracing(), + 'auto_gc_enabled': self.auto_gc_enabled, + 'leak_detection_enabled': self.leak_detection_enabled, + 'gc_thresholds': gc.get_threshold() + } + + +# Global memory monitor instance +memory_monitor = MemoryMonitor() \ No newline at end of file diff --git a/COBY/monitoring/metrics_collector.py b/COBY/monitoring/metrics_collector.py new file mode 100644 index 0000000..a028e81 --- /dev/null +++ b/COBY/monitoring/metrics_collector.py @@ -0,0 +1,395 @@ +""" +Comprehensive metrics collection for all system components. +""" + +import time +import psutil +import threading +from typing import Dict, List, Optional, Any, Callable +from collections import defaultdict, deque +from datetime import datetime, timezone +from dataclasses import dataclass, field + +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp + +logger = get_logger(__name__) + + +@dataclass +class MetricPoint: + """Individual metric data point""" + name: str + value: float + timestamp: datetime + labels: Dict[str, str] = field(default_factory=dict) + + def to_prometheus_format(self) -> str: + """Convert to Prometheus format""" + labels_str = "" + if self.labels: + label_pairs = [f'{k}="{v}"' for k, v in self.labels.items()] + labels_str = "{" + ",".join(label_pairs) + "}" + + return f"{self.name}{labels_str} {self.value} {int(self.timestamp.timestamp() * 1000)}" + + +@dataclass +class SystemMetrics: + """System-level metrics""" + cpu_usage: float + memory_usage: float + memory_available: float + disk_usage: float + network_bytes_sent: int + network_bytes_recv: int + active_connections: int + timestamp: datetime + + +class MetricsCollector: + """ + Collects and manages performance metrics from all system components. + + Provides Prometheus-compatible metrics and real-time monitoring data. + """ + + def __init__(self, collection_interval: float = 1.0, max_history: int = 10000): + """ + Initialize metrics collector. 
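+        System metrics are sampled on a background daemon thread started via start_collection().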
+ + Args: + collection_interval: How often to collect system metrics (seconds) + max_history: Maximum number of metric points to keep in memory + """ + self.collection_interval = collection_interval + self.max_history = max_history + + # Metric storage + self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history)) + self.counters: Dict[str, float] = defaultdict(float) + self.gauges: Dict[str, float] = defaultdict(float) + self.histograms: Dict[str, List[float]] = defaultdict(list) + + # System metrics + self.system_metrics_history: deque = deque(maxlen=max_history) + + # Collection control + self._collecting = False + self._collection_thread: Optional[threading.Thread] = None + self._lock = threading.RLock() + + # Callbacks for custom metrics + self.metric_callbacks: List[Callable[[], Dict[str, float]]] = [] + + logger.info(f"Metrics collector initialized with {collection_interval}s interval") + + def start_collection(self) -> None: + """Start automatic metrics collection""" + if self._collecting: + logger.warning("Metrics collection already running") + return + + self._collecting = True + self._collection_thread = threading.Thread( + target=self._collection_loop, + name="MetricsCollector", + daemon=True + ) + self._collection_thread.start() + logger.info("Started metrics collection") + + def stop_collection(self) -> None: + """Stop automatic metrics collection""" + if not self._collecting: + return + + self._collecting = False + if self._collection_thread: + self._collection_thread.join(timeout=5.0) + logger.info("Stopped metrics collection") + + def _collection_loop(self) -> None: + """Main collection loop""" + while self._collecting: + try: + # Collect system metrics + self._collect_system_metrics() + + # Collect custom metrics from callbacks + self._collect_custom_metrics() + + time.sleep(self.collection_interval) + + except Exception as e: + logger.error(f"Error in metrics collection loop: {e}") + time.sleep(self.collection_interval) + + def _collect_system_metrics(self) -> None: + """Collect system-level metrics""" + try: + # CPU usage + cpu_percent = psutil.cpu_percent(interval=None) + + # Memory usage + memory = psutil.virtual_memory() + + # Disk usage (root partition) + disk = psutil.disk_usage('/') + + # Network stats + network = psutil.net_io_counters() + + # Active connections + connections = len(psutil.net_connections()) + + # Create system metrics object + sys_metrics = SystemMetrics( + cpu_usage=cpu_percent, + memory_usage=memory.percent, + memory_available=memory.available / (1024**3), # GB + disk_usage=disk.percent, + network_bytes_sent=network.bytes_sent, + network_bytes_recv=network.bytes_recv, + active_connections=connections, + timestamp=get_current_timestamp() + ) + + with self._lock: + self.system_metrics_history.append(sys_metrics) + + # Update gauges + self.gauges['system_cpu_usage'] = cpu_percent + self.gauges['system_memory_usage'] = memory.percent + self.gauges['system_memory_available_gb'] = memory.available / (1024**3) + self.gauges['system_disk_usage'] = disk.percent + self.gauges['system_active_connections'] = connections + + # Update counters (cumulative) + self.counters['system_network_bytes_sent'] = network.bytes_sent + self.counters['system_network_bytes_recv'] = network.bytes_recv + + except Exception as e: + logger.error(f"Error collecting system metrics: {e}") + + def _collect_custom_metrics(self) -> None: + """Collect metrics from registered callbacks""" + for callback in self.metric_callbacks: + try: + custom_metrics = callback() 
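+                # Callbacks return a flat {metric_name: value} mapping; each value is recorded as a gauge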
+ if isinstance(custom_metrics, dict): + with self._lock: + for name, value in custom_metrics.items(): + self.record_gauge(name, value) + except Exception as e: + logger.error(f"Error collecting custom metrics: {e}") + + def record_counter(self, name: str, value: float = 1.0, labels: Dict[str, str] = None) -> None: + """Record a counter metric (cumulative)""" + with self._lock: + self.counters[name] += value + + # Store metric point + point = MetricPoint( + name=name, + value=self.counters[name], + timestamp=get_current_timestamp(), + labels=labels or {} + ) + self.metrics[name].append(point) + + def record_gauge(self, name: str, value: float, labels: Dict[str, str] = None) -> None: + """Record a gauge metric (current value)""" + with self._lock: + self.gauges[name] = value + + # Store metric point + point = MetricPoint( + name=name, + value=value, + timestamp=get_current_timestamp(), + labels=labels or {} + ) + self.metrics[name].append(point) + + def record_histogram(self, name: str, value: float, labels: Dict[str, str] = None) -> None: + """Record a histogram metric (for latency, sizes, etc.)""" + with self._lock: + self.histograms[name].append(value) + + # Keep only recent values + if len(self.histograms[name]) > 1000: + self.histograms[name] = self.histograms[name][-1000:] + + # Store metric point + point = MetricPoint( + name=name, + value=value, + timestamp=get_current_timestamp(), + labels=labels or {} + ) + self.metrics[name].append(point) + + def increment_counter(self, name: str, labels: Dict[str, str] = None) -> None: + """Increment a counter by 1""" + self.record_counter(name, 1.0, labels) + + def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None) -> None: + """Set a gauge value""" + self.record_gauge(name, value, labels) + + def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None) -> None: + """Observe a value in a histogram""" + self.record_histogram(name, value, labels) + + def get_current_metrics(self) -> Dict[str, Any]: + """Get current metric values""" + with self._lock: + return { + 'counters': dict(self.counters), + 'gauges': dict(self.gauges), + 'histograms': { + name: { + 'count': len(values), + 'sum': sum(values), + 'avg': sum(values) / len(values) if values else 0, + 'min': min(values) if values else 0, + 'max': max(values) if values else 0, + 'p50': self._percentile(values, 50) if values else 0, + 'p95': self._percentile(values, 95) if values else 0, + 'p99': self._percentile(values, 99) if values else 0 + } + for name, values in self.histograms.items() + }, + 'system': self.get_latest_system_metrics() + } + + def get_latest_system_metrics(self) -> Optional[Dict[str, Any]]: + """Get the latest system metrics""" + with self._lock: + if not self.system_metrics_history: + return None + + latest = self.system_metrics_history[-1] + return { + 'cpu_usage': latest.cpu_usage, + 'memory_usage': latest.memory_usage, + 'memory_available_gb': latest.memory_available, + 'disk_usage': latest.disk_usage, + 'network_bytes_sent': latest.network_bytes_sent, + 'network_bytes_recv': latest.network_bytes_recv, + 'active_connections': latest.active_connections, + 'timestamp': latest.timestamp.isoformat() + } + + def get_metric_history(self, name: str, limit: int = 100) -> List[Dict[str, Any]]: + """Get historical values for a specific metric""" + with self._lock: + if name not in self.metrics: + return [] + + points = list(self.metrics[name])[-limit:] + return [ + { + 'value': point.value, + 'timestamp': point.timestamp.isoformat(), + 
'labels': point.labels + } + for point in points + ] + + def get_prometheus_metrics(self) -> str: + """Export metrics in Prometheus format""" + lines = [] + + with self._lock: + # Export counters + for name, value in self.counters.items(): + lines.append(f"# TYPE {name} counter") + lines.append(f"{name} {value}") + + # Export gauges + for name, value in self.gauges.items(): + lines.append(f"# TYPE {name} gauge") + lines.append(f"{name} {value}") + + # Export histograms + for name, values in self.histograms.items(): + if values: + lines.append(f"# TYPE {name} histogram") + lines.append(f"{name}_count {len(values)}") + lines.append(f"{name}_sum {sum(values)}") + + # Add percentiles + for percentile in [50, 95, 99]: + p_value = self._percentile(values, percentile) + lines.append(f"{name}_percentile{{quantile=\"0.{percentile:02d}\"}} {p_value}") + + return "\n".join(lines) + + def register_callback(self, callback: Callable[[], Dict[str, float]]) -> None: + """Register a callback for custom metrics collection""" + self.metric_callbacks.append(callback) + logger.info(f"Registered metrics callback: {callback.__name__}") + + def get_performance_summary(self) -> Dict[str, Any]: + """Get a performance summary""" + current_metrics = self.get_current_metrics() + + # Calculate rates and trends + summary = { + 'timestamp': get_current_timestamp().isoformat(), + 'system': current_metrics.get('system', {}), + 'counters': current_metrics.get('counters', {}), + 'gauges': current_metrics.get('gauges', {}), + 'performance_indicators': {} + } + + # Add performance indicators + histograms = current_metrics.get('histograms', {}) + for name, stats in histograms.items(): + if 'latency' in name.lower(): + summary['performance_indicators'][f"{name}_avg_ms"] = stats['avg'] + summary['performance_indicators'][f"{name}_p95_ms"] = stats['p95'] + + return summary + + def _percentile(self, values: List[float], percentile: int) -> float: + """Calculate percentile of values""" + if not values: + return 0.0 + + sorted_values = sorted(values) + index = int((percentile / 100.0) * len(sorted_values)) + index = min(index, len(sorted_values) - 1) + return sorted_values[index] + + def reset_metrics(self) -> None: + """Reset all metrics (useful for testing)""" + with self._lock: + self.metrics.clear() + self.counters.clear() + self.gauges.clear() + self.histograms.clear() + self.system_metrics_history.clear() + + logger.info("All metrics reset") + + def get_stats(self) -> Dict[str, Any]: + """Get collector statistics""" + with self._lock: + return { + 'collecting': self._collecting, + 'collection_interval': self.collection_interval, + 'max_history': self.max_history, + 'total_metrics': len(self.metrics), + 'total_counters': len(self.counters), + 'total_gauges': len(self.gauges), + 'total_histograms': len(self.histograms), + 'system_metrics_count': len(self.system_metrics_history), + 'registered_callbacks': len(self.metric_callbacks) + } + + +# Global metrics collector instance +metrics_collector = MetricsCollector() \ No newline at end of file diff --git a/COBY/monitoring/performance_monitor.py b/COBY/monitoring/performance_monitor.py new file mode 100644 index 0000000..47b1c14 --- /dev/null +++ b/COBY/monitoring/performance_monitor.py @@ -0,0 +1,556 @@ +""" +Performance monitoring dashboard and real-time performance tracking. 
+""" + +import time +import asyncio +import threading +from typing import Dict, List, Optional, Any, Callable +from collections import defaultdict, deque +from datetime import datetime, timezone, timedelta +from dataclasses import dataclass, field + +from ..utils.logging import get_logger +from ..utils.timing import get_current_timestamp +from .metrics_collector import MetricsCollector + +logger = get_logger(__name__) + + +@dataclass +class PerformanceAlert: + """Performance alert definition""" + name: str + metric_name: str + threshold: float + comparison: str # 'gt', 'lt', 'eq' + duration: int # seconds + message: str + severity: str = 'warning' # 'info', 'warning', 'critical' + triggered_at: Optional[datetime] = None + resolved_at: Optional[datetime] = None + + def is_triggered(self, value: float) -> bool: + """Check if alert should be triggered""" + if self.comparison == 'gt': + return value > self.threshold + elif self.comparison == 'lt': + return value < self.threshold + elif self.comparison == 'eq': + return abs(value - self.threshold) < 0.001 + return False + + +@dataclass +class PerformanceThresholds: + """Performance threshold configuration""" + max_cpu_usage: float = 80.0 # % + max_memory_usage: float = 85.0 # % + min_memory_available: float = 1.0 # GB + max_latency_ms: float = 100.0 # milliseconds + max_error_rate: float = 5.0 # % + min_throughput: float = 100.0 # operations/second + + +class PerformanceMonitor: + """ + Real-time performance monitoring with alerting and dashboard data. + + Monitors system performance, tracks KPIs, and provides alerts. + """ + + def __init__(self, metrics_collector: MetricsCollector = None): + """ + Initialize performance monitor. + + Args: + metrics_collector: Metrics collector instance + """ + if metrics_collector is None: + from .metrics_collector import metrics_collector as default_collector + self.metrics_collector = default_collector + else: + self.metrics_collector = metrics_collector + self.thresholds = PerformanceThresholds() + + # Alert management + self.alerts: Dict[str, PerformanceAlert] = {} + self.active_alerts: Dict[str, PerformanceAlert] = {} + self.alert_history: deque = deque(maxlen=1000) + + # Performance tracking + self.performance_history: deque = deque(maxlen=10000) + self.kpi_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000)) + + # Monitoring control + self._monitoring = False + self._monitor_thread: Optional[threading.Thread] = None + self._monitor_interval = 5.0 # seconds + + # Alert callbacks + self.alert_callbacks: List[Callable[[PerformanceAlert], None]] = [] + + # Initialize default alerts + self._setup_default_alerts() + + logger.info("Performance monitor initialized") + + def _setup_default_alerts(self) -> None: + """Setup default performance alerts""" + default_alerts = [ + PerformanceAlert( + name="high_cpu_usage", + metric_name="system_cpu_usage", + threshold=self.thresholds.max_cpu_usage, + comparison="gt", + duration=30, + message="CPU usage is above {threshold}%", + severity="warning" + ), + PerformanceAlert( + name="high_memory_usage", + metric_name="system_memory_usage", + threshold=self.thresholds.max_memory_usage, + comparison="gt", + duration=30, + message="Memory usage is above {threshold}%", + severity="warning" + ), + PerformanceAlert( + name="low_memory_available", + metric_name="system_memory_available_gb", + threshold=self.thresholds.min_memory_available, + comparison="lt", + duration=60, + message="Available memory is below {threshold}GB", + severity="critical" + ), + 
PerformanceAlert( + name="high_latency", + metric_name="processing_latency_ms", + threshold=self.thresholds.max_latency_ms, + comparison="gt", + duration=60, + message="Processing latency is above {threshold}ms", + severity="warning" + ) + ] + + for alert in default_alerts: + self.add_alert(alert) + + def start_monitoring(self) -> None: + """Start performance monitoring""" + if self._monitoring: + logger.warning("Performance monitoring already running") + return + + self._monitoring = True + self._monitor_thread = threading.Thread( + target=self._monitoring_loop, + name="PerformanceMonitor", + daemon=True + ) + self._monitor_thread.start() + logger.info("Started performance monitoring") + + def stop_monitoring(self) -> None: + """Stop performance monitoring""" + if not self._monitoring: + return + + self._monitoring = False + if self._monitor_thread: + self._monitor_thread.join(timeout=5.0) + logger.info("Stopped performance monitoring") + + def _monitoring_loop(self) -> None: + """Main monitoring loop""" + while self._monitoring: + try: + # Collect current performance data + self._collect_performance_data() + + # Check alerts + self._check_alerts() + + # Update KPIs + self._update_kpis() + + time.sleep(self._monitor_interval) + + except Exception as e: + logger.error(f"Error in performance monitoring loop: {e}") + time.sleep(self._monitor_interval) + + def _collect_performance_data(self) -> None: + """Collect current performance data""" + try: + if self.metrics_collector is None: + return + + current_metrics = self.metrics_collector.get_current_metrics() + + if current_metrics is None: + return + + # Create performance snapshot + performance_data = { + 'timestamp': get_current_timestamp(), + 'system': current_metrics.get('system', {}), + 'counters': current_metrics.get('counters', {}), + 'gauges': current_metrics.get('gauges', {}), + 'histograms': current_metrics.get('histograms', {}) + } + + self.performance_history.append(performance_data) + + except Exception as e: + logger.error(f"Error collecting performance data: {e}") + + def _check_alerts(self) -> None: + """Check all alerts against current metrics""" + if self.metrics_collector is None: + return + + current_metrics = self.metrics_collector.get_current_metrics() + if current_metrics is None: + return + + current_time = get_current_timestamp() + + for alert_name, alert in self.alerts.items(): + try: + # Get metric value + metric_value = self._get_metric_value(alert.metric_name, current_metrics) + if metric_value is None: + continue + + # Check if alert should be triggered + should_trigger = alert.is_triggered(metric_value) + + if should_trigger and alert_name not in self.active_alerts: + # Trigger alert + alert.triggered_at = current_time + self.active_alerts[alert_name] = alert + self.alert_history.append(alert) + + # Format message + message = alert.message.format( + threshold=alert.threshold, + value=metric_value + ) + + logger.warning(f"Performance alert triggered: {alert.name} - {message}") + + # Notify callbacks + for callback in self.alert_callbacks: + try: + callback(alert) + except Exception as e: + logger.error(f"Error in alert callback: {e}") + + elif not should_trigger and alert_name in self.active_alerts: + # Resolve alert + resolved_alert = self.active_alerts.pop(alert_name) + resolved_alert.resolved_at = current_time + + logger.info(f"Performance alert resolved: {alert.name}") + + except Exception as e: + logger.error(f"Error checking alert {alert_name}: {e}") + + def _get_metric_value(self, metric_name: str, 
metrics: Dict[str, Any]) -> Optional[float]: + """Get metric value from metrics data""" + if not metrics: + return None + + # Check gauges first + gauges = metrics.get('gauges', {}) + if gauges and metric_name in gauges: + return gauges[metric_name] + + # Check counters + counters = metrics.get('counters', {}) + if counters and metric_name in counters: + return counters[metric_name] + + # Check histograms (use average) + histograms = metrics.get('histograms', {}) + if histograms and metric_name in histograms: + hist_data = histograms[metric_name] + if hist_data and isinstance(hist_data, dict): + return hist_data.get('avg', 0) + + # Check system metrics + system_metrics = metrics.get('system', {}) + if system_metrics and metric_name in system_metrics: + return system_metrics[metric_name] + + return None + + def _update_kpis(self) -> None: + """Update key performance indicators""" + try: + if self.metrics_collector is None: + return + + current_metrics = self.metrics_collector.get_current_metrics() + if current_metrics is None: + return + + timestamp = get_current_timestamp() + + # Calculate throughput (operations per second) + throughput = self._calculate_throughput() + self.kpi_history['throughput_ops_per_sec'].append({ + 'value': throughput, + 'timestamp': timestamp + }) + + # Calculate error rate + error_rate = self._calculate_error_rate() + self.kpi_history['error_rate_percent'].append({ + 'value': error_rate, + 'timestamp': timestamp + }) + + # Calculate average latency + avg_latency = self._calculate_average_latency() + self.kpi_history['avg_latency_ms'].append({ + 'value': avg_latency, + 'timestamp': timestamp + }) + + # Update metrics collector with KPIs + self.metrics_collector.set_gauge('kpi_throughput_ops_per_sec', throughput) + self.metrics_collector.set_gauge('kpi_error_rate_percent', error_rate) + self.metrics_collector.set_gauge('kpi_avg_latency_ms', avg_latency) + + except Exception as e: + logger.error(f"Error updating KPIs: {e}") + + def _calculate_throughput(self) -> float: + """Calculate operations per second throughput""" + try: + current_metrics = self.metrics_collector.get_current_metrics() + counters = current_metrics.get('counters', {}) + + # Sum up relevant operation counters + total_ops = 0 + for name, value in counters.items(): + if any(keyword in name.lower() for keyword in ['processed', 'handled', 'completed']): + total_ops += value + + # Calculate rate (simple approximation) + if len(self.performance_history) >= 2: + prev_data = self.performance_history[-2] + current_data = self.performance_history[-1] + + time_diff = (current_data['timestamp'] - prev_data['timestamp']).total_seconds() + if time_diff > 0: + prev_ops = sum( + value for name, value in prev_data.get('counters', {}).items() + if any(keyword in name.lower() for keyword in ['processed', 'handled', 'completed']) + ) + return (total_ops - prev_ops) / time_diff + + return 0.0 + + except Exception as e: + logger.error(f"Error calculating throughput: {e}") + return 0.0 + + def _calculate_error_rate(self) -> float: + """Calculate error rate percentage""" + try: + current_metrics = self.metrics_collector.get_current_metrics() + counters = current_metrics.get('counters', {}) + + # Count errors and total operations + total_errors = sum( + value for name, value in counters.items() + if 'error' in name.lower() or 'failed' in name.lower() + ) + + total_operations = sum( + value for name, value in counters.items() + if any(keyword in name.lower() for keyword in ['processed', 'handled', 'completed', 'total']) + ) 
+ + if total_operations > 0: + return (total_errors / total_operations) * 100 + + return 0.0 + + except Exception as e: + logger.error(f"Error calculating error rate: {e}") + return 0.0 + + def _calculate_average_latency(self) -> float: + """Calculate average latency across all operations""" + try: + current_metrics = self.metrics_collector.get_current_metrics() + histograms = current_metrics.get('histograms', {}) + + # Find latency histograms + latency_values = [] + for name, stats in histograms.items(): + if 'latency' in name.lower(): + latency_values.append(stats.get('avg', 0)) + + if latency_values: + return sum(latency_values) / len(latency_values) + + return 0.0 + + except Exception as e: + logger.error(f"Error calculating average latency: {e}") + return 0.0 + + def add_alert(self, alert: PerformanceAlert) -> None: + """Add a performance alert""" + self.alerts[alert.name] = alert + logger.info(f"Added performance alert: {alert.name}") + + def remove_alert(self, alert_name: str) -> None: + """Remove a performance alert""" + if alert_name in self.alerts: + del self.alerts[alert_name] + # Also remove from active alerts if present + self.active_alerts.pop(alert_name, None) + logger.info(f"Removed performance alert: {alert_name}") + + def get_active_alerts(self) -> List[PerformanceAlert]: + """Get currently active alerts""" + return list(self.active_alerts.values()) + + def get_alert_history(self, limit: int = 100) -> List[PerformanceAlert]: + """Get alert history""" + return list(self.alert_history)[-limit:] + + def get_performance_dashboard_data(self) -> Dict[str, Any]: + """Get data for performance dashboard""" + current_metrics = {} + if self.metrics_collector: + current_metrics = self.metrics_collector.get_current_metrics() or {} + + system_metrics = current_metrics.get('system', {}) or {} + + return { + 'timestamp': get_current_timestamp().isoformat(), + 'system_metrics': system_metrics, + 'kpis': { + name: list(history)[-10:] if history else [] # Last 10 points + for name, history in self.kpi_history.items() + }, + 'active_alerts': [ + { + 'name': alert.name, + 'message': alert.message, + 'severity': alert.severity.value if hasattr(alert.severity, 'value') else str(alert.severity), + 'triggered_at': alert.triggered_at.isoformat() if alert.triggered_at else None + } + for alert in self.active_alerts.values() + ], + 'performance_summary': { + 'cpu_usage': system_metrics.get('cpu_usage', 0) if system_metrics else 0, + 'memory_usage': system_metrics.get('memory_usage', 0) if system_metrics else 0, + 'active_connections': system_metrics.get('active_connections', 0) if system_metrics else 0, + 'throughput': self.kpi_history['throughput_ops_per_sec'][-1]['value'] if self.kpi_history['throughput_ops_per_sec'] else 0, + 'error_rate': self.kpi_history['error_rate_percent'][-1]['value'] if self.kpi_history['error_rate_percent'] else 0, + 'avg_latency': self.kpi_history['avg_latency_ms'][-1]['value'] if self.kpi_history['avg_latency_ms'] else 0 + } + } + + def register_alert_callback(self, callback: Callable[[PerformanceAlert], None]) -> None: + """Register callback for alert notifications""" + self.alert_callbacks.append(callback) + logger.info(f"Registered alert callback: {callback.__name__}") + + def update_thresholds(self, **kwargs) -> None: + """Update performance thresholds""" + for key, value in kwargs.items(): + if hasattr(self.thresholds, key): + setattr(self.thresholds, key, value) + logger.info(f"Updated threshold {key} to {value}") + + def get_performance_trends(self, hours: int = 
24) -> Dict[str, Any]: + """Get performance trends over specified time period""" + cutoff_time = get_current_timestamp() - timedelta(hours=hours) + + # Filter performance history + recent_data = [ + data for data in self.performance_history + if data and data.get('timestamp') and data['timestamp'] >= cutoff_time + ] + + if not recent_data: + return {} + + # Calculate trends + trends = {} + + # CPU usage trend + cpu_values = [] + for data in recent_data: + system_data = data.get('system', {}) + if system_data: + cpu_values.append(system_data.get('cpu_usage', 0)) + + if cpu_values: + trends['cpu_usage'] = { + 'current': cpu_values[-1], + 'average': sum(cpu_values) / len(cpu_values), + 'max': max(cpu_values), + 'trend': 'increasing' if len(cpu_values) > 1 and cpu_values[-1] > cpu_values[0] else 'stable' + } + + # Memory usage trend + memory_values = [] + for data in recent_data: + system_data = data.get('system', {}) + if system_data: + memory_values.append(system_data.get('memory_usage', 0)) + + if memory_values: + trends['memory_usage'] = { + 'current': memory_values[-1], + 'average': sum(memory_values) / len(memory_values), + 'max': max(memory_values), + 'trend': 'increasing' if len(memory_values) > 1 and memory_values[-1] > memory_values[0] else 'stable' + } + + return trends + + def get_stats(self) -> Dict[str, Any]: + """Get performance monitor statistics""" + return { + 'monitoring': self._monitoring, + 'monitor_interval': self._monitor_interval, + 'total_alerts': len(self.alerts), + 'active_alerts': len(self.active_alerts), + 'alert_history_count': len(self.alert_history), + 'performance_history_count': len(self.performance_history), + 'kpi_metrics': list(self.kpi_history.keys()), + 'registered_callbacks': len(self.alert_callbacks), + 'thresholds': { + 'max_cpu_usage': self.thresholds.max_cpu_usage, + 'max_memory_usage': self.thresholds.max_memory_usage, + 'min_memory_available': self.thresholds.min_memory_available, + 'max_latency_ms': self.thresholds.max_latency_ms, + 'max_error_rate': self.thresholds.max_error_rate, + 'min_throughput': self.thresholds.min_throughput + } + } + + +# Global performance monitor instance (initialized lazily) +performance_monitor = None + +def get_performance_monitor(): + """Get or create global performance monitor instance""" + global performance_monitor + if performance_monitor is None: + performance_monitor = PerformanceMonitor() + return performance_monitor \ No newline at end of file
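
For reference, a minimal usage sketch of how the collector and monitor added in this patch might be wired together by a consumer. It assumes the `COBY.monitoring` package is importable as laid out above; the callback, metric names, and values are hypothetical illustrations, not part of the module.

from typing import Dict

# Module-level instances added by this patch.
from COBY.monitoring.metrics_collector import metrics_collector
from COBY.monitoring.performance_monitor import get_performance_monitor


def queue_depth_metrics() -> Dict[str, float]:
    # Hypothetical custom metric source; returns gauge name -> value.
    return {"orderbook_queue_depth": 42.0}


def on_alert(alert) -> None:
    # Hypothetical notification hook; receives a PerformanceAlert instance.
    print(f"ALERT [{alert.severity}] {alert.name}: {alert.message}")


# Start background system-metric collection and register the custom source.
metrics_collector.start_collection()
metrics_collector.register_callback(queue_depth_metrics)

# Record application metrics from the hot path.
metrics_collector.increment_counter("orderbook_updates_processed")
metrics_collector.observe_histogram("processing_latency_ms", 12.5)

# The lazily created monitor falls back to the same module-level collector,
# so the default alerts evaluate the metrics recorded above.
monitor = get_performance_monitor()
monitor.register_alert_callback(on_alert)
monitor.start_monitoring()

# Later: pull dashboard data, then shut down cleanly.
dashboard = monitor.get_performance_dashboard_data()
monitor.stop_monitoring()
metrics_collector.stop_collection()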