gogo2/utils/training_integration.py

#!/usr/bin/env python3
"""
Training integration utilities for checkpoint management.

Wraps the checkpoint manager to save and load CNN and RL model checkpoints,
and exposes a unified manager that coordinates the realtime training system.
"""
import logging
import torch
from datetime import datetime
from typing import Dict, Any, Optional
from pathlib import Path

from .checkpoint_manager import get_checkpoint_manager, save_checkpoint, load_best_checkpoint

logger = logging.getLogger(__name__)

class TrainingIntegration:
    """Thin wrapper around the checkpoint manager for saving and loading models."""

    def __init__(self, enable_wandb: bool = False):
        self.checkpoint_manager = get_checkpoint_manager()
        self.enable_wandb = enable_wandb
        if self.enable_wandb:
            self._init_wandb()

    def _init_wandb(self):
        # Disabled by default to avoid CLI prompts
        pass

    def save_cnn_checkpoint(self,
                            cnn_model,
                            model_name: str,
                            epoch: int,
                            train_accuracy: float,
                            val_accuracy: float,
                            train_loss: float,
                            val_loss: float,
                            training_time_hours: Optional[float] = None) -> bool:
        """Save a CNN checkpoint; returns True only if the checkpoint was persisted."""
        try:
            performance_metrics = {
                'accuracy': train_accuracy,
                'val_accuracy': val_accuracy,
                'loss': train_loss,
                'val_loss': val_loss
            }
            training_metadata = {
                'epoch': epoch,
                'training_time_hours': training_time_hours,
                'total_parameters': self._count_parameters(cnn_model)
            }
            # W&B disabled
            metadata = save_checkpoint(
                model=cnn_model,
                model_name=model_name,
                model_type='cnn',
                performance_metrics=performance_metrics,
                training_metadata=training_metadata
            )
            if metadata:
                logger.info(f"CNN checkpoint saved: {metadata.checkpoint_id}")
                return True
            else:
                logger.info("CNN checkpoint not saved (performance not improved)")
                return False
        except Exception as e:
            logger.error(f"Error saving CNN checkpoint: {e}")
            return False

    def save_rl_checkpoint(self,
                           rl_agent,
                           model_name: str,
                           episode: int,
                           avg_reward: float,
                           best_reward: float,
                           epsilon: float,
                           total_pnl: Optional[float] = None) -> bool:
        """Save an RL agent checkpoint; returns True only if the checkpoint was persisted."""
        try:
            performance_metrics = {
                'reward': avg_reward,
                'best_reward': best_reward
            }
            if total_pnl is not None:
                performance_metrics['pnl'] = total_pnl
            training_metadata = {
                'episode': episode,
                'epsilon': epsilon,
                'total_parameters': self._count_parameters(rl_agent)
            }
            # W&B disabled
            metadata = save_checkpoint(
                model=rl_agent,
                model_name=model_name,
                model_type='rl',
                performance_metrics=performance_metrics,
                training_metadata=training_metadata
            )
            if metadata:
                logger.info(f"RL checkpoint saved: {metadata.checkpoint_id}")
                return True
            else:
                logger.info("RL checkpoint not saved (performance not improved)")
                return False
        except Exception as e:
            logger.error(f"Error saving RL checkpoint: {e}")
            return False

    def load_best_model(self, model_name: str, model_class=None):
        """Load the best checkpoint for a model.

        Returns an instantiated model when model_class is given, otherwise the
        raw checkpoint dict, or None if no checkpoint exists.
        """
        try:
            result = load_best_checkpoint(model_name)
            if not result:
                logger.warning(f"No checkpoint found for model: {model_name}")
                return None
            file_path, metadata = result
            checkpoint = torch.load(file_path, map_location='cpu')
            logger.info(f"Loaded best checkpoint for {model_name}:")
            logger.info(f" Performance score: {metadata.performance_score:.4f}")
            logger.info(f" Created: {metadata.created_at}")
            if model_class and 'model_state_dict' in checkpoint:
                model = model_class()
                model.load_state_dict(checkpoint['model_state_dict'])
                return model
            return checkpoint
        except Exception as e:
            logger.error(f"Error loading best model {model_name}: {e}")
            return None

    def _count_parameters(self, model) -> int:
        """Best-effort parameter count for nn.Module models or DQN-style agents."""
        try:
            if hasattr(model, 'parameters'):
                return sum(p.numel() for p in model.parameters())
            elif hasattr(model, 'policy_net'):
                policy_params = sum(p.numel() for p in model.policy_net.parameters())
                target_params = sum(p.numel() for p in model.target_net.parameters()) if hasattr(model, 'target_net') else 0
                return policy_params + target_params
            else:
                return 0
        except Exception:
            return 0
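

# Illustrative sketch of loading the best checkpoint back into a model. ``PriceCNN``
# and the model name "cnn_price_direction" are hypothetical placeholders used only
# for this example; substitute the project's actual model class and registered name.
#
#   integration = TrainingIntegration()
#   model = integration.load_best_model("cnn_price_direction", model_class=PriceCNN)
#   if model is None:
#       logger.warning("No usable checkpoint found; training from scratch")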

_training_integration = None

def get_training_integration() -> TrainingIntegration:
    """Return the shared TrainingIntegration instance, creating it on first use."""
    global _training_integration
    if _training_integration is None:
        _training_integration = TrainingIntegration()
    return _training_integration
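
# Illustrative sketch of saving a checkpoint from a training loop. The model object,
# metric values, and model name below are hypothetical; only the method signature
# comes from this module.
#
#   integration = get_training_integration()
#   saved = integration.save_cnn_checkpoint(
#       cnn_model=cnn_model,
#       model_name="cnn_price_direction",
#       epoch=epoch,
#       train_accuracy=0.72,
#       val_accuracy=0.68,
#       train_loss=0.55,
#       val_loss=0.61,
#       training_time_hours=1.5,
#   )
#   if not saved:
#       logger.info("Checkpoint skipped; best performance unchanged")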

# ---------------- Unified Training Manager ----------------

class UnifiedTrainingManager:
    """Single entry point to manage all training in the system.

    Coordinates EnhancedRealtimeTrainingSystem and provides start/stop/status.
    """

    def __init__(self, orchestrator, data_provider, dashboard=None):
        self.orchestrator = orchestrator
        self.data_provider = data_provider
        self.dashboard = dashboard
        self.training_system = None
        self.started = False

    def initialize(self) -> bool:
        try:
            # Import via project root shim to avoid path issues
            from enhanced_realtime_training import EnhancedRealtimeTrainingSystem
            self.training_system = EnhancedRealtimeTrainingSystem(
                orchestrator=self.orchestrator,
                data_provider=self.data_provider,
                dashboard=self.dashboard
            )
            return True
        except Exception as e:
            logger.error(f"UnifiedTrainingManager: failed to initialize training system: {e}")
            self.training_system = None
            return False

    def start(self) -> bool:
        try:
            if self.training_system is None:
                if not self.initialize():
                    return False
            self.training_system.start_training()
            self.started = True
            logger.info("UnifiedTrainingManager: training started")
            return True
        except Exception as e:
            logger.error(f"UnifiedTrainingManager: error starting training: {e}")
            return False

    def stop(self) -> bool:
        try:
            if self.training_system and self.started:
                self.training_system.stop_training()
                self.started = False
            logger.info("UnifiedTrainingManager: training stopped")
            return True
        except Exception as e:
            logger.error(f"UnifiedTrainingManager: error stopping training: {e}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        try:
            if self.training_system and hasattr(self.training_system, 'get_training_stats'):
                return self.training_system.get_training_stats()
            return {}
        except Exception:
            return {}

_unified_training_manager = None

def get_unified_training_manager(orchestrator=None, data_provider=None, dashboard=None) -> UnifiedTrainingManager:
    global _unified_training_manager
    if _unified_training_manager is None:
        if orchestrator is None or data_provider is None:
            raise ValueError("orchestrator and data_provider are required for first-time initialization")
        _unified_training_manager = UnifiedTrainingManager(orchestrator, data_provider, dashboard)
    return _unified_training_manager
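
# Illustrative sketch of driving the unified manager. ``orchestrator`` and
# ``data_provider`` are assumed to be already-constructed objects from the rest of
# the system; the snippet only exercises the API defined above.
#
#   manager = get_unified_training_manager(orchestrator=orchestrator,
#                                          data_provider=data_provider)
#   if manager.start():
#       stats = manager.get_stats()
#       logger.info(f"Realtime training stats: {stats}")
#   manager.stop()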