# gogo2/utils/npu_capabilities.py
"""
AMD Strix Halo NPU Capabilities and Monitoring
Provides detailed information about NPU specifications, memory usage, and saturation monitoring
"""
import os
import time
import logging
import subprocess
import psutil
from typing import Dict, Any, List, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)


class NPUCapabilities:
    """AMD Strix Halo NPU capabilities and specifications"""

    # NPU specifications (based on published Strix Halo material)
    SPECS = {
        'compute_performance': 50,  # TOPS (tera operations per second)
        'architecture': 'XDNA 2',
        'memory_type': 'Unified Memory Architecture',
        'max_system_memory': 128,  # GB
        'memory_bandwidth': 'High-bandwidth unified memory',
        'compute_units': '2D array of compute and memory tiles',
        'precision_support': ['FP16', 'INT8', 'INT4'],
        'max_model_size': 'Limited by available system memory',
        'concurrent_models': 'Multiple (memory dependent)',
        'latency_target': '< 1ms for small models',
        'power_efficiency': 'Optimized for inference workloads'
    }

    @classmethod
    def get_specifications(cls) -> Dict[str, Any]:
        """Get NPU specifications"""
        return cls.SPECS.copy()

    @classmethod
    def estimate_model_capacity(cls, model_params: int, precision: str = 'FP16') -> Dict[str, Any]:
        """Estimate how many parameters the NPU can handle"""
        # Memory requirement per parameter (bytes)
        memory_per_param = {
            'FP32': 4,
            'FP16': 2,
            'INT8': 1,
            'INT4': 0.5
        }

        # Get total system memory (the NPU shares unified memory with the CPU)
        total_memory_gb = psutil.virtual_memory().total / (1024**3)

        # Estimate memory needed for the model weights
        model_memory_gb = (model_params * memory_per_param.get(precision, 2)) / (1024**3)

        # Reserve memory for the system and other processes: budget 70% of total
        available_memory_gb = total_memory_gb * 0.7

        # Maximum parameter count that fits in the reserved budget
        max_params = int((available_memory_gb * 1024**3) / memory_per_param.get(precision, 2))

        return {
            'model_parameters': model_params,
            'precision': precision,
            'model_memory_gb': model_memory_gb,
            'total_system_memory_gb': total_memory_gb,
            'available_memory_gb': available_memory_gb,
            'max_parameters_supported': max_params,
            'memory_utilization_percent': (model_memory_gb / available_memory_gb) * 100,
            'can_fit_model': model_memory_gb <= available_memory_gb
        }
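

# Illustrative usage sketch (not part of the original module): estimate whether
# a hypothetical 1B-parameter model fits at FP16. At 2 bytes/parameter that is
# ~1.86 GB of unified memory; whether it fits depends on the host's RAM.
def _example_capacity_check() -> None:
    estimate = NPUCapabilities.estimate_model_capacity(1_000_000_000, 'FP16')
    print(f"1B params @ FP16 -> {estimate['model_memory_gb']:.2f} GB "
          f"({estimate['memory_utilization_percent']:.1f}% of budget, "
          f"fits: {estimate['can_fit_model']})")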


class NPUMonitor:
    """Monitor NPU utilization and saturation"""

    def __init__(self):
        self.npu_available = self._check_npu_availability()
        self.monitoring_data = []
        self.start_time = time.time()

    def _check_npu_availability(self) -> bool:
        """Check if the NPU device node is present"""
        try:
            if os.path.exists('/dev/amdxdna'):
                return True
            # Glob for numbered device nodes; running `ls /dev/amdxdna*` via
            # subprocess would not expand the wildcard without a shell
            return bool(glob.glob('/dev/amdxdna*'))
        except Exception:
            return False

    def get_system_memory_info(self) -> Dict[str, Any]:
        """Get detailed system memory information"""
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()

        return {
            'total_gb': memory.total / (1024**3),
            'available_gb': memory.available / (1024**3),
            'used_gb': memory.used / (1024**3),
            'free_gb': memory.free / (1024**3),
            'usage_percent': memory.percent,
            'swap_total_gb': swap.total / (1024**3),
            'swap_used_gb': swap.used / (1024**3),
            'swap_percent': swap.percent
        }

    def get_npu_device_info(self) -> Dict[str, Any]:
        """Get NPU device information"""
        if not self.npu_available:
            return {'available': False}

        info = {'available': True}
        try:
            # Enumerate NPU device nodes (glob instead of shelling out to `ls`,
            # which would not expand the wildcard)
            devices = glob.glob('/dev/amdxdna*')
            if devices:
                info['devices'] = devices

            # Record the kernel version (the amdxdna driver is kernel-dependent)
            result = subprocess.run(['uname', '-r'],
                                    capture_output=True, text=True, timeout=5)
            if result.returncode == 0:
                info['kernel_version'] = result.stdout.strip()

            # Look for NPU-specific sysfs/procfs entries
            npu_files = [
                '/sys/class/amdxdna',
                '/proc/amdxdna',
                '/sys/devices/platform/amdxdna'
            ]
            for file_path in npu_files:
                if os.path.exists(file_path):
                    info['sysfs_path'] = file_path
                    break
        except Exception as e:
            info['error'] = str(e)

        return info

    def monitor_inference_performance(self, inference_times: List[float]) -> Dict[str, Any]:
        """Monitor inference performance and detect saturation"""
        if not inference_times:
            return {'error': 'No inference times provided'}

        inference_times = np.array(inference_times)

        # Basic latency statistics (milliseconds)
        avg_latency = np.mean(inference_times)
        min_latency = np.min(inference_times)
        max_latency = np.max(inference_times)
        std_latency = np.std(inference_times)

        # Relative latency spread (coefficient of variation: std / mean)
        latency_variance = std_latency / avg_latency if avg_latency > 0 else 0

        # Saturation indicators
        saturation_indicators = {
            'high_variance': latency_variance > 0.3,  # high relative spread indicates instability
            'increasing_latency': self._detect_trend(inference_times),
            'latency_spikes': max_latency > avg_latency * 2,  # spikes suggest saturation
            'average_latency_ms': avg_latency,
            'latency_variance': latency_variance
        }

        # Performance assessment
        performance_assessment = self._assess_performance(avg_latency, latency_variance)

        return {
            'inference_times_ms': inference_times.tolist(),
            'avg_latency_ms': avg_latency,
            'min_latency_ms': min_latency,
            'max_latency_ms': max_latency,
            'std_latency_ms': std_latency,
            'latency_variance': latency_variance,
            'saturation_indicators': saturation_indicators,
            'performance_assessment': performance_assessment,
            'samples': len(inference_times)
        }

    def _detect_trend(self, times: np.ndarray) -> bool:
        """Detect if latency is increasing over time"""
        if len(times) < 10:
            return False

        # Simple linear trend detection: fit a line and check the slope (ms per sample)
        x = np.arange(len(times))
        slope = np.polyfit(x, times, 1)[0]
        return slope > 0.1

    def _assess_performance(self, avg_latency: float, variance: float) -> str:
        """Assess NPU performance from average latency (ms) and relative spread"""
        if avg_latency < 1.0 and variance < 0.1:
            return "Excellent"
        elif avg_latency < 5.0 and variance < 0.2:
            return "Good"
        elif avg_latency < 10.0 and variance < 0.3:
            return "Fair"
        else:
            return "Poor"

    def get_npu_utilization(self) -> Dict[str, Any]:
        """Get NPU utilization metrics"""
        if not self.npu_available:
            return {'available': False, 'error': 'NPU not available'}

        # Gather system-level metrics
        memory_info = self.get_system_memory_info()
        device_info = self.get_npu_device_info()

        # Estimate NPU utilization from system metrics; real NPU utilization
        # would require driver-specific counters
        return {
            'available': True,
            'memory_usage_percent': memory_info['usage_percent'],
            'memory_available_gb': memory_info['available_gb'],
            'device_info': device_info,
            'estimated_load': 'Unknown',  # would need NPU-specific monitoring
            'timestamp': time.time()
        }

    def benchmark_npu_capacity(self, model_sizes: List[int]) -> Dict[str, Any]:
        """Benchmark NPU capacity for different model sizes (in millions of parameters)"""
        if not self.npu_available:
            return {'available': False}

        results = {}
        memory_info = self.get_system_memory_info()

        for model_size in model_sizes:
            # model_size is in millions of parameters; convert to a raw count
            capacity_info = NPUCapabilities.estimate_model_capacity(model_size * 1_000_000)

            results[f'model_{model_size}M'] = {
                'parameters_millions': model_size,
                'estimated_memory_gb': capacity_info['model_memory_gb'],
                'can_fit': capacity_info['can_fit_model'],
                'memory_utilization_percent': capacity_info['memory_utilization_percent']
            }

        return {
            'available': True,
            'system_memory_gb': memory_info['total_gb'],
            'available_memory_gb': memory_info['available_gb'],
            'model_capacity_results': results,
            'recommendations': self._generate_capacity_recommendations(results)
        }

    def _generate_capacity_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """Generate capacity recommendations"""
        recommendations = []

        for model_name, result in results.items():
            if not result['can_fit']:
                recommendations.append(f"Model {model_name} may not fit in available memory")
            elif result['memory_utilization_percent'] > 80:
                recommendations.append(f"Model {model_name} uses >80% of available memory")

        if not recommendations:
            recommendations.append("All tested models should fit comfortably in available memory")

        return recommendations
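

# Illustrative usage sketch (not part of the original module): feed synthetic
# latencies (ms) to the saturation monitor. A single 9.8 ms spike against a
# ~2.3 ms baseline trips both the spike and variance heuristics.
def _example_saturation_check() -> None:
    monitor = NPUMonitor()
    latencies = [2.1, 2.3, 2.2, 9.8, 2.4, 2.2, 2.5, 2.3, 2.1, 2.6]
    report = monitor.monitor_inference_performance(latencies)
    print(report['performance_assessment'], report['saturation_indicators'])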


class NPUPerformanceProfiler:
    """Profile NPU performance for specific models"""

    def __init__(self):
        self.monitor = NPUMonitor()
        self.profiling_data = {}

    def profile_model(self, model_name: str, input_shape: tuple,
                      iterations: int = 100) -> Dict[str, Any]:
        """Profile a specific model's performance"""
        if not self.monitor.npu_available:
            return {'error': 'NPU not available'}

        # This would integrate with actual model inference; for now, simulate
        # inference times (clipped to stay positive) in place of real measurements
        simulated_times = np.random.normal(2.5, 0.5, iterations).clip(min=0.1).tolist()

        # Analyze the latency samples
        performance_data = self.monitor.monitor_inference_performance(simulated_times)

        # Throughput in inferences per second (latencies are in ms)
        throughput = 1000 / np.mean(simulated_times)

        # Estimate input memory footprint, assuming FP32 (4 bytes per element)
        input_size = np.prod(input_shape) * 4
        estimated_memory_mb = input_size / (1024**2)

        profile_result = {
            'model_name': model_name,
            'input_shape': input_shape,
            'iterations': iterations,
            'performance': performance_data,
            'throughput_ips': throughput,
            'estimated_memory_mb': estimated_memory_mb,
            'npu_utilization': self.monitor.get_npu_utilization(),
            'timestamp': time.time()
        }

        self.profiling_data[model_name] = profile_result
        return profile_result

    def get_profiling_summary(self) -> Dict[str, Any]:
        """Get a summary of all profiled models"""
        if not self.profiling_data:
            return {'error': 'No profiling data available'}

        summary = {
            'total_models': len(self.profiling_data),
            'models': {},
            'overall_performance': 'Unknown'
        }

        for model_name, data in self.profiling_data.items():
            summary['models'][model_name] = {
                'avg_latency_ms': data['performance']['avg_latency_ms'],
                'throughput_ips': data['throughput_ips'],
                'performance_assessment': data['performance']['performance_assessment'],
                'estimated_memory_mb': data['estimated_memory_mb']
            }

        return summary


# Utility functions

def get_npu_capabilities_summary() -> Dict[str, Any]:
    """Get a comprehensive NPU capabilities summary"""
    capabilities = NPUCapabilities.get_specifications()
    monitor = NPUMonitor()

    return {
        'specifications': capabilities,
        'availability': monitor.npu_available,
        'system_memory': monitor.get_system_memory_info(),
        'device_info': monitor.get_npu_device_info(),
        # Example estimate for a 100M-parameter model at FP16
        'estimated_capacity': NPUCapabilities.estimate_model_capacity(100_000_000, 'FP16')
    }


def check_npu_saturation(inference_times: List[float]) -> Dict[str, Any]:
    """Check if the NPU is saturated based on inference times"""
    monitor = NPUMonitor()
    return monitor.monitor_inference_performance(inference_times)


def benchmark_model_capacity(model_sizes: List[int]) -> Dict[str, Any]:
    """Benchmark NPU capacity for different model sizes (millions of parameters)"""
    monitor = NPUMonitor()
    return monitor.benchmark_npu_capacity(model_sizes)
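

# Minimal smoke-test sketch (illustrative; assumes no NPU-specific runtime is
# installed). Prints the capability summary and runs the saturation check on
# synthetic latencies; on machines without /dev/amdxdna the device-dependent
# fields simply report the NPU as unavailable.
if __name__ == "__main__":
    import json

    logging.basicConfig(level=logging.INFO)
    print(json.dumps(get_npu_capabilities_summary(), indent=2, default=str))
    print(check_npu_saturation([2.1, 2.2, 2.3, 2.2, 2.4])['performance_assessment'])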