"""
Process Supervisor - Handles process monitoring, restarts, and supervision.

Prevents silent failures by monitoring process health and restarting
processes when they crash.
"""


import logging
import os
import signal
import subprocess
import sys
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional


# Module-level logger; every supervisor event in this file is reported here.
logger: logging.Logger = logging.getLogger(__name__)


class ProcessSupervisor:
    """Supervise child processes and restart them when they exit unexpectedly.

    Processes are registered with :meth:`add_process` and launched with
    :meth:`start_process`. :meth:`start_monitoring` starts a daemon thread
    that polls every ``check_interval`` seconds. When a process the
    supervisor believes is running has exited, the supervisor:

    * logs the tail of its stdout/stderr;
    * invokes ``process_failed_callback``;
    * if ``auto_restart`` is set and fewer than ``max_restarts`` restarts
      have been used, restarts it after ``restart_delay`` seconds.

    Values stored under ``self.processes[name]['status']``:

    * ``'stopped'``
    * ``'running'``
    * ``'stopping'`` — being shut down on purpose
    * ``'exited'``
    * ``'failed'`` — could not be spawned
    * ``'failed_max_restarts'``
    """

    # Raw output chunks (up to 4 KiB each) retained per stream, so the tail of
    # stdout/stderr is still available for logging after the process exits.
    _OUTPUT_TAIL_CHUNKS = 16
    # Number of trailing characters of each stream written to the log on exit.
    _OUTPUT_LOG_CHARS = 500

    def __init__(self, max_restarts: int = 5, restart_delay: int = 10,
                 check_interval: float = 5.0):
        """
        Initialize process supervisor

        Args:
            max_restarts: Maximum number of restarts before giving up
            restart_delay: Delay in seconds between restarts
            check_interval: Seconds between health checks in the monitor loop
        """
        self.max_restarts = max_restarts
        self.restart_delay = restart_delay
        self.check_interval = check_interval

        self.processes: Dict[str, Dict[str, Any]] = {}
        self.monitoring = False
        self.monitor_thread: Optional[threading.Thread] = None
        # Set by stop_monitoring(); wakes the monitor loop and any pending
        # auto-restart immediately instead of letting them sleep it out.
        self._stop_event = threading.Event()

        # Callbacks
        self.process_started_callback: Optional[Callable] = None
        self.process_failed_callback: Optional[Callable] = None
        self.process_restarted_callback: Optional[Callable] = None

    def add_process(self, name: str, command: List[str],
                    working_dir: Optional[str] = None,
                    env: Optional[Dict[str, str]] = None,
                    auto_restart: bool = True):
        """
        Add a process to supervise

        Registering a name that already exists replaces its entry. The old
        entry's Popen handle is dropped, so it is no longer supervised.

        Args:
            name: Process name
            command: Command to run as list
            working_dir: Working directory
            env: Environment variables (merged over os.environ at start)
            auto_restart: Whether to auto-restart on failure
        """
        existing = self.processes.get(name)
        if existing and existing['process'] and existing['process'].poll() is None:
            logger.warning(f"Replacing process '{name}' while it is still running")

        self.processes[name] = {
            'command': command,
            'working_dir': working_dir,
            'env': env,
            'auto_restart': auto_restart,
            'process': None,
            'restart_count': 0,
            'last_start': None,
            'last_failure': None,
            'status': 'stopped'
        }
        logger.info(f"Added process '{name}' to supervisor")

    def start_process(self, name: str) -> bool:
        """Start the named process unless it is already running.

        Returns:
            True if the process is running after the call (including when it
            was already running); False if the name is unknown or spawning
            failed. In the latter case status becomes ``'failed'``.
        """
        proc_info = self.processes.get(name)
        if proc_info is None:
            logger.error(f"Process '{name}' not found")
            return False

        if proc_info['process'] and proc_info['process'].poll() is None:
            logger.warning(f"Process '{name}' is already running")
            return True

        try:
            # Child inherits our environment, overridden by the per-process env.
            env = os.environ.copy()
            if proc_info['env']:
                env.update(proc_info['env'])

            process = subprocess.Popen(
                proc_info['command'],
                cwd=proc_info['working_dir'],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception as e:
            logger.error(f"Failed to start process '{name}': {e}")
            proc_info['status'] = 'failed'
            proc_info['last_failure'] = datetime.now()
            return False

        # The pipes must be drained continuously: a child that writes more
        # than the OS pipe buffer would otherwise block forever on write.
        stdout_tail = deque(maxlen=self._OUTPUT_TAIL_CHUNKS)
        stderr_tail = deque(maxlen=self._OUTPUT_TAIL_CHUNKS)
        proc_info['stdout_tail'] = stdout_tail
        proc_info['stderr_tail'] = stderr_tail
        proc_info['output_threads'] = [
            self._start_output_drain(process.stdout, stdout_tail, f"{name}-stdout"),
            self._start_output_drain(process.stderr, stderr_tail, f"{name}-stderr"),
        ]

        # Assign the handle before flipping the status so the monitor thread
        # never sees 'running' paired with the previous (exited) process.
        proc_info['process'] = process
        proc_info['last_start'] = datetime.now()
        proc_info['status'] = 'running'

        logger.info(f"Started process '{name}' (PID: {process.pid})")

        if self.process_started_callback:
            try:
                self.process_started_callback(name, process.pid)
            except Exception as e:
                logger.error(f"Error in process started callback: {e}")

        return True

    @staticmethod
    def _start_output_drain(stream, tail: deque, thread_name: str) -> threading.Thread:
        """Read *stream* to EOF on a daemon thread, keeping the last chunks in *tail*."""
        def _pump():
            try:
                fd = stream.fileno()
                while True:
                    # os.read returns whatever is available (up to 4 KiB), so a
                    # long line without a newline cannot grow memory unbounded.
                    chunk = os.read(fd, 4096)
                    if not chunk:
                        break  # EOF: the child closed its end of the pipe
                    tail.append(chunk)
            except (OSError, ValueError) as e:
                logger.debug(f"Stopped reading output of {thread_name}: {e}")
            finally:
                stream.close()

        thread = threading.Thread(target=_pump, name=thread_name, daemon=True)
        thread.start()
        return thread

    @classmethod
    def _tail_text(cls, chunks: Optional[deque]) -> str:
        """Decode buffered output chunks and return the last _OUTPUT_LOG_CHARS chars."""
        if not chunks:
            return ""
        # tuple() snapshots the deque in case a drain thread is still appending.
        data = b"".join(tuple(chunks))
        return data.decode(errors="replace")[-cls._OUTPUT_LOG_CHARS:]

    def stop_process(self, name: str, timeout: int = 10) -> bool:
        """Stop the named process: SIGTERM, then kill after *timeout* seconds.

        Returns:
            True if the process is not running afterwards (or was not running
            to begin with); False if the name is unknown or stopping raised.
        """
        proc_info = self.processes.get(name)
        if proc_info is None:
            logger.error(f"Process '{name}' not found")
            return False

        process = proc_info['process']

        if not process or process.poll() is not None:
            logger.info(f"Process '{name}' is not running")
            proc_info['status'] = 'stopped'
            return True

        # Mark the exit as intentional *before* signalling, so the monitor
        # thread does not treat it as a crash and auto-restart the process.
        proc_info['status'] = 'stopping'

        try:
            # Try graceful shutdown first
            process.terminate()
            try:
                process.wait(timeout=timeout)
                logger.info(f"Process '{name}' terminated gracefully")
            except subprocess.TimeoutExpired:
                # Force kill if graceful shutdown fails
                logger.warning(f"Process '{name}' did not terminate gracefully, force killing")
                process.kill()
                process.wait()
                logger.info(f"Process '{name}' force killed")
        except Exception as e:
            logger.error(f"Error stopping process '{name}': {e}")
            # Report the real state rather than leaving it stuck in 'stopping'.
            proc_info['status'] = 'running' if process.poll() is None else 'stopped'
            return False

        proc_info['status'] = 'stopped'
        return True

    def restart_process(self, name: str) -> bool:
        """Restart the named process.

        Stops the process if it is running, sleeps ``restart_delay`` seconds
        in the calling thread, then starts it again. Each call counts against
        ``max_restarts``.

        Returns:
            True if the process was started again, otherwise False.
        """
        logger.info(f"Restarting process '{name}'")

        proc_info = self.processes.get(name)
        if proc_info is None:
            logger.error(f"Process '{name}' not found")
            return False

        # Stop if running
        if proc_info['process'] and proc_info['process'].poll() is None:
            self.stop_process(name)

        time.sleep(self.restart_delay)
        return self._restart_now(name)

    def _restart_now(self, name: str) -> bool:
        """Consume one restart from the budget and start *name* if still allowed."""
        proc_info = self.processes[name]
        proc_info['restart_count'] += 1

        if proc_info['restart_count'] > self.max_restarts:
            logger.error(f"Process '{name}' exceeded max restarts ({self.max_restarts})")
            proc_info['status'] = 'failed_max_restarts'
            return False

        success = self.start_process(name)

        if success and self.process_restarted_callback:
            try:
                self.process_restarted_callback(name, proc_info['restart_count'])
            except Exception as e:
                logger.error(f"Error in process restarted callback: {e}")

        return success

    def _auto_restart(self, name: str, exited_process) -> None:
        """Background restart after a detected crash of *exited_process*.

        Waits ``restart_delay`` seconds between attempts. Gives up early if
        monitoring is stopped or if the process was stopped, replaced or
        restarted by someone else in the meantime. If spawning fails it
        retries until the restart budget is spent.
        """
        while True:
            # Event.wait is an interruptible sleep: it returns True as soon as
            # monitoring stops, so a shutdown is never followed by a respawn.
            if self._stop_event.wait(self.restart_delay):
                logger.info(f"Auto-restart of '{name}' cancelled: monitoring stopped")
                return

            proc_info = self.processes.get(name)
            if (proc_info is None
                    or proc_info['process'] is not exited_process
                    or proc_info['status'] not in ('exited', 'failed')):
                return  # someone else already handled this process

            logger.info(f"Restarting process '{name}'")
            if self._restart_now(name) or proc_info['status'] != 'failed':
                return
            # Spawn failed but restart budget remains: retry after another delay.

    def start_monitoring(self):
        """Start the background health-check thread (no-op if already running)."""
        if self.monitoring:
            logger.warning("Process monitoring already started")
            return

        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("Process monitoring started")

    def stop_monitoring(self):
        """Stop the health-check thread and cancel pending auto-restarts."""
        self.monitoring = False
        self._stop_event.set()
        # A callback running on the monitor thread may call this; a thread
        # cannot join itself.
        if self.monitor_thread and self.monitor_thread is not threading.current_thread():
            self.monitor_thread.join(timeout=5)
        logger.info("Process monitoring stopped")

    def _monitor_loop(self):
        """Poll every supervised process until monitoring is stopped."""
        logger.info("Process monitoring loop started")

        while self.monitoring:
            try:
                # Snapshot: add_process() may mutate the dict from another thread.
                for name, proc_info in list(self.processes.items()):
                    self._check_process_health(name, proc_info)
            except Exception as e:
                logger.error(f"Error in process monitoring loop: {e}")
            self._stop_event.wait(self.check_interval)

        logger.info("Process monitoring loop stopped")

    def _check_process_health(self, name: str, proc_info: Dict[str, Any]):
        """Detect an unexpected exit of *name* and react to it exactly once."""
        process = proc_info['process']

        # Only a process we believe is running can fail; this skips processes
        # being stopped on purpose and exits that were already handled.
        if not process or proc_info['status'] != 'running':
            return

        return_code = process.poll()
        # Re-check status: stop_process() may have started in the meantime.
        if return_code is None or proc_info['status'] != 'running':
            return

        proc_info['status'] = 'exited'
        proc_info['last_failure'] = datetime.now()

        logger.warning(f"Process '{name}' exited with code {return_code}")
        self._log_output_tail(name, proc_info)

        if self.process_failed_callback:
            try:
                self.process_failed_callback(name, return_code)
            except Exception as e:
                logger.error(f"Error in process failed callback: {e}")

        if not proc_info['auto_restart']:
            return

        if proc_info['restart_count'] < self.max_restarts:
            logger.info(f"Auto-restarting process '{name}'")
            threading.Thread(target=self._auto_restart, args=(name, process),
                             daemon=True).start()
        else:
            logger.error(f"Process '{name}' exceeded max restarts ({self.max_restarts})")
            proc_info['status'] = 'failed_max_restarts'

    def _log_output_tail(self, name: str, proc_info: Dict[str, Any]):
        """Log the last characters the process wrote to stdout and stderr."""
        # Give the drain threads a moment to pick up output buffered at exit.
        for thread in proc_info.get('output_threads', ()):
            thread.join(timeout=1)

        stdout = self._tail_text(proc_info.get('stdout_tail'))
        stderr = self._tail_text(proc_info.get('stderr_tail'))
        if stdout:
            logger.info(f"Process '{name}' stdout: {stdout}")
        if stderr:
            logger.error(f"Process '{name}' stderr: {stderr}")

    def get_process_status(self, name: str) -> Optional[Dict[str, Any]]:
        """Return a status snapshot for *name*, or None if it is not registered."""
        proc_info = self.processes.get(name)
        if proc_info is None:
            return None

        process = proc_info['process']
        running = process is not None and process.poll() is None

        return {
            'name': name,
            'status': proc_info['status'],
            'restart_count': proc_info['restart_count'],
            'last_start': proc_info['last_start'],
            'last_failure': proc_info['last_failure'],
            'auto_restart': proc_info['auto_restart'],
            'pid': process.pid if running else None,
            'running': running
        }

    def get_all_status(self) -> Dict[str, Dict[str, Any]]:
        """Return status snapshots for every registered process, keyed by name."""
        return {name: self.get_process_status(name) for name in list(self.processes)}

    def set_callbacks(self,
                      process_started: Optional[Callable] = None,
                      process_failed: Optional[Callable] = None,
                      process_restarted: Optional[Callable] = None):
        """Set (replace) all three event callbacks; None clears a callback.

        Signatures: started(name, pid), failed(name, return_code),
        restarted(name, restart_count).
        """
        self.process_started_callback = process_started
        self.process_failed_callback = process_failed
        self.process_restarted_callback = process_restarted

    def shutdown_all(self):
        """Stop monitoring, then stop every supervised process."""
        logger.info("Shutting down all supervised processes")

        # Stop monitoring first so intentional exits below are not mistaken
        # for crashes and pending auto-restarts are cancelled.
        self.stop_monitoring()

        for name in list(self.processes):
            self.stop_process(name)


# Lazily created supervisor shared by every caller of get_process_supervisor().
_process_supervisor = None


def get_process_supervisor() -> ProcessSupervisor:
    """Return the module-wide ProcessSupervisor, creating it on first use."""
    global _process_supervisor
    if _process_supervisor is not None:
        return _process_supervisor
    _process_supervisor = ProcessSupervisor()
    return _process_supervisor


def create_supervised_dashboard_runner():
    """Register the clean dashboard with the global supervisor and return it.

    The dashboard is ``run_clean_dashboard.py``, launched with the current
    interpreter from the current working directory, with auto-restart
    enabled. Calling this more than once is safe. The process is registered
    only the first time, and an already-set "process started" callback is
    preserved.

    Returns:
        The shared ProcessSupervisor from get_process_supervisor().
    """
    supervisor = get_process_supervisor()
    dashboard_name = "clean_dashboard"

    # Re-registering would overwrite the entry and its live process handle,
    # orphaning a running dashboard, so only add it once.
    if dashboard_name not in supervisor.processes:
        supervisor.add_process(
            name=dashboard_name,
            command=[sys.executable, "run_clean_dashboard.py"],
            working_dir=os.getcwd(),
            auto_restart=True
        )

    # The supervisor is shared, so callbacks fire for every process. Only
    # report events that actually belong to the dashboard.
    def on_process_failed(name: str, return_code: int):
        if name == dashboard_name:
            logger.error(f"Dashboard process failed with code {return_code}")

    def on_process_restarted(name: str, restart_count: int):
        if name == dashboard_name:
            logger.info(f"Dashboard restarted (attempt {restart_count})")

    # set_callbacks() replaces all three callbacks; pass the existing
    # "started" callback through so it is not silently cleared.
    supervisor.set_callbacks(
        process_started=supervisor.process_started_callback,
        process_failed=on_process_failed,
        process_restarted=on_process_restarted
    )

    return supervisor