gogo2/training/cnn_trainer.py

"""
CNN Training Pipeline - Scalping Pattern Recognition
Comprehensive training pipeline for multi-timeframe CNN models:
- Automated data generation and preprocessing
- Training with validation and early stopping
- Memory-efficient batch processing
- Model evaluation and metrics
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import logging
from typing import Dict, List, Tuple, Optional
import time
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
# Ensure the project root is importable when running this file directly
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.config import get_config
from core.data_provider import DataProvider
from models.cnn.scalping_cnn import MultiTimeframeCNN, ScalpingDataGenerator
logger = logging.getLogger(__name__)
class TradingDataset(Dataset):
"""PyTorch dataset for trading data"""
def __init__(self, features: np.ndarray, labels: np.ndarray, metadata: Optional[Dict] = None):
self.features = torch.FloatTensor(features)
self.labels = torch.FloatTensor(labels)
self.metadata = metadata or {}
def __len__(self):
return len(self.features)
def __getitem__(self, idx):
return self.features[idx], self.labels[idx]
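
# A minimal usage sketch for TradingDataset. The shapes below are illustrative
# assumptions (matching this trainer's defaults: 4 timeframes, window 20,
# 26 features, 3 classes), not a contract of the data generator:
#
#   features = np.random.rand(100, 4, 20, 26).astype(np.float32)
#   labels = np.eye(3)[np.random.randint(0, 3, 100)].astype(np.float32)
#   dataset = TradingDataset(features, labels)
#   x, y = dataset[0]  # -> (FloatTensor[4, 20, 26], FloatTensor[3])
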
class CNNTrainer:
"""
CNN Training Pipeline for Scalping
"""
def __init__(self, data_provider: DataProvider, config: Optional[Dict] = None):
self.data_provider = data_provider
self.config = config or get_config()
# Training parameters
self.learning_rate = 1e-4
self.batch_size = 64
self.num_epochs = 100
self.patience = 15
self.validation_split = 0.2
# Data parameters
self.timeframes = ['1s', '1m', '5m', '1h']
self.window_size = 20
self.num_samples = 20000
# Model parameters
self.n_timeframes = len(self.timeframes)
        self.n_features = 26  # Number of technical indicators (may be updated from actual data in prepare_data)
self.n_classes = 3 # BUY, SELL, HOLD
# Device
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Initialize data generator
self.data_generator = ScalpingDataGenerator(data_provider, self.window_size)
# Training state
self.model = None
self.train_losses = []
self.val_losses = []
self.train_accuracies = []
self.val_accuracies = []
logger.info(f"CNNTrainer initialized with {self.n_timeframes} timeframes, {self.n_features} features")
def prepare_data(self, symbols: List[str]) -> Tuple[DataLoader, DataLoader, Dict]:
"""Prepare training and validation data"""
logger.info("Preparing training data...")
all_features = []
all_labels = []
all_metadata = {'symbols': []}
# Generate data for each symbol
for symbol in symbols:
logger.info(f"Generating data for {symbol}...")
features, labels, metadata = self.data_generator.generate_training_cases(
symbol, self.timeframes, self.num_samples // len(symbols)
)
if features is not None and labels is not None:
all_features.append(features)
all_labels.append(labels)
all_metadata['symbols'].extend([symbol] * len(features))
logger.info(f"Generated {len(features)} samples for {symbol}")
                # Infer the real feature count from the first symbol's data,
                # since the generator may emit a different number of
                # indicators than the default
                if len(all_features) == 1:
                    actual_features = features.shape[-1]
                    if actual_features != self.n_features:
                        logger.info(f"Updating feature count from {self.n_features} to {actual_features}")
                        self.n_features = actual_features
else:
logger.warning(f"No data generated for {symbol}")
if not all_features:
raise ValueError("No training data generated")
# Combine all data
combined_features = np.concatenate(all_features, axis=0)
combined_labels = np.concatenate(all_labels, axis=0)
logger.info(f"Total dataset: {len(combined_features)} samples")
logger.info(f"Features shape: {combined_features.shape}")
logger.info(f"Labels shape: {combined_labels.shape}")
# Split into train/validation
X_train, X_val, y_train, y_val = train_test_split(
combined_features, combined_labels,
test_size=self.validation_split,
stratify=np.argmax(combined_labels, axis=1),
random_state=42
)
# Create datasets
train_dataset = TradingDataset(X_train, y_train)
val_dataset = TradingDataset(X_val, y_val)
# Create data loaders
train_loader = DataLoader(
train_dataset,
batch_size=self.batch_size,
shuffle=True,
num_workers=0, # Set to 0 to avoid multiprocessing issues
            pin_memory=torch.cuda.is_available()
)
val_loader = DataLoader(
val_dataset,
batch_size=self.batch_size,
shuffle=False,
num_workers=0,
            pin_memory=torch.cuda.is_available()
)
# Prepare metadata for return
dataset_info = {
'train_size': len(train_dataset),
'val_size': len(val_dataset),
'feature_shape': combined_features.shape[1:],
            'label_distribution': {
                'train': np.bincount(np.argmax(y_train, axis=1), minlength=self.n_classes),
                'val': np.bincount(np.argmax(y_val, axis=1), minlength=self.n_classes)
            }
}
logger.info(f"Train samples: {dataset_info['train_size']}")
logger.info(f"Validation samples: {dataset_info['val_size']}")
logger.info(f"Train label distribution: {dataset_info['label_distribution']['train']}")
logger.info(f"Val label distribution: {dataset_info['label_distribution']['val']}")
return train_loader, val_loader, dataset_info
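
    # Illustrative check of what prepare_data yields, assuming the default
    # batch size and the shapes sketched above (not executed here):
    #
    #   features, labels = next(iter(train_loader))
    #   features.shape  # torch.Size([64, 4, 20, 26])
    #   labels.shape    # torch.Size([64, 3])
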
def create_model(self) -> MultiTimeframeCNN:
"""Create and initialize the CNN model"""
model = MultiTimeframeCNN(
n_timeframes=self.n_timeframes,
window_size=self.window_size,
n_features=self.n_features,
n_classes=self.n_classes
)
model.to(self.device)
# Log model info
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
logger.info(f"Model created with {total_params:,} total parameters")
logger.info(f"Trainable parameters: {trainable_params:,}")
logger.info(f"Estimated memory usage: {model.get_memory_usage()}MB")
return model
def train_epoch(self, model: nn.Module, train_loader: DataLoader,
optimizer: optim.Optimizer, criterion: nn.Module) -> Tuple[float, float]:
"""Train for one epoch"""
model.train()
total_loss = 0.0
correct_predictions = 0
total_predictions = 0
for batch_idx, (features, labels) in enumerate(train_loader):
features = features.to(self.device)
labels = labels.to(self.device)
# Zero gradients
optimizer.zero_grad()
# Forward pass
predictions = model(features)
# Calculate loss (multi-task loss)
action_loss = criterion(predictions['action'], labels)
            # Auxiliary loss: penalize distance from 0.5 so the confidence
            # head stays calibrated instead of saturating at 0 or 1
            confidence_loss = torch.mean(torch.abs(predictions['confidence'] - 0.5))
# Total loss
total_loss_batch = action_loss + 0.1 * confidence_loss
# Backward pass
total_loss_batch.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Update weights
optimizer.step()
# Track metrics
total_loss += total_loss_batch.item()
# Calculate accuracy
pred_classes = torch.argmax(predictions['action'], dim=1)
true_classes = torch.argmax(labels, dim=1)
correct_predictions += (pred_classes == true_classes).sum().item()
total_predictions += labels.size(0)
# Log progress
if batch_idx % 100 == 0:
logger.debug(f"Batch {batch_idx}/{len(train_loader)}, Loss: {total_loss_batch.item():.4f}")
avg_loss = total_loss / len(train_loader)
accuracy = correct_predictions / total_predictions
return avg_loss, accuracy
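
    # The per-batch training objective assembled above is:
    #
    #   total_loss = CrossEntropy(action_logits, action_targets)
    #                + 0.1 * mean(|confidence - 0.5|)
    #
    # where 0.1 is a fixed weight chosen in this file; the second term keeps
    # the auxiliary confidence head near 0.5 rather than letting it saturate.
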
def validate_epoch(self, model: nn.Module, val_loader: DataLoader,
criterion: nn.Module) -> Tuple[float, float, Dict]:
"""Validate for one epoch"""
model.eval()
total_loss = 0.0
correct_predictions = 0
total_predictions = 0
all_predictions = []
all_labels = []
all_confidences = []
with torch.no_grad():
for features, labels in val_loader:
features = features.to(self.device)
labels = labels.to(self.device)
# Forward pass
predictions = model(features)
# Calculate loss
loss = criterion(predictions['action'], labels)
total_loss += loss.item()
# Track predictions
pred_classes = torch.argmax(predictions['action'], dim=1)
true_classes = torch.argmax(labels, dim=1)
correct_predictions += (pred_classes == true_classes).sum().item()
total_predictions += labels.size(0)
# Store for detailed analysis
all_predictions.extend(pred_classes.cpu().numpy())
all_labels.extend(true_classes.cpu().numpy())
                all_confidences.extend(predictions['confidence'].cpu().numpy().flatten())
avg_loss = total_loss / len(val_loader)
accuracy = correct_predictions / total_predictions
# Additional metrics
metrics = {
'predictions': np.array(all_predictions),
'labels': np.array(all_labels),
'confidences': np.array(all_confidences),
'accuracy_by_class': {},
'avg_confidence': np.mean(all_confidences)
}
# Calculate per-class accuracy
for class_idx in range(self.n_classes):
class_mask = metrics['labels'] == class_idx
if np.sum(class_mask) > 0:
class_accuracy = np.mean(metrics['predictions'][class_mask] == metrics['labels'][class_mask])
metrics['accuracy_by_class'][class_idx] = class_accuracy
return avg_loss, accuracy, metrics
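
    # validate_epoch returns (avg_loss, accuracy, metrics); the metrics dict
    # carries the raw per-sample arrays plus derived stats, e.g. (values
    # illustrative):
    #
    #   metrics['accuracy_by_class']  # {0: 0.61, 1: 0.58, 2: 0.72}
    #   metrics['avg_confidence']     # 0.53
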
def train(self, symbols: List[str], save_path: Optional[str] = None) -> Dict:
"""Train the CNN model"""
logger.info("Starting CNN training...")
# Prepare data first to get actual feature count
train_loader, val_loader, dataset_info = self.prepare_data(symbols)
# Create model with correct feature count
self.model = self.create_model()
        # Setup training: CrossEntropyLoss accepts the one-hot float targets
        # used here directly as probability targets (PyTorch >= 1.10)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5
        )
# Training state
best_val_loss = float('inf')
best_val_accuracy = 0.0
patience_counter = 0
start_time = time.time()
# Training loop
for epoch in range(self.num_epochs):
epoch_start_time = time.time()
# Train
train_loss, train_accuracy = self.train_epoch(
self.model, train_loader, optimizer, criterion
)
# Validate
val_loss, val_accuracy, val_metrics = self.validate_epoch(
self.model, val_loader, criterion
)
# Update learning rate
scheduler.step(val_loss)
# Track metrics
self.train_losses.append(train_loss)
self.val_losses.append(val_loss)
self.train_accuracies.append(train_accuracy)
self.val_accuracies.append(val_accuracy)
# Check for improvement
if val_loss < best_val_loss:
best_val_loss = val_loss
best_val_accuracy = val_accuracy
patience_counter = 0
# Save best model
if save_path:
best_path = save_path.replace('.pt', '_best.pt')
self.model.save(best_path)
logger.info(f"New best model saved: {best_path}")
else:
patience_counter += 1
# Log progress
epoch_time = time.time() - epoch_start_time
logger.info(
f"Epoch {epoch+1}/{self.num_epochs} - "
f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f} - "
f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f} - "
f"Time: {epoch_time:.2f}s"
)
# Detailed validation metrics every 10 epochs
if (epoch + 1) % 10 == 0:
logger.info(f"Class accuracies: {val_metrics['accuracy_by_class']}")
logger.info(f"Average confidence: {val_metrics['avg_confidence']:.4f}")
# Early stopping
if patience_counter >= self.patience:
logger.info(f"Early stopping triggered after {epoch+1} epochs")
break
# Training complete
total_time = time.time() - start_time
logger.info(f"Training completed in {total_time:.2f} seconds")
logger.info(f"Best validation loss: {best_val_loss:.4f}")
logger.info(f"Best validation accuracy: {best_val_accuracy:.4f}")
# Save final model
if save_path:
self.model.save(save_path)
logger.info(f"Final model saved: {save_path}")
# Prepare training results
results = {
'best_val_loss': best_val_loss,
'best_val_accuracy': best_val_accuracy,
'total_epochs': epoch + 1,
'total_time': total_time,
'train_losses': self.train_losses,
'val_losses': self.val_losses,
'train_accuracies': self.train_accuracies,
'val_accuracies': self.val_accuracies,
'dataset_info': dataset_info,
'final_metrics': val_metrics
}
return results
def evaluate_model(self, test_symbols: List[str]) -> Dict:
"""Evaluate trained model on test data"""
if self.model is None:
raise ValueError("Model not trained yet")
logger.info("Evaluating model...")
# Generate test data
test_features = []
test_labels = []
for symbol in test_symbols:
features, labels, _ = self.data_generator.generate_training_cases(
symbol, self.timeframes, 5000
)
if features is not None:
test_features.append(features)
test_labels.append(labels)
if not test_features:
raise ValueError("No test data generated")
test_features = np.concatenate(test_features, axis=0)
test_labels = np.concatenate(test_labels, axis=0)
# Create test loader
test_dataset = TradingDataset(test_features, test_labels)
test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
# Evaluate
criterion = nn.CrossEntropyLoss()
test_loss, test_accuracy, test_metrics = self.validate_epoch(
self.model, test_loader, criterion
)
        # Generate classification report (fix the label set so the report
        # does not fail if a class is missing from the test data)
        class_names = ['BUY', 'SELL', 'HOLD']
        classification_rep = classification_report(
            test_metrics['labels'],
            test_metrics['predictions'],
            labels=list(range(self.n_classes)),
            target_names=class_names,
            output_dict=True,
            zero_division=0
        )
        # Confusion matrix over the full label set
        conf_matrix = confusion_matrix(
            test_metrics['labels'],
            test_metrics['predictions'],
            labels=list(range(self.n_classes))
        )
evaluation_results = {
'test_loss': test_loss,
'test_accuracy': test_accuracy,
'classification_report': classification_rep,
'confusion_matrix': conf_matrix,
'class_accuracies': test_metrics['accuracy_by_class'],
'avg_confidence': test_metrics['avg_confidence']
}
logger.info(f"Test accuracy: {test_accuracy:.4f}")
logger.info(f"Test loss: {test_loss:.4f}")
return evaluation_results
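
    # Because classification_report is returned with output_dict=True,
    # per-class metrics are plain dict lookups, e.g. (symbol illustrative):
    #
    #   results = trainer.evaluate_model(['ETH/USDT'])
    #   buy_f1 = results['classification_report']['BUY']['f1-score']
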
def plot_training_history(self, save_path: Optional[str] = None):
"""Plot training history"""
if not self.train_losses:
logger.warning("No training history to plot")
return
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
# Loss plot
epochs = range(1, len(self.train_losses) + 1)
ax1.plot(epochs, self.train_losses, 'b-', label='Training Loss')
ax1.plot(epochs, self.val_losses, 'r-', label='Validation Loss')
ax1.set_title('Training and Validation Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True)
# Accuracy plot
ax2.plot(epochs, self.train_accuracies, 'b-', label='Training Accuracy')
ax2.plot(epochs, self.val_accuracies, 'r-', label='Validation Accuracy')
ax2.set_title('Training and Validation Accuracy')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
logger.info(f"Training history plot saved: {save_path}")
plt.show()
# Export
__all__ = ['CNNTrainer', 'TradingDataset']
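

if __name__ == '__main__':
    # Minimal end-to-end sketch. It assumes DataProvider can be constructed
    # without arguments and that the symbol below is available from it; both
    # are assumptions, not guarantees of this repository.
    logging.basicConfig(level=logging.INFO)
    provider = DataProvider()
    trainer = CNNTrainer(provider)
    results = trainer.train(['ETH/USDT'], save_path='models/cnn_scalper.pt')
    trainer.plot_training_history(save_path='models/cnn_training_history.png')
    logger.info(f"Best validation accuracy: {results['best_val_accuracy']:.4f}")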