#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation (Optimized for Short-Term High-Leverage Trading)
This module implements an enhanced CNN model using PyTorch for time series analysis
with a focus on detecting short-term high-leverage trading opportunities.
Key improvements include attention mechanisms, rapid pattern detection,
and optimized decision thresholds for trading signals.
"""
import os
import logging
import math
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Configure logging
logger = logging.getLogger(__name__)

class AttentionLayer(nn.Module):
    """Self-attention layer for time series data"""

    def __init__(self, input_dim):
        super(AttentionLayer, self).__init__()
        self.query = nn.Linear(input_dim, input_dim)
        self.key = nn.Linear(input_dim, input_dim)
        self.value = nn.Linear(input_dim, input_dim)
        self.scale = math.sqrt(input_dim)

    def forward(self, x):
        # x shape: [batch, channels, seq_len]
        batch, channels, seq_len = x.size()

        # Reshape for attention computation
        x_reshaped = x.transpose(1, 2)  # [batch, seq_len, channels]

        # Compute query, key, value projections
        q = self.query(x_reshaped)  # [batch, seq_len, channels]
        k = self.key(x_reshaped)    # [batch, seq_len, channels]
        v = self.value(x_reshaped)  # [batch, seq_len, channels]

        # Scaled dot-product attention scores
        attn_scores = torch.bmm(q, k.transpose(1, 2)) / self.scale  # [batch, seq_len, seq_len]
        attn_weights = F.softmax(attn_scores, dim=2)

        # Apply attention weights to values
        out = torch.bmm(attn_weights, v)  # [batch, seq_len, channels]
        out = out.transpose(1, 2)  # [batch, channels, seq_len]

        return out
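
# Usage sketch (illustrative; AttentionLayer is defined here but not wired
# into CNNPyTorch below). The layer is shape-preserving, so it could be
# dropped between the conv blocks without changing downstream dimensions:
#
#     attn = AttentionLayer(input_dim=64)
#     out = attn(torch.randn(8, 64, 20))  # -> [8, 64, 20]
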
class CNNPyTorch(nn.Module):
    """
    CNN model for time series analysis using PyTorch.
    """

    def __init__(self, input_shape, output_size=3):
        """
        Initialize the CNN architecture.

        Args:
            input_shape (tuple): Shape of input data (window_size, features)
            output_size (int): Number of output classes
        """
        super(CNNPyTorch, self).__init__()

        # Set device
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        window_size, num_features = input_shape
        self.window_size = window_size

        # Deliberately simple architecture: two conv blocks with dropout
        self.conv1 = nn.Sequential(
            nn.Conv1d(num_features, 32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Dropout(0.2)
        )

        self.conv2 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.2)
        )

        # Global average pooling to handle variable-length sequences
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # Fully connected layers
        self.fc = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, output_size)
        )

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x: Input tensor of shape [batch_size, window_size, features]

        Returns:
            Tuple of (action_probs, None): softmax action probabilities and a
            placeholder for price predictions
        """
        # Transpose for conv1d: [batch, features, window]
        x = x.transpose(1, 2)

        # Convolutional layers
        x = self.conv1(x)
        x = self.conv2(x)

        # Global pooling
        x = self.global_pool(x)
        x = x.squeeze(-1)

        # Fully connected layers
        action_logits = self.fc(x)

        # Apply class weights to reduce HOLD bias.
        # This helps overcome the dataset imbalance that often favors HOLD.
        class_weights = torch.tensor([2.5, 0.4, 2.5], device=self.device)  # Higher weights for BUY/SELL
        weighted_logits = action_logits * class_weights

        # Add small random perturbation during training to encourage exploration
        if self.training:
            noise = torch.randn_like(weighted_logits) * 0.3
            weighted_logits = weighted_logits + noise

        # Softmax to get probabilities
        action_probs = F.softmax(weighted_logits, dim=1)

        return action_probs, None  # Return None for price_pred as we're focusing on actions
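
# Worked example of the class weighting in CNNPyTorch.forward (illustrative
# numbers): with equal logits [1.0, 1.0, 1.0], multiplying by [2.5, 0.4, 2.5]
# gives [2.5, 0.4, 2.5], and softmax of that is roughly [0.47, 0.06, 0.47],
# i.e. HOLD is strongly suppressed even when the network is indifferent.
# Note the scaling is multiplicative, so its effect inverts for negative logits.
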
class CNNModelPyTorch:
    """
    High-level wrapper for the CNN model with training and evaluation functionality.
    """

    def __init__(self, window_size=20, timeframes=None, output_size=3, num_pairs=3):
        """
        Initialize the model.

        Args:
            window_size (int): Size of the input window
            timeframes (list): List of timeframes to use
            output_size (int): Number of output classes
            num_pairs (int): Number of trading pairs
        """
        self.window_size = window_size
        self.timeframes = timeframes or ["1m", "5m", "15m"]
        self.output_size = output_size
        self.num_pairs = num_pairs

        # Set device
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        logger.info(f"Using device: {self.device}")

        # Initialize the underlying CNN model
        input_shape = (window_size, len(self.timeframes) * 5)  # 5 features per timeframe
        self.model = CNNPyTorch(input_shape, output_size).to(self.device)

        # Initialize optimizer with a low learning rate for stability
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001, weight_decay=0.01)

        # The network outputs softmax probabilities, so train with NLLLoss on
        # log-probabilities; CrossEntropyLoss would apply a second softmax
        self.action_criterion = nn.NLLLoss()

        # Training history
        self.history = {
            'train_loss': [],
            'val_loss': [],
            'train_acc': [],
            'val_acc': []
        }

        # For compatibility with older code
        self.train_losses = []
        self.val_losses = []
        self.train_accuracies = []
        self.val_accuracies = []

        # Initialize action counts: [total, correct] per action
        self.action_counts = {
            'BUY': [0, 0],
            'SELL': [0, 0],
            'HOLD': [0, 0]
        }

        logger.info(f"Building PyTorch CNN model with window_size={window_size}, output_size={output_size}")

        # Learning rate scheduler
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=5,
            verbose=True
        )

        # Sensitivity parameters for high-leverage trading
        self.confidence_threshold = 0.65
        self.max_consecutive_same_action = 3
        self.last_actions = [[] for _ in range(num_pairs)]  # Track recent actions per pair

    def train_epoch(self, X_train, y_train, future_prices, batch_size):
        """Train the model for one epoch with focus on short-term pattern recognition.

        Note: future_prices is accepted for interface compatibility but is not
        currently used, since the model only predicts actions.
        """
        self.model.train()
        total_loss = 0
        total_correct = 0
        total_samples = 0

        # Convert inputs to tensors and create DataLoader
        X_train_tensor = torch.FloatTensor(X_train).to(self.device)
        y_train_tensor = torch.LongTensor(y_train).to(self.device)

        dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # Training loop
        for batch_X, batch_y in train_loader:
            self.optimizer.zero_grad()

            # Forward pass (model returns probabilities)
            action_probs, _ = self.model(batch_X)

            # NLLLoss expects log-probabilities; epsilon guards against log(0)
            loss = self.action_criterion(torch.log(action_probs + 1e-8), batch_y)

            # Backward pass and optimization
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            # Update metrics
            total_loss += loss.item()
            predictions = torch.argmax(action_probs, dim=1)
            total_correct += (predictions == batch_y).sum().item()
            total_samples += batch_y.size(0)

            # Update per-action counts (class order: 0=SELL, 1=HOLD, 2=BUY)
            for pred, target in zip(predictions, batch_y):
                pred_action = ['SELL', 'HOLD', 'BUY'][pred.item()]
                self.action_counts[pred_action][0] += 1
                if pred.item() == target.item():
                    self.action_counts[pred_action][1] += 1

        # Calculate average loss and accuracy
        avg_loss = total_loss / len(train_loader)
        accuracy = total_correct / total_samples

        # Update training history
        self.history['train_loss'].append(avg_loss)
        self.history['train_acc'].append(accuracy)
        self.train_losses.append(avg_loss)
        self.train_accuracies.append(accuracy)

        # Log trading signals
        for action in ['BUY', 'SELL', 'HOLD']:
            total = self.action_counts[action][0]
            correct = self.action_counts[action][1]
            precision = correct / total if total > 0 else 0
            logger.info(f"Trading signals - {action}: {total}, Precision: {precision:.4f}")

        return avg_loss, 0, accuracy  # Return 0 for price_loss as it is not used

    def evaluate(self, X_val, y_val, future_prices=None):
        """Evaluate the model with focus on short-term trading performance metrics"""
        self.model.eval()
        total_loss = 0
        total_correct = 0
        total_samples = 0

        # Convert inputs to tensors
        X_val_tensor = torch.FloatTensor(X_val).to(self.device)
        y_val_tensor = torch.LongTensor(y_val).to(self.device)

        dataset = TensorDataset(X_val_tensor, y_val_tensor)
        val_loader = DataLoader(dataset, batch_size=32)

        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                # Forward pass (model returns probabilities)
                action_probs, _ = self.model(batch_X)

                # NLLLoss on log-probabilities, matching train_epoch
                loss = self.action_criterion(torch.log(action_probs + 1e-8), batch_y)

                # Update metrics
                total_loss += loss.item()
                predictions = torch.argmax(action_probs, dim=1)
                total_correct += (predictions == batch_y).sum().item()
                total_samples += batch_y.size(0)

        # Calculate average loss and accuracy
        avg_loss = total_loss / len(val_loader)
        accuracy = total_correct / total_samples

        # Update validation history
        self.history['val_loss'].append(avg_loss)
        self.history['val_acc'].append(accuracy)
        self.val_losses.append(avg_loss)
        self.val_accuracies.append(accuracy)

        # Update learning rate scheduler
        self.scheduler.step(avg_loss)

        return avg_loss, 0, accuracy  # Return 0 for price_loss as it is not used

    def predict(self, X):
        """Make predictions optimized for short-term high-leverage trading signals"""
        self.model.eval()

        # Convert to tensor if not already
        if not isinstance(X, torch.Tensor):
            X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
        else:
            X_tensor = X.to(self.device)

        with torch.no_grad():
            action_probs, price_pred = self.model(X_tensor)

        # Post-processing optimized for short-term trading signals
        action_probs_np = action_probs.cpu().numpy()

        # Aggressively reduce HOLD and boost BUY/SELL probabilities
        action_probs_np[:, 1] *= 0.3  # Reduce HOLD
        action_probs_np[:, 0] *= 2.0  # Boost SELL
        action_probs_np[:, 2] *= 2.0  # Boost BUY

        # Re-normalize so each row sums to 1
        action_probs_np = action_probs_np / action_probs_np.sum(axis=1, keepdims=True)

        # Store the predicted action for the most recent input
        if action_probs_np.shape[0] > 0:
            latest_action = np.argmax(action_probs_np[-1])
            self.last_actions[0].append(int(latest_action))
            # Keep only the 10 most recent actions
            self.last_actions[0] = self.last_actions[0][-10:]

        # Update action counts for stats (class order: 0=SELL, 1=HOLD, 2=BUY)
        actions = np.argmax(action_probs_np, axis=1)
        unique, counts = np.unique(actions, return_counts=True)
        action_dict = dict(zip(unique, counts))
        if 0 in action_dict:
            self.action_counts['SELL'][0] += action_dict[0]
        if 1 in action_dict:
            self.action_counts['HOLD'][0] += action_dict[1]
        if 2 in action_dict:
            self.action_counts['BUY'][0] += action_dict[2]

        # If price_pred is None, synthesize a price estimate from the signal
        if price_pred is None:
            # Use current close prices from the input if available (close assumed at feature index 3)
            current_prices = X_tensor[:, -1, 3].cpu().numpy() if X_tensor.shape[2] > 3 else np.zeros(X_tensor.shape[0])

            # Signal direction: BUY probability minus SELL probability
            price_directions = action_probs_np[:, 2] - action_probs_np[:, 0]

            # Scale the price change by signal strength (0.2% at full strength)
            price_preds = current_prices * (1 + price_directions * 0.002)

            return action_probs_np, price_preds.reshape(-1, 1)
        else:
            return action_probs_np, price_pred.cpu().numpy()
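
    # Worked example of the post-processing in predict (illustrative numbers):
    # raw probabilities [SELL, HOLD, BUY] = [0.2, 0.6, 0.2] become
    # [0.4, 0.18, 0.4] after scaling, and roughly [0.41, 0.18, 0.41] after
    # re-normalization -- a HOLD-dominant output flips to an actionable signal.
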
    def predict_next_candles(self, X, n_candles=3):
        """
        Predict the next n candles with focus on short-term signals.

        Args:
            X: Input data of shape [batch_size, window_size, features]
            n_candles: Number of future candles to predict

        Returns:
            Dictionary of predictions for each timeframe
        """
        self.model.eval()
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        with torch.no_grad():
            # Get initial predictions
            action_probs, _ = self.model(X_tensor)
            action_probs_np = action_probs.cpu().numpy()

            # Apply more aggressive processing for short-term signals
            action_probs_np[:, 1] *= 0.5  # Reduce HOLD
            action_probs_np[:, 0] *= 1.3  # Boost SELL
            action_probs_np[:, 2] *= 1.3  # Boost BUY

            # Re-normalize
            action_probs_np = action_probs_np / action_probs_np.sum(axis=1, keepdims=True)

            # For short-term predictions, decay the signal over time:
            # full strength on the first candle, then gradually toward HOLD
            predictions = {}
            for tf in self.timeframes:
                tf_preds = np.zeros((n_candles, action_probs_np.shape[0], 3))
                for j in range(n_candles):
                    # Decay factor moves signals toward HOLD over time
                    # (short-term signals shouldn't persist too long)
                    decay_factor = max(0.1, 1.0 - j * 0.3)

                    # Decay the BUY/SELL probabilities
                    decayed_probs = action_probs_np.copy()
                    decayed_probs[:, 0] = action_probs_np[:, 0] * decay_factor  # Decay SELL
                    decayed_probs[:, 2] = action_probs_np[:, 2] * decay_factor  # Decay BUY

                    # Increase HOLD probability to compensate
                    hold_increase = (1.0 - decay_factor) * (action_probs_np[:, 0] + action_probs_np[:, 2])
                    decayed_probs[:, 1] = action_probs_np[:, 1] + hold_increase

                    # Re-normalize (rows already sum to 1; this keeps it robust)
                    decayed_probs = decayed_probs / decayed_probs.sum(axis=1, keepdims=True)

                    tf_preds[j] = decayed_probs

                predictions[tf] = tf_preds

        return predictions
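
    # Worked example of the decay in predict_next_candles (illustrative
    # numbers): with [SELL, HOLD, BUY] = [0.6, 0.1, 0.3] and j=1
    # (decay_factor = 0.7):
    #     SELL = 0.6 * 0.7        = 0.42
    #     BUY  = 0.3 * 0.7        = 0.21
    #     HOLD = 0.1 + 0.3 * 0.9  = 0.37
    # The row still sums to 1, so the re-normalization is just a safeguard.
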
    def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
        """
        Train the CNN model.

        Args:
            X_train: Training input data
            y_train: Training target data (class indices; output_size > 1 is assumed)
            X_val: Validation input data
            y_val: Validation target data
            batch_size: Batch size for training
            epochs: Number of training epochs

        Returns:
            Training history
        """
        logger.info(f"Training PyTorch CNN model with {len(X_train)} samples, "
                    f"batch_size={batch_size}, epochs={epochs}")

        # Convert numpy arrays to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)

        # Handle different output sizes for y_train
        if self.output_size == 1:
            y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
        else:
            y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)

        # Create DataLoader for training data
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Validation is run through self.evaluate(), so no separate
        # validation DataLoader is needed here
        has_validation = X_val is not None and y_val is not None

        # Training loop
        for epoch in range(epochs):
            # Training phase
            self.model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for inputs, targets in train_loader:
                # Zero the parameter gradients
                self.optimizer.zero_grad()

                # Forward pass (model returns probabilities)
                action_probs, _ = self.model(inputs)

                # NLLLoss on log-probabilities, matching train_epoch
                # (assumes classification targets, i.e. output_size > 1)
                loss = self.action_criterion(torch.log(action_probs + 1e-8), targets)

                # Backward pass and optimize
                loss.backward()
                self.optimizer.step()

                # Statistics
                running_loss += loss.item()
                _, predicted = torch.max(action_probs, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

            epoch_loss = running_loss / len(train_loader)
            epoch_acc = correct / total if total > 0 else 0

            # Validation phase
            if has_validation:
                # evaluate() returns (loss, price_loss, accuracy) and records
                # the validation metrics in the history itself
                val_loss, _, val_acc = self.evaluate(X_val, y_val)
                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
                            f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")
            else:
                logger.info(f"Epoch {epoch+1}/{epochs} - "
                            f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")

            # Update training history (validation history is updated by evaluate())
            self.train_losses.append(epoch_loss)
            self.train_accuracies.append(epoch_acc)

        logger.info("Training completed")

        return {
            'loss': self.train_losses,
            'accuracy': self.train_accuracies,
            'val_loss': self.val_losses,
            'val_accuracy': self.val_accuracies
        }

    def evaluate_metrics(self, X_test, y_test):
        """
        Calculate and return comprehensive evaluation metrics as a dict.
        """
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)

        self.model.eval()
        with torch.no_grad():
            # The model returns (action_probs, price_pred); only the probabilities are needed here
            y_pred, _ = self.model(X_test_tensor)

        if self.output_size > 1:
            _, y_pred_class = torch.max(y_pred, 1)
            y_pred_class = y_pred_class.cpu().numpy()
        else:
            y_pred_class = (y_pred.cpu().numpy() > 0.5).astype(int).flatten()

        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_class),
            'precision': precision_score(y_test, y_pred_class, average='weighted', zero_division=0),
            'recall': recall_score(y_test, y_pred_class, average='weighted', zero_division=0),
            'f1_score': f1_score(y_test, y_pred_class, average='weighted', zero_division=0)
        }

        return metrics

    def save(self, filepath):
        """
        Save the model to a file with trading configuration.

        Args:
            filepath: Path to save the model
        """
        # Create directory if it doesn't exist (filepath may be a bare filename)
        dirname = os.path.dirname(filepath)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

        # Save the model state with additional trading parameters
        model_state = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'history': self.history,
            'window_size': self.window_size,
            'num_features': len(self.timeframes) * 5,  # 5 features per timeframe
            'output_size': self.output_size,
            'timeframes': self.timeframes,
            # Trading configuration
            'confidence_threshold': self.confidence_threshold,
            'max_consecutive_same_action': self.max_consecutive_same_action,
            'action_counts': self.action_counts,
            'last_actions': self.last_actions,
            # Model version information
            'model_version': 'short_term_optimized_v2.0',
            'timestamp': datetime.now().strftime('%Y%m%d_%H%M%S')
        }

        torch.save(model_state, f"{filepath}.pt")
        logger.info(f"Model saved to {filepath}.pt with short-term trading optimizations")

        # Save a timestamped backup of the model as well
        backup_dir = f"{filepath}_backup"
        os.makedirs(backup_dir, exist_ok=True)
        backup_path = os.path.join(backup_dir, f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pt")
        torch.save(model_state, backup_path)
        logger.info(f"Backup saved to {backup_path}")

    def load(self, filepath):
        """Load model weights from file"""
        if not os.path.exists(f"{filepath}.pt"):
            logger.error(f"Model file {filepath}.pt not found")
            return False

        try:
            # Load the model state
            model_state = torch.load(f"{filepath}.pt", map_location=self.device)

            # Update model parameters
            self.window_size = model_state['window_size']
            self.total_features = model_state['num_features']
            self.output_size = model_state['output_size']
            self.timeframes = model_state.get('timeframes', ["1m"])

            # Load model state dict
            self.model.load_state_dict(model_state['model_state_dict'])

            # Load optimizer state if available
            if 'optimizer_state_dict' in model_state:
                self.optimizer.load_state_dict(model_state['optimizer_state_dict'])

            # Load trading configuration if available
            if 'confidence_threshold' in model_state:
                self.confidence_threshold = model_state['confidence_threshold']
            if 'max_consecutive_same_action' in model_state:
                self.max_consecutive_same_action = model_state['max_consecutive_same_action']

            # Log model version information if available
            if 'model_version' in model_state:
                logger.info(f"Model version: {model_state['model_version']}")
            if 'timestamp' in model_state:
                logger.info(f"Model timestamp: {model_state['timestamp']}")

            return True
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return False

    def plot_training_history(self, metrics_file="NN/models/saved/training_metrics.json"):
        """
        Plot training history from saved metrics.

        Args:
            metrics_file: Path to the saved metrics JSON file
        """
        try:
            import json
            import matplotlib.dates as mdates

            # Load metrics
            with open(metrics_file, 'r') as f:
                metrics = json.load(f)

            # Create plots directory
            plots_dir = os.path.join(os.path.dirname(metrics_file), 'plots')
            os.makedirs(plots_dir, exist_ok=True)

            # Convert timestamps to datetime objects
            timestamps = [datetime.fromisoformat(ts) for ts in metrics['timestamps']]

            # 1. Plot loss and accuracy
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)

            # Loss plot
            ax1.plot(timestamps, metrics['train_loss'], 'b-', label='Training Loss')
            ax1.plot(timestamps, metrics['val_loss'], 'r-', label='Validation Loss')
            ax1.set_title('Model Loss Over Time')
            ax1.set_ylabel('Loss')
            ax1.legend()
            ax1.grid(True)

            # Accuracy plot
            ax2.plot(timestamps, metrics['train_acc'], 'g-', label='Training Accuracy')
            ax2.plot(timestamps, metrics['val_acc'], 'm-', label='Validation Accuracy')
            ax2.set_title('Model Accuracy Over Time')
            ax2.set_ylabel('Accuracy')
            ax2.legend()
            ax2.grid(True)

            # Format x-axis
            ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
            plt.xticks(rotation=45)

            # Save the plot
            plt.tight_layout()
            plt.savefig(os.path.join(plots_dir, 'loss_accuracy.png'))
            plt.close()

            # 2. Plot PnL and win rate
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)

            # PnL plot
            ax1.plot(timestamps, metrics['train_pnl'], 'g-', label='Training PnL')
            ax1.plot(timestamps, metrics['val_pnl'], 'r-', label='Validation PnL')
            ax1.set_title('PnL Over Time')
            ax1.set_ylabel('PnL')
            ax1.legend()
            ax1.grid(True)

            # Win rate plot
            ax2.plot(timestamps, metrics['train_win_rate'], 'b-', label='Training Win Rate')
            ax2.plot(timestamps, metrics['val_win_rate'], 'm-', label='Validation Win Rate')
            ax2.set_title('Win Rate Over Time')
            ax2.set_ylabel('Win Rate')
            ax2.legend()
            ax2.grid(True)

            # Format x-axis
            ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
            plt.xticks(rotation=45)

            # Save the plot
            plt.tight_layout()
            plt.savefig(os.path.join(plots_dir, 'pnl_winrate.png'))
            plt.close()

            logger.info(f"Performance visualizations saved to {plots_dir}")
            return True
        except Exception as e:
            logger.error(f"Error generating plots: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return False

    def extract_hidden_features(self, X):
        """
        Extract hidden features from the model - the output of the penultimate
        dense layer, before the final classification layer.

        Args:
            X: Input data

        Returns:
            Hidden features (output from penultimate dense layer)
        """
        # Convert to PyTorch tensor
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)

        # Forward pass through the model, mirroring CNNPyTorch.forward
        self.model.eval()
        with torch.no_grad():
            x_t = X_tensor.transpose(1, 2)
            features = self.model.conv1(x_t)
            features = self.model.conv2(features)
            features = self.model.global_pool(features).squeeze(-1)

            # Process through all fc layers except the final linear output layer
            for layer in list(self.model.fc.children())[:-1]:
                features = layer(features)

        return features.cpu().numpy()
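
if __name__ == "__main__":
    # Minimal smoke test on synthetic data (illustrative only; real usage
    # feeds windows of per-timeframe OHLCV features). Shapes follow the
    # defaults above: window_size=20, 3 timeframes * 5 features = 15.
    logging.basicConfig(level=logging.INFO)

    model = CNNModelPyTorch(window_size=20, timeframes=["1m", "5m", "15m"])

    # 256 random windows with random action labels (0=SELL, 1=HOLD, 2=BUY)
    X = np.random.randn(256, 20, 15).astype(np.float32)
    y = np.random.randint(0, 3, size=256)

    train_loss, _, train_acc = model.train_epoch(X, y, future_prices=None, batch_size=32)
    print(f"train loss={train_loss:.4f}, acc={train_acc:.4f}")

    action_probs, price_preds = model.predict(X[:4])
    print("action probabilities:", action_probs)
    print("synthesized price predictions:", price_preds.squeeze())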