restart script

Dobromir Popov
2025-06-19 16:07:05 +03:00
parent 7b4fba3b4c
commit bf55ba5b51
3 changed files with 488 additions and 103 deletions


@@ -69,20 +69,30 @@ class ResidualBlock(nn.Module):
super().__init__()
self.conv1 = nn.Conv1d(channels, channels, kernel_size=3, padding=1)
self.conv2 = nn.Conv1d(channels, channels, kernel_size=3, padding=1)
self.norm1 = nn.BatchNorm1d(channels)
self.norm2 = nn.BatchNorm1d(channels)
self.norm1 = nn.GroupNorm(1, channels) # Changed from BatchNorm1d to GroupNorm
self.norm2 = nn.GroupNorm(1, channels) # Changed from BatchNorm1d to GroupNorm
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
residual = x
# Create completely independent copy for residual connection
residual = x.detach().clone()
out = F.relu(self.norm1(self.conv1(x)))
# First convolution branch - ensure no memory sharing
out = self.conv1(x)
out = self.norm1(out)
out = F.relu(out)
out = self.dropout(out)
out = self.norm2(self.conv2(out))
# Add residual connection (avoid in-place operation)
out = out + residual
return F.relu(out)
# Second convolution branch
out = self.conv2(out)
out = self.norm2(out)
# Residual connection - create completely new tensor
# Avoid any potential in-place operations or memory sharing
combined = residual + out
result = F.relu(combined)
return result
class SpatialAttentionBlock(nn.Module):
"""Spatial attention for feature maps"""
@@ -144,11 +154,11 @@ class EnhancedCNNModel(nn.Module):
# Feature fusion with more capacity
self.feature_fusion = nn.Sequential(
nn.Conv1d(base_channels * 4, base_channels * 3, kernel_size=1), # 4 paths now
nn.BatchNorm1d(base_channels * 3),
nn.GroupNorm(1, base_channels * 3), # Changed from BatchNorm1d to GroupNorm
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Conv1d(base_channels * 3, base_channels * 2, kernel_size=1),
nn.BatchNorm1d(base_channels * 2),
nn.GroupNorm(1, base_channels * 2), # Changed from BatchNorm1d to GroupNorm
nn.ReLU(),
nn.Dropout(dropout_rate)
)
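A shape sketch of the fusion block above; base_channels=64, dropout_rate=0.2, a batch of 2, and a sequence length of 50 are assumptions for illustration:

import torch
import torch.nn as nn

base_channels, dropout_rate = 64, 0.2
feature_fusion = nn.Sequential(
    nn.Conv1d(base_channels * 4, base_channels * 3, kernel_size=1),
    nn.GroupNorm(1, base_channels * 3),
    nn.ReLU(),
    nn.Dropout(dropout_rate),
    nn.Conv1d(base_channels * 3, base_channels * 2, kernel_size=1),
    nn.GroupNorm(1, base_channels * 2),
    nn.ReLU(),
    nn.Dropout(dropout_rate),
)
paths = [torch.randn(2, base_channels, 50) for _ in range(4)]  # four conv paths: [batch, C, seq]
fused = feature_fusion(torch.cat(paths, dim=1))                # concatenate to [2, 4*C, 50]
print(fused.shape)                                             # torch.Size([2, 128, 50]): 2*C channels, length preserved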
@@ -258,22 +268,22 @@ class EnhancedCNNModel(nn.Module):
# Initialize weights
self._initialize_weights()
def _build_conv_path(self, in_channels: int, out_channels: int, kernel_size: int) -> nn.Module:
"""Build a convolutional path with multiple layers"""
return nn.Sequential(
nn.Conv1d(in_channels, out_channels, kernel_size, padding=kernel_size//2),
nn.BatchNorm1d(out_channels),
nn.GroupNorm(1, out_channels), # Changed from BatchNorm1d to GroupNorm
nn.ReLU(),
nn.Dropout(0.1),
nn.Conv1d(out_channels, out_channels, kernel_size, padding=kernel_size//2),
nn.BatchNorm1d(out_channels),
nn.GroupNorm(1, out_channels), # Changed from BatchNorm1d to GroupNorm
nn.ReLU(),
nn.Dropout(0.1),
nn.Conv1d(out_channels, out_channels, kernel_size, padding=kernel_size//2),
nn.BatchNorm1d(out_channels),
nn.GroupNorm(1, out_channels), # Changed from BatchNorm1d to GroupNorm
nn.ReLU()
)
@@ -288,19 +298,28 @@ class EnhancedCNNModel(nn.Module):
nn.init.xavier_normal_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm1d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, (nn.BatchNorm1d, nn.GroupNorm, nn.LayerNorm)):
if hasattr(m, 'weight') and m.weight is not None:
nn.init.constant_(m.weight, 1)
if hasattr(m, 'bias') and m.bias is not None:
nn.init.constant_(m.bias, 0)
def _memory_barrier(self, tensor: torch.Tensor) -> torch.Tensor:
"""Create a memory barrier to prevent in-place operation issues"""
return tensor.detach().clone().requires_grad_(tensor.requires_grad)
def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
"""
Forward pass with multiple outputs
Forward pass with multiple outputs - completely avoiding in-place operations
Args:
x: Input tensor of shape [batch_size, sequence_length, features]
Returns:
Dictionary with predictions, confidence, regime, and volatility
"""
# Handle input shapes flexibly
# Apply memory barrier to input
x = self._memory_barrier(x)
# Handle input shapes flexibly - create new tensors to avoid memory sharing
if len(x.shape) == 2:
# Input is [seq_len, features] - add batch dimension
x = x.unsqueeze(0)
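A behavior sketch of the _memory_barrier helper defined above: clone() gives the tensor its own storage, so later in-place edits cannot alias it, while detach() cuts it out of the existing autograd graph, which also means gradients do not propagate back past the barrier (standalone illustration, separate from the model):

import torch

w = torch.randn(3, requires_grad=True)
y = w * 2.0                                        # y is part of w's autograd graph
barrier = y.detach().clone().requires_grad_(y.requires_grad)

print(barrier.data_ptr() != y.data_ptr())          # True: independent storage, no memory sharing with y
barrier.sum().backward()
print(barrier.grad)                                # tensor([1., 1., 1.]): barrier is a fresh leaf tensor
print(w.grad)                                      # None: detach() severed the link back to w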
@@ -308,76 +327,96 @@ class EnhancedCNNModel(nn.Module):
# Input has extra dimensions - flatten to [batch, seq, features]
x = x.view(x.shape[0], -1, x.shape[-1])
x = self._memory_barrier(x) # Apply barrier after shape changes
batch_size, seq_len, features = x.shape
# Reshape for processing: [batch, seq, features] -> [batch*seq, features]
x_reshaped = x.view(-1, features)
x_reshaped = self._memory_barrier(x_reshaped)
# Input embedding
embedded = self.input_embedding(x_reshaped) # [batch*seq, base_channels]
embedded = self._memory_barrier(embedded)
# Reshape back for conv1d: [batch*seq, channels] -> [batch, channels, seq]
embedded = embedded.view(batch_size, seq_len, -1).transpose(1, 2)
embedded = embedded.view(batch_size, seq_len, -1).transpose(1, 2).contiguous()
embedded = self._memory_barrier(embedded)
# Multi-scale feature extraction
path1 = self.conv_path1(embedded)
path2 = self.conv_path2(embedded)
path3 = self.conv_path3(embedded)
path4 = self.conv_path4(embedded)
# Multi-scale feature extraction - ensure each path creates independent tensors
path1 = self._memory_barrier(self.conv_path1(embedded))
path2 = self._memory_barrier(self.conv_path2(embedded))
path3 = self._memory_barrier(self.conv_path3(embedded))
path4 = self._memory_barrier(self.conv_path4(embedded))
# Feature fusion
# Feature fusion - create new tensor
fused_features = torch.cat([path1, path2, path3, path4], dim=1)
fused_features = self.feature_fusion(fused_features)
fused_features = self._memory_barrier(self.feature_fusion(fused_features))
# Apply residual blocks with spatial attention
current_features = fused_features
current_features = self._memory_barrier(fused_features)
for i, (res_block, attention) in enumerate(zip(self.residual_blocks, self.spatial_attention)):
current_features = res_block(current_features)
current_features = self._memory_barrier(res_block(current_features))
if i % 2 == 0: # Apply attention every other block
current_features = attention(current_features)
current_features = self._memory_barrier(attention(current_features))
# Apply remaining residual blocks
for res_block in self.residual_blocks[len(self.spatial_attention):]:
current_features = res_block(current_features)
current_features = self._memory_barrier(res_block(current_features))
# Temporal attention - apply both attention layers
# Reshape for attention: [batch, channels, seq] -> [batch, seq, channels]
attention_input = current_features.transpose(1, 2)
attended_features = self.temporal_attention1(attention_input)
attended_features = self.temporal_attention2(attended_features)
attention_input = current_features.transpose(1, 2).contiguous()
attention_input = self._memory_barrier(attention_input)
attended_features = self._memory_barrier(self.temporal_attention1(attention_input))
attended_features = self._memory_barrier(self.temporal_attention2(attended_features))
# Back to conv format: [batch, seq, channels] -> [batch, channels, seq]
attended_features = attended_features.transpose(1, 2)
attended_features = attended_features.transpose(1, 2).contiguous()
attended_features = self._memory_barrier(attended_features)
# Global aggregation
avg_pooled = self.global_pool(attended_features).squeeze(-1) # [batch, channels]
max_pooled = self.global_max_pool(attended_features).squeeze(-1) # [batch, channels]
# Global aggregation - create independent tensors
avg_pooled = self.global_pool(attended_features)
avg_pooled = self._memory_barrier(avg_pooled.view(avg_pooled.shape[0], -1)) # Flatten instead of squeeze
# Combine global features
max_pooled = self.global_max_pool(attended_features)
max_pooled = self._memory_barrier(max_pooled.view(max_pooled.shape[0], -1)) # Flatten instead of squeeze
# Combine global features - create new tensor
global_features = torch.cat([avg_pooled, max_pooled], dim=1)
global_features = self._memory_barrier(global_features)
# Advanced feature processing
processed_features = self.advanced_features(global_features)
processed_features = self._memory_barrier(self.advanced_features(global_features))
# Multi-task predictions
regime_probs = self.regime_detector(processed_features)
volatility_pred = self.volatility_predictor(processed_features)
confidence = self.confidence_head(processed_features)
# Multi-task predictions - ensure each creates independent tensors
regime_probs = self._memory_barrier(self.regime_detector(processed_features))
volatility_pred = self._memory_barrier(self.volatility_predictor(processed_features))
confidence = self._memory_barrier(self.confidence_head(processed_features))
# Combine all features for final decision (8 regime classes + 1 volatility)
combined_features = torch.cat([processed_features, regime_probs, volatility_pred], dim=1)
trading_logits = self.decision_head(combined_features)
# Create completely independent tensors for concatenation
vol_pred_flat = self._memory_barrier(volatility_pred.view(volatility_pred.shape[0], -1)) # Flatten instead of squeeze
combined_features = torch.cat([processed_features, regime_probs, vol_pred_flat], dim=1)
combined_features = self._memory_barrier(combined_features)
# Apply temperature scaling for better calibration
trading_logits = self._memory_barrier(self.decision_head(combined_features))
# Apply temperature scaling for better calibration - create new tensor
temperature = 1.5
trading_probs = F.softmax(trading_logits / temperature, dim=1)
scaled_logits = trading_logits / temperature
trading_probs = self._memory_barrier(F.softmax(scaled_logits, dim=1))
# Flatten confidence to ensure consistent shape
confidence_flat = self._memory_barrier(confidence.view(confidence.shape[0], -1))
volatility_flat = self._memory_barrier(volatility_pred.view(volatility_pred.shape[0], -1))
return {
'logits': trading_logits,
'probabilities': trading_probs,
'confidence': confidence.squeeze(-1),
'regime': regime_probs,
'volatility': volatility_pred.squeeze(-1),
'features': processed_features
'logits': self._memory_barrier(trading_logits),
'probabilities': self._memory_barrier(trading_probs),
'confidence': confidence_flat[:, 0] if confidence_flat.shape[1] > 0 else confidence_flat.view(-1)[0],
'regime': self._memory_barrier(regime_probs),
'volatility': volatility_flat[:, 0] if volatility_flat.shape[1] > 0 else volatility_flat.view(-1)[0],
'features': self._memory_barrier(processed_features)
}
def predict(self, feature_matrix: np.ndarray) -> Dict[str, Any]:
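A consumption sketch for the dictionary returned by forward(); the logits below are random stand-ins with an assumed output size of 3, since the model's constructor arguments are not shown in this diff:

import torch
import torch.nn.functional as F

logits = torch.randn(4, 3)                        # stand-in for outputs['logits'], shape [batch, actions]
temperature = 1.5
probs = F.softmax(logits / temperature, dim=1)    # same temperature scaling as the forward pass
actions = torch.argmax(probs, dim=1)              # per-sample trading decision
print(actions, probs.max(dim=1).values)           # chosen class and its scaled probability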
@@ -478,60 +517,128 @@ class CNNModelTrainer:
self.training_history = []
def reset_computational_graph(self):
"""Reset the computational graph to prevent in-place operation issues"""
try:
# Clear all gradients
for param in self.model.parameters():
param.grad = None
# Force garbage collection
import gc
gc.collect()
# Clear CUDA cache if available
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
# Reset optimizer state if needed
for group in self.optimizer.param_groups:
for param in group['params']:
if param in self.optimizer.state:
# Clear momentum buffers that might have stale references
self.optimizer.state[param] = {}
except Exception as e:
logger.warning(f"Error during computational graph reset: {e}")
def train_step(self, x: torch.Tensor, y: torch.Tensor,
confidence_targets: Optional[torch.Tensor] = None,
regime_targets: Optional[torch.Tensor] = None,
volatility_targets: Optional[torch.Tensor] = None) -> Dict[str, float]:
"""Single training step with multi-task learning"""
"""Single training step with multi-task learning and robust error handling"""
self.model.train()
self.optimizer.zero_grad()
# Reset computational graph before each training step
self.reset_computational_graph()
# Forward pass
outputs = self.model(x)
# Main trading loss
main_loss = self.main_criterion(outputs['logits'], y)
total_loss = main_loss
losses = {'main_loss': main_loss.item()}
# Confidence loss (if targets provided)
if confidence_targets is not None:
conf_loss = self.confidence_criterion(outputs['confidence'], confidence_targets)
total_loss += 0.1 * conf_loss
losses['confidence_loss'] = conf_loss.item()
# Regime classification loss (if targets provided)
if regime_targets is not None:
regime_loss = self.regime_criterion(outputs['regime'], regime_targets)
total_loss += 0.05 * regime_loss
losses['regime_loss'] = regime_loss.item()
# Volatility prediction loss (if targets provided)
if volatility_targets is not None:
vol_loss = self.volatility_criterion(outputs['volatility'], volatility_targets)
total_loss += 0.05 * vol_loss
losses['volatility_loss'] = vol_loss.item()
losses['total_loss'] = total_loss.item()
# Backward pass
total_loss.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
self.scheduler.step()
# Calculate accuracy
with torch.no_grad():
predictions = torch.argmax(outputs['probabilities'], dim=1)
accuracy = (predictions == y).float().mean().item()
losses['accuracy'] = accuracy
return losses
try:
self.model.train()
# Ensure inputs are completely independent from original tensors
x_train = x.detach().clone().requires_grad_(False).to(self.device)
y_train = y.detach().clone().requires_grad_(False).to(self.device)
# Forward pass with error handling
try:
outputs = self.model(x_train)
except RuntimeError as forward_error:
if "modified by an inplace operation" in str(forward_error):
logger.error(f"In-place operation in forward pass: {forward_error}")
self.reset_computational_graph()
return {'main_loss': 0.0, 'total_loss': 0.0, 'accuracy': 0.5}
else:
raise forward_error
# Calculate main loss with detached outputs to prevent memory sharing
main_loss = self.main_criterion(outputs['logits'], y_train)
total_loss = main_loss
losses = {'main_loss': main_loss.item()}
# Add auxiliary losses if targets provided
if confidence_targets is not None:
conf_targets = confidence_targets.detach().clone().to(self.device)
conf_loss = self.confidence_criterion(outputs['confidence'], conf_targets)
total_loss = total_loss + 0.1 * conf_loss
losses['confidence_loss'] = conf_loss.item()
if regime_targets is not None:
regime_targets_clean = regime_targets.detach().clone().to(self.device)
regime_loss = self.regime_criterion(outputs['regime'], regime_targets_clean)
total_loss = total_loss + 0.05 * regime_loss
losses['regime_loss'] = regime_loss.item()
if volatility_targets is not None:
vol_targets = volatility_targets.detach().clone().to(self.device)
vol_loss = self.volatility_criterion(outputs['volatility'], vol_targets)
total_loss = total_loss + 0.05 * vol_loss
losses['volatility_loss'] = vol_loss.item()
losses['total_loss'] = total_loss.item()
# Backward pass with comprehensive error handling
try:
total_loss.backward()
except RuntimeError as backward_error:
if "modified by an inplace operation" in str(backward_error):
logger.error(f"In-place operation during backward pass: {backward_error}")
logger.error("Attempting to continue training with gradient reset...")
# Comprehensive cleanup
self.reset_computational_graph()
return {'main_loss': losses.get('main_loss', 0.0), 'total_loss': losses.get('total_loss', 0.0), 'accuracy': 0.5}
else:
raise backward_error
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
# Optimizer step
self.optimizer.step()
self.scheduler.step()
# Calculate accuracy with detached tensors
with torch.no_grad():
predictions = torch.argmax(outputs['probabilities'], dim=1)
accuracy = (predictions == y_train).float().mean().item()
losses['accuracy'] = accuracy
return losses
except Exception as e:
logger.error(f"Training step failed with unexpected error: {e}")
logger.error(f"Error type: {type(e).__name__}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
# Comprehensive cleanup on any error
self.reset_computational_graph()
# Return safe dummy values to continue training
return {'main_loss': 0.0, 'total_loss': 0.0, 'accuracy': 0.5}
def save_model(self, filepath: str, metadata: Optional[Dict] = None):
"""Save model with metadata"""
@@ -610,7 +717,7 @@ class CNNModel:
feature_dim=input_shape[1],
output_size=output_size
)
self.trainer = CNNModelTrainer(self.model, device=self.device)
self.trainer = CNNModelTrainer(self.model, device=str(self.device))
logger.info(f"CNN Model wrapper initialized: input_shape={input_shape}, output_size={output_size}")