fix model mappings,dash updates, trading
This commit is contained in:
442
core/async_handler.py
Normal file
442
core/async_handler.py
Normal file
@ -0,0 +1,442 @@
|
||||
"""
|
||||
Async Handler for UI Stability Fix
|
||||
|
||||
Properly handles all async operations in the dashboard with single event loop management,
|
||||
proper exception handling, and timeout support to prevent async/await errors.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Callable, Coroutine, Dict, Optional, Union
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import functools
|
||||
import weakref
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AsyncOperationError(Exception):
|
||||
"""Exception raised for async operation errors"""
|
||||
pass
|
||||
|
||||
|
||||
class AsyncHandler:
|
||||
"""
|
||||
Centralized async operation handler with single event loop management
|
||||
and proper exception handling for async operations.
|
||||
"""
|
||||
|
||||
def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None):
|
||||
"""
|
||||
Initialize the async handler
|
||||
|
||||
Args:
|
||||
loop: Optional event loop to use. If None, creates a new one.
|
||||
"""
|
||||
self._loop = loop
|
||||
self._thread = None
|
||||
self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="AsyncHandler")
|
||||
self._running = False
|
||||
self._callbacks = weakref.WeakSet()
|
||||
self._timeout_default = 30.0 # Default timeout for operations
|
||||
|
||||
# Start the event loop in a separate thread if not provided
|
||||
if self._loop is None:
|
||||
self._start_event_loop_thread()
|
||||
|
||||
logger.info("AsyncHandler initialized with event loop management")
|
||||
|
||||
def _start_event_loop_thread(self):
|
||||
"""Start the event loop in a separate thread"""
|
||||
def run_event_loop():
|
||||
"""Run the event loop in a separate thread"""
|
||||
try:
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
self._running = True
|
||||
logger.debug("Event loop started in separate thread")
|
||||
self._loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in event loop thread: {e}")
|
||||
finally:
|
||||
self._running = False
|
||||
logger.debug("Event loop thread stopped")
|
||||
|
||||
self._thread = threading.Thread(target=run_event_loop, daemon=True, name="AsyncHandler-EventLoop")
|
||||
self._thread.start()
|
||||
|
||||
# Wait for the loop to be ready
|
||||
timeout = 5.0
|
||||
start_time = time.time()
|
||||
while not self._running and (time.time() - start_time) < timeout:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not self._running:
|
||||
raise AsyncOperationError("Failed to start event loop within timeout")
|
||||
|
||||
def is_running(self) -> bool:
|
||||
"""Check if the async handler is running"""
|
||||
return self._running and self._loop is not None and not self._loop.is_closed()
|
||||
|
||||
def run_async_safely(self, coro: Coroutine, timeout: Optional[float] = None) -> Any:
|
||||
"""
|
||||
Run an async coroutine safely with proper error handling and timeout
|
||||
|
||||
Args:
|
||||
coro: The coroutine to run
|
||||
timeout: Timeout in seconds (uses default if None)
|
||||
|
||||
Returns:
|
||||
The result of the coroutine
|
||||
|
||||
Raises:
|
||||
AsyncOperationError: If the operation fails or times out
|
||||
"""
|
||||
if not self.is_running():
|
||||
raise AsyncOperationError("AsyncHandler is not running")
|
||||
|
||||
timeout = timeout or self._timeout_default
|
||||
|
||||
try:
|
||||
# Schedule the coroutine on the event loop
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
asyncio.wait_for(coro, timeout=timeout),
|
||||
self._loop
|
||||
)
|
||||
|
||||
# Wait for the result with timeout
|
||||
result = future.result(timeout=timeout + 1.0) # Add buffer to future timeout
|
||||
logger.debug("Async operation completed successfully")
|
||||
return result
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"Async operation timed out after {timeout} seconds")
|
||||
raise AsyncOperationError(f"Operation timed out after {timeout} seconds")
|
||||
except Exception as e:
|
||||
logger.error(f"Async operation failed: {e}")
|
||||
raise AsyncOperationError(f"Async operation failed: {e}")
|
||||
|
||||
def schedule_coroutine(self, coro: Coroutine, callback: Optional[Callable] = None) -> None:
|
||||
"""
|
||||
Schedule a coroutine to run asynchronously without waiting for result
|
||||
|
||||
Args:
|
||||
coro: The coroutine to schedule
|
||||
callback: Optional callback to call with the result
|
||||
"""
|
||||
if not self.is_running():
|
||||
logger.warning("Cannot schedule coroutine: AsyncHandler is not running")
|
||||
return
|
||||
|
||||
async def wrapped_coro():
|
||||
"""Wrapper to handle exceptions and callbacks"""
|
||||
try:
|
||||
result = await coro
|
||||
if callback:
|
||||
try:
|
||||
callback(result)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in coroutine callback: {e}")
|
||||
return result
|
||||
except Exception as e:
|
||||
logger.error(f"Error in scheduled coroutine: {e}")
|
||||
if callback:
|
||||
try:
|
||||
callback(None) # Call callback with None on error
|
||||
except Exception as cb_e:
|
||||
logger.error(f"Error in error callback: {cb_e}")
|
||||
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(wrapped_coro(), self._loop)
|
||||
logger.debug("Coroutine scheduled successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to schedule coroutine: {e}")
|
||||
|
||||
def create_task_safely(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]:
|
||||
"""
|
||||
Create an asyncio task safely with proper error handling
|
||||
|
||||
Args:
|
||||
coro: The coroutine to create a task for
|
||||
name: Optional name for the task
|
||||
|
||||
Returns:
|
||||
The created task or None if failed
|
||||
"""
|
||||
if not self.is_running():
|
||||
logger.warning("Cannot create task: AsyncHandler is not running")
|
||||
return None
|
||||
|
||||
async def create_task():
|
||||
"""Create the task in the event loop"""
|
||||
try:
|
||||
task = asyncio.create_task(coro, name=name)
|
||||
logger.debug(f"Task created: {name or 'unnamed'}")
|
||||
return task
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create task {name}: {e}")
|
||||
return None
|
||||
|
||||
try:
|
||||
future = asyncio.run_coroutine_threadsafe(create_task(), self._loop)
|
||||
return future.result(timeout=5.0)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create task {name}: {e}")
|
||||
return None
|
||||
|
||||
async def handle_orchestrator_connection(self, orchestrator) -> bool:
|
||||
"""
|
||||
Handle orchestrator connection with proper async patterns
|
||||
|
||||
Args:
|
||||
orchestrator: The orchestrator instance to connect to
|
||||
|
||||
Returns:
|
||||
True if connection successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
logger.info("Connecting to orchestrator...")
|
||||
|
||||
# Add decision callback if orchestrator supports it
|
||||
if hasattr(orchestrator, 'add_decision_callback'):
|
||||
await orchestrator.add_decision_callback(self._handle_trading_decision)
|
||||
logger.info("Decision callback added to orchestrator")
|
||||
|
||||
# Start COB integration if available
|
||||
if hasattr(orchestrator, 'start_cob_integration'):
|
||||
await orchestrator.start_cob_integration()
|
||||
logger.info("COB integration started")
|
||||
|
||||
# Start continuous trading if available
|
||||
if hasattr(orchestrator, 'start_continuous_trading'):
|
||||
await orchestrator.start_continuous_trading()
|
||||
logger.info("Continuous trading started")
|
||||
|
||||
logger.info("Successfully connected to orchestrator")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to orchestrator: {e}")
|
||||
return False
|
||||
|
||||
async def handle_cob_integration(self, cob_integration) -> bool:
|
||||
"""
|
||||
Handle COB integration startup with proper async patterns
|
||||
|
||||
Args:
|
||||
cob_integration: The COB integration instance
|
||||
|
||||
Returns:
|
||||
True if startup successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
logger.info("Starting COB integration...")
|
||||
|
||||
if hasattr(cob_integration, 'start'):
|
||||
await cob_integration.start()
|
||||
logger.info("COB integration started successfully")
|
||||
return True
|
||||
else:
|
||||
logger.warning("COB integration does not have start method")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start COB integration: {e}")
|
||||
return False
|
||||
|
||||
async def _handle_trading_decision(self, decision: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Handle trading decision with proper async patterns
|
||||
|
||||
Args:
|
||||
decision: The trading decision dictionary
|
||||
"""
|
||||
try:
|
||||
logger.debug(f"Handling trading decision: {decision.get('action', 'UNKNOWN')}")
|
||||
|
||||
# Process the decision (this would be customized based on needs)
|
||||
# For now, just log it
|
||||
symbol = decision.get('symbol', 'UNKNOWN')
|
||||
action = decision.get('action', 'HOLD')
|
||||
confidence = decision.get('confidence', 0.0)
|
||||
|
||||
logger.info(f"Trading decision processed: {action} {symbol} (confidence: {confidence:.2f})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling trading decision: {e}")
|
||||
|
||||
def run_in_executor(self, func: Callable, *args, **kwargs) -> Any:
|
||||
"""
|
||||
Run a blocking function in the thread pool executor
|
||||
|
||||
Args:
|
||||
func: The function to run
|
||||
*args: Positional arguments for the function
|
||||
**kwargs: Keyword arguments for the function
|
||||
|
||||
Returns:
|
||||
The result of the function
|
||||
"""
|
||||
if not self.is_running():
|
||||
raise AsyncOperationError("AsyncHandler is not running")
|
||||
|
||||
try:
|
||||
# Create a partial function with the arguments
|
||||
partial_func = functools.partial(func, *args, **kwargs)
|
||||
|
||||
# Create a coroutine that runs the function in executor
|
||||
async def run_in_executor_coro():
|
||||
return await self._loop.run_in_executor(self._executor, partial_func)
|
||||
|
||||
# Run the coroutine
|
||||
future = asyncio.run_coroutine_threadsafe(run_in_executor_coro(), self._loop)
|
||||
|
||||
result = future.result(timeout=self._timeout_default)
|
||||
logger.debug("Executor function completed successfully")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error running function in executor: {e}")
|
||||
raise AsyncOperationError(f"Executor function failed: {e}")
|
||||
|
||||
def add_periodic_task(self, coro_func: Callable[[], Coroutine], interval: float, name: Optional[str] = None) -> Optional[asyncio.Task]:
|
||||
"""
|
||||
Add a periodic task that runs at specified intervals
|
||||
|
||||
Args:
|
||||
coro_func: Function that returns a coroutine to run periodically
|
||||
interval: Interval in seconds between runs
|
||||
name: Optional name for the task
|
||||
|
||||
Returns:
|
||||
The created task or None if failed
|
||||
"""
|
||||
async def periodic_runner():
|
||||
"""Run the coroutine periodically"""
|
||||
task_name = name or "periodic_task"
|
||||
logger.info(f"Starting periodic task: {task_name} (interval: {interval}s)")
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
coro = coro_func()
|
||||
await coro
|
||||
logger.debug(f"Periodic task {task_name} completed")
|
||||
except Exception as e:
|
||||
logger.error(f"Error in periodic task {task_name}: {e}")
|
||||
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"Periodic task {task_name} cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error in periodic task {task_name}: {e}")
|
||||
|
||||
return self.create_task_safely(periodic_runner(), name=f"periodic_{name}")
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the async handler and clean up resources"""
|
||||
try:
|
||||
logger.info("Stopping AsyncHandler...")
|
||||
|
||||
if self._loop and not self._loop.is_closed():
|
||||
# Cancel all tasks
|
||||
if self._loop.is_running():
|
||||
asyncio.run_coroutine_threadsafe(self._cancel_all_tasks(), self._loop)
|
||||
|
||||
# Stop the event loop
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
|
||||
# Shutdown executor
|
||||
if self._executor:
|
||||
self._executor.shutdown(wait=True)
|
||||
|
||||
# Wait for thread to finish
|
||||
if self._thread and self._thread.is_alive():
|
||||
self._thread.join(timeout=5.0)
|
||||
|
||||
self._running = False
|
||||
logger.info("AsyncHandler stopped successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping AsyncHandler: {e}")
|
||||
|
||||
async def _cancel_all_tasks(self) -> None:
|
||||
"""Cancel all running tasks"""
|
||||
try:
|
||||
tasks = [task for task in asyncio.all_tasks(self._loop) if not task.done()]
|
||||
if tasks:
|
||||
logger.info(f"Cancelling {len(tasks)} running tasks")
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
# Wait for tasks to be cancelled
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
logger.debug("All tasks cancelled")
|
||||
except Exception as e:
|
||||
logger.error(f"Error cancelling tasks: {e}")
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
self.stop()
|
||||
|
||||
|
||||
class AsyncContextManager:
|
||||
"""
|
||||
Context manager for async operations that ensures proper cleanup
|
||||
"""
|
||||
|
||||
def __init__(self, async_handler: AsyncHandler):
|
||||
self.async_handler = async_handler
|
||||
self.active_tasks = []
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
# Cancel any active tasks
|
||||
for task in self.active_tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
|
||||
def create_task(self, coro: Coroutine, name: Optional[str] = None) -> Optional[asyncio.Task]:
|
||||
"""Create a task and track it for cleanup"""
|
||||
task = self.async_handler.create_task_safely(coro, name)
|
||||
if task:
|
||||
self.active_tasks.append(task)
|
||||
return task
|
||||
|
||||
|
||||
def create_async_handler(loop: Optional[asyncio.AbstractEventLoop] = None) -> AsyncHandler:
|
||||
"""
|
||||
Factory function to create an AsyncHandler instance
|
||||
|
||||
Args:
|
||||
loop: Optional event loop to use
|
||||
|
||||
Returns:
|
||||
AsyncHandler instance
|
||||
"""
|
||||
return AsyncHandler(loop=loop)
|
||||
|
||||
|
||||
def run_async_safely(coro: Coroutine, timeout: Optional[float] = None) -> Any:
|
||||
"""
|
||||
Convenience function to run a coroutine safely with a temporary AsyncHandler
|
||||
|
||||
Args:
|
||||
coro: The coroutine to run
|
||||
timeout: Timeout in seconds
|
||||
|
||||
Returns:
|
||||
The result of the coroutine
|
||||
"""
|
||||
with AsyncHandler() as handler:
|
||||
return handler.run_async_safely(coro, timeout=timeout)
|
@ -80,7 +80,7 @@ class COBIntegration:
|
||||
|
||||
async def start(self):
|
||||
"""Start COB integration with Enhanced WebSocket"""
|
||||
logger.info("🚀 Starting COB Integration with Enhanced WebSocket")
|
||||
logger.info(" Starting COB Integration with Enhanced WebSocket")
|
||||
|
||||
# Initialize Enhanced WebSocket first
|
||||
try:
|
||||
@ -94,10 +94,10 @@ class COBIntegration:
|
||||
|
||||
# Start enhanced WebSocket
|
||||
await self.enhanced_websocket.start()
|
||||
logger.info("✅ Enhanced WebSocket started successfully")
|
||||
logger.info(" Enhanced WebSocket started successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error starting Enhanced WebSocket: {e}")
|
||||
logger.error(f" Error starting Enhanced WebSocket: {e}")
|
||||
|
||||
# Initialize COB provider as fallback
|
||||
try:
|
||||
@ -115,13 +115,13 @@ class COBIntegration:
|
||||
asyncio.create_task(self._start_cob_provider_background())
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error initializing COB provider: {e}")
|
||||
logger.error(f" Error initializing COB provider: {e}")
|
||||
|
||||
# Start analysis threads
|
||||
asyncio.create_task(self._continuous_cob_analysis())
|
||||
asyncio.create_task(self._continuous_signal_generation())
|
||||
|
||||
logger.info("✅ COB Integration started successfully with Enhanced WebSocket")
|
||||
logger.info(" COB Integration started successfully with Enhanced WebSocket")
|
||||
|
||||
async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict):
|
||||
"""Handle COB updates from Enhanced WebSocket"""
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -97,7 +97,7 @@ class EnhancedCOBWebSocket:
|
||||
|
||||
logger.info(f"Enhanced COB WebSocket initialized for symbols: {self.symbols}")
|
||||
if not WEBSOCKETS_AVAILABLE:
|
||||
logger.error("⚠️ WebSockets module not available - COB data will be limited to REST API")
|
||||
logger.error("WebSockets module not available - COB data will be limited to REST API")
|
||||
|
||||
def add_cob_callback(self, callback: Callable):
|
||||
"""Add callback for COB data updates"""
|
||||
@ -109,7 +109,7 @@ class EnhancedCOBWebSocket:
|
||||
|
||||
async def start(self):
|
||||
"""Start COB WebSocket connections"""
|
||||
logger.info("🚀 Starting Enhanced COB WebSocket system")
|
||||
logger.info("Starting Enhanced COB WebSocket system")
|
||||
|
||||
# Initialize REST session for fallback
|
||||
await self._init_rest_session()
|
||||
@ -121,11 +121,11 @@ class EnhancedCOBWebSocket:
|
||||
# Start monitoring task
|
||||
asyncio.create_task(self._monitor_connections())
|
||||
|
||||
logger.info("✅ Enhanced COB WebSocket system started")
|
||||
logger.info("Enhanced COB WebSocket system started")
|
||||
|
||||
async def stop(self):
|
||||
"""Stop all WebSocket connections"""
|
||||
logger.info("🛑 Stopping Enhanced COB WebSocket system")
|
||||
logger.info("Stopping Enhanced COB WebSocket system")
|
||||
|
||||
# Cancel all WebSocket tasks
|
||||
for symbol, task in self.websocket_tasks.items():
|
||||
@ -149,21 +149,161 @@ class EnhancedCOBWebSocket:
|
||||
if self.rest_session:
|
||||
await self.rest_session.close()
|
||||
|
||||
logger.info("✅ Enhanced COB WebSocket system stopped")
|
||||
logger.info("Enhanced COB WebSocket system stopped")
|
||||
|
||||
async def _init_rest_session(self):
|
||||
"""Initialize REST API session for fallback"""
|
||||
"""Initialize REST API session for fallback and snapshots"""
|
||||
try:
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
self.rest_session = aiohttp.ClientSession(timeout=timeout)
|
||||
logger.info("✅ REST API session initialized for fallback")
|
||||
# Windows-compatible configuration without aiodns
|
||||
timeout = aiohttp.ClientTimeout(total=10, connect=5)
|
||||
connector = aiohttp.TCPConnector(
|
||||
limit=100,
|
||||
limit_per_host=10,
|
||||
enable_cleanup_closed=True,
|
||||
use_dns_cache=False, # Disable DNS cache to avoid aiodns
|
||||
family=0 # Use default family
|
||||
)
|
||||
self.rest_session = aiohttp.ClientSession(
|
||||
timeout=timeout,
|
||||
connector=connector,
|
||||
headers={'User-Agent': 'Enhanced-COB-WebSocket/1.0'}
|
||||
)
|
||||
logger.info("✅ REST API session initialized (Windows compatible)")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Failed to initialize REST session: {e}")
|
||||
logger.warning(f"⚠️ Failed to initialize REST session: {e}")
|
||||
# Try with minimal configuration
|
||||
try:
|
||||
self.rest_session = aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=10),
|
||||
connector=aiohttp.TCPConnector(use_dns_cache=False)
|
||||
)
|
||||
logger.info("✅ REST API session initialized with minimal config")
|
||||
except Exception as e2:
|
||||
logger.warning(f"⚠️ Failed to initialize minimal REST session: {e2}")
|
||||
# Continue without REST session - WebSocket only
|
||||
self.rest_session = None
|
||||
|
||||
async def _get_order_book_snapshot(self, symbol: str):
|
||||
"""Get initial order book snapshot from REST API
|
||||
|
||||
This is necessary for properly maintaining the order book state
|
||||
with the WebSocket depth stream.
|
||||
"""
|
||||
try:
|
||||
# Ensure REST session is available
|
||||
if not self.rest_session:
|
||||
await self._init_rest_session()
|
||||
|
||||
if not self.rest_session:
|
||||
logger.warning(f"⚠️ Cannot get order book snapshot for {symbol} - REST session not available, will use WebSocket data only")
|
||||
return
|
||||
|
||||
# Convert symbol format for Binance API
|
||||
binance_symbol = symbol.replace('/', '')
|
||||
|
||||
# Get order book snapshot with maximum depth
|
||||
url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=1000"
|
||||
|
||||
logger.debug(f"🔍 Getting order book snapshot for {symbol} from {url}")
|
||||
|
||||
async with self.rest_session.get(url) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
|
||||
# Validate response structure
|
||||
if not isinstance(data, dict) or 'bids' not in data or 'asks' not in data:
|
||||
logger.error(f"❌ Invalid order book snapshot response for {symbol}: missing bids/asks")
|
||||
return
|
||||
|
||||
# Initialize order book state for proper WebSocket synchronization
|
||||
self.order_books[symbol] = {
|
||||
'bids': {float(price): float(qty) for price, qty in data['bids']},
|
||||
'asks': {float(price): float(qty) for price, qty in data['asks']}
|
||||
}
|
||||
|
||||
# Store last update ID for synchronization
|
||||
if 'lastUpdateId' in data:
|
||||
self.last_update_ids[symbol] = data['lastUpdateId']
|
||||
|
||||
logger.info(f"✅ Got order book snapshot for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")
|
||||
|
||||
# Create initial COB data from snapshot
|
||||
bids = [{'price': float(price), 'size': float(qty)} for price, qty in data['bids'] if float(qty) > 0]
|
||||
asks = [{'price': float(price), 'size': float(qty)} for price, qty in data['asks'] if float(qty) > 0]
|
||||
|
||||
# Sort bids (descending) and asks (ascending)
|
||||
bids.sort(key=lambda x: x['price'], reverse=True)
|
||||
asks.sort(key=lambda x: x['price'])
|
||||
|
||||
# Create COB data structure if we have valid data
|
||||
if bids and asks:
|
||||
best_bid = bids[0]
|
||||
best_ask = asks[0]
|
||||
mid_price = (best_bid['price'] + best_ask['price']) / 2
|
||||
spread = best_ask['price'] - best_bid['price']
|
||||
spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
|
||||
|
||||
# Calculate volumes
|
||||
bid_volume = sum(bid['size'] * bid['price'] for bid in bids)
|
||||
ask_volume = sum(ask['size'] * ask['price'] for ask in asks)
|
||||
total_volume = bid_volume + ask_volume
|
||||
|
||||
cob_data = {
|
||||
'symbol': symbol,
|
||||
'timestamp': datetime.now(),
|
||||
'bids': bids,
|
||||
'asks': asks,
|
||||
'source': 'rest_snapshot',
|
||||
'exchange': 'binance',
|
||||
'stats': {
|
||||
'best_bid': best_bid['price'],
|
||||
'best_ask': best_ask['price'],
|
||||
'mid_price': mid_price,
|
||||
'spread': spread,
|
||||
'spread_bps': spread_bps,
|
||||
'bid_volume': bid_volume,
|
||||
'ask_volume': ask_volume,
|
||||
'total_bid_volume': bid_volume,
|
||||
'total_ask_volume': ask_volume,
|
||||
'imbalance': (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0,
|
||||
'bid_levels': len(bids),
|
||||
'ask_levels': len(asks),
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
}
|
||||
|
||||
# Update cache
|
||||
self.latest_cob_data[symbol] = cob_data
|
||||
|
||||
# Notify callbacks
|
||||
for callback in self.cob_callbacks:
|
||||
try:
|
||||
await callback(symbol, cob_data)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error in COB callback: {e}")
|
||||
|
||||
logger.debug(f"📊 Initial snapshot for {symbol}: ${mid_price:.2f}, spread: {spread_bps:.1f} bps")
|
||||
else:
|
||||
logger.warning(f"⚠️ No valid bid/ask data in snapshot for {symbol}")
|
||||
|
||||
elif response.status == 429:
|
||||
logger.warning(f"⚠️ Rate limited getting snapshot for {symbol}, will continue with WebSocket only")
|
||||
else:
|
||||
logger.error(f"❌ Failed to get order book snapshot for {symbol}: HTTP {response.status}")
|
||||
response_text = await response.text()
|
||||
logger.debug(f"Response: {response_text}")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"⚠️ Timeout getting order book snapshot for {symbol}, will continue with WebSocket only")
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Error getting order book snapshot for {symbol}: {e}, will continue with WebSocket only")
|
||||
logger.debug(f"Snapshot error details: {e}")
|
||||
# Don't fail the entire connection due to snapshot issues
|
||||
|
||||
async def _start_symbol_websocket(self, symbol: str):
|
||||
"""Start WebSocket connection for a specific symbol"""
|
||||
if not WEBSOCKETS_AVAILABLE:
|
||||
logger.warning(f"⚠️ WebSockets not available for {symbol}, starting REST fallback")
|
||||
logger.warning(f"WebSockets not available for {symbol}, starting REST fallback")
|
||||
await self._start_rest_fallback(symbol)
|
||||
return
|
||||
|
||||
@ -176,22 +316,25 @@ class EnhancedCOBWebSocket:
|
||||
self._websocket_connection_loop(symbol)
|
||||
)
|
||||
|
||||
logger.info(f"🔌 Started WebSocket task for {symbol}")
|
||||
logger.info(f"Started WebSocket task for {symbol}")
|
||||
|
||||
async def _websocket_connection_loop(self, symbol: str):
|
||||
"""Main WebSocket connection loop with reconnection logic"""
|
||||
"""Main WebSocket connection loop with reconnection logic
|
||||
|
||||
Uses depth@100ms for fastest updates with maximum depth.
|
||||
"""
|
||||
status = self.status[symbol]
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info(f"🔌 Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})")
|
||||
logger.info(f"Attempting WebSocket connection for {symbol} (attempt {status.connection_attempts + 1})")
|
||||
status.connection_attempts += 1
|
||||
|
||||
# Create WebSocket URL with maximum depth
|
||||
# Create WebSocket URL with maximum depth - use depth@100ms for fastest updates
|
||||
ws_symbol = symbol.replace('/', '').lower() # BTCUSDT, ETHUSDT
|
||||
ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@{self.update_speed}"
|
||||
ws_url = f"wss://stream.binance.com:9443/ws/{ws_symbol}@depth@100ms"
|
||||
|
||||
logger.info(f"🔗 Connecting to: {ws_url}")
|
||||
logger.info(f"Connecting to: {ws_url}")
|
||||
|
||||
async with websockets_connect(ws_url) as websocket:
|
||||
# Connection successful
|
||||
@ -199,7 +342,7 @@ class EnhancedCOBWebSocket:
|
||||
status.last_error = None
|
||||
status.reset_reconnect_delay()
|
||||
|
||||
logger.info(f"✅ WebSocket connected for {symbol}")
|
||||
logger.info(f"WebSocket connected for {symbol}")
|
||||
await self._notify_dashboard_status(symbol, "connected", "WebSocket connected")
|
||||
|
||||
# Deactivate REST fallback
|
||||
@ -216,24 +359,24 @@ class EnhancedCOBWebSocket:
|
||||
status.messages_received += 1
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning(f"⚠️ Invalid JSON from {symbol} WebSocket: {e}")
|
||||
logger.warning(f"Invalid JSON from {symbol} WebSocket: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}")
|
||||
logger.error(f"Error processing WebSocket message for {symbol}: {e}")
|
||||
|
||||
except ConnectionClosed as e:
|
||||
status.connected = False
|
||||
status.last_error = f"Connection closed: {e}"
|
||||
logger.warning(f"🔌 WebSocket connection closed for {symbol}: {e}")
|
||||
logger.warning(f"WebSocket connection closed for {symbol}: {e}")
|
||||
|
||||
except WebSocketException as e:
|
||||
status.connected = False
|
||||
status.last_error = f"WebSocket error: {e}"
|
||||
logger.error(f"❌ WebSocket error for {symbol}: {e}")
|
||||
logger.error(f"WebSocket error for {symbol}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
status.connected = False
|
||||
status.last_error = f"Unexpected error: {e}"
|
||||
logger.error(f"❌ Unexpected WebSocket error for {symbol}: {e}")
|
||||
logger.error(f"Unexpected WebSocket error for {symbol}: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
# Connection failed or closed - start REST fallback
|
||||
@ -242,51 +385,163 @@ class EnhancedCOBWebSocket:
|
||||
|
||||
# Wait before reconnecting
|
||||
status.increase_reconnect_delay()
|
||||
logger.info(f"⏳ Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}")
|
||||
logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}")
|
||||
await asyncio.sleep(status.reconnect_delay)
|
||||
|
||||
async def _process_websocket_message(self, symbol: str, data: Dict):
|
||||
"""Process WebSocket message and convert to COB format"""
|
||||
try:
|
||||
# Binance depth stream format
|
||||
if 'b' in data and 'a' in data: # bids and asks
|
||||
cob_data = {
|
||||
'symbol': symbol,
|
||||
'timestamp': datetime.now(),
|
||||
'bids': [{'price': float(bid[0]), 'size': float(bid[1])} for bid in data['b']],
|
||||
'asks': [{'price': float(ask[0]), 'size': float(ask[1])} for ask in data['a']],
|
||||
'source': 'websocket',
|
||||
'exchange': 'binance'
|
||||
}
|
||||
|
||||
# Calculate stats
|
||||
if cob_data['bids'] and cob_data['asks']:
|
||||
best_bid = max(cob_data['bids'], key=lambda x: x['price'])
|
||||
best_ask = min(cob_data['asks'], key=lambda x: x['price'])
|
||||
|
||||
cob_data['stats'] = {
|
||||
'best_bid': best_bid['price'],
|
||||
'best_ask': best_ask['price'],
|
||||
'spread': best_ask['price'] - best_bid['price'],
|
||||
'mid_price': (best_bid['price'] + best_ask['price']) / 2,
|
||||
'bid_volume': sum(bid['size'] for bid in cob_data['bids']),
|
||||
'ask_volume': sum(ask['size'] for ask in cob_data['asks'])
|
||||
}
|
||||
|
||||
# Update cache
|
||||
self.latest_cob_data[symbol] = cob_data
|
||||
|
||||
# Notify callbacks
|
||||
for callback in self.cob_callbacks:
|
||||
try:
|
||||
await callback(symbol, cob_data)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error in COB callback: {e}")
|
||||
|
||||
logger.debug(f"📊 Processed WebSocket COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks")
|
||||
"""Process WebSocket message and convert to COB format
|
||||
|
||||
Based on the working implementation from cob_realtime_dashboard.py
|
||||
Using maximum depth for best performance - no order book maintenance needed.
|
||||
"""
|
||||
try:
|
||||
# Extract bids and asks from the message - handle all possible formats
|
||||
bids_data = data.get('b', [])
|
||||
asks_data = data.get('a', [])
|
||||
|
||||
# Process the order book data - filter out zero quantities
|
||||
# Binance uses 0 quantity to indicate removal from the book
|
||||
valid_bids = []
|
||||
valid_asks = []
|
||||
|
||||
# Process bids
|
||||
for bid in bids_data:
|
||||
try:
|
||||
if len(bid) >= 2:
|
||||
price = float(bid[0])
|
||||
size = float(bid[1])
|
||||
if size > 0: # Only include non-zero quantities
|
||||
valid_bids.append({'price': price, 'size': size})
|
||||
except (IndexError, ValueError, TypeError):
|
||||
continue
|
||||
|
||||
# Process asks
|
||||
for ask in asks_data:
|
||||
try:
|
||||
if len(ask) >= 2:
|
||||
price = float(ask[0])
|
||||
size = float(ask[1])
|
||||
if size > 0: # Only include non-zero quantities
|
||||
valid_asks.append({'price': price, 'size': size})
|
||||
except (IndexError, ValueError, TypeError):
|
||||
continue
|
||||
|
||||
# Sort bids (descending) and asks (ascending) for proper order book
|
||||
valid_bids.sort(key=lambda x: x['price'], reverse=True)
|
||||
valid_asks.sort(key=lambda x: x['price'])
|
||||
|
||||
# Limit to maximum depth (1000 levels for maximum DOM)
|
||||
max_depth = 1000
|
||||
if len(valid_bids) > max_depth:
|
||||
valid_bids = valid_bids[:max_depth]
|
||||
if len(valid_asks) > max_depth:
|
||||
valid_asks = valid_asks[:max_depth]
|
||||
|
||||
# Create COB data structure matching the working dashboard format
|
||||
cob_data = {
|
||||
'symbol': symbol,
|
||||
'timestamp': datetime.now(),
|
||||
'bids': valid_bids,
|
||||
'asks': valid_asks,
|
||||
'source': 'enhanced_websocket',
|
||||
'exchange': 'binance'
|
||||
}
|
||||
|
||||
# Calculate comprehensive stats if we have valid data
|
||||
if valid_bids and valid_asks:
|
||||
best_bid = valid_bids[0] # Already sorted, first is highest
|
||||
best_ask = valid_asks[0] # Already sorted, first is lowest
|
||||
|
||||
# Core price metrics
|
||||
mid_price = (best_bid['price'] + best_ask['price']) / 2
|
||||
spread = best_ask['price'] - best_bid['price']
|
||||
spread_bps = (spread / mid_price) * 10000 if mid_price > 0 else 0
|
||||
|
||||
# Volume calculations (notional value) - limit to top 20 levels for performance
|
||||
top_bids = valid_bids[:20]
|
||||
top_asks = valid_asks[:20]
|
||||
|
||||
bid_volume = sum(bid['size'] * bid['price'] for bid in top_bids)
|
||||
ask_volume = sum(ask['size'] * ask['price'] for ask in top_asks)
|
||||
|
||||
# Size calculations (base currency)
|
||||
bid_size = sum(bid['size'] for bid in top_bids)
|
||||
ask_size = sum(ask['size'] for ask in top_asks)
|
||||
|
||||
# Imbalance calculations
|
||||
total_volume = bid_volume + ask_volume
|
||||
volume_imbalance = (bid_volume - ask_volume) / total_volume if total_volume > 0 else 0
|
||||
|
||||
total_size = bid_size + ask_size
|
||||
size_imbalance = (bid_size - ask_size) / total_size if total_size > 0 else 0
|
||||
|
||||
cob_data['stats'] = {
|
||||
'best_bid': best_bid['price'],
|
||||
'best_ask': best_ask['price'],
|
||||
'mid_price': mid_price,
|
||||
'spread': spread,
|
||||
'spread_bps': spread_bps,
|
||||
'bid_volume': bid_volume,
|
||||
'ask_volume': ask_volume,
|
||||
'total_bid_volume': bid_volume,
|
||||
'total_ask_volume': ask_volume,
|
||||
'bid_liquidity': bid_volume, # Add liquidity fields
|
||||
'ask_liquidity': ask_volume,
|
||||
'total_bid_liquidity': bid_volume,
|
||||
'total_ask_liquidity': ask_volume,
|
||||
'bid_size': bid_size,
|
||||
'ask_size': ask_size,
|
||||
'volume_imbalance': volume_imbalance,
|
||||
'size_imbalance': size_imbalance,
|
||||
'imbalance': volume_imbalance, # Default to volume imbalance
|
||||
'bid_levels': len(valid_bids),
|
||||
'ask_levels': len(valid_asks),
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'update_id': data.get('u', 0), # Binance update ID
|
||||
'event_time': data.get('E', 0) # Binance event time
|
||||
}
|
||||
else:
|
||||
# Provide default stats if no valid data
|
||||
cob_data['stats'] = {
|
||||
'best_bid': 0,
|
||||
'best_ask': 0,
|
||||
'mid_price': 0,
|
||||
'spread': 0,
|
||||
'spread_bps': 0,
|
||||
'bid_volume': 0,
|
||||
'ask_volume': 0,
|
||||
'total_bid_volume': 0,
|
||||
'total_ask_volume': 0,
|
||||
'bid_size': 0,
|
||||
'ask_size': 0,
|
||||
'volume_imbalance': 0,
|
||||
'size_imbalance': 0,
|
||||
'imbalance': 0,
|
||||
'bid_levels': 0,
|
||||
'ask_levels': 0,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'update_id': data.get('u', 0),
|
||||
'event_time': data.get('E', 0)
|
||||
}
|
||||
|
||||
# Update cache
|
||||
self.latest_cob_data[symbol] = cob_data
|
||||
|
||||
# Notify callbacks
|
||||
for callback in self.cob_callbacks:
|
||||
try:
|
||||
await callback(symbol, cob_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in COB callback: {e}")
|
||||
|
||||
# Log success with key metrics (only for non-empty updates)
|
||||
if valid_bids and valid_asks:
|
||||
logger.debug(f"{symbol}: ${cob_data['stats']['mid_price']:.2f}, {len(valid_bids)} bids, {len(valid_asks)} asks, spread: {cob_data['stats']['spread_bps']:.1f} bps")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error processing WebSocket message for {symbol}: {e}")
|
||||
logger.error(f"Error processing WebSocket message for {symbol}: {e}")
|
||||
import traceback
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
async def _start_rest_fallback(self, symbol: str):
|
||||
"""Start REST API fallback for a symbol"""
|
||||
@ -304,7 +559,7 @@ class EnhancedCOBWebSocket:
|
||||
self._rest_fallback_loop(symbol)
|
||||
)
|
||||
|
||||
logger.warning(f"⚠️ Started REST API fallback for {symbol}")
|
||||
logger.warning(f"Started REST API fallback for {symbol}")
|
||||
await self._notify_dashboard_status(symbol, "fallback", "Using REST API fallback")
|
||||
|
||||
async def _stop_rest_fallback(self, symbol: str):
|
||||
@ -317,7 +572,7 @@ class EnhancedCOBWebSocket:
|
||||
if symbol in self.rest_tasks and not self.rest_tasks[symbol].done():
|
||||
self.rest_tasks[symbol].cancel()
|
||||
|
||||
logger.info(f"✅ Stopped REST API fallback for {symbol}")
|
||||
logger.info(f"Stopped REST API fallback for {symbol}")
|
||||
|
||||
async def _rest_fallback_loop(self, symbol: str):
|
||||
"""REST API fallback loop"""
|
||||
@ -328,7 +583,7 @@ class EnhancedCOBWebSocket:
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"❌ REST fallback error for {symbol}: {e}")
|
||||
logger.error(f"REST fallback error for {symbol}: {e}")
|
||||
await asyncio.sleep(5) # Wait longer on error
|
||||
|
||||
async def _fetch_rest_orderbook(self, symbol: str):
|
||||
@ -381,10 +636,10 @@ class EnhancedCOBWebSocket:
|
||||
logger.debug(f"📊 Fetched REST COB data for {symbol}: {len(cob_data['bids'])} bids, {len(cob_data['asks'])} asks")
|
||||
|
||||
else:
|
||||
logger.warning(f"⚠️ REST API error for {symbol}: HTTP {response.status}")
|
||||
logger.warning(f"REST API error for {symbol}: HTTP {response.status}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error fetching REST order book for {symbol}: {e}")
|
||||
logger.error(f"Error fetching REST order book for {symbol}: {e}")
|
||||
|
||||
async def _monitor_connections(self):
|
||||
"""Monitor WebSocket connections and provide status updates"""
|
||||
@ -399,33 +654,40 @@ class EnhancedCOBWebSocket:
|
||||
if status.connected and status.last_message_time:
|
||||
time_since_last = datetime.now() - status.last_message_time
|
||||
if time_since_last > timedelta(seconds=30):
|
||||
logger.warning(f"⚠️ No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s")
|
||||
logger.warning(f"No messages from {symbol} WebSocket for {time_since_last.total_seconds():.0f}s")
|
||||
await self._notify_dashboard_status(symbol, "stale", "No recent messages")
|
||||
|
||||
# Log status
|
||||
if status.connected:
|
||||
logger.debug(f"✅ {symbol}: Connected, {status.messages_received} messages received")
|
||||
logger.debug(f"{symbol}: Connected, {status.messages_received} messages received")
|
||||
elif self.rest_fallback_active[symbol]:
|
||||
logger.debug(f"⚠️ {symbol}: Using REST fallback")
|
||||
logger.debug(f"{symbol}: Using REST fallback")
|
||||
else:
|
||||
logger.debug(f"❌ {symbol}: Disconnected, last error: {status.last_error}")
|
||||
logger.debug(f"{symbol}: Disconnected, last error: {status.last_error}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error in connection monitor: {e}")
|
||||
logger.error(f"Error in connection monitor: {e}")
|
||||
|
||||
async def _notify_dashboard_status(self, symbol: str, status: str, message: str):
|
||||
"""Notify dashboard of status changes"""
|
||||
try:
|
||||
if self.dashboard_callback:
|
||||
await self.dashboard_callback({
|
||||
status_data = {
|
||||
'type': 'cob_status',
|
||||
'symbol': symbol,
|
||||
'status': status,
|
||||
'message': message,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
}
|
||||
|
||||
# Check if callback is async or sync
|
||||
if asyncio.iscoroutinefunction(self.dashboard_callback):
|
||||
await self.dashboard_callback(status_data)
|
||||
else:
|
||||
# Call sync function directly
|
||||
self.dashboard_callback(status_data)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error notifying dashboard: {e}")
|
||||
logger.error(f"Error notifying dashboard: {e}")
|
||||
|
||||
def get_status_summary(self) -> Dict[str, Any]:
|
||||
"""Get status summary for all symbols"""
|
||||
|
@ -948,9 +948,11 @@ class TradingOrchestrator:
|
||||
for model_name in self.model_weights:
|
||||
self.model_weights[model_name] /= total_weight
|
||||
|
||||
def add_decision_callback(self, callback):
|
||||
async def add_decision_callback(self, callback):
|
||||
"""Add a callback function to be called when decisions are made"""
|
||||
self.decision_callbacks.append(callback)
|
||||
logger.info(f"Decision callback registered: {callback.__name__ if hasattr(callback, '__name__') else 'unnamed'}")
|
||||
return True
|
||||
|
||||
async def make_trading_decision(self, symbol: str) -> Optional[TradingDecision]:
|
||||
"""
|
||||
@ -1844,23 +1846,52 @@ class TradingOrchestrator:
|
||||
logger.error(f"Error setting training dashboard: {e}")
|
||||
|
||||
def get_universal_data_stream(self, current_time: Optional[datetime] = None):
|
||||
"""Get universal data stream for external consumers like dashboard"""
|
||||
"""Get universal data stream for external consumers like dashboard - DELEGATED to data provider"""
|
||||
try:
|
||||
return self.universal_adapter.get_universal_data_stream(current_time)
|
||||
if self.data_provider and hasattr(self.data_provider, 'universal_adapter'):
|
||||
return self.data_provider.universal_adapter.get_universal_data_stream(current_time)
|
||||
elif self.universal_adapter:
|
||||
return self.universal_adapter.get_universal_data_stream(current_time)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting universal data stream: {e}")
|
||||
return None
|
||||
|
||||
def get_universal_data_for_model(self, model_type: str = 'cnn') -> Optional[Dict[str, Any]]:
|
||||
"""Get formatted universal data for specific model types"""
|
||||
"""Get formatted universal data for specific model types - DELEGATED to data provider"""
|
||||
try:
|
||||
stream = self.universal_adapter.get_universal_data_stream()
|
||||
if stream:
|
||||
return self.universal_adapter.format_for_model(stream, model_type)
|
||||
if self.data_provider and hasattr(self.data_provider, 'universal_adapter'):
|
||||
stream = self.data_provider.universal_adapter.get_universal_data_stream()
|
||||
if stream:
|
||||
return self.data_provider.universal_adapter.format_for_model(stream, model_type)
|
||||
elif self.universal_adapter:
|
||||
stream = self.universal_adapter.get_universal_data_stream()
|
||||
if stream:
|
||||
return self.universal_adapter.format_for_model(stream, model_type)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting universal data for {model_type}: {e}")
|
||||
return None
|
||||
|
||||
def get_cob_data(self, symbol: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get COB data for symbol - DELEGATED to data provider"""
|
||||
try:
|
||||
if self.data_provider:
|
||||
return self.data_provider.get_latest_cob_data(symbol)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting COB data for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def get_combined_model_data(self, symbol: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get combined OHLCV + COB data for models - DELEGATED to data provider"""
|
||||
try:
|
||||
if self.data_provider:
|
||||
return self.data_provider.get_combined_ohlcv_cob_data(symbol)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting combined model data for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def _get_current_position_pnl(self, symbol: str, current_price: float) -> float:
|
||||
"""Get current position P&L for the symbol"""
|
||||
@ -2120,7 +2151,7 @@ class TradingOrchestrator:
|
||||
# Create state representation
|
||||
state = self._create_state_for_training(symbol, market_data)
|
||||
|
||||
# Map action to DQN action space
|
||||
# Map action to DQN action space - CONSISTENT ACTION MAPPING
|
||||
action_mapping = {'BUY': 0, 'SELL': 1, 'HOLD': 2}
|
||||
dqn_action = action_mapping.get(action, 2)
|
||||
|
||||
|
425
core/shared_data_manager.py
Normal file
425
core/shared_data_manager.py
Normal file
@ -0,0 +1,425 @@
|
||||
"""
|
||||
Shared Data Manager for UI Stability Fix
|
||||
|
||||
Manages data sharing between processes through files with proper locking
|
||||
and atomic operations to prevent corruption and conflicts.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
import platform
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Dict, Any, Optional, Union
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
# Windows-compatible file locking
|
||||
if platform.system() == "Windows":
|
||||
import msvcrt
|
||||
else:
|
||||
import fcntl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class ProcessStatus:
|
||||
"""Model for process status information"""
|
||||
name: str
|
||||
pid: int
|
||||
status: str # 'running', 'stopped', 'error'
|
||||
start_time: datetime
|
||||
last_heartbeat: datetime
|
||||
memory_usage: float
|
||||
cpu_usage: float
|
||||
error_message: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary with datetime serialization"""
|
||||
data = asdict(self)
|
||||
data['start_time'] = self.start_time.isoformat()
|
||||
data['last_heartbeat'] = self.last_heartbeat.isoformat()
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'ProcessStatus':
|
||||
"""Create from dictionary with datetime deserialization"""
|
||||
data['start_time'] = datetime.fromisoformat(data['start_time'])
|
||||
data['last_heartbeat'] = datetime.fromisoformat(data['last_heartbeat'])
|
||||
return cls(**data)
|
||||
|
||||
@dataclass
|
||||
class TrainingStatus:
|
||||
"""Model for training status information"""
|
||||
is_running: bool
|
||||
current_epoch: int
|
||||
total_epochs: int
|
||||
loss: float
|
||||
accuracy: float
|
||||
last_update: datetime
|
||||
model_path: str
|
||||
error_message: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary with datetime serialization"""
|
||||
data = asdict(self)
|
||||
data['last_update'] = self.last_update.isoformat()
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'TrainingStatus':
|
||||
"""Create from dictionary with datetime deserialization"""
|
||||
data['last_update'] = datetime.fromisoformat(data['last_update'])
|
||||
return cls(**data)
|
||||
|
||||
@dataclass
|
||||
class DashboardState:
|
||||
"""Model for dashboard state information"""
|
||||
is_connected: bool
|
||||
last_data_update: datetime
|
||||
active_connections: int
|
||||
error_count: int
|
||||
performance_metrics: Dict[str, float]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary with datetime serialization"""
|
||||
data = asdict(self)
|
||||
data['last_data_update'] = self.last_data_update.isoformat()
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'DashboardState':
|
||||
"""Create from dictionary with datetime deserialization"""
|
||||
data['last_data_update'] = datetime.fromisoformat(data['last_data_update'])
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class SharedDataManager:
|
||||
"""
|
||||
Manages data sharing between processes through files with proper locking
|
||||
and atomic operations to prevent corruption and conflicts.
|
||||
"""
|
||||
|
||||
def __init__(self, data_dir: str = "shared_data"):
|
||||
"""
|
||||
Initialize the shared data manager
|
||||
|
||||
Args:
|
||||
data_dir: Directory to store shared data files
|
||||
"""
|
||||
self.data_dir = Path(data_dir)
|
||||
self.data_dir.mkdir(exist_ok=True)
|
||||
|
||||
# Define file paths for different data types
|
||||
self.training_status_file = self.data_dir / "training_status.json"
|
||||
self.dashboard_state_file = self.data_dir / "dashboard_state.json"
|
||||
self.process_status_file = self.data_dir / "process_status.json"
|
||||
self.market_data_file = self.data_dir / "market_data.json"
|
||||
self.model_metrics_file = self.data_dir / "model_metrics.json"
|
||||
|
||||
logger.info(f"SharedDataManager initialized with data directory: {self.data_dir}")
|
||||
|
||||
def _lock_file(self, file_handle, exclusive=True):
|
||||
"""Cross-platform file locking"""
|
||||
if platform.system() == "Windows":
|
||||
# Windows file locking
|
||||
try:
|
||||
if exclusive:
|
||||
msvcrt.locking(file_handle.fileno(), msvcrt.LK_LOCK, 1)
|
||||
else:
|
||||
msvcrt.locking(file_handle.fileno(), msvcrt.LK_LOCK, 1)
|
||||
except IOError:
|
||||
pass # File locking may not be available in all scenarios
|
||||
else:
|
||||
# Unix file locking
|
||||
lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
|
||||
fcntl.flock(file_handle.fileno(), lock_type)
|
||||
|
||||
def _unlock_file(self, file_handle):
|
||||
"""Cross-platform file unlocking"""
|
||||
if platform.system() == "Windows":
|
||||
try:
|
||||
msvcrt.locking(file_handle.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
except IOError:
|
||||
pass
|
||||
else:
|
||||
fcntl.flock(file_handle.fileno(), fcntl.LOCK_UN)
|
||||
|
||||
def _write_json_atomic(self, file_path: Path, data: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Write JSON data atomically with file locking
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to write
|
||||
data: Data to write as JSON
|
||||
"""
|
||||
temp_path = None
|
||||
try:
|
||||
# Create temporary file in the same directory
|
||||
temp_fd, temp_path = tempfile.mkstemp(
|
||||
dir=file_path.parent,
|
||||
prefix=f".{file_path.name}.",
|
||||
suffix=".tmp"
|
||||
)
|
||||
|
||||
with os.fdopen(temp_fd, 'w') as temp_file:
|
||||
# Lock the temporary file
|
||||
self._lock_file(temp_file, exclusive=True)
|
||||
|
||||
# Write data with proper formatting
|
||||
json.dump(data, temp_file, indent=2, default=str)
|
||||
temp_file.flush()
|
||||
os.fsync(temp_file.fileno())
|
||||
|
||||
# Unlock before closing
|
||||
self._unlock_file(temp_file)
|
||||
|
||||
# Atomically replace the original file
|
||||
os.replace(temp_path, file_path)
|
||||
logger.debug(f"Successfully wrote data to {file_path}")
|
||||
|
||||
except Exception as e:
|
||||
# Clean up temporary file if it exists
|
||||
if temp_path:
|
||||
try:
|
||||
os.unlink(temp_path)
|
||||
except:
|
||||
pass
|
||||
logger.error(f"Failed to write data to {file_path}: {e}")
|
||||
raise
|
||||
|
||||
def _read_json_safe(self, file_path: Path) -> Dict[str, Any]:
|
||||
"""
|
||||
Read JSON data safely with file locking
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to read
|
||||
|
||||
Returns:
|
||||
Dictionary containing the JSON data
|
||||
"""
|
||||
if not file_path.exists():
|
||||
logger.debug(f"File {file_path} does not exist, returning empty dict")
|
||||
return {}
|
||||
|
||||
try:
|
||||
with open(file_path, 'r') as file:
|
||||
# Lock the file for reading
|
||||
self._lock_file(file, exclusive=False)
|
||||
data = json.load(file)
|
||||
self._unlock_file(file)
|
||||
logger.debug(f"Successfully read data from {file_path}")
|
||||
return data
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON in {file_path}: {e}")
|
||||
return {}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read data from {file_path}: {e}")
|
||||
return {}
|
||||
|
||||
def write_training_status(self, status: TrainingStatus) -> None:
|
||||
"""
|
||||
Write training status to shared file
|
||||
|
||||
Args:
|
||||
status: TrainingStatus object to write
|
||||
"""
|
||||
try:
|
||||
data = status.to_dict()
|
||||
self._write_json_atomic(self.training_status_file, data)
|
||||
logger.debug("Training status written successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write training status: {e}")
|
||||
raise
|
||||
|
||||
def read_training_status(self) -> Optional[TrainingStatus]:
|
||||
"""
|
||||
Read training status from shared file
|
||||
|
||||
Returns:
|
||||
TrainingStatus object or None if not available
|
||||
"""
|
||||
try:
|
||||
data = self._read_json_safe(self.training_status_file)
|
||||
if not data:
|
||||
return None
|
||||
return TrainingStatus.from_dict(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read training status: {e}")
|
||||
return None
|
||||
|
||||
def write_dashboard_state(self, state: DashboardState) -> None:
|
||||
"""
|
||||
Write dashboard state to shared file
|
||||
|
||||
Args:
|
||||
state: DashboardState object to write
|
||||
"""
|
||||
try:
|
||||
data = state.to_dict()
|
||||
self._write_json_atomic(self.dashboard_state_file, data)
|
||||
logger.debug("Dashboard state written successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write dashboard state: {e}")
|
||||
raise
|
||||
|
||||
def read_dashboard_state(self) -> Optional[DashboardState]:
|
||||
"""
|
||||
Read dashboard state from shared file
|
||||
|
||||
Returns:
|
||||
DashboardState object or None if not available
|
||||
"""
|
||||
try:
|
||||
data = self._read_json_safe(self.dashboard_state_file)
|
||||
if not data:
|
||||
return None
|
||||
return DashboardState.from_dict(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read dashboard state: {e}")
|
||||
return None
|
||||
|
||||
def write_process_status(self, status: ProcessStatus) -> None:
|
||||
"""
|
||||
Write process status to shared file
|
||||
|
||||
Args:
|
||||
status: ProcessStatus object to write
|
||||
"""
|
||||
try:
|
||||
data = status.to_dict()
|
||||
self._write_json_atomic(self.process_status_file, data)
|
||||
logger.debug("Process status written successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write process status: {e}")
|
||||
raise
|
||||
|
||||
def read_process_status(self) -> Optional[ProcessStatus]:
|
||||
"""
|
||||
Read process status from shared file
|
||||
|
||||
Returns:
|
||||
ProcessStatus object or None if not available
|
||||
"""
|
||||
try:
|
||||
data = self._read_json_safe(self.process_status_file)
|
||||
if not data:
|
||||
return None
|
||||
return ProcessStatus.from_dict(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read process status: {e}")
|
||||
return None
|
||||
|
||||
def write_market_data(self, data: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Write market data to shared file
|
||||
|
||||
Args:
|
||||
data: Market data dictionary to write
|
||||
"""
|
||||
try:
|
||||
# Add timestamp to market data
|
||||
data['timestamp'] = datetime.now().isoformat()
|
||||
self._write_json_atomic(self.market_data_file, data)
|
||||
logger.debug("Market data written successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write market data: {e}")
|
||||
raise
|
||||
|
||||
def read_market_data(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Read market data from shared file
|
||||
|
||||
Returns:
|
||||
Dictionary containing market data
|
||||
"""
|
||||
try:
|
||||
return self._read_json_safe(self.market_data_file)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read market data: {e}")
|
||||
return {}
|
||||
|
||||
def write_model_metrics(self, metrics: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Write model metrics to shared file
|
||||
|
||||
Args:
|
||||
metrics: Model metrics dictionary to write
|
||||
"""
|
||||
try:
|
||||
# Add timestamp to metrics
|
||||
metrics['timestamp'] = datetime.now().isoformat()
|
||||
self._write_json_atomic(self.model_metrics_file, metrics)
|
||||
logger.debug("Model metrics written successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write model metrics: {e}")
|
||||
raise
|
||||
|
||||
def read_model_metrics(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Read model metrics from shared file
|
||||
|
||||
Returns:
|
||||
Dictionary containing model metrics
|
||||
"""
|
||||
try:
|
||||
return self._read_json_safe(self.model_metrics_file)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to read model metrics: {e}")
|
||||
return {}
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""
|
||||
Clean up shared data files
|
||||
"""
|
||||
try:
|
||||
for file_path in [
|
||||
self.training_status_file,
|
||||
self.dashboard_state_file,
|
||||
self.process_status_file,
|
||||
self.market_data_file,
|
||||
self.model_metrics_file
|
||||
]:
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
logger.debug(f"Removed {file_path}")
|
||||
|
||||
# Remove directory if empty
|
||||
if self.data_dir.exists() and not any(self.data_dir.iterdir()):
|
||||
self.data_dir.rmdir()
|
||||
logger.debug(f"Removed empty directory {self.data_dir}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to cleanup shared data: {e}")
|
||||
|
||||
def get_data_age(self, data_type: str) -> Optional[float]:
|
||||
"""
|
||||
Get the age of data in seconds
|
||||
|
||||
Args:
|
||||
data_type: Type of data ('training', 'dashboard', 'process', 'market', 'metrics')
|
||||
|
||||
Returns:
|
||||
Age in seconds or None if file doesn't exist
|
||||
"""
|
||||
file_map = {
|
||||
'training': self.training_status_file,
|
||||
'dashboard': self.dashboard_state_file,
|
||||
'process': self.process_status_file,
|
||||
'market': self.market_data_file,
|
||||
'metrics': self.model_metrics_file
|
||||
}
|
||||
|
||||
file_path = file_map.get(data_type)
|
||||
if not file_path or not file_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
mtime = file_path.stat().st_mtime
|
||||
return time.time() - mtime
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get data age for {data_type}: {e}")
|
||||
return None
|
Reference in New Issue
Block a user