# Repository file listing header (not part of the module):
#   Path: gogo2/utils/async_task_manager.py
#   Timestamp: 2025-07-26 22:32:45 +03:00
#   Size: 232 lines, 8.6 KiB
#   Language: Python

"""
Async Task Manager - Handles async tasks with comprehensive error handling
Prevents silent failures in async operations
"""
import asyncio
import logging
import functools
import traceback
from typing import Any, Callable, Optional, Dict, List
from datetime import datetime
# Module-level logger, named after this module via __name__ so that all
# task lifecycle messages emitted below go through a single logger.
logger = logging.getLogger(__name__)
class AsyncTaskManager:
    """Manage async tasks with error handling and monitoring.

    Running tasks are tracked by name in ``active_tasks``. Outcomes are kept
    in two bounded histories, ``completed_tasks`` and ``failed_tasks``, each
    trimmed to at most ``max_history`` records (oldest dropped first).
    Cancelled tasks are logged but are not stored in either history.
    """

    def __init__(self):
        # name -> task; when a name is reused, it maps to the newest task
        self.active_tasks: Dict[str, asyncio.Task] = {}
        # One dict per finished task, oldest first
        self.completed_tasks: List[Dict[str, Any]] = []
        # One dict per failed task, oldest first
        self.failed_tasks: List[Dict[str, Any]] = []
        # Upper bound on the length of each history list
        self.max_history = 100

    def create_task_with_error_handling(self,
                                        coro: Any,
                                        name: str,
                                        error_callback: Optional[Callable] = None,
                                        success_callback: Optional[Callable] = None) -> asyncio.Task:
        """
        Create an async task with comprehensive error handling

        Args:
            coro: Coroutine to run
            name: Task name for identification
            error_callback: Called on error with (name, exception)
            success_callback: Called on success with (name, result)

        Returns:
            The created ``asyncio.Task``. Awaiting it yields the coroutine's
            result, or ``None`` if the coroutine raised an ``Exception``
            (the exception is logged and recorded, not re-raised).
            ``asyncio.CancelledError`` is logged and propagated.
        """
        async def wrapped_coro():
            """Wrapper coroutine with error handling"""
            start_time = datetime.now()
            try:
                logger.debug(f"Starting async task: {name}")
                result = await coro
                # Bookkeeping stays inside the try so that an error while
                # recording is itself caught below instead of escaping the task.
                self._record_success(name, start_time, result)
                self._run_callback(success_callback, 'success', name, result)
                return result
            except asyncio.CancelledError:
                logger.info(f"Async task '{name}' was cancelled")
                raise
            except Exception as e:
                self._record_failure(name, start_time, e)
                self._run_callback(error_callback, 'error', name, e)
                # Don't re-raise to prevent task from crashing the event loop
                # Instead, return None to indicate failure
                return None
            finally:
                # Untrack synchronously, but only if the registry still points
                # at *this* task: a newer task may have reused the same name.
                self._forget_task(name, asyncio.current_task())

        if name in self.active_tasks:
            logger.warning(
                f"Async task name '{name}' is already active; "
                f"the new task replaces it in the registry"
            )

        # Create and store task
        task = asyncio.create_task(wrapped_coro(), name=name)
        self.active_tasks[name] = task
        # Backstop for tasks cancelled before their first step: in that case
        # wrapped_coro never starts, so its ``finally`` block never runs.
        task.add_done_callback(functools.partial(self._on_task_done, name, coro))
        return task

    def _append_bounded(self, history: List[Dict[str, Any]], record: Dict[str, Any]) -> None:
        """Append ``record`` to ``history`` and drop the oldest overflow entries."""
        history.append(record)
        overflow = len(history) - self.max_history
        if overflow > 0:
            del history[:overflow]

    def _record_success(self, name: str, start_time: datetime, result: Any) -> None:
        """Log a successful completion and store it in ``completed_tasks``."""
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.debug(f"Async task '{name}' completed successfully in {duration:.2f}s")
        self._append_bounded(self.completed_tasks, {
            'name': name,
            'status': 'completed',
            'start_time': start_time,
            'end_time': end_time,
            'duration': duration,
            # Truncate long results; compare against None so falsy results
            # such as 0, False or "" are still recorded.
            'result': str(result)[:200] if result is not None else None,
        })

    def _record_failure(self, name: str, start_time: datetime, error: Exception) -> None:
        """Log a failure and store it in ``failed_tasks``.

        Must be called while ``error`` is being handled (inside the
        ``except`` block), because the traceback text is taken from the
        exception currently being handled.
        """
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        tb_text = traceback.format_exc()
        logger.error(f"Async task '{name}' failed after {duration:.2f}s: {error}")
        logger.error(f"Task '{name}' traceback: {tb_text}")
        self._append_bounded(self.failed_tasks, {
            'name': name,
            'status': 'failed',
            'start_time': start_time,
            'end_time': end_time,
            'duration': duration,
            'error': str(error),
            # Exception class name, used by get_failure_summary() to group failures
            'error_type': type(error).__name__,
            'traceback': tb_text,
        })

    def _run_callback(self, callback: Optional[Callable], kind: str, name: str, payload: Any) -> None:
        """Invoke an optional user callback with (name, payload), logging any error it raises."""
        if not callback:
            return
        try:
            callback(name, payload)
        except Exception as cb_error:
            logger.error(f"Error in {kind} callback for task '{name}': {cb_error}")

    def _forget_task(self, name: str, task: Optional[asyncio.Task]) -> None:
        """Remove ``name`` from ``active_tasks`` only if it still refers to ``task``."""
        if task is not None and self.active_tasks.get(name) is task:
            del self.active_tasks[name]

    def _on_task_done(self, name: str, coro: Any, task: asyncio.Task) -> None:
        """Done-callback: untrack the task and release a never-started coroutine."""
        self._forget_task(name, task)
        # When the task was cancelled before it started, ``coro`` was never
        # awaited; closing it releases it. Closing an already-finished
        # coroutine is a no-op.
        if task.cancelled() and asyncio.iscoroutine(coro):
            coro.close()

    def cancel_task(self, name: str) -> bool:
        """Cancel a specific task.

        Returns:
            True if a tracked, unfinished task with this name was asked to
            cancel; False otherwise.
        """
        task = self.active_tasks.get(name)
        if task is None or task.done():
            return False
        task.cancel()
        logger.info(f"Cancelled async task: {name}")
        return True

    def cancel_all_tasks(self):
        """Cancel all active tasks"""
        # Iterate over a snapshot: the registry may change while cancelling
        for name in list(self.active_tasks):
            self.cancel_task(name)

    def get_task_status(self) -> Dict[str, Any]:
        """Get status of all tasks.

        Returns:
            Counts of active/completed/failed tasks, the names of the active
            tasks, and a short description of up to five most recent failures.
        """
        return {
            'active_tasks': len(self.active_tasks),
            'completed_tasks': len(self.completed_tasks),
            'failed_tasks': len(self.failed_tasks),
            'active_task_names': list(self.active_tasks.keys()),
            'recent_failures': [
                {
                    'name': f['name'],
                    'error': f['error'],
                    'duration': f['duration'],
                    'time': f['end_time'].strftime('%H:%M:%S')
                }
                for f in self.failed_tasks[-5:]
            ]
        }

    def get_failure_summary(self) -> Dict[str, Any]:
        """Get summary of task failures.

        Returns:
            A dict with ``total_failures``, ``recent_failures_1h`` (failures
            that ended within the last hour), ``failure_patterns`` (count per
            exception class name) and ``most_common_error``. The same keys
            are present when there are no failures.
        """
        if not self.failed_tasks:
            return {
                'total_failures': 0,
                'recent_failures_1h': 0,
                'failure_patterns': {},
                'most_common_error': None,
            }

        # Count failures by exception class name; records without the
        # 'error_type' field are grouped under 'Unknown'.
        error_counts: Dict[str, int] = {}
        for failure in self.failed_tasks:
            error_type = failure.get('error_type', 'Unknown')
            error_counts[error_type] = error_counts.get(error_type, 0) + 1

        # Recent failure rate (last hour), measured against a single "now"
        now = datetime.now()
        recent_count = sum(
            1 for f in self.failed_tasks
            if (now - f['end_time']).total_seconds() < 3600
        )

        return {
            'total_failures': len(self.failed_tasks),
            'recent_failures_1h': recent_count,
            'failure_patterns': error_counts,
            'most_common_error': max(error_counts, key=error_counts.get) if error_counts else None
        }
# Process-wide manager, created on first use by get_async_task_manager()
_task_manager = None


def get_async_task_manager() -> AsyncTaskManager:
    """Return the shared AsyncTaskManager, creating it on the first call."""
    global _task_manager
    if _task_manager is not None:
        return _task_manager
    _task_manager = AsyncTaskManager()
    return _task_manager
def create_safe_task(coro: Any,
                     name: str,
                     error_callback: Optional[Callable] = None,
                     success_callback: Optional[Callable] = None) -> asyncio.Task:
    """
    Schedule ``coro`` as an error-handled task on the global task manager.

    Args:
        coro: Coroutine to run
        name: Task name for identification
        error_callback: Called on error with (name, exception)
        success_callback: Called on success with (name, result)

    Returns:
        The task returned by the manager's create_task_with_error_handling().
    """
    return get_async_task_manager().create_task_with_error_handling(
        coro,
        name,
        error_callback,
        success_callback,
    )
def safe_async_wrapper(name: str,
                       error_callback: Optional[Callable] = None,
                       success_callback: Optional[Callable] = None):
    """
    Decorator factory that runs an async function through create_safe_task.

    Each call of the decorated function builds the coroutine, hands it to
    create_safe_task under the given ``name`` together with the callbacks,
    and awaits the resulting task.

    Usage:
        @safe_async_wrapper("my_task")
        async def my_async_function():
            # Your async code here
            pass
    """
    def decorate(func):
        @functools.wraps(func)
        async def runner(*args, **kwargs):
            # Build the coroutine and await the managed task in one step
            return await create_safe_task(
                func(*args, **kwargs),
                name,
                error_callback,
                success_callback,
            )

        return runner

    return decorate