Dobromir Popov
2025-05-24 09:58:36 +03:00
parent ef71160282
commit 0fe8286787
11 changed files with 1396 additions and 483 deletions

View File

@ -1,31 +1,23 @@
"""
CNN Training Pipeline - Scalping Pattern Recognition
CNN Training Pipeline
Comprehensive training pipeline for multi-timeframe CNN models:
- Automated data generation and preprocessing
- Training with validation and early stopping
- Memory-efficient batch processing
- Model evaluation and metrics
This module handles training of the CNN model using ONLY real market data.
All training metrics are logged to TensorBoard for real-time monitoring.
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import pandas as pd
import logging
from typing import Dict, List, Tuple, Optional
import time
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix

# Add project imports
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json

from core.config import get_config
from core.data_provider import DataProvider
@ -33,13 +25,12 @@ from models.cnn.scalping_cnn import MultiTimeframeCNN, ScalpingDataGenerator
logger = logging.getLogger(__name__)

class CNNDataset(Dataset):
    """Dataset for CNN training with real market data"""

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.FloatTensor(features)
        self.labels = torch.LongTensor(np.argmax(labels, axis=1))  # Convert one-hot to class indices

    def __len__(self):
        return len(self.features)
@ -48,431 +39,437 @@ class TradingDataset(Dataset):
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]
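
    # Example: CrossEntropyLoss expects integer class indices, so the one-hot
    # rows are collapsed with argmax. With illustrative labels
    #     np.array([[1, 0, 0],    # BUY
    #               [0, 0, 1]])   # HOLD
    # np.argmax(labels, axis=1) yields array([0, 2]).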

class CNNTrainer:
    """CNN Trainer using ONLY real market data with TensorBoard monitoring"""

    def __init__(self, config: Optional[Dict] = None):
        """Initialize CNN trainer"""
        self.config = config or get_config()

        # Device
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Training parameters
        self.learning_rate = self.config.training.get('learning_rate', 0.001)
        self.batch_size = self.config.training.get('batch_size', 32)
        self.epochs = self.config.training.get('epochs', 100)
        self.validation_split = self.config.training.get('validation_split', 0.2)
        self.early_stopping_patience = self.config.training.get('early_stopping_patience', 10)

        # Model parameters - will be updated based on real data
        self.n_timeframes = len(self.config.timeframes)
        self.window_size = self.config.cnn.get('window_size', 20)
        self.n_features = self.config.cnn.get('features', 26)  # Will be dynamically updated
        self.n_classes = 3  # BUY, SELL, HOLD

        # Initialize components
        self.data_provider = DataProvider(self.config)
        self.data_generator = ScalpingDataGenerator(self.data_provider, self.window_size)
        self.model = None

        # Training state
        self.train_losses = []
        self.val_losses = []
        self.train_accuracies = []
        self.val_accuracies = []

        # TensorBoard setup
        self.setup_tensorboard()

        logger.info(f"CNNTrainer initialized with {self.n_timeframes} timeframes, {self.n_features} features")
        logger.info("Will use ONLY real market data for training")
    def setup_tensorboard(self):
        """Setup TensorBoard logging"""
        # Create tensorboard logs directory
        log_dir = Path("runs") / f"cnn_training_{int(time.time())}"
        log_dir.mkdir(parents=True, exist_ok=True)

        self.writer = SummaryWriter(log_dir=str(log_dir))
        self.tensorboard_dir = log_dir

        logger.info(f"TensorBoard logging to: {log_dir}")
        logger.info("Run: tensorboard --logdir=runs")
    def log_model_architecture(self):
        """Log model architecture to TensorBoard"""
        if self.model is not None:
            # Log model graph (requires a dummy input)
            dummy_input = torch.randn(1, self.n_timeframes, self.window_size, self.n_features).to(self.device)
            try:
                self.writer.add_graph(self.model, dummy_input)
                logger.info("Model architecture logged to TensorBoard")
            except Exception as e:
                logger.warning(f"Could not log model graph: {e}")

            # Log model parameters count
            total_params = sum(p.numel() for p in self.model.parameters())
            trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
            self.writer.add_scalar('Model/TotalParameters', total_params, 0)
            self.writer.add_scalar('Model/TrainableParameters', trainable_params, 0)
    def create_model(self) -> MultiTimeframeCNN:
        """Create CNN model"""
        model = MultiTimeframeCNN(
            n_timeframes=self.n_timeframes,
            window_size=self.window_size,
            n_features=self.n_features,
            n_classes=self.n_classes,
            dropout_rate=self.config.cnn.get('dropout', 0.2)
        )
        model = model.to(self.device)

        # Log model info
        total_params = sum(p.numel() for p in model.parameters())
        trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        memory_usage = model.get_memory_usage()

        logger.info(f"Model created with {total_params:,} total parameters")
        logger.info(f"Trainable parameters: {trainable_params:,}")
        logger.info(f"Estimated memory usage: {memory_usage}MB")

        return model
    def prepare_data(self, symbols: List[str], num_samples: int = 10000) -> Tuple[np.ndarray, np.ndarray, Dict]:
        """Prepare training data from REAL market data"""
        logger.info("Preparing training data...")
        logger.info("Data source: REAL market data from exchange APIs")

        all_features = []
        all_labels = []
        all_metadata = []

        for symbol in symbols:
            logger.info(f"Generating data for {symbol}...")
            features, labels, metadata = self.data_generator.generate_training_cases(
                symbol=symbol,
                timeframes=self.config.timeframes,
                num_samples=num_samples
            )

            if features is not None:
                all_features.append(features)
                all_labels.append(labels)
                all_metadata.append(metadata)
                logger.info(f"Generated {len(features)} samples for {symbol}")

                # Update feature count if needed
                actual_features = features.shape[-1]
                if actual_features != self.n_features:
                    logger.info(f"Updating feature count from {self.n_features} to {actual_features}")
                    self.n_features = actual_features

        if not all_features:
            raise ValueError("No training data generated from real market data")

        # Combine all data
        features = np.concatenate(all_features, axis=0)
        labels = np.concatenate(all_labels, axis=0)

        # Log data statistics to TensorBoard
        self.log_data_statistics(features, labels)

        return features, labels, all_metadata
    def log_data_statistics(self, features: np.ndarray, labels: np.ndarray):
        """Log data statistics to TensorBoard"""
        # Dataset size
        self.writer.add_scalar('Data/TotalSamples', len(features), 0)
        self.writer.add_scalar('Data/Features', features.shape[-1], 0)
        self.writer.add_scalar('Data/Timeframes', features.shape[1], 0)
        self.writer.add_scalar('Data/WindowSize', features.shape[2], 0)

        # Class distribution
        class_counts = np.bincount(np.argmax(labels, axis=1))
        for i, count in enumerate(class_counts):
            self.writer.add_scalar(f'Data/Class_{i}_Count', count, 0)

        # Feature statistics
        feature_means = features.mean(axis=(0, 1, 2))
        feature_stds = features.std(axis=(0, 1, 2))
        for i in range(min(10, len(feature_means))):  # Log first 10 features
            self.writer.add_scalar(f'Data/Feature_{i}_Mean', feature_means[i], 0)
            self.writer.add_scalar(f'Data/Feature_{i}_Std', feature_stds[i], 0)
    def train_epoch(self, model: nn.Module, train_loader: DataLoader,
                    optimizer: torch.optim.Optimizer, criterion: nn.Module, epoch: int) -> Tuple[float, float]:
        """Train for one epoch with TensorBoard logging"""
        model.train()
        total_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (features, labels) in enumerate(train_loader):
            features, labels = features.to(self.device), labels.to(self.device)

            optimizer.zero_grad()
            predictions = model(features)
            loss = criterion(predictions['action'], labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(predictions['action'].data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # Log batch metrics
            step = epoch * len(train_loader) + batch_idx
            self.writer.add_scalar('Training/BatchLoss', loss.item(), step)

            if batch_idx % 50 == 0:  # Log every 50 batches
                batch_acc = 100. * (predicted == labels).sum().item() / labels.size(0)
                self.writer.add_scalar('Training/BatchAccuracy', batch_acc, step)

                # Log confidence scores
                avg_confidence = predictions['confidence'].mean().item()
                self.writer.add_scalar('Training/BatchConfidence', avg_confidence, step)

        epoch_loss = total_loss / len(train_loader)
        epoch_accuracy = correct / total

        return epoch_loss, epoch_accuracy
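
    # Note: MultiTimeframeCNN is assumed to return a dict of tensors with
    # 'action' logits of shape (batch, n_classes) and a per-sample
    # 'confidence' score; train_epoch and validate_epoch both rely on
    # this contract.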
    def validate_epoch(self, model: nn.Module, val_loader: DataLoader,
                       criterion: nn.Module, epoch: int) -> Tuple[float, float, Dict]:
        """Validate for one epoch with TensorBoard logging"""
        model.eval()
        total_loss = 0.0
        correct = 0
        total = 0

        all_predictions = []
        all_labels = []
        all_confidences = []

        with torch.no_grad():
            for features, labels in val_loader:
                features, labels = features.to(self.device), labels.to(self.device)

                predictions = model(features)
                loss = criterion(predictions['action'], labels)

                total_loss += loss.item()
                _, predicted = torch.max(predictions['action'].data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
                all_confidences.extend(predictions['confidence'].cpu().numpy())

        epoch_loss = total_loss / len(val_loader)
        epoch_accuracy = correct / total

        # Calculate detailed metrics
        metrics = self.calculate_detailed_metrics(all_predictions, all_labels, all_confidences)

        # Log validation metrics to TensorBoard
        self.writer.add_scalar('Validation/Loss', epoch_loss, epoch)
        self.writer.add_scalar('Validation/Accuracy', epoch_accuracy, epoch)
        self.writer.add_scalar('Validation/AvgConfidence', metrics['avg_confidence'], epoch)

        for class_idx, acc in metrics['class_accuracies'].items():
            self.writer.add_scalar(f'Validation/Class_{class_idx}_Accuracy', acc, epoch)

        return epoch_loss, epoch_accuracy, metrics
    def calculate_detailed_metrics(self, predictions: List, labels: List, confidences: List) -> Dict:
        """Calculate detailed training metrics"""
        predictions = np.array(predictions)
        labels = np.array(labels)
        confidences = np.array(confidences)

        # Class-wise accuracies
        class_accuracies = {}
        for class_idx in range(self.n_classes):
            class_mask = labels == class_idx
            if class_mask.sum() > 0:
                class_acc = (predictions[class_mask] == labels[class_mask]).mean()
                class_accuracies[class_idx] = class_acc

        return {
            'class_accuracies': class_accuracies,
            'avg_confidence': confidences.mean(),
            'confusion_matrix': confusion_matrix(labels, predictions)
        }
    def train(self, symbols: List[str], save_path: str = 'models/cnn/scalping_cnn_trained.pt',
              num_samples: int = 10000) -> Dict:
        """Train CNN model with TensorBoard monitoring"""
        logger.info("Starting CNN training...")
        logger.info("Using ONLY real market data from exchange APIs")

        # Prepare data
        features, labels, metadata = self.prepare_data(symbols, num_samples)

        # Log training configuration
        self.writer.add_text('Config/Symbols', str(symbols), 0)
        self.writer.add_text('Config/Timeframes', str(self.config.timeframes), 0)
        self.writer.add_scalar('Config/LearningRate', self.learning_rate, 0)
        self.writer.add_scalar('Config/BatchSize', self.batch_size, 0)
        self.writer.add_scalar('Config/MaxEpochs', self.epochs, 0)

        # Create datasets
        dataset = CNNDataset(features, labels)

        # Split data
        val_size = int(len(dataset) * self.validation_split)
        train_size = len(dataset) - val_size
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)

        logger.info(f"Total dataset: {len(dataset)} samples")
        logger.info(f"Features shape: {features.shape}")
        logger.info(f"Labels shape: {labels.shape}")
        logger.info(f"Train samples: {train_size}")
        logger.info(f"Validation samples: {val_size}")

        # Log class distributions
        train_labels = [dataset[i][1].item() for i in train_dataset.indices]
        val_labels = [dataset[i][1].item() for i in val_dataset.indices]
        logger.info(f"Train label distribution: {np.bincount(train_labels)}")
        logger.info(f"Val label distribution: {np.bincount(val_labels)}")

        # Create model
        self.model = self.create_model()
        self.log_model_architecture()

        # Setup training
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, verbose=True)

        # Training loop
        best_val_loss = float('inf')
        best_val_accuracy = 0.0
        patience_counter = 0
        start_time = time.time()

        for epoch in range(self.epochs):
            epoch_start = time.time()

            # Train
            train_loss, train_accuracy = self.train_epoch(self.model, train_loader, optimizer, criterion, epoch)

            # Validate
            val_loss, val_accuracy, val_metrics = self.validate_epoch(self.model, val_loader, criterion, epoch)

            # Update learning rate
            scheduler.step(val_loss)
            current_lr = optimizer.param_groups[0]['lr']

            # Track metrics
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            self.train_accuracies.append(train_accuracy)
            self.val_accuracies.append(val_accuracy)

            # Log epoch metrics
            self.writer.add_scalar('Training/EpochLoss', train_loss, epoch)
            self.writer.add_scalar('Training/EpochAccuracy', train_accuracy, epoch)
            self.writer.add_scalar('Training/LearningRate', current_lr, epoch)

            epoch_time = time.time() - epoch_start
            self.writer.add_scalar('Training/EpochTime', epoch_time, epoch)

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_val_accuracy = val_accuracy
                patience_counter = 0

                best_path = save_path.replace('.pt', '_best.pt')
                self.model.save(best_path)
                logger.info(f"New best model saved: {best_path}")

                # Log best metrics
                self.writer.add_scalar('Best/ValidationLoss', best_val_loss, epoch)
                self.writer.add_scalar('Best/ValidationAccuracy', best_val_accuracy, epoch)
            else:
                patience_counter += 1

            logger.info(f"Epoch {epoch+1}/{self.epochs} - "
                        f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f} - "
                        f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f} - "
                        f"Time: {epoch_time:.2f}s")

            # Log detailed metrics every 10 epochs
            if (epoch + 1) % 10 == 0:
                logger.info(f"Class accuracies: {val_metrics['class_accuracies']}")
                logger.info(f"Average confidence: {val_metrics['avg_confidence']:.4f}")

            # Early stopping
            if patience_counter >= self.early_stopping_patience:
                logger.info(f"Early stopping triggered after {epoch+1} epochs")
                break

        # Training completed
        total_time = time.time() - start_time
        logger.info(f"Training completed in {total_time:.2f} seconds")
        logger.info(f"Best validation loss: {best_val_loss:.4f}")
        logger.info(f"Best validation accuracy: {best_val_accuracy:.4f}")

        # Log final metrics
        self.writer.add_scalar('Final/TotalTrainingTime', total_time, 0)
        self.writer.add_scalar('Final/TotalEpochs', epoch + 1, 0)

        # Save final model
        self.model.save(save_path)
        logger.info(f"Final model saved: {save_path}")

        # Log training summary
        self.writer.add_text('Training/Summary',
                             f"Completed training with {len(features)} real market samples. "
                             f"Best validation accuracy: {best_val_accuracy:.4f}", 0)

        return {
            'best_val_loss': best_val_loss,
            'best_val_accuracy': best_val_accuracy,
            'total_epochs': epoch + 1,
            'train_losses': self.train_losses,
            'val_losses': self.val_losses,
            'train_accuracies': self.train_accuracies,
            'val_accuracies': self.val_accuracies,
            'training_time': total_time,
            'tensorboard_dir': str(self.tensorboard_dir)
        }
    def evaluate(self, symbols: List[str], num_samples: int = 5000) -> Dict:
        """Evaluate trained model on test data"""
        if self.model is None:
            raise ValueError("Model not trained yet")

        logger.info("Evaluating model...")

        # Generate test data from real market data
        features, labels, metadata = self.prepare_data(symbols, num_samples)

        # Create test dataset and loader
        test_dataset = CNNDataset(features, labels)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        # Evaluate
        criterion = nn.CrossEntropyLoss()
        test_loss, test_accuracy, test_metrics = self.validate_epoch(
            self.model, test_loader, criterion, epoch=0
        )

        # Generate detailed classification report
        from sklearn.metrics import classification_report
        class_names = ['BUY', 'SELL', 'HOLD']

        all_predictions = []
        all_labels = []

        with torch.no_grad():
            for features_batch, labels_batch in test_loader:
                features_batch = features_batch.to(self.device)
                predictions = self.model(features_batch)
                _, predicted = torch.max(predictions['action'].data, 1)
                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(labels_batch.numpy())

        classification_rep = classification_report(
            all_labels, all_predictions, target_names=class_names, output_dict=True
        )

        evaluation_results = {
            'test_loss': test_loss,
            'test_accuracy': test_accuracy,
            'classification_report': classification_rep,
            'class_accuracies': test_metrics['class_accuracies'],
            'avg_confidence': test_metrics['avg_confidence'],
            'confusion_matrix': test_metrics['confusion_matrix']
        }

        logger.info(f"Test accuracy: {test_accuracy:.4f}")
@ -480,40 +477,15 @@ class CNNTrainer:
        return evaluation_results
    def close_tensorboard(self):
        """Close TensorBoard writer"""
        if hasattr(self, 'writer'):
            self.writer.close()
            logger.info("TensorBoard writer closed")

    def __del__(self):
        """Cleanup"""
        self.close_tensorboard()

# Export
__all__ = ['CNNTrainer', 'CNNDataset']
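
# Minimal usage sketch (assumptions: get_config() resolves a default config,
# and 'ETH/USDT' is just an example symbol served by the data provider):
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    trainer = CNNTrainer()
    results = trainer.train(['ETH/USDT'], num_samples=10000)
    print(f"Best validation accuracy: {results['best_val_accuracy']:.4f}")
    trainer.close_tensorboard()
    # Monitor live training curves with: tensorboard --logdir=runs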

View File

@ -18,6 +18,7 @@ from pathlib import Path
import matplotlib.pyplot as plt
from collections import deque
import random
from torch.utils.tensorboard import SummaryWriter
# Add project imports
import sys
@ -75,8 +76,23 @@ class RLTrainer:
        self.win_rates = []
        self.avg_rewards = []

        # TensorBoard setup
        self.setup_tensorboard()

        logger.info(f"RLTrainer initialized for symbols: {self.symbols}")

    def setup_tensorboard(self):
        """Setup TensorBoard logging"""
        # Create tensorboard logs directory
        log_dir = Path("runs") / f"rl_training_{int(time.time())}"
        log_dir.mkdir(parents=True, exist_ok=True)

        self.writer = SummaryWriter(log_dir=str(log_dir))
        self.tensorboard_dir = log_dir

        logger.info(f"TensorBoard logging to: {log_dir}")
        logger.info("Run: tensorboard --logdir=runs")
    def setup_environment_and_agent(self) -> Tuple[ScalpingEnvironment, ScalpingRLAgent]:
        """Setup trading environment and RL agent"""
        logger.info("Setting up environment and agent...")
@ -443,6 +459,29 @@ class RLTrainer:
        plt.show()
    def log_episode_metrics(self, episode: int, metrics: Dict):
        """Log episode metrics to TensorBoard"""
        # Main performance metrics
        self.writer.add_scalar('Episode/TotalReward', metrics['total_reward'], episode)
        self.writer.add_scalar('Episode/FinalBalance', metrics['final_balance'], episode)
        self.writer.add_scalar('Episode/TotalReturn', metrics['total_return'], episode)
        self.writer.add_scalar('Episode/Steps', metrics['steps'], episode)

        # Trading metrics
        self.writer.add_scalar('Trading/TotalTrades', metrics['total_trades'], episode)
        self.writer.add_scalar('Trading/WinRate', metrics['win_rate'], episode)
        self.writer.add_scalar('Trading/ProfitFactor', metrics.get('profit_factor', 0), episode)
        self.writer.add_scalar('Trading/MaxDrawdown', metrics.get('max_drawdown', 0), episode)

        # Agent metrics
        self.writer.add_scalar('Agent/Epsilon', metrics['epsilon'], episode)
        self.writer.add_scalar('Agent/LearningRate', metrics.get('learning_rate', self.learning_rate), episode)
        self.writer.add_scalar('Agent/MemorySize', metrics.get('memory_size', 0), episode)

        # Loss metrics (if available)
        if 'loss' in metrics:
            self.writer.add_scalar('Agent/Loss', metrics['loss'], episode)
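
    # The metrics dict above is assumed to carry at least the required keys
    # ('total_reward', 'final_balance', 'total_return', 'steps',
    # 'total_trades', 'win_rate', 'epsilon'); illustrative call with
    # made-up values:
    #     self.log_episode_metrics(42, {
    #         'total_reward': 12.5, 'final_balance': 1050.0,
    #         'total_return': 0.05, 'steps': 500, 'total_trades': 20,
    #         'win_rate': 0.6, 'epsilon': 0.1,
    #     })
    # Optional keys ('profit_factor', 'max_drawdown', 'learning_rate',
    # 'memory_size', 'loss') are read defensively via .get() or an 'in' check.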
class HybridTrainer:
"""
Hybrid training pipeline combining CNN and RL