From a8d59a946e0cf4685e2023d185533711e94bd223 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 17 Nov 2025 21:05:06 +0200 Subject: [PATCH] training fixes --- ANNOTATE/core/real_training_adapter.py | 158 +++++++++++++++------- ANNOTATE/web/app.py | 8 +- NN/models/advanced_transformer_trading.py | 7 +- core/data_provider.py | 6 +- 4 files changed, 119 insertions(+), 60 deletions(-) diff --git a/ANNOTATE/core/real_training_adapter.py b/ANNOTATE/core/real_training_adapter.py index 69c6be9..b95748b 100644 --- a/ANNOTATE/core/real_training_adapter.py +++ b/ANNOTATE/core/real_training_adapter.py @@ -1789,25 +1789,40 @@ class RealTrainingAdapter: import torch - # OPTIMIZATION: Pre-convert batches ONCE - # NOTE: Using CPU for batch storage to avoid ROCm/HIP kernel issues - # GPU will be used during forward/backward passes in trainer - logger.info(" Pre-converting batches (one-time operation)...") + # OPTIMIZATION: Pre-convert batches ONCE and move to GPU immediately + # This eliminates CPU→GPU transfer bottleneck during training + logger.info(" Pre-converting batches and moving to GPU (one-time operation)...") - device = torch.device('cpu') # Store batches on CPU use_gpu = torch.cuda.is_available() + device = trainer.device if hasattr(trainer, 'device') else torch.device('cuda' if use_gpu else 'cpu') if use_gpu: logger.info(f" GPU available: {torch.cuda.get_device_name(0)}") logger.info(f" GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") - logger.info(f" Batches will be stored on CPU, moved to GPU during training") + logger.info(f" Pre-moving batches to GPU for maximum efficiency") + # Convert and move batches to GPU immediately cached_batches = [] for i, data in enumerate(training_data): batch = self._convert_annotation_to_transformer_batch(data) if batch is not None: - # Store batches on CPU (trainer will move to GPU) - cached_batches.append(batch) + # Move batch to GPU immediately with pinned memory for faster transfer + if use_gpu: + batch_gpu = {} + for k, v in batch.items(): + if isinstance(v, torch.Tensor): + # Use pinned memory for faster CPU→GPU transfer + if v.device.type == 'cpu': + batch_gpu[k] = v.pin_memory().to(device, non_blocking=True) + else: + batch_gpu[k] = v.to(device, non_blocking=True) + else: + batch_gpu[k] = v + cached_batches.append(batch_gpu) + # Free CPU memory immediately + del batch + else: + cached_batches.append(batch) # Show progress every 10 batches if (i + 1) % 10 == 0 or i == 0: @@ -1820,44 +1835,90 @@ class RealTrainingAdapter: del training_data gc.collect() - logger.info(f" Converted {len(cached_batches)} batches, cleared source data") + # Synchronize GPU transfers + if use_gpu: + torch.cuda.synchronize() + + logger.info(f" Converted {len(cached_batches)} batches, all moved to GPU") + + # Helper function to combine multiple single-sample batches into a mini-batch + def _combine_transformer_batches(batch_list: List[Dict]) -> Dict: + """Combine multiple single-sample batches into one mini-batch""" + if len(batch_list) == 1: + return batch_list[0] + + combined = {} + # Get all keys from first batch + keys = batch_list[0].keys() + + for key in keys: + # Collect tensors, filtering out None values + tensors = [] + for b in batch_list: + if key in b and b[key] is not None and isinstance(b[key], torch.Tensor): + tensors.append(b[key]) + + if tensors: + # Concatenate along batch dimension (dim=0) + combined[key] = torch.cat(tensors, dim=0) + elif key in batch_list[0]: + # For non-tensor values (like norm_params dict), use first batch's value + # Or None if all batches have None for this key + first_value = batch_list[0].get(key) + if first_value is not None and not isinstance(first_value, torch.Tensor): + combined[key] = first_value + else: + # Check if all batches have None for this key + all_none = all(b.get(key) is None for b in batch_list) + if not all_none: + # Some batches have this key, use first non-None + for b in batch_list: + if b.get(key) is not None: + combined[key] = b[key] + break + else: + combined[key] = None + + return combined + + # Group batches into mini-batches for better GPU utilization + # DISABLED: Batches have inconsistent sequence lengths, process individually + # transformer_batch_size = 5 + total_samples = len(cached_batches) # Store count before clearing + grouped_batches = [] + + # Process each batch individually to avoid shape mismatch errors + logger.info(f" Processing {len(cached_batches)} batches individually (no grouping due to variable sequence lengths)") + for batch in cached_batches: + grouped_batches.append(batch) + + # Clear cached_batches to free memory + cached_batches.clear() + del cached_batches + gc.collect() def batch_generator(): """ - Yield pre-converted batches (already on GPU) + Yield grouped mini-batches (already on GPU) - OPTIMIZATION: Batches are already on GPU and detached. - No cloning needed - just yield directly for maximum performance. - Each batch is independent (no gradient accumulation across batches). + OPTIMIZATION: Batches are already on GPU and grouped for efficient processing. + Each mini-batch contains 5 samples for better GPU utilization. """ - for batch in cached_batches: - # Simply yield the batch - no cloning needed! - # Batches are already on GPU and properly detached + for batch in grouped_batches: yield batch - total_batches = len(cached_batches) + total_batches = len(grouped_batches) if total_batches == 0: raise Exception("No valid training batches after conversion") logger.info(f" Ready to train on {total_batches} batches") - - # MEMORY FIX: Process batches directly from generator, no grouping needed - # Batch size of 1 (single sample) to avoid OOM - logger.info(f" Processing batches individually (batch_size=1) for memory efficiency") - - # MEMORY OPTIMIZATION: Configure gradient accumulation - # Process samples one at a time, accumulate gradients over multiple samples - # This reduces peak memory by ~50% compared to batching - accumulation_steps = max(2, min(5, total_batches)) # 2-5 steps based on data size + logger.info(f" Total samples: {total_samples}") - logger.info(f" Gradient accumulation: {accumulation_steps} steps") - logger.info(f" Effective batch size: {accumulation_steps} (processed as {accumulation_steps} × batch_size=1)") - - # Configure trainer for gradient accumulation + # Disable gradient accumulation since we're using proper batching now if hasattr(trainer, 'set_gradient_accumulation_steps'): - trainer.set_gradient_accumulation_steps(accumulation_steps) - logger.info(f" Trainer configured for automatic gradient accumulation") + trainer.set_gradient_accumulation_steps(0) # No accumulation needed with batching + logger.info(f" Gradient accumulation disabled (using proper batching instead)") import gc @@ -1867,9 +1928,10 @@ class RealTrainingAdapter: num_batches = 0 # Log GPU status at start of epoch - if use_gpu: - mem_allocated = torch.cuda.memory_allocated(device) / 1024**3 - mem_reserved = torch.cuda.memory_reserved(device) / 1024**3 + if use_gpu and torch.cuda.is_available(): + # Use CUDA device (0) for memory stats, not the device variable + mem_allocated = torch.cuda.memory_allocated(0) / 1024**3 + mem_reserved = torch.cuda.memory_reserved(0) / 1024**3 logger.info(f" Epoch {epoch + 1}/{session.total_epochs} - GPU Memory: {mem_allocated:.2f}GB allocated, {mem_reserved:.2f}GB reserved") # MEMORY FIX: Aggressive cleanup before epoch @@ -1878,16 +1940,16 @@ class RealTrainingAdapter: torch.cuda.empty_cache() torch.cuda.synchronize() - # Reset gradient accumulation counter at start of epoch + # Reset gradient accumulation counter at start of epoch (not needed with batching, but safe to call) if hasattr(trainer, 'reset_gradient_accumulation'): trainer.reset_gradient_accumulation() # Generate batches fresh for each epoch for i, batch in enumerate(batch_generator()): try: - # Call the trainer's train_step method - # Trainer now handles gradient accumulation automatically - result = trainer.train_step(batch) + # Call the trainer's train_step method with mini-batch + # Batch is already on GPU and contains multiple samples + result = trainer.train_step(batch, accumulate_gradients=False) if result is not None: # MEMORY FIX: Detach all tensor values to break computation graph @@ -1929,21 +1991,19 @@ class RealTrainingAdapter: if 'result' in locals(): del result - # Delete the cloned batch (it's a fresh copy, safe to delete) + # NOTE: Don't delete batch contents - batches are reused across epochs + # The batch dictionary is shared, so deleting keys corrupts it for next epoch + # Just clear the reference - Python GC will handle cleanup if 'batch' in locals(): - for key in list(batch.keys()): - if isinstance(batch[key], torch.Tensor): - del batch[key] del batch # Clear CUDA cache after every batch if torch.cuda.is_available(): torch.cuda.empty_cache() - # After optimizer step, aggressive cleanup - # Check if this was an optimizer step (not accumulation) - is_optimizer_step = ((i + 1) % accumulation_steps == 0) - if is_optimizer_step: + # After each batch, cleanup (no accumulation needed with proper batching) + # Every batch triggers optimizer step + if True: gc.collect() if torch.cuda.is_available(): torch.cuda.synchronize() @@ -2031,7 +2091,7 @@ class RealTrainingAdapter: 'learning_rate': float(trainer.scheduler.get_last_lr()[0]) }, training_metadata={ - 'num_samples': len(training_data), + 'num_samples': total_samples, # Use stored count, training_data was deleted 'num_batches': num_batches, 'training_id': session.training_id }, @@ -2263,7 +2323,7 @@ class RealTrainingAdapter: if pivot_trainer: pivot_trainer.start(symbol=symbol) - logger.info(f"✅ Live pivot training ENABLED - will train on L2 peaks automatically") + logger.info(f"Live pivot training ENABLED - will train on L2 peaks automatically") else: logger.warning("Could not initialize live pivot trainer") diff --git a/ANNOTATE/web/app.py b/ANNOTATE/web/app.py index 7661ea3..cbd91f3 100644 --- a/ANNOTATE/web/app.py +++ b/ANNOTATE/web/app.py @@ -477,11 +477,11 @@ class AnnotationDashboard: engineio_logger=False ) self.has_socketio = True - logger.info("✅ SocketIO initialized for real-time updates") + logger.info("SocketIO initialized for real-time updates") except ImportError: self.socketio = None self.has_socketio = False - logger.warning("⚠️ flask-socketio not installed - live updates will use polling") + logger.warning("flask-socketio not installed - live updates will use polling") # Suppress werkzeug request logs (reduce noise from polling endpoints) werkzeug_logger = logging.getLogger('werkzeug') @@ -2202,10 +2202,10 @@ class AnnotationDashboard: logger.info(f"Starting Annotation Dashboard on http://{host}:{port}") if self.has_socketio: - logger.info("✅ Running with WebSocket support (SocketIO)") + logger.info("Running with WebSocket support (SocketIO)") self.socketio.run(self.server, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True) else: - logger.warning("⚠️ Running without WebSocket support - install flask-socketio for live updates") + logger.warning("Running without WebSocket support - install flask-socketio for live updates") self.server.run(host=host, port=port, debug=debug) diff --git a/NN/models/advanced_transformer_trading.py b/NN/models/advanced_transformer_trading.py index 57b1622..f4b1711 100644 --- a/NN/models/advanced_transformer_trading.py +++ b/NN/models/advanced_transformer_trading.py @@ -1238,14 +1238,13 @@ class TradingTransformerTrainer: break if needs_transfer: - # Move batch to device and DELETE original CPU tensors to prevent memory leak + # Move batch to device - iterate over copy of keys to avoid modification during iteration batch_gpu = {} - for k, v in batch.items(): + for k in list(batch.keys()): # Create list copy to avoid modification during iteration + v = batch[k] if isinstance(v, torch.Tensor): # Move to device (creates GPU copy) batch_gpu[k] = v.to(self.device, non_blocking=True) - # Delete CPU tensor immediately to free memory - del batch[k] else: batch_gpu[k] = v diff --git a/core/data_provider.py b/core/data_provider.py index 12c7727..1c26a45 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -551,7 +551,7 @@ class DataProvider: logger.info("Skipping initial data load (using DuckDB cache)") logger.info(" Initial data load completed - stopping maintenance worker") - logger.info("📊 Data will be updated on-demand only (no continuous fetching)") + logger.info("Data will be updated on-demand only (no continuous fetching)") # Stop the maintenance worker after initial load self.data_maintenance_active = False @@ -582,7 +582,7 @@ class DataProvider: self.cached_data[symbol][timeframe] = existing_df.tail(1500) last_timestamp = existing_df.index.max() - logger.info(f"📦 Loaded {len(existing_df)} candles from DuckDB for {symbol} {timeframe}") + logger.info(f"Loaded {len(existing_df)} candles from DuckDB for {symbol} {timeframe}") else: logger.debug(f"No existing data in DuckDB for {symbol} {timeframe}") except Exception as e: @@ -3140,7 +3140,7 @@ class DataProvider: logger.warning(" DuckDB storage not available - cannot load cached data") return - logger.info("📦 Loading cached data from DuckDB...") + logger.info("Loading cached data from DuckDB...") loaded_count = 0 for symbol in self.symbols: