491 lines
20 KiB
Python
491 lines
20 KiB
Python
"""
CNN Training Pipeline

This module handles training of the CNN model using ONLY real market data.
All training metrics are logged to TensorBoard for real-time monitoring.
"""
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
from torch.utils.data import Dataset, DataLoader, random_split
|
|
from torch.utils.tensorboard import SummaryWriter
|
|
import numpy as np
|
|
import pandas as pd
|
|
import logging
|
|
from typing import Dict, List, Tuple, Optional
|
|
from pathlib import Path
|
|
import time
|
|
from sklearn.metrics import classification_report, confusion_matrix
|
|
import json
|
|
|
|
from core.config import get_config
|
|
from core.data_provider import DataProvider
|
|
from models.cnn.scalping_cnn import MultiTimeframeCNN, ScalpingDataGenerator
|
|
|
|
# Module-level logger named after this module; used by CNNTrainer for all
# progress and diagnostic messages.
logger = logging.getLogger(__name__)
|
|
|
|
class CNNDataset(Dataset):
    """Dataset for CNN training with real market data.

    Holds the feature array and the class index of every sample as tensors
    so that a ``DataLoader`` can batch them.

    Args:
        features: Array whose first axis indexes samples. Stored as a
            ``float32`` tensor (the input is copied, never aliased).
        labels: Either a 2-D array with one row of per-class scores /
            one-hot values per sample (reduced to class indices with
            ``argmax`` along axis 1), or a 1-D array that already contains
            integer class indices.

    Raises:
        ValueError: If ``labels`` is neither 1-D nor 2-D, or if ``features``
            and ``labels`` do not contain the same number of samples.
    """

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        features = np.asarray(features)
        labels = np.asarray(labels)

        if labels.ndim == 2:
            # Convert one-hot rows to class indices.
            class_indices = np.argmax(labels, axis=1)
        elif labels.ndim == 1:
            # Already class indices; nothing to reduce.
            class_indices = labels
        else:
            raise ValueError(
                f"labels must be 1-D class indices or 2-D one-hot rows, got {labels.ndim}-D"
            )

        # Fail early: a length mismatch would otherwise only surface as an
        # index error somewhere inside the training loop.
        if len(features) != len(class_indices):
            raise ValueError(
                f"features and labels disagree on sample count: "
                f"{len(features)} vs {len(class_indices)}"
            )

        # torch.tensor (rather than the legacy FloatTensor/LongTensor
        # constructors) makes the dtype explicit and always copies.
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(class_indices, dtype=torch.long)

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, class_index)`` pair stored at ``idx``."""
        return self.features[idx], self.labels[idx]
|
|
|
|
class CNNTrainer:
    """CNN Trainer using ONLY real market data with TensorBoard monitoring.

    The trainer reads its hyper-parameters from a config object (accessed via
    the ``training``, ``cnn`` and ``timeframes`` attributes), builds the
    training set through ``ScalpingDataGenerator``, trains a
    ``MultiTimeframeCNN`` and writes metrics to a ``SummaryWriter``.

    The model is expected to return a mapping with at least the keys
    ``'action'`` (class logits, fed to the loss) and ``'confidence'``.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize CNN trainer.

        Args:
            config: Configuration object; when falsy, ``get_config()`` is
                used instead.
        """
        self.config = config or get_config()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Training parameters
        self.learning_rate = self.config.training.get('learning_rate', 0.001)
        self.batch_size = self.config.training.get('batch_size', 32)
        self.epochs = self.config.training.get('epochs', 100)
        self.validation_split = self.config.training.get('validation_split', 0.2)
        self.early_stopping_patience = self.config.training.get('early_stopping_patience', 10)

        # Model parameters - will be updated based on real data
        self.n_timeframes = len(self.config.timeframes)
        self.window_size = self.config.cnn.get('window_size', 20)
        self.n_features = self.config.cnn.get('features', 26)  # Will be dynamically updated
        self.n_classes = 3  # BUY, SELL, HOLD

        # Initialize components
        self.data_provider = DataProvider(self.config)
        self.data_generator = ScalpingDataGenerator(self.data_provider, self.window_size)
        self.model = None

        # TensorBoard setup
        self.setup_tensorboard()

        logger.info(f"CNNTrainer initialized with {self.n_timeframes} timeframes, {self.n_features} features")
        logger.info("Will use ONLY real market data for training")

    def setup_tensorboard(self):
        """Create a timestamped run directory under ``runs/`` and open the writer."""
        log_dir = Path("runs") / f"cnn_training_{int(time.time())}"
        log_dir.mkdir(parents=True, exist_ok=True)

        self.writer = SummaryWriter(log_dir=str(log_dir))
        self.tensorboard_dir = log_dir

        logger.info(f"TensorBoard logging to: {log_dir}")
        logger.info("Run: tensorboard --logdir=runs")

    @staticmethod
    def _count_parameters(model: nn.Module) -> Tuple[int, int]:
        """Return ``(total, trainable)`` parameter counts of ``model``."""
        total = sum(p.numel() for p in model.parameters())
        trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
        return total, trainable

    @staticmethod
    def _best_checkpoint_path(save_path: str) -> str:
        """Return ``save_path`` with ``_best`` inserted before the file suffix.

        Only the final path component is modified, and a path without any
        suffix still yields a distinct file name, so the best checkpoint can
        never coincide with the final checkpoint.
        """
        path = Path(save_path)
        return str(path.with_name(f"{path.stem}_best{path.suffix}"))

    def log_model_architecture(self):
        """Log the model graph and parameter counts to TensorBoard.

        Does nothing when no model has been created yet. A failure to trace
        the graph is logged as a warning and does not abort training.
        """
        if self.model is None:
            return

        # Log model graph (requires a dummy input)
        dummy_input = torch.randn(
            1, self.n_timeframes, self.window_size, self.n_features
        ).to(self.device)

        # Trace in eval mode: a single-sample batch can break layers that
        # behave differently while training. The previous mode is restored
        # afterwards so the training loop is unaffected.
        was_training = self.model.training
        self.model.eval()
        try:
            self.writer.add_graph(self.model, dummy_input)
            logger.info("Model architecture logged to TensorBoard")
        except Exception as e:
            logger.warning(f"Could not log model graph: {e}")
        finally:
            self.model.train(was_training)

        # Log model parameters count
        total_params, trainable_params = self._count_parameters(self.model)
        self.writer.add_scalar('Model/TotalParameters', total_params, 0)
        self.writer.add_scalar('Model/TrainableParameters', trainable_params, 0)

    def create_model(self) -> "MultiTimeframeCNN":
        """Create the CNN model on ``self.device`` and log its size."""
        model = MultiTimeframeCNN(
            n_timeframes=self.n_timeframes,
            window_size=self.window_size,
            n_features=self.n_features,
            n_classes=self.n_classes,
            dropout_rate=self.config.cnn.get('dropout', 0.2),
        )
        model = model.to(self.device)

        # Log model info
        total_params, trainable_params = self._count_parameters(model)
        memory_usage = model.get_memory_usage()

        logger.info(f"Model created with {total_params:,} total parameters")
        logger.info(f"Trainable parameters: {trainable_params:,}")
        logger.info(f"Estimated memory usage: {memory_usage}MB")

        return model

    def prepare_data(self, symbols: List[str], num_samples: int = 10000) -> Tuple[np.ndarray, np.ndarray, Dict]:
        """Prepare training data from REAL market data.

        Calls the data generator once per symbol and concatenates the results
        along the sample axis. ``self.n_features`` is updated to the size of
        the last feature axis of the generated data.

        Args:
            symbols: Symbols to generate samples for.
            num_samples: Number of samples requested per symbol.

        Returns:
            ``(features, labels, metadata)`` where ``metadata`` is the list of
            per-symbol metadata objects returned by the generator.

        Raises:
            ValueError: If no symbol produced any data.
        """
        logger.info("Preparing training data...")
        logger.info("Data source: REAL market data from exchange APIs")

        all_features = []
        all_labels = []
        all_metadata = []

        for symbol in symbols:
            logger.info(f"Generating data for {symbol}...")

            features, labels, metadata = self.data_generator.generate_training_cases(
                symbol=symbol,
                timeframes=self.config.timeframes,
                num_samples=num_samples,
            )

            if features is None:
                # Make the skip visible instead of silently training on
                # fewer symbols than requested.
                logger.warning(f"No data generated for {symbol}; skipping")
                continue

            all_features.append(features)
            all_labels.append(labels)
            all_metadata.append(metadata)

            logger.info(f"Generated {len(features)} samples for {symbol}")

            # Update feature count if needed
            actual_features = features.shape[-1]
            if actual_features != self.n_features:
                logger.info(f"Updating feature count from {self.n_features} to {actual_features}")
                self.n_features = actual_features

        if not all_features:
            raise ValueError("No training data generated from real market data")

        # Combine all data
        features = np.concatenate(all_features, axis=0)
        labels = np.concatenate(all_labels, axis=0)

        # Log data statistics to TensorBoard
        self.log_data_statistics(features, labels)

        return features, labels, all_metadata

    def log_data_statistics(self, features: np.ndarray, labels: np.ndarray):
        """Log dataset size, class distribution and feature statistics.

        ``features`` is indexed as ``(samples, timeframes, window, features)``
        and ``labels`` as one row of per-class values per sample.
        """
        # Dataset size
        self.writer.add_scalar('Data/TotalSamples', len(features), 0)
        self.writer.add_scalar('Data/Features', features.shape[-1], 0)
        self.writer.add_scalar('Data/Timeframes', features.shape[1], 0)
        self.writer.add_scalar('Data/WindowSize', features.shape[2], 0)

        # Class distribution. minlength keeps a zero entry for classes that
        # do not occur, so a missing class shows up as 0 instead of being
        # absent from the dashboard.
        class_counts = np.bincount(np.argmax(labels, axis=1), minlength=self.n_classes)
        for i, count in enumerate(class_counts):
            self.writer.add_scalar(f'Data/Class_{i}_Count', count, 0)

        # Feature statistics
        feature_means = features.mean(axis=(0, 1, 2))
        feature_stds = features.std(axis=(0, 1, 2))

        for i in range(min(10, len(feature_means))):  # Log first 10 features
            self.writer.add_scalar(f'Data/Feature_{i}_Mean', feature_means[i], 0)
            self.writer.add_scalar(f'Data/Feature_{i}_Std', feature_stds[i], 0)

    def train_epoch(self, model: nn.Module, train_loader: DataLoader,
                    optimizer: torch.optim.Optimizer, criterion: nn.Module, epoch: int) -> Tuple[float, float]:
        """Train for one epoch with TensorBoard logging.

        Returns:
            ``(epoch_loss, epoch_accuracy)``: the mean of the per-batch losses
            and the fraction of correctly classified samples.

        Raises:
            ValueError: If ``train_loader`` yields no samples.
        """
        model.train()
        total_loss = 0.0
        correct = 0
        total = 0
        num_batches = len(train_loader)

        for batch_idx, (features, labels) in enumerate(train_loader):
            features, labels = features.to(self.device), labels.to(self.device)

            optimizer.zero_grad()
            predictions = model(features)
            logits = predictions['action']
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            total_loss += batch_loss
            predicted = logits.detach().argmax(dim=1)
            batch_correct = (predicted == labels).sum().item()
            total += labels.size(0)
            correct += batch_correct

            # Log batch metrics
            step = epoch * num_batches + batch_idx
            self.writer.add_scalar('Training/BatchLoss', batch_loss, step)

            if batch_idx % 50 == 0:  # Log every 50 batches
                # NOTE: batch accuracy is logged as a percentage, whereas the
                # epoch accuracy returned below is a fraction in [0, 1].
                batch_acc = 100. * batch_correct / labels.size(0)
                self.writer.add_scalar('Training/BatchAccuracy', batch_acc, step)

                # Log confidence scores
                avg_confidence = predictions['confidence'].mean().item()
                self.writer.add_scalar('Training/BatchConfidence', avg_confidence, step)

        if total == 0:
            raise ValueError("Cannot train on an empty data loader")

        epoch_loss = total_loss / num_batches
        epoch_accuracy = correct / total

        return epoch_loss, epoch_accuracy

    def _run_inference(self, model: nn.Module, data_loader: DataLoader,
                       criterion: nn.Module) -> Tuple[float, float, List, List, List]:
        """Run ``model`` over ``data_loader`` without gradients.

        Returns:
            ``(loss, accuracy, predictions, labels, confidences)`` where loss
            is the mean of the per-batch losses, accuracy is a fraction, and
            the three lists hold one entry per sample.

        Raises:
            ValueError: If ``data_loader`` yields no samples.
        """
        model.eval()
        total_loss = 0.0
        correct = 0
        total = 0
        all_predictions = []
        all_labels = []
        all_confidences = []

        with torch.no_grad():
            for features, labels in data_loader:
                features, labels = features.to(self.device), labels.to(self.device)

                outputs = model(features)
                logits = outputs['action']
                total_loss += criterion(logits, labels).item()

                predicted = logits.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
                all_confidences.extend(outputs['confidence'].cpu().numpy())

        if total == 0:
            raise ValueError("Cannot evaluate on an empty data loader")

        return (total_loss / len(data_loader), correct / total,
                all_predictions, all_labels, all_confidences)

    def _log_eval_metrics(self, prefix: str, loss: float, accuracy: float,
                          metrics: Dict, step: int):
        """Write loss, accuracy and per-class accuracy under ``prefix/``."""
        self.writer.add_scalar(f'{prefix}/Loss', loss, step)
        self.writer.add_scalar(f'{prefix}/Accuracy', accuracy, step)
        self.writer.add_scalar(f'{prefix}/AvgConfidence', metrics['avg_confidence'], step)

        for class_idx, acc in metrics['class_accuracies'].items():
            self.writer.add_scalar(f'{prefix}/Class_{class_idx}_Accuracy', acc, step)

    def validate_epoch(self, model: nn.Module, val_loader: DataLoader,
                       criterion: nn.Module, epoch: int) -> Tuple[float, float, Dict]:
        """Validate for one epoch with TensorBoard logging.

        Metrics are written under the ``Validation/`` tags at step ``epoch``.

        Returns:
            ``(epoch_loss, epoch_accuracy, metrics)`` with ``metrics`` as
            produced by :meth:`calculate_detailed_metrics`.

        Raises:
            ValueError: If ``val_loader`` yields no samples.
        """
        epoch_loss, epoch_accuracy, predictions, labels, confidences = self._run_inference(
            model, val_loader, criterion
        )

        # Calculate detailed metrics
        metrics = self.calculate_detailed_metrics(predictions, labels, confidences)

        # Log validation metrics to TensorBoard
        self._log_eval_metrics('Validation', epoch_loss, epoch_accuracy, metrics, epoch)

        return epoch_loss, epoch_accuracy, metrics

    def calculate_detailed_metrics(self, predictions: List, labels: List, confidences: List) -> Dict:
        """Calculate per-class accuracy, mean confidence and the confusion matrix.

        Returns:
            Dict with keys ``'class_accuracies'`` (class index -> accuracy,
            only for classes that occur in ``labels``), ``'avg_confidence'``
            and ``'confusion_matrix'`` (always ``n_classes x n_classes``).
        """
        predictions = np.array(predictions)
        labels = np.array(labels)
        confidences = np.array(confidences)

        # Class-wise accuracies
        class_accuracies = {}
        for class_idx in range(self.n_classes):
            class_mask = labels == class_idx
            if class_mask.any():
                class_accuracies[class_idx] = float(
                    (predictions[class_mask] == class_idx).mean()
                )

        return {
            'class_accuracies': class_accuracies,
            'avg_confidence': float(confidences.mean()),
            # Passing the label set explicitly keeps the matrix shape stable
            # even when a class is missing from both labels and predictions.
            'confusion_matrix': confusion_matrix(
                labels, predictions, labels=list(range(self.n_classes))
            ),
        }

    def train(self, symbols: List[str], save_path: str = 'models/cnn/scalping_cnn_trained.pt',
              num_samples: int = 10000) -> Dict:
        """Train CNN model with TensorBoard monitoring.

        Args:
            symbols: Symbols to generate training data for.
            save_path: Destination of the final model. The best model (lowest
                validation loss) is saved next to it with ``_best`` inserted
                before the file suffix.
            num_samples: Number of samples requested per symbol.

        Returns:
            Dict with ``best_val_loss``, ``best_val_accuracy``,
            ``total_epochs``, ``training_time`` and ``tensorboard_dir``.

        Raises:
            ValueError: If no data could be generated, or if the configured
                validation split leaves the training or validation set empty.
        """
        logger.info("Starting CNN training...")
        logger.info("Using ONLY real market data from exchange APIs")

        # Prepare data
        features, labels, _metadata = self.prepare_data(symbols, num_samples)

        # Log training configuration
        self.writer.add_text('Config/Symbols', str(symbols), 0)
        self.writer.add_text('Config/Timeframes', str(self.config.timeframes), 0)
        self.writer.add_scalar('Config/LearningRate', self.learning_rate, 0)
        self.writer.add_scalar('Config/BatchSize', self.batch_size, 0)
        self.writer.add_scalar('Config/MaxEpochs', self.epochs, 0)

        # Create datasets
        dataset = CNNDataset(features, labels)

        # Split data
        val_size = int(len(dataset) * self.validation_split)
        train_size = len(dataset) - val_size
        if train_size <= 0 or val_size <= 0:
            # Early stopping and best-model selection both depend on the
            # validation loss, so an empty split cannot be trained on.
            raise ValueError(
                f"validation_split={self.validation_split} on {len(dataset)} samples leaves "
                f"{train_size} training and {val_size} validation samples; both must be non-empty"
            )
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)

        logger.info(f"Total dataset: {len(dataset)} samples")
        logger.info(f"Features shape: {features.shape}")
        logger.info(f"Labels shape: {labels.shape}")
        logger.info(f"Train samples: {train_size}")
        logger.info(f"Validation samples: {val_size}")

        # Log class distributions (one tensor index per split instead of
        # fetching every sample individually).
        train_counts = np.bincount(
            dataset.labels[train_dataset.indices].numpy(), minlength=self.n_classes
        )
        val_counts = np.bincount(
            dataset.labels[val_dataset.indices].numpy(), minlength=self.n_classes
        )
        logger.info(f"Train label distribution: {train_counts}")
        logger.info(f"Val label distribution: {val_counts}")

        # Create model
        self.model = self.create_model()
        self.log_model_architecture()

        # Setup training
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        # `verbose` is deprecated in recent PyTorch releases; learning-rate
        # reductions are reported through the logger below instead.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5)

        best_path = self._best_checkpoint_path(save_path)
        Path(save_path).parent.mkdir(parents=True, exist_ok=True)

        # Training loop
        best_val_loss = float('inf')
        best_val_accuracy = 0.0
        patience_counter = 0
        epochs_run = 0  # stays 0 when self.epochs == 0, so the summary below is still valid
        start_time = time.time()

        for epoch in range(self.epochs):
            epoch_start = time.time()

            # Train
            train_loss, train_accuracy = self.train_epoch(
                self.model, train_loader, optimizer, criterion, epoch
            )

            # Validate
            val_loss, val_accuracy, val_metrics = self.validate_epoch(
                self.model, val_loader, criterion, epoch
            )
            epochs_run = epoch + 1

            # Update learning rate
            previous_lr = optimizer.param_groups[0]['lr']
            scheduler.step(val_loss)
            current_lr = optimizer.param_groups[0]['lr']
            if current_lr < previous_lr:
                logger.info(f"Learning rate reduced from {previous_lr:.2e} to {current_lr:.2e}")

            # Log epoch metrics
            self.writer.add_scalar('Training/EpochLoss', train_loss, epoch)
            self.writer.add_scalar('Training/EpochAccuracy', train_accuracy, epoch)
            self.writer.add_scalar('Training/LearningRate', current_lr, epoch)

            epoch_time = time.time() - epoch_start
            self.writer.add_scalar('Training/EpochTime', epoch_time, epoch)

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_val_accuracy = val_accuracy
                patience_counter = 0

                self.model.save(best_path)
                logger.info(f"New best model saved: {best_path}")

                # Log best metrics
                self.writer.add_scalar('Best/ValidationLoss', best_val_loss, epoch)
                self.writer.add_scalar('Best/ValidationAccuracy', best_val_accuracy, epoch)
            else:
                patience_counter += 1

            logger.info(f"Epoch {epoch+1}/{self.epochs} - "
                        f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f} - "
                        f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f} - "
                        f"Time: {epoch_time:.2f}s")

            # Log detailed metrics every 10 epochs
            if (epoch + 1) % 10 == 0:
                logger.info(f"Class accuracies: {val_metrics['class_accuracies']}")
                logger.info(f"Average confidence: {val_metrics['avg_confidence']:.4f}")

            # Early stopping
            if patience_counter >= self.early_stopping_patience:
                logger.info(f"Early stopping triggered after {epoch+1} epochs")
                break

        # Training completed
        total_time = time.time() - start_time
        logger.info(f"Training completed in {total_time:.2f} seconds")
        logger.info(f"Best validation loss: {best_val_loss:.4f}")
        logger.info(f"Best validation accuracy: {best_val_accuracy:.4f}")

        # Log final metrics
        self.writer.add_scalar('Final/TotalTrainingTime', total_time, 0)
        self.writer.add_scalar('Final/TotalEpochs', epochs_run, 0)

        # Save final model
        self.model.save(save_path)
        logger.info(f"Final model saved: {save_path}")

        # Log training summary
        self.writer.add_text('Training/Summary',
                             f"Completed training with {len(features)} real market samples. "
                             f"Best validation accuracy: {best_val_accuracy:.4f}", 0)
        # Make sure everything written so far reaches disk even if the caller
        # never closes the writer explicitly.
        self.writer.flush()

        return {
            'best_val_loss': best_val_loss,
            'best_val_accuracy': best_val_accuracy,
            'total_epochs': epochs_run,
            'training_time': total_time,
            'tensorboard_dir': str(self.tensorboard_dir),
        }

    def evaluate(self, symbols: List[str], num_samples: int = 5000) -> Dict:
        """Evaluate trained model on test data.

        Test metrics are written to TensorBoard under the ``Test/`` tags so
        they do not overwrite step 0 of the ``Validation/`` curves recorded
        during training.

        Returns:
            Dict with ``test_loss``, ``test_accuracy``,
            ``classification_report``, ``class_accuracies``,
            ``avg_confidence`` and ``confusion_matrix``.

        Raises:
            ValueError: If no model has been trained, or no test data could
                be generated.
        """
        if self.model is None:
            raise ValueError("Model not trained yet")

        logger.info("Evaluating model...")

        # Generate test data from real market data
        features, labels, _metadata = self.prepare_data(symbols, num_samples)

        # Create test dataset and loader
        test_dataset = CNNDataset(features, labels)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        # Evaluate: a single inference pass provides the loss, the accuracy
        # and the per-sample predictions needed for the report.
        criterion = nn.CrossEntropyLoss()
        test_loss, test_accuracy, predictions, targets, confidences = self._run_inference(
            self.model, test_loader, criterion
        )
        test_metrics = self.calculate_detailed_metrics(predictions, targets, confidences)
        self._log_eval_metrics('Test', test_loss, test_accuracy, test_metrics, 0)

        # Generate detailed classification report. `labels` pins the class
        # set so the report does not raise when a class is absent from the
        # test data; `zero_division=0` reports 0 for such classes.
        class_names = ['BUY', 'SELL', 'HOLD']
        classification_rep = classification_report(
            targets,
            predictions,
            labels=list(range(self.n_classes)),
            target_names=class_names,
            output_dict=True,
            zero_division=0,
        )

        evaluation_results = {
            'test_loss': test_loss,
            'test_accuracy': test_accuracy,
            'classification_report': classification_rep,
            'class_accuracies': test_metrics['class_accuracies'],
            'avg_confidence': test_metrics['avg_confidence'],
            'confusion_matrix': test_metrics['confusion_matrix'],
        }

        logger.info(f"Test accuracy: {test_accuracy:.4f}")
        logger.info(f"Test loss: {test_loss:.4f}")

        return evaluation_results

    def close_tensorboard(self):
        """Close TensorBoard writer (no-op when the writer was never created)."""
        if hasattr(self, 'writer'):
            self.writer.close()
            logger.info("TensorBoard writer closed")

    def __del__(self):
        """Cleanup: close the TensorBoard writer on garbage collection."""
        try:
            self.close_tensorboard()
        except Exception:
            # A destructor may run during interpreter shutdown, when module
            # globals (e.g. the logger) are already gone; raising here would
            # only produce an "Exception ignored" message.
            pass
|
|
|
|
# Export: the public API of this module.
__all__ = ['CNNTrainer', 'CNNDataset']