#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation
This module implements a CNN model using PyTorch for time series analysis.
The model consists of stacked 1D convolutional blocks followed by fully
connected layers.
"""
import os
import logging
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Configure logging
logger = logging.getLogger(__name__)
class CNNPyTorch(nn.Module):
"""PyTorch CNN model for time series analysis"""
def __init__(self, input_shape, output_size=5):
"""
Initialize the enhanced CNN model.
Args:
input_shape (tuple): Shape of input data (window_size, features)
            output_size (int): Number of output classes (5 for the trading
                signals used here; 1 selects a sigmoid binary head)
"""
super(CNNPyTorch, self).__init__()
window_size, num_features = input_shape
kernel_size = 5
dropout_rate = 0.3
# Enhanced CNN Architecture
self.conv_layers = nn.Sequential(
# Block 1
nn.Conv1d(num_features, 64, kernel_size, padding='same'),
nn.BatchNorm1d(64),
nn.ReLU(),
# Block 2
nn.Conv1d(64, 128, kernel_size, padding='same'),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.MaxPool1d(2),
# Block 3
nn.Conv1d(128, 256, kernel_size, padding='same'),
nn.BatchNorm1d(256),
nn.ReLU(),
# Block 4
nn.Conv1d(256, 512, kernel_size, padding='same'),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.MaxPool1d(2)
)
        # Flattened size after the conv stack: 'same' padding preserves the
        # temporal length, and the two MaxPool1d(2) stages halve it twice
        conv_output_size = 512 * (window_size // 4)
# Enhanced dense layers
self.dense_block = nn.Sequential(
nn.Flatten(),
nn.Linear(conv_output_size, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(256, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Linear(128, output_size)
)
        # Output activation. nn.CrossEntropyLoss (used by the wrapper for
        # multi-class outputs) expects raw logits, so no softmax is applied
        # here; argmax-based predictions are unaffected, and probabilities
        # can be recovered with torch.softmax when needed.
        if output_size == 1:
            self.activation = nn.Sigmoid()  # Binary output, paired with BCELoss
        else:
            self.activation = nn.Identity()  # Multi-class: raw logits
def forward(self, x):
"""
Forward pass through enhanced network.
Args:
x: Input tensor of shape [batch_size, window_size, features]
Returns:
Output tensor of shape [batch_size, output_size]
"""
# Transpose for conv1d: [batch, features, window]
x_t = x.transpose(1, 2)
# Process through all CNN layers
conv_out = self.conv_layers(x_t)
# Process through dense layers
output = self.dense_block(conv_out)
return self.activation(output)
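

# Shape sketch for CNNPyTorch (illustrative numbers, assuming window_size=20
# and num_features=5): input [B, 20, 5] is transposed to [B, 5, 20] for Conv1d;
# the 'same'-padded convolutions keep the length at 20 while widening to 512
# channels, and the two MaxPool1d(2) stages give 20 -> 10 -> 5, so Flatten
# feeds 512 * 5 = 2560 features to the dense head.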
class CNNModelPyTorch:
"""
CNN model wrapper class for time series analysis using PyTorch.
This class provides methods for building, training, evaluating, and making
predictions with the CNN model.
"""
def __init__(self, window_size, num_features, output_size=5, timeframes=None):
"""
Initialize the CNN model.
Args:
window_size (int): Size of the input window
num_features (int): Number of features in the input data
            output_size (int): Size of the output (1 for a sigmoid binary
                head, >1 for multi-class classification; defaults to 5)
timeframes (list): List of timeframes used (for logging)
"""
self.window_size = window_size
self.num_features = num_features
self.output_size = output_size
self.timeframes = timeframes or []
# Determine device (GPU or CPU)
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {self.device}")
# Initialize model
self.model = None
self.build_model()
# Initialize training history
self.history = {
'loss': [],
'val_loss': [],
'accuracy': [],
'val_accuracy': []
}
def build_model(self):
"""Build the CNN model architecture"""
logger.info(f"Building PyTorch CNN model with window_size={self.window_size}, "
f"num_features={self.num_features}, output_size={self.output_size}")
self.model = CNNPyTorch(
input_shape=(self.window_size, self.num_features),
output_size=self.output_size
).to(self.device)
# Initialize optimizer
self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        # Loss function paired with the model's output head: BCELoss matches
        # the sigmoid binary head, CrossEntropyLoss consumes the raw logits
        if self.output_size == 1:
            self.criterion = nn.BCELoss()  # Binary classification
        else:
            self.criterion = nn.CrossEntropyLoss()  # Multi-class classification
logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")
def train_epoch(self, X_train, y_train, batch_size=32):
"""Train for one epoch and return loss and accuracy"""
# Convert to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
if self.output_size == 1:
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
else:
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)
# Create DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
self.model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in train_loader:
# Zero gradients
self.optimizer.zero_grad()
# Forward pass
outputs = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(outputs, targets.unsqueeze(1))
else:
loss = self.criterion(outputs, targets)
# Backward pass and optimize
loss.backward()
self.optimizer.step()
            # Statistics (accuracy is tracked for the multi-class head only)
running_loss += loss.item()
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
epoch_loss = running_loss / len(train_loader)
epoch_acc = correct / total if total > 0 else 0
return epoch_loss, epoch_acc
def evaluate(self, X_val, y_val):
"""Evaluate on validation data and return loss and accuracy"""
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
if self.output_size == 1:
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(self.device)
else:
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=32)
self.model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in val_loader:
# Forward pass
outputs = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(outputs, targets.unsqueeze(1))
else:
loss = self.criterion(outputs, targets)
val_loss += loss.item()
# Calculate accuracy
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
return val_loss / len(val_loader), correct / total if total > 0 else 0
def predict(self, X):
"""Make predictions on input data"""
self.model.eval()
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
with torch.no_grad():
outputs = self.model(X_tensor)
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
return predicted.cpu().numpy()
else:
return outputs.cpu().numpy()
def predict_next_candles(self, X, n_candles=3):
"""
Predict the next n candles for each timeframe.
Args:
X: Input data of shape [batch_size, window_size, features]
n_candles: Number of future candles to predict
Returns:
Dictionary of predictions for each timeframe
"""
self.model.eval()
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
with torch.no_grad():
# Get the last window of data
last_window = X_tensor[-1:] # [1, window_size, features]
# Initialize predictions
predictions = {}
            # For each timeframe, predict the next n candles. This assumes the
            # feature axis holds 5 columns (e.g. OHLCV) per timeframe and that
            # output_size == 5, so predictions can be appended as synthetic
            # candles. Note: the first Conv1d expects num_features input
            # channels, so the per-timeframe slice below is only
            # shape-consistent when a single timeframe is used
            # (num_features == 5).
            for i, tf in enumerate(self.timeframes):
                # Extract the 5 feature columns for this timeframe
                tf_features = last_window[:, :, i*5:(i+1)*5]  # [1, window_size, 5]
# Predict next n candles
tf_predictions = []
current_window = tf_features
for _ in range(n_candles):
# Get prediction for next candle
output = self.model(current_window)
tf_predictions.append(output.cpu().numpy())
                    # Slide the window forward: drop the oldest step and append
                    # the prediction as a synthetic candle (requires output_size == 5)
                    current_window = torch.cat([
                        current_window[:, 1:, :],
                        output.unsqueeze(1)  # [1, 1, output_size]
                    ], dim=1)
predictions[tf] = np.concatenate(tf_predictions, axis=0)
return predictions
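
    # Hypothetical invocation of the autoregressive helper above (single
    # timeframe, so num_features == 5 and output_size == 5, as noted in the
    # comments inside the method):
    #     preds = wrapper.predict_next_candles(X, n_candles=3)
    #     preds["1m"].shape  # -> (3, 5): one row of 5 values per future candle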
def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
"""
Train the CNN model.
Args:
X_train: Training input data
y_train: Training target data
X_val: Validation input data
y_val: Validation target data
batch_size: Batch size for training
epochs: Number of training epochs
Returns:
Training history
"""
logger.info(f"Training PyTorch CNN model with {len(X_train)} samples, "
f"batch_size={batch_size}, epochs={epochs}")
# Convert numpy arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
# Handle different output sizes for y_train
if self.output_size == 1:
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
else:
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)
# Create DataLoader for training data
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        # Validation reuses self.evaluate(), which builds its own tensors and
        # loader, so no separate validation DataLoader is created here
        has_validation = X_val is not None and y_val is not None
# Training loop
for epoch in range(epochs):
# Training phase
self.model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in train_loader:
# Zero the parameter gradients
self.optimizer.zero_grad()
# Forward pass
outputs = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(outputs, targets.unsqueeze(1))
else:
loss = self.criterion(outputs, targets)
# Backward pass and optimize
loss.backward()
self.optimizer.step()
# Statistics
running_loss += loss.item()
if self.output_size > 1:
_, predicted = torch.max(outputs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
epoch_loss = running_loss / len(train_loader)
epoch_acc = correct / total if total > 0 else 0
# Validation phase
            if has_validation:
val_loss, val_acc = self.evaluate(X_val, y_val)
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")
# Update history
self.history['loss'].append(epoch_loss)
self.history['accuracy'].append(epoch_acc)
self.history['val_loss'].append(val_loss)
self.history['val_accuracy'].append(val_acc)
else:
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")
# Update history without validation
self.history['loss'].append(epoch_loss)
self.history['accuracy'].append(epoch_acc)
logger.info("Training completed")
return self.history
def evaluate_metrics(self, X_test, y_test):
"""
Calculate and return comprehensive evaluation metrics as dict
"""
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)
self.model.eval()
with torch.no_grad():
y_pred = self.model(X_test_tensor)
if self.output_size > 1:
_, y_pred_class = torch.max(y_pred, 1)
y_pred_class = y_pred_class.cpu().numpy()
else:
y_pred_class = (y_pred.cpu().numpy() > 0.5).astype(int).flatten()
metrics = {
'accuracy': accuracy_score(y_test, y_pred_class),
'precision': precision_score(y_test, y_pred_class, average='weighted', zero_division=0),
'recall': recall_score(y_test, y_pred_class, average='weighted', zero_division=0),
'f1_score': f1_score(y_test, y_pred_class, average='weighted', zero_division=0)
}
return metrics
def save(self, filepath):
"""
Save the model to a file.
Args:
filepath: Path to save the model
"""
        # Create the parent directory if the path includes one
        # (os.makedirs('') would raise an error for bare filenames)
        dir_name = os.path.dirname(filepath)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
# Save the model state
model_state = {
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'history': self.history,
'window_size': self.window_size,
'num_features': self.num_features,
'output_size': self.output_size,
'timeframes': self.timeframes
}
torch.save(model_state, f"{filepath}.pt")
logger.info(f"Model saved to {filepath}.pt")
def load(self, filepath):
"""
Load the model from a file.
Args:
filepath: Path to load the model from
"""
# Check if file exists
if not os.path.exists(f"{filepath}.pt"):
logger.error(f"Model file {filepath}.pt not found")
return False
# Load the model state
model_state = torch.load(f"{filepath}.pt", map_location=self.device)
# Update model parameters
self.window_size = model_state['window_size']
self.num_features = model_state['num_features']
self.output_size = model_state['output_size']
self.timeframes = model_state['timeframes']
# Rebuild the model
self.build_model()
# Load the model state
self.model.load_state_dict(model_state['model_state_dict'])
self.optimizer.load_state_dict(model_state['optimizer_state_dict'])
self.history = model_state['history']
logger.info(f"Model loaded from {filepath}.pt")
return True
def plot_training_history(self):
"""Plot the training history"""
if not self.history['loss']:
logger.warning("No training history to plot")
return
plt.figure(figsize=(12, 4))
# Plot loss
plt.subplot(1, 2, 1)
plt.plot(self.history['loss'], label='Training Loss')
if 'val_loss' in self.history and self.history['val_loss']:
plt.plot(self.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
# Plot accuracy
plt.subplot(1, 2, 2)
plt.plot(self.history['accuracy'], label='Training Accuracy')
if 'val_accuracy' in self.history and self.history['val_accuracy']:
plt.plot(self.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
# Save the plot
os.makedirs('plots', exist_ok=True)
plt.savefig(os.path.join('plots', f"cnn_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"))
plt.close()
logger.info("Training history plots saved to plots directory")
def extract_hidden_features(self, X):
"""
Extract hidden features from the model - outputs from last dense layer before output.
Args:
X: Input data
Returns:
Hidden features (output from penultimate dense layer)
"""
# Convert to PyTorch tensor
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
# Forward pass through the model
self.model.eval()
with torch.no_grad():
# Get features through CNN layers
x_t = X_tensor.transpose(1, 2)
conv_out = self.model.conv_layers(x_t)
            # Pass through every dense layer except the final output Linear
            # ([:-1] stops at the ReLU after the 128-unit layer, i.e. the
            # penultimate representation described in the docstring)
            features = conv_out
            for layer in self.model.dense_block[:-1]:
features = layer(features)
return features.cpu().numpy()
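

# Minimal usage sketch with synthetic data (illustrative assumptions only:
# random inputs, a 20-step window of 5 features per candle, 5 signal classes,
# and a hypothetical "checkpoints/cnn_demo" path).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    window_size, num_features, n_samples = 20, 5, 256
    X = np.random.randn(n_samples, window_size, num_features).astype(np.float32)
    y = np.random.randint(0, 5, size=n_samples)

    model = CNNModelPyTorch(window_size=window_size, num_features=num_features,
                            output_size=5, timeframes=["1m"])
    model.train(X[:200], y[:200], X_val=X[200:], y_val=y[200:],
                batch_size=32, epochs=2)
    print(model.evaluate_metrics(X[200:], y[200:]))

    # The autoregressive helper is shape-consistent here because a single
    # 5-feature timeframe is used, matching its documented assumptions
    print(model.predict_next_candles(X, n_candles=3)["1m"].shape)  # (3, 5)

    # Round-trip the weights through save/load
    model.save("checkpoints/cnn_demo")
    model.load("checkpoints/cnn_demo")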