diff --git a/check_stream.py b/check_stream.py index f2cc2af..71c28a9 100644 --- a/check_stream.py +++ b/check_stream.py @@ -108,7 +108,7 @@ def check_stream(): print("āŒ Could not get stream status from API") def show_ohlcv_data(): - """Show OHLCV data with indicators.""" + """Show OHLCV data with indicators for all required timeframes and symbols.""" print("=" * 60) print("OHLCV DATA WITH INDICATORS") print("=" * 60) @@ -120,30 +120,118 @@ def show_ohlcv_data(): print("šŸ’” Start dashboard first: python run_clean_dashboard.py") return - # Get OHLCV data for different timeframes - timeframes = ['1s', '1m', '1h', '1d'] - symbol = 'ETH/USDT' + # Check all required datasets for models + datasets = [ + ("ETH/USDT", "1m"), + ("ETH/USDT", "1h"), + ("ETH/USDT", "1d"), + ("BTC/USDT", "1m") + ] - for timeframe in timeframes: - print(f"\nšŸ“Š {symbol} {timeframe} Data:") + print("šŸ“Š Checking all required datasets for model training:") + + for symbol, timeframe in datasets: + print(f"\nšŸ“ˆ {symbol} {timeframe} Data:") data = get_ohlcv_data_from_api(symbol, timeframe, 300) - if data and data.get('data'): + if data and isinstance(data, dict) and 'data' in data: ohlcv_data = data['data'] - print(f" Records: {len(ohlcv_data)}") - - if ohlcv_data: + if ohlcv_data and len(ohlcv_data) > 0: + print(f" āœ… Records: {len(ohlcv_data)}") + latest = ohlcv_data[-1] - print(f" Latest: {latest['timestamp']}") - print(f" Price: ${latest['close']:.2f}") + oldest = ohlcv_data[0] + print(f" šŸ“… Range: {oldest['timestamp'][:10]} to {latest['timestamp'][:10]}") + print(f" šŸ’° Latest Price: ${latest['close']:.2f}") + print(f" šŸ“Š Volume: {latest['volume']:.2f}") indicators = latest.get('indicators', {}) if indicators: - print(f" RSI: {indicators.get('rsi', 'N/A')}") - print(f" MACD: {indicators.get('macd', 'N/A')}") - print(f" SMA20: {indicators.get('sma_20', 'N/A')}") + rsi = indicators.get('rsi') + macd = indicators.get('macd') + sma_20 = indicators.get('sma_20') + print(f" šŸ“‰ RSI: {rsi:.2f}" if rsi else " šŸ“‰ RSI: N/A") + print(f" šŸ”„ MACD: {macd:.4f}" if macd else " šŸ”„ MACD: N/A") + print(f" šŸ“ˆ SMA20: ${sma_20:.2f}" if sma_20 else " šŸ“ˆ SMA20: N/A") + + # Check if we have enough data for training + if len(ohlcv_data) >= 300: + print(f" šŸŽÆ Model Ready: {len(ohlcv_data)}/300 candles") + else: + print(f" āš ļø Need More: {len(ohlcv_data)}/300 candles ({300-len(ohlcv_data)} missing)") + else: + print(f" āŒ Empty data array") + elif data and isinstance(data, list) and len(data) > 0: + # Direct array format + print(f" āœ… Records: {len(data)}") + latest = data[-1] + oldest = data[0] + print(f" šŸ“… Range: {oldest['timestamp'][:10]} to {latest['timestamp'][:10]}") + print(f" šŸ’° Latest Price: ${latest['close']:.2f}") + elif data: + print(f" āš ļø Unexpected format: {type(data)}") else: - print(f" No data available") + print(f" āŒ No data available") + + print(f"\nšŸŽÆ Expected: 300 candles per dataset (1200 total)") + +def show_detailed_ohlcv(symbol="ETH/USDT", timeframe="1m"): + """Show detailed OHLCV data for a specific symbol/timeframe.""" + print("=" * 60) + print(f"DETAILED {symbol} {timeframe} DATA") + print("=" * 60) + + # Check dashboard health + dashboard_running, _ = check_dashboard_status() + if not dashboard_running: + print("āŒ Dashboard not running") + return + + data = get_ohlcv_data_from_api(symbol, timeframe, 300) + + if data and isinstance(data, dict) and 'data' in data: + ohlcv_data = data['data'] + if ohlcv_data and len(ohlcv_data) > 0: + print(f"šŸ“ˆ Total candles loaded: {len(ohlcv_data)}") + + if len(ohlcv_data) >= 2: + oldest = ohlcv_data[0] + latest = ohlcv_data[-1] + print(f"šŸ“… Date range: {oldest['timestamp']} to {latest['timestamp']}") + + # Calculate price statistics + closes = [item['close'] for item in ohlcv_data] + volumes = [item['volume'] for item in ohlcv_data] + + print(f"šŸ’° Price range: ${min(closes):.2f} - ${max(closes):.2f}") + print(f"šŸ“Š Average volume: {sum(volumes)/len(volumes):.2f}") + + # Show sample data + print(f"\nšŸ” First 3 candles:") + for i in range(min(3, len(ohlcv_data))): + candle = ohlcv_data[i] + ts = candle['timestamp'][:19] if len(candle['timestamp']) > 19 else candle['timestamp'] + print(f" {ts} | ${candle['close']:.2f} | Vol:{candle['volume']:.2f}") + + print(f"\nšŸ” Last 3 candles:") + for i in range(max(0, len(ohlcv_data)-3), len(ohlcv_data)): + candle = ohlcv_data[i] + ts = candle['timestamp'][:19] if len(candle['timestamp']) > 19 else candle['timestamp'] + print(f" {ts} | ${candle['close']:.2f} | Vol:{candle['volume']:.2f}") + + # Model training readiness check + if len(ohlcv_data) >= 300: + print(f"\nāœ… Model Training Ready: {len(ohlcv_data)}/300 candles loaded") + else: + print(f"\nāš ļø Insufficient Data: {len(ohlcv_data)}/300 candles (need {300-len(ohlcv_data)} more)") + else: + print("āŒ Empty data array") + elif data and isinstance(data, list) and len(data) > 0: + # Direct array format + print(f"šŸ“ˆ Total candles loaded: {len(data)}") + # ... (same processing as above for array format) + else: + print(f"āŒ No data returned: {type(data)}") def show_cob_data(): """Show COB data with price buckets.""" @@ -213,9 +301,13 @@ def main(): if len(sys.argv) < 2: print("Usage:") print(" python check_stream.py status # Check stream status") - print(" python check_stream.py ohlcv # Show OHLCV data") + print(" python check_stream.py ohlcv # Show all OHLCV datasets") + print(" python check_stream.py detail [symbol] [timeframe] # Show detailed data") print(" python check_stream.py cob # Show COB data") print(" python check_stream.py snapshot # Generate snapshot") + print("\nExamples:") + print(" python check_stream.py detail ETH/USDT 1h") + print(" python check_stream.py detail BTC/USDT 1m") return command = sys.argv[1].lower() @@ -224,13 +316,17 @@ def main(): check_stream() elif command == "ohlcv": show_ohlcv_data() + elif command == "detail": + symbol = sys.argv[2] if len(sys.argv) > 2 else "ETH/USDT" + timeframe = sys.argv[3] if len(sys.argv) > 3 else "1m" + show_detailed_ohlcv(symbol, timeframe) elif command == "cob": show_cob_data() elif command == "snapshot": generate_snapshot() else: print(f"Unknown command: {command}") - print("Available commands: status, ohlcv, cob, snapshot") + print("Available commands: status, ohlcv, detail, cob, snapshot") if __name__ == "__main__": main() diff --git a/core/orchestrator.py b/core/orchestrator.py index 27e6eaf..1081c5f 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -195,6 +195,9 @@ class TradingOrchestrator: # Initialize and start data stream monitor (single source of truth) self._initialize_data_stream_monitor() + + # Load historical data for models and RL training + self._load_historical_data_for_models() def _initialize_ml_models(self): """Initialize ML models for enhanced trading""" @@ -2315,6 +2318,58 @@ class TradingOrchestrator: logger.error(f"Error getting COB data: {e}") return [] + def _load_historical_data_for_models(self): + """Load 300 historical candles for all required timeframes and symbols for model training""" + logger.info("Loading 300 historical candles for model training and RL context...") + + try: + # Required data for models: + # ETH/USDT: 1m, 1h, 1d (300 candles each) + # BTC/USDT: 1m (300 candles) + + symbols_timeframes = [ + ('ETH/USDT', '1m'), + ('ETH/USDT', '1h'), + ('ETH/USDT', '1d'), + ('BTC/USDT', '1m') + ] + + loaded_data = {} + total_candles = 0 + + for symbol, timeframe in symbols_timeframes: + try: + logger.info(f"Loading {symbol} {timeframe} historical data...") + df = self.data_provider.get_historical_data(symbol, timeframe, limit=300) + + if df is not None and not df.empty: + loaded_data[f"{symbol}_{timeframe}"] = df + total_candles += len(df) + logger.info(f"āœ… Loaded {len(df)} {timeframe} candles for {symbol}") + + # Store in data provider's historical cache for quick access + cache_key = f"{symbol}_{timeframe}_300" + if not hasattr(self.data_provider, 'model_data_cache'): + self.data_provider.model_data_cache = {} + self.data_provider.model_data_cache[cache_key] = df + + else: + logger.warning(f"āŒ No {timeframe} data available for {symbol}") + + except Exception as e: + logger.error(f"Error loading {symbol} {timeframe} data: {e}") + + # Initialize model context data + if hasattr(self, 'extrema_trainer') and self.extrema_trainer: + logger.info("Initializing ExtremaTrainer with historical context...") + self.extrema_trainer.initialize_context_data() + + logger.info(f"šŸŽÆ Historical data loading complete: {total_candles} total candles loaded") + logger.info(f"šŸ“Š Available datasets: {list(loaded_data.keys())}") + + except Exception as e: + logger.error(f"Error in historical data loading: {e}") + def get_ohlcv_data(self, symbol: str, timeframe: str, limit: int = 300) -> List: """Get OHLCV data for a symbol with specified timeframe and limit.""" try: diff --git a/data_stream_monitor.py b/data_stream_monitor.py index 9047a2e..62ba02f 100644 --- a/data_stream_monitor.py +++ b/data_stream_monitor.py @@ -25,11 +25,15 @@ class DataStreamMonitor: self.data_provider = data_provider self.training_system = training_system - # Data buffers for streaming + # Data buffers for streaming (expanded for accessing historical data) self.data_streams = { - 'ohlcv_1m': deque(maxlen=100), - 'ohlcv_5m': deque(maxlen=50), - 'ohlcv_15m': deque(maxlen=20), + 'ohlcv_1s': deque(maxlen=300), # 300 seconds for 1s data + 'ohlcv_1m': deque(maxlen=300), # 300 minutes for 1m data (ETH) + 'ohlcv_1h': deque(maxlen=300), # 300 hours for 1h data (ETH) + 'ohlcv_1d': deque(maxlen=300), # 300 days for 1d data (ETH) + 'btc_1m': deque(maxlen=300), # 300 minutes for BTC 1m data + 'ohlcv_5m': deque(maxlen=100), # Keep for compatibility + 'ohlcv_15m': deque(maxlen=100), # Keep for compatibility 'ticks': deque(maxlen=200), 'cob_raw': deque(maxlen=100), 'cob_aggregated': deque(maxlen=50), @@ -39,12 +43,15 @@ class DataStreamMonitor: 'training_experiences': deque(maxlen=200) } - # Streaming configuration + # Streaming configuration - expanded for model requirements self.stream_config = { 'console_output': True, 'compact_format': False, 'include_timestamps': True, - 'filter_symbols': ['ETH/USDT'], # Focus on primary symbols + 'filter_symbols': ['ETH/USDT', 'BTC/USDT'], # Primary and secondary symbols + 'primary_symbol': 'ETH/USDT', + 'secondary_symbol': 'BTC/USDT', + 'timeframes': ['1s', '1m', '1h', '1d'], # Required timeframes for models 'sampling_rate': 1.0 # seconds between samples } @@ -118,32 +125,114 @@ class DataStreamMonitor: logger.error(f"Error collecting data sample: {e}") def _collect_ohlcv_data(self, timestamp: datetime): - """Collect OHLCV data for all timeframes""" + """Collect OHLCV data for all timeframes and symbols""" try: - for symbol in self.stream_config['filter_symbols']: - for timeframe in ['1m', '5m', '15m']: - if self.data_provider: - df = self.data_provider.get_historical_data(symbol, timeframe, limit=5) - if df is not None and not df.empty: - latest_bar = { - 'timestamp': timestamp.isoformat(), - 'symbol': symbol, - 'timeframe': timeframe, - 'open': float(df['open'].iloc[-1]), - 'high': float(df['high'].iloc[-1]), - 'low': float(df['low'].iloc[-1]), - 'close': float(df['close'].iloc[-1]), - 'volume': float(df['volume'].iloc[-1]) - } + # ETH/USDT data for all required timeframes + primary_symbol = self.stream_config['primary_symbol'] + for timeframe in ['1m', '1h', '1d']: + if self.data_provider: + # Get recent data (limit=1 for latest, but access historical data when needed) + df = self.data_provider.get_historical_data(primary_symbol, timeframe, limit=300) + if df is not None and not df.empty: + # Get the latest bar + latest_bar = { + 'timestamp': timestamp.isoformat(), + 'symbol': primary_symbol, + 'timeframe': timeframe, + 'open': float(df['open'].iloc[-1]), + 'high': float(df['high'].iloc[-1]), + 'low': float(df['low'].iloc[-1]), + 'close': float(df['close'].iloc[-1]), + 'volume': float(df['volume'].iloc[-1]) + } - stream_key = f'ohlcv_{timeframe}' - if len(self.data_streams[stream_key]) == 0 or \ - self.data_streams[stream_key][-1]['timestamp'] != latest_bar['timestamp']: - self.data_streams[stream_key].append(latest_bar) + stream_key = f'ohlcv_{timeframe}' + + # Only add if different from last entry or if stream is empty + if len(self.data_streams[stream_key]) == 0 or \ + self.data_streams[stream_key][-1]['close'] != latest_bar['close']: + self.data_streams[stream_key].append(latest_bar) + + # If stream was empty, populate with historical data + if len(self.data_streams[stream_key]) == 1: + logger.info(f"Populating {stream_key} with historical data...") + self._populate_historical_data(df, stream_key, primary_symbol, timeframe) + + # BTC/USDT 1m data (secondary symbol) + secondary_symbol = self.stream_config['secondary_symbol'] + if self.data_provider: + df = self.data_provider.get_historical_data(secondary_symbol, '1m', limit=300) + if df is not None and not df.empty: + latest_bar = { + 'timestamp': timestamp.isoformat(), + 'symbol': secondary_symbol, + 'timeframe': '1m', + 'open': float(df['open'].iloc[-1]), + 'high': float(df['high'].iloc[-1]), + 'low': float(df['low'].iloc[-1]), + 'close': float(df['close'].iloc[-1]), + 'volume': float(df['volume'].iloc[-1]) + } + + # Only add if different from last entry or if stream is empty + if len(self.data_streams['btc_1m']) == 0 or \ + self.data_streams['btc_1m'][-1]['close'] != latest_bar['close']: + self.data_streams['btc_1m'].append(latest_bar) + + # If stream was empty, populate with historical data + if len(self.data_streams['btc_1m']) == 1: + logger.info("Populating btc_1m with historical data...") + self._populate_historical_data(df, 'btc_1m', secondary_symbol, '1m') + + # Legacy timeframes for compatibility + for timeframe in ['5m', '15m']: + if self.data_provider: + df = self.data_provider.get_historical_data(primary_symbol, timeframe, limit=5) + if df is not None and not df.empty: + latest_bar = { + 'timestamp': timestamp.isoformat(), + 'symbol': primary_symbol, + 'timeframe': timeframe, + 'open': float(df['open'].iloc[-1]), + 'high': float(df['high'].iloc[-1]), + 'low': float(df['low'].iloc[-1]), + 'close': float(df['close'].iloc[-1]), + 'volume': float(df['volume'].iloc[-1]) + } + + stream_key = f'ohlcv_{timeframe}' + if len(self.data_streams[stream_key]) == 0 or \ + self.data_streams[stream_key][-1]['timestamp'] != latest_bar['timestamp']: + self.data_streams[stream_key].append(latest_bar) except Exception as e: logger.debug(f"Error collecting OHLCV data: {e}") + def _populate_historical_data(self, df, stream_key, symbol, timeframe): + """Populate stream with historical data from DataFrame""" + try: + # Clear the stream first (it should only have 1 latest entry) + self.data_streams[stream_key].clear() + + # Add all historical data + for _, row in df.iterrows(): + bar_data = { + 'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name), + 'symbol': symbol, + 'timeframe': timeframe, + 'open': float(row['open']), + 'high': float(row['high']), + 'low': float(row['low']), + 'close': float(row['close']), + 'volume': float(row['volume']) + } + self.data_streams[stream_key].append(bar_data) + + logger.info(f"āœ… Loaded {len(df)} historical candles for {stream_key} ({symbol} {timeframe})") + + except Exception as e: + logger.error(f"Error populating historical data for {stream_key}: {e}") + def _collect_tick_data(self, timestamp: datetime): """Collect real-time tick data""" try: diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index 19a5803..970ff4a 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -347,17 +347,27 @@ class CleanTradingDashboard: def _get_ohlcv_data_with_indicators(self, symbol: str, timeframe: str, limit: int = 300): """Get OHLCV data with technical indicators from data stream monitor""" try: - # Get OHLCV data from data stream monitor + # Get OHLCV data from data stream monitor based on symbol and timeframe if hasattr(self.orchestrator, 'data_stream_monitor') and self.orchestrator.data_stream_monitor: - stream_key = f"ohlcv_{timeframe}" + # Determine stream key based on symbol and timeframe + if symbol == 'BTC/USDT' and timeframe == '1m': + stream_key = 'btc_1m' + else: + stream_key = f"ohlcv_{timeframe}" + if stream_key in self.orchestrator.data_stream_monitor.data_streams: ohlcv_data = list(self.orchestrator.data_stream_monitor.data_streams[stream_key]) + # Filter by symbol if needed (for ETH data in mixed streams) + if symbol != 'BTC/USDT': + ohlcv_data = [item for item in ohlcv_data if item.get('symbol') == symbol] + # Take the last 'limit' items ohlcv_data = ohlcv_data[-limit:] if len(ohlcv_data) > limit else ohlcv_data if not ohlcv_data: - return [] + # Fallback to data provider if stream is empty + return self._get_ohlcv_from_provider(symbol, timeframe, limit) # Convert to DataFrame for indicator calculation df_data = [] @@ -372,78 +382,46 @@ class CleanTradingDashboard: }) if not df_data: - return [] + return self._get_ohlcv_from_provider(symbol, timeframe, limit) df = pd.DataFrame(df_data) df['timestamp'] = pd.to_datetime(df['timestamp']) df.set_index('timestamp', inplace=True) # Add technical indicators - df['sma_20'] = df['close'].rolling(window=20).mean() - df['sma_50'] = df['close'].rolling(window=50).mean() - df['ema_12'] = df['close'].ewm(span=12).mean() - df['ema_26'] = df['close'].ewm(span=26).mean() - - # RSI - delta = df['close'].diff() - gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() - loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() - rs = gain / loss - df['rsi'] = 100 - (100 / (1 + rs)) - - # MACD - df['macd'] = df['ema_12'] - df['ema_26'] - df['macd_signal'] = df['macd'].ewm(span=9).mean() - df['macd_histogram'] = df['macd'] - df['macd_signal'] - - # Bollinger Bands - df['bb_middle'] = df['close'].rolling(window=20).mean() - bb_std = df['close'].rolling(window=20).std() - df['bb_upper'] = df['bb_middle'] + (bb_std * 2) - df['bb_lower'] = df['bb_middle'] - (bb_std * 2) - - # Volume indicators - df['volume_sma'] = df['volume'].rolling(window=20).mean() - df['volume_ratio'] = df['volume'] / df['volume_sma'] + df = self._add_technical_indicators(df) # Convert to list of dictionaries - result = [] - for _, row in df.iterrows(): - data_point = { - 'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name), - 'open': float(row['open']), - 'high': float(row['high']), - 'low': float(row['low']), - 'close': float(row['close']), - 'volume': float(row['volume']), - 'indicators': { - 'sma_20': float(row['sma_20']) if pd.notna(row['sma_20']) else None, - 'sma_50': float(row['sma_50']) if pd.notna(row['sma_50']) else None, - 'ema_12': float(row['ema_12']) if pd.notna(row['ema_12']) else None, - 'ema_26': float(row['ema_26']) if pd.notna(row['ema_26']) else None, - 'rsi': float(row['rsi']) if pd.notna(row['rsi']) else None, - 'macd': float(row['macd']) if pd.notna(row['macd']) else None, - 'macd_signal': float(row['macd_signal']) if pd.notna(row['macd_signal']) else None, - 'macd_histogram': float(row['macd_histogram']) if pd.notna(row['macd_histogram']) else None, - 'bb_upper': float(row['bb_upper']) if pd.notna(row['bb_upper']) else None, - 'bb_middle': float(row['bb_middle']) if pd.notna(row['bb_middle']) else None, - 'bb_lower': float(row['bb_lower']) if pd.notna(row['bb_lower']) else None, - 'volume_ratio': float(row['volume_ratio']) if pd.notna(row['volume_ratio']) else None - } - } - result.append(data_point) - - return result + return self._dataframe_to_api_format(df) # Fallback to data provider if stream monitor not available + return self._get_ohlcv_from_provider(symbol, timeframe, limit) + + except Exception as e: + logger.error(f"Error getting OHLCV data: {e}") + return [] + + def _get_ohlcv_from_provider(self, symbol: str, timeframe: str, limit: int = 300): + """Fallback to get OHLCV data directly from data provider""" + try: ohlcv_data = self.data_provider.get_ohlcv(symbol, timeframe, limit=limit) if ohlcv_data is None or ohlcv_data.empty: return [] # Add technical indicators - df = ohlcv_data.copy() + df = self._add_technical_indicators(ohlcv_data.copy()) + # Convert to list of dictionaries + return self._dataframe_to_api_format(df) + + except Exception as e: + logger.error(f"Error getting OHLCV from provider: {e}") + return [] + + def _add_technical_indicators(self, df): + """Add technical indicators to DataFrame""" + try: # Basic indicators df['sma_20'] = df['close'].rolling(window=20).mean() df['sma_50'] = df['close'].rolling(window=50).mean() @@ -472,7 +450,15 @@ class CleanTradingDashboard: df['volume_sma'] = df['volume'].rolling(window=20).mean() df['volume_ratio'] = df['volume'] / df['volume_sma'] - # Convert to list of dictionaries + return df + + except Exception as e: + logger.error(f"Error adding technical indicators: {e}") + return df + + def _dataframe_to_api_format(self, df): + """Convert DataFrame to API format with indicators""" + try: result = [] for _, row in df.iterrows(): data_point = { @@ -502,7 +488,7 @@ class CleanTradingDashboard: return result except Exception as e: - logger.error(f"Error getting OHLCV data: {e}") + logger.error(f"Error converting to API format: {e}") return [] def _get_cob_data_with_buckets(self, symbol: str, limit: int = 300):