""" AMD Strix Halo NPU Capabilities and Monitoring Provides detailed information about NPU specifications, memory usage, and saturation monitoring """ import os import time import logging import subprocess import psutil from typing import Dict, Any, List, Optional, Tuple import numpy as np logger = logging.getLogger(__name__) class NPUCapabilities: """AMD Strix Halo NPU capabilities and specifications""" # NPU Specifications (based on research) SPECS = { 'compute_performance': 50, # TOPS (Tera Operations Per Second) 'architecture': 'XDNA', 'memory_type': 'Unified Memory Architecture', 'max_system_memory': 128, # GB 'memory_bandwidth': 'High-bandwidth unified memory', 'compute_units': '2D array of compute and memory tiles', 'precision_support': ['FP16', 'INT8', 'INT4'], 'max_model_size': 'Limited by available system memory', 'concurrent_models': 'Multiple (memory dependent)', 'latency_target': '< 1ms for small models', 'power_efficiency': 'Optimized for inference workloads' } @classmethod def get_specifications(cls) -> Dict[str, Any]: """Get NPU specifications""" return cls.SPECS.copy() @classmethod def estimate_model_capacity(cls, model_params: int, precision: str = 'FP16') -> Dict[str, Any]: """Estimate how many parameters the NPU can handle""" # Memory requirements per parameter (bytes) memory_per_param = { 'FP32': 4, 'FP16': 2, 'INT8': 1, 'INT4': 0.5 } # Get available system memory total_memory_gb = psutil.virtual_memory().total / (1024**3) # Estimate memory needed for model model_memory_gb = (model_params * memory_per_param.get(precision, 2)) / (1024**3) # Reserve memory for system and other processes available_memory_gb = total_memory_gb * 0.7 # Use 70% of total memory # Calculate capacity max_params = int((available_memory_gb * 1024**3) / memory_per_param.get(precision, 2)) return { 'model_parameters': model_params, 'precision': precision, 'model_memory_gb': model_memory_gb, 'total_system_memory_gb': total_memory_gb, 'available_memory_gb': available_memory_gb, 'max_parameters_supported': max_params, 'memory_utilization_percent': (model_memory_gb / available_memory_gb) * 100, 'can_fit_model': model_memory_gb <= available_memory_gb } class NPUMonitor: """Monitor NPU utilization and saturation""" def __init__(self): self.npu_available = self._check_npu_availability() self.monitoring_data = [] self.start_time = time.time() def _check_npu_availability(self) -> bool: """Check if NPU is available""" try: # Check for NPU devices if os.path.exists('/dev/amdxdna'): return True # Check for NPU devices in /dev result = subprocess.run(['ls', '/dev/amdxdna*'], capture_output=True, text=True, timeout=5) return result.returncode == 0 and result.stdout.strip() except Exception: return False def get_system_memory_info(self) -> Dict[str, Any]: """Get detailed system memory information""" memory = psutil.virtual_memory() swap = psutil.swap_memory() return { 'total_gb': memory.total / (1024**3), 'available_gb': memory.available / (1024**3), 'used_gb': memory.used / (1024**3), 'free_gb': memory.free / (1024**3), 'usage_percent': memory.percent, 'swap_total_gb': swap.total / (1024**3), 'swap_used_gb': swap.used / (1024**3), 'swap_percent': swap.percent } def get_npu_device_info(self) -> Dict[str, Any]: """Get NPU device information""" if not self.npu_available: return {'available': False} info = {'available': True} try: # Check NPU devices result = subprocess.run(['ls', '/dev/amdxdna*'], capture_output=True, text=True, timeout=5) if result.returncode == 0: info['devices'] = result.stdout.strip().split('\n') # Check kernel version result = subprocess.run(['uname', '-r'], capture_output=True, text=True, timeout=5) if result.returncode == 0: info['kernel_version'] = result.stdout.strip() # Check for NPU-specific files npu_files = [ '/sys/class/amdxdna', '/proc/amdxdna', '/sys/devices/platform/amdxdna' ] for file_path in npu_files: if os.path.exists(file_path): info['sysfs_path'] = file_path break except Exception as e: info['error'] = str(e) return info def monitor_inference_performance(self, inference_times: List[float]) -> Dict[str, Any]: """Monitor inference performance and detect saturation""" if not inference_times: return {'error': 'No inference times provided'} inference_times = np.array(inference_times) # Calculate performance metrics avg_latency = np.mean(inference_times) min_latency = np.min(inference_times) max_latency = np.max(inference_times) std_latency = np.std(inference_times) # Detect potential saturation latency_variance = std_latency / avg_latency if avg_latency > 0 else 0 # Saturation indicators saturation_indicators = { 'high_variance': latency_variance > 0.3, # High variance indicates instability 'increasing_latency': self._detect_trend(inference_times), 'latency_spikes': max_latency > avg_latency * 2, # Spikes indicate saturation 'average_latency_ms': avg_latency, 'latency_variance': latency_variance } # Performance assessment performance_assessment = self._assess_performance(avg_latency, latency_variance) return { 'inference_times_ms': inference_times.tolist(), 'avg_latency_ms': avg_latency, 'min_latency_ms': min_latency, 'max_latency_ms': max_latency, 'std_latency_ms': std_latency, 'latency_variance': latency_variance, 'saturation_indicators': saturation_indicators, 'performance_assessment': performance_assessment, 'samples': len(inference_times) } def _detect_trend(self, times: np.ndarray) -> bool: """Detect if latency is increasing over time""" if len(times) < 10: return False # Simple linear trend detection x = np.arange(len(times)) slope = np.polyfit(x, times, 1)[0] return slope > 0.1 # Increasing trend def _assess_performance(self, avg_latency: float, variance: float) -> str: """Assess NPU performance""" if avg_latency < 1.0 and variance < 0.1: return "Excellent" elif avg_latency < 5.0 and variance < 0.2: return "Good" elif avg_latency < 10.0 and variance < 0.3: return "Fair" else: return "Poor" def get_npu_utilization(self) -> Dict[str, Any]: """Get NPU utilization metrics""" if not self.npu_available: return {'available': False, 'error': 'NPU not available'} # Get system metrics memory_info = self.get_system_memory_info() device_info = self.get_npu_device_info() # Estimate NPU utilization based on system metrics # This is a simplified approach - real NPU utilization would require specific drivers utilization = { 'available': True, 'memory_usage_percent': memory_info['usage_percent'], 'memory_available_gb': memory_info['available_gb'], 'device_info': device_info, 'estimated_load': 'Unknown', # Would need NPU-specific monitoring 'timestamp': time.time() } return utilization def benchmark_npu_capacity(self, model_sizes: List[int]) -> Dict[str, Any]: """Benchmark NPU capacity with different model sizes""" if not self.npu_available: return {'available': False} results = {} memory_info = self.get_system_memory_info() for model_size in model_sizes: # Estimate memory requirements capacity_info = NPUCapabilities.estimate_model_capacity(model_size) results[f'model_{model_size}M'] = { 'parameters_millions': model_size, 'estimated_memory_gb': capacity_info['model_memory_gb'], 'can_fit': capacity_info['can_fit_model'], 'memory_utilization_percent': capacity_info['memory_utilization_percent'] } return { 'available': True, 'system_memory_gb': memory_info['total_gb'], 'available_memory_gb': memory_info['available_gb'], 'model_capacity_results': results, 'recommendations': self._generate_capacity_recommendations(results) } def _generate_capacity_recommendations(self, results: Dict[str, Any]) -> List[str]: """Generate capacity recommendations""" recommendations = [] for model_name, result in results.items(): if not result['can_fit']: recommendations.append(f"Model {model_name} may not fit in available memory") elif result['memory_utilization_percent'] > 80: recommendations.append(f"Model {model_name} uses >80% of available memory") if not recommendations: recommendations.append("All tested models should fit comfortably in available memory") return recommendations class NPUPerformanceProfiler: """Profile NPU performance for specific models""" def __init__(self): self.monitor = NPUMonitor() self.profiling_data = {} def profile_model(self, model_name: str, input_shape: tuple, iterations: int = 100) -> Dict[str, Any]: """Profile a specific model's performance""" if not self.monitor.npu_available: return {'error': 'NPU not available'} # This would integrate with actual model inference # For now, simulate performance data # Simulate inference times (would be real measurements) simulated_times = np.random.normal(2.5, 0.5, iterations).tolist() # Monitor performance performance_data = self.monitor.monitor_inference_performance(simulated_times) # Calculate throughput throughput = 1000 / np.mean(simulated_times) # inferences per second # Estimate memory usage input_size = np.prod(input_shape) * 4 # Assume FP32 estimated_memory_mb = input_size / (1024**2) profile_result = { 'model_name': model_name, 'input_shape': input_shape, 'iterations': iterations, 'performance': performance_data, 'throughput_ips': throughput, 'estimated_memory_mb': estimated_memory_mb, 'npu_utilization': self.monitor.get_npu_utilization(), 'timestamp': time.time() } self.profiling_data[model_name] = profile_result return profile_result def get_profiling_summary(self) -> Dict[str, Any]: """Get summary of all profiled models""" if not self.profiling_data: return {'error': 'No profiling data available'} summary = { 'total_models': len(self.profiling_data), 'models': {}, 'overall_performance': 'Unknown' } for model_name, data in self.profiling_data.items(): summary['models'][model_name] = { 'avg_latency_ms': data['performance']['avg_latency_ms'], 'throughput_ips': data['throughput_ips'], 'performance_assessment': data['performance']['performance_assessment'], 'estimated_memory_mb': data['estimated_memory_mb'] } return summary # Utility functions def get_npu_capabilities_summary() -> Dict[str, Any]: """Get comprehensive NPU capabilities summary""" capabilities = NPUCapabilities.get_specifications() monitor = NPUMonitor() return { 'specifications': capabilities, 'availability': monitor.npu_available, 'system_memory': monitor.get_system_memory_info(), 'device_info': monitor.get_npu_device_info(), 'estimated_capacity': NPUCapabilities.estimate_model_capacity(100, 'FP16') # 100M params example } def check_npu_saturation(inference_times: List[float]) -> Dict[str, Any]: """Check if NPU is saturated based on inference times""" monitor = NPUMonitor() return monitor.monitor_inference_performance(inference_times) def benchmark_model_capacity(model_sizes: List[int]) -> Dict[str, Any]: """Benchmark NPU capacity for different model sizes""" monitor = NPUMonitor() return monitor.benchmark_npu_capacity(model_sizes)