""" Unified Data Stream Architecture for Dashboard and Enhanced RL Training This module provides a centralized data streaming architecture that: 1. Serves real-time data to the dashboard UI 2. Feeds the enhanced RL training pipeline with comprehensive data 3. Maintains data consistency across all consumers 4. Provides efficient data distribution without duplication 5. Supports multiple data consumers with different requirements Key Features: - Single source of truth for all market data - Real-time tick processing and aggregation - Multi-timeframe OHLCV generation - CNN feature extraction and caching - RL state building with comprehensive data - Dashboard-ready formatted data - Training data collection and buffering """ import asyncio import logging import time import numpy as np import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Callable from dataclasses import dataclass, field from collections import deque from threading import Thread, Lock import json from .config import get_config from .data_provider import DataProvider, MarketTick from .universal_data_adapter import UniversalDataAdapter, UniversalDataStream from .enhanced_orchestrator import MarketState, TradingAction logger = logging.getLogger(__name__) @dataclass class StreamConsumer: """Data stream consumer configuration""" consumer_id: str consumer_name: str callback: Callable[[Dict[str, Any]], None] data_types: List[str] # ['ticks', 'ohlcv', 'training_data', 'ui_data'] active: bool = True last_update: datetime = field(default_factory=datetime.now) update_count: int = 0 @dataclass class TrainingDataPacket: """Training data packet for RL pipeline""" timestamp: datetime symbol: str tick_cache: List[Dict[str, Any]] one_second_bars: List[Dict[str, Any]] multi_timeframe_data: Dict[str, List[Dict[str, Any]]] cnn_features: Optional[Dict[str, np.ndarray]] cnn_predictions: Optional[Dict[str, np.ndarray]] market_state: Optional[MarketState] universal_stream: Optional[UniversalDataStream] @dataclass class UIDataPacket: """UI data packet for dashboard""" timestamp: datetime current_prices: Dict[str, float] tick_cache_size: int one_second_bars_count: int streaming_status: str training_data_available: bool model_training_status: Dict[str, Any] orchestrator_status: Dict[str, Any] class UnifiedDataStream: """ Unified data stream manager for dashboard and training pipeline integration """ def __init__(self, data_provider: DataProvider, orchestrator=None): """Initialize unified data stream""" self.config = get_config() self.data_provider = data_provider self.orchestrator = orchestrator # Initialize universal data adapter self.universal_adapter = UniversalDataAdapter(data_provider) # Data consumers registry self.consumers: Dict[str, StreamConsumer] = {} self.consumer_lock = Lock() # Data buffers for different consumers self.tick_cache = deque(maxlen=5000) # Raw tick cache self.one_second_bars = deque(maxlen=1000) # 1s OHLCV bars self.training_data_buffer = deque(maxlen=100) # Training data packets self.ui_data_buffer = deque(maxlen=50) # UI data packets # Multi-timeframe data storage self.multi_timeframe_data = { 'ETH/USDT': { '1s': deque(maxlen=300), '1m': deque(maxlen=300), '1h': deque(maxlen=300), '1d': deque(maxlen=300) }, 'BTC/USDT': { '1s': deque(maxlen=300), '1m': deque(maxlen=300), '1h': deque(maxlen=300), '1d': deque(maxlen=300) } } # CNN features cache self.cnn_features_cache = {} self.cnn_predictions_cache = {} # Stream status self.streaming = False self.stream_thread = None # Performance tracking self.stream_stats = { 'total_ticks_processed': 0, 'total_packets_sent': 0, 'consumers_served': 0, 'last_tick_time': None, 'processing_errors': 0, 'data_quality_score': 1.0 } # Data validation self.last_prices = {} self.price_change_threshold = 0.1 # 10% change threshold logger.info("Unified Data Stream initialized") logger.info(f"Symbols: {self.config.symbols}") logger.info(f"Timeframes: {self.config.timeframes}") def register_consumer(self, consumer_name: str, callback: Callable[[Dict[str, Any]], None], data_types: List[str]) -> str: """Register a data consumer""" consumer_id = f"{consumer_name}_{int(time.time())}" with self.consumer_lock: consumer = StreamConsumer( consumer_id=consumer_id, consumer_name=consumer_name, callback=callback, data_types=data_types ) self.consumers[consumer_id] = consumer logger.info(f"Registered consumer: {consumer_name} ({consumer_id})") logger.info(f"Data types: {data_types}") return consumer_id def unregister_consumer(self, consumer_id: str): """Unregister a data consumer""" with self.consumer_lock: if consumer_id in self.consumers: consumer = self.consumers.pop(consumer_id) logger.info(f"Unregistered consumer: {consumer.consumer_name} ({consumer_id})") async def start_streaming(self): """Start unified data streaming""" if self.streaming: logger.warning("Data streaming already active") return self.streaming = True # Subscribe to data provider ticks self.data_provider.subscribe_to_ticks( callback=self._handle_tick, symbols=self.config.symbols, subscriber_name="UnifiedDataStream" ) # Start background processing self.stream_thread = Thread(target=self._stream_processor, daemon=True) self.stream_thread.start() logger.info("Unified data streaming started") async def stop_streaming(self): """Stop unified data streaming""" self.streaming = False if self.stream_thread: self.stream_thread.join(timeout=5) logger.info("Unified data streaming stopped") def _handle_tick(self, tick: MarketTick): """Handle incoming tick data""" try: # Validate tick data if not self._validate_tick(tick): return # Add to tick cache tick_data = { 'symbol': tick.symbol, 'timestamp': tick.timestamp, 'price': tick.price, 'volume': tick.volume, 'quantity': tick.quantity, 'side': tick.side } self.tick_cache.append(tick_data) # Update current prices self.last_prices[tick.symbol] = tick.price # Generate 1s bars if needed self._update_one_second_bars(tick_data) # Update multi-timeframe data self._update_multi_timeframe_data(tick_data) # Update statistics self.stream_stats['total_ticks_processed'] += 1 self.stream_stats['last_tick_time'] = tick.timestamp except Exception as e: logger.error(f"Error handling tick: {e}") self.stream_stats['processing_errors'] += 1 def _validate_tick(self, tick: MarketTick) -> bool: """Validate tick data quality""" try: # Check for valid price if tick.price <= 0: return False # Check for reasonable price change if tick.symbol in self.last_prices: last_price = self.last_prices[tick.symbol] if last_price > 0: price_change = abs(tick.price - last_price) / last_price if price_change > self.price_change_threshold: logger.warning(f"Large price change detected for {tick.symbol}: {price_change:.2%}") return False # Check timestamp if tick.timestamp > datetime.now() + timedelta(seconds=10): return False return True except Exception as e: logger.error(f"Error validating tick: {e}") return False def _update_one_second_bars(self, tick_data: Dict[str, Any]): """Update 1-second OHLCV bars""" try: symbol = tick_data['symbol'] price = tick_data['price'] volume = tick_data['volume'] timestamp = tick_data['timestamp'] # Round timestamp to nearest second bar_timestamp = timestamp.replace(microsecond=0) # Check if we need a new bar if (not self.one_second_bars or self.one_second_bars[-1]['timestamp'] != bar_timestamp or self.one_second_bars[-1]['symbol'] != symbol): # Create new 1s bar bar_data = { 'symbol': symbol, 'timestamp': bar_timestamp, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': volume } self.one_second_bars.append(bar_data) else: # Update existing bar bar = self.one_second_bars[-1] bar['high'] = max(bar['high'], price) bar['low'] = min(bar['low'], price) bar['close'] = price bar['volume'] += volume except Exception as e: logger.error(f"Error updating 1s bars: {e}") def _update_multi_timeframe_data(self, tick_data: Dict[str, Any]): """Update multi-timeframe OHLCV data""" try: symbol = tick_data['symbol'] if symbol not in self.multi_timeframe_data: return # Update each timeframe for timeframe in ['1s', '1m', '1h', '1d']: self._update_timeframe_bar(symbol, timeframe, tick_data) except Exception as e: logger.error(f"Error updating multi-timeframe data: {e}") def _update_timeframe_bar(self, symbol: str, timeframe: str, tick_data: Dict[str, Any]): """Update specific timeframe bar""" try: price = tick_data['price'] volume = tick_data['volume'] timestamp = tick_data['timestamp'] # Calculate bar timestamp based on timeframe if timeframe == '1s': bar_timestamp = timestamp.replace(microsecond=0) elif timeframe == '1m': bar_timestamp = timestamp.replace(second=0, microsecond=0) elif timeframe == '1h': bar_timestamp = timestamp.replace(minute=0, second=0, microsecond=0) elif timeframe == '1d': bar_timestamp = timestamp.replace(hour=0, minute=0, second=0, microsecond=0) else: return timeframe_buffer = self.multi_timeframe_data[symbol][timeframe] # Check if we need a new bar if (not timeframe_buffer or timeframe_buffer[-1]['timestamp'] != bar_timestamp): # Create new bar bar_data = { 'timestamp': bar_timestamp, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': volume } timeframe_buffer.append(bar_data) else: # Update existing bar bar = timeframe_buffer[-1] bar['high'] = max(bar['high'], price) bar['low'] = min(bar['low'], price) bar['close'] = price bar['volume'] += volume except Exception as e: logger.error(f"Error updating {timeframe} bar for {symbol}: {e}") def _stream_processor(self): """Background stream processor""" logger.info("Stream processor started") while self.streaming: try: # Process training data packets self._process_training_data() # Process UI data packets self._process_ui_data() # Update CNN features if orchestrator available if self.orchestrator: self._update_cnn_features() # Distribute data to consumers self._distribute_data() # Sleep briefly time.sleep(0.1) # 100ms processing cycle except Exception as e: logger.error(f"Error in stream processor: {e}") time.sleep(1) logger.info("Stream processor stopped") def _process_training_data(self): """Process and package training data""" try: if len(self.tick_cache) < 10: # Need minimum data return # Create training data packet training_packet = TrainingDataPacket( timestamp=datetime.now(), symbol='ETH/USDT', # Primary symbol tick_cache=list(self.tick_cache)[-300:], # Last 300 ticks one_second_bars=list(self.one_second_bars)[-300:], # Last 300 1s bars multi_timeframe_data=self._get_multi_timeframe_snapshot(), cnn_features=self.cnn_features_cache.copy(), cnn_predictions=self.cnn_predictions_cache.copy(), market_state=self._build_market_state(), universal_stream=self._get_universal_stream() ) self.training_data_buffer.append(training_packet) except Exception as e: logger.error(f"Error processing training data: {e}") def _process_ui_data(self): """Process and package UI data""" try: # Create UI data packet ui_packet = UIDataPacket( timestamp=datetime.now(), current_prices=self.last_prices.copy(), tick_cache_size=len(self.tick_cache), one_second_bars_count=len(self.one_second_bars), streaming_status='LIVE' if self.streaming else 'STOPPED', training_data_available=len(self.training_data_buffer) > 0, model_training_status=self._get_model_training_status(), orchestrator_status=self._get_orchestrator_status() ) self.ui_data_buffer.append(ui_packet) except Exception as e: logger.error(f"Error processing UI data: {e}") def _update_cnn_features(self): """Update CNN features cache""" try: if not self.orchestrator: return # Get CNN features from orchestrator for symbol in self.config.symbols: if hasattr(self.orchestrator, '_get_cnn_features_for_rl'): hidden_features, predictions = self.orchestrator._get_cnn_features_for_rl(symbol) if hidden_features: self.cnn_features_cache[symbol] = hidden_features if predictions: self.cnn_predictions_cache[symbol] = predictions except Exception as e: logger.error(f"Error updating CNN features: {e}") def _distribute_data(self): """Distribute data to registered consumers""" try: with self.consumer_lock: for consumer_id, consumer in self.consumers.items(): if not consumer.active: continue try: # Prepare data based on consumer requirements data_packet = self._prepare_consumer_data(consumer) if data_packet: # Send data to consumer consumer.callback(data_packet) consumer.update_count += 1 consumer.last_update = datetime.now() except Exception as e: logger.error(f"Error sending data to consumer {consumer.consumer_name}: {e}") consumer.active = False self.stream_stats['consumers_served'] = len([c for c in self.consumers.values() if c.active]) except Exception as e: logger.error(f"Error distributing data: {e}") def _prepare_consumer_data(self, consumer: StreamConsumer) -> Optional[Dict[str, Any]]: """Prepare data packet for specific consumer""" try: data_packet = { 'timestamp': datetime.now(), 'consumer_id': consumer.consumer_id, 'consumer_name': consumer.consumer_name } # Add requested data types if 'ticks' in consumer.data_types: data_packet['ticks'] = list(self.tick_cache)[-100:] # Last 100 ticks if 'ohlcv' in consumer.data_types: data_packet['one_second_bars'] = list(self.one_second_bars)[-100:] data_packet['multi_timeframe'] = self._get_multi_timeframe_snapshot() if 'training_data' in consumer.data_types: if self.training_data_buffer: data_packet['training_data'] = self.training_data_buffer[-1] if 'ui_data' in consumer.data_types: if self.ui_data_buffer: data_packet['ui_data'] = self.ui_data_buffer[-1] return data_packet except Exception as e: logger.error(f"Error preparing data for consumer {consumer.consumer_name}: {e}") return None def _get_multi_timeframe_snapshot(self) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: """Get snapshot of multi-timeframe data""" snapshot = {} for symbol, timeframes in self.multi_timeframe_data.items(): snapshot[symbol] = {} for timeframe, data in timeframes.items(): snapshot[symbol][timeframe] = list(data) return snapshot def _build_market_state(self) -> Optional[MarketState]: """Build market state for training""" try: if not self.orchestrator: return None # Get universal stream universal_stream = self._get_universal_stream() if not universal_stream: return None # Build market state using orchestrator symbol = 'ETH/USDT' current_price = self.last_prices.get(symbol, 0.0) market_state = MarketState( symbol=symbol, timestamp=datetime.now(), prices={'current': current_price}, features={}, volatility=0.0, volume=0.0, trend_strength=0.0, market_regime='unknown', universal_data=universal_stream, raw_ticks=list(self.tick_cache)[-300:], ohlcv_data=self._get_multi_timeframe_snapshot(), btc_reference_data=self._get_btc_reference_data(), cnn_hidden_features=self.cnn_features_cache.copy(), cnn_predictions=self.cnn_predictions_cache.copy() ) return market_state except Exception as e: logger.error(f"Error building market state: {e}") return None def _get_universal_stream(self) -> Optional[UniversalDataStream]: """Get universal data stream""" try: if self.universal_adapter: return self.universal_adapter.get_universal_stream() return None except Exception as e: logger.error(f"Error getting universal stream: {e}") return None def _get_btc_reference_data(self) -> Dict[str, List[Dict[str, Any]]]: """Get BTC reference data""" btc_data = {} if 'BTC/USDT' in self.multi_timeframe_data: for timeframe, data in self.multi_timeframe_data['BTC/USDT'].items(): btc_data[timeframe] = list(data) return btc_data def _get_model_training_status(self) -> Dict[str, Any]: """Get model training status""" try: if self.orchestrator and hasattr(self.orchestrator, 'get_performance_metrics'): return self.orchestrator.get_performance_metrics() return { 'cnn_status': 'TRAINING', 'rl_status': 'TRAINING', 'data_available': len(self.training_data_buffer) > 0 } except Exception as e: logger.error(f"Error getting model training status: {e}") return {} def _get_orchestrator_status(self) -> Dict[str, Any]: """Get orchestrator status""" try: if self.orchestrator: return { 'active': True, 'symbols': self.config.symbols, 'streaming': self.streaming, 'tick_processor_active': hasattr(self.orchestrator, 'tick_processor') } return {'active': False} except Exception as e: logger.error(f"Error getting orchestrator status: {e}") return {'active': False} def get_stream_stats(self) -> Dict[str, Any]: """Get stream statistics""" stats = self.stream_stats.copy() stats.update({ 'tick_cache_size': len(self.tick_cache), 'one_second_bars_count': len(self.one_second_bars), 'training_data_packets': len(self.training_data_buffer), 'ui_data_packets': len(self.ui_data_buffer), 'active_consumers': len([c for c in self.consumers.values() if c.active]), 'total_consumers': len(self.consumers) }) return stats def get_latest_training_data(self) -> Optional[TrainingDataPacket]: """Get latest training data packet""" if self.training_data_buffer: return self.training_data_buffer[-1] return None def get_latest_ui_data(self) -> Optional[UIDataPacket]: """Get latest UI data packet""" if self.ui_data_buffer: return self.ui_data_buffer[-1] return None