#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation

This module implements a CNN model using PyTorch for time series analysis.
The model consists of multiple parallel convolutional pathways whose outputs
are concatenated and fed through a bidirectional LSTM and a dense head.
"""

import os
import logging
import numpy as np
from datetime import datetime

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# matplotlib is only used by plot_training_history() and scikit-learn only by
# evaluate(); keep them optional so the model can be built, trained and used
# for inference in environments where they are not installed.
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None

try:
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
except ImportError:
    accuracy_score = precision_score = recall_score = f1_score = None

# Configure logging
logger = logging.getLogger(__name__)


class CNNPyTorch(nn.Module):
    """PyTorch CNN model for time series analysis.

    Three parallel Conv1d pathways (kernel sizes 3, 5 and 7) are concatenated
    along the channel axis, passed through a bidirectional LSTM, flattened and
    reduced by a dense layer before the final linear output layer.
    """

    def __init__(self, input_shape, output_size=3):
        """
        Initialize the CNN model.

        Args:
            input_shape (tuple): Shape of input data (window_size, features)
            output_size (int): Number of output units. 1 applies a sigmoid
                (binary classification), >1 applies a softmax over the
                classes, anything else leaves the output un-activated.
        """
        super().__init__()

        window_size, num_features = input_shape

        # Architecture parameters
        filters = [32, 64, 128]
        kernel_sizes = [3, 5, 7]
        lstm_units = 100
        dense_units = 64
        dropout_rate = 0.3
        pool_kernel, pool_stride, pool_padding = 2, 1, 1

        # Create parallel convolutional pathways
        self.conv_paths = nn.ModuleList()
        for f, k in zip(filters, kernel_sizes):
            path = nn.Sequential(
                nn.Conv1d(num_features, f, kernel_size=k, padding='same'),
                nn.ReLU(),
                nn.BatchNorm1d(f),
                nn.MaxPool1d(kernel_size=pool_kernel, stride=pool_stride,
                             padding=pool_padding),
                nn.Dropout(dropout_rate),
            )
            self.conv_paths.append(path)

        # The 'same' convolution keeps the window length, but the padded
        # pooling changes it: L_out = (L + 2*padding - kernel) // stride + 1,
        # i.e. window_size + 1 with the settings above. The dense layer must
        # be sized from this length, not from window_size.
        pooled_len = (window_size + 2 * pool_padding - pool_kernel) // pool_stride + 1

        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=sum(filters),
            hidden_size=lstm_units,
            batch_first=True,
            bidirectional=True,
        )

        # Dense layers (the factor 2 accounts for the two LSTM directions)
        self.flatten = nn.Flatten()
        self.dense1 = nn.Sequential(
            nn.Linear(lstm_units * 2 * pooled_len, dense_units),
            nn.ReLU(),
            nn.BatchNorm1d(dense_units),
            nn.Dropout(dropout_rate),
        )

        # Output layer
        self.output = nn.Linear(dense_units, output_size)

        # Activation based on output size
        if output_size == 1:
            self.activation = nn.Sigmoid()  # Binary classification
        elif output_size > 1:
            self.activation = nn.Softmax(dim=1)  # Multi-class classification
        else:
            self.activation = nn.Identity()  # No activation

    def hidden_features(self, x):
        """
        Compute the activations of the last hidden (dense) layer.

        Args:
            x: Input tensor of shape [batch_size, window_size, features]

        Returns:
            Tensor of shape [batch_size, dense_units]

        Raises:
            ValueError: If x is not a 3-dimensional tensor.
        """
        if x.dim() != 3:
            raise ValueError(
                f"Expected input of shape [batch, window, features], got {tuple(x.shape)}"
            )

        # Transpose for conv1d: [batch, features, window]
        x_t = x.transpose(1, 2)

        # Process through parallel conv paths and concatenate on channels
        conv_concat = torch.cat([path(x_t) for path in self.conv_paths], dim=1)

        # Transpose back for LSTM: [batch, window, channels]
        lstm_out, _ = self.lstm(conv_concat.transpose(1, 2))

        return self.dense1(self.flatten(lstm_out))

    def logits(self, x):
        """
        Compute the raw (pre-activation) outputs of the network.

        Args:
            x: Input tensor of shape [batch_size, window_size, features]

        Returns:
            Tensor of shape [batch_size, output_size]
        """
        return self.output(self.hidden_features(x))

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x: Input tensor of shape [batch_size, window_size, features]

        Returns:
            Output tensor of shape [batch_size, output_size] with the
            configured activation (sigmoid / softmax / identity) applied.
        """
        return self.activation(self.logits(x))


class CNNModelPyTorch:
    """
    CNN model wrapper class for time series analysis using PyTorch.

    This class provides methods for building, training, evaluating, and
    making predictions with the CNN model.
    """

    def __init__(self, window_size, num_features, output_size=3, timeframes=None):
        """
        Initialize the CNN model.

        Args:
            window_size (int): Size of the input window
            num_features (int): Number of features in the input data
            output_size (int): Size of the output (1 for binary
                classification, >1 for multi-class classification)
            timeframes (list): List of timeframes used (stored with the
                checkpoint; not used by the model itself)
        """
        self.window_size = window_size
        self.num_features = num_features
        self.output_size = output_size
        self.timeframes = timeframes or []

        # Determine device (GPU or CPU)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

        # Initialize model
        self.model = None
        self.build_model()

        # Initialize training history
        self.history = {
            'loss': [],
            'val_loss': [],
            'accuracy': [],
            'val_accuracy': [],
        }

    def build_model(self):
        """Build the CNN model architecture, its optimizer and its loss."""
        logger.info(f"Building PyTorch CNN model with window_size={self.window_size}, "
                    f"num_features={self.num_features}, output_size={self.output_size}")

        self.model = CNNPyTorch(
            input_shape=(self.window_size, self.num_features),
            output_size=self.output_size,
        ).to(self.device)

        # Initialize optimizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

        # Initialize loss function based on output size
        if self.output_size == 1:
            # Applied to the sigmoid output of the model
            self.criterion = nn.BCELoss()
        elif self.output_size > 1:
            # Applied to the raw logits (CrossEntropyLoss performs its own
            # log-softmax), never to the softmax output of forward().
            self.criterion = nn.CrossEntropyLoss()
        else:
            self.criterion = nn.MSELoss()

        logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")

    def _make_loader(self, X, y, batch_size, shuffle=False, drop_last=False):
        """Wrap arrays X / y in a DataLoader with tensors on self.device."""
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
        # BCELoss needs float targets; CrossEntropyLoss needs class indices
        y_dtype = torch.float32 if self.output_size == 1 else torch.long
        y_tensor = torch.tensor(y, dtype=y_dtype).to(self.device)
        return DataLoader(
            TensorDataset(X_tensor, y_tensor),
            batch_size=batch_size,
            shuffle=shuffle,
            drop_last=drop_last,
        )

    def _forward_loss(self, inputs, targets):
        """Run one batch and return (loss, n_correct, n_counted).

        n_counted is 0 when no accuracy is defined for the configured
        output size.
        """
        logits = self.model.logits(inputs)

        if self.output_size > 1:
            loss = self.criterion(logits, targets)
            # argmax of the logits equals argmax of the softmax probabilities
            predicted = logits.argmax(dim=1)
            return loss, (predicted == targets).sum().item(), targets.size(0)

        outputs = self.model.activation(logits)
        if self.output_size == 1:
            # Accept targets shaped (N,) or (N, 1)
            targets = targets.reshape(-1, 1)
            loss = self.criterion(outputs, targets)
            # Same 0.5 threshold as evaluate() / predict()
            n_correct = ((outputs > 0.5) == (targets > 0.5)).sum().item()
            return loss, n_correct, targets.size(0)

        return self.criterion(outputs, targets), 0, 0

    def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
        """
        Train the CNN model.

        Args:
            X_train: Training input data, shape (samples, window_size, features)
            y_train: Training target data (class indices for multi-class,
                0/1 values for output_size == 1)
            X_val: Validation input data
            y_val: Validation target data
            batch_size: Batch size for training
            epochs: Number of training epochs

        Returns:
            Training history (dict of per-epoch lists; the validation lists
            are only extended when validation data is supplied)
        """
        n_train = len(X_train)
        logger.info(f"Training PyTorch CNN model with {n_train} samples, "
                    f"batch_size={batch_size}, epochs={epochs}")

        # BatchNorm1d on the dense layer cannot compute batch statistics from
        # a single sample in training mode, so drop a trailing batch of size 1.
        drop_last = n_train > batch_size and n_train % batch_size == 1
        train_loader = self._make_loader(
            X_train, y_train, batch_size, shuffle=True, drop_last=drop_last
        )

        # Create DataLoader for validation data if provided
        if X_val is not None and y_val is not None:
            val_loader = self._make_loader(X_val, y_val, batch_size)
        else:
            val_loader = None

        # Training loop
        for epoch in range(epochs):
            self.model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for inputs, targets in train_loader:
                self.optimizer.zero_grad()

                loss, n_correct, n_counted = self._forward_loss(inputs, targets)

                # Backward pass and optimize
                loss.backward()
                self.optimizer.step()

                # Statistics
                running_loss += loss.item()
                correct += n_correct
                total += n_counted

            epoch_loss = running_loss / len(train_loader)
            epoch_acc = correct / total if total > 0 else 0

            self.history['loss'].append(epoch_loss)
            self.history['accuracy'].append(epoch_acc)

            if val_loader is not None:
                val_loss, val_acc = self._validate(val_loader)

                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
                            f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")

                self.history['val_loss'].append(val_loss)
                self.history['val_accuracy'].append(val_acc)
            else:
                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")

        logger.info("Training completed")
        return self.history

    def _validate(self, val_loader):
        """Validate the model using the validation set.

        Returns:
            tuple: (mean batch loss, accuracy)
        """
        self.model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, targets in val_loader:
                loss, n_correct, n_counted = self._forward_loss(inputs, targets)
                val_loss += loss.item()
                correct += n_correct
                total += n_counted

        return val_loss / len(val_loader), correct / total if total > 0 else 0

    def evaluate(self, X_test, y_test):
        """
        Evaluate the model on test data.

        Args:
            X_test: Test input data
            y_test: Test target data

        Returns:
            dict: Evaluation metrics ('accuracy', 'precision', 'recall',
            'f1_score'); weighted averages are used for multi-class output.

        Raises:
            ImportError: If scikit-learn is not installed.
        """
        if accuracy_score is None:
            raise ImportError("scikit-learn is required for CNNModelPyTorch.evaluate()")

        logger.info(f"Evaluating model on {len(X_test)} samples")

        # Convert to PyTorch tensors
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)

        # Get predictions
        self.model.eval()
        with torch.no_grad():
            y_pred = self.model(X_test_tensor)

        if self.output_size > 1:
            y_pred_class = y_pred.argmax(dim=1).cpu().numpy()
            average = 'weighted'
        else:
            y_pred_class = (y_pred.cpu().numpy() > 0.5).astype(int).flatten()
            average = 'binary'  # scikit-learn's default for these scorers

        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_class),
            'precision': precision_score(y_test, y_pred_class, average=average),
            'recall': recall_score(y_test, y_pred_class, average=average),
            'f1_score': f1_score(y_test, y_pred_class, average=average),
        }

        logger.info(f"Evaluation metrics: {metrics}")
        return metrics

    def predict(self, X):
        """
        Make predictions with the model.

        Args:
            X: Input data

        Returns:
            tuple: (class predictions, probabilities). For multi-class output
            the probabilities have shape (samples, output_size); for
            output_size == 1 both arrays are flattened and classes are
            thresholded at 0.5; otherwise (raw outputs, None) is returned.
        """
        # Convert to PyTorch tensor
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        # Get predictions
        self.model.eval()
        with torch.no_grad():
            predictions = self.model(X_tensor)

        if self.output_size > 1:
            # Multi-class classification
            probs = predictions.cpu().numpy()
            class_preds = predictions.argmax(dim=1).cpu().numpy()
            return class_preds, probs

        preds = predictions.cpu().numpy()
        if self.output_size == 1:
            # Binary classification
            class_preds = (preds > 0.5).astype(int)
            return class_preds.flatten(), preds.flatten()

        return preds.flatten(), None

    def save(self, filepath):
        """
        Save the model to a file.

        Args:
            filepath: Path to save the model, without extension; the
                checkpoint is written to "<filepath>.pt"
        """
        # Create directory if it doesn't exist. A bare filename has an empty
        # dirname, and os.makedirs('') would raise FileNotFoundError.
        directory = os.path.dirname(filepath)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # Save the model state
        model_state = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history,
            'window_size': self.window_size,
            'num_features': self.num_features,
            'output_size': self.output_size,
            'timeframes': self.timeframes,
        }

        torch.save(model_state, f"{filepath}.pt")
        logger.info(f"Model saved to {filepath}.pt")

    def load(self, filepath):
        """
        Load the model from a file.

        Args:
            filepath: Path to load the model from, without the ".pt" extension

        Returns:
            bool: True if the checkpoint was loaded, False if the file does
            not exist.
        """
        # Check if file exists
        if not os.path.exists(f"{filepath}.pt"):
            logger.error(f"Model file {filepath}.pt not found")
            return False

        # Load the model state
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from trusted sources.
        model_state = torch.load(f"{filepath}.pt", map_location=self.device)

        # Update model parameters
        self.window_size = model_state['window_size']
        self.num_features = model_state['num_features']
        self.output_size = model_state['output_size']
        self.timeframes = model_state['timeframes']

        # Rebuild the model so its layer shapes match the checkpoint
        self.build_model()

        # Load the model state
        self.model.load_state_dict(model_state['model_state_dict'])
        self.optimizer.load_state_dict(model_state['optimizer_state_dict'])
        self.history = model_state['history']

        logger.info(f"Model loaded from {filepath}.pt")
        return True

    def plot_training_history(self):
        """Plot the training history and save it as a PNG under ./plots.

        Raises:
            ImportError: If matplotlib is not installed.
        """
        if not self.history['loss']:
            logger.warning("No training history to plot")
            return

        if plt is None:
            raise ImportError("matplotlib is required for CNNModelPyTorch.plot_training_history()")

        plt.figure(figsize=(12, 4))

        # Plot loss
        plt.subplot(1, 2, 1)
        plt.plot(self.history['loss'], label='Training Loss')
        if self.history.get('val_loss'):
            plt.plot(self.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend()

        # Plot accuracy
        plt.subplot(1, 2, 2)
        plt.plot(self.history['accuracy'], label='Training Accuracy')
        if self.history.get('val_accuracy'):
            plt.plot(self.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.legend()

        # Save the plot
        os.makedirs('plots', exist_ok=True)
        plt.savefig(os.path.join('plots', f"cnn_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"))
        plt.close()

        logger.info("Training history plots saved to plots directory")

    def extract_hidden_features(self, X):
        """
        Extract hidden features from the model.

        Args:
            X: Input data

        Returns:
            numpy array with the activations of the last hidden (dense)
            layer, one row per input sample
        """
        # Convert to PyTorch tensor
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        # Forward pass through the model up to the last hidden layer
        self.model.eval()
        with torch.no_grad():
            hidden_features = self.model.hidden_features(X_tensor)

        return hidden_features.cpu().numpy()