diff --git a/NN/models/cnn_model_pytorch.py b/NN/models/cnn_model_pytorch.py
index 99801f0..260f531 100644
--- a/NN/models/cnn_model_pytorch.py
+++ b/NN/models/cnn_model_pytorch.py
@@ -11,6 +11,7 @@
 import logging
 import numpy as np
 import matplotlib.pyplot as plt
 from datetime import datetime
+import math
 import torch
 import torch.nn as nn
@@ -24,79 +25,84 @@
 logger = logging.getLogger(__name__)

 class CNNPyTorch(nn.Module):
     """PyTorch CNN model for time series analysis"""

-    def __init__(self, input_shape, output_size=5):
+    def __init__(self, input_shape, output_size=3):
         """
-        Initialize the enhanced CNN model.
+        Initialize the CNN model.

         Args:
             input_shape (tuple): Shape of input data (window_size, features)
-            output_size (int): Always 5 for our trading signals
+            output_size (int): Size of the output (3 for BUY/HOLD/SELL)
         """
         super(CNNPyTorch, self).__init__()

         window_size, num_features = input_shape
-        kernel_size = 5
+        kernel_size = min(5, window_size)  # Ensure kernel size doesn't exceed window size
         dropout_rate = 0.3

-        # Enhanced CNN Architecture
+        # Calculate initial channel size based on number of features
+        initial_channels = max(32, num_features * 2)  # Scale channels with features
+
+        # CNN Architecture
         self.conv_layers = nn.Sequential(
             # Block 1
-            nn.Conv1d(num_features, 64, kernel_size, padding='same'),
-            nn.BatchNorm1d(64),
+            nn.Conv1d(num_features, initial_channels, kernel_size, padding='same'),
+            nn.BatchNorm1d(initial_channels),
             nn.ReLU(),
+            nn.Dropout(dropout_rate),

             # Block 2
-            nn.Conv1d(64, 128, kernel_size, padding='same'),
-            nn.BatchNorm1d(128),
+            nn.Conv1d(initial_channels, initial_channels * 2, kernel_size, padding='same'),
+            nn.BatchNorm1d(initial_channels * 2),
             nn.ReLU(),
             nn.MaxPool1d(2),
+            nn.Dropout(dropout_rate),

             # Block 3
-            nn.Conv1d(128, 256, kernel_size, padding='same'),
-            nn.BatchNorm1d(256),
+            nn.Conv1d(initial_channels * 2, initial_channels * 4, kernel_size, padding='same'),
+            nn.BatchNorm1d(initial_channels * 4),
             nn.ReLU(),
+            nn.Dropout(dropout_rate),

             # Block 4
-            nn.Conv1d(256, 512, kernel_size, padding='same'),
-            nn.BatchNorm1d(512),
+            nn.Conv1d(initial_channels * 4, initial_channels * 8, kernel_size, padding='same'),
+            nn.BatchNorm1d(initial_channels * 8),
             nn.ReLU(),
-            nn.MaxPool1d(2)
+            nn.MaxPool1d(2),
+            nn.Dropout(dropout_rate)
         )

         # Calculate flattened size after conv and pooling
-        conv_output_size = 512 * (window_size // 4)
+        conv_output_size = (initial_channels * 8) * (window_size // 4)
+
+        # Dense layers with scaled sizes
+        dense_size = min(2048, conv_output_size)  # Cap dense layer size

-        # Enhanced dense layers
         self.dense_block = nn.Sequential(
             nn.Flatten(),
-            nn.Linear(conv_output_size, 512),
-            nn.BatchNorm1d(512),
+            nn.Linear(conv_output_size, dense_size),
+            nn.BatchNorm1d(dense_size),
             nn.ReLU(),
             nn.Dropout(dropout_rate),

-            nn.Linear(512, 256),
-            nn.BatchNorm1d(256),
+            nn.Linear(dense_size, dense_size // 2),
+            nn.BatchNorm1d(dense_size // 2),
             nn.ReLU(),
             nn.Dropout(dropout_rate),

-            nn.Linear(256, 128),
-            nn.BatchNorm1d(128),
+            nn.Linear(dense_size // 2, dense_size // 4),
+            nn.BatchNorm1d(dense_size // 4),
             nn.ReLU(),
+            nn.Dropout(dropout_rate),

-            nn.Linear(128, output_size)
+            nn.Linear(dense_size // 4, output_size)
         )

-        # Activation based on output size
-        if output_size == 1:
-            self.activation = nn.Sigmoid()  # Binary classification or regression
-        elif output_size > 1:
-            self.activation = nn.Softmax(dim=1)  # Multi-class classification
-        else:
-            self.activation = nn.Identity()  # No activation
+        # Activation for output
+        self.activation = nn.Softmax(dim=1)

     def forward(self, x):
         """
-        Forward pass through enhanced network.
+        Forward pass through the network.

         Args:
             x: Input tensor of shape [batch_size, window_size, features]
@@ -107,14 +113,16 @@ class CNNPyTorch(nn.Module):
         # Transpose for conv1d: [batch, features, window]
         x_t = x.transpose(1, 2)

-        # Process through all CNN layers
+        # Process through CNN layers
         conv_out = self.conv_layers(x_t)

         # Process through dense layers
-        output = self.dense_block(conv_out)
+        dense_out = self.dense_block(conv_out)

-        return self.activation(output)
-
+        # Apply activation
+        output = self.activation(dense_out)
+
+        return output

 class CNNModelPyTorch:
     """
@@ -124,14 +132,14 @@ class CNNModelPyTorch:
     predictions with the CNN model.
     """

-    def __init__(self, window_size, num_features, output_size=5, timeframes=None):
+    def __init__(self, window_size, num_features, output_size=3, timeframes=None):
         """
         Initialize the CNN model.

         Args:
             window_size (int): Size of the input window
             num_features (int): Number of features in the input data
-            output_size (int): Size of the output (1 for regression, 3 for classification)
+            output_size (int): Size of the output (default: 3 for BUY/HOLD/SELL)
             timeframes (list): List of timeframes used (for logging)
         """
         # Action tracking
@@ -171,27 +179,23 @@ class CNNModelPyTorch:
             output_size=self.output_size
         ).to(self.device)

-        # Initialize optimizer
+        # Initialize optimizer with learning rate schedule
         self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
+        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
+            self.optimizer, mode='max', factor=0.5, patience=10, verbose=True
+        )

-        # Initialize loss function based on output size
-        if self.output_size == 1:
-            self.criterion = nn.BCELoss()  # Binary classification
-        elif self.output_size > 1:
-            self.criterion = nn.CrossEntropyLoss()  # Multi-class classification
-        else:
-            self.criterion = nn.MSELoss()  # Regression
+        # Initialize loss function with class weights
+        class_weights = torch.tensor([1.0, 0.5, 1.0]).to(self.device)  # Lower weight for HOLD
+        self.criterion = nn.CrossEntropyLoss(weight=class_weights)

         logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")

-    def train_epoch(self, X_train, y_train, batch_size=32):
+    def train_epoch(self, X_train, y_train, future_prices=None, batch_size=32):
         """Train for one epoch and return loss and accuracy"""
         # Convert to PyTorch tensors
         X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
-        if self.output_size == 1:
-            y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
-        else:
-            y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)
+        y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)

         # Create DataLoader
         train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
@@ -210,40 +214,44 @@ class CNNModelPyTorch:
                 outputs = self.model(inputs)

                 # Calculate loss
-                if self.output_size == 1:
-                    loss = self.criterion(outputs, targets.unsqueeze(1))
-                else:
-                    loss = self.criterion(outputs, targets)
+                loss = self.criterion(outputs, targets)

                 # Backward pass and optimize
                 loss.backward()
+
+                # Clip gradients to prevent exploding gradients
+                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
+
                 self.optimizer.step()

                 # Statistics
                 running_loss += loss.item()
-                if self.output_size > 1:
-                    _, predicted = torch.max(outputs, 1)
-                    total += targets.size(0)
-                    correct += (predicted == targets).sum().item()
+                _, predicted = torch.max(outputs, 1)
+                total += targets.size(0)
+                correct += (predicted == targets).sum().item()

         epoch_loss = running_loss / len(train_loader)
         epoch_acc = correct / total if total > 0 else 0

-        return epoch_loss, epoch_acc
+        # Update learning rate scheduler
+        self.scheduler.step(epoch_acc)
+
+        # To maintain compatibility with the updated training code, we'll return 3 values
+        # But the price_loss will be zero since we're not using that in this model
+        return epoch_loss, 0.0, epoch_acc

-    def evaluate(self, X_val, y_val):
+    def evaluate(self, X_val, y_val, future_prices=None):
         """Evaluate on validation data and return loss and accuracy"""
+        # Convert to PyTorch tensors
         X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
-        if self.output_size == 1:
-            y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(self.device)
-        else:
-            y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
-
+        y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
+
+        # Create DataLoader
         val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
         val_loader = DataLoader(val_dataset, batch_size=32)

         self.model.eval()
-        val_loss = 0.0
+        running_loss = 0.0
         correct = 0
         total = 0
@@ -253,20 +261,20 @@ class CNNModelPyTorch:
                 outputs = self.model(inputs)

                 # Calculate loss
-                if self.output_size == 1:
-                    loss = self.criterion(outputs, targets.unsqueeze(1))
-                else:
-                    loss = self.criterion(outputs, targets)
-
-                val_loss += loss.item()
+                loss = self.criterion(outputs, targets)
+                running_loss += loss.item()

                 # Calculate accuracy
-                if self.output_size > 1:
-                    _, predicted = torch.max(outputs, 1)
-                    total += targets.size(0)
-                    correct += (predicted == targets).sum().item()
+                _, predicted = torch.max(outputs, 1)
+                total += targets.size(0)
+                correct += (predicted == targets).sum().item()

-        return val_loss / len(val_loader), correct / total if total > 0 else 0
+        val_loss = running_loss / len(val_loader)
+        val_acc = correct / total if total > 0 else 0
+
+        # To maintain compatibility with the updated training code, we'll return 3 values
+        # But the price_loss will be zero since we're not using that in this model
+        return val_loss, 0.0, val_acc

     def predict(self, X):
         """Make predictions on input data"""
@@ -275,15 +283,13 @@ class CNNModelPyTorch:
         with torch.no_grad():
             outputs = self.model(X_tensor)

-            if self.output_size > 1:
-                _, predicted = torch.max(outputs, 1)
-                return predicted.cpu().numpy()
-            else:
-                return outputs.cpu().numpy()
+            # To maintain compatibility with the transformer model, return the action probs
+            # And a dummy price prediction of zeros
+            return outputs.cpu().numpy(), np.zeros((len(X), 1))

     def predict_next_candles(self, X, n_candles=3):
         """
-        Predict the next n candles for each timeframe.
+        Predict the next n candles.

         Args:
             X: Input data of shape [batch_size, window_size, features]
@@ -296,33 +302,14 @@ class CNNModelPyTorch:
         X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

         with torch.no_grad():
-            # Get the last window of data
-            last_window = X_tensor[-1:]  # [1, window_size, features]
+            # Get predictions for the input window
+            action_probs = self.model(X_tensor)

-            # Initialize predictions
+            # For compatibility, we'll return a dictionary with the timeframes
             predictions = {}
-
-            # For each timeframe, predict next n candles
             for i, tf in enumerate(self.timeframes):
-                # Extract features for this timeframe
-                tf_features = last_window[:, :, i*5:(i+1)*5]  # [1, window_size, 5]
-
-                # Predict next n candles
-                tf_predictions = []
-                current_window = tf_features
-
-                for _ in range(n_candles):
-                    # Get prediction for next candle
-                    output = self.model(current_window)
-                    tf_predictions.append(output.cpu().numpy())
-
-                    # Update window for next prediction
-                    current_window = torch.cat([
-                        current_window[:, 1:, :],
-                        output.unsqueeze(1)
-                    ], dim=1)
-
-                predictions[tf] = np.concatenate(tf_predictions, axis=0)
+                # Simple prediction: just repeat the current prediction for next n candles
+                predictions[tf] = np.tile(action_probs.cpu().numpy(), (n_candles, 1))

         return predictions
diff --git a/NN/realtime_main.py b/NN/realtime_main.py
index fa16301..d0e4645 100644
--- a/NN/realtime_main.py
+++ b/NN/realtime_main.py
@@ -148,19 +148,29 @@ def train(data_interface, model, args):
         best_val_acc = 0
         best_val_pnl = float('-inf')
         best_win_rate = 0
+        best_price_mae = float('inf')

         logger.info("Verifying data interface...")
         X_sample, y_sample, _, _, _, _ = data_interface.prepare_training_data(refresh=True)
         logger.info(f"Data validation - X shape: {X_sample.shape}, y shape: {y_sample.shape}")

+        # Calculate refresh intervals based on timeframes
+        min_timeframe = min(args.timeframes)
+        refresh_interval = {
+            '1s': 1,
+            '1m': 60,
+            '5m': 300,
+            '15m': 900,
+            '1h': 3600,
+            '4h': 14400,
+            '1d': 86400
+        }.get(min_timeframe, 60)
+
+        logger.info(f"Using refresh interval of {refresh_interval} seconds based on {min_timeframe} timeframe")
+
         for epoch in range(args.epochs):
-            # More frequent refresh for shorter timeframes
-            if '1s' in args.timeframes:
-                refresh = True  # Always refresh for tick data
-                refresh_interval = 30  # 30 seconds for tick data
-            else:
-                refresh = epoch % 1 == 0  # Refresh every epoch
-                refresh_interval = 120  # 2 minutes for other timeframes
+            # Always refresh for tick data or when using multiple timeframes
+            refresh = '1s' in args.timeframes or len(args.timeframes) > 1

             logger.info(f"\nStarting epoch {epoch+1}/{args.epochs}")
             X_train, y_train, X_val, y_val, train_prices, val_prices = data_interface.prepare_training_data(
@@ -170,86 +180,106 @@ def train(data_interface, model, args):
             logger.info(f"Training data - X shape: {X_train.shape}, y shape: {y_train.shape}")
             logger.info(f"Validation data - X shape: {X_val.shape}, y shape: {y_val.shape}")

+            # Get future prices for retrospective training
+            train_future_prices = data_interface.get_future_prices(train_prices, n_candles=3)
+            val_future_prices = data_interface.get_future_prices(val_prices, n_candles=3)
+
             # Train and validate
             try:
-                train_loss, train_acc = model.train_epoch(X_train, y_train, args.batch_size)
-                val_loss, val_acc = model.evaluate(X_val, y_val)
+                train_action_loss, train_price_loss, train_acc = model.train_epoch(
+                    X_train, y_train, train_future_prices, args.batch_size
+                )
+                val_action_loss, val_price_loss, val_acc = model.evaluate(
+                    X_val, y_val, val_future_prices
+                )

                 # Get predictions for PnL calculation
-                train_preds = model.predict(X_train)
-                val_preds = model.predict(X_val)
+                train_action_probs, train_price_preds = model.predict(X_train)
+                val_action_probs, val_price_preds = model.predict(X_val)

                 # Calculate PnL and win rates
                 train_pnl, train_win_rate, train_trades = data_interface.calculate_pnl(
-                    train_preds, train_prices, position_size=1.0
+                    train_action_probs, train_prices, position_size=1.0
                 )
                 val_pnl, val_win_rate, val_trades = data_interface.calculate_pnl(
-                    val_preds, val_prices, position_size=1.0
+                    val_action_probs, val_prices, position_size=1.0
                 )

+                # Calculate price prediction error
+                train_price_mae = np.mean(np.abs(train_price_preds - train_future_prices))
+                val_price_mae = np.mean(np.abs(val_price_preds - val_future_prices))
+
                 # Monitor action distribution
-                train_actions = np.bincount(train_preds, minlength=3)
-                val_actions = np.bincount(val_preds, minlength=3)
+                train_actions = np.bincount(np.argmax(train_action_probs, axis=1), minlength=3)
+                val_actions = np.bincount(np.argmax(val_action_probs, axis=1), minlength=3)

                 # Log metrics
-                writer.add_scalar('Loss/train', train_loss, epoch)
+                writer.add_scalar('Loss/action_train', train_action_loss, epoch)
+                writer.add_scalar('Loss/price_train', train_price_loss, epoch)
+                writer.add_scalar('Loss/action_val', val_action_loss, epoch)
+                writer.add_scalar('Loss/price_val', val_price_loss, epoch)
                 writer.add_scalar('Accuracy/train', train_acc, epoch)
-                writer.add_scalar('Loss/val', val_loss, epoch)
                 writer.add_scalar('Accuracy/val', val_acc, epoch)
                 writer.add_scalar('PnL/train', train_pnl, epoch)
                 writer.add_scalar('PnL/val', val_pnl, epoch)
                 writer.add_scalar('WinRate/train', train_win_rate, epoch)
                 writer.add_scalar('WinRate/val', val_win_rate, epoch)
+                writer.add_scalar('PriceMAE/train', train_price_mae, epoch)
+                writer.add_scalar('PriceMAE/val', val_price_mae, epoch)

                 # Log action distribution
                 for i, action in enumerate(['SELL', 'HOLD', 'BUY']):
                     writer.add_scalar(f'Actions/train_{action}', train_actions[i], epoch)
                     writer.add_scalar(f'Actions/val_{action}', val_actions[i], epoch)

-                # Save best model based on validation PnL
-                if val_pnl > best_val_pnl:
+                # Save best model based on validation metrics
+                if val_pnl > best_val_pnl or (val_pnl == best_val_pnl and val_acc > best_val_acc):
                     best_val_pnl = val_pnl
                     best_val_acc = val_acc
                     best_win_rate = val_win_rate
+                    best_price_mae = val_price_mae
                     model.save(f"models/{args.model_type}_best.pt")
+                    logger.info("Saved new best model based on validation metrics")

                 # Log detailed metrics
-                logger.info(f"Epoch {epoch+1}/{args.epochs} - "
-                            f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.2f}, "
-                            f"PnL: {train_pnl:.2%}, Win Rate: {train_win_rate:.2%} - "
-                            f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.2f}, "
-                            f"PnL: {val_pnl:.2%}, Win Rate: {val_win_rate:.2%}")
+                logger.info(f"Epoch {epoch+1}/{args.epochs}")
+                logger.info("Training Metrics:")
+                logger.info(f"  Action Loss: {train_action_loss:.4f}")
+                logger.info(f"  Price Loss: {train_price_loss:.4f}")
+                logger.info(f"  Accuracy: {train_acc:.2f}")
+                logger.info(f"  PnL: {train_pnl:.2%}")
+                logger.info(f"  Win Rate: {train_win_rate:.2%}")
+                logger.info(f"  Price MAE: {train_price_mae:.2f}")
+
+                logger.info("Validation Metrics:")
+                logger.info(f"  Action Loss: {val_action_loss:.4f}")
+                logger.info(f"  Price Loss: {val_price_loss:.4f}")
+                logger.info(f"  Accuracy: {val_acc:.2f}")
+                logger.info(f"  PnL: {val_pnl:.2%}")
+                logger.info(f"  Win Rate: {val_win_rate:.2%}")
+                logger.info(f"  Price MAE: {val_price_mae:.2f}")

                 # Log action distribution
                 logger.info("Action Distribution:")
                 for i, action in enumerate(['SELL', 'HOLD', 'BUY']):
-                    logger.info(f"{action}: Train={train_actions[i]}, Val={val_actions[i]}")
+                    logger.info(f"  {action}: Train={train_actions[i]}, Val={val_actions[i]}")

                 # Log trade statistics
-                if train_trades:
-                    logger.info(f"Training trades: {len(train_trades)}")
-                    logger.info(f"Validation trades: {len(val_trades)}")
+                logger.info("Trade Statistics:")
+                logger.info(f"  Training trades: {len(train_trades)}")
+                logger.info(f"  Validation trades: {len(val_trades)}")

-                # Retrospective fine-tuning
-                if epoch > 0 and val_pnl > 0:  # Only fine-tune if we're making profit
-                    logger.info("Performing retrospective fine-tuning...")
-
-                    # Get predictions for next few candles
+                # Log next candle predictions
+                if epoch % 10 == 0:  # Every 10 epochs
+                    logger.info("\nNext Candle Predictions:")
                     next_candles = model.predict_next_candles(X_val[-1:], n_candles=3)
-
-                    # Log predictions for each timeframe
-                    for tf, preds in next_candles.items():
-                        logger.info(f"Next 3 candles for {tf}:")
-                        for i, pred in enumerate(preds):
-                            action = ['SELL', 'HOLD', 'BUY'][np.argmax(pred)]
-                            confidence = np.max(pred)
-                            logger.info(f"Candle {i+1}: {action} (confidence: {confidence:.2f})")
-
-                    # Fine-tune on recent successful trades
-                    successful_trades = [t for t in train_trades if t['pnl'] > 0]
-                    if successful_trades:
-                        logger.info(f"Fine-tuning on {len(successful_trades)} successful trades")
-                        # TODO: Implement fine-tuning logic here
+                    for tf in args.timeframes:
+                        if tf in next_candles:
+                            logger.info(f"\n{tf} timeframe predictions:")
+                            for i, pred in enumerate(next_candles[tf]):
+                                action = ['SELL', 'HOLD', 'BUY'][np.argmax(pred)]
+                                confidence = np.max(pred)
+                                logger.info(f"  Candle {i+1}: {action} (confidence: {confidence:.2f})")

             except Exception as e:
                 logger.error(f"Error during epoch {epoch+1}: {str(e)}")
@@ -257,10 +287,11 @@ def train(data_interface, model, args):
         # Save final model
         model.save(f"models/{args.model_type}_final_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pt")

-        logger.info(f"Training complete. Best validation metrics:")
+        logger.info(f"\nTraining complete. Best validation metrics:")
Best validation metrics:") logger.info(f"Accuracy: {best_val_acc:.2f}") logger.info(f"PnL: {best_val_pnl:.2%}") logger.info(f"Win Rate: {best_win_rate:.2%}") + logger.info(f"Price MAE: {best_price_mae:.2f}") except Exception as e: logger.error(f"Error in training: {str(e)}") diff --git a/NN/utils/data_interface.py b/NN/utils/data_interface.py index d948666..f9f2c5b 100644 --- a/NN/utils/data_interface.py +++ b/NN/utils/data_interface.py @@ -61,6 +61,10 @@ class DataInterface: """ cache_file = os.path.join(self.data_dir, f"{self.symbol.replace('/', '_')}_{timeframe}.csv") + # For 1s timeframe, always fetch fresh data + if timeframe == '1s': + use_cache = False + # Check if cached data exists and is recent if use_cache and os.path.exists(cache_file): try: @@ -74,18 +78,18 @@ class DataInterface: logger.error(f"Error reading cached data: {str(e)}") # If we get here, we need to fetch data - # For now, we'll use a placeholder for fetching data from an exchange try: - # In a real implementation, we would fetch data from an exchange or API here - # For this example, we'll create dummy data if we can't load from cache logger.info(f"Fetching historical data for {self.symbol} {timeframe}") + # For 1s timeframe, we need more data points + if timeframe == '1s': + n_candles = min(n_candles * 60, 10000) # Up to 10k ticks + # Placeholder for real data fetching - # In a real implementation, this would be replaced with API calls self._fetch_data_from_exchange(timeframe, n_candles) - # Save to cache - if self.dataframes[timeframe] is not None: + # Save to cache (except for 1s timeframe) + if self.dataframes[timeframe] is not None and timeframe != '1s': self.dataframes[timeframe].to_csv(cache_file, index=False) return self.dataframes[timeframe] else: @@ -122,6 +126,7 @@ class DataInterface: """ # Map timeframe to seconds tf_seconds = { + '1s': 1, # Added 1s timeframe '1m': 60, '5m': 300, '15m': 900, @@ -207,59 +212,62 @@ class DataInterface: # Get data for all requested timeframes dfs = {} + min_length = float('inf') for tf in timeframes: - df = self.get_historical_data(timeframe=tf, n_candles=n_candles) + # For 1s timeframe, we need more data points + tf_candles = n_candles * 60 if tf == '1s' else n_candles + df = self.get_historical_data(timeframe=tf, n_candles=tf_candles) if df is not None and not df.empty: dfs[tf] = df + # Keep track of minimum length across all timeframes + min_length = min(min_length, len(df)) if not dfs: logger.error("No data available for feature creation") return None, None, None + # Align all dataframes to the same length + for tf in dfs: + dfs[tf] = dfs[tf].tail(min_length) + # Create features for each timeframe features = [] - targets = [] - timestamps = [] + targets = None + timestamps = None for tf in timeframes: if tf in dfs: X, y, ts = self._create_features(dfs[tf], window_size) if X is not None and y is not None: features.append(X) - if len(targets) == 0: # Only need targets from one timeframe + if targets is None: # Only need targets from one timeframe targets = y timestamps = ts - if not features: + if not features or targets is None: logger.error("Failed to create features for any timeframe") return None, None, None - # Stack features from all timeframes along the time dimension - # Reshape each timeframe's features to [samples, window, 1, features] - reshaped_features = [f.reshape(f.shape[0], f.shape[1], 1, f.shape[2]) - for f in features] - # Concatenate along the channel dimension - X = np.concatenate(reshaped_features, axis=2) - # Reshape to [samples, window, 
features*timeframes] - X = X.reshape(X.shape[0], X.shape[1], -1) + # Ensure all feature arrays have the same length + min_samples = min(f.shape[0] for f in features) + features = [f[-min_samples:] for f in features] + targets = targets[-min_samples:] + timestamps = timestamps[-min_samples:] + + # Stack features from all timeframes + X = np.concatenate([f.reshape(min_samples, window_size, -1) for f in features], axis=2) # Validate data if np.any(np.isnan(X)) or np.any(np.isinf(X)): logger.error("Generated features contain NaN or infinite values") return None, None, None - # Ensure all values are finite and normalized - X = np.nan_to_num(X, nan=0.0, posinf=1.0, neginf=-1.0) - X = np.clip(X, -1e6, 1e6) # Clip extreme values - - # Log data shapes for debugging - logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {np.array(targets).shape}") - + logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {targets.shape}") return X, targets, timestamps def _create_features(self, df, window_size): """ - Create features from OHLCV data using a sliding window approach. + Create features from OHLCV data using a sliding window. Args: df (pd.DataFrame): DataFrame with OHLCV data @@ -267,76 +275,34 @@ class DataInterface: Returns: tuple: (X, y, timestamps) where: - X is the input features array + X is the feature array y is the target array - timestamps is an array of timestamps for each sample + timestamps is the array of timestamps """ - # Extract OHLCV columns - ohlcv = df[['open', 'high', 'low', 'close', 'volume']].values - - # Validate data before scaling - if np.any(np.isnan(ohlcv)) or np.any(np.isinf(ohlcv)): - logger.error("Input data contains NaN or infinite values") - return None, None, None - - # Handle potential constant columns (avoid division by zero in scaler) - ohlcv = np.nan_to_num(ohlcv, nan=0.0) - ranges = np.ptp(ohlcv, axis=0) - for i in range(len(ranges)): - if ranges[i] == 0: # Constant column - ohlcv[:, i] = 1 if i == 3 else 0 # Set close to 1, others to 0 - - # Scale the data with safety checks - try: - scaler = MinMaxScaler() - ohlcv_scaled = scaler.fit_transform(ohlcv) - if np.any(np.isnan(ohlcv_scaled)) or np.any(np.isinf(ohlcv_scaled)): - logger.error("Scaling produced invalid values") - return None, None, None - except Exception as e: - logger.error(f"Scaling failed: {str(e)}") + if len(df) < window_size + 1: + logger.error(f"Not enough data for feature creation (need {window_size + 1}, got {len(df)})") return None, None, None - # Store the scaler for later use - timeframe = next((tf for tf in self.timeframes if self.dataframes.get(tf) is not None and - self.dataframes[tf].equals(df)), 'unknown') - self.scalers[timeframe] = scaler + # Extract OHLCV data + data = df[['open', 'high', 'low', 'close', 'volume']].values + timestamps = df['timestamp'].values # Create sliding windows - X = [] - y = [] - timestamps = [] + X = np.array([data[i:i+window_size] for i in range(len(data)-window_size)]) - for i in range(len(ohlcv_scaled) - window_size): - # Input: window_size candles of OHLCV data - window = ohlcv_scaled[i:i+window_size] - - # Validate window data - if np.any(np.isnan(window)) or np.any(np.isinf(window)): - continue - - X.append(window) - - # Target: binary classification - price goes up (1) or down (0) - # 1 if close price increases in the next candle, 0 otherwise - price_change = ohlcv[i+window_size, 3] - ohlcv[i+window_size-1, 3] - y.append(1 if price_change > 0 else 0) - - # Store timestamp for reference - 
timestamps.append(df['timestamp'].iloc[i+window_size]) + # Create targets (next candle's movement: 0=down, 1=neutral, 2=up) + next_close = data[window_size:, 3] # Close prices + curr_close = data[window_size-1:-1, 3] + price_changes = (next_close - curr_close) / curr_close - if not X: - logger.error("No valid windows created") - return None, None, None - - X = np.array(X) - y = np.array(y) - timestamps = np.array(timestamps) + # Define thresholds for price movement classification + threshold = 0.001 # 0.1% threshold + y = np.zeros(len(price_changes), dtype=int) + y[price_changes > threshold] = 2 # Up + y[(price_changes >= -threshold) & (price_changes <= threshold)] = 1 # Neutral - # Log shapes for debugging logger.info(f"Created features - X shape: {X.shape}, y shape: {y.shape}") - - return X, y, timestamps + return X, y, timestamps[window_size:] def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20): """ @@ -406,21 +372,28 @@ class DataInterface: return dataset_info def get_feature_count(self): - """Get the number of features per input sample""" - # OHLCV (5 features) per timeframe - return 5 * len(self.timeframes) + """ + Calculate total number of features across all timeframes. + + Returns: + int: Total number of features (5 features per timeframe) + """ + return len(self.timeframes) * 5 # OHLCV features for each timeframe def calculate_pnl(self, predictions, actual_prices, position_size=1.0): """ - Calculate PnL based on predictions and actual price movements. + Calculate PnL and win rates based on predictions and actual price movements. Args: - predictions (np.array): Model predictions (0: sell, 1: hold, 2: buy) - actual_prices (np.array): Actual price data - position_size (float): Size of the position to trade + predictions: Array of predicted actions (0=SELL, 1=HOLD, 2=BUY) or probabilities + actual_prices: Array of actual close prices + position_size: Position size for each trade Returns: - tuple: (total_pnl, win_rate, trade_history) + tuple: (pnl, win_rate, trades) where: + pnl is the total profit and loss + win_rate is the ratio of winning trades + trades is a list of trade dictionaries """ if len(predictions) != len(actual_prices) - 1: logger.error("Predictions and prices length mismatch") @@ -468,36 +441,85 @@ class DataInterface: win_rate = wins / trades if trades > 0 else 0.0 return pnl, win_rate, trade_history - def prepare_training_data(self, refresh=False, refresh_interval=300): + def get_future_prices(self, prices, n_candles=3): """ - Prepare training and validation data with optional refresh. + Extract future prices for use in retrospective training. 
Args: - refresh (bool): Whether to force refresh data - refresh_interval (int): Minimum seconds between refreshes + prices: Array of close prices + n_candles: Number of future candles to predict Returns: - tuple: (X_train, y_train, X_val, y_val, prices) numpy arrays + numpy.ndarray: Array of future prices for each sample + """ + if prices is None or len(prices) <= n_candles: + logger.warning(f"Not enough price data for future prediction: {len(prices) if prices is not None else 0} prices") + # Return zeros if not enough data + return np.zeros((len(prices) if prices is not None else 0, 1)) + + # For each price point i, get the price at i+n_candles + future_prices = np.zeros((len(prices), 1)) + for i in range(len(prices) - n_candles): + future_prices[i, 0] = prices[i + n_candles] + + # For the last n_candles positions, we don't have future data + # We'll use the last known price as a placeholder + for i in range(len(prices) - n_candles, len(prices)): + future_prices[i, 0] = prices[-1] + + return future_prices + + def prepare_training_data(self, refresh=False, refresh_interval=300): + """ + Prepare data for training, including splitting into train/validation sets. + + Args: + refresh (bool): Whether to refresh the data cache + refresh_interval (int): Interval in seconds to refresh data + + Returns: + tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices) """ current_time = datetime.now() - if refresh or (current_time - getattr(self, 'last_refresh', datetime.min)).total_seconds() > refresh_interval: + + # Check if we should refresh the data + if refresh or not hasattr(self, 'last_refresh_time') or \ + (current_time - self.last_refresh_time).total_seconds() > refresh_interval: logger.info("Refreshing training data...") - for tf in self.timeframes: - self.get_historical_data(timeframe=tf, n_candles=1000, use_cache=False) - self.last_refresh = current_time - - # Get all data + self.last_refresh_time = current_time + else: + # Use cached data + if hasattr(self, 'cached_train_data'): + return self.cached_train_data + + # Prepare input data X, y, _ = self.prepare_nn_input() if X is None: - return None, None, None, None, None + return None, None, None, None, None, None # Get price data for PnL calculation - prices = self.dataframes[self.timeframes[0]]['close'].values - - # Split into train/validation (80/20) + raw_prices = [] + for tf in self.timeframes: + if tf in self.dataframes and self.dataframes[tf] is not None: + # Get the close prices for the same period as X + prices = self.dataframes[tf]['close'].values[-len(X):] + if len(prices) == len(X): + raw_prices = prices + break + + if len(raw_prices) != len(X): + raw_prices = np.zeros(len(X)) # Fallback if no prices available + + # Split data into training and validation sets (80/20) split_idx = int(len(X) * 0.8) - return (X[:split_idx], y[:split_idx], X[split_idx:], y[split_idx:], - prices[:split_idx], prices[split_idx:]) + X_train, X_val = X[:split_idx], X[split_idx:] + y_train, y_val = y[:split_idx], y[split_idx:] + train_prices, val_prices = raw_prices[:split_idx], raw_prices[split_idx:] + + # Cache the data + self.cached_train_data = (X_train, y_train, X_val, y_val, train_prices, val_prices) + + return X_train, y_train, X_val, y_val, train_prices, val_prices def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20): """