""" Async Handler for UI Stability Fix Properly handles all async operations in the dashboard with single event loop management, proper exception handling, and timeout support to prevent async/await errors. """ import asyncio import logging import threading import time from typing import Any, Callable, Coroutine, Dict, Optional, Union from concurrent.futures import ThreadPoolExecutor import functools import weakref logger = logging.getLogger(__name__) class AsyncOperationError(Exception): """Exception raised for async operation errors""" pass class AsyncHandler: """ Centralized async operation handler with single event loop management and proper exception handling for async operations. """ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None): """ Initialize the async handler Args: loop: Optional event loop to use. If None, creates a new one. """ self._loop = loop self._thread = None self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="AsyncHandler") self._running = False self._callbacks = weakref.WeakSet() self._timeout_default = 30.0 # Default timeout for operations # Start the event loop in a separate thread if not provided if self._loop is None: self._start_event_loop_thread() logger.info("AsyncHandler initialized with event loop management") def _start_event_loop_thread(self): """Start the event loop in a separate thread""" def run_event_loop(): """Run the event loop in a separate thread""" try: self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._running = True logger.debug("Event loop started in separate thread") self._loop.run_forever() except Exception as e: logger.error(f"Error in event loop thread: {e}") finally: self._running = False logger.debug("Event loop thread stopped") self._thread = threading.Thread(target=run_event_loop, daemon=True, name="AsyncHandler-EventLoop") self._thread.start() # Wait for the loop to be ready timeout = 5.0 start_time = time.time() while not self._running and (time.time() - start_time) < timeout: time.sleep(0.1) if not self._running: raise AsyncOperationError("Failed to start event loop within timeout") def is_running(self) -> bool: """Check if the async handler is running""" return self._running and self._loop is not None and not self._loop.is_closed() def run_async_safely(self, coro: Coroutine, timeout: Optional[float] = None) -> Any: """ Run an async coroutine safely with proper error handling and timeout Args: coro: The coroutine to run timeout: Timeout in seconds (uses default if None) Returns: The result of the coroutine Raises: AsyncOperationError: If the operation fails or times out """ if not self.is_running(): raise AsyncOperationError("AsyncHandler is not running") timeout = timeout or self._timeout_default try: # Schedule the coroutine on the event loop future = asyncio.run_coroutine_threadsafe( asyncio.wait_for(coro, timeout=timeout), self._loop ) # Wait for the result with timeout result = future.result(timeout=timeout + 1.0) # Add buffer to future timeout logger.debug("Async operation completed successfully") return result except asyncio.TimeoutError: logger.error(f"Async operation timed out after {timeout} seconds") raise AsyncOperationError(f"Operation timed out after {timeout} seconds") except Exception as e: logger.error(f"Async operation failed: {e}") raise AsyncOperationError(f"Async operation failed: {e}") def schedule_coroutine(self, coro: Coroutine, callback: Optional[Callable] = None) -> None: """ Schedule a coroutine to run asynchronously without waiting for result Args: coro: The coroutine to schedule callback: Optional callback to call with the result """ if not self.is_running(): logger.warning("Cannot schedule coroutine: AsyncHandler is not running") return async def wrapped_coro(): """Wrapper to handle exceptions and callbacks""" try: result = await coro if callback: try: callback(result) except Exception as e: logger.error(f"Error in coroutine callback: {e}") return result except Exception as e: logger.error(f"Error in scheduled coroutine: {e}") if callback: try: callback(None) # Call callback with None on error except Exception as cb_e: logger.error(f"Error in error callback: {cb_e}") try: asyncio.run_coroutine_threadsafe(wrapped_coro(), self._loop) logger.debug("Coroutine scheduled successfully") except Exception as e: logger.error(f"Failed to schedule coroutine: {e}") def create_task_safely(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]: """ Create an asyncio task safely with proper error handling Args: coro: The coroutine to create a task for name: Optional name for the task Returns: The created task or None if failed """ if not self.is_running(): logger.warning("Cannot create task: AsyncHandler is not running") return None async def create_task(): """Create the task in the event loop""" try: task = asyncio.create_task(coro, name=name) logger.debug(f"Task created: {name or 'unnamed'}") return task except Exception as e: logger.error(f"Failed to create task {name}: {e}") return None try: future = asyncio.run_coroutine_threadsafe(create_task(), self._loop) return future.result(timeout=5.0) except Exception as e: logger.error(f"Failed to create task {name}: {e}") return None async def handle_orchestrator_connection(self, orchestrator) -> bool: """ Handle orchestrator connection with proper async patterns Args: orchestrator: The orchestrator instance to connect to Returns: True if connection successful, False otherwise """ try: logger.info("Connecting to orchestrator...") # Add decision callback if orchestrator supports it if hasattr(orchestrator, 'add_decision_callback'): await orchestrator.add_decision_callback(self._handle_trading_decision) logger.info("Decision callback added to orchestrator") # Start COB integration if available if hasattr(orchestrator, 'start_cob_integration'): await orchestrator.start_cob_integration() logger.info("COB integration started") # Start continuous trading if available if hasattr(orchestrator, 'start_continuous_trading'): await orchestrator.start_continuous_trading() logger.info("Continuous trading started") logger.info("Successfully connected to orchestrator") return True except Exception as e: logger.error(f"Failed to connect to orchestrator: {e}") return False async def handle_cob_integration(self, cob_integration) -> bool: """ Handle COB integration startup with proper async patterns Args: cob_integration: The COB integration instance Returns: True if startup successful, False otherwise """ try: logger.info("Starting COB integration...") if hasattr(cob_integration, 'start'): await cob_integration.start() logger.info("COB integration started successfully") return True else: logger.warning("COB integration does not have start method") return False except Exception as e: logger.error(f"Failed to start COB integration: {e}") return False async def _handle_trading_decision(self, decision: Dict[str, Any]) -> None: """ Handle trading decision with proper async patterns Args: decision: The trading decision dictionary """ try: logger.debug(f"Handling trading decision: {decision.get('action', 'UNKNOWN')}") # Process the decision (this would be customized based on needs) # For now, just log it symbol = decision.get('symbol', 'UNKNOWN') action = decision.get('action', 'HOLD') confidence = decision.get('confidence', 0.0) logger.info(f"Trading decision processed: {action} {symbol} (confidence: {confidence:.2f})") except Exception as e: logger.error(f"Error handling trading decision: {e}") def run_in_executor(self, func: Callable, *args, **kwargs) -> Any: """ Run a blocking function in the thread pool executor Args: func: The function to run *args: Positional arguments for the function **kwargs: Keyword arguments for the function Returns: The result of the function """ if not self.is_running(): raise AsyncOperationError("AsyncHandler is not running") try: # Create a partial function with the arguments partial_func = functools.partial(func, *args, **kwargs) # Create a coroutine that runs the function in executor async def run_in_executor_coro(): return await self._loop.run_in_executor(self._executor, partial_func) # Run the coroutine future = asyncio.run_coroutine_threadsafe(run_in_executor_coro(), self._loop) result = future.result(timeout=self._timeout_default) logger.debug("Executor function completed successfully") return result except Exception as e: logger.error(f"Error running function in executor: {e}") raise AsyncOperationError(f"Executor function failed: {e}") def add_periodic_task(self, coro_func: Callable[[], Coroutine], interval: float, name: Optional[str] = None) -> Optional[asyncio.Task]: """ Add a periodic task that runs at specified intervals Args: coro_func: Function that returns a coroutine to run periodically interval: Interval in seconds between runs name: Optional name for the task Returns: The created task or None if failed """ async def periodic_runner(): """Run the coroutine periodically""" task_name = name or "periodic_task" logger.info(f"Starting periodic task: {task_name} (interval: {interval}s)") try: while True: try: coro = coro_func() await coro logger.debug(f"Periodic task {task_name} completed") except Exception as e: logger.error(f"Error in periodic task {task_name}: {e}") await asyncio.sleep(interval) except asyncio.CancelledError: logger.info(f"Periodic task {task_name} cancelled") raise except Exception as e: logger.error(f"Fatal error in periodic task {task_name}: {e}") return self.create_task_safely(periodic_runner(), name=f"periodic_{name}") def stop(self) -> None: """Stop the async handler and clean up resources""" try: logger.info("Stopping AsyncHandler...") if self._loop and not self._loop.is_closed(): # Cancel all tasks if self._loop.is_running(): asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), self._loop) # Stop the event loop self._loop.call_soon_threadsafe(self._loop.stop) # Shutdown executor if self._executor: self._executor.shutdown(wait=True) # Wait for thread to finish if self._thread and self._thread.is_alive(): self._thread.join(timeout=5.0) self._running = False logger.info("AsyncHandler stopped successfully") except Exception as e: logger.error(f"Error stopping AsyncHandler: {e}") async def _cancel_all_tasks(self) -> None: """Cancel all running tasks""" try: tasks = [task for task in asyncio.all_tasks(self._loop) if not task.done()] if tasks: logger.info(f"Cancelling {len(tasks)} running tasks") for task in tasks: task.cancel() # Wait for tasks to be cancelled await asyncio.gather(*tasks, return_exceptions=True) logger.debug("All tasks cancelled") except Exception as e: logger.error(f"Error cancelling tasks: {e}") def __enter__(self): """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit""" self.stop() class AsyncContextManager: """ Context manager for async operations that ensures proper cleanup """ def __init__(self, async_handler: AsyncHandler): self.async_handler = async_handler self.active_tasks = [] def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): # Cancel any active tasks for task in self.active_tasks: if not task.done(): task.cancel() def create_task(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]: """Create a task and track it for cleanup""" task = self.async_handler.create_task_safely(coro, name) if task: self.active_tasks.append(task) return task def create_async_handler(loop: Optional[asyncio.AbstractEventLoop] = None) -> AsyncHandler: """ Factory function to create an AsyncHandler instance Args: loop: Optional event loop to use Returns: AsyncHandler instance """ return AsyncHandler(loop=loop) def run_async_safely(coro: Coroutine, timeout: Optional[float] = None) -> Any: """ Convenience function to run a coroutine safely with a temporary AsyncHandler Args: coro: The coroutine to run timeout: Timeout in seconds Returns: The result of the coroutine """ with AsyncHandler() as handler: return handler.run_async_safely(coro, timeout=timeout)