""" Data Interface for Neural Network Trading System This module provides functionality to fetch, process, and prepare data for the neural network models. """ import os import logging import numpy as np import pandas as pd from datetime import datetime, timedelta import json import pickle from sklearn.preprocessing import MinMaxScaler import sys import ta # Add project root to sys.path project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) if project_root not in sys.path: sys.path.append(project_root) # Import BinanceHistoricalData from the root module from realtime import BinanceHistoricalData logger = logging.getLogger(__name__) class DataInterface: """ Enhanced Data Interface supporting: - Multiple trading pairs (up to 3) - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom) - Technical indicators (up to 20) - Cross-timeframe normalization - Real-time tick streaming """ def __init__(self, symbol=None, timeframes=None, data_dir="NN/data"): """ Initialize the data interface. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']) data_dir (str): Directory to store/load datasets """ self.symbol = symbol self.timeframes = timeframes or ['1h', '4h', '1d'] self.data_dir = data_dir self.scalers = {} # Store scalers for each timeframe # Initialize the historical data fetcher self.historical_data = BinanceHistoricalData() # Create data directory if it doesn't exist os.makedirs(self.data_dir, exist_ok=True) # Initialize empty dataframes for each timeframe self.dataframes = {tf: None for tf in self.timeframes} logger.info(f"DataInterface initialized for {symbol} with timeframes {timeframes}") def get_historical_data(self, timeframe='1h', n_candles=1000, use_cache=True): """ Fetch historical price data for a given timeframe. Args: timeframe (str): Timeframe to fetch data for n_candles (int): Number of candles to fetch use_cache (bool): Whether to use cached data if available Returns: pd.DataFrame: DataFrame with OHLCV data """ # Map timeframe string to seconds for BinanceHistoricalData timeframe_to_seconds = { '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } interval_seconds = timeframe_to_seconds.get(timeframe, 3600) # Default to 1h if not found try: # Fetch data using BinanceHistoricalData df = self.historical_data.get_historical_candles( symbol=self.symbol, interval_seconds=interval_seconds, limit=n_candles ) if not df.empty: logger.info(f"Using data for {self.symbol} {timeframe} ({len(df)} candles)") self.dataframes[timeframe] = df return df else: logger.error(f"No data available for {self.symbol} {timeframe}") return None except Exception as e: logger.error(f"Error fetching data for {self.symbol} {timeframe}: {str(e)}") return None def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20): """ Prepare input data for neural network models. 
    def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20):
        """
        Prepare input data for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            tuple: (X, y, timestamps) where:
                X is the input features array with shape (n_samples, window_size, n_features)
                y is the target array with shape (n_samples,)
                timestamps is an array of timestamps for each sample
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Get data for all requested timeframes
        dfs = {}
        min_length = float('inf')
        for tf in timeframes:
            # For the 1s timeframe we need more data points to cover the same span
            tf_candles = n_candles * 60 if tf == '1s' else n_candles
            df = self.get_historical_data(timeframe=tf, n_candles=tf_candles)
            if df is not None and not df.empty:
                dfs[tf] = df
                # Keep track of the minimum length across all timeframes
                min_length = min(min_length, len(df))

        if not dfs:
            logger.error("No data available for feature creation")
            return None, None, None

        # Align all dataframes to the same length
        for tf in dfs:
            dfs[tf] = dfs[tf].tail(min_length)

        # Create features for each timeframe
        features = []
        targets = None
        timestamps = None
        for tf in timeframes:
            if tf in dfs:
                X, y, ts = self._create_features(dfs[tf], window_size)
                if X is not None and y is not None:
                    features.append(X)
                    if targets is None:
                        # Targets are only needed from one timeframe
                        targets = y
                        timestamps = ts

        if not features or targets is None:
            logger.error("Failed to create features for any timeframe")
            return None, None, None

        # Ensure all feature arrays cover the same number of samples
        min_samples = min(f.shape[0] for f in features)
        features = [f[-min_samples:] for f in features]
        targets = targets[-min_samples:]
        timestamps = timestamps[-min_samples:]

        # Stack features from all timeframes along the feature axis
        X = np.concatenate([f.reshape(min_samples, window_size, -1) for f in features], axis=2)

        # Validate data
        if np.any(np.isnan(X)) or np.any(np.isinf(X)):
            logger.error("Generated features contain NaN or infinite values")
            return None, None, None

        logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {targets.shape}")
        return X, targets, timestamps
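    # Illustrative shape check (not executed): with the default timeframes
    # ['1h', '4h', '1d'], window_size=20 and 5 OHLCV features per timeframe,
    # the stacked input X has shape (n_samples, 20, 15), matching
    # get_feature_count() = 3 * 5.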
    def _create_features(self, df, window_size):
        """
        Create features from OHLCV data using a sliding window.

        Args:
            df (pd.DataFrame): DataFrame with OHLCV data
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, y, timestamps) where:
                X is the feature array
                y is the target array
                timestamps is the array of timestamps
        """
        if len(df) < window_size + 1:
            logger.error(f"Not enough data for feature creation (need {window_size + 1}, got {len(df)})")
            return None, None, None

        # Extract OHLCV data
        data = df[['open', 'high', 'low', 'close', 'volume']].values
        timestamps = df['timestamp'].values

        # Create sliding windows
        X = np.array([data[i:i + window_size] for i in range(len(data) - window_size)])

        # Create targets (next candle's movement: 0=down, 1=neutral, 2=up)
        next_close = data[window_size:, 3]        # Close of the candle after each window
        curr_close = data[window_size - 1:-1, 3]  # Close of the last candle in each window
        price_changes = (next_close - curr_close) / curr_close

        # Threshold for price movement classification
        threshold = 0.0005  # 0.05% - kept small to encourage more signals
        y = np.ones(len(price_changes), dtype=int)  # 1 = Neutral by default
        y[price_changes > threshold] = 2            # 2 = Up
        y[price_changes < -threshold] = 0           # 0 = Down

        # Log the target distribution to understand the data better
        sell_count = np.sum(y == 0)
        hold_count = np.sum(y == 1)
        buy_count = np.sum(y == 2)
        total_count = len(y)
        logger.info(
            f"Target distribution for {self.symbol}: "
            f"SELL: {sell_count} ({sell_count / total_count:.2%}), "
            f"HOLD: {hold_count} ({hold_count / total_count:.2%}), "
            f"BUY: {buy_count} ({buy_count / total_count:.2%})"
        )

        logger.info(f"Created features - X shape: {X.shape}, y shape: {y.shape}")
        return X, y, timestamps[window_size:]
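    # Illustrative labeling example (not executed): with threshold = 0.0005,
    # a close moving from 100.00 to 100.10 gives price_change = +0.001, which
    # exceeds the threshold, so the target is 2 (Up); a move from 100.00 to
    # 99.98 gives -0.0002, inside the neutral band, so the target is 1.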
    def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20):
        """
        Generate and save a training dataset for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            dict: Dictionary of dataset file paths, or None on failure
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Prepare inputs
        X, y, timestamps = self.prepare_nn_input(timeframes, n_candles, window_size)
        if X is None or y is None:
            logger.error("Failed to prepare input data for dataset")
            return None

        # Prepare output paths
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        dataset_name = f"{self.symbol.replace('/', '_')}_{'_'.join(timeframes)}_{timestamp_str}"
        X_path = os.path.join(self.data_dir, f"{dataset_name}_X.npy")
        y_path = os.path.join(self.data_dir, f"{dataset_name}_y.npy")
        timestamps_path = os.path.join(self.data_dir, f"{dataset_name}_timestamps.npy")
        metadata_path = os.path.join(self.data_dir, f"{dataset_name}_metadata.json")

        # Save arrays
        np.save(X_path, X)
        np.save(y_path, y)
        np.save(timestamps_path, timestamps)

        # Save metadata
        metadata = {
            'symbol': self.symbol,
            'timeframes': timeframes,
            'window_size': window_size,
            'n_samples': len(X),
            'feature_shape': X.shape[1:],
            'created_at': datetime.now().isoformat(),
            'dataset_name': dataset_name,
        }
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Save scalers
        scaler_path = os.path.join(self.data_dir, f"{dataset_name}_scalers.pkl")
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scalers, f)

        # Return dataset info
        dataset_info = {
            'X_path': X_path,
            'y_path': y_path,
            'timestamps_path': timestamps_path,
            'metadata_path': metadata_path,
            'scaler_path': scaler_path,
        }
        logger.info(f"Dataset generated and saved: {dataset_name}")
        return dataset_info

    def get_feature_count(self):
        """
        Calculate the total number of features across all timeframes.

        Returns:
            int: Total number of features (5 OHLCV features per timeframe)
        """
        return len(self.timeframes) * 5  # OHLCV features for each timeframe

    def get_features(self, timeframe, n_candles=1000):
        """
        Get feature data with technical indicators for a specific timeframe.

        Args:
            timeframe (str): Timeframe to get features for ('1m', '5m', etc.)
            n_candles (int): Number of candles to get

        Returns:
            np.ndarray: Array of feature data (OHLCV columns followed by
                technical indicator columns)
        """
        # Get historical data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles)
        if df is None or df.empty:
            logger.error(f"No data available for {self.symbol} {timeframe}")
            return None

        # Add technical indicators
        df = self.add_technical_indicators(df)

        # Drop NaN values that may have been introduced by the indicators
        df = df.dropna()

        # Extract features (all columns except timestamp)
        features = df.drop('timestamp', axis=1).values

        logger.info(f"Prepared {len(features)} {timeframe} feature rows with {features.shape[1]} features")
        return features
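    # Illustrative reload example (not executed): given `info`, the dict
    # returned by generate_training_dataset(), the saved artifacts can be
    # restored with:
    #
    #     X = np.load(info['X_path'])
    #     y = np.load(info['y_path'])
    #     with open(info['scaler_path'], 'rb') as f:
    #         scalers = pickle.load(f)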
    def add_technical_indicators(self, df):
        """
        Add technical indicators to the dataframe.

        Args:
            df (pd.DataFrame): DataFrame with OHLCV data

        Returns:
            pd.DataFrame: DataFrame with added technical indicators
        """
        # Make a copy to avoid modifying the original
        df_copy = df.copy()

        # Basic price indicators
        df_copy['returns'] = df_copy['close'].pct_change()
        df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1))

        # Moving averages
        df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7)
        df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25)
        df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99)

        # MACD
        macd = ta.trend.MACD(df_copy['close'])
        df_copy['macd'] = macd.macd()
        df_copy['macd_signal'] = macd.macd_signal()
        df_copy['macd_diff'] = macd.macd_diff()

        # RSI
        df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14)

        # Bollinger Bands
        bollinger = ta.volatility.BollingerBands(df_copy['close'])
        df_copy['bb_high'] = bollinger.bollinger_hband()
        df_copy['bb_low'] = bollinger.bollinger_lband()
        df_copy['bb_pct'] = bollinger.bollinger_pband()

        return df_copy

    def calculate_pnl(self, predictions, actual_prices, position_size=1.0):
        """
        Robust PnL calculator that handles:
        - Action predictions (0=SELL, 1=HOLD, 2=BUY)
        - Probability predictions (array of [sell_prob, hold_prob, buy_prob])
        - Single price array or OHLC data

        Args:
            predictions: Array of predicted actions or probabilities
            actual_prices: Array of actual prices (can be 1D or 2D OHLC format)
            position_size: Position size multiplier

        Returns:
            tuple: (total_pnl, win_rate, trades)
        """
        # Convert inputs to numpy arrays if they aren't already
        try:
            predictions = np.array(predictions)
            actual_prices = np.array(actual_prices)
        except Exception as e:
            logger.error(f"Error converting inputs: {str(e)}")
            return 0.0, 0.0, []

        # Validate input shapes
        if predictions.ndim > 2 or actual_prices.ndim > 2:
            logger.error("Invalid input dimensions")
            return 0.0, 0.0, []

        # Convert OHLC data to close prices if needed
        if actual_prices.ndim == 2 and actual_prices.shape[1] >= 4:
            prices = actual_prices[:, 3]  # Use close prices
        else:
            prices = actual_prices

        # Handle the case where prices is 2D with a single column
        if prices.ndim == 2 and prices.shape[1] == 1:
            prices = prices.flatten()

        # Convert probabilities to actions if needed
        if predictions.ndim == 2 and predictions.shape[1] > 1:
            actions = np.argmax(predictions, axis=1)
        else:
            actions = predictions.flatten()

        # Ensure we have enough prices
        if len(prices) < 2:
            logger.error("Not enough price data")
            return 0.0, 0.0, []

        # Trim to matching length (each trade needs the next candle's price)
        min_length = min(len(actions), len(prices) - 1)
        actions = actions[:min_length]
        prices = prices[:min_length + 1]

        pnl = 0.0
        wins = 0
        trades = []
        for i in range(min_length):
            current_price = prices[i]
            next_price = prices[i + 1]
            action = actions[i]

            # Skip HOLD actions
            if action == 1:
                continue

            price_change = (next_price - current_price) / current_price
            if action == 2:  # BUY
                trade_pnl = price_change * position_size
                trade_type = 'BUY'
                is_win = price_change > 0
            elif action == 0:  # SELL
                trade_pnl = -price_change * position_size
                trade_type = 'SELL'
                is_win = price_change < 0
            else:
                continue  # Invalid action

            pnl += trade_pnl
            wins += int(is_win)

            # Track trade details (candle_index links the trade to its candle)
            trades.append({
                'type': trade_type,
                'entry': current_price,
                'exit': next_price,
                'pnl': trade_pnl,
                'win': is_win,
                'duration': 1,  # In number of candles
                'candle_index': i,
            })

        win_rate = wins / len(trades) if trades else 0.0

        # Attach timestamps to trades if available, using each trade's candle
        # index so skipped HOLD candles don't misalign the mapping
        if self.timeframes and self.dataframes.get(self.timeframes[0]) is not None:
            df = self.dataframes[self.timeframes[0]]
            if 'timestamp' in df.columns:
                for trade in trades:
                    idx = trade['candle_index']
                    if idx < len(df):
                        trade['timestamp'] = df['timestamp'].iloc[idx]

        return pnl, win_rate, trades
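    # Illustrative PnL arithmetic (not executed): with prices [100.0, 101.0]
    # and a single BUY action (2), price_change = (101 - 100) / 100 = 0.01,
    # so the trade PnL is +0.01 * position_size and it counts as a win; the
    # same move with a SELL action (0) yields -0.01 * position_size.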
    def get_future_prices(self, prices, n_candles=3):
        """
        Extract future prices for retrospective training.

        Args:
            prices (np.ndarray): Array of prices
            n_candles (int): Number of future candles to look at

        Returns:
            np.ndarray: Future prices (1D array), or None if there is not
                enough data
        """
        if prices is None or len(prices) < n_candles + 1:
            return None

        # Convert to a flat numpy array
        prices_np = np.asarray(prices).flatten()

        # For each price point, take the maximum price over the next n_candles
        future_prices = np.zeros(len(prices_np))
        for i in range(len(prices_np) - n_candles):
            next_candles = prices_np[i + 1:i + n_candles + 1]
            future_prices[i] = np.max(next_candles)

        # For the last n_candles points, use the last available price
        future_prices[-n_candles:] = prices_np[-1]

        return future_prices

    def prepare_training_data(self, refresh=False, refresh_interval=300):
        """
        Prepare data for training, including splitting into train/validation sets.

        Args:
            refresh (bool): Whether to refresh the data cache
            refresh_interval (int): Interval in seconds after which data is refreshed

        Returns:
            tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
        """
        current_time = datetime.now()

        # Decide whether to refresh the data or serve the cached copy
        if refresh or not hasattr(self, 'last_refresh_time') or \
                (current_time - self.last_refresh_time).total_seconds() > refresh_interval:
            logger.info("Refreshing training data...")
            self.last_refresh_time = current_time
        elif hasattr(self, 'cached_train_data'):
            return self.cached_train_data

        # Prepare input data
        X, y, _ = self.prepare_nn_input()
        if X is None:
            return None, None, None, None, None, None

        # Get price data for PnL calculation
        raw_prices = []
        for tf in self.timeframes:
            if tf in self.dataframes and self.dataframes[tf] is not None:
                # Get the close prices for the same period as X
                prices = self.dataframes[tf]['close'].values[-len(X):]
                if len(prices) == len(X):
                    raw_prices = prices
                    break
        if len(raw_prices) != len(X):
            raw_prices = np.zeros(len(X))  # Fallback if no prices are available

        # Split data into training and validation sets (80/20)
        split_idx = int(len(X) * 0.8)
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]
        train_prices, val_prices = raw_prices[:split_idx], raw_prices[split_idx:]

        # Cache the data
        self.cached_train_data = (X_train, y_train, X_val, y_val, train_prices, val_prices)

        return X_train, y_train, X_val, y_val, train_prices, val_prices
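    # Illustrative example (not executed): get_future_prices([1, 2, 3, 4, 5],
    # n_candles=2) looks ahead two candles and takes the maximum, giving
    # [3, 4, 5, 5, 5]; the last two entries fall back to the final price.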
    def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20):
        """
        Prepare a single input sample from the most recent data for real-time inference.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of recent candles to fetch
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, timestamp) where:
                X is the input features array with shape (1, window_size, n_features)
                timestamp is the timestamp of the most recent candle
        """
        # Get recent data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=False)
        if df is None or len(df) < window_size:
            logger.error(f"Not enough data for inference (need at least {window_size} candles)")
            return None, None

        # Extract features from the most recent window
        ohlcv = df[['open', 'high', 'low', 'close', 'volume']].tail(window_size).values

        # Scale the data
        if timeframe in self.scalers:
            # Reuse the existing scaler so inference matches training
            scaler = self.scalers[timeframe]
        else:
            # Create a new scaler and fit it on all available data
            scaler = MinMaxScaler()
            all_data = df[['open', 'high', 'low', 'close', 'volume']].values
            scaler.fit(all_data)
            self.scalers[timeframe] = scaler

        ohlcv_scaled = scaler.transform(ohlcv)

        # Reshape to (1, window_size, n_features)
        X = np.array([ohlcv_scaled])

        # Get the timestamp of the most recent candle
        timestamp = df['timestamp'].iloc[-1]

        return X, timestamp

    def get_training_data(self, timeframe='1m', n_candles=5000):
        """
        Get a consolidated dataframe for RL training with OHLCV data and
        technical indicators.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of candles to fetch

        Returns:
            pd.DataFrame: Combined dataframe with price data and technical
                indicators, or None on failure
        """
        # Get historical data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=True)
        if df is None or len(df) < 100:  # Minimum required for the indicators
            logger.error("Not enough data for RL training (need at least 100 candles)")
            return None

        # Work on a copy so the cached dataframe is not mutated in place
        df = df.copy()

        # Calculate technical indicators
        try:
            # RSI (14)
            df['rsi'] = ta.momentum.rsi(df['close'], window=14)

            # MACD
            macd = ta.trend.MACD(df['close'])
            df['macd'] = macd.macd()
            df['macd_signal'] = macd.macd_signal()
            df['macd_hist'] = macd.macd_diff()

            # Bollinger Bands
            bbands = ta.volatility.BollingerBands(df['close'])
            df['bb_upper'] = bbands.bollinger_hband()
            df['bb_middle'] = bbands.bollinger_mavg()
            df['bb_lower'] = bbands.bollinger_lband()

            # ATR (Average True Range)
            df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=14)

            # Moving averages
            df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
            df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
            df['ema_20'] = ta.trend.ema_indicator(df['close'], window=20)

            # OBV (On-Balance Volume)
            df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume'])

            # Momentum (rate of change over 10 candles)
            df['mom'] = ta.momentum.roc(df['close'], window=10)

            # Normalize prices to the previous close
            df['close_norm'] = df['close'] / df['close'].shift(1) - 1
            df['high_norm'] = df['high'] / df['close'].shift(1) - 1
            df['low_norm'] = df['low'] / df['close'].shift(1) - 1

            # Volatility features
            df['volatility'] = df['high'] / df['low'] - 1

            # Volume features
            df['volume_norm'] = df['volume'] / df['volume'].rolling(20).mean()

            # Returns over several horizons
            df['returns_1'] = df['close'].pct_change(1)
            df['returns_5'] = df['close'].pct_change(5)
            df['returns_10'] = df['close'].pct_change(10)
        except Exception as e:
            logger.error(f"Error calculating technical indicators: {str(e)}")
            return None

        # Drop NaN values introduced by the rolling indicators
        df = df.dropna()

        return df
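

# A minimal usage sketch (illustrative, not part of the original interface):
# it assumes a reachable exchange connection through BinanceHistoricalData and
# that "BTC/USDT" is the symbol format expected by realtime.py; the parameters
# below are hypothetical, for demonstration only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    di = DataInterface(symbol="BTC/USDT", timeframes=['1m', '1h'])

    # Fetch candles and build supervised training tensors
    X, y, timestamps = di.prepare_nn_input(n_candles=500, window_size=20)
    if X is not None:
        print(f"X: {X.shape}, y: {y.shape}")  # e.g. (n_samples, 20, 10) for 2 timeframes

        # Persist the dataset alongside its metadata and scalers
        info = di.generate_training_dataset(n_candles=500, window_size=20)
        if info:
            print(f"Saved dataset: {info['metadata_path']}")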