Files
gogo2/kill_dashboard.py
2025-09-02 17:59:12 +03:00

208 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
Cross-Platform Dashboard Process Cleanup Script
Works on both Linux and Windows systems.
"""
import os
import sys
import time
import signal
import subprocess
import logging
import platform
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def is_windows():
"""Check if running on Windows"""
return platform.system().lower() == "windows"
def kill_processes_windows():
"""Kill dashboard processes on Windows"""
killed_count = 0
try:
# Use tasklist to find Python processes
result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines[1:]: # Skip header
if line.strip() and 'python.exe' in line:
parts = line.split(',')
if len(parts) > 1:
pid = parts[1].strip('"')
try:
# Get command line to check if it's our dashboard
cmd_result = subprocess.run(['wmic', 'process', 'where', f'ProcessId={pid}', 'get', 'CommandLine', '/format:csv'],
capture_output=True, text=True, timeout=5)
if cmd_result.returncode == 0 and ('run_clean_dashboard' in cmd_result.stdout or 'clean_dashboard' in cmd_result.stdout):
logger.info(f"Killing Windows process {pid}")
subprocess.run(['taskkill', '/PID', pid, '/F'],
capture_output=True, timeout=5)
killed_count += 1
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
except Exception as e:
logger.debug(f"Error checking process {pid}: {e}")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.debug("tasklist not available")
except Exception as e:
logger.error(f"Error in Windows process cleanup: {e}")
return killed_count
def kill_processes_linux():
"""Kill dashboard processes on Linux"""
killed_count = 0
# Find and kill processes by name
process_names = [
'run_clean_dashboard',
'clean_dashboard',
'python.*run_clean_dashboard',
'python.*clean_dashboard'
]
for process_name in process_names:
try:
# Use pgrep to find processes
result = subprocess.run(['pgrep', '-f', process_name],
capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
if pid.strip():
try:
logger.info(f"Killing Linux process {pid} ({process_name})")
os.kill(int(pid), signal.SIGTERM)
killed_count += 1
except (ProcessLookupError, ValueError) as e:
logger.debug(f"Process {pid} already terminated: {e}")
except Exception as e:
logger.warning(f"Error killing process {pid}: {e}")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.debug(f"pgrep not available for {process_name}")
# Kill processes using port 8050
try:
result = subprocess.run(['lsof', '-ti', ':8050'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split('\n')
logger.info(f"Found processes using port 8050: {pids}")
for pid in pids:
if pid.strip():
try:
logger.info(f"Killing process {pid} using port 8050")
os.kill(int(pid), signal.SIGTERM)
killed_count += 1
except (ProcessLookupError, ValueError) as e:
logger.debug(f"Process {pid} already terminated: {e}")
except Exception as e:
logger.warning(f"Error killing process {pid}: {e}")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.debug("lsof not available")
return killed_count
def check_port_8050():
"""Check if port 8050 is free (cross-platform)"""
import socket
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', 8050))
return True
except OSError:
return False
def kill_dashboard_processes():
"""Kill all dashboard-related processes (cross-platform)"""
logger.info("Killing dashboard processes...")
if is_windows():
logger.info("Detected Windows system")
killed_count = kill_processes_windows()
else:
logger.info("Detected Linux/Unix system")
killed_count = kill_processes_linux()
# Wait for processes to terminate
if killed_count > 0:
logger.info(f"Killed {killed_count} processes, waiting for termination...")
time.sleep(3)
# Force kill any remaining processes
if is_windows():
# Windows force kill
try:
result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines[1:]:
if line.strip() and 'python.exe' in line:
parts = line.split(',')
if len(parts) > 1:
pid = parts[1].strip('"')
try:
cmd_result = subprocess.run(['wmic', 'process', 'where', f'ProcessId={pid}', 'get', 'CommandLine', '/format:csv'],
capture_output=True, text=True, timeout=3)
if cmd_result.returncode == 0 and ('run_clean_dashboard' in cmd_result.stdout or 'clean_dashboard' in cmd_result.stdout):
logger.info(f"Force killing Windows process {pid}")
subprocess.run(['taskkill', '/PID', pid, '/F'],
capture_output=True, timeout=3)
except:
pass
except:
pass
else:
# Linux force kill
for process_name in ['run_clean_dashboard', 'clean_dashboard']:
try:
result = subprocess.run(['pgrep', '-f', process_name],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
if pid.strip():
try:
logger.info(f"Force killing Linux process {pid}")
os.kill(int(pid), signal.SIGKILL)
except (ProcessLookupError, ValueError):
pass
except Exception as e:
logger.warning(f"Error force killing process {pid}: {e}")
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return killed_count
def main():
logger.info("=== Cross-Platform Dashboard Process Cleanup ===")
logger.info(f"Platform: {platform.system()} {platform.release()}")
# Kill processes
killed = kill_dashboard_processes()
# Check port status
port_free = check_port_8050()
logger.info("=== Cleanup Summary ===")
logger.info(f"Processes killed: {killed}")
logger.info(f"Port 8050 free: {port_free}")
if port_free:
logger.info("✅ Ready for debugging - port 8050 is available")
else:
logger.warning("⚠️ Port 8050 may still be in use")
logger.info("💡 Try running this script again or restart your system")
if __name__ == "__main__":
main()