232 lines
8.6 KiB
Python
232 lines
8.6 KiB
Python
"""
Async Task Manager - Handles async tasks with comprehensive error handling.

Prevents silent failures in async operations.
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import functools
|
|
import traceback
|
|
from typing import Any, Callable, Optional, Dict, List
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AsyncTaskManager:
    """Manage async tasks with error handling and monitoring.

    Tasks created through the manager are registered in ``active_tasks`` under
    their name until they finish. Successful runs are summarised in
    ``completed_tasks`` and failed runs in ``failed_tasks``; both histories
    are capped at ``max_history`` entries (oldest entries are dropped first).
    """

    def __init__(self):
        # Tasks that have been created and are not done yet, keyed by name
        self.active_tasks: Dict[str, asyncio.Task] = {}
        # Summary dicts of tasks that returned normally (oldest first)
        self.completed_tasks: List[Dict[str, Any]] = []
        # Summary dicts of tasks that raised an exception (oldest first)
        self.failed_tasks: List[Dict[str, Any]] = []
        # Maximum number of entries kept in each history list
        self.max_history = 100

    def _append_capped(self, history: List[Dict[str, Any]], entry: Dict[str, Any]) -> None:
        """Append ``entry`` to ``history`` and drop the oldest entries beyond ``max_history``."""
        history.append(entry)
        overflow = len(history) - self.max_history
        if overflow > 0:
            del history[:overflow]

    def create_task_with_error_handling(self,
                                        coro: Any,
                                        name: str,
                                        error_callback: Optional[Callable] = None,
                                        success_callback: Optional[Callable] = None) -> asyncio.Task:
        """
        Create an async task with comprehensive error handling

        The awaitable is wrapped so that any ``Exception`` it raises is logged,
        recorded in ``failed_tasks`` and swallowed: the returned task then
        resolves to ``None`` instead of raising. Cancellation is logged and
        re-raised, so awaiting a cancelled task still raises
        ``asyncio.CancelledError``.

        If ``name`` is already registered, the new task replaces the older one
        in ``active_tasks``; the older task keeps running but can no longer be
        reached through ``cancel_task``.

        Args:
            coro: Coroutine (or other awaitable) to run
            name: Task name for identification
            error_callback: Called on error with (name, exception)
            success_callback: Called on success with (name, result)

        Returns:
            The scheduled ``asyncio.Task``. Must be called while an event loop
            is running, because it uses ``asyncio.create_task``.
        """

        async def wrapped_coro():
            """Wrapper coroutine with error handling"""
            start_time = datetime.now()
            try:
                logger.debug(f"Starting async task: {name}")
                result = await coro

                # Log success
                duration = (datetime.now() - start_time).total_seconds()
                logger.debug(f"Async task '{name}' completed successfully in {duration:.2f}s")

                # Store completion info. Only a missing result is recorded as
                # None; falsy results such as 0 or "" are kept as text.
                self._append_capped(self.completed_tasks, {
                    'name': name,
                    'status': 'completed',
                    'start_time': start_time,
                    'end_time': datetime.now(),
                    'duration': duration,
                    'result': str(result)[:200] if result is not None else None,  # Truncate long results
                })

                # A failing callback must not turn a successful task into a failure
                if success_callback:
                    try:
                        success_callback(name, result)
                    except Exception as cb_error:
                        logger.error(f"Error in success callback for task '{name}': {cb_error}")

                return result

            except asyncio.CancelledError:
                logger.info(f"Async task '{name}' was cancelled")
                raise

            except Exception as e:
                # Log error with full traceback
                duration = (datetime.now() - start_time).total_seconds()
                trace_text = traceback.format_exc()
                logger.error(f"Async task '{name}' failed after {duration:.2f}s: {e}")
                logger.error(f"Task '{name}' traceback: {trace_text}")

                # Store failure info. The exception class name is kept next to
                # the message so failures can later be grouped by type.
                self._append_capped(self.failed_tasks, {
                    'name': name,
                    'status': 'failed',
                    'start_time': start_time,
                    'end_time': datetime.now(),
                    'duration': duration,
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'traceback': trace_text,
                })

                if error_callback:
                    try:
                        error_callback(name, e)
                    except Exception as cb_error:
                        logger.error(f"Error in error callback for task '{name}': {cb_error}")

                # Don't re-raise to prevent task from crashing the event loop
                # Instead, return None to indicate failure
                return None

        def _forget_task(finished: asyncio.Task) -> None:
            """Unregister the finished task; runs even if the wrapper never started."""
            # Compare by identity: the name may have been re-registered by a
            # newer task that is still running and must stay tracked.
            if self.active_tasks.get(name) is finished:
                del self.active_tasks[name]
            # A task cancelled before its first step never awaits ``coro``;
            # close it so Python does not warn about a never-awaited coroutine.
            if finished.cancelled() and asyncio.iscoroutine(coro):
                coro.close()

        # Create and store task
        task = asyncio.create_task(wrapped_coro(), name=name)
        self.active_tasks[name] = task
        task.add_done_callback(_forget_task)

        return task

    def cancel_task(self, name: str) -> bool:
        """Cancel a specific task.

        Returns:
            True if a task registered under ``name`` was still running and a
            cancellation was requested, False otherwise.
        """
        task = self.active_tasks.get(name)
        if task is None or task.done():
            return False
        task.cancel()
        logger.info(f"Cancelled async task: {name}")
        return True

    def cancel_all_tasks(self):
        """Cancel all active tasks"""
        # Iterate over a snapshot: the registry changes as tasks finish
        for name, task in list(self.active_tasks.items()):
            if not task.done():
                task.cancel()
                logger.info(f"Cancelled async task: {name}")

    def get_task_status(self) -> Dict[str, Any]:
        """Get status of all tasks.

        Returns:
            A dict with the number of active, completed and failed tasks, the
            names of the active tasks and a short view of the last five
            recorded failures.
        """
        return {
            'active_tasks': len(self.active_tasks),
            'completed_tasks': len(self.completed_tasks),
            'failed_tasks': len(self.failed_tasks),
            'active_task_names': list(self.active_tasks.keys()),
            'recent_failures': [
                {
                    'name': f['name'],
                    'error': f['error'],
                    'duration': f['duration'],
                    'time': f['end_time'].strftime('%H:%M:%S')
                }
                for f in self.failed_tasks[-5:]
            ]
        }

    def get_failure_summary(self) -> Dict[str, Any]:
        """Get summary of task failures.

        Returns:
            A dict with the total number of recorded failures, the number of
            failures that ended within the last hour, a count per exception
            class name and the most frequent exception class name (``None``
            when nothing failed).
        """
        if not self.failed_tasks:
            return {
                'total_failures': 0,
                'recent_failures_1h': 0,
                'failure_patterns': {},
                'most_common_error': None
            }

        # Count failures by exception class name. The 'error' field holds the
        # message text, so its type would always be ``str``; use 'error_type'.
        error_counts: Dict[str, int] = {}
        for failure in self.failed_tasks:
            error_type = failure.get('error_type', 'Unknown')
            error_counts[error_type] = error_counts.get(error_type, 0) + 1

        # Failures that ended during the last hour
        now = datetime.now()
        recent_failures = [f for f in self.failed_tasks
                           if (now - f['end_time']).total_seconds() < 3600]

        return {
            'total_failures': len(self.failed_tasks),
            'recent_failures_1h': len(recent_failures),
            'failure_patterns': error_counts,
            'most_common_error': max(error_counts, key=error_counts.get)
        }
|
|
|
|
# Process-wide manager instance, built lazily on first request
_task_manager = None


def get_async_task_manager() -> AsyncTaskManager:
    """Return the shared AsyncTaskManager, creating it on the first call."""
    global _task_manager
    if _task_manager is not None:
        return _task_manager
    # First access: build the instance and remember it for later callers
    _task_manager = AsyncTaskManager()
    return _task_manager
|
|
|
|
def create_safe_task(coro: Any,
                     name: str,
                     error_callback: Optional[Callable] = None,
                     success_callback: Optional[Callable] = None) -> asyncio.Task:
    """
    Schedule a coroutine as an error-handled task on the global manager

    Convenience shortcut that forwards all arguments to
    ``create_task_with_error_handling`` on the instance returned by
    ``get_async_task_manager()``.

    Args:
        coro: Coroutine to run
        name: Task name for identification
        error_callback: Called on error with (name, exception)
        success_callback: Called on success with (name, result)

    Returns:
        The task created by the global manager.
    """
    return get_async_task_manager().create_task_with_error_handling(
        coro, name, error_callback, success_callback
    )
|
|
|
|
def safe_async_wrapper(name: str,
                       error_callback: Optional[Callable] = None,
                       success_callback: Optional[Callable] = None):
    """
    Decorator factory that runs an async function as a safe task

    Each call of the decorated function builds the coroutine, hands it to
    ``create_safe_task`` under the given ``name`` and awaits the resulting
    task. Every call uses the same ``name``.

    Args:
        name: Task name passed to ``create_safe_task`` for every call
        error_callback: Forwarded to ``create_safe_task``
        success_callback: Forwarded to ``create_safe_task``

    Usage:
        @safe_async_wrapper("my_task")
        async def my_async_function():
            # Your async code here
            pass
    """
    def decorator(func):
        async def run_as_safe_task(*args, **kwargs):
            # Hand the fresh coroutine over and wait for the task's outcome
            return await create_safe_task(
                func(*args, **kwargs), name, error_callback, success_callback
            )

        # Copy the metadata (name, docstring, ...) of the wrapped function
        return functools.wraps(func)(run_as_safe_task)

    return decorator