# Listing metadata: 442 lines, 16 KiB, Python
"""
|
|
Async Handler for UI Stability Fix
|
|
|
|
Properly handles all async operations in the dashboard with single event loop management,
|
|
proper exception handling, and timeout support to prevent async/await errors.
|
|
"""
|
|
|
|
import asyncio
import functools
import logging
import threading
import time
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Callable, Coroutine, Dict, Optional, Union

# Module-level logger, named after this module, used by every class and function below.
logger = logging.getLogger(__name__)


class AsyncOperationError(Exception):
    """Error type used by this module to report a failed async operation."""
class AsyncHandler:
    """
    Centralized async operation handler with single event loop management
    and proper exception handling for async operations.

    Synchronous code hands coroutines to one event loop through this object.
    When no loop is passed in, the handler creates one and runs it on a daemon
    thread named "AsyncHandler-EventLoop". When a loop is passed in, the caller
    is expected to run it (on a thread other than the one calling the blocking
    methods of this class).
    """

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
        """
        Initialize the async handler

        Args:
            loop: Optional event loop to use. If None, creates a new one and
                runs it on a background daemon thread.

        Raises:
            AsyncOperationError: If the background loop fails to start.
        """
        self._loop = loop
        # True when the loop is created (and therefore closed) by this handler.
        self._owns_loop = loop is None
        self._thread = None
        self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="AsyncHandler")
        self._running = False
        self._callbacks = weakref.WeakSet()
        self._timeout_default = 30.0  # Default timeout for operations

        # Start the event loop in a separate thread if not provided
        if self._loop is None:
            self._start_event_loop_thread()

        logger.info("AsyncHandler initialized with event loop management")

    def _start_event_loop_thread(self):
        """
        Start the event loop in a separate thread

        Blocks for up to 5 seconds until the loop has begun processing
        callbacks.

        Raises:
            AsyncOperationError: If the loop is not running after the wait.
        """
        # Set from inside the loop once it is running, or by the thread's
        # cleanup path if startup fails, so the caller never sleeps needlessly.
        ready = threading.Event()

        def run_event_loop():
            """Run the event loop in a separate thread"""
            try:
                self._loop = asyncio.new_event_loop()
                asyncio.set_event_loop(self._loop)
                self._running = True
                self._loop.call_soon(ready.set)
                logger.debug("Event loop started in separate thread")
                self._loop.run_forever()
            except Exception as e:
                logger.error(f"Error in event loop thread: {e}")
            finally:
                self._running = False
                ready.set()
                owned_loop = self._loop
                # The loop was created here, so release its resources here
                # once run_forever() has returned.
                if owned_loop is not None and not owned_loop.is_closed():
                    try:
                        owned_loop.close()
                    except Exception as e:
                        logger.error(f"Error in event loop thread: {e}")
                logger.debug("Event loop thread stopped")

        self._thread = threading.Thread(target=run_event_loop, daemon=True, name="AsyncHandler-EventLoop")
        self._thread.start()

        # Wait for the loop to be ready
        ready.wait(timeout=5.0)

        if not self._running:
            raise AsyncOperationError("Failed to start event loop within timeout")

    def is_running(self) -> bool:
        """
        Check if the async handler is running

        For a loop created by this handler the internal running flag is used.
        For a caller-supplied loop, which never sets that flag, the loop's own
        ``is_running()`` state is reported instead.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            return False
        if self._owns_loop:
            return self._running
        return loop.is_running()

    def _called_from_loop_thread(self) -> bool:
        """Return True when the calling thread is currently running this handler's loop."""
        try:
            return asyncio.get_running_loop() is self._loop
        except RuntimeError:
            # No event loop is running in the calling thread.
            return False

    def run_async_safely(self, coro: Coroutine, timeout: Optional[float] = None) -> Any:
        """
        Run an async coroutine safely with proper error handling and timeout

        Blocks the calling thread until the coroutine finishes on the
        handler's loop.

        Args:
            coro: The coroutine to run
            timeout: Timeout in seconds (uses default if None or 0)

        Returns:
            The result of the coroutine

        Raises:
            AsyncOperationError: If the handler is not running, if called from
                the handler's own loop thread, or if the operation fails or
                times out
        """
        if not self.is_running():
            raise AsyncOperationError("AsyncHandler is not running")

        # Blocking on the loop from inside the loop can never complete; fail
        # immediately instead of stalling until the timeout expires.
        if self._called_from_loop_thread():
            raise AsyncOperationError(
                "Async operation failed: run_async_safely cannot be called from the event loop thread"
            )

        timeout = timeout or self._timeout_default

        future = None
        try:
            # Schedule the coroutine on the event loop
            future = asyncio.run_coroutine_threadsafe(
                asyncio.wait_for(coro, timeout=timeout),
                self._loop
            )

            # Wait for the result with timeout
            result = future.result(timeout=timeout + 1.0)  # Add buffer to future timeout
            logger.debug("Async operation completed successfully")
            return result

        except (asyncio.TimeoutError, FutureTimeoutError) as e:
            # asyncio.TimeoutError comes from wait_for inside the loop;
            # FutureTimeoutError comes from the blocking wait in this thread.
            # They are distinct classes before Python 3.11.
            if future is not None:
                future.cancel()
            logger.error(f"Async operation timed out after {timeout} seconds")
            raise AsyncOperationError(f"Operation timed out after {timeout} seconds") from e
        except Exception as e:
            logger.error(f"Async operation failed: {e}")
            raise AsyncOperationError(f"Async operation failed: {e}") from e

    def schedule_coroutine(self, coro: Coroutine, callback: Optional[Callable] = None) -> None:
        """
        Schedule a coroutine to run asynchronously without waiting for result

        Args:
            coro: The coroutine to schedule
            callback: Optional callback to call with the result. It is invoked
                from within the scheduled coroutine, and receives None if the
                coroutine raised.
        """
        if not self.is_running():
            logger.warning("Cannot schedule coroutine: AsyncHandler is not running")
            return

        async def wrapped_coro():
            """Wrapper to handle exceptions and callbacks"""
            try:
                result = await coro
                if callback:
                    try:
                        callback(result)
                    except Exception as e:
                        logger.error(f"Error in coroutine callback: {e}")
                return result
            except Exception as e:
                logger.error(f"Error in scheduled coroutine: {e}")
                if callback:
                    try:
                        callback(None)  # Call callback with None on error
                    except Exception as cb_e:
                        logger.error(f"Error in error callback: {cb_e}")

        try:
            asyncio.run_coroutine_threadsafe(wrapped_coro(), self._loop)
            logger.debug("Coroutine scheduled successfully")
        except Exception as e:
            logger.error(f"Failed to schedule coroutine: {e}")

    def create_task_safely(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]:
        """
        Create an asyncio task safely with proper error handling

        Args:
            coro: The coroutine to create a task for
            name: Optional name for the task

        Returns:
            The created task or None if failed
        """
        if not self.is_running():
            logger.warning("Cannot create task: AsyncHandler is not running")
            return None

        # Already on the loop thread: create the task directly. Waiting on a
        # cross-thread future here would block the loop that must fulfil it.
        if self._called_from_loop_thread():
            try:
                task = self._loop.create_task(coro, name=name)
                logger.debug(f"Task created: {name or 'unnamed'}")
                return task
            except Exception as e:
                logger.error(f"Failed to create task {name}: {e}")
                return None

        async def create_task():
            """Create the task in the event loop"""
            try:
                task = asyncio.create_task(coro, name=name)
                logger.debug(f"Task created: {name or 'unnamed'}")
                return task
            except Exception as e:
                logger.error(f"Failed to create task {name}: {e}")
                return None

        future = None
        try:
            future = asyncio.run_coroutine_threadsafe(create_task(), self._loop)
            return future.result(timeout=5.0)
        except Exception as e:
            # Cancel so a creation request that timed out does not spawn an
            # untracked task later.
            if future is not None:
                future.cancel()
            logger.error(f"Failed to create task {name}: {e}")
            return None

    async def handle_orchestrator_connection(self, orchestrator) -> bool:
        """
        Handle orchestrator connection with proper async patterns

        Awaits, in order and only when the orchestrator has them:
        ``add_decision_callback``, ``start_cob_integration`` and
        ``start_continuous_trading``.

        Args:
            orchestrator: The orchestrator instance to connect to

        Returns:
            True if connection successful, False otherwise
        """
        try:
            logger.info("Connecting to orchestrator...")

            # Add decision callback if orchestrator supports it
            if hasattr(orchestrator, 'add_decision_callback'):
                await orchestrator.add_decision_callback(self._handle_trading_decision)
                logger.info("Decision callback added to orchestrator")

            # Start COB integration if available
            if hasattr(orchestrator, 'start_cob_integration'):
                await orchestrator.start_cob_integration()
                logger.info("COB integration started")

            # Start continuous trading if available
            if hasattr(orchestrator, 'start_continuous_trading'):
                await orchestrator.start_continuous_trading()
                logger.info("Continuous trading started")

            logger.info("Successfully connected to orchestrator")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to orchestrator: {e}")
            return False

    async def handle_cob_integration(self, cob_integration) -> bool:
        """
        Handle COB integration startup with proper async patterns

        Args:
            cob_integration: The COB integration instance

        Returns:
            True if startup successful, False if the object has no ``start``
            method or starting it raised
        """
        try:
            logger.info("Starting COB integration...")

            if hasattr(cob_integration, 'start'):
                await cob_integration.start()
                logger.info("COB integration started successfully")
                return True
            else:
                logger.warning("COB integration does not have start method")
                return False

        except Exception as e:
            logger.error(f"Failed to start COB integration: {e}")
            return False

    async def _handle_trading_decision(self, decision: Dict[str, Any]) -> None:
        """
        Handle trading decision with proper async patterns

        Currently only logs the decision; errors are logged, not raised.

        Args:
            decision: The trading decision dictionary
        """
        try:
            logger.debug(f"Handling trading decision: {decision.get('action', 'UNKNOWN')}")

            # Process the decision (this would be customized based on needs)
            # For now, just log it
            symbol = decision.get('symbol', 'UNKNOWN')
            action = decision.get('action', 'HOLD')
            confidence = decision.get('confidence', 0.0)

            logger.info(f"Trading decision processed: {action} {symbol} (confidence: {confidence:.2f})")

        except Exception as e:
            logger.error(f"Error handling trading decision: {e}")

    def run_in_executor(self, func: Callable, *args, **kwargs) -> Any:
        """
        Run a blocking function in the thread pool executor

        Blocks the calling thread for at most the default timeout.

        Args:
            func: The function to run
            *args: Positional arguments for the function
            **kwargs: Keyword arguments for the function

        Returns:
            The result of the function

        Raises:
            AsyncOperationError: If the handler is not running, or the
                function raises or does not finish within the default timeout
        """
        if not self.is_running():
            raise AsyncOperationError("AsyncHandler is not running")

        future = None
        try:
            # Create a partial function with the arguments
            partial_func = functools.partial(func, *args, **kwargs)

            # Submit straight to the pool: the work runs on a pool thread
            # either way, and skipping the event loop means a busy loop (or a
            # call made from the loop thread) cannot stall the submission.
            future = self._executor.submit(partial_func)

            result = future.result(timeout=self._timeout_default)
            logger.debug("Executor function completed successfully")
            return result

        except Exception as e:
            if future is not None:
                future.cancel()  # Only has an effect if the work has not started
            # A wait timeout has an empty message; fall back to the type name.
            detail = str(e) or type(e).__name__
            logger.error(f"Error running function in executor: {detail}")
            raise AsyncOperationError(f"Executor function failed: {detail}") from e

    def add_periodic_task(self, coro_func: Callable[[], Coroutine], interval: float, name: Optional[str] = None) -> Optional[asyncio.Task]:
        """
        Add a periodic task that runs at specified intervals

        Each round awaits ``coro_func()`` and then sleeps for ``interval``
        seconds. An exception from one round is logged and the loop continues.

        Args:
            coro_func: Function that returns a coroutine to run periodically
            interval: Interval in seconds between runs
            name: Optional name for the task

        Returns:
            The created task or None if failed
        """
        task_name = name or "periodic_task"

        async def periodic_runner():
            """Run the coroutine periodically"""
            logger.info(f"Starting periodic task: {task_name} (interval: {interval}s)")

            try:
                while True:
                    try:
                        coro = coro_func()
                        await coro
                        logger.debug(f"Periodic task {task_name} completed")
                    except Exception as e:
                        logger.error(f"Error in periodic task {task_name}: {e}")

                    await asyncio.sleep(interval)

            except asyncio.CancelledError:
                logger.info(f"Periodic task {task_name} cancelled")
                raise
            except Exception as e:
                logger.error(f"Fatal error in periodic task {task_name}: {e}")

        # Check before building the runner coroutine so a stopped handler does
        # not leave a coroutine object that is never awaited.
        if not self.is_running():
            logger.warning("Cannot create task: AsyncHandler is not running")
            return None

        # Avoid the literal name "periodic_None" when no name was supplied.
        asyncio_name = f"periodic_{name}" if name else task_name
        return self.create_task_safely(periodic_runner(), name=asyncio_name)

    def stop(self) -> None:
        """
        Stop the async handler and clean up resources

        Cancels the tasks on the loop, stops the loop, shuts down the thread
        pool and joins the loop thread. Each step runs even if an earlier one
        failed; failures are logged rather than raised.
        """
        logger.info("Stopping AsyncHandler...")

        clean = True
        for step in (self._stop_loop, self._shutdown_executor, self._join_loop_thread):
            try:
                step()
            except Exception as e:
                clean = False
                logger.error(f"Error stopping AsyncHandler: {e}")

        self._running = False
        if clean:
            logger.info("AsyncHandler stopped successfully")

    def _stop_loop(self) -> None:
        """Cancel the loop's tasks, give them time to finish, then ask the loop to stop."""
        loop = self._loop
        if loop is None or loop.is_closed():
            return

        # Cancel all tasks
        if loop.is_running():
            cancel_future = asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), loop)
            # Let cancellation finish before stopping the loop, otherwise the
            # tasks never get to run their cleanup. Not possible from the loop
            # thread itself, where blocking would stall the loop.
            if not self._called_from_loop_thread():
                try:
                    cancel_future.result(timeout=5.0)
                except Exception as e:
                    logger.error(f"Error cancelling tasks: {str(e) or type(e).__name__}")

        # Stop the event loop
        loop.call_soon_threadsafe(loop.stop)

    def _shutdown_executor(self) -> None:
        """Shut down the thread pool, waiting for submitted work to finish."""
        if self._executor:
            self._executor.shutdown(wait=True)

    def _join_loop_thread(self) -> None:
        """Wait up to 5 seconds for the handler's own loop thread to exit."""
        thread = self._thread
        # A thread cannot join itself; skip when stop() runs on the loop thread.
        if thread and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=5.0)

    async def _cancel_all_tasks(self) -> None:
        """Cancel all running tasks except the one executing this coroutine"""
        try:
            # Exclude the current task: cancelling and then awaiting ourselves
            # would abort this coroutine before the others have been awaited.
            current = asyncio.current_task()
            tasks = [
                task for task in asyncio.all_tasks(self._loop)
                if task is not current and not task.done()
            ]
            if tasks:
                logger.info(f"Cancelling {len(tasks)} running tasks")
                for task in tasks:
                    task.cancel()

                # Wait for tasks to be cancelled
                await asyncio.gather(*tasks, return_exceptions=True)
                logger.debug("All tasks cancelled")
        except Exception as e:
            logger.error(f"Error cancelling tasks: {e}")

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.stop()
class AsyncContextManager:
    """
    Context manager for async operations that ensures proper cleanup

    Tasks created through :meth:`create_task` are remembered in
    ``active_tasks``; on exit every one that has not finished is asked to
    cancel.
    """

    def __init__(self, async_handler: "AsyncHandler"):
        """
        Args:
            async_handler: Handler used to create the tasks; only its
                ``create_task_safely`` method is called.
        """
        self.async_handler = async_handler
        self.active_tasks = []

    def __enter__(self):
        """Return this manager so tasks can be created inside the ``with`` body."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Request cancellation of every tracked task that is still pending."""
        # Cancel any active tasks
        for task in self.active_tasks:
            if task.done():
                continue
            try:
                # Tasks are not thread-safe: hand the cancel request to the
                # task's own loop, which also wakes that loop if it is idle.
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # Raised when the loop is already closed; nothing can be
                # scheduled on it any more, so there is nothing left to cancel.
                pass

    def create_task(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]:
        """
        Create a task and track it for cleanup

        Args:
            coro: The coroutine to create a task for
            name: Optional name for the task

        Returns:
            The created task, or None if the handler could not create it
        """
        task = self.async_handler.create_task_safely(coro, name)
        if task:
            self.active_tasks.append(task)
        return task
def create_async_handler(loop: Optional[asyncio.AbstractEventLoop] = None) -> AsyncHandler:
    """
    Build and return a new AsyncHandler.

    Args:
        loop: Event loop forwarded to the AsyncHandler constructor; may be None

    Returns:
        The newly constructed AsyncHandler
    """
    handler = AsyncHandler(loop=loop)
    return handler
def run_async_safely(coro: Coroutine, timeout: Optional[float] = None) -> Any:
    """
    Run one coroutine on a throwaway AsyncHandler and return its result.

    A new AsyncHandler is created for the call and stopped again when the
    call finishes, whether it succeeded or raised.

    Args:
        coro: The coroutine to run
        timeout: Timeout in seconds, forwarded to the handler

    Returns:
        Whatever the handler's ``run_async_safely`` returns for the coroutine
    """
    temp_handler = AsyncHandler()
    with temp_handler:
        outcome = temp_handler.run_async_safely(coro, timeout=timeout)
    return outcome