"""
|
|
Unified Data Stream Architecture for Dashboard and Enhanced RL Training
|
|
|
|
This module provides a centralized data streaming architecture that:
|
|
1. Serves real-time data to the dashboard UI
|
|
2. Feeds the enhanced RL training pipeline with comprehensive data
|
|
3. Maintains data consistency across all consumers
|
|
4. Provides efficient data distribution without duplication
|
|
5. Supports multiple data consumers with different requirements
|
|
|
|
Key Features:
|
|
- Single source of truth for all market data
|
|
- Real-time tick processing and aggregation
|
|
- Multi-timeframe OHLCV generation
|
|
- CNN feature extraction and caching
|
|
- RL state building with comprehensive data
|
|
- Dashboard-ready formatted data
|
|
- Training data collection and buffering
|
|
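
Example (a minimal sketch; assumes a configured DataProvider instance
bound to `data_provider`):

    stream = UnifiedDataStream(data_provider)
    consumer_id = stream.register_consumer(
        consumer_name='dashboard',
        callback=lambda packet: print(packet['timestamp']),
        data_types=['ui_data'],
    )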
"""

import asyncio
import logging
import time
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from collections import deque
from threading import Thread, Lock
import json

from .config import get_config
from .data_provider import DataProvider, MarketTick
from .universal_data_adapter import UniversalDataAdapter, UniversalDataStream
from .enhanced_orchestrator import MarketState, TradingAction

logger = logging.getLogger(__name__)


@dataclass
class StreamConsumer:
    """Data stream consumer configuration"""
    consumer_id: str
    consumer_name: str
    callback: Callable[[Dict[str, Any]], None]
    data_types: List[str]  # ['ticks', 'ohlcv', 'training_data', 'ui_data']
    active: bool = True
    last_update: datetime = field(default_factory=datetime.now)
    update_count: int = 0


@dataclass
class TrainingDataPacket:
    """Training data packet for RL pipeline"""
    timestamp: datetime
    symbol: str
    tick_cache: List[Dict[str, Any]]
    one_second_bars: List[Dict[str, Any]]
    multi_timeframe_data: Dict[str, List[Dict[str, Any]]]
    cnn_features: Optional[Dict[str, np.ndarray]]
    cnn_predictions: Optional[Dict[str, np.ndarray]]
    market_state: Optional[MarketState]
    universal_stream: Optional[UniversalDataStream]


@dataclass
class UIDataPacket:
    """UI data packet for dashboard"""
    timestamp: datetime
    current_prices: Dict[str, float]
    tick_cache_size: int
    one_second_bars_count: int
    streaming_status: str
    training_data_available: bool
    model_training_status: Dict[str, Any]
    orchestrator_status: Dict[str, Any]
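

# Dashboard consumers receive the latest UIDataPacket under the 'ui_data' key
# of their callback payload. Reading one might look like this (a minimal
# sketch; `render_status` is a hypothetical helper, not part of this module):
#
#     def render_status(packet: UIDataPacket) -> str:
#         return f"{packet.streaming_status}: {packet.tick_cache_size} ticks cached"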


class UnifiedDataStream:
    """
    Unified data stream manager for dashboard and training pipeline integration
    """

    def __init__(self, data_provider: DataProvider, orchestrator=None):
        """Initialize unified data stream"""
        self.config = get_config()
        self.data_provider = data_provider
        self.orchestrator = orchestrator

        # Initialize universal data adapter
        self.universal_adapter = UniversalDataAdapter(data_provider)

        # Data consumers registry
        self.consumers: Dict[str, StreamConsumer] = {}
        self.consumer_lock = Lock()

        # Data buffers for different consumers
        self.tick_cache = deque(maxlen=5000)           # Raw tick cache
        self.one_second_bars = deque(maxlen=1000)      # 1s OHLCV bars
        self.training_data_buffer = deque(maxlen=100)  # Training data packets
        self.ui_data_buffer = deque(maxlen=50)         # UI data packets

        # Multi-timeframe data storage
        self.multi_timeframe_data = {
            'ETH/USDT': {
                '1s': deque(maxlen=300),
                '1m': deque(maxlen=300),
                '1h': deque(maxlen=300),
                '1d': deque(maxlen=300)
            },
            'BTC/USDT': {
                '1s': deque(maxlen=300),
                '1m': deque(maxlen=300),
                '1h': deque(maxlen=300),
                '1d': deque(maxlen=300)
            }
        }

        # CNN features cache
        self.cnn_features_cache = {}
        self.cnn_predictions_cache = {}

        # Stream status
        self.streaming = False
        self.stream_thread = None

        # Performance tracking
        self.stream_stats = {
            'total_ticks_processed': 0,
            'total_packets_sent': 0,
            'consumers_served': 0,
            'last_tick_time': None,
            'processing_errors': 0,
            'data_quality_score': 1.0
        }

        # Data validation
        self.last_prices = {}
        self.price_change_threshold = 0.1  # 10% change threshold

        logger.info("Unified Data Stream initialized")
        logger.info(f"Symbols: {self.config.symbols}")
        logger.info(f"Timeframes: {self.config.timeframes}")

    def register_consumer(self, consumer_name: str, callback: Callable[[Dict[str, Any]], None],
                          data_types: List[str]) -> str:
        """Register a data consumer"""
        consumer_id = f"{consumer_name}_{int(time.time())}"

        with self.consumer_lock:
            consumer = StreamConsumer(
                consumer_id=consumer_id,
                consumer_name=consumer_name,
                callback=callback,
                data_types=data_types
            )
            self.consumers[consumer_id] = consumer

        logger.info(f"Registered consumer: {consumer_name} ({consumer_id})")
        logger.info(f"Data types: {data_types}")

        return consumer_id
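
    # Registration example (a minimal sketch; `stream` is an instance of this
    # class and `refresh_dashboard` is a hypothetical dashboard hook):
    #
    #     def on_ui_update(packet: Dict[str, Any]) -> None:
    #         ui = packet.get('ui_data')
    #         if ui:
    #             refresh_dashboard(ui)
    #
    #     consumer_id = stream.register_consumer('dashboard', on_ui_update,
    #                                            data_types=['ui_data', 'ohlcv'])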

    def unregister_consumer(self, consumer_id: str):
        """Unregister a data consumer"""
        with self.consumer_lock:
            if consumer_id in self.consumers:
                consumer = self.consumers.pop(consumer_id)
                logger.info(f"Unregistered consumer: {consumer.consumer_name} ({consumer_id})")

    async def start_streaming(self):
        """Start unified data streaming"""
        if self.streaming:
            logger.warning("Data streaming already active")
            return

        self.streaming = True

        # Subscribe to data provider ticks
        self.data_provider.subscribe_to_ticks(
            callback=self._handle_tick,
            symbols=self.config.symbols,
            subscriber_name="UnifiedDataStream"
        )

        # Start background processing
        self.stream_thread = Thread(target=self._stream_processor, daemon=True)
        self.stream_thread.start()

        logger.info("Unified data streaming started")

    async def stop_streaming(self):
        """Stop unified data streaming"""
        self.streaming = False

        if self.stream_thread:
            self.stream_thread.join(timeout=5)

        logger.info("Unified data streaming stopped")

    def _handle_tick(self, tick: MarketTick):
        """Handle incoming tick data"""
        try:
            # Validate tick data
            if not self._validate_tick(tick):
                return

            # Add to tick cache
            tick_data = {
                'symbol': tick.symbol,
                'timestamp': tick.timestamp,
                'price': tick.price,
                'volume': tick.volume,
                'quantity': tick.quantity,
                'side': tick.side
            }

            self.tick_cache.append(tick_data)

            # Update current prices
            self.last_prices[tick.symbol] = tick.price

            # Generate 1s bars if needed
            self._update_one_second_bars(tick_data)

            # Update multi-timeframe data
            self._update_multi_timeframe_data(tick_data)

            # Update statistics
            self.stream_stats['total_ticks_processed'] += 1
            self.stream_stats['last_tick_time'] = tick.timestamp

        except Exception as e:
            logger.error(f"Error handling tick: {e}")
            self.stream_stats['processing_errors'] += 1

    def _validate_tick(self, tick: MarketTick) -> bool:
        """Validate tick data quality"""
        try:
            # Check for valid price
            if tick.price <= 0:
                return False

            # Check for reasonable price change
            if tick.symbol in self.last_prices:
                last_price = self.last_prices[tick.symbol]
                if last_price > 0:
                    price_change = abs(tick.price - last_price) / last_price
                    if price_change > self.price_change_threshold:
                        logger.warning(f"Large price change detected for {tick.symbol}: {price_change:.2%}")
                        return False

            # Reject timestamps more than 10 seconds in the future
            if tick.timestamp > datetime.now() + timedelta(seconds=10):
                return False

            return True

        except Exception as e:
            logger.error(f"Error validating tick: {e}")
            return False
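
    # Worked example of the price-change guard: with price_change_threshold = 0.1
    # and a last seen ETH/USDT price of 2000.0, any tick priced outside
    # (1800.0, 2200.0) moves more than 10% and is rejected with a warning.
    # (Prices here are illustrative only.)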

    def _update_one_second_bars(self, tick_data: Dict[str, Any]):
        """Update 1-second OHLCV bars"""
        try:
            symbol = tick_data['symbol']
            price = tick_data['price']
            volume = tick_data['volume']
            timestamp = tick_data['timestamp']

            # Truncate timestamp down to the containing second
            bar_timestamp = timestamp.replace(microsecond=0)

            # Check if we need a new bar
            if (not self.one_second_bars or
                    self.one_second_bars[-1]['timestamp'] != bar_timestamp or
                    self.one_second_bars[-1]['symbol'] != symbol):

                # Create new 1s bar
                bar_data = {
                    'symbol': symbol,
                    'timestamp': bar_timestamp,
                    'open': price,
                    'high': price,
                    'low': price,
                    'close': price,
                    'volume': volume
                }
                self.one_second_bars.append(bar_data)
            else:
                # Update existing bar
                bar = self.one_second_bars[-1]
                bar['high'] = max(bar['high'], price)
                bar['low'] = min(bar['low'], price)
                bar['close'] = price
                bar['volume'] += volume

        except Exception as e:
            logger.error(f"Error updating 1s bars: {e}")

    def _update_multi_timeframe_data(self, tick_data: Dict[str, Any]):
        """Update multi-timeframe OHLCV data"""
        try:
            symbol = tick_data['symbol']
            if symbol not in self.multi_timeframe_data:
                return

            # Update each timeframe
            for timeframe in ['1s', '1m', '1h', '1d']:
                self._update_timeframe_bar(symbol, timeframe, tick_data)

        except Exception as e:
            logger.error(f"Error updating multi-timeframe data: {e}")

    def _update_timeframe_bar(self, symbol: str, timeframe: str, tick_data: Dict[str, Any]):
        """Update specific timeframe bar"""
        try:
            price = tick_data['price']
            volume = tick_data['volume']
            timestamp = tick_data['timestamp']

            # Calculate bar timestamp based on timeframe
            if timeframe == '1s':
                bar_timestamp = timestamp.replace(microsecond=0)
            elif timeframe == '1m':
                bar_timestamp = timestamp.replace(second=0, microsecond=0)
            elif timeframe == '1h':
                bar_timestamp = timestamp.replace(minute=0, second=0, microsecond=0)
            elif timeframe == '1d':
                bar_timestamp = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
            else:
                return

            timeframe_buffer = self.multi_timeframe_data[symbol][timeframe]

            # Check if we need a new bar
            if (not timeframe_buffer or
                    timeframe_buffer[-1]['timestamp'] != bar_timestamp):

                # Create new bar
                bar_data = {
                    'timestamp': bar_timestamp,
                    'open': price,
                    'high': price,
                    'low': price,
                    'close': price,
                    'volume': volume
                }
                timeframe_buffer.append(bar_data)
            else:
                # Update existing bar
                bar = timeframe_buffer[-1]
                bar['high'] = max(bar['high'], price)
                bar['low'] = min(bar['low'], price)
                bar['close'] = price
                bar['volume'] += volume

        except Exception as e:
            logger.error(f"Error updating {timeframe} bar for {symbol}: {e}")
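
    # Bucket-timestamp example: a tick at 2024-05-01 13:45:27.123 lands in the
    # '1s' bar 13:45:27, the '1m' bar 13:45:00, the '1h' bar 13:00:00 and the
    # '1d' bar 2024-05-01 00:00:00 (date and times are illustrative only).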

    def _stream_processor(self):
        """Background stream processor"""
        logger.info("Stream processor started")

        while self.streaming:
            try:
                # Process training data packets
                self._process_training_data()

                # Process UI data packets
                self._process_ui_data()

                # Update CNN features if orchestrator available
                if self.orchestrator:
                    self._update_cnn_features()

                # Distribute data to consumers
                self._distribute_data()

                # Sleep briefly
                time.sleep(0.1)  # 100ms processing cycle

            except Exception as e:
                logger.error(f"Error in stream processor: {e}")
                time.sleep(1)

        logger.info("Stream processor stopped")

    def _process_training_data(self):
        """Process and package training data"""
        try:
            if len(self.tick_cache) < 10:  # Need minimum data
                return

            # Create training data packet
            training_packet = TrainingDataPacket(
                timestamp=datetime.now(),
                symbol='ETH/USDT',  # Primary symbol
                tick_cache=list(self.tick_cache)[-300:],  # Last 300 ticks
                one_second_bars=list(self.one_second_bars)[-300:],  # Last 300 1s bars
                multi_timeframe_data=self._get_multi_timeframe_snapshot(),
                cnn_features=self.cnn_features_cache.copy(),
                cnn_predictions=self.cnn_predictions_cache.copy(),
                market_state=self._build_market_state(),
                universal_stream=self._get_universal_stream()
            )

            self.training_data_buffer.append(training_packet)

        except Exception as e:
            logger.error(f"Error processing training data: {e}")

    def _process_ui_data(self):
        """Process and package UI data"""
        try:
            # Create UI data packet
            ui_packet = UIDataPacket(
                timestamp=datetime.now(),
                current_prices=self.last_prices.copy(),
                tick_cache_size=len(self.tick_cache),
                one_second_bars_count=len(self.one_second_bars),
                streaming_status='LIVE' if self.streaming else 'STOPPED',
                training_data_available=len(self.training_data_buffer) > 0,
                model_training_status=self._get_model_training_status(),
                orchestrator_status=self._get_orchestrator_status()
            )

            self.ui_data_buffer.append(ui_packet)

        except Exception as e:
            logger.error(f"Error processing UI data: {e}")

    def _update_cnn_features(self):
        """Update CNN features cache"""
        try:
            if not self.orchestrator:
                return

            # Get CNN features from orchestrator
            for symbol in self.config.symbols:
                if hasattr(self.orchestrator, '_get_cnn_features_for_rl'):
                    hidden_features, predictions = self.orchestrator._get_cnn_features_for_rl(symbol)

                    # Explicit None checks: truth-testing a numpy array raises ValueError
                    if hidden_features is not None:
                        self.cnn_features_cache[symbol] = hidden_features

                    if predictions is not None:
                        self.cnn_predictions_cache[symbol] = predictions

        except Exception as e:
            logger.error(f"Error updating CNN features: {e}")

    def _distribute_data(self):
        """Distribute data to registered consumers"""
        try:
            with self.consumer_lock:
                for consumer_id, consumer in self.consumers.items():
                    if not consumer.active:
                        continue

                    try:
                        # Prepare data based on consumer requirements
                        data_packet = self._prepare_consumer_data(consumer)

                        if data_packet:
                            # Send data to consumer
                            consumer.callback(data_packet)
                            consumer.update_count += 1
                            consumer.last_update = datetime.now()
                            self.stream_stats['total_packets_sent'] += 1

                    except Exception as e:
                        logger.error(f"Error sending data to consumer {consumer.consumer_name}: {e}")
                        consumer.active = False

                self.stream_stats['consumers_served'] = len([c for c in self.consumers.values() if c.active])

        except Exception as e:
            logger.error(f"Error distributing data: {e}")

    def _prepare_consumer_data(self, consumer: StreamConsumer) -> Optional[Dict[str, Any]]:
        """Prepare data packet for specific consumer"""
        try:
            data_packet = {
                'timestamp': datetime.now(),
                'consumer_id': consumer.consumer_id,
                'consumer_name': consumer.consumer_name
            }

            # Add requested data types
            if 'ticks' in consumer.data_types:
                data_packet['ticks'] = list(self.tick_cache)[-100:]  # Last 100 ticks

            if 'ohlcv' in consumer.data_types:
                data_packet['one_second_bars'] = list(self.one_second_bars)[-100:]
                data_packet['multi_timeframe'] = self._get_multi_timeframe_snapshot()

            if 'training_data' in consumer.data_types:
                if self.training_data_buffer:
                    data_packet['training_data'] = self.training_data_buffer[-1]

            if 'ui_data' in consumer.data_types:
                if self.ui_data_buffer:
                    data_packet['ui_data'] = self.ui_data_buffer[-1]

            return data_packet

        except Exception as e:
            logger.error(f"Error preparing data for consumer {consumer.consumer_name}: {e}")
            return None
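
    # Shape of a prepared packet (a sketch; which keys appear depends on the
    # consumer's data_types):
    #
    #     {
    #         'timestamp': datetime, 'consumer_id': str, 'consumer_name': str,
    #         'ticks': [...], 'one_second_bars': [...], 'multi_timeframe': {...},
    #         'training_data': TrainingDataPacket, 'ui_data': UIDataPacket,
    #     }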

    def _get_multi_timeframe_snapshot(self) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
        """Get snapshot of multi-timeframe data"""
        snapshot = {}
        for symbol, timeframes in self.multi_timeframe_data.items():
            snapshot[symbol] = {}
            for timeframe, data in timeframes.items():
                snapshot[symbol][timeframe] = list(data)
        return snapshot

    def _build_market_state(self) -> Optional[MarketState]:
        """Build market state for training"""
        try:
            if not self.orchestrator:
                return None

            # Get universal stream
            universal_stream = self._get_universal_stream()
            if not universal_stream:
                return None

            # Build market state snapshot for the primary symbol
            symbol = 'ETH/USDT'
            current_price = self.last_prices.get(symbol, 0.0)

            market_state = MarketState(
                symbol=symbol,
                timestamp=datetime.now(),
                prices={'current': current_price},
                features={},
                volatility=0.0,
                volume=0.0,
                trend_strength=0.0,
                market_regime='unknown',
                universal_data=universal_stream,
                raw_ticks=list(self.tick_cache)[-300:],
                ohlcv_data=self._get_multi_timeframe_snapshot(),
                btc_reference_data=self._get_btc_reference_data(),
                cnn_hidden_features=self.cnn_features_cache.copy(),
                cnn_predictions=self.cnn_predictions_cache.copy()
            )

            return market_state

        except Exception as e:
            logger.error(f"Error building market state: {e}")
            return None

    def _get_universal_stream(self) -> Optional[UniversalDataStream]:
        """Get universal data stream"""
        try:
            if self.universal_adapter:
                return self.universal_adapter.get_universal_stream()
            return None
        except Exception as e:
            logger.error(f"Error getting universal stream: {e}")
            return None

    def _get_btc_reference_data(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get BTC reference data"""
        btc_data = {}
        if 'BTC/USDT' in self.multi_timeframe_data:
            for timeframe, data in self.multi_timeframe_data['BTC/USDT'].items():
                btc_data[timeframe] = list(data)
        return btc_data

    def _get_model_training_status(self) -> Dict[str, Any]:
        """Get model training status"""
        try:
            if self.orchestrator and hasattr(self.orchestrator, 'get_performance_metrics'):
                return self.orchestrator.get_performance_metrics()

            return {
                'cnn_status': 'TRAINING',
                'rl_status': 'TRAINING',
                'data_available': len(self.training_data_buffer) > 0
            }
        except Exception as e:
            logger.error(f"Error getting model training status: {e}")
            return {}

    def _get_orchestrator_status(self) -> Dict[str, Any]:
        """Get orchestrator status"""
        try:
            if self.orchestrator:
                return {
                    'active': True,
                    'symbols': self.config.symbols,
                    'streaming': self.streaming,
                    'tick_processor_active': hasattr(self.orchestrator, 'tick_processor')
                }

            return {'active': False}
        except Exception as e:
            logger.error(f"Error getting orchestrator status: {e}")
            return {'active': False}

    def get_stream_stats(self) -> Dict[str, Any]:
        """Get stream statistics"""
        stats = self.stream_stats.copy()
        stats.update({
            'tick_cache_size': len(self.tick_cache),
            'one_second_bars_count': len(self.one_second_bars),
            'training_data_packets': len(self.training_data_buffer),
            'ui_data_packets': len(self.ui_data_buffer),
            'active_consumers': len([c for c in self.consumers.values() if c.active]),
            'total_consumers': len(self.consumers)
        })
        return stats

    def get_latest_training_data(self) -> Optional[TrainingDataPacket]:
        """Get latest training data packet"""
        if self.training_data_buffer:
            return self.training_data_buffer[-1]
        return None

    def get_latest_ui_data(self) -> Optional[UIDataPacket]:
        """Get latest UI data packet"""
        if self.ui_data_buffer:
            return self.ui_data_buffer[-1]
        return None
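

# End-to-end wiring example (a minimal sketch; the DataProvider import path and
# default construction are assumptions and depend on project configuration):
#
#     import asyncio
#     from core.data_provider import DataProvider
#
#     async def main():
#         provider = DataProvider()                  # assumed constructor
#         stream = UnifiedDataStream(provider)
#         stream.register_consumer(
#             'dashboard',
#             lambda packet: print(packet['timestamp']),
#             data_types=['ui_data'],
#         )
#         await stream.start_streaming()
#         await asyncio.sleep(10)                    # let some ticks flow
#         await stream.stop_streaming()
#
#     asyncio.run(main())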