gogo2/NN/models/cnn_model_pytorch.py
2025-03-29 02:18:25 +02:00

520 lines
18 KiB
Python

#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation
This module implements a CNN model using PyTorch for time series analysis.
The model consists of multiple convolutional pathways and LSTM layers.
"""
import os
import logging
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Configure logging
logger = logging.getLogger(__name__)
class CNNPyTorch(nn.Module):
"""PyTorch CNN model for time series analysis"""
def __init__(self, input_shape, output_size=5):
"""
Initialize the enhanced CNN model.
Args:
input_shape (tuple): Shape of input data (window_size, features)
output_size (int): Always 5 for our trading signals
"""
super(CNNPyTorch, self).__init__()
window_size, num_features = input_shape
kernel_size = 5
dropout_rate = 0.3
# Enhanced CNN Architecture
self.conv_layers = nn.Sequential(
# Block 1
nn.Conv1d(num_features, 64, kernel_size, padding='same'),
nn.BatchNorm1d(64),
nn.ReLU(),
# Block 2
nn.Conv1d(64, 128, kernel_size, padding='same'),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.MaxPool1d(2),
# Block 3
nn.Conv1d(128, 256, kernel_size, padding='same'),
nn.BatchNorm1d(256),
nn.ReLU(),
# Block 4
nn.Conv1d(256, 512, kernel_size, padding='same'),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.MaxPool1d(2)
)
# Calculate flattened size after conv and pooling
conv_output_size = 512 * (window_size // 4)
# Enhanced dense layers
self.dense_block = nn.Sequential(
nn.Flatten(),
nn.Linear(conv_output_size, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(256, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Linear(128, output_size)
)
# Activation based on output size
if output_size == 1:
self.activation = nn.Sigmoid() # Binary classification or regression
elif output_size > 1:
self.activation = nn.Softmax(dim=1) # Multi-class classification
else:
self.activation = nn.Identity() # No activation
def forward(self, x):
"""
Forward pass through enhanced network.
Args:
x: Input tensor of shape [batch_size, window_size, features]
Returns:
Output tensor of shape [batch_size, output_size]
"""
# Transpose for conv1d: [batch, features, window]
x_t = x.transpose(1, 2)
# Process through all CNN layers
conv_out = self.conv_layers(x_t)
# Process through dense layers
output = self.dense_block(conv_out)
return self.activation(output)
class CNNModelPyTorch:
"""
CNN model wrapper class for time series analysis using PyTorch.
This class provides methods for building, training, evaluating, and making
predictions with the CNN model.
"""
def __init__(self, window_size, num_features, output_size=5, timeframes=None):
"""
Initialize the CNN model.
Args:
window_size (int): Size of the input window
num_features (int): Number of features in the input data
output_size (int): Size of the output (1 for regression, 3 for classification)
timeframes (list): List of timeframes used (for logging)
"""
self.window_size = window_size
self.num_features = num_features
self.output_size = output_size
self.timeframes = timeframes or []
# Determine device (GPU or CPU)
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {self.device}")
# Initialize model
self.model = None
self.build_model()
# Initialize training history
self.history = {
'loss': [],
'val_loss': [],
'accuracy': [],
'val_accuracy': []
}
def build_model(self):
"""Build the CNN model architecture"""
logger.info(f"Building PyTorch CNN model with window_size={self.window_size}, "
f"num_features={self.num_features}, output_size={self.output_size}")
self.model = CNNPyTorch(
input_shape=(self.window_size, self.num_features),
output_size=self.output_size
).to(self.device)
# Initialize optimizer
self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
# Initialize loss function based on output size
if self.output_size == 1:
self.criterion = nn.BCELoss() # Binary classification
elif self.output_size > 1:
self.criterion = nn.CrossEntropyLoss() # Multi-class classification
else:
self.criterion = nn.MSELoss() # Regression
logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")
def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
"""
Train the CNN model.
Args:
X_train: Training input data
y_train: Training target data
X_val: Validation input data
y_val: Validation target data
batch_size: Batch size for training
epochs: Number of training epochs
Returns:
Training history
"""
logger.info(f"Training PyTorch CNN model with {len(X_train)} samples, "
f"batch_size={batch_size}, epochs={epochs}")
# Convert numpy arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
# Handle different output sizes for y_train
if self.output_size == 1:
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
else:
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)
# Create DataLoader for training data
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Create DataLoader for validation data if provided
if X_val is not None and y_val is not None:
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
if self.output_size == 1:
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(self.device)
else:
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
else:
val_loader = None
# Training loop
for epoch in range(epochs):
# Training phase
self.model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in train_loader:
# Zero the parameter gradients
self.optimizer.zero_grad()
# Forward pass
outputs = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(outputs, targets.unsqueeze(1))
else:
loss = self.criterion(outputs, targets)
# Backward pass and optimize
loss.backward()
self.optimizer.step()
# Statistics
running_loss += loss.item()
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
epoch_loss = running_loss / len(train_loader)
epoch_acc = correct / total if total > 0 else 0
# Validation phase
if val_loader is not None:
val_loss, val_acc = self._validate(val_loader)
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")
# Update history
self.history['loss'].append(epoch_loss)
self.history['accuracy'].append(epoch_acc)
self.history['val_loss'].append(val_loss)
self.history['val_accuracy'].append(val_acc)
else:
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")
# Update history without validation
self.history['loss'].append(epoch_loss)
self.history['accuracy'].append(epoch_acc)
logger.info("Training completed")
return self.history
def _validate(self, val_loader):
"""Validate the model using the validation set"""
self.model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in val_loader:
# Forward pass
outputs = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(outputs, targets.unsqueeze(1))
else:
loss = self.criterion(outputs, targets)
val_loss += loss.item()
# Calculate accuracy
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
return val_loss / len(val_loader), correct / total if total > 0 else 0
def evaluate(self, X_test, y_test):
"""
Evaluate the model on test data.
Args:
X_test: Test input data
y_test: Test target data
Returns:
dict: Evaluation metrics
"""
logger.info(f"Evaluating model on {len(X_test)} samples")
# Convert to PyTorch tensors
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)
# Get predictions
self.model.eval()
with torch.no_grad():
y_pred = self.model(X_test_tensor)
if self.output_size > 1:
_, y_pred_class = torch.max(y_pred, 1)
y_pred_class = y_pred_class.cpu().numpy()
else:
y_pred_class = (y_pred.cpu().numpy() > 0.5).astype(int).flatten()
# Calculate metrics
if self.output_size > 1:
accuracy = accuracy_score(y_test, y_pred_class)
precision = precision_score(y_test, y_pred_class, average='weighted')
recall = recall_score(y_test, y_pred_class, average='weighted')
f1 = f1_score(y_test, y_pred_class, average='weighted')
metrics = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1
}
else:
accuracy = accuracy_score(y_test, y_pred_class)
precision = precision_score(y_test, y_pred_class)
recall = recall_score(y_test, y_pred_class)
f1 = f1_score(y_test, y_pred_class)
metrics = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1
}
logger.info(f"Evaluation metrics: {metrics}")
return metrics
def predict(self, X):
"""
Make predictions with the model.
Args:
X: Input data
Returns:
Predictions
"""
# Convert to PyTorch tensor
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
# Get predictions
self.model.eval()
with torch.no_grad():
predictions = self.model(X_tensor)
if self.output_size > 1:
# Multi-class classification
probs = predictions.cpu().numpy()
_, class_preds = torch.max(predictions, 1)
class_preds = class_preds.cpu().numpy()
return class_preds, probs
else:
# Binary classification or regression
preds = predictions.cpu().numpy()
if self.output_size == 1:
# Binary classification
class_preds = (preds > 0.5).astype(int)
return class_preds.flatten(), preds.flatten()
else:
# Regression
return preds.flatten(), None
def save(self, filepath):
"""
Save the model to a file.
Args:
filepath: Path to save the model
"""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(filepath), exist_ok=True)
# Save the model state
model_state = {
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'history': self.history,
'window_size': self.window_size,
'num_features': self.num_features,
'output_size': self.output_size,
'timeframes': self.timeframes
}
torch.save(model_state, f"{filepath}.pt")
logger.info(f"Model saved to {filepath}.pt")
def load(self, filepath):
"""
Load the model from a file.
Args:
filepath: Path to load the model from
"""
# Check if file exists
if not os.path.exists(f"{filepath}.pt"):
logger.error(f"Model file {filepath}.pt not found")
return False
# Load the model state
model_state = torch.load(f"{filepath}.pt", map_location=self.device)
# Update model parameters
self.window_size = model_state['window_size']
self.num_features = model_state['num_features']
self.output_size = model_state['output_size']
self.timeframes = model_state['timeframes']
# Rebuild the model
self.build_model()
# Load the model state
self.model.load_state_dict(model_state['model_state_dict'])
self.optimizer.load_state_dict(model_state['optimizer_state_dict'])
self.history = model_state['history']
logger.info(f"Model loaded from {filepath}.pt")
return True
def plot_training_history(self):
"""Plot the training history"""
if not self.history['loss']:
logger.warning("No training history to plot")
return
plt.figure(figsize=(12, 4))
# Plot loss
plt.subplot(1, 2, 1)
plt.plot(self.history['loss'], label='Training Loss')
if 'val_loss' in self.history and self.history['val_loss']:
plt.plot(self.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
# Plot accuracy
plt.subplot(1, 2, 2)
plt.plot(self.history['accuracy'], label='Training Accuracy')
if 'val_accuracy' in self.history and self.history['val_accuracy']:
plt.plot(self.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
# Save the plot
os.makedirs('plots', exist_ok=True)
plt.savefig(os.path.join('plots', f"cnn_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"))
plt.close()
logger.info("Training history plots saved to plots directory")
def extract_hidden_features(self, X):
"""
Extract hidden features from the model - outputs from last dense layer before output.
Args:
X: Input data
Returns:
Hidden features (output from penultimate dense layer)
"""
# Convert to PyTorch tensor
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
# Forward pass through the model
self.model.eval()
with torch.no_grad():
# Get features through CNN layers
x_t = X_tensor.transpose(1, 2)
conv_out = self.model.conv_layers(x_t)
# Process through all dense layers except the output layer
features = conv_out
for layer in self.model.dense_block[:-2]: # Exclude last linear layer and dropout
features = layer(features)
return features.cpu().numpy()