#!/usr/bin/env python3 """ Shared COB Service - Eliminates Redundant COB Implementations This service provides a singleton COB integration that can be shared across: - Dashboard components - RL trading systems - Enhanced orchestrators - Training pipelines Instead of each component creating its own COBIntegration instance, they all share this single service, eliminating redundant connections. """ import asyncio import logging import weakref from typing import Dict, List, Optional, Any, Callable, Set from datetime import datetime from threading import Lock from dataclasses import dataclass from .cob_integration import COBIntegration from .multi_exchange_cob_provider import COBSnapshot from .data_provider import DataProvider logger = logging.getLogger(__name__) @dataclass class COBSubscription: """Represents a subscription to COB updates""" subscriber_id: str callback: Callable symbol_filter: Optional[List[str]] = None callback_type: str = "general" # general, cnn, dqn, dashboard class SharedCOBService: """ Shared COB Service - Singleton pattern for unified COB data access This service eliminates redundant COB integrations by providing a single shared instance that all components can subscribe to. """ _instance: Optional['SharedCOBService'] = None _lock = Lock() def __new__(cls, *args, **kwargs): """Singleton pattern implementation""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super(SharedCOBService, cls).__new__(cls) return cls._instance def __init__(self, symbols: Optional[List[str]] = None, data_provider: Optional[DataProvider] = None): """Initialize shared COB service (only called once due to singleton)""" if hasattr(self, '_initialized'): return self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] self.data_provider = data_provider # Single COB integration instance self.cob_integration: Optional[COBIntegration] = None self.is_running = False # Subscriber management self.subscribers: Dict[str, COBSubscription] = {} self.subscriber_counter = 0 self.subscription_lock = Lock() # Cached data for immediate access self.latest_snapshots: Dict[str, COBSnapshot] = {} self.latest_cnn_features: Dict[str, Any] = {} self.latest_dqn_states: Dict[str, Any] = {} # Performance tracking self.total_subscribers = 0 self.update_count = 0 self.start_time = None self._initialized = True logger.info(f"SharedCOBService initialized for symbols: {self.symbols}") async def start(self) -> None: """Start the shared COB service""" if self.is_running: logger.warning("SharedCOBService already running") return logger.info("Starting SharedCOBService...") try: # Initialize COB integration if not already done if self.cob_integration is None: self.cob_integration = COBIntegration( data_provider=self.data_provider, symbols=self.symbols ) # Register internal callbacks self.cob_integration.add_cnn_callback(self._on_cob_cnn_update) self.cob_integration.add_dqn_callback(self._on_cob_dqn_update) self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_update) # Start COB integration await self.cob_integration.start() self.is_running = True self.start_time = datetime.now() logger.info("SharedCOBService started successfully") logger.info(f"Active subscribers: {len(self.subscribers)}") except Exception as e: logger.error(f"Error starting SharedCOBService: {e}") raise async def stop(self) -> None: """Stop the shared COB service""" if not self.is_running: return logger.info("Stopping SharedCOBService...") try: if self.cob_integration: await self.cob_integration.stop() self.is_running = False # Notify all subscribers of shutdown for subscription in self.subscribers.values(): try: if hasattr(subscription.callback, '__call__'): subscription.callback("SHUTDOWN", None) except Exception as e: logger.warning(f"Error notifying subscriber {subscription.subscriber_id}: {e}") logger.info("SharedCOBService stopped") except Exception as e: logger.error(f"Error stopping SharedCOBService: {e}") def subscribe(self, callback: Callable, callback_type: str = "general", symbol_filter: Optional[List[str]] = None, subscriber_name: str = None) -> str: """ Subscribe to COB updates Args: callback: Function to call on updates callback_type: Type of callback ('general', 'cnn', 'dqn', 'dashboard') symbol_filter: Only receive updates for these symbols (None = all) subscriber_name: Optional name for the subscriber Returns: Subscription ID for unsubscribing """ with self.subscription_lock: self.subscriber_counter += 1 subscriber_id = f"{callback_type}_{self.subscriber_counter}" if subscriber_name: subscriber_id = f"{subscriber_name}_{subscriber_id}" subscription = COBSubscription( subscriber_id=subscriber_id, callback=callback, symbol_filter=symbol_filter, callback_type=callback_type ) self.subscribers[subscriber_id] = subscription self.total_subscribers += 1 logger.info(f"New subscriber: {subscriber_id} ({callback_type})") logger.info(f"Total active subscribers: {len(self.subscribers)}") return subscriber_id def unsubscribe(self, subscriber_id: str) -> bool: """ Unsubscribe from COB updates Args: subscriber_id: ID returned from subscribe() Returns: True if successfully unsubscribed """ with self.subscription_lock: if subscriber_id in self.subscribers: del self.subscribers[subscriber_id] logger.info(f"Unsubscribed: {subscriber_id}") logger.info(f"Remaining subscribers: {len(self.subscribers)}") return True else: logger.warning(f"Subscriber not found: {subscriber_id}") return False # Internal callback handlers async def _on_cob_cnn_update(self, symbol: str, data: Dict): """Handle CNN feature updates from COB integration""" try: self.latest_cnn_features[symbol] = data await self._notify_subscribers("cnn", symbol, data) except Exception as e: logger.error(f"Error in CNN update handler: {e}") async def _on_cob_dqn_update(self, symbol: str, data: Dict): """Handle DQN state updates from COB integration""" try: self.latest_dqn_states[symbol] = data await self._notify_subscribers("dqn", symbol, data) except Exception as e: logger.error(f"Error in DQN update handler: {e}") async def _on_cob_dashboard_update(self, symbol: str, data: Dict): """Handle dashboard updates from COB integration""" try: # Store snapshot if it's a COBSnapshot if hasattr(data, 'volume_weighted_mid'): # Duck typing for COBSnapshot self.latest_snapshots[symbol] = data await self._notify_subscribers("dashboard", symbol, data) await self._notify_subscribers("general", symbol, data) self.update_count += 1 except Exception as e: logger.error(f"Error in dashboard update handler: {e}") async def _notify_subscribers(self, callback_type: str, symbol: str, data: Any): """Notify all relevant subscribers of an update""" try: relevant_subscribers = [ sub for sub in self.subscribers.values() if (sub.callback_type == callback_type or sub.callback_type == "general") and (sub.symbol_filter is None or symbol in sub.symbol_filter) ] for subscription in relevant_subscribers: try: if asyncio.iscoroutinefunction(subscription.callback): asyncio.create_task(subscription.callback(symbol, data)) else: subscription.callback(symbol, data) except Exception as e: logger.warning(f"Error notifying subscriber {subscription.subscriber_id}: {e}") except Exception as e: logger.error(f"Error notifying subscribers: {e}") # Public data access methods def get_cob_snapshot(self, symbol: str) -> Optional[COBSnapshot]: """Get latest COB snapshot for a symbol""" if self.cob_integration: return self.cob_integration.get_cob_snapshot(symbol) return self.latest_snapshots.get(symbol) def get_cnn_features(self, symbol: str) -> Optional[Any]: """Get latest CNN features for a symbol""" return self.latest_cnn_features.get(symbol) def get_dqn_state(self, symbol: str) -> Optional[Any]: """Get latest DQN state for a symbol""" return self.latest_dqn_states.get(symbol) def get_market_depth_analysis(self, symbol: str) -> Optional[Dict]: """Get detailed market depth analysis""" if self.cob_integration: return self.cob_integration.get_market_depth_analysis(symbol) return None def get_exchange_breakdown(self, symbol: str) -> Optional[Dict]: """Get liquidity breakdown by exchange""" if self.cob_integration: return self.cob_integration.get_exchange_breakdown(symbol) return None def get_price_buckets(self, symbol: str) -> Optional[Dict]: """Get fine-grain price buckets""" if self.cob_integration: return self.cob_integration.get_price_buckets(symbol) return None def get_session_volume_profile(self, symbol: str) -> Optional[Dict]: """Get session volume profile""" if self.cob_integration and hasattr(self.cob_integration.cob_provider, 'get_session_volume_profile'): return self.cob_integration.cob_provider.get_session_volume_profile(symbol) return None def get_realtime_stats_for_nn(self, symbol: str) -> Dict: """Get real-time statistics formatted for NN models""" if self.cob_integration: return self.cob_integration.get_realtime_stats_for_nn(symbol) return {} def get_service_statistics(self) -> Dict[str, Any]: """Get service statistics""" uptime = None if self.start_time: uptime = (datetime.now() - self.start_time).total_seconds() base_stats = { 'service_name': 'SharedCOBService', 'is_running': self.is_running, 'symbols': self.symbols, 'total_subscribers': len(self.subscribers), 'lifetime_subscribers': self.total_subscribers, 'update_count': self.update_count, 'uptime_seconds': uptime, 'subscribers_by_type': {} } # Count subscribers by type for subscription in self.subscribers.values(): callback_type = subscription.callback_type if callback_type not in base_stats['subscribers_by_type']: base_stats['subscribers_by_type'][callback_type] = 0 base_stats['subscribers_by_type'][callback_type] += 1 # Get COB integration stats if available if self.cob_integration: cob_stats = self.cob_integration.get_statistics() base_stats.update(cob_stats) return base_stats # Global service instance access functions def get_shared_cob_service(symbols: List[str] = None, data_provider: DataProvider = None) -> SharedCOBService: """Get the shared COB service instance""" return SharedCOBService(symbols=symbols, data_provider=data_provider) async def start_shared_cob_service(symbols: List[str] = None, data_provider: DataProvider = None) -> SharedCOBService: """Start the shared COB service""" service = get_shared_cob_service(symbols=symbols, data_provider=data_provider) await service.start() return service async def stop_shared_cob_service(): """Stop the shared COB service""" service = get_shared_cob_service() await service.stop()