#!/usr/bin/env python3
"""
CNN Model - PyTorch Implementation (Optimized for Short-Term High-Leverage Trading)
This module implements an enhanced CNN model using PyTorch for time series analysis
with a focus on detecting short-term high-leverage trading opportunities.
Key improvements include attention mechanisms, rapid pattern detection,
and optimized decision thresholds for trading signals.
"""
import os
import logging
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch.nn.functional as F
# Configure logging
logger = logging.getLogger(__name__)
class AttentionLayer(nn.Module):
"""Self-attention layer for time series data"""
def __init__(self, input_dim):
super(AttentionLayer, self).__init__()
self.query = nn.Linear(input_dim, input_dim)
self.key = nn.Linear(input_dim, input_dim)
self.value = nn.Linear(input_dim, input_dim)
self.scale = math.sqrt(input_dim)
def forward(self, x):
# x shape: [batch, channels, seq_len]
batch, channels, seq_len = x.size()
# Reshape for attention computation
x_reshaped = x.transpose(1, 2) # [batch, seq_len, channels]
# Compute query, key, value
q = self.query(x_reshaped) # [batch, seq_len, channels]
k = self.key(x_reshaped) # [batch, seq_len, channels]
v = self.value(x_reshaped) # [batch, seq_len, channels]
# Compute attention scores
attn_scores = torch.bmm(q, k.transpose(1, 2)) / self.scale # [batch, seq_len, seq_len]
attn_weights = F.softmax(attn_scores, dim=2)
# Apply attention
out = torch.bmm(attn_weights, v) # [batch, seq_len, channels]
out = out.transpose(1, 2) # [batch, channels, seq_len]
return out
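# Illustrative sketch (not used by the models below): a quick shape check for
# AttentionLayer, assuming a batch of 8 windows with 15 feature channels and
# 20 time steps laid out as [batch, channels, seq_len].
def _attention_layer_example():
    attn = AttentionLayer(input_dim=15)
    x = torch.randn(8, 15, 20)       # [batch, channels, seq_len]
    out = attn(x)                    # attention mixes time steps per channel
    assert out.shape == (8, 15, 20)  # shape is preserved
    return out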
class CNNPyTorch(nn.Module):
"""
CNN model for time series analysis using PyTorch.
"""
def __init__(self, input_shape, output_size=3):
"""
Initialize the CNN architecture.
Args:
input_shape (tuple): Shape of input data (window_size, features)
output_size (int): Number of output classes
"""
super(CNNPyTorch, self).__init__()
# Set device
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
window_size, num_features = input_shape
self.window_size = window_size
# Increased dropout for better generalization
dropout_rate = 0.25
# Convolutional layers with wider kernels for better pattern detection
self.conv1 = nn.Sequential(
nn.Conv1d(num_features, 64, kernel_size=5, padding=2),
nn.BatchNorm1d(64),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate)
)
self.conv2 = nn.Sequential(
nn.Conv1d(64, 128, kernel_size=5, padding=2),
nn.BatchNorm1d(128),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate)
)
# Micro-movement detection with smaller kernels
self.micro_conv = nn.Sequential(
nn.Conv1d(num_features, 32, kernel_size=3, padding=1),
nn.BatchNorm1d(32),
nn.LeakyReLU(0.1),
nn.Conv1d(32, 64, kernel_size=3, padding=1),
nn.BatchNorm1d(64),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate)
)
# Attention mechanism for pattern importance weighting
self.attention = nn.Conv1d(64, 1, kernel_size=1)
self.softmax = nn.Softmax(dim=2)
# Define a fixed output size for conv features to avoid dimension mismatch
fixed_conv_size = 10 # This should match the expected size in forward pass
# Use adaptive pooling to get fixed size regardless of input
self.adaptive_pool = nn.AdaptiveAvgPool1d(fixed_conv_size)
# Calculate input size for fully connected layer
# After adaptive pooling, dimensions are [batch_size, channels, fixed_conv_size]
conv2_flat_size = 128 * fixed_conv_size # From conv2
micro_flat_size = 64 * fixed_conv_size # From micro_conv
fc_input_size = conv2_flat_size + micro_flat_size
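        # Worked example: with adaptive pooling to length 10, conv2 contributes
        # 128 * 10 = 1280 values and micro_conv 64 * 10 = 640, so the shared
        # fully connected layer always sees 1920 inputs regardless of window_size.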
# Shared fully connected layers
self.shared_fc = nn.Sequential(
nn.Linear(fc_input_size, 256),
nn.BatchNorm1d(256),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate)
)
# Action prediction head
self.action_fc = nn.Sequential(
nn.Linear(256, 64),
nn.BatchNorm1d(64),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate),
nn.Linear(64, output_size)
)
# Price prediction head
self.price_fc = nn.Sequential(
nn.Linear(256, 64),
nn.BatchNorm1d(64),
nn.LeakyReLU(0.1),
nn.Dropout(dropout_rate),
nn.Linear(64, 1) # Predict price change percentage
)
# Confidence thresholds for decision making
self.buy_threshold = 0.55 # Higher threshold for BUY signals
self.sell_threshold = 0.55 # Higher threshold for SELL signals
def forward(self, x):
"""
Forward pass through the network with enhanced pattern detection.
Args:
x: Input tensor of shape [batch_size, window_size, features]
Returns:
Tuple of (action_probs, price_pred)
"""
# Transpose for conv1d: [batch, features, window]
x = x.transpose(1, 2)
        # Main convolutional layers
        conv1_out = self.conv1(x)
        conv2_out = self.conv2(conv1_out)  # Use conv1_out as input to conv2
        # Micro-movement pattern detection
        micro_out = self.micro_conv(x)
        # Attention weights over time steps, computed from the conv1 output
        # (conv1/conv2 use 'same' padding, so every path shares seq_len)
        attention = self.softmax(self.attention(conv1_out))  # [batch, 1, seq_len]
        # Weight conv2 features by attention so important time steps dominate
        conv2_out = conv2_out * attention
        # Apply adaptive pooling so both paths have the same fixed length
        micro_out = self.adaptive_pool(micro_out)   # Output: [batch, 64, 10]
        conv2_out = self.adaptive_pool(conv2_out)   # Output: [batch, 128, 10]
# Flatten and concatenate features
conv2_flat = conv2_out.reshape(conv2_out.size(0), -1) # [batch, 128*10]
micro_flat = micro_out.reshape(micro_out.size(0), -1) # [batch, 64*10]
features = torch.cat([conv2_flat, micro_flat], dim=1)
# Shared layers
shared_features = self.shared_fc(features)
# Action head
action_logits = self.action_fc(shared_features)
action_probs = F.softmax(action_logits, dim=1)
# Price prediction head
price_pred = self.price_fc(shared_features)
        # Adjust probabilities to favor decisive trading actions. The scaling
        # factors are assembled out-of-place: editing action_probs in place
        # would corrupt the tensors autograd saved for the softmax backward.
        with torch.no_grad():
            scale = torch.ones_like(action_probs)
            scale[:, 1] = 0.4  # Aggressively reduce HOLD (index 1) probabilities
            scale[:, 0] = 1.4  # Boost SELL probabilities
            scale[:, 2] = 1.4  # Boost BUY probabilities
            # Identify high-confidence signals and boost them further
            sell_mask = action_probs[:, 0] > self.sell_threshold
            buy_mask = action_probs[:, 2] > self.buy_threshold
            scale[sell_mask, 0] *= 1.8  # Higher boost for high-confidence SELL
            scale[buy_mask, 2] *= 1.8   # Higher boost for high-confidence BUY
        action_probs = action_probs * scale
        # Re-normalize to sum to 1
        action_probs = action_probs / action_probs.sum(dim=1, keepdim=True)
return action_probs, price_pred
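# Illustrative sketch (assumed shapes, not part of the training pipeline):
# a forward-pass smoke test for CNNPyTorch with a 20-step window of 15 features.
def _cnn_pytorch_example():
    model = CNNPyTorch(input_shape=(20, 15), output_size=3)
    model.eval()  # BatchNorm needs eval mode for an untrained smoke test
    x = torch.randn(4, 20, 15)           # [batch, window_size, features]
    with torch.no_grad():
        action_probs, price_pred = model(x)
    assert action_probs.shape == (4, 3)  # SELL/HOLD/BUY probabilities
    assert price_pred.shape == (4, 1)    # predicted price change percentage
    return action_probs, price_pred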
class CNNModelPyTorch:
"""
CNN model wrapper class for time series analysis using PyTorch.
This class provides methods for building, training, evaluating, and making
predictions with the CNN model, optimized for short-term trading opportunities.
"""
def __init__(self, window_size=20, timeframes=None, output_size=3, num_pairs=3):
"""
Initialize the CNN model.
Args:
window_size (int): Size of the sliding window
timeframes (list): List of timeframes used
output_size (int): Number of output classes (3 for BUY/HOLD/SELL)
num_pairs (int): Number of trading pairs to analyze in parallel (default 3)
"""
self.window_size = window_size
self.timeframes = timeframes if timeframes else ["1m", "5m", "15m"]
self.output_size = output_size
        self.num_pairs = num_pairs
        # Set device (the wrapped model and all tensors are moved here)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Calculate total features (5 OHLCV features per timeframe per pair)
self.total_features = len(self.timeframes) * 5 * self.num_pairs
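        # Worked example with the defaults: 3 timeframes * 5 OHLCV * 3 pairs = 45 features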
# Build the model
logger.info(f"Building PyTorch CNN model with window_size={window_size}, "
f"num_features={self.total_features}, output_size={output_size}, "
f"num_pairs={num_pairs}")
        # Build the underlying network. All of the training and prediction
        # methods below are written against the (action_probs, price_pred)
        # interface of CNNPyTorch, so the wrapper instantiates that model over
        # the concatenated per-pair features. (Grouped convolutions that keep
        # each pair separate in the first layer are a possible extension.)
        self.model = CNNPyTorch(
            input_shape=(window_size, self.total_features),
            output_size=output_size
        ).to(self.device)
# Initialize optimizer and loss function
self.optimizer = optim.Adam(self.model.parameters(), lr=0.0005)
self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='max', factor=0.5, patience=5, verbose=True
)
        # NLLLoss is paired with the log of the model's softmax output
        # (CrossEntropyLoss would apply a second softmax to probabilities)
        self.criterion = nn.NLLLoss()
# Initialize metrics tracking
self.train_losses = []
self.val_losses = []
self.train_accuracies = []
self.val_accuracies = []
logger.info(f"Model built successfully with {sum(p.numel() for p in self.model.parameters())} parameters")
        # Sensitivity parameters for high-leverage trading
        self.confidence_threshold = 0.65
        self.max_consecutive_same_action = 3
        self.last_actions = [[] for _ in range(num_pairs)]  # Track recent actions per pair
        # Cumulative prediction counts per pair, used by predict() and save()
        self.action_counts = {action: [0] * num_pairs for action in ('BUY', 'SELL', 'HOLD')}
def compute_trading_loss(self, action_probs, price_pred, targets, future_prices=None):
"""
Custom loss function that prioritizes profitable trades
Args:
action_probs: Predicted action probabilities [batch_size, 3]
price_pred: Predicted price changes [batch_size, 1]
targets: Target actions [batch_size]
future_prices: Actual future price changes [batch_size]
        Returns:
            Tuple of (total_loss, action_loss, price_loss)
"""
batch_size = action_probs.size(0)
        # Base classification loss: NLL on the log of the model's softmax output
        action_loss = self.criterion(torch.log(action_probs + 1e-8), targets)
# Initialize price and profitability losses
price_loss = torch.tensor(0.0, device=self.device)
profit_loss = torch.tensor(0.0, device=self.device)
diversity_loss = torch.tensor(0.0, device=self.device)
# Get predicted actions
pred_actions = torch.argmax(action_probs, dim=1)
# Calculate signal diversity loss to prevent model from always predicting the same action
# Count actions in the batch
buy_count = (pred_actions == 2).float().sum() / batch_size
sell_count = (pred_actions == 0).float().sum() / batch_size
hold_count = (pred_actions == 1).float().sum() / batch_size
# Enhanced diversity mechanism
# For short-term high-leverage trading, we want a more balanced distribution
# with a slight preference for actions over holds, but still maintaining diversity
# Ideal distribution varies based on market conditions and training phase
# Start with more conservative distribution and gradually shift to more aggressive
if hasattr(self, 'training_progress'):
self.training_progress += 1
else:
self.training_progress = 0
# Early training phase - more balanced with higher HOLD
if self.training_progress < 500:
ideal_buy = 0.3
ideal_sell = 0.3
ideal_hold = 0.4
# Mid training phase - balanced trading signals
elif self.training_progress < 1500:
ideal_buy = 0.35
ideal_sell = 0.35
ideal_hold = 0.3
# Late training phase - more aggressive with tactical HOLDs
else:
ideal_buy = 0.4
ideal_sell = 0.4
ideal_hold = 0.2
# Calculate diversity loss using Kullback-Leibler divergence approximation
# Plus an additional penalty for extreme imbalance
        actual_dist = torch.stack([sell_count, hold_count, buy_count])
        ideal_dist = torch.tensor([ideal_sell, ideal_hold, ideal_buy], device=self.device)
# KL divergence component (approximation)
eps = 1e-8 # Small constant to avoid division by zero
kl_div = torch.sum(actual_dist * torch.log((actual_dist + eps) / (ideal_dist + eps)))
# Add strong penalty for extreme predictions (all same class)
max_ratio = torch.max(actual_dist)
if max_ratio > 0.9: # If more than 90% of predictions are the same class
diversity_loss = kl_div + (max_ratio - 0.9) * 5.0 # Stronger penalty
elif max_ratio > 0.7: # If more than 70% predictions are the same class
diversity_loss = kl_div + (max_ratio - 0.7) * 2.0 # Moderate penalty
else:
diversity_loss = kl_div
# Add additional penalty if any class has zero predictions
# This is critical for avoiding scenarios where model never predicts a certain class
zero_class_penalty = 0.0
min_class_ratio = 0.1 # We want at least 10% of each class
if buy_count < min_class_ratio:
zero_class_penalty += (min_class_ratio - buy_count) * 3.0
if sell_count < min_class_ratio:
zero_class_penalty += (min_class_ratio - sell_count) * 3.0
if hold_count < min_class_ratio:
zero_class_penalty += (min_class_ratio - hold_count) * 2.0 # Slightly lower penalty for HOLD
diversity_loss += zero_class_penalty
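        # Worked example: in the early phase (ideal [0.3, 0.4, 0.3]) a batch
        # predicting 80% SELL / 10% HOLD / 10% BUY gives KL ~= 0.54, plus the
        # imbalance penalty (0.8 - 0.7) * 2.0 = 0.2, for ~0.74 in total; no
        # zero-class penalty applies since every class is at least 10%.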
# If we have future prices, calculate profitability-based losses
if future_prices is not None and future_prices.numel() > 0:
# Calculate price direction loss - penalize wrong direction predictions
if price_pred is not None:
# For each sample where future price is available
valid_mask = ~torch.isnan(future_prices) & (future_prices != 0)
if valid_mask.any():
valid_future = future_prices[valid_mask]
valid_price_pred = price_pred.view(-1)[valid_mask]
# Mean squared error for price prediction
price_loss = F.mse_loss(valid_price_pred, valid_future)
# Direction loss - penalize wrong direction predictions more heavily
pred_direction = torch.sign(valid_price_pred)
true_direction = torch.sign(valid_future)
direction_loss = ((pred_direction != true_direction) & (true_direction != 0)).float().mean()
# Add direction loss to price loss with higher weight
price_loss = price_loss + direction_loss * 2.0
        # Calculate trade profitability loss
        # This penalizes unprofitable trades more than just wrong classifications.
        # Note: it is built from argmax'd actions and .item() scalars, so it is
        # non-differentiable and shapes the reported loss rather than gradients.
profitable_trades = 0
unprofitable_trades = 0
for i in range(batch_size):
if i < future_prices.size(0) and not torch.isnan(future_prices[i]) and future_prices[i] != 0:
price_change = future_prices[i].item()
# Calculate expected profit/loss based on action
if pred_actions[i] == 0: # SELL
expected_pnl = -price_change # Negative price change is profit for SELL
elif pred_actions[i] == 2: # BUY
expected_pnl = price_change # Positive price change is profit for BUY
else: # HOLD
expected_pnl = 0 # No profit/loss for HOLD
# Enhanced profit/loss penalties with larger gradient for bad trades
if expected_pnl < 0:
# Exponential penalty for larger losses
severity = abs(expected_pnl) ** 1.5 # Higher exponent for short-term trading
profit_loss = profit_loss + torch.tensor(severity, device=self.device) * 2.5
unprofitable_trades += 1
elif expected_pnl > 0:
# Reward for profitable trades (negative loss contribution)
# Higher reward for larger profits
reward = expected_pnl * 0.9
profit_loss = profit_loss - torch.tensor(reward, device=self.device)
profitable_trades += 1
# Calculate win rate and further adjust profit loss
if profitable_trades + unprofitable_trades > 0:
win_rate = profitable_trades / (profitable_trades + unprofitable_trades)
# Add extra penalty if win rate is less than 50%
if win_rate < 0.5:
profit_loss = profit_loss * (1.0 + (0.5 - win_rate) * 2.5)
# Add small reward if win rate is high
elif win_rate > 0.6:
profit_loss = profit_loss * (1.0 - (win_rate - 0.6) * 0.5)
# Combine all loss components with dynamic weighting
# Adjust weights based on training progress
# Early training focuses more on classification accuracy
if self.training_progress < 500:
action_weight = 1.0
price_weight = 0.2
profit_weight = 0.5
diversity_weight = 0.3
# Mid training balances all components
elif self.training_progress < 1500:
action_weight = 0.8
price_weight = 0.3
profit_weight = 0.8
diversity_weight = 0.5
# Late training emphasizes profitability and diversity
else:
action_weight = 0.6
price_weight = 0.3
profit_weight = 1.0
diversity_weight = 0.7
total_loss = (action_weight * action_loss +
price_weight * price_loss +
profit_weight * profit_loss +
diversity_weight * diversity_loss)
return total_loss, action_loss, price_loss
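    # Illustrative sketch of the expected call (shapes are assumptions):
    #   action_probs:  FloatTensor [batch, 3] from the model's softmax head
    #   price_pred:    FloatTensor [batch, 1] predicted price change pct
    #   targets:       LongTensor  [batch] with 0=SELL, 1=HOLD, 2=BUY
    #   future_prices: FloatTensor [batch] realized price change pct (optional)
    # total, action, price = self.compute_trading_loss(action_probs, price_pred, targets, future_prices)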
def train_epoch(self, X_train, y_train, future_prices, batch_size):
"""Train the model for one epoch with focus on short-term pattern recognition"""
self.model.train()
        total_action_loss = 0
        total_price_loss = 0
        total_correct = 0
        total_samples = 0
        # Epoch-level signal counters for logging
        buy_count = sell_count = hold_count = 0
        buy_correct = sell_correct = 0
# Convert inputs to tensors and create DataLoader
X_train_tensor = torch.FloatTensor(X_train).to(self.device)
y_train_tensor = torch.LongTensor(y_train).to(self.device)
future_prices_tensor = torch.FloatTensor(future_prices).to(self.device) if future_prices is not None else None
# Create dataset and dataloader
if future_prices_tensor is not None:
dataset = TensorDataset(X_train_tensor, y_train_tensor, future_prices_tensor)
else:
dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Training loop
for batch_data in train_loader:
self.optimizer.zero_grad()
# Extract batch data
if len(batch_data) == 3:
batch_X, batch_y, batch_future_prices = batch_data
else:
batch_X, batch_y = batch_data
batch_future_prices = None
# Forward pass
action_probs, price_pred = self.model(batch_X)
# Calculate loss using custom trading loss function
total_loss, action_loss, price_loss = self.compute_trading_loss(
action_probs, price_pred, batch_y, batch_future_prices
)
# Backward pass and optimization
total_loss.backward()
# Apply gradient clipping to prevent exploding gradients
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
# Update metrics
total_action_loss += action_loss.item()
total_price_loss += price_loss.item() if hasattr(price_loss, 'item') else 0
predictions = torch.argmax(action_probs, dim=1)
total_correct += (predictions == batch_y).sum().item()
total_samples += batch_y.size(0)
            # Track trading signals for logging (accumulated over the epoch)
            buy_count += (predictions == 2).sum().item()
            sell_count += (predictions == 0).sum().item()
            hold_count += (predictions == 1).sum().item()
            buy_correct += ((predictions == 2) & (batch_y == 2)).sum().item()
            sell_correct += ((predictions == 0) & (batch_y == 0)).sum().item()
# Calculate average losses and accuracy
avg_action_loss = total_action_loss / len(train_loader)
avg_price_loss = total_price_loss / len(train_loader)
accuracy = total_correct / total_samples
# Log trading signals
logger.info(f"Trading signals: BUY={buy_count}, SELL={sell_count}, HOLD={hold_count}")
logger.info(f"Signal precision: BUY={buy_correct/max(1, buy_count):.4f}, SELL={sell_correct/max(1, sell_count):.4f}")
# Update learning rate
self.scheduler.step(accuracy)
return avg_action_loss, avg_price_loss, accuracy
def evaluate(self, X_val, y_val, future_prices=None):
"""Evaluate the model with focus on short-term trading performance metrics"""
self.model.eval()
total_action_loss = 0
total_price_loss = 0
total_correct = 0
total_samples = 0
# Additional metrics for trading performance
trade_signals = {'BUY': 0, 'SELL': 0, 'HOLD': 0}
correct_signals = {'BUY': 0, 'SELL': 0, 'HOLD': 0}
# Convert inputs to tensors
X_val_tensor = torch.FloatTensor(X_val).to(self.device)
y_val_tensor = torch.LongTensor(y_val).to(self.device)
future_prices_tensor = torch.FloatTensor(future_prices).to(self.device) if future_prices is not None else None
with torch.no_grad():
# Forward pass
action_probs, price_pred = self.model(X_val_tensor)
# Calculate loss using custom trading loss function
total_loss, action_loss, price_loss = self.compute_trading_loss(
action_probs, price_pred, y_val_tensor, future_prices_tensor
)
# Calculate predictions and accuracy
predictions = torch.argmax(action_probs, dim=1)
# Count prediction types and correct predictions
for i in range(predictions.shape[0]):
pred = predictions[i].item()
if pred == 0:
trade_signals['SELL'] += 1
if y_val_tensor[i].item() == pred:
correct_signals['SELL'] += 1
elif pred == 1:
trade_signals['HOLD'] += 1
if y_val_tensor[i].item() == pred:
correct_signals['HOLD'] += 1
elif pred == 2:
trade_signals['BUY'] += 1
if y_val_tensor[i].item() == pred:
correct_signals['BUY'] += 1
# Update metrics
total_action_loss = action_loss.item()
total_price_loss = price_loss.item() if hasattr(price_loss, 'item') else 0
total_correct = (predictions == y_val_tensor).sum().item()
total_samples = y_val_tensor.size(0)
# Calculate accuracy
accuracy = total_correct / total_samples if total_samples > 0 else 0
# Calculate signal precision (crucial for short-term trading)
buy_precision = correct_signals['BUY'] / trade_signals['BUY'] if trade_signals['BUY'] > 0 else 0
sell_precision = correct_signals['SELL'] / trade_signals['SELL'] if trade_signals['SELL'] > 0 else 0
# Log trading-specific metrics
logger.info(f"Trading signals: BUY={trade_signals['BUY']}, SELL={trade_signals['SELL']}, HOLD={trade_signals['HOLD']}")
logger.info(f"Signal precision: BUY={buy_precision:.4f}, SELL={sell_precision:.4f}")
        # Return action loss, price loss and accuracy
return total_action_loss, total_price_loss, accuracy
def predict(self, X):
"""Make predictions optimized for short-term high-leverage trading signals"""
self.model.eval()
# Convert to tensor if not already
if not isinstance(X, torch.Tensor):
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
else:
X_tensor = X.to(self.device)
with torch.no_grad():
action_probs, price_pred = self.model(X_tensor)
# Post-processing optimized for short-term trading signals
action_probs_np = action_probs.cpu().numpy()
# Apply more aggressive HOLD reduction for short-term trading
action_probs_np[:, 1] *= 0.5 # More aggressive HOLD reduction
# Apply boosting for BUY/SELL signals
action_probs_np[:, 0] *= 1.3 # Boost SELL probabilities
action_probs_np[:, 2] *= 1.3 # Boost BUY probabilities
# Implement signal filtering based on previous actions to avoid oscillation
if len(self.last_actions[0]) >= self.max_consecutive_same_action:
# Check for too many consecutive identical actions
if all(a == 0 for a in self.last_actions[0][-self.max_consecutive_same_action:]):
# Too many consecutive SELL - reduce sell probability
action_probs_np[:, 0] *= 0.7
elif all(a == 2 for a in self.last_actions[0][-self.max_consecutive_same_action:]):
# Too many consecutive BUY - reduce buy probability
action_probs_np[:, 2] *= 0.7
# Apply confidence threshold to reduce noise
max_probs = np.max(action_probs_np, axis=1)
for i in range(len(action_probs_np)):
if max_probs[i] < self.confidence_threshold:
# If confidence is too low, force HOLD
action_probs_np[i] = np.array([0.1, 0.8, 0.1])
# Re-normalize
action_probs_np = action_probs_np / action_probs_np.sum(axis=1, keepdims=True)
# Store the predicted action for the most recent input
if action_probs_np.shape[0] > 0:
latest_action = np.argmax(action_probs_np[-1])
self.last_actions[0].append(int(latest_action))
# Keep only the most recent actions
self.last_actions[0] = self.last_actions[0][-10:] # Store last 10 actions
# Update action counts for stats
actions = np.argmax(action_probs_np, axis=1)
unique, counts = np.unique(actions, return_counts=True)
action_dict = dict(zip(unique, counts))
if 0 in action_dict:
self.action_counts['SELL'][0] += action_dict[0]
if 1 in action_dict:
self.action_counts['HOLD'][0] += action_dict[1]
if 2 in action_dict:
self.action_counts['BUY'][0] += action_dict[2]
# Get the current close prices from the input
current_prices = X_tensor[:, -1, 3].cpu().numpy() if X_tensor.shape[2] > 3 else np.zeros(X_tensor.shape[0])
# Calculate price directions based on probabilities
price_directions = action_probs_np[:, 2] - action_probs_np[:, 0] # BUY - SELL
# Scale the price change based on signal strength
price_preds = current_prices * (1 + price_directions * 0.002)
return action_probs_np, price_preds.reshape(-1, 1)
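    # Illustrative usage sketch (shapes are assumptions): given windows of
    # shape [batch, window_size, features] where column 3 holds the close,
    #   probs, price_preds = model.predict(X)
    # probs has shape [batch, 3] (SELL/HOLD/BUY after post-processing) and
    # price_preds has shape [batch, 1] in absolute price terms.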
def predict_next_candles(self, X, n_candles=3):
"""
Predict the next n candles with focus on short-term signals.
Args:
X: Input data of shape [batch_size, window_size, features]
n_candles: Number of future candles to predict
Returns:
Dictionary of predictions for each timeframe
"""
self.model.eval()
X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
with torch.no_grad():
# Get initial predictions
action_probs, price_pred = self.model(X_tensor)
action_probs_np = action_probs.cpu().numpy()
# Apply more aggressive processing for short-term signals
action_probs_np[:, 1] *= 0.5 # Reduce HOLD
action_probs_np[:, 0] *= 1.3 # Boost SELL
action_probs_np[:, 2] *= 1.3 # Boost BUY
# Re-normalize
action_probs_np = action_probs_np / action_probs_np.sum(axis=1, keepdims=True)
# For short-term predictions, implement decay of signal over time
# First candle: full signal, then gradually decay
predictions = {}
for i, tf in enumerate(self.timeframes):
tf_preds = np.zeros((n_candles, action_probs_np.shape[0], 3))
for j in range(n_candles):
# Apply decay factor to move signals toward HOLD over time
# (short-term signals shouldn't persist too long)
decay_factor = max(0.1, 1.0 - j * 0.3)
# First, move probabilities toward HOLD with decay
decayed_probs = action_probs_np.copy()
decayed_probs[:, 0] = action_probs_np[:, 0] * decay_factor # Decay SELL
decayed_probs[:, 2] = action_probs_np[:, 2] * decay_factor # Decay BUY
# Increase HOLD probability to compensate
hold_increase = (1.0 - decay_factor) * (action_probs_np[:, 0] + action_probs_np[:, 2])
decayed_probs[:, 1] = action_probs_np[:, 1] + hold_increase
# Re-normalize
decayed_probs = decayed_probs / decayed_probs.sum(axis=1, keepdims=True)
# Store in predictions array
tf_preds[j] = decayed_probs
# Store in output dictionary
predictions[tf] = tf_preds
return predictions
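    # Worked example of the decay schedule above: with decay_factor
    # max(0.1, 1.0 - j * 0.3), candle j=0 keeps the full signal (1.0),
    # j=1 keeps 70%, j=2 keeps 40%, and from j=3 onward the floor of 0.1
    # applies, so distant candles collapse almost entirely toward HOLD.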
def train(self, X_train, y_train, X_val=None, y_val=None, batch_size=32, epochs=100):
"""
Train the CNN model.
Args:
X_train: Training input data
y_train: Training target data
X_val: Validation input data
y_val: Validation target data
batch_size: Batch size for training
epochs: Number of training epochs
Returns:
Training history
"""
logger.info(f"Training PyTorch CNN model with {len(X_train)} samples, "
f"batch_size={batch_size}, epochs={epochs}")
# Convert numpy arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(self.device)
# Handle different output sizes for y_train
if self.output_size == 1:
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(self.device)
else:
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(self.device)
# Create DataLoader for training data
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Create DataLoader for validation data if provided
if X_val is not None and y_val is not None:
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(self.device)
if self.output_size == 1:
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).to(self.device)
else:
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(self.device)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
else:
val_loader = None
# Training loop
for epoch in range(epochs):
# Training phase
self.model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in train_loader:
# Zero the parameter gradients
self.optimizer.zero_grad()
# Forward pass
action_probs, price_pred = self.model(inputs)
# Calculate loss
if self.output_size == 1:
loss = self.criterion(action_probs, targets.unsqueeze(1))
else:
loss = self.criterion(action_probs, targets)
# Backward pass and optimize
loss.backward()
self.optimizer.step()
# Statistics
running_loss += loss.item()
_, predicted = torch.max(action_probs, 1)
total += targets.size(0)
correct += (predicted == targets).sum().item()
epoch_loss = running_loss / len(train_loader)
epoch_acc = correct / total if total > 0 else 0
# Validation phase
if val_loader is not None:
                # evaluate() returns (action_loss, price_loss, accuracy)
                val_loss, _, val_acc = self.evaluate(X_val, y_val)
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f} - "
f"val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f}")
# Update history
self.train_losses.append(epoch_loss)
self.train_accuracies.append(epoch_acc)
self.val_losses.append(val_loss)
self.val_accuracies.append(val_acc)
else:
logger.info(f"Epoch {epoch+1}/{epochs} - "
f"loss: {epoch_loss:.4f} - acc: {epoch_acc:.4f}")
# Update history without validation
self.train_losses.append(epoch_loss)
self.train_accuracies.append(epoch_acc)
logger.info("Training completed")
return {
'loss': self.train_losses,
'accuracy': self.train_accuracies,
'val_loss': self.val_losses,
'val_accuracy': self.val_accuracies
}
def evaluate_metrics(self, X_test, y_test):
"""
Calculate and return comprehensive evaluation metrics as dict
"""
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(self.device)
self.model.eval()
        with torch.no_grad():
            # The model returns (action_probs, price_pred); only the action
            # probabilities are needed for classification metrics
            action_probs, _ = self.model(X_test_tensor)
        if self.output_size > 1:
            _, y_pred_class = torch.max(action_probs, 1)
            y_pred_class = y_pred_class.cpu().numpy()
        else:
            y_pred_class = (action_probs.cpu().numpy() > 0.5).astype(int).flatten()
metrics = {
'accuracy': accuracy_score(y_test, y_pred_class),
'precision': precision_score(y_test, y_pred_class, average='weighted', zero_division=0),
'recall': recall_score(y_test, y_pred_class, average='weighted', zero_division=0),
'f1_score': f1_score(y_test, y_pred_class, average='weighted', zero_division=0)
}
return metrics
def save(self, filepath):
"""
Save the model to a file with trading configuration.
Args:
filepath: Path to save the model
"""
        # Create directory if it doesn't exist (guard against bare filenames)
        save_dir = os.path.dirname(filepath)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
# Save the model state with additional trading parameters
model_state = {
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'history': {
'loss': self.train_losses,
'accuracy': self.train_accuracies,
'val_loss': self.val_losses,
'val_accuracy': self.val_accuracies
},
'window_size': self.window_size,
'num_features': self.total_features,
'output_size': self.output_size,
'timeframes': self.timeframes,
# Save trading configuration
'confidence_threshold': self.confidence_threshold,
'max_consecutive_same_action': self.max_consecutive_same_action,
'action_counts': self.action_counts,
'last_actions': self.last_actions,
# Save model version information
'model_version': 'short_term_optimized_v1.0',
'timestamp': datetime.now().strftime('%Y%m%d_%H%M%S')
}
torch.save(model_state, f"{filepath}.pt")
logger.info(f"Model saved to {filepath}.pt with short-term trading optimizations")
# Save a backup of the model periodically
if not os.path.exists(f"{filepath}_backup"):
os.makedirs(f"{filepath}_backup", exist_ok=True)
backup_path = os.path.join(f"{filepath}_backup", f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pt")
torch.save(model_state, backup_path)
logger.info(f"Backup saved to {backup_path}")
def load(self, filepath):
"""Load model weights from file"""
if not os.path.exists(f"{filepath}.pt"):
logger.error(f"Model file {filepath}.pt not found")
return False
try:
# Load the model state
model_state = torch.load(f"{filepath}.pt", map_location=self.device)
# Update model parameters
self.window_size = model_state['window_size']
self.total_features = model_state['num_features']
self.output_size = model_state['output_size']
self.timeframes = model_state.get('timeframes', ["1m"])
# Load model state dict
self.model.load_state_dict(model_state['model_state_dict'])
# Load optimizer state if available
if 'optimizer_state_dict' in model_state:
self.optimizer.load_state_dict(model_state['optimizer_state_dict'])
# Load trading configuration if available
if 'confidence_threshold' in model_state:
self.confidence_threshold = model_state['confidence_threshold']
if 'max_consecutive_same_action' in model_state:
self.max_consecutive_same_action = model_state['max_consecutive_same_action']
# Log model version information if available
if 'model_version' in model_state:
logger.info(f"Model version: {model_state['model_version']}")
if 'timestamp' in model_state:
logger.info(f"Model timestamp: {model_state['timestamp']}")
return True
except Exception as e:
logger.error(f"Error loading model: {str(e)}")
return False
def plot_training_history(self, metrics_file="NN/models/saved/training_metrics.json"):
"""
Plot training history from saved metrics.
Args:
metrics_file: Path to the saved metrics JSON file
"""
try:
import json
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
# Load metrics
with open(metrics_file, 'r') as f:
metrics = json.load(f)
# Create plots directory
plots_dir = os.path.join(os.path.dirname(metrics_file), 'plots')
os.makedirs(plots_dir, exist_ok=True)
# Convert timestamps to datetime objects
timestamps = [datetime.fromisoformat(ts) for ts in metrics['timestamps']]
# 1. Plot Loss and Accuracy
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)
# Loss plot
ax1.plot(timestamps, metrics['train_loss'], 'b-', label='Training Loss')
ax1.plot(timestamps, metrics['val_loss'], 'r-', label='Validation Loss')
ax1.set_title('Model Loss Over Time')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True)
# Accuracy plot
ax2.plot(timestamps, metrics['train_acc'], 'g-', label='Training Accuracy')
ax2.plot(timestamps, metrics['val_acc'], 'm-', label='Validation Accuracy')
ax2.set_title('Model Accuracy Over Time')
ax2.set_ylabel('Accuracy')
ax2.legend()
ax2.grid(True)
# Format x-axis
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
plt.xticks(rotation=45)
# Save the plot
plt.tight_layout()
plt.savefig(os.path.join(plots_dir, 'loss_accuracy.png'))
plt.close()
# 2. Plot PnL and Win Rate
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), sharex=True)
# PnL plot
ax1.plot(timestamps, metrics['train_pnl'], 'g-', label='Training PnL')
ax1.plot(timestamps, metrics['val_pnl'], 'r-', label='Validation PnL')
ax1.set_title('PnL Over Time')
ax1.set_ylabel('PnL')
ax1.legend()
ax1.grid(True)
# Win Rate plot
ax2.plot(timestamps, metrics['train_win_rate'], 'b-', label='Training Win Rate')
ax2.plot(timestamps, metrics['val_win_rate'], 'm-', label='Validation Win Rate')
ax2.set_title('Win Rate Over Time')
ax2.set_ylabel('Win Rate')
ax2.legend()
ax2.grid(True)
# Format x-axis
ax2.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
plt.xticks(rotation=45)
# Save the plot
plt.tight_layout()
plt.savefig(os.path.join(plots_dir, 'pnl_winrate.png'))
plt.close()
print(f"Performance visualizations saved to {plots_dir}")
return True
        except Exception as e:
            logger.error(f"Error generating plots: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
return False
    def extract_hidden_features(self, X):
        """
        Extract hidden features from the model - the output of the shared
        fully connected layer that feeds both prediction heads.
        Args:
            X: Input data of shape [batch_size, window_size, features]
        Returns:
            Hidden features as a numpy array of shape [batch_size, 256]
        """
        # Convert to PyTorch tensor
        X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device)
        # Replicate CNNPyTorch.forward up to the shared representation
        self.model.eval()
        with torch.no_grad():
            x_t = X_tensor.transpose(1, 2)
            conv1_out = self.model.conv1(x_t)
            conv2_out = self.model.conv2(conv1_out)
            micro_out = self.model.micro_conv(x_t)
            attention = self.model.softmax(self.model.attention(conv1_out))
            conv2_out = self.model.adaptive_pool(conv2_out * attention)
            micro_out = self.model.adaptive_pool(micro_out)
            features = torch.cat([
                conv2_out.reshape(conv2_out.size(0), -1),
                micro_out.reshape(micro_out.size(0), -1)
            ], dim=1)
            hidden = self.model.shared_fc(features)
        return hidden.cpu().numpy()
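# Minimal smoke test (a sketch on synthetic data, not a trading benchmark):
# build the wrapper with its defaults, run a prediction pass, and log the
# resulting signal distribution.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    model = CNNModelPyTorch(window_size=20)
    # 16 random windows over the default 45 features (3 timeframes * 5 OHLCV * 3 pairs)
    X_demo = np.random.rand(16, 20, 45).astype(np.float32)
    probs, price_preds = model.predict(X_demo)
    actions = np.argmax(probs, axis=1)
    logger.info(f"Demo action distribution (0=SELL, 1=HOLD, 2=BUY): {np.bincount(actions, minlength=3)}")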