# Source listing: gogo2/utils/process_supervisor.py
# Snapshot: 2025-07-26 22:32:45 +03:00 (original listing: 340 lines, 12 KiB, Python)
"""
Process Supervisor - Handles process monitoring, restarts, and supervision
Prevents silent failures by monitoring process health and restarting on crashes
"""
import logging
import os
import signal
import subprocess
import sys
import threading
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)


class ProcessSupervisor:
    """Supervise child processes and restart them when they exit unexpectedly.

    Processes are registered with :meth:`add_process` and launched with
    ``subprocess.Popen``. While monitoring is active (:meth:`start_monitoring`)
    a daemon thread polls every ``poll_interval`` seconds.

    A process whose status is ``'running'`` but whose child has exited is
    reported exactly once: it is logged and ``process_failed_callback`` fires.
    If ``auto_restart`` is set and the restart budget is not exhausted, the
    process is then restarted on a separate thread. Processes stopped through
    :meth:`stop_process` are never auto-restarted.

    The child's stdout/stderr are drained continuously by reader threads into
    bounded tail buffers, so a chatty child can never block on a full pipe.
    The tail of each stream is logged when the process exits.

    Status strings reported by :meth:`get_process_status`: ``'stopped'``,
    ``'running'``, ``'stopping'``, ``'exited'``, ``'restarting'``,
    ``'failed'`` (launch error) and ``'failed_max_restarts'``.
    """

    # Lines kept per output stream for post-mortem logging.
    _OUTPUT_TAIL_LINES = 200
    # Characters of each stream's tail written to the log on exit.
    _OUTPUT_LOG_CHARS = 500

    def __init__(self, max_restarts: int = 5, restart_delay: int = 10,
                 poll_interval: float = 5.0):
        """
        Initialize process supervisor

        Args:
            max_restarts: Maximum number of restarts before giving up
            restart_delay: Delay in seconds between restarts
            poll_interval: Seconds between two health checks of the monitor
        """
        self.max_restarts = max_restarts
        self.restart_delay = restart_delay
        self.poll_interval = poll_interval
        self.processes: Dict[str, Dict[str, Any]] = {}
        self.monitoring = False
        self.monitor_thread: Optional[threading.Thread] = None
        # Stop event of the current monitor thread; setting it ends the loop
        # promptly instead of after a full poll interval.
        self._stop_event = threading.Event()
        # Re-entrant lock serialising status transitions between the caller,
        # the monitor thread and restart threads.
        self._lock = threading.RLock()
        # Callbacks
        self.process_started_callback: Optional[Callable] = None
        self.process_failed_callback: Optional[Callable] = None
        self.process_restarted_callback: Optional[Callable] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_alive(process: Optional[subprocess.Popen]) -> bool:
        """Return True if *process* exists and has not exited yet."""
        return process is not None and process.poll() is None

    @staticmethod
    def _fire(callback: Optional[Callable], event: str, *args) -> None:
        """Invoke a user callback, logging (not propagating) its errors."""
        if not callback:
            return
        try:
            callback(*args)
        except Exception as e:
            logger.error(f"Error in {event} callback: {e}")

    @staticmethod
    def _drain_stream(stream, tail: deque) -> None:
        """Read *stream* line by line into *tail* until EOF, then close it."""
        try:
            for line in stream:
                tail.append(line)
        except (OSError, ValueError):
            # The stream was closed underneath us; nothing more to collect.
            pass
        finally:
            try:
                stream.close()
            except OSError:
                pass

    def _start_output_readers(self, name: str, process: subprocess.Popen):
        """Start daemon threads that drain the child's stdout and stderr.

        Returns:
            ``(tails, threads)``. ``tails`` maps ``'stdout'``/``'stderr'`` to
            a bounded deque holding the most recent lines of that stream.
        """
        tails: Dict[str, deque] = {}
        threads: List[threading.Thread] = []
        for label, stream in (('stdout', process.stdout),
                              ('stderr', process.stderr)):
            tail: deque = deque(maxlen=self._OUTPUT_TAIL_LINES)
            reader = threading.Thread(
                target=self._drain_stream,
                args=(stream, tail),
                name=f"{name}-{label}-reader",
                daemon=True,
            )
            reader.start()
            tails[label] = tail
            threads.append(reader)
        return tails, threads

    def _log_output_tail(self, name: str, tails: Optional[Dict[str, deque]],
                         readers: List[threading.Thread]) -> None:
        """Log the last captured stdout/stderr of an exited process."""
        for reader in readers:
            # Readers finish at EOF; bound the wait in case a grandchild
            # still holds the pipe open.
            reader.join(timeout=1)
        tails = tails or {}
        stdout = ''.join(tails.get('stdout', ()))
        stderr = ''.join(tails.get('stderr', ()))
        if stdout:
            logger.info(f"Process '{name}' stdout: {stdout[-self._OUTPUT_LOG_CHARS:]}")
        if stderr:
            logger.error(f"Process '{name}' stderr: {stderr[-self._OUTPUT_LOG_CHARS:]}")

    def _start_locked(self, name: str, proc_info: Dict[str, Any]):
        """Start *name* unless it is already running. Caller holds ``self._lock``.

        Returns:
            ``(ok, process)``. ``ok`` is False only when the launch failed.
            ``process`` is the newly spawned ``Popen``, or None when nothing
            was spawned.
        """
        if self._is_alive(proc_info['process']):
            logger.warning(f"Process '{name}' is already running")
            return True, None
        # Prepare environment
        env = os.environ.copy()
        if proc_info['env']:
            env.update(proc_info['env'])
        try:
            process = subprocess.Popen(
                proc_info['command'],
                cwd=proc_info['working_dir'],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable output must not kill the reader threads.
                errors='replace',
            )
        except Exception as e:
            logger.error(f"Failed to start process '{name}': {e}")
            proc_info['status'] = 'failed'
            proc_info['last_failure'] = datetime.now()
            return False, None
        proc_info['process'] = process
        proc_info['last_start'] = datetime.now()
        proc_info['status'] = 'running'
        proc_info['stop_requested'] = False
        # Drain both pipes continuously. An unread PIPE eventually fills the
        # OS pipe buffer, and the child then blocks forever on write.
        tails, readers = self._start_output_readers(name, process)
        proc_info['output_tails'] = tails
        proc_info['reader_threads'] = readers
        logger.info(f"Started process '{name}' (PID: {process.pid})")
        return True, process

    def _terminate(self, name: str, process: subprocess.Popen, timeout: int) -> bool:
        """terminate() *process*, kill() it if it outlives *timeout* seconds."""
        try:
            # Try graceful shutdown first
            process.terminate()
            try:
                process.wait(timeout=timeout)
                logger.info(f"Process '{name}' terminated gracefully")
            except subprocess.TimeoutExpired:
                # Force kill if graceful shutdown fails
                logger.warning(f"Process '{name}' did not terminate gracefully, force killing")
                process.kill()
                process.wait()
                logger.info(f"Process '{name}' force killed")
            return True
        except Exception as e:
            logger.error(f"Error stopping process '{name}': {e}")
            return False

    def _restart(self, name: str, manual: bool) -> bool:
        """Stop *name* if needed, wait ``restart_delay``, then start it again.

        Args:
            name: Process name.
            manual: True for an explicit :meth:`restart_process` call, which
                overrides an earlier :meth:`stop_process`. False for
                monitor-scheduled restarts, which a stop request cancels.

        Returns:
            True if the process is running again.
        """
        logger.info(f"Restarting process '{name}'")
        with self._lock:
            proc_info = self.processes.get(name)
            if proc_info is None:
                logger.error(f"Process '{name}' not found")
                return False
            if manual:
                proc_info['stop_requested'] = False
            elif proc_info['stop_requested']:
                logger.info(f"Restart of process '{name}' cancelled: process was stopped")
                return False
            old_process = proc_info['process']
            # Not 'running': the monitor must not treat the shutdown below,
            # or the gap before the new start, as a crash.
            proc_info['status'] = 'restarting'

        if self._is_alive(old_process):
            self._terminate(name, old_process, 10)

        while True:
            # Wait restart delay (outside the lock so stop_process can run).
            time.sleep(self.restart_delay)
            with self._lock:
                if proc_info['stop_requested']:
                    logger.info(f"Restart of process '{name}' cancelled: process was stopped")
                    return False
                proc_info['restart_count'] += 1
                restart_count = proc_info['restart_count']
                if restart_count > self.max_restarts:
                    logger.error(f"Process '{name}' exceeded max restarts ({self.max_restarts})")
                    proc_info['status'] = 'failed_max_restarts'
                    return False
                ok, new_process = self._start_locked(name, proc_info)
                # Auto-restart processes keep retrying a failed launch until
                # the restart budget runs out or a stop is requested.
                retry = not ok and proc_info['auto_restart']
            if new_process is not None:
                self._fire(self.process_started_callback, "process started",
                           name, new_process.pid)
            if ok:
                self._fire(self.process_restarted_callback, "process restarted",
                           name, restart_count)
                return True
            if not retry:
                return False
            logger.info(f"Retrying start of process '{name}'")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def add_process(self, name: str, command: List[str],
                    working_dir: Optional[str] = None,
                    env: Optional[Dict[str, str]] = None,
                    auto_restart: bool = True):
        """
        Add a process to supervise

        Re-adding an existing name replaces its entry. If that process is
        currently running, only its configuration is replaced (effective from
        the next start). This keeps the supervisor's handle to the live child,
        so it can still be stopped and is never duplicated.

        Args:
            name: Process name
            command: Command to run as list
            working_dir: Working directory
            env: Environment variables (merged over os.environ)
            auto_restart: Whether to auto-restart on failure
        """
        with self._lock:
            existing = self.processes.get(name)
            if existing is not None and self._is_alive(existing['process']):
                existing.update(command=command, working_dir=working_dir,
                                env=env, auto_restart=auto_restart)
                logger.warning(f"Process '{name}' is already running; "
                               f"its new configuration applies from the next start")
                return
            self.processes[name] = {
                'command': command,
                'working_dir': working_dir,
                'env': env,
                'auto_restart': auto_restart,
                'process': None,
                'restart_count': 0,
                'last_start': None,
                'last_failure': None,
                'status': 'stopped',
                # Set by stop_process(); cancels pending auto-restarts.
                'stop_requested': False,
                # Bounded stdout/stderr tails and the threads filling them.
                'output_tails': None,
                'reader_threads': [],
            }
        logger.info(f"Added process '{name}' to supervisor")

    def start_process(self, name: str) -> bool:
        """Start a specific process.

        Returns:
            True if the process was started or is already running, False if
            the name is unknown or the launch failed.
        """
        with self._lock:
            proc_info = self.processes.get(name)
            if proc_info is None:
                logger.error(f"Process '{name}' not found")
                return False
            ok, process = self._start_locked(name, proc_info)
        # Callbacks run outside the lock so they may call back in freely.
        if process is not None:
            self._fire(self.process_started_callback, "process started",
                       name, process.pid)
        return ok

    def stop_process(self, name: str, timeout: int = 10) -> bool:
        """Stop a specific process: terminate(), then kill() after *timeout* s.

        The stop is recorded as intentional before the child is signalled.
        As a result, neither the monitor nor a pending auto-restart brings the
        process back.

        Returns:
            True if the process is stopped (or was not running), False if the
            name is unknown or the child could not be signalled.
        """
        with self._lock:
            proc_info = self.processes.get(name)
            if proc_info is None:
                logger.error(f"Process '{name}' not found")
                return False
            proc_info['stop_requested'] = True
            process = proc_info['process']
            if not self._is_alive(process):
                logger.info(f"Process '{name}' is not running")
                proc_info['status'] = 'stopped'
                return True
            previous_status = proc_info['status']
            proc_info['status'] = 'stopping'

        # Wait outside the lock: this can take up to `timeout` seconds.
        if not self._terminate(name, process, timeout):
            with self._lock:
                proc_info['status'] = previous_status
                proc_info['stop_requested'] = False
            return False
        with self._lock:
            proc_info['status'] = 'stopped'
        return True

    def restart_process(self, name: str) -> bool:
        """Restart a specific process.

        Any earlier stop request is overridden. The process is stopped if it
        is running, ``restart_delay`` seconds pass, and the restart budget
        (``max_restarts``) is charged.

        Returns:
            True if the process is running again.
        """
        return self._restart(name, manual=True)

    def start_monitoring(self):
        """Start the background health-check thread."""
        if self.monitoring:
            logger.warning("Process monitoring already started")
            return
        self.monitoring = True
        # A fresh event per thread: a previous loop that has not finished
        # exiting cannot be revived by this start.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, args=(stop_event,), daemon=True)
        self.monitor_thread.start()
        logger.info("Process monitoring started")

    def stop_monitoring(self):
        """Stop the background health-check thread and wait briefly for it."""
        self.monitoring = False
        self._stop_event.set()
        thread = self.monitor_thread
        # Joining the current thread would raise (e.g. a callback running on
        # the monitor thread calling shutdown_all()).
        if thread and thread is not threading.current_thread():
            thread.join(timeout=5)
        logger.info("Process monitoring stopped")

    def _monitor_loop(self, stop_event: Optional[threading.Event] = None):
        """Main monitoring loop; runs until *stop_event* is set."""
        if stop_event is None:
            stop_event = self._stop_event
        logger.info("Process monitoring loop started")
        while not stop_event.is_set():
            try:
                # Snapshot: add_process() may mutate the dict concurrently.
                with self._lock:
                    snapshot = list(self.processes.items())
                for name, proc_info in snapshot:
                    self._check_process_health(name, proc_info)
            except Exception as e:
                logger.error(f"Error in process monitoring loop: {e}")
            # Interruptible sleep so stop_monitoring() returns promptly.
            stop_event.wait(self.poll_interval)
        logger.info("Process monitoring loop stopped")

    def _check_process_health(self, name: str, proc_info: Dict[str, Any]):
        """React once to an unexpected exit of a process marked 'running'."""
        with self._lock:
            process = proc_info['process']
            # Only a process we believe is running can crash. Exits while
            # 'stopping'/'stopped'/'restarting' are intentional.
            if process is None or proc_info['status'] != 'running':
                return
            return_code = process.poll()
            if return_code is None:
                return
            # Leaving 'running' guarantees later polls do not report (and
            # restart) this same exit again.
            proc_info['status'] = 'exited'
            proc_info['last_failure'] = datetime.now()
            auto_restart = proc_info['auto_restart']
            should_restart = (auto_restart
                              and proc_info['restart_count'] < self.max_restarts)
            if auto_restart and not should_restart:
                proc_info['status'] = 'failed_max_restarts'
            tails = proc_info.get('output_tails')
            readers = list(proc_info.get('reader_threads') or ())

        logger.warning(f"Process '{name}' exited with code {return_code}")
        # Read captured stdout/stderr for debugging
        self._log_output_tail(name, tails, readers)
        self._fire(self.process_failed_callback, "process failed", name, return_code)

        if should_restart:
            logger.info(f"Auto-restarting process '{name}'")
            threading.Thread(target=self._restart, args=(name, False),
                             daemon=True).start()
        elif auto_restart:
            logger.error(f"Process '{name}' reached max restarts "
                         f"({self.max_restarts}); not restarting")

    def get_process_status(self, name: str) -> Optional[Dict[str, Any]]:
        """Get status of a specific process, or None if the name is unknown."""
        with self._lock:
            proc_info = self.processes.get(name)
            if proc_info is None:
                return None
            process = proc_info['process']
            running = self._is_alive(process)
            return {
                'name': name,
                'status': proc_info['status'],
                'restart_count': proc_info['restart_count'],
                'last_start': proc_info['last_start'],
                'last_failure': proc_info['last_failure'],
                'auto_restart': proc_info['auto_restart'],
                'pid': process.pid if running else None,
                'running': running,
            }

    def get_all_status(self) -> Dict[str, Dict[str, Any]]:
        """Get status of all processes"""
        with self._lock:
            names = list(self.processes)
        return {name: self.get_process_status(name) for name in names}

    def set_callbacks(self,
                      process_started: Optional[Callable] = None,
                      process_failed: Optional[Callable] = None,
                      process_restarted: Optional[Callable] = None):
        """Set callback functions for process events (replaces all three).

        Signatures: ``process_started(name, pid)``,
        ``process_failed(name, return_code)`` and
        ``process_restarted(name, restart_count)``.
        """
        self.process_started_callback = process_started
        self.process_failed_callback = process_failed
        self.process_restarted_callback = process_restarted

    def shutdown_all(self):
        """Stop monitoring, then shut down all supervised processes."""
        logger.info("Shutting down all supervised processes")
        # Stop the monitor first so it cannot react while processes go down.
        self.stop_monitoring()
        for name in list(self.processes.keys()):
            self.stop_process(name)
# Module-wide supervisor, created lazily by get_process_supervisor() and then
# shared by every caller.
_process_supervisor = None


def get_process_supervisor() -> ProcessSupervisor:
    """Return the shared ProcessSupervisor, creating it with defaults on first use."""
    global _process_supervisor
    if _process_supervisor is not None:
        return _process_supervisor
    _process_supervisor = ProcessSupervisor()
    return _process_supervisor
def create_supervised_dashboard_runner():
    """Register the clean dashboard with the shared supervisor.

    Adds ``run_clean_dashboard.py`` under the name ``"clean_dashboard"``. It
    runs with the current interpreter from the current working directory,
    with auto-restart enabled. The function also installs logging callbacks
    for failures and restarts.

    ``set_callbacks`` replaces all three supervisor callbacks, so a
    previously installed process-started callback is cleared here.

    Returns:
        The global ProcessSupervisor. Neither the process nor monitoring is
        started by this function.
    """
    supervisor = get_process_supervisor()

    dashboard_command = [sys.executable, "run_clean_dashboard.py"]
    supervisor.add_process(
        name="clean_dashboard",
        command=dashboard_command,
        working_dir=os.getcwd(),
        auto_restart=True,
    )

    def log_failure(name: str, return_code: int):
        logger.error(f"Dashboard process failed with code {return_code}")

    def log_restart(name: str, restart_count: int):
        logger.info(f"Dashboard restarted (attempt {restart_count})")

    supervisor.set_callbacks(process_failed=log_failure,
                             process_restarted=log_restart)
    return supervisor