#!/usr/bin/env python3
"""
Advanced Transformer Models for High-Frequency Trading
Optimized for COB data, technical indicators, and market microstructure
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import math
import logging
from typing import Dict, Any, Optional, Tuple, List, Callable
from dataclasses import dataclass
import os
import json
from datetime import datetime

# Configure logging
logger = logging.getLogger(__name__)

@dataclass
class TradingTransformerConfig:
    """Configuration for trading transformer models - SCALED TO 46M PARAMETERS"""
    # Model architecture - SCALED UP
    d_model: int = 1024    # Model dimension (2x increase)
    n_heads: int = 16      # Number of attention heads (2x increase)
    n_layers: int = 12     # Number of transformer layers (2x increase)
    d_ff: int = 4096       # Feed-forward dimension (2x increase)
    dropout: float = 0.1   # Dropout rate

    # Input dimensions - ENHANCED
    seq_len: int = 150         # Sequence length for time series (1.5x increase)
    cob_features: int = 100    # COB feature dimension (2x increase)
    tech_features: int = 40    # Technical indicator features (2x increase)
    market_features: int = 30  # Market microstructure features (2x increase)

    # Output configuration
    n_actions: int = 3              # BUY, SELL, HOLD
    confidence_output: bool = True  # Output confidence scores

    # Training configuration - OPTIMIZED FOR LARGER MODEL
    learning_rate: float = 5e-5  # Reduced for larger model
    weight_decay: float = 1e-4   # Increased regularization
    warmup_steps: int = 8000     # More warmup steps
    max_grad_norm: float = 0.5   # Tighter gradient clipping

    # Advanced features - ENHANCED
    use_relative_position: bool = True
    use_multi_scale_attention: bool = True
    use_market_regime_detection: bool = True
    use_uncertainty_estimation: bool = True

    # NEW: Additional scaling features
    use_deep_attention: bool = True        # Deeper attention mechanisms
    use_residual_connections: bool = True  # Enhanced residual connections
    use_layer_norm_variants: bool = True   # Advanced normalization

class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding for transformer"""

    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model) - sequence-first layout
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Expects sequence-first input: (seq_len, batch, d_model)
        return x + self.pe[:x.size(0), :]
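# Quick shape-check sketch for PositionalEncoding (illustrative only; the
# tensor sizes below are arbitrary assumptions, not values used elsewhere):
#
#   pos_enc = PositionalEncoding(d_model=64, max_len=128)
#   x = torch.zeros(50, 8, 64)   # (seq_len, batch, d_model), sequence-first
#   out = pos_enc(x)             # positions 0..49 encoded via broadcast over batch
#   assert out.shape == (50, 8, 64)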
class RelativePositionalEncoding(nn.Module):
    """Relative positional encoding for better temporal understanding"""

    def __init__(self, d_model: int, max_relative_position: int = 128):
        super().__init__()
        self.d_model = d_model
        self.max_relative_position = max_relative_position

        # Learnable relative position embeddings
        self.relative_position_embeddings = nn.Embedding(
            2 * max_relative_position + 1, d_model
        )

    def forward(self, seq_len: int) -> torch.Tensor:
        """Generate relative position encoding matrix"""
        range_vec = torch.arange(seq_len)
        range_mat = range_vec.unsqueeze(0).repeat(seq_len, 1)
        distance_mat = range_mat - range_mat.transpose(0, 1)

        # Clip to max relative position
        distance_mat_clipped = torch.clamp(
            distance_mat, -self.max_relative_position, self.max_relative_position
        )

        # Shift to positive indices
        final_mat = distance_mat_clipped + self.max_relative_position

        return self.relative_position_embeddings(final_mat)

class DeepMultiScaleAttention(nn.Module):
    """Enhanced multi-scale attention with deeper mechanisms for 46M parameter model"""

    def __init__(self, d_model: int, n_heads: int, scales: List[int] = [1, 3, 5, 7, 11, 15]):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.scales = scales
        self.head_dim = d_model // n_heads

        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        # Enhanced multi-scale projections with deeper architecture
        self.scale_projections = nn.ModuleList([
            nn.ModuleDict({
                'query': nn.Sequential(
                    nn.Linear(d_model, d_model * 2),
                    nn.GELU(),
                    nn.Dropout(0.1),
                    nn.Linear(d_model * 2, d_model)
                ),
                'key': nn.Sequential(
                    nn.Linear(d_model, d_model * 2),
                    nn.GELU(),
                    nn.Dropout(0.1),
                    nn.Linear(d_model * 2, d_model)
                ),
                'value': nn.Sequential(
                    nn.Linear(d_model, d_model * 2),
                    nn.GELU(),
                    nn.Dropout(0.1),
                    nn.Linear(d_model * 2, d_model)
                ),
                'conv': nn.Sequential(
                    nn.Conv1d(d_model, d_model * 2, kernel_size=scale,
                              padding=scale // 2, groups=d_model),
                    nn.GELU(),
                    nn.Conv1d(d_model * 2, d_model, kernel_size=1)
                )
            }) for scale in scales
        ])

        # Enhanced output projection with residual connection
        self.output_projection = nn.Sequential(
            nn.Linear(d_model * len(scales), d_model * 2),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Linear(d_model * 2, d_model)
        )

        # Additional attention mechanisms
        self.cross_scale_attention = nn.MultiheadAttention(
            d_model, n_heads // 2, dropout=0.1, batch_first=True
        )

        self.dropout = nn.Dropout(0.1)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        batch_size, seq_len, _ = x.size()

        scale_outputs = []
        for scale_proj in self.scale_projections:
            # Apply enhanced temporal convolution for this scale
            x_conv = scale_proj['conv'](x.transpose(1, 2)).transpose(1, 2)

            # Enhanced attention computation with deeper projections
            Q = scale_proj['query'](x_conv).view(batch_size, seq_len, self.n_heads, self.head_dim)
            K = scale_proj['key'](x_conv).view(batch_size, seq_len, self.n_heads, self.head_dim)
            V = scale_proj['value'](x_conv).view(batch_size, seq_len, self.n_heads, self.head_dim)

            # Transpose for attention computation
            Q = Q.transpose(1, 2)  # (batch, n_heads, seq_len, head_dim)
            K = K.transpose(1, 2)
            V = V.transpose(1, 2)

            # Scaled dot-product attention
            scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

            if mask is not None:
                scores.masked_fill_(mask == 0, -1e9)

            attention = F.softmax(scores, dim=-1)
            attention = self.dropout(attention)

            output = torch.matmul(attention, V)
            output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
            scale_outputs.append(output)

        # Combine multi-scale outputs with enhanced projection
        combined = torch.cat(scale_outputs, dim=-1)
        output = self.output_projection(combined)

        # Apply cross-scale attention for better integration
        cross_attended, _ = self.cross_scale_attention(output, output, output, attn_mask=mask)

        # Residual connection
        return output + cross_attended
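# Illustrative shape sketch for DeepMultiScaleAttention (sizes are assumptions
# chosen for the example, not values the model uses):
#
#   attn = DeepMultiScaleAttention(d_model=128, n_heads=8)
#   x = torch.randn(4, 50, 128)       # (batch, seq_len, d_model), batch-first
#   out = attn(x)                     # per-scale conv + attention, then fused
#   assert out.shape == (4, 50, 128)  # shape is preserved across all scales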
class MarketRegimeDetector(nn.Module):
    """Market regime detection module for adaptive behavior"""

    def __init__(self, d_model: int, n_regimes: int = 4):
        super().__init__()
        self.d_model = d_model
        self.n_regimes = n_regimes

        # Regime classification layers
        self.regime_classifier = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(d_model // 2, n_regimes)
        )

        # Regime-specific transformations
        self.regime_transforms = nn.ModuleList([
            nn.Linear(d_model, d_model) for _ in range(n_regimes)
        ])

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        # Global pooling for regime detection
        pooled = torch.mean(x, dim=1)  # (batch, d_model)

        # Classify market regime
        regime_logits = self.regime_classifier(pooled)
        regime_probs = F.softmax(regime_logits, dim=-1)

        # Apply regime-specific transformations
        regime_outputs = []
        for transform in self.regime_transforms:
            regime_outputs.append(transform(x))  # (batch, seq_len, d_model)

        # Weighted combination based on regime probabilities
        regime_stack = torch.stack(regime_outputs, dim=0)  # (n_regimes, batch, seq_len, d_model)
        regime_weights = regime_probs.unsqueeze(0).unsqueeze(2).unsqueeze(3)  # (1, batch, 1, 1, n_regimes)
        regime_weights = regime_weights.permute(4, 1, 2, 3, 0).squeeze(-1)  # (n_regimes, batch, 1, 1)

        # Weighted sum across regimes
        adapted_output = torch.sum(regime_stack * regime_weights, dim=0)

        return adapted_output, regime_probs

class UncertaintyEstimation(nn.Module):
    """Uncertainty estimation using Monte Carlo Dropout"""

    def __init__(self, d_model: int, n_samples: int = 10):
        super().__init__()
        self.d_model = d_model
        self.n_samples = n_samples

        self.uncertainty_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Dropout(0.5),  # Higher dropout for uncertainty estimation
            nn.Linear(d_model // 2, 1),
            nn.Sigmoid()
        )

    def forward(self, x: torch.Tensor, training: bool = False) -> Tuple[torch.Tensor, torch.Tensor]:
        if training or self.training:
            # Single forward pass during training (MC sampling is inference-only)
            uncertainty = self.uncertainty_head(x)
            return uncertainty, uncertainty

        # Monte Carlo sampling during inference. Dropout must be temporarily
        # re-enabled here; otherwise every sample is identical and the std
        # collapses to zero.
        self.uncertainty_head.train()
        uncertainties = []
        for _ in range(self.n_samples):
            uncertainties.append(self.uncertainty_head(x))
        self.uncertainty_head.eval()

        uncertainties = torch.stack(uncertainties, dim=0)
        mean_uncertainty = torch.mean(uncertainties, dim=0)
        std_uncertainty = torch.std(uncertainties, dim=0)

        return mean_uncertainty, std_uncertainty

class TradingTransformerLayer(nn.Module):
    """Enhanced transformer layer for trading applications"""

    def __init__(self, config: TradingTransformerConfig):
        super().__init__()
        self.config = config

        # Enhanced multi-scale attention or standard attention
        if config.use_multi_scale_attention:
            self.attention = DeepMultiScaleAttention(config.d_model, config.n_heads)
        else:
            self.attention = nn.MultiheadAttention(
                config.d_model, config.n_heads, dropout=config.dropout, batch_first=True
            )

        # Feed-forward network
        self.feed_forward = nn.Sequential(
            nn.Linear(config.d_model, config.d_ff),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_ff, config.d_model)
        )

        # Layer normalization
        self.norm1 = nn.LayerNorm(config.d_model)
        self.norm2 = nn.LayerNorm(config.d_model)

        # Dropout
        self.dropout = nn.Dropout(config.dropout)

        # Market regime detection
        if config.use_market_regime_detection:
            self.regime_detector = MarketRegimeDetector(config.d_model)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Dict[str, torch.Tensor]:
        # Self-attention with residual connection
        if isinstance(self.attention, DeepMultiScaleAttention):
            attn_output = self.attention(x, mask)
        else:
            attn_output, _ = self.attention(x, x, x, attn_mask=mask)

        x = self.norm1(x + self.dropout(attn_output))

        # Market regime adaptation
        regime_probs = None
        if hasattr(self, 'regime_detector'):
            x, regime_probs = self.regime_detector(x)

        # Feed-forward with residual connection
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))

        return {
            'output': x,
            'regime_probs': regime_probs
        }
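# Minimal shape sketch for TradingTransformerLayer (illustrative; the config
# values below are assumptions for the example, not the production defaults):
#
#   cfg = TradingTransformerConfig(d_model=128, n_heads=8, d_ff=512)
#   layer = TradingTransformerLayer(cfg)
#   x = torch.randn(4, 50, 128)
#   out = layer(x)
#   out['output'].shape        # (4, 50, 128) - residual stream preserved
#   out['regime_probs'].shape  # (4, 4) - regime mixture when detection is on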
class AdvancedTradingTransformer(nn.Module):
    """Advanced transformer model for high-frequency trading"""

    def __init__(self, config: TradingTransformerConfig):
        super().__init__()
        self.config = config

        # Input projections
        self.price_projection = nn.Linear(5, config.d_model)  # OHLCV
        self.cob_projection = nn.Linear(config.cob_features, config.d_model)
        self.tech_projection = nn.Linear(config.tech_features, config.d_model)
        self.market_projection = nn.Linear(config.market_features, config.d_model)

        # Positional encoding
        if config.use_relative_position:
            self.pos_encoding = RelativePositionalEncoding(config.d_model)
        else:
            self.pos_encoding = PositionalEncoding(config.d_model, config.seq_len)

        # Transformer layers
        self.layers = nn.ModuleList([
            TradingTransformerLayer(config) for _ in range(config.n_layers)
        ])

        # Enhanced output heads for 46M parameter model
        self.action_head = nn.Sequential(
            nn.Linear(config.d_model, config.d_model),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model, config.d_model // 2),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 2, config.n_actions)
        )

        if config.confidence_output:
            self.confidence_head = nn.Sequential(
                nn.Linear(config.d_model, config.d_model // 2),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 2, config.d_model // 4),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 4, 1),
                nn.Sigmoid()
            )

        # Enhanced uncertainty estimation
        if config.use_uncertainty_estimation:
            self.uncertainty_estimator = UncertaintyEstimation(config.d_model)

        # Enhanced price prediction head (auxiliary task)
        self.price_head = nn.Sequential(
            nn.Linear(config.d_model, config.d_model // 2),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 2, config.d_model // 4),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 4, 1)
        )

        # Additional specialized heads for 46M model
        self.volatility_head = nn.Sequential(
            nn.Linear(config.d_model, config.d_model // 2),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 2, 1),
            nn.Softplus()
        )

        self.trend_strength_head = nn.Sequential(
            nn.Linear(config.d_model, config.d_model // 2),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 2, 1),
            nn.Tanh()
        )

        # NEW: Next candle OHLCV prediction heads for each timeframe (1s, 1m, 1h, 1d)
        # Each timeframe predicts: [open, high, low, close, volume] = 5 values
        self.timeframes = ['1s', '1m', '1h', '1d']
        self.next_candle_heads = nn.ModuleDict({
            tf: nn.Sequential(
                nn.Linear(config.d_model, config.d_model // 2),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 2, config.d_model // 4),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 4, 5)  # OHLCV: [open, high, low, close, volume]
            ) for tf in self.timeframes
        })

        # NEW: Next pivot point prediction heads for L1-L5 levels
        # Each level predicts: [price, type_prob_high, type_prob_low, confidence]
        # type_prob_high + type_prob_low = 1 (softmax), but we output separately for clarity
        self.pivot_levels = [1, 2, 3, 4, 5]  # L1 to L5
        self.pivot_heads = nn.ModuleDict({
            f'L{level}': nn.Sequential(
                nn.Linear(config.d_model, config.d_model // 2),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 2, config.d_model // 4),
                nn.GELU(),
                nn.Dropout(config.dropout),
                nn.Linear(config.d_model // 4, 4)  # [price, type_prob_high, type_prob_low, confidence]
            ) for level in self.pivot_levels
        })
        # NEW: Trend vector analysis head (calculates trend from pivot predictions)
        self.trend_analysis_head = nn.Sequential(
            nn.Linear(config.d_model, config.d_model // 2),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 2, config.d_model // 4),
            nn.GELU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.d_model // 4, 3)  # [angle_radians, steepness, direction]
        )

        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Initialize model weights"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif isinstance(module, nn.LayerNorm):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)

    def forward(self, price_data: torch.Tensor, cob_data: torch.Tensor,
                tech_data: torch.Tensor, market_data: torch.Tensor,
                mask: Optional[torch.Tensor] = None) -> Dict[str, torch.Tensor]:
        """
        Forward pass of the trading transformer

        Args:
            price_data: (batch, seq_len, 5) - OHLCV data
            cob_data: (batch, seq_len, cob_features) - COB features
            tech_data: (batch, seq_len, tech_features) - Technical indicators
            market_data: (batch, seq_len, market_features) - Market microstructure
            mask: Optional attention mask

        Returns:
            Dictionary containing model outputs
        """
        batch_size, seq_len = price_data.shape[:2]

        # Handle different input dimensions - expand to sequence if needed
        if cob_data.dim() == 2:  # (batch, features) -> (batch, seq_len, features)
            cob_data = cob_data.unsqueeze(1).expand(batch_size, seq_len, -1)
        if tech_data.dim() == 2:  # (batch, features) -> (batch, seq_len, features)
            tech_data = tech_data.unsqueeze(1).expand(batch_size, seq_len, -1)
        if market_data.dim() == 2:  # (batch, features) -> (batch, seq_len, features)
            market_data = market_data.unsqueeze(1).expand(batch_size, seq_len, -1)

        # Project inputs to model dimension
        price_emb = self.price_projection(price_data)
        cob_emb = self.cob_projection(cob_data)
        tech_emb = self.tech_projection(tech_data)
        market_emb = self.market_projection(market_data)

        # Combine embeddings (could also use cross-attention)
        x = price_emb + cob_emb + tech_emb + market_emb

        # Add positional encoding
        if isinstance(self.pos_encoding, RelativePositionalEncoding):
            # Relative position encoding is applied in attention
            pass
        else:
            # Sinusoidal encoding expects sequence-first input, so transpose around it
            x = self.pos_encoding(x.transpose(0, 1)).transpose(0, 1)

        # Apply transformer layers
        regime_probs_history = []
        for layer in self.layers:
            layer_output = layer(x, mask)
            x = layer_output['output']
            if layer_output['regime_probs'] is not None:
                regime_probs_history.append(layer_output['regime_probs'])

        # Global pooling for final prediction, using attention-based pooling
        pooling_weights = F.softmax(
            torch.sum(x, dim=-1, keepdim=True), dim=1
        )
        pooled = torch.sum(x * pooling_weights, dim=1)

        # Generate outputs
        outputs = {}

        # Action prediction
        action_logits = self.action_head(pooled)
        outputs['action_logits'] = action_logits
        outputs['action_probs'] = F.softmax(action_logits, dim=-1)

        # Confidence prediction
        if self.config.confidence_output:
            confidence = self.confidence_head(pooled)
            outputs['confidence'] = confidence

        # Uncertainty estimation
        if self.config.use_uncertainty_estimation:
            uncertainty_mean, uncertainty_std = self.uncertainty_estimator(pooled)
            outputs['uncertainty_mean'] = uncertainty_mean
            outputs['uncertainty_std'] = uncertainty_std

        # Enhanced price prediction (auxiliary task)
        price_pred = self.price_head(pooled)
        outputs['price_prediction'] = price_pred

        # Additional specialized predictions for 46M model
        volatility_pred = self.volatility_head(pooled)
        outputs['volatility_prediction'] = volatility_pred

        trend_strength_pred = self.trend_strength_head(pooled)
        outputs['trend_strength_prediction'] = trend_strength_pred
        # NEW: Next candle OHLCV predictions for each timeframe
        next_candles = {}
        for tf in self.timeframes:
            candle_pred = self.next_candle_heads[tf](pooled)  # (batch, 5)
            next_candles[tf] = candle_pred
        outputs['next_candles'] = next_candles

        # NEW: Next pivot point predictions for L1-L5
        next_pivots = {}
        for level in self.pivot_levels:
            pivot_pred = self.pivot_heads[f'L{level}'](pooled)  # (batch, 4)
            # Extract components: [price, type_logit_high, type_logit_low, confidence]
            # Use softmax to ensure type probabilities sum to 1
            type_logits = pivot_pred[:, 1:3]  # (batch, 2) - [high, low]
            type_probs = F.softmax(type_logits, dim=-1)  # (batch, 2)
            next_pivots[f'L{level}'] = {
                'price': pivot_pred[:, 0:1],  # Keep as (batch, 1)
                'type_prob_high': type_probs[:, 0:1],  # Probability of high pivot
                'type_prob_low': type_probs[:, 1:2],   # Probability of low pivot
                'pivot_type': torch.argmax(type_probs, dim=-1, keepdim=True),  # 0=high, 1=low
                'confidence': torch.sigmoid(pivot_pred[:, 3:4])  # Prediction confidence
            }
        outputs['next_pivots'] = next_pivots

        # NEW: Trend vector analysis from pivot predictions
        trend_analysis = self.trend_analysis_head(pooled)  # (batch, 3)
        outputs['trend_analysis'] = {
            'angle_radians': trend_analysis[:, 0:1],          # Trend angle in radians
            'steepness': F.softplus(trend_analysis[:, 1:2]),  # Always positive steepness
            'direction': torch.tanh(trend_analysis[:, 2:3])   # -1 to 1 (down to up)
        }

        # NEW: Calculate trend vector from pivot predictions
        # Extract pivot prices and create trend vector
        pivot_prices = torch.stack([next_pivots[f'L{level}']['price']
                                    for level in self.pivot_levels], dim=1)  # (batch, 5, 1)
        pivot_prices = pivot_prices.squeeze(-1)  # (batch, 5)

        # Calculate trend vector: (price_change, time_change)
        # Assume equal time spacing between pivot levels
        time_points = torch.arange(1, len(self.pivot_levels) + 1, dtype=torch.float32,
                                   device=pooled.device).unsqueeze(0)  # (1, 5)

        # Estimate the trend line slope from the L1 -> L5 endpoints
        # (an endpoint difference, not a full linear regression)
        if batch_size > 0:
            # For each sample, calculate trend from L1 to L5
            price_deltas = pivot_prices[:, -1:] - pivot_prices[:, :1]  # L5 - L1 price change
            time_deltas = time_points[:, -1:] - time_points[:, :1]    # Time change (should be 4)

            # Calculate angle and steepness
            trend_angles = torch.atan2(price_deltas.squeeze(), time_deltas.squeeze())  # (batch,)
            trend_steepness = torch.sqrt(price_deltas.squeeze() ** 2 + time_deltas.squeeze() ** 2)  # (batch,)
            trend_direction = torch.sign(price_deltas.squeeze())  # (batch,)

            outputs['trend_vector'] = {
                'pivot_prices': pivot_prices,            # (batch, 5) - prices for L1-L5
                'price_delta': price_deltas.squeeze(),   # (batch,) - price change from L1 to L5
                'time_delta': time_deltas.squeeze(),     # scalar - time change (equal spacing assumed)
                'calculated_angle': trend_angles.unsqueeze(-1),        # (batch, 1)
                'calculated_steepness': trend_steepness.unsqueeze(-1), # (batch, 1)
                'calculated_direction': trend_direction.unsqueeze(-1), # (batch, 1)
                'vector': torch.stack([price_deltas.squeeze(), time_deltas.squeeze()], dim=0).unsqueeze(0)
                          if batch_size == 1
                          else torch.stack([price_deltas.squeeze(), time_deltas.squeeze()], dim=1)
                          # (batch, 2) - [price_delta, time_delta]
            }
        else:
            outputs['trend_vector'] = {
                'pivot_prices': pivot_prices,
                'price_delta': torch.zeros(batch_size, device=pooled.device),
                'time_delta': torch.zeros(batch_size, device=pooled.device),
                'calculated_angle': torch.zeros(batch_size, 1, device=pooled.device),
                'calculated_steepness': torch.zeros(batch_size, 1, device=pooled.device),
                'calculated_direction': torch.zeros(batch_size, 1, device=pooled.device),
                'vector': torch.zeros(batch_size, 2, device=pooled.device)
            }
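        # Worked example of the trend geometry above (numbers are illustrative):
        # with pivot prices L1..L5 = [100, 101.25, 102.5, 103.75, 105] in
        # normalized units, price_delta = 5 and time_delta = 4, so
        # angle = atan2(5, 4) ~ 0.896 rad (~51.3 degrees) and
        # steepness = sqrt(5**2 + 4**2) ~ 6.40. Since 51.3 degrees exceeds the
        # 45-degree threshold, the rule below scores BUY with weight
        # min(6.40 / 10, 1.0) * 2.0 ~ 1.28.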
        # NEW: Trade action based on trend steepness and angle
        # Combine predicted trend analysis with calculated trend vector
        predicted_angle = outputs['trend_analysis']['angle_radians'].squeeze()  # (batch,)
        predicted_steepness = outputs['trend_analysis']['steepness'].squeeze()  # (batch,)
        predicted_direction = outputs['trend_analysis']['direction'].squeeze()  # (batch,)

        # Use calculated trend if available, otherwise use predicted
        if 'calculated_angle' in outputs['trend_vector']:
            trend_angle = outputs['trend_vector']['calculated_angle'].squeeze()  # (batch,)
            trend_steepness_val = outputs['trend_vector']['calculated_steepness'].squeeze()  # (batch,)
        else:
            trend_angle = predicted_angle
            trend_steepness_val = predicted_steepness

        # Trade action logic based on trend steepness and angle:
        #   steep upward trend (> 45 degrees)    -> BUY
        #   steep downward trend (< -45 degrees) -> SELL
        #   shallow trend                        -> HOLD
        angle_threshold = math.pi / 4  # 45 degrees

        # Determine action from trend angle
        trend_action_logits = torch.zeros(batch_size, 3, device=pooled.device)  # [BUY, SELL, HOLD]

        # Calculate action probabilities based on trend
        for i in range(batch_size):
            # Handle both 0-dim and 1-dim tensors
            if trend_angle.dim() == 0:
                angle = trend_angle.item()
                steep = trend_steepness_val.item()
            else:
                angle = trend_angle[i].item()
                steep = trend_steepness_val[i].item()

            # Normalize steepness to [0, 1] range (assuming max steepness of 10 units)
            normalized_steepness = min(steep / 10.0, 1.0) if steep > 0 else 0.0

            if angle > angle_threshold:  # Steep upward trend
                trend_action_logits[i, 0] = normalized_steepness * 2.0         # BUY
                trend_action_logits[i, 2] = (1.0 - normalized_steepness) * 0.5  # HOLD
            elif angle < -angle_threshold:  # Steep downward trend
                trend_action_logits[i, 1] = normalized_steepness * 2.0         # SELL
                trend_action_logits[i, 2] = (1.0 - normalized_steepness) * 0.5  # HOLD
            else:  # Shallow trend
                trend_action_logits[i, 2] = 1.0  # HOLD

        # Combine trend-based action with main action prediction
        trend_action_probs = F.softmax(trend_action_logits, dim=-1)
        outputs['trend_based_action'] = {
            'logits': trend_action_logits,
            'probabilities': trend_action_probs,
            'action_idx': torch.argmax(trend_action_probs, dim=-1),
            'trend_angle_degrees': trend_angle * 180.0 / math.pi,  # Convert to degrees
            'trend_steepness': trend_steepness_val
        }

        # Market regime information
        if regime_probs_history:
            outputs['regime_probs'] = torch.stack(regime_probs_history, dim=1)

        return outputs

    def extract_predictions(self, outputs: Dict[str, torch.Tensor],
                            denormalize_prices: Optional[Callable] = None) -> Dict[str, Any]:
        """
        Extract predictions from model outputs in a user-friendly format

        Args:
            outputs: Raw model outputs from forward() method
            denormalize_prices: Optional function to denormalize predicted prices

        Returns:
            Dictionary with formatted predictions including:
            - next_candles: Dict[str, Dict] - OHLCV predictions for each timeframe
            - next_pivots: Dict[str, Dict] - Pivot predictions for L1-L5
            - trend_vector: Dict - Trend vector analysis
            - trend_based_action: Dict - Trading action based on trend
        """
        self.eval()
        device = next(self.parameters()).device

        predictions = {}

        # Extract next candle predictions for each timeframe
        if 'next_candles' in outputs:
            next_candles = {}
            for tf in self.timeframes:
                candle_tensor = outputs['next_candles'][tf]
                if candle_tensor.dim() > 1:
                    candle_tensor = candle_tensor[0]  # Take first batch item
                candle_values = (candle_tensor.cpu().detach().numpy()
                                 if hasattr(candle_tensor, 'cpu') else candle_tensor)
                if isinstance(candle_values, np.ndarray):
                    candle_values = candle_values.tolist()

                next_candles[tf] = {
                    'open': float(candle_values[0]) if len(candle_values) > 0 else 0.0,
                    'high': float(candle_values[1]) if len(candle_values) > 1 else 0.0,
                    'low': float(candle_values[2]) if len(candle_values) > 2 else 0.0,
                    'close': float(candle_values[3]) if len(candle_values) > 3 else 0.0,
                    'volume': float(candle_values[4]) if len(candle_values) > 4 else 0.0
                }

                # Denormalize if function provided
                if denormalize_prices and callable(denormalize_prices):
                    for key in ['open', 'high', 'low', 'close']:
                        next_candles[tf][key] = denormalize_prices(next_candles[tf][key])

            predictions['next_candles'] = next_candles

        # Extract pivot point predictions
        if 'next_pivots' in outputs:
            next_pivots = {}
            for level in self.pivot_levels:
                pivot_data = outputs['next_pivots'][f'L{level}']

                # Extract values
                price = pivot_data['price']
                if price.dim() > 1:
                    price = price[0, 0] if price.shape[0] > 0 else torch.tensor(0.0, device=device)
                price_val = float(price.cpu().detach().item() if hasattr(price, 'cpu') else price)

                type_prob_high = pivot_data['type_prob_high']
                if type_prob_high.dim() > 1:
                    type_prob_high = type_prob_high[0, 0] if type_prob_high.shape[0] > 0 else torch.tensor(0.0, device=device)
                prob_high = float(type_prob_high.cpu().detach().item() if hasattr(type_prob_high, 'cpu') else type_prob_high)

                type_prob_low = pivot_data['type_prob_low']
                if type_prob_low.dim() > 1:
                    type_prob_low = type_prob_low[0, 0] if type_prob_low.shape[0] > 0 else torch.tensor(0.0, device=device)
                prob_low = float(type_prob_low.cpu().detach().item() if hasattr(type_prob_low, 'cpu') else type_prob_low)

                confidence = pivot_data['confidence']
                if confidence.dim() > 1:
                    confidence = confidence[0, 0] if confidence.shape[0] > 0 else torch.tensor(0.0, device=device)
                conf_val = float(confidence.cpu().detach().item() if hasattr(confidence, 'cpu') else confidence)

                pivot_type = pivot_data.get('pivot_type', torch.tensor(0))
                if isinstance(pivot_type, torch.Tensor):
                    if pivot_type.dim() > 1:
                        pivot_type = pivot_type[0, 0] if pivot_type.shape[0] > 0 else torch.tensor(0, device=device)
                    pivot_type_val = int(pivot_type.cpu().detach().item() if hasattr(pivot_type, 'cpu') else pivot_type)
                else:
                    pivot_type_val = int(pivot_type)

                # Denormalize price if function provided
                if denormalize_prices and callable(denormalize_prices):
                    price_val = denormalize_prices(price_val)

                next_pivots[f'L{level}'] = {
                    'price': price_val,
                    'type': 'high' if pivot_type_val == 0 else 'low',
                    'type_prob_high': prob_high,
                    'type_prob_low': prob_low,
                    'confidence': conf_val
                }
            predictions['next_pivots'] = next_pivots

        # Extract trend vector
        if 'trend_vector' in outputs:
            trend_vec = outputs['trend_vector']

            # Extract pivot prices
            pivot_prices = trend_vec.get('pivot_prices', torch.zeros(5, device=device))
            if isinstance(pivot_prices, torch.Tensor):
                if pivot_prices.dim() > 1:
                    pivot_prices = pivot_prices[0]
                pivot_prices_list = (pivot_prices.cpu().detach().numpy().tolist()
                                     if hasattr(pivot_prices, 'cpu') else pivot_prices.tolist())
            else:
                pivot_prices_list = pivot_prices

            # Denormalize pivot prices if function provided
            if denormalize_prices and callable(denormalize_prices):
                pivot_prices_list = [denormalize_prices(p) for p in pivot_prices_list]

            angle = trend_vec.get('calculated_angle', torch.tensor(0.0, device=device))
            if isinstance(angle, torch.Tensor):
                if angle.dim() > 1:
                    angle = angle[0, 0] if angle.shape[0] > 0 else torch.tensor(0.0, device=device)
                angle_val = float(angle.cpu().detach().item() if hasattr(angle, 'cpu') else angle)
            else:
                angle_val = float(angle)

            steepness = trend_vec.get('calculated_steepness', torch.tensor(0.0, device=device))
            if isinstance(steepness, torch.Tensor):
                if steepness.dim() > 1:
                    steepness = steepness[0, 0] if steepness.shape[0] > 0 else torch.tensor(0.0, device=device)
                steepness_val = float(steepness.cpu().detach().item() if hasattr(steepness, 'cpu') else steepness)
            else:
                steepness_val = float(steepness)

            direction = trend_vec.get('calculated_direction', torch.tensor(0.0, device=device))
            if isinstance(direction, torch.Tensor):
                if direction.dim() > 1:
                    direction = direction[0, 0] if direction.shape[0] > 0 else torch.tensor(0.0, device=device)
                direction_val = float(direction.cpu().detach().item() if hasattr(direction, 'cpu') else direction)
            else:
                direction_val = float(direction)

            price_delta = trend_vec.get('price_delta', torch.tensor(0.0, device=device))
            if isinstance(price_delta, torch.Tensor):
                if price_delta.dim() > 0:
                    price_delta = price_delta[0] if price_delta.shape[0] > 0 else torch.tensor(0.0, device=device)
                price_delta_val = float(price_delta.cpu().detach().item() if hasattr(price_delta, 'cpu') else price_delta)
            else:
                price_delta_val = float(price_delta)

            predictions['trend_vector'] = {
                'pivot_prices': pivot_prices_list,  # [L1, L2, L3, L4, L5]
                'angle_radians': angle_val,
                'angle_degrees': angle_val * 180.0 / math.pi,
                'steepness': steepness_val,
                'direction': 'up' if direction_val > 0 else 'down' if direction_val < 0 else 'sideways',
                'price_delta': price_delta_val
            }

        # Extract trend-based action
        if 'trend_based_action' in outputs:
            trend_action = outputs['trend_based_action']

            action_probs = trend_action.get('probabilities', torch.zeros(3, device=device))
            if isinstance(action_probs, torch.Tensor):
                if action_probs.dim() > 1:
                    action_probs = action_probs[0]
                action_probs_list = (action_probs.cpu().detach().numpy().tolist()
                                     if hasattr(action_probs, 'cpu') else action_probs.tolist())
            else:
                action_probs_list = action_probs

            action_idx = trend_action.get('action_idx', torch.tensor(2, device=device))
            if isinstance(action_idx, torch.Tensor):
                if action_idx.dim() > 0:
                    action_idx = action_idx[0] if action_idx.shape[0] > 0 else torch.tensor(2, device=device)
                action_idx_val = int(action_idx.cpu().detach().item() if hasattr(action_idx, 'cpu') else action_idx)
            else:
                action_idx_val = int(action_idx)

            angle_degrees = trend_action.get('trend_angle_degrees', torch.tensor(0.0, device=device))
            if isinstance(angle_degrees, torch.Tensor):
                if angle_degrees.dim() > 0:
                    angle_degrees = angle_degrees[0] if angle_degrees.shape[0] > 0 else torch.tensor(0.0, device=device)
                angle_degrees_val = float(angle_degrees.cpu().detach().item() if hasattr(angle_degrees, 'cpu') else angle_degrees)
            else:
                angle_degrees_val = float(angle_degrees)

            steepness = trend_action.get('trend_steepness', torch.tensor(0.0, device=device))
            if isinstance(steepness, torch.Tensor):
                if steepness.dim() > 0:
                    steepness = steepness[0] if steepness.shape[0] > 0 else torch.tensor(0.0, device=device)
                steepness_val = float(steepness.cpu().detach().item() if hasattr(steepness, 'cpu') else steepness)
            else:
                steepness_val = float(steepness)

            action_names = ['BUY', 'SELL', 'HOLD']
            predictions['trend_based_action'] = {
                'action': action_names[action_idx_val] if 0 <= action_idx_val < len(action_names) else 'HOLD',
                'action_idx': action_idx_val,
                'probabilities': {
                    'BUY': float(action_probs_list[0]) if len(action_probs_list) > 0 else 0.0,
                    'SELL': float(action_probs_list[1]) if len(action_probs_list) > 1 else 0.0,
                    'HOLD': float(action_probs_list[2]) if len(action_probs_list) > 2 else 0.0
                },
                'trend_angle_degrees': angle_degrees_val,
                'trend_steepness': steepness_val
            }

        return predictions
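# Minimal inference sketch for extract_predictions (tensor shapes follow the
# forward() docstring; sizes come from the TradingTransformerConfig defaults
# and the data here is random, purely for illustration):
#
#   config = TradingTransformerConfig()
#   model = AdvancedTradingTransformer(config)
#   price = torch.randn(1, config.seq_len, 5)
#   cob = torch.randn(1, config.seq_len, config.cob_features)
#   tech = torch.randn(1, config.seq_len, config.tech_features)
#   market = torch.randn(1, config.seq_len, config.market_features)
#   with torch.no_grad():
#       outputs = model(price, cob, tech, market)
#   preds = model.extract_predictions(outputs)
#   # preds['next_candles']['1m'] -> {'open': ..., 'high': ..., ...}
#   # preds['trend_based_action']['action'] -> 'BUY' | 'SELL' | 'HOLD'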
class TradingTransformerTrainer:
    """Trainer for the advanced trading transformer"""

    def __init__(self, model: AdvancedTradingTransformer, config: TradingTransformerConfig):
        self.model = model
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Move model to device
        self.model.to(self.device)

        # Optimizer with warmup
        self.optimizer = optim.AdamW(
            model.parameters(),
            lr=config.learning_rate,
            weight_decay=config.weight_decay
        )

        # Learning rate scheduler
        self.scheduler = optim.lr_scheduler.OneCycleLR(
            self.optimizer,
            max_lr=config.learning_rate,
            total_steps=10000,  # Will be updated based on training data
            pct_start=0.1
        )

        # Loss functions
        self.action_criterion = nn.CrossEntropyLoss()
        self.price_criterion = nn.MSELoss()
        self.confidence_criterion = nn.BCELoss()

        # Training history
        self.training_history = {
            'train_loss': [],
            'val_loss': [],
            'train_accuracy': [],
            'val_accuracy': [],
            'learning_rates': []
        }

    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Single training step"""
        self.model.train()
        self.optimizer.zero_grad()

        # Clone and detach batch tensors before moving to device to avoid in-place
        # operation issues. This ensures each batch is independent and prevents
        # gradient computation errors.
        batch = {k: v.detach().clone().to(self.device) for k, v in batch.items()}

        # Forward pass
        outputs = self.model(
            batch['price_data'],
            batch['cob_data'],
            batch['tech_data'],
            batch['market_data']
        )

        # Calculate losses
        action_loss = self.action_criterion(outputs['action_logits'], batch['actions'])
        price_loss = self.price_criterion(outputs['price_prediction'], batch['future_prices'])

        # Start with base losses - avoid inplace operations on computation graph
        total_loss = action_loss + 0.1 * price_loss  # Weight auxiliary task

        # Add confidence loss if available
        if 'confidence' in outputs and 'trade_success' in batch:
            # Ensure both tensors have compatible shapes for BCELoss,
            # which requires both inputs to have the same shape
            confidence_pred = outputs['confidence']  # Keep as [batch_size, 1]
            trade_target = batch['trade_success'].float()

            # Reshape target to match prediction shape [batch_size, 1]
            if trade_target.dim() == 1:
                trade_target = trade_target.unsqueeze(-1)

            # If shapes still don't match, squeeze both to 1D
            if confidence_pred.shape != trade_target.shape:
                confidence_pred = confidence_pred.view(-1)
                trade_target = trade_target.view(-1)

            confidence_loss = self.confidence_criterion(confidence_pred, trade_target)
            # Use addition instead of += to avoid inplace operation
            total_loss = total_loss + 0.1 * confidence_loss

        # Backward pass
        total_loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.max_grad_norm)

        # Optimizer step
        self.optimizer.step()
        self.scheduler.step()

        # Calculate accuracy
        predictions = torch.argmax(outputs['action_logits'], dim=-1)
        accuracy = (predictions == batch['actions']).float().mean()

        return {
            'total_loss': total_loss.item(),
            'action_loss': action_loss.item(),
            'price_loss': price_loss.item(),
            'accuracy': accuracy.item(),
            'learning_rate': self.scheduler.get_last_lr()[0]
        }
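    # Worked example of the loss weighting above (numbers are illustrative):
    # with action_loss = 1.20, price_loss = 0.50 and confidence_loss = 0.40,
    # total_loss = 1.20 + 0.1 * 0.50 + 0.1 * 0.40 = 1.29, so the auxiliary
    # heads refine the representation without dominating the action objective.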
    def validate(self, val_loader: DataLoader) -> Dict[str, float]:
        """Validation step"""
        self.model.eval()
        total_loss = 0
        total_accuracy = 0
        num_batches = 0

        with torch.no_grad():
            for batch in val_loader:
                batch = {k: v.to(self.device) for k, v in batch.items()}

                outputs = self.model(
                    batch['price_data'],
                    batch['cob_data'],
                    batch['tech_data'],
                    batch['market_data']
                )

                # Calculate losses
                action_loss = self.action_criterion(outputs['action_logits'], batch['actions'])
                price_loss = self.price_criterion(outputs['price_prediction'], batch['future_prices'])
                total_loss += action_loss.item() + 0.1 * price_loss.item()

                # Calculate accuracy
                predictions = torch.argmax(outputs['action_logits'], dim=-1)
                accuracy = (predictions == batch['actions']).float().mean()
                total_accuracy += accuracy.item()

                num_batches += 1

        return {
            'val_loss': total_loss / num_batches,
            'val_accuracy': total_accuracy / num_batches
        }

    def train(self, train_loader: DataLoader, val_loader: DataLoader,
              epochs: int, save_path: str = "NN/models/saved/"):
        """Full training loop"""
        best_val_loss = float('inf')

        for epoch in range(epochs):
            # Training
            epoch_losses = []
            epoch_accuracies = []

            for batch in train_loader:
                metrics = self.train_step(batch)
                epoch_losses.append(metrics['total_loss'])
                epoch_accuracies.append(metrics['accuracy'])

            # Validation
            val_metrics = self.validate(val_loader)

            # Update history
            avg_train_loss = np.mean(epoch_losses)
            avg_train_accuracy = np.mean(epoch_accuracies)

            self.training_history['train_loss'].append(avg_train_loss)
            self.training_history['val_loss'].append(val_metrics['val_loss'])
            self.training_history['train_accuracy'].append(avg_train_accuracy)
            self.training_history['val_accuracy'].append(val_metrics['val_accuracy'])
            self.training_history['learning_rates'].append(self.scheduler.get_last_lr()[0])

            # Logging
            logger.info(f"Epoch {epoch+1}/{epochs}")
            logger.info(f"  Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_accuracy:.4f}")
            logger.info(f"  Val Loss: {val_metrics['val_loss']:.4f}, Val Acc: {val_metrics['val_accuracy']:.4f}")
            logger.info(f"  LR: {self.scheduler.get_last_lr()[0]:.6f}")

            # Save best model
            if val_metrics['val_loss'] < best_val_loss:
                best_val_loss = val_metrics['val_loss']
                self.save_model(os.path.join(save_path, 'best_transformer_model.pt'))
                logger.info(f"  New best model saved (val_loss: {best_val_loss:.4f})")

    def save_model(self, path: str):
        """Save model and training state"""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict(),
            'config': self.config,
            'training_history': self.training_history
        }, path)
        logger.info(f"Model saved to {path}")

    def load_model(self, path: str):
        """Load model and training state"""
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        self.training_history = checkpoint.get('training_history', self.training_history)
        logger.info(f"Model loaded from {path}")

def create_trading_transformer(config: Optional[TradingTransformerConfig] = None
                               ) -> Tuple[AdvancedTradingTransformer, TradingTransformerTrainer]:
    """Factory function to create trading transformer and trainer"""
    if config is None:
        config = TradingTransformerConfig()

    model = AdvancedTradingTransformer(config)
    trainer = TradingTransformerTrainer(model, config)

    return model, trainer
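# Sketch of a training batch for train_step / train (illustrative; the key
# names match the lookups in train_step, while the batch size and random data
# are assumptions for the example):
#
#   batch = {
#       'price_data': torch.randn(32, config.seq_len, 5),
#       'cob_data': torch.randn(32, config.seq_len, config.cob_features),
#       'tech_data': torch.randn(32, config.seq_len, config.tech_features),
#       'market_data': torch.randn(32, config.seq_len, config.market_features),
#       'actions': torch.randint(0, 3, (32,)),                # BUY/SELL/HOLD labels
#       'future_prices': torch.randn(32, 1),                  # auxiliary regression target
#       'trade_success': torch.randint(0, 2, (32,)).float()   # optional, for confidence loss
#   }
#   metrics = trainer.train_step(batch)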
# Example usage
if __name__ == "__main__":
    # Create configuration
    config = TradingTransformerConfig(
        d_model=256,
        n_heads=8,
        n_layers=4,
        seq_len=50,
        n_actions=3,
        use_multi_scale_attention=True,
        use_market_regime_detection=True,
        use_uncertainty_estimation=True
    )

    # Create model and trainer
    model, trainer = create_trading_transformer(config)

    logger.info(f"Created Advanced Trading Transformer with {sum(p.numel() for p in model.parameters())} parameters")
    logger.info("Model is ready for training on real market data!")
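
    # Optional smoke test with synthetic tensors (illustrative only: random
    # data, not market data; shapes follow the forward() docstring)
    model.eval()
    price = torch.randn(2, config.seq_len, 5, device=trainer.device)
    cob = torch.randn(2, config.seq_len, config.cob_features, device=trainer.device)
    tech = torch.randn(2, config.seq_len, config.tech_features, device=trainer.device)
    market = torch.randn(2, config.seq_len, config.market_features, device=trainer.device)
    with torch.no_grad():
        outputs = model(price, cob, tech, market)
    logger.info(f"Smoke test action_probs shape: {outputs['action_probs'].shape}")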