improve stream
This commit is contained in:
@@ -25,11 +25,15 @@ class DataStreamMonitor:
|
||||
self.data_provider = data_provider
|
||||
self.training_system = training_system
|
||||
|
||||
# Data buffers for streaming
|
||||
# Data buffers for streaming (expanded for accessing historical data)
|
||||
self.data_streams = {
|
||||
'ohlcv_1m': deque(maxlen=100),
|
||||
'ohlcv_5m': deque(maxlen=50),
|
||||
'ohlcv_15m': deque(maxlen=20),
|
||||
'ohlcv_1s': deque(maxlen=300), # 300 seconds for 1s data
|
||||
'ohlcv_1m': deque(maxlen=300), # 300 minutes for 1m data (ETH)
|
||||
'ohlcv_1h': deque(maxlen=300), # 300 hours for 1h data (ETH)
|
||||
'ohlcv_1d': deque(maxlen=300), # 300 days for 1d data (ETH)
|
||||
'btc_1m': deque(maxlen=300), # 300 minutes for BTC 1m data
|
||||
'ohlcv_5m': deque(maxlen=100), # Keep for compatibility
|
||||
'ohlcv_15m': deque(maxlen=100), # Keep for compatibility
|
||||
'ticks': deque(maxlen=200),
|
||||
'cob_raw': deque(maxlen=100),
|
||||
'cob_aggregated': deque(maxlen=50),
|
||||
@@ -39,12 +43,15 @@ class DataStreamMonitor:
|
||||
'training_experiences': deque(maxlen=200)
|
||||
}
|
||||
|
||||
# Streaming configuration
|
||||
# Streaming configuration - expanded for model requirements
|
||||
self.stream_config = {
|
||||
'console_output': True,
|
||||
'compact_format': False,
|
||||
'include_timestamps': True,
|
||||
'filter_symbols': ['ETH/USDT'], # Focus on primary symbols
|
||||
'filter_symbols': ['ETH/USDT', 'BTC/USDT'], # Primary and secondary symbols
|
||||
'primary_symbol': 'ETH/USDT',
|
||||
'secondary_symbol': 'BTC/USDT',
|
||||
'timeframes': ['1s', '1m', '1h', '1d'], # Required timeframes for models
|
||||
'sampling_rate': 1.0 # seconds between samples
|
||||
}
|
||||
|
||||
@@ -118,32 +125,114 @@ class DataStreamMonitor:
|
||||
logger.error(f"Error collecting data sample: {e}")
|
||||
|
||||
def _collect_ohlcv_data(self, timestamp: datetime):
|
||||
"""Collect OHLCV data for all timeframes"""
|
||||
"""Collect OHLCV data for all timeframes and symbols"""
|
||||
try:
|
||||
for symbol in self.stream_config['filter_symbols']:
|
||||
for timeframe in ['1m', '5m', '15m']:
|
||||
if self.data_provider:
|
||||
df = self.data_provider.get_historical_data(symbol, timeframe, limit=5)
|
||||
if df is not None and not df.empty:
|
||||
latest_bar = {
|
||||
'timestamp': timestamp.isoformat(),
|
||||
'symbol': symbol,
|
||||
'timeframe': timeframe,
|
||||
'open': float(df['open'].iloc[-1]),
|
||||
'high': float(df['high'].iloc[-1]),
|
||||
'low': float(df['low'].iloc[-1]),
|
||||
'close': float(df['close'].iloc[-1]),
|
||||
'volume': float(df['volume'].iloc[-1])
|
||||
}
|
||||
# ETH/USDT data for all required timeframes
|
||||
primary_symbol = self.stream_config['primary_symbol']
|
||||
for timeframe in ['1m', '1h', '1d']:
|
||||
if self.data_provider:
|
||||
# Get recent data (limit=1 for latest, but access historical data when needed)
|
||||
df = self.data_provider.get_historical_data(primary_symbol, timeframe, limit=300)
|
||||
if df is not None and not df.empty:
|
||||
# Get the latest bar
|
||||
latest_bar = {
|
||||
'timestamp': timestamp.isoformat(),
|
||||
'symbol': primary_symbol,
|
||||
'timeframe': timeframe,
|
||||
'open': float(df['open'].iloc[-1]),
|
||||
'high': float(df['high'].iloc[-1]),
|
||||
'low': float(df['low'].iloc[-1]),
|
||||
'close': float(df['close'].iloc[-1]),
|
||||
'volume': float(df['volume'].iloc[-1])
|
||||
}
|
||||
|
||||
stream_key = f'ohlcv_{timeframe}'
|
||||
if len(self.data_streams[stream_key]) == 0 or \
|
||||
self.data_streams[stream_key][-1]['timestamp'] != latest_bar['timestamp']:
|
||||
self.data_streams[stream_key].append(latest_bar)
|
||||
stream_key = f'ohlcv_{timeframe}'
|
||||
|
||||
# Only add if different from last entry or if stream is empty
|
||||
if len(self.data_streams[stream_key]) == 0 or \
|
||||
self.data_streams[stream_key][-1]['close'] != latest_bar['close']:
|
||||
self.data_streams[stream_key].append(latest_bar)
|
||||
|
||||
# If stream was empty, populate with historical data
|
||||
if len(self.data_streams[stream_key]) == 1:
|
||||
logger.info(f"Populating {stream_key} with historical data...")
|
||||
self._populate_historical_data(df, stream_key, primary_symbol, timeframe)
|
||||
|
||||
# BTC/USDT 1m data (secondary symbol)
|
||||
secondary_symbol = self.stream_config['secondary_symbol']
|
||||
if self.data_provider:
|
||||
df = self.data_provider.get_historical_data(secondary_symbol, '1m', limit=300)
|
||||
if df is not None and not df.empty:
|
||||
latest_bar = {
|
||||
'timestamp': timestamp.isoformat(),
|
||||
'symbol': secondary_symbol,
|
||||
'timeframe': '1m',
|
||||
'open': float(df['open'].iloc[-1]),
|
||||
'high': float(df['high'].iloc[-1]),
|
||||
'low': float(df['low'].iloc[-1]),
|
||||
'close': float(df['close'].iloc[-1]),
|
||||
'volume': float(df['volume'].iloc[-1])
|
||||
}
|
||||
|
||||
# Only add if different from last entry or if stream is empty
|
||||
if len(self.data_streams['btc_1m']) == 0 or \
|
||||
self.data_streams['btc_1m'][-1]['close'] != latest_bar['close']:
|
||||
self.data_streams['btc_1m'].append(latest_bar)
|
||||
|
||||
# If stream was empty, populate with historical data
|
||||
if len(self.data_streams['btc_1m']) == 1:
|
||||
logger.info("Populating btc_1m with historical data...")
|
||||
self._populate_historical_data(df, 'btc_1m', secondary_symbol, '1m')
|
||||
|
||||
# Legacy timeframes for compatibility
|
||||
for timeframe in ['5m', '15m']:
|
||||
if self.data_provider:
|
||||
df = self.data_provider.get_historical_data(primary_symbol, timeframe, limit=5)
|
||||
if df is not None and not df.empty:
|
||||
latest_bar = {
|
||||
'timestamp': timestamp.isoformat(),
|
||||
'symbol': primary_symbol,
|
||||
'timeframe': timeframe,
|
||||
'open': float(df['open'].iloc[-1]),
|
||||
'high': float(df['high'].iloc[-1]),
|
||||
'low': float(df['low'].iloc[-1]),
|
||||
'close': float(df['close'].iloc[-1]),
|
||||
'volume': float(df['volume'].iloc[-1])
|
||||
}
|
||||
|
||||
stream_key = f'ohlcv_{timeframe}'
|
||||
if len(self.data_streams[stream_key]) == 0 or \
|
||||
self.data_streams[stream_key][-1]['timestamp'] != latest_bar['timestamp']:
|
||||
self.data_streams[stream_key].append(latest_bar)
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error collecting OHLCV data: {e}")
|
||||
|
||||
def _populate_historical_data(self, df, stream_key, symbol, timeframe):
|
||||
"""Populate stream with historical data from DataFrame"""
|
||||
try:
|
||||
# Clear the stream first (it should only have 1 latest entry)
|
||||
self.data_streams[stream_key].clear()
|
||||
|
||||
# Add all historical data
|
||||
for _, row in df.iterrows():
|
||||
bar_data = {
|
||||
'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name),
|
||||
'symbol': symbol,
|
||||
'timeframe': timeframe,
|
||||
'open': float(row['open']),
|
||||
'high': float(row['high']),
|
||||
'low': float(row['low']),
|
||||
'close': float(row['close']),
|
||||
'volume': float(row['volume'])
|
||||
}
|
||||
self.data_streams[stream_key].append(bar_data)
|
||||
|
||||
logger.info(f"✅ Loaded {len(df)} historical candles for {stream_key} ({symbol} {timeframe})")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error populating historical data for {stream_key}: {e}")
|
||||
|
||||
def _collect_tick_data(self, timestamp: datetime):
|
||||
"""Collect real-time tick data"""
|
||||
try:
|
||||
|
Reference in New Issue
Block a user