gogo2/core/unified_data_stream.py
2025-05-28 23:42:06 +03:00

627 lines
23 KiB
Python

"""
Unified Data Stream Architecture for Dashboard and Enhanced RL Training
This module provides a centralized data streaming architecture that:
1. Serves real-time data to the dashboard UI
2. Feeds the enhanced RL training pipeline with comprehensive data
3. Maintains data consistency across all consumers
4. Provides efficient data distribution without duplication
5. Supports multiple data consumers with different requirements
Key Features:
- Single source of truth for all market data
- Real-time tick processing and aggregation
- Multi-timeframe OHLCV generation
- CNN feature extraction and caching
- RL state building with comprehensive data
- Dashboard-ready formatted data
- Training data collection and buffering
"""
import asyncio
import logging
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from collections import deque
from threading import Thread, Lock
import json
from .config import get_config
from .data_provider import DataProvider, MarketTick
from .universal_data_adapter import UniversalDataAdapter, UniversalDataStream
from .enhanced_orchestrator import MarketState, TradingAction
logger = logging.getLogger(__name__)
@dataclass
class StreamConsumer:
"""Data stream consumer configuration"""
consumer_id: str
consumer_name: str
callback: Callable[[Dict[str, Any]], None]
data_types: List[str] # ['ticks', 'ohlcv', 'training_data', 'ui_data']
active: bool = True
last_update: datetime = field(default_factory=datetime.now)
update_count: int = 0
@dataclass
class TrainingDataPacket:
"""Training data packet for RL pipeline"""
timestamp: datetime
symbol: str
tick_cache: List[Dict[str, Any]]
one_second_bars: List[Dict[str, Any]]
multi_timeframe_data: Dict[str, List[Dict[str, Any]]]
cnn_features: Optional[Dict[str, np.ndarray]]
cnn_predictions: Optional[Dict[str, np.ndarray]]
market_state: Optional[MarketState]
universal_stream: Optional[UniversalDataStream]
@dataclass
class UIDataPacket:
"""UI data packet for dashboard"""
timestamp: datetime
current_prices: Dict[str, float]
tick_cache_size: int
one_second_bars_count: int
streaming_status: str
training_data_available: bool
model_training_status: Dict[str, Any]
orchestrator_status: Dict[str, Any]
class UnifiedDataStream:
"""
Unified data stream manager for dashboard and training pipeline integration
"""
def __init__(self, data_provider: DataProvider, orchestrator=None):
"""Initialize unified data stream"""
self.config = get_config()
self.data_provider = data_provider
self.orchestrator = orchestrator
# Initialize universal data adapter
self.universal_adapter = UniversalDataAdapter(data_provider)
# Data consumers registry
self.consumers: Dict[str, StreamConsumer] = {}
self.consumer_lock = Lock()
# Data buffers for different consumers
self.tick_cache = deque(maxlen=5000) # Raw tick cache
self.one_second_bars = deque(maxlen=1000) # 1s OHLCV bars
self.training_data_buffer = deque(maxlen=100) # Training data packets
self.ui_data_buffer = deque(maxlen=50) # UI data packets
# Multi-timeframe data storage
self.multi_timeframe_data = {
'ETH/USDT': {
'1s': deque(maxlen=300),
'1m': deque(maxlen=300),
'1h': deque(maxlen=300),
'1d': deque(maxlen=300)
},
'BTC/USDT': {
'1s': deque(maxlen=300),
'1m': deque(maxlen=300),
'1h': deque(maxlen=300),
'1d': deque(maxlen=300)
}
}
# CNN features cache
self.cnn_features_cache = {}
self.cnn_predictions_cache = {}
# Stream status
self.streaming = False
self.stream_thread = None
# Performance tracking
self.stream_stats = {
'total_ticks_processed': 0,
'total_packets_sent': 0,
'consumers_served': 0,
'last_tick_time': None,
'processing_errors': 0,
'data_quality_score': 1.0
}
# Data validation
self.last_prices = {}
self.price_change_threshold = 0.1 # 10% change threshold
logger.info("Unified Data Stream initialized")
logger.info(f"Symbols: {self.config.symbols}")
logger.info(f"Timeframes: {self.config.timeframes}")
def register_consumer(self, consumer_name: str, callback: Callable[[Dict[str, Any]], None],
data_types: List[str]) -> str:
"""Register a data consumer"""
consumer_id = f"{consumer_name}_{int(time.time())}"
with self.consumer_lock:
consumer = StreamConsumer(
consumer_id=consumer_id,
consumer_name=consumer_name,
callback=callback,
data_types=data_types
)
self.consumers[consumer_id] = consumer
logger.info(f"Registered consumer: {consumer_name} ({consumer_id})")
logger.info(f"Data types: {data_types}")
return consumer_id
def unregister_consumer(self, consumer_id: str):
"""Unregister a data consumer"""
with self.consumer_lock:
if consumer_id in self.consumers:
consumer = self.consumers.pop(consumer_id)
logger.info(f"Unregistered consumer: {consumer.consumer_name} ({consumer_id})")
async def start_streaming(self):
"""Start unified data streaming"""
if self.streaming:
logger.warning("Data streaming already active")
return
self.streaming = True
# Subscribe to data provider ticks
self.data_provider.subscribe_to_ticks(
callback=self._handle_tick,
symbols=self.config.symbols,
subscriber_name="UnifiedDataStream"
)
# Start background processing
self.stream_thread = Thread(target=self._stream_processor, daemon=True)
self.stream_thread.start()
logger.info("Unified data streaming started")
async def stop_streaming(self):
"""Stop unified data streaming"""
self.streaming = False
if self.stream_thread:
self.stream_thread.join(timeout=5)
logger.info("Unified data streaming stopped")
def _handle_tick(self, tick: MarketTick):
"""Handle incoming tick data"""
try:
# Validate tick data
if not self._validate_tick(tick):
return
# Add to tick cache
tick_data = {
'symbol': tick.symbol,
'timestamp': tick.timestamp,
'price': tick.price,
'volume': tick.volume,
'quantity': tick.quantity,
'side': tick.side
}
self.tick_cache.append(tick_data)
# Update current prices
self.last_prices[tick.symbol] = tick.price
# Generate 1s bars if needed
self._update_one_second_bars(tick_data)
# Update multi-timeframe data
self._update_multi_timeframe_data(tick_data)
# Update statistics
self.stream_stats['total_ticks_processed'] += 1
self.stream_stats['last_tick_time'] = tick.timestamp
except Exception as e:
logger.error(f"Error handling tick: {e}")
self.stream_stats['processing_errors'] += 1
def _validate_tick(self, tick: MarketTick) -> bool:
"""Validate tick data quality"""
try:
# Check for valid price
if tick.price <= 0:
return False
# Check for reasonable price change
if tick.symbol in self.last_prices:
last_price = self.last_prices[tick.symbol]
if last_price > 0:
price_change = abs(tick.price - last_price) / last_price
if price_change > self.price_change_threshold:
logger.warning(f"Large price change detected for {tick.symbol}: {price_change:.2%}")
return False
# Check timestamp
if tick.timestamp > datetime.now() + timedelta(seconds=10):
return False
return True
except Exception as e:
logger.error(f"Error validating tick: {e}")
return False
def _update_one_second_bars(self, tick_data: Dict[str, Any]):
"""Update 1-second OHLCV bars"""
try:
symbol = tick_data['symbol']
price = tick_data['price']
volume = tick_data['volume']
timestamp = tick_data['timestamp']
# Round timestamp to nearest second
bar_timestamp = timestamp.replace(microsecond=0)
# Check if we need a new bar
if (not self.one_second_bars or
self.one_second_bars[-1]['timestamp'] != bar_timestamp or
self.one_second_bars[-1]['symbol'] != symbol):
# Create new 1s bar
bar_data = {
'symbol': symbol,
'timestamp': bar_timestamp,
'open': price,
'high': price,
'low': price,
'close': price,
'volume': volume
}
self.one_second_bars.append(bar_data)
else:
# Update existing bar
bar = self.one_second_bars[-1]
bar['high'] = max(bar['high'], price)
bar['low'] = min(bar['low'], price)
bar['close'] = price
bar['volume'] += volume
except Exception as e:
logger.error(f"Error updating 1s bars: {e}")
def _update_multi_timeframe_data(self, tick_data: Dict[str, Any]):
"""Update multi-timeframe OHLCV data"""
try:
symbol = tick_data['symbol']
if symbol not in self.multi_timeframe_data:
return
# Update each timeframe
for timeframe in ['1s', '1m', '1h', '1d']:
self._update_timeframe_bar(symbol, timeframe, tick_data)
except Exception as e:
logger.error(f"Error updating multi-timeframe data: {e}")
def _update_timeframe_bar(self, symbol: str, timeframe: str, tick_data: Dict[str, Any]):
"""Update specific timeframe bar"""
try:
price = tick_data['price']
volume = tick_data['volume']
timestamp = tick_data['timestamp']
# Calculate bar timestamp based on timeframe
if timeframe == '1s':
bar_timestamp = timestamp.replace(microsecond=0)
elif timeframe == '1m':
bar_timestamp = timestamp.replace(second=0, microsecond=0)
elif timeframe == '1h':
bar_timestamp = timestamp.replace(minute=0, second=0, microsecond=0)
elif timeframe == '1d':
bar_timestamp = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
else:
return
timeframe_buffer = self.multi_timeframe_data[symbol][timeframe]
# Check if we need a new bar
if (not timeframe_buffer or
timeframe_buffer[-1]['timestamp'] != bar_timestamp):
# Create new bar
bar_data = {
'timestamp': bar_timestamp,
'open': price,
'high': price,
'low': price,
'close': price,
'volume': volume
}
timeframe_buffer.append(bar_data)
else:
# Update existing bar
bar = timeframe_buffer[-1]
bar['high'] = max(bar['high'], price)
bar['low'] = min(bar['low'], price)
bar['close'] = price
bar['volume'] += volume
except Exception as e:
logger.error(f"Error updating {timeframe} bar for {symbol}: {e}")
def _stream_processor(self):
"""Background stream processor"""
logger.info("Stream processor started")
while self.streaming:
try:
# Process training data packets
self._process_training_data()
# Process UI data packets
self._process_ui_data()
# Update CNN features if orchestrator available
if self.orchestrator:
self._update_cnn_features()
# Distribute data to consumers
self._distribute_data()
# Sleep briefly
time.sleep(0.1) # 100ms processing cycle
except Exception as e:
logger.error(f"Error in stream processor: {e}")
time.sleep(1)
logger.info("Stream processor stopped")
def _process_training_data(self):
"""Process and package training data"""
try:
if len(self.tick_cache) < 10: # Need minimum data
return
# Create training data packet
training_packet = TrainingDataPacket(
timestamp=datetime.now(),
symbol='ETH/USDT', # Primary symbol
tick_cache=list(self.tick_cache)[-300:], # Last 300 ticks
one_second_bars=list(self.one_second_bars)[-300:], # Last 300 1s bars
multi_timeframe_data=self._get_multi_timeframe_snapshot(),
cnn_features=self.cnn_features_cache.copy(),
cnn_predictions=self.cnn_predictions_cache.copy(),
market_state=self._build_market_state(),
universal_stream=self._get_universal_stream()
)
self.training_data_buffer.append(training_packet)
except Exception as e:
logger.error(f"Error processing training data: {e}")
def _process_ui_data(self):
"""Process and package UI data"""
try:
# Create UI data packet
ui_packet = UIDataPacket(
timestamp=datetime.now(),
current_prices=self.last_prices.copy(),
tick_cache_size=len(self.tick_cache),
one_second_bars_count=len(self.one_second_bars),
streaming_status='LIVE' if self.streaming else 'STOPPED',
training_data_available=len(self.training_data_buffer) > 0,
model_training_status=self._get_model_training_status(),
orchestrator_status=self._get_orchestrator_status()
)
self.ui_data_buffer.append(ui_packet)
except Exception as e:
logger.error(f"Error processing UI data: {e}")
def _update_cnn_features(self):
"""Update CNN features cache"""
try:
if not self.orchestrator:
return
# Get CNN features from orchestrator
for symbol in self.config.symbols:
if hasattr(self.orchestrator, '_get_cnn_features_for_rl'):
hidden_features, predictions = self.orchestrator._get_cnn_features_for_rl(symbol)
if hidden_features:
self.cnn_features_cache[symbol] = hidden_features
if predictions:
self.cnn_predictions_cache[symbol] = predictions
except Exception as e:
logger.error(f"Error updating CNN features: {e}")
def _distribute_data(self):
"""Distribute data to registered consumers"""
try:
with self.consumer_lock:
for consumer_id, consumer in self.consumers.items():
if not consumer.active:
continue
try:
# Prepare data based on consumer requirements
data_packet = self._prepare_consumer_data(consumer)
if data_packet:
# Send data to consumer
consumer.callback(data_packet)
consumer.update_count += 1
consumer.last_update = datetime.now()
except Exception as e:
logger.error(f"Error sending data to consumer {consumer.consumer_name}: {e}")
consumer.active = False
self.stream_stats['consumers_served'] = len([c for c in self.consumers.values() if c.active])
except Exception as e:
logger.error(f"Error distributing data: {e}")
def _prepare_consumer_data(self, consumer: StreamConsumer) -> Optional[Dict[str, Any]]:
"""Prepare data packet for specific consumer"""
try:
data_packet = {
'timestamp': datetime.now(),
'consumer_id': consumer.consumer_id,
'consumer_name': consumer.consumer_name
}
# Add requested data types
if 'ticks' in consumer.data_types:
data_packet['ticks'] = list(self.tick_cache)[-100:] # Last 100 ticks
if 'ohlcv' in consumer.data_types:
data_packet['one_second_bars'] = list(self.one_second_bars)[-100:]
data_packet['multi_timeframe'] = self._get_multi_timeframe_snapshot()
if 'training_data' in consumer.data_types:
if self.training_data_buffer:
data_packet['training_data'] = self.training_data_buffer[-1]
if 'ui_data' in consumer.data_types:
if self.ui_data_buffer:
data_packet['ui_data'] = self.ui_data_buffer[-1]
return data_packet
except Exception as e:
logger.error(f"Error preparing data for consumer {consumer.consumer_name}: {e}")
return None
def _get_multi_timeframe_snapshot(self) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
"""Get snapshot of multi-timeframe data"""
snapshot = {}
for symbol, timeframes in self.multi_timeframe_data.items():
snapshot[symbol] = {}
for timeframe, data in timeframes.items():
snapshot[symbol][timeframe] = list(data)
return snapshot
def _build_market_state(self) -> Optional[MarketState]:
"""Build market state for training"""
try:
if not self.orchestrator:
return None
# Get universal stream
universal_stream = self._get_universal_stream()
if not universal_stream:
return None
# Build market state using orchestrator
symbol = 'ETH/USDT'
current_price = self.last_prices.get(symbol, 0.0)
market_state = MarketState(
symbol=symbol,
timestamp=datetime.now(),
prices={'current': current_price},
features={},
volatility=0.0,
volume=0.0,
trend_strength=0.0,
market_regime='unknown',
universal_data=universal_stream,
raw_ticks=list(self.tick_cache)[-300:],
ohlcv_data=self._get_multi_timeframe_snapshot(),
btc_reference_data=self._get_btc_reference_data(),
cnn_hidden_features=self.cnn_features_cache.copy(),
cnn_predictions=self.cnn_predictions_cache.copy()
)
return market_state
except Exception as e:
logger.error(f"Error building market state: {e}")
return None
def _get_universal_stream(self) -> Optional[UniversalDataStream]:
"""Get universal data stream"""
try:
if self.universal_adapter:
return self.universal_adapter.get_universal_stream()
return None
except Exception as e:
logger.error(f"Error getting universal stream: {e}")
return None
def _get_btc_reference_data(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get BTC reference data"""
btc_data = {}
if 'BTC/USDT' in self.multi_timeframe_data:
for timeframe, data in self.multi_timeframe_data['BTC/USDT'].items():
btc_data[timeframe] = list(data)
return btc_data
def _get_model_training_status(self) -> Dict[str, Any]:
"""Get model training status"""
try:
if self.orchestrator and hasattr(self.orchestrator, 'get_performance_metrics'):
return self.orchestrator.get_performance_metrics()
return {
'cnn_status': 'TRAINING',
'rl_status': 'TRAINING',
'data_available': len(self.training_data_buffer) > 0
}
except Exception as e:
logger.error(f"Error getting model training status: {e}")
return {}
def _get_orchestrator_status(self) -> Dict[str, Any]:
"""Get orchestrator status"""
try:
if self.orchestrator:
return {
'active': True,
'symbols': self.config.symbols,
'streaming': self.streaming,
'tick_processor_active': hasattr(self.orchestrator, 'tick_processor')
}
return {'active': False}
except Exception as e:
logger.error(f"Error getting orchestrator status: {e}")
return {'active': False}
def get_stream_stats(self) -> Dict[str, Any]:
"""Get stream statistics"""
stats = self.stream_stats.copy()
stats.update({
'tick_cache_size': len(self.tick_cache),
'one_second_bars_count': len(self.one_second_bars),
'training_data_packets': len(self.training_data_buffer),
'ui_data_packets': len(self.ui_data_buffer),
'active_consumers': len([c for c in self.consumers.values() if c.active]),
'total_consumers': len(self.consumers)
})
return stats
def get_latest_training_data(self) -> Optional[TrainingDataPacket]:
"""Get latest training data packet"""
if self.training_data_buffer:
return self.training_data_buffer[-1]
return None
def get_latest_ui_data(self) -> Optional[UIDataPacket]:
"""Get latest UI data packet"""
if self.ui_data_buffer:
return self.ui_data_buffer[-1]
return None