#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation

This module implements a CNN model using PyTorch for time series analysis.
The model consists of stacked convolutional blocks followed by dense layers.
"""

import os
import logging
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Configure logging
logger = logging.getLogger(__name__)


class CNNPyTorch(nn.Module):
    """PyTorch CNN model for time series analysis."""

    def __init__(self, input_shape, output_size=3):
        """
        Initialize the CNN model.

        Args:
            input_shape (tuple): Shape of input data (window_size, features)
            output_size (int): Size of the output (3 for BUY/HOLD/SELL)
        """
        super(CNNPyTorch, self).__init__()

        window_size, num_features = input_shape
        kernel_size = min(5, window_size)  # Ensure kernel size doesn't exceed window size
        dropout_rate = 0.3

        # Calculate initial channel size based on number of features
        initial_channels = max(32, num_features * 2)  # Scale channels with features

        # CNN architecture. Note: the two MaxPool1d(2) stages divide the time
        # dimension by 4, so window_size is assumed to be at least 4.
        self.conv_layers = nn.Sequential(
            # Block 1
            nn.Conv1d(num_features, initial_channels, kernel_size, padding='same'),
            nn.BatchNorm1d(initial_channels),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            # Block 2
            nn.Conv1d(initial_channels, initial_channels * 2, kernel_size, padding='same'),
            nn.BatchNorm1d(initial_channels * 2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(dropout_rate),

            # Block 3
            nn.Conv1d(initial_channels * 2, initial_channels * 4, kernel_size, padding='same'),
            nn.BatchNorm1d(initial_channels * 4),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            # Block 4
            nn.Conv1d(initial_channels * 4, initial_channels * 8, kernel_size, padding='same'),
            nn.BatchNorm1d(initial_channels * 8),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(dropout_rate)
        )

        # Calculate flattened size after conv and pooling
        conv_output_size = (initial_channels * 8) * (window_size // 4)

        # Dense layers with scaled sizes
        dense_size = min(2048, conv_output_size)  # Cap dense layer size

        self.dense_block = nn.Sequential(
            nn.Flatten(),
            nn.Linear(conv_output_size, dense_size),
            nn.BatchNorm1d(dense_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(dense_size, dense_size // 2),
            nn.BatchNorm1d(dense_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(dense_size // 2, dense_size // 4),
            nn.BatchNorm1d(dense_size // 4),
            nn.ReLU(),
            nn.Dropout(dropout_rate),

            nn.Linear(dense_size // 4, output_size)
        )

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x: Input tensor of shape [batch_size, window_size, features]

        Returns:
            Logits tensor of shape [batch_size, output_size]. Softmax is NOT
            applied here: nn.CrossEntropyLoss expects raw logits, so the
            wrapper applies softmax only at inference time.
        """
        # Transpose for conv1d: [batch, features, window]
        x_t = x.transpose(1, 2)

        # Process through CNN layers
        conv_out = self.conv_layers(x_t)

        # Process through dense layers to produce class logits
        logits = self.dense_block(conv_out)

        return logits
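

# Illustrative shape check for CNNPyTorch (a hedged sketch: the batch, window,
# and feature sizes below are made-up example values, not taken from this module):
#
#   model = CNNPyTorch(input_shape=(64, 10), output_size=3)
#   x = torch.randn(8, 64, 10)   # [batch_size, window_size, num_features]
#   logits = model(x)            # -> shape [8, 3], raw logits (no softmax)
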
class CNNModelPyTorch:
    """
    CNN model wrapper class for time series analysis using PyTorch.

    This class provides methods for building, training, evaluating, and
    making predictions with the CNN model.
    """

    def __init__(self, window_size, num_features, output_size=3, timeframes=None):
        """
        Initialize the CNN model.

        Args:
            window_size (int): Size of the input window
            num_features (int): Number of features in the input data
            output_size (int): Size of the output (default: 3 for BUY/HOLD/SELL)
            timeframes (list): List of timeframes used (for logging)
        """
        # Action tracking
        self.action_counts = {
            'BUY': 0,
            'SELL': 0,
            'HOLD': 0
        }

        self.window_size = window_size
        self.num_features = num_features
        self.output_size = output_size
        self.timeframes = timeframes or []

        # Determine device (GPU or CPU)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

        # Initialize model
        self.model = None
        self.build_model()

        # Initialize training history
        self.history = {
            'loss': [],
            'val_loss': [],
            'accuracy': [],
            'val_accuracy': []
        }

    def build_model(self):
        """Build the CNN model architecture."""
        logger.info(f"Building PyTorch CNN model with window_size={self.window_size}, "
                    f"num_features={self.num_features}, output_size={self.output_size}")

        self.model = CNNPyTorch(
            input_shape=(self.window_size, self.num_features),
            output_size=self.output_size
        ).to(self.device)

        # Initialize optimizer with a plateau-based learning rate schedule
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='max', factor=0.5, patience=10
        )

        # Initialize loss function with class weights; the hard-coded weights
        # assume the default 3-class BUY/HOLD/SELL output
        class_weights = torch.tensor([1.0, 0.5, 1.0]).to(self.device)  # Lower weight for HOLD
        self.criterion = nn.CrossEntropyLoss(weight=class_weights)

        logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")

    def train_epoch(self, X_train, y_train, future_prices=None, batch_size=32):
        """Train for one epoch and return loss and accuracy."""
        # Convert to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
        y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)

        # Create DataLoader
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, targets in train_loader:
            # Zero gradients
            self.optimizer.zero_grad()

            # Forward pass
            outputs = self.model(inputs)

            # Calculate loss
            loss = self.criterion(outputs, targets)

            # Backward pass and optimize
            loss.backward()

            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            # Statistics
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total if total > 0 else 0

        # Update learning rate scheduler
        self.scheduler.step(epoch_acc)

        # To maintain compatibility with the updated training code, return 3 values;
        # the middle price_loss is zero since this model does not predict prices
        return epoch_loss, 0.0, epoch_acc
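
    # How the per-epoch API is meant to be consumed (a hedged sketch; the
    # variable names and loop are illustrative, not from this module):
    #
    #   for epoch in range(epochs):
    #       loss, _, acc = model.train_epoch(X_train, y_train, batch_size=32)
    #       val_loss, _, val_acc = model.evaluate(X_val, y_val)
    #
    # The middle value of each 3-tuple is a placeholder price loss kept for
    # interface compatibility and is always 0.0 here.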
    def evaluate(self, X_val, y_val, future_prices=None):
        """Evaluate on validation data and return loss and accuracy."""
        # Convert to PyTorch tensors
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
        y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)

        # Create DataLoader
        val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
        val_loader = DataLoader(val_dataset, batch_size=32)

        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in val_loader:
                # Forward pass
                outputs = self.model(inputs)

                # Calculate loss
                loss = self.criterion(outputs, targets)
                running_loss += loss.item()

                # Calculate accuracy
                _, predicted = torch.max(outputs, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

        val_loss = running_loss / len(val_loader)
        val_acc = correct / total if total > 0 else 0

        # To maintain compatibility with the updated training code, return 3 values;
        # the middle price_loss is zero since this model does not predict prices
        return val_loss, 0.0, val_acc

    def predict(self, X):
        """Make predictions on input data."""
        self.model.eval()
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        with torch.no_grad():
            # The network outputs raw logits; apply softmax here to get
            # action probabilities
            logits = self.model(X_tensor)
            action_probs = torch.softmax(logits, dim=1)

        # To maintain compatibility with the transformer model, return the action
        # probabilities and a dummy price prediction of zeros
        return action_probs.cpu().numpy(), np.zeros((len(X), 1))

    def predict_next_candles(self, X, n_candles=3):
        """
        Predict the next n candles.

        Args:
            X: Input data of shape [batch_size, window_size, features]
            n_candles: Number of future candles to predict

        Returns:
            Dictionary of predictions for each timeframe
        """
        self.model.eval()
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        with torch.no_grad():
            # Get action probabilities for the input window
            logits = self.model(X_tensor)
            action_probs = torch.softmax(logits, dim=1)

            # For compatibility, return a dictionary keyed by timeframe
            predictions = {}
            for tf in self.timeframes:
                # Simple prediction: just repeat the current prediction for the next n candles
                predictions[tf] = np.tile(action_probs.cpu().numpy(), (n_candles, 1))

        return predictions
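
    # Minimal prediction sketch (hedged; shapes are illustrative, and the class
    # index order BUY=0, HOLD=1, SELL=2 follows this module's docstrings):
    #
    #   probs, _ = model.predict(X)      # probs: [batch, 3] softmax probabilities
    #   actions = probs.argmax(axis=1)   # integer class per sample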
    def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
        """
        Train the CNN model.

        Args:
            X_train: Training input data
            y_train: Training target data
            X_val: Validation input data
            y_val: Validation target data
            batch_size: Batch size for training
            epochs: Number of training epochs

        Returns:
            Training history
        """
        logger.info(f"Training PyTorch CNN model with {len(X_train)} samples, "
                    f"batch_size={batch_size}, epochs={epochs}")

        # Convert numpy arrays to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)

        # Handle different output sizes for y_train. Note: the single-output
        # branch assumes a suitable binary/regression criterion has been
        # configured; the default criterion targets the 3-class case.
        if self.output_size == 1:
            y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
        else:
            y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)

        # Create DataLoader for training data
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Create DataLoader for validation data if provided
        if X_val is not None and y_val is not None:
            X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
            if self.output_size == 1:
                y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(self.device)
            else:
                y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
            val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
            val_loader = DataLoader(val_dataset, batch_size=batch_size)
        else:
            val_loader = None

        # Training loop
        for epoch in range(epochs):
            # Training phase
            self.model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for inputs, targets in train_loader:
                # Zero the parameter gradients
                self.optimizer.zero_grad()

                # Forward pass
                outputs = self.model(inputs)

                # Calculate loss
                if self.output_size == 1:
                    loss = self.criterion(outputs, targets.unsqueeze(1))
                else:
                    loss = self.criterion(outputs, targets)

                # Backward pass and optimize
                loss.backward()
                self.optimizer.step()

                # Statistics
                running_loss += loss.item()
                if self.output_size > 1:
                    _, predicted = torch.max(outputs, 1)
                    total += targets.size(0)
                    correct += (predicted == targets).sum().item()

            epoch_loss = running_loss / len(train_loader)
            epoch_acc = correct / total if total > 0 else 0

            # Validation phase (evaluate returns loss, placeholder price loss, accuracy)
            if val_loader is not None:
                val_loss, _, val_acc = self.evaluate(X_val, y_val)

                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
                            f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")

                # Update history
                self.history['loss'].append(epoch_loss)
                self.history['accuracy'].append(epoch_acc)
                self.history['val_loss'].append(val_loss)
                self.history['val_accuracy'].append(val_acc)
            else:
                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")

                # Update history without validation
                self.history['loss'].append(epoch_loss)
                self.history['accuracy'].append(epoch_acc)

        logger.info("Training completed")
        return self.history

    def evaluate_metrics(self, X_test, y_test):
        """Calculate and return comprehensive evaluation metrics as a dict."""
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)

        self.model.eval()
        with torch.no_grad():
            y_pred = self.model(X_test_tensor)
            if self.output_size > 1:
                _, y_pred_class = torch.max(y_pred, 1)
                y_pred_class = y_pred_class.cpu().numpy()
            else:
                y_pred_class = (y_pred.cpu().numpy() > 0.5).astype(int).flatten()

        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_class),
            'precision': precision_score(y_test, y_pred_class, average='weighted', zero_division=0),
            'recall': recall_score(y_test, y_pred_class, average='weighted', zero_division=0),
            'f1_score': f1_score(y_test, y_pred_class, average='weighted', zero_division=0)
        }

        return metrics

    def save(self, filepath):
        """
        Save the model to a file.

        Args:
            filepath: Path to save the model
        """
        # Create directory if it doesn't exist (filepath may be a bare filename)
        dirname = os.path.dirname(filepath)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

        # Save the model state
        model_state = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history,
            'window_size': self.window_size,
            'num_features': self.num_features,
            'output_size': self.output_size,
            'timeframes': self.timeframes
        }

        torch.save(model_state, f"{filepath}.pt")
        logger.info(f"Model saved to {filepath}.pt")
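
    # Save/load round-trip sketch (hedged; the checkpoint path and constructor
    # arguments are illustrative):
    #
    #   model.save("checkpoints/cnn_model")       # writes checkpoints/cnn_model.pt
    #   restored = CNNModelPyTorch(window_size=64, num_features=10)
    #   restored.load("checkpoints/cnn_model")    # rebuilds from saved hyperparameters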
    def load(self, filepath):
        """
        Load the model from a file.

        Args:
            filepath: Path to load the model from
        """
        # Check if file exists
        if not os.path.exists(f"{filepath}.pt"):
            logger.error(f"Model file {filepath}.pt not found")
            return False

        # Load the model state
        model_state = torch.load(f"{filepath}.pt", map_location=self.device)

        # Update model parameters
        self.window_size = model_state['window_size']
        self.num_features = model_state['num_features']
        self.output_size = model_state['output_size']
        self.timeframes = model_state['timeframes']

        # Rebuild the model with the saved hyperparameters
        self.build_model()

        # Load the saved weights, optimizer state, and history
        self.model.load_state_dict(model_state['model_state_dict'])
        self.optimizer.load_state_dict(model_state['optimizer_state_dict'])
        self.history = model_state['history']

        logger.info(f"Model loaded from {filepath}.pt")
        return True

    def plot_training_history(self):
        """Plot the training history."""
        if not self.history['loss']:
            logger.warning("No training history to plot")
            return

        plt.figure(figsize=(12, 4))

        # Plot loss
        plt.subplot(1, 2, 1)
        plt.plot(self.history['loss'], label='Training Loss')
        if 'val_loss' in self.history and self.history['val_loss']:
            plt.plot(self.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend()

        # Plot accuracy
        plt.subplot(1, 2, 2)
        plt.plot(self.history['accuracy'], label='Training Accuracy')
        if 'val_accuracy' in self.history and self.history['val_accuracy']:
            plt.plot(self.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend()

        # Save the plot
        os.makedirs('plots', exist_ok=True)
        plt.savefig(os.path.join('plots', f"cnn_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"))
        plt.close()

        logger.info("Training history plots saved to plots directory")

    def extract_hidden_features(self, X):
        """
        Extract hidden features from the model - outputs from the last dense
        layer before the output layer.

        Args:
            X: Input data

        Returns:
            Hidden features (output from the penultimate dense layer)
        """
        # Convert to PyTorch tensor
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        # Forward pass through the model
        self.model.eval()
        with torch.no_grad():
            # Get features through CNN layers
            x_t = X_tensor.transpose(1, 2)
            conv_out = self.model.conv_layers(x_t)

            # Process through all dense layers except the final dropout and output layer
            features = conv_out
            for layer in self.model.dense_block[:-2]:
                features = layer(features)

        return features.cpu().numpy()
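

if __name__ == "__main__":
    # Minimal smoke test with random data (a hedged sketch: the shapes, sample
    # counts, and epoch count below are illustrative values, not tuned settings).
    logging.basicConfig(level=logging.INFO)

    window_size, num_features = 32, 8
    model = CNNModelPyTorch(window_size=window_size, num_features=num_features,
                            timeframes=["1h"])

    X = np.random.randn(128, window_size, num_features).astype(np.float32)
    y = np.random.randint(0, 3, size=128)  # BUY/HOLD/SELL class indices

    model.train(X[:96], y[:96], X_val=X[96:], y_val=y[96:],
                batch_size=16, epochs=2)
    print(model.evaluate_metrics(X[96:], y[96:]))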