#!/usr/bin/env python3
|
|
"""
|
|
CNN Model Monitoring System
|
|
|
|
This module provides comprehensive monitoring and analytics for CNN models including:
|
|
- Real-time prediction tracking and logging
|
|
- Training session monitoring
|
|
- Performance metrics and visualization
|
|
- Prediction confidence analysis
|
|
- Model behavior insights
|
|
"""
|
|
|
|
import logging
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from dataclasses import dataclass, field
|
|
from collections import deque
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class CNNPrediction:
    """Single CNN prediction record with market context and performance metadata."""

    # Identification
    timestamp: datetime
    symbol: str
    model_name: str
    feature_matrix_shape: Tuple[int, ...]

    # Core prediction results
    action: int
    action_name: str
    confidence: float
    action_confidence: float
    probabilities: List[float]
    raw_logits: List[float]

    # Enhanced prediction details (if available)
    regime_probabilities: Optional[List[float]] = None
    volatility_prediction: Optional[float] = None
    extrema_prediction: Optional[List[float]] = None
    risk_assessment: Optional[List[float]] = None

    # Market context at prediction time
    current_price: Optional[float] = None
    price_change_1m: Optional[float] = None
    price_change_5m: Optional[float] = None
    volume_ratio: Optional[float] = None

    # Performance tracking
    prediction_latency_ms: Optional[float] = None
    model_memory_usage_mb: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary (timestamp -> ISO string)."""
        # Fields needing conversion come first so key order matches the
        # serialized snapshots written by the monitor.
        payload: Dict[str, Any] = {
            'timestamp': self.timestamp.isoformat(),
            'symbol': self.symbol,
            'model_name': self.model_name,
            'feature_matrix_shape': list(self.feature_matrix_shape),
        }
        # Remaining fields serialize as-is (numbers, lists, or None).
        for name in ('action', 'action_name', 'confidence', 'action_confidence',
                     'probabilities', 'raw_logits', 'regime_probabilities',
                     'volatility_prediction', 'extrema_prediction', 'risk_assessment',
                     'current_price', 'price_change_1m', 'price_change_5m',
                     'volume_ratio', 'prediction_latency_ms', 'model_memory_usage_mb'):
            payload[name] = getattr(self, name)
        return payload
|
|
|
|
@dataclass
class CNNTrainingSession:
    """Record of one CNN training run: configuration, metric histories, checkpoints."""

    session_id: str
    model_name: str
    start_time: datetime
    end_time: Optional[datetime] = None

    # Training configuration
    learning_rate: float = 0.001
    batch_size: int = 32
    epochs_planned: int = 100
    epochs_completed: int = 0

    # Per-epoch training metrics
    train_loss_history: List[float] = field(default_factory=list)
    train_accuracy_history: List[float] = field(default_factory=list)
    val_loss_history: List[float] = field(default_factory=list)
    val_accuracy_history: List[float] = field(default_factory=list)

    # Multi-task losses (only populated for the enhanced CNN)
    confidence_loss_history: List[float] = field(default_factory=list)
    regime_loss_history: List[float] = field(default_factory=list)
    volatility_loss_history: List[float] = field(default_factory=list)

    # Aggregate performance
    best_train_accuracy: float = 0.0
    best_val_accuracy: float = 0.0
    total_samples_processed: int = 0
    avg_training_time_per_epoch: float = 0.0

    # Model checkpoints
    checkpoint_paths: List[str] = field(default_factory=list)
    best_model_path: Optional[str] = None

    def get_duration(self) -> timedelta:
        """Elapsed training time; uses the current clock while still running."""
        return (self.end_time or datetime.now()) - self.start_time

    def get_current_learning_rate(self) -> float:
        """Return the configured learning rate (a scheduler may change it later)."""
        return self.learning_rate

    def is_active(self) -> bool:
        """True while the session has not been ended."""
        return self.end_time is None
|
|
|
|
class CNNMonitor:
    """Comprehensive CNN model monitoring system.

    Tracks individual predictions (globally, per symbol, and per model) in
    bounded deques, tracks training sessions, maintains aggregate per-model
    performance statistics, and periodically persists JSON snapshots under
    ``save_directory``.
    """

    def __init__(self, max_predictions_history: int = 10000,
                 max_training_sessions: int = 100,
                 save_directory: str = "logs/cnn_monitoring"):
        """Create the monitor and ensure the save directory exists.

        Args:
            max_predictions_history: Capacity of the global prediction history.
            max_training_sessions: Capacity of the completed-session history.
            save_directory: Directory for JSON snapshots (created if missing).
        """
        self.max_predictions_history = max_predictions_history
        self.max_training_sessions = max_training_sessions
        self.save_directory = Path(save_directory)
        self.save_directory.mkdir(parents=True, exist_ok=True)

        # Prediction tracking (bounded deques keep memory flat long-term)
        self.predictions_history: deque = deque(maxlen=max_predictions_history)
        self.predictions_by_symbol: Dict[str, deque] = {}
        self.predictions_by_model: Dict[str, deque] = {}

        # Training session tracking
        self.training_sessions: Dict[str, "CNNTrainingSession"] = {}
        self.active_sessions: List[str] = []
        self.completed_sessions: deque = deque(maxlen=max_training_sessions)

        # Performance analytics
        self.model_performance_stats: Dict[str, Dict[str, Any]] = {}
        self.prediction_accuracy_tracking: Dict[str, List[Tuple[datetime, bool]]] = {}

        # Real-time monitoring
        self.last_prediction_time: Dict[str, datetime] = {}
        self.prediction_frequency: Dict[str, float] = {}  # predictions per minute

        logger.info(f"CNN Monitor initialized - saving to {self.save_directory}")

    def log_prediction(self, prediction: "CNNPrediction") -> None:
        """Log a new CNN prediction with full details.

        Records the prediction in the global, per-symbol, and per-model
        histories, updates aggregate stats/frequency, and saves a snapshot
        to disk every 100 predictions. Errors are logged, never raised.
        """
        try:
            # Add to main history
            self.predictions_history.append(prediction)

            # Add to symbol-specific history (lazily created, capped at 1000)
            if prediction.symbol not in self.predictions_by_symbol:
                self.predictions_by_symbol[prediction.symbol] = deque(maxlen=1000)
            self.predictions_by_symbol[prediction.symbol].append(prediction)

            # Add to model-specific history (lazily created, capped at 1000)
            if prediction.model_name not in self.predictions_by_model:
                self.predictions_by_model[prediction.model_name] = deque(maxlen=1000)
            self.predictions_by_model[prediction.model_name].append(prediction)

            # Update aggregate performance stats and frequency tracking
            self._update_performance_stats(prediction)
            self._update_prediction_frequency(prediction)

            # Log prediction details
            logger.info(f"CNN Prediction [{prediction.model_name}] {prediction.symbol}: "
                        f"{prediction.action_name} (confidence: {prediction.confidence:.3f}, "
                        f"action_conf: {prediction.action_confidence:.3f})")

            if prediction.regime_probabilities:
                regime_max_idx = np.argmax(prediction.regime_probabilities)
                logger.info(f" Regime: {regime_max_idx} (conf: {prediction.regime_probabilities[regime_max_idx]:.3f})")

            if prediction.volatility_prediction is not None:
                logger.info(f" Volatility: {prediction.volatility_prediction:.3f}")

            # Save to disk periodically
            if len(self.predictions_history) % 100 == 0:
                self._save_predictions_batch()

        except Exception as e:
            logger.error(f"Error logging CNN prediction: {e}")

    def start_training_session(self, session_id: str, model_name: str,
                               learning_rate: float = 0.001, batch_size: int = 32,
                               epochs_planned: int = 100) -> "CNNTrainingSession":
        """Start a new training session and register it as active.

        Returns:
            The newly created CNNTrainingSession.
        """
        session = CNNTrainingSession(
            session_id=session_id,
            model_name=model_name,
            start_time=datetime.now(),
            learning_rate=learning_rate,
            batch_size=batch_size,
            epochs_planned=epochs_planned
        )

        self.training_sessions[session_id] = session
        self.active_sessions.append(session_id)

        logger.info(f"Started CNN training session: {session_id} for model {model_name}")
        logger.info(f" LR: {learning_rate}, Batch: {batch_size}, Epochs: {epochs_planned}")

        return session

    def log_training_step(self, session_id: str, epoch: int,
                          train_loss: float, train_accuracy: float,
                          val_loss: Optional[float] = None, val_accuracy: Optional[float] = None,
                          **additional_losses) -> None:
        """Log one epoch's training metrics for a session.

        Extra keyword losses (``confidence_loss``, ``regime_loss``,
        ``volatility_loss``) feed the enhanced-CNN multi-task histories.
        Unknown session ids are logged and ignored.
        """
        if session_id not in self.training_sessions:
            logger.warning(f"Training session {session_id} not found")
            return

        session = self.training_sessions[session_id]
        session.epochs_completed = epoch

        # Update metric histories
        session.train_loss_history.append(train_loss)
        session.train_accuracy_history.append(train_accuracy)

        if val_loss is not None:
            session.val_loss_history.append(val_loss)
        if val_accuracy is not None:
            session.val_accuracy_history.append(val_accuracy)

        # Additional multi-task losses for the enhanced CNN
        if 'confidence_loss' in additional_losses:
            session.confidence_loss_history.append(additional_losses['confidence_loss'])
        if 'regime_loss' in additional_losses:
            session.regime_loss_history.append(additional_losses['regime_loss'])
        if 'volatility_loss' in additional_losses:
            session.volatility_loss_history.append(additional_losses['volatility_loss'])

        # Track best-so-far accuracies
        session.best_train_accuracy = max(session.best_train_accuracy, train_accuracy)
        if val_accuracy is not None:
            session.best_val_accuracy = max(session.best_val_accuracy, val_accuracy)

        # Log progress
        logger.info(f"Training [{session_id}] Epoch {epoch}: "
                    f"Loss: {train_loss:.4f}, Acc: {train_accuracy:.4f}")

        if val_loss is not None and val_accuracy is not None:
            logger.info(f" Validation - Loss: {val_loss:.4f}, Acc: {val_accuracy:.4f}")

    def end_training_session(self, session_id: str, final_model_path: Optional[str] = None) -> None:
        """End a training session, move it to the completed list, and persist it."""
        if session_id not in self.training_sessions:
            logger.warning(f"Training session {session_id} not found")
            return

        session = self.training_sessions[session_id]
        session.end_time = datetime.now()
        session.best_model_path = final_model_path

        # Remove from active sessions
        if session_id in self.active_sessions:
            self.active_sessions.remove(session_id)

        # Add to completed sessions
        self.completed_sessions.append(session)

        duration = session.get_duration()
        logger.info(f"Completed CNN training session: {session_id}")
        logger.info(f" Duration: {duration}")
        logger.info(f" Epochs: {session.epochs_completed}/{session.epochs_planned}")
        logger.info(f" Best train accuracy: {session.best_train_accuracy:.4f}")
        logger.info(f" Best val accuracy: {session.best_val_accuracy:.4f}")

        # Save session to disk
        self._save_training_session(session)

    def get_recent_predictions(self, symbol: Optional[str] = None,
                               model_name: Optional[str] = None,
                               limit: int = 100) -> List["CNNPrediction"]:
        """Get recent predictions, optionally filtered by symbol and/or model.

        Fix: the previous implementation skipped the second filter whenever
        a dedicated deque existed for it, so requesting both a symbol and a
        model could return predictions from other models (or symbols).
        Filters are now always applied to the selected source.
        """
        # Pick the most specific pre-indexed source available
        if symbol and symbol in self.predictions_by_symbol:
            predictions = list(self.predictions_by_symbol[symbol])
        elif model_name and model_name in self.predictions_by_model:
            predictions = list(self.predictions_by_model[model_name])
        else:
            predictions = list(self.predictions_history)

        # Apply both filters unconditionally (cheap, and always correct)
        if symbol:
            predictions = [p for p in predictions if p.symbol == symbol]
        if model_name:
            predictions = [p for p in predictions if p.model_name == model_name]

        return predictions[-limit:]

    def get_prediction_statistics(self, symbol: Optional[str] = None,
                                  model_name: Optional[str] = None,
                                  time_window: timedelta = timedelta(hours=1)) -> Dict[str, Any]:
        """Get prediction statistics for the specified time window.

        Returns ``{'total_predictions': 0}`` when no predictions fall inside
        the window; otherwise confidence/action statistics plus enhanced-model
        stats when regime/volatility outputs are present.
        """
        cutoff_time = datetime.now() - time_window
        predictions = self.get_recent_predictions(symbol, model_name, limit=10000)

        # Filter by time window
        recent_predictions = [p for p in predictions if p.timestamp >= cutoff_time]

        if not recent_predictions:
            return {'total_predictions': 0}

        # Calculate statistics
        confidences = [p.confidence for p in recent_predictions]
        action_confidences = [p.action_confidence for p in recent_predictions]
        actions = [p.action for p in recent_predictions]

        stats = {
            'total_predictions': len(recent_predictions),
            'time_window_hours': time_window.total_seconds() / 3600,
            'predictions_per_hour': len(recent_predictions) / (time_window.total_seconds() / 3600),

            'confidence_stats': {
                'mean': np.mean(confidences),
                'std': np.std(confidences),
                'min': np.min(confidences),
                'max': np.max(confidences),
                'median': np.median(confidences)
            },

            'action_confidence_stats': {
                'mean': np.mean(action_confidences),
                'std': np.std(action_confidences),
                'min': np.min(action_confidences),
                'max': np.max(action_confidences),
                'median': np.median(action_confidences)
            },

            # NOTE(review): action codes 0=buy, 1=sell inferred from these keys;
            # any other action value is counted in neither bucket.
            'action_distribution': {
                'buy_count': sum(1 for a in actions if a == 0),
                'sell_count': sum(1 for a in actions if a == 1),
                'buy_percentage': (sum(1 for a in actions if a == 0) / len(actions)) * 100,
                'sell_percentage': (sum(1 for a in actions if a == 1) / len(actions)) * 100
            }
        }

        # Add enhanced model statistics if available
        enhanced_predictions = [p for p in recent_predictions if p.regime_probabilities is not None]
        if enhanced_predictions:
            regime_predictions = [np.argmax(p.regime_probabilities) for p in enhanced_predictions]
            volatility_predictions = [p.volatility_prediction for p in enhanced_predictions
                                      if p.volatility_prediction is not None]

            stats['enhanced_model_stats'] = {
                'enhanced_predictions_count': len(enhanced_predictions),
                # 8 regime classes assumed by the enhanced CNN head
                'regime_distribution': {i: regime_predictions.count(i) for i in range(8)},
                'volatility_stats': {
                    'mean': np.mean(volatility_predictions) if volatility_predictions else 0,
                    'std': np.std(volatility_predictions) if volatility_predictions else 0
                } if volatility_predictions else None
            }

        return stats

    def get_active_training_sessions(self) -> List["CNNTrainingSession"]:
        """Get all currently active training sessions."""
        return [self.training_sessions[sid] for sid in self.active_sessions
                if sid in self.training_sessions]

    def get_training_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get a detailed summary dict for a training session, or None if unknown."""
        if session_id not in self.training_sessions:
            return None

        session = self.training_sessions[session_id]

        summary = {
            'session_id': session_id,
            'model_name': session.model_name,
            'start_time': session.start_time.isoformat(),
            'end_time': session.end_time.isoformat() if session.end_time else None,
            'duration_minutes': session.get_duration().total_seconds() / 60,
            'is_active': session.is_active(),

            'progress': {
                'epochs_completed': session.epochs_completed,
                'epochs_planned': session.epochs_planned,
                'progress_percentage': (session.epochs_completed / session.epochs_planned) * 100
            },

            'performance': {
                'best_train_accuracy': session.best_train_accuracy,
                'best_val_accuracy': session.best_val_accuracy,
                'current_train_loss': session.train_loss_history[-1] if session.train_loss_history else None,
                'current_train_accuracy': session.train_accuracy_history[-1] if session.train_accuracy_history else None,
                'current_val_loss': session.val_loss_history[-1] if session.val_loss_history else None,
                'current_val_accuracy': session.val_accuracy_history[-1] if session.val_accuracy_history else None
            },

            'configuration': {
                'learning_rate': session.learning_rate,
                'batch_size': session.batch_size
            }
        }

        # Add enhanced model metrics if available
        if session.confidence_loss_history:
            summary['enhanced_metrics'] = {
                'confidence_loss': session.confidence_loss_history[-1] if session.confidence_loss_history else None,
                'regime_loss': session.regime_loss_history[-1] if session.regime_loss_history else None,
                'volatility_loss': session.volatility_loss_history[-1] if session.volatility_loss_history else None
            }

        return summary

    def _update_performance_stats(self, prediction: "CNNPrediction") -> None:
        """Update running per-model performance statistics from one prediction."""
        model_name = prediction.model_name

        if model_name not in self.model_performance_stats:
            self.model_performance_stats[model_name] = {
                'total_predictions': 0,
                'confidence_sum': 0.0,
                'action_confidence_sum': 0.0,
                'last_prediction_time': None,
                'prediction_latencies': deque(maxlen=100),
                'memory_usage': deque(maxlen=100)
            }

        stats = self.model_performance_stats[model_name]
        stats['total_predictions'] += 1
        stats['confidence_sum'] += prediction.confidence
        stats['action_confidence_sum'] += prediction.action_confidence
        stats['last_prediction_time'] = prediction.timestamp

        if prediction.prediction_latency_ms is not None:
            stats['prediction_latencies'].append(prediction.prediction_latency_ms)

        if prediction.model_memory_usage_mb is not None:
            stats['memory_usage'].append(prediction.model_memory_usage_mb)

    def _update_prediction_frequency(self, prediction: "CNNPrediction") -> None:
        """Update the per-model prediction frequency (predictions per minute)."""
        model_name = prediction.model_name
        current_time = prediction.timestamp

        if model_name in self.last_prediction_time:
            time_diff = (current_time - self.last_prediction_time[model_name]).total_seconds()
            if time_diff > 0:
                freq = 60.0 / time_diff  # predictions per minute
                self.prediction_frequency[model_name] = freq

        self.last_prediction_time[model_name] = current_time

    def _save_predictions_batch(self) -> None:
        """Save the most recent 100 predictions to a timestamped JSON file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = self.save_directory / f"cnn_predictions_{timestamp}.json"

            # Get last 100 predictions
            recent_predictions = list(self.predictions_history)[-100:]
            predictions_data = [p.to_dict() for p in recent_predictions]

            with open(filename, 'w') as f:
                json.dump(predictions_data, f, indent=2)

            # Fix: log the actual destination file instead of a placeholder
            logger.debug(f"Saved {len(predictions_data)} CNN predictions to {filename}")

        except Exception as e:
            logger.error(f"Error saving predictions batch: {e}")

    def _save_training_session(self, session: "CNNTrainingSession") -> None:
        """Save a completed training session to disk as JSON."""
        try:
            filename = self.save_directory / f"training_session_{session.session_id}.json"

            session_data = {
                'session_id': session.session_id,
                'model_name': session.model_name,
                'start_time': session.start_time.isoformat(),
                'end_time': session.end_time.isoformat() if session.end_time else None,
                'duration_minutes': session.get_duration().total_seconds() / 60,
                'configuration': {
                    'learning_rate': session.learning_rate,
                    'batch_size': session.batch_size,
                    'epochs_planned': session.epochs_planned,
                    'epochs_completed': session.epochs_completed
                },
                'metrics': {
                    'train_loss_history': session.train_loss_history,
                    'train_accuracy_history': session.train_accuracy_history,
                    'val_loss_history': session.val_loss_history,
                    'val_accuracy_history': session.val_accuracy_history,
                    'confidence_loss_history': session.confidence_loss_history,
                    'regime_loss_history': session.regime_loss_history,
                    'volatility_loss_history': session.volatility_loss_history
                },
                'performance': {
                    'best_train_accuracy': session.best_train_accuracy,
                    'best_val_accuracy': session.best_val_accuracy,
                    'total_samples_processed': session.total_samples_processed
                },
                'model_info': {
                    'checkpoint_paths': session.checkpoint_paths,
                    'best_model_path': session.best_model_path
                }
            }

            with open(filename, 'w') as f:
                json.dump(session_data, f, indent=2)

            # Fix: log the actual destination file instead of a placeholder
            logger.info(f"Saved training session {session.session_id} to {filename}")

        except Exception as e:
            logger.error(f"Error saving training session: {e}")

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get comprehensive data for dashboard display."""
        return {
            'recent_predictions': [p.to_dict() for p in list(self.predictions_history)[-50:]],
            'active_training_sessions': [self.get_training_session_summary(sid)
                                         for sid in self.active_sessions],
            'model_performance': self.model_performance_stats,
            'prediction_frequencies': self.prediction_frequency,
            'statistics': {
                'total_predictions_logged': len(self.predictions_history),
                'active_sessions_count': len(self.active_sessions),
                'completed_sessions_count': len(self.completed_sessions),
                'models_tracked': len(self.model_performance_stats)
            }
        }
|
|
|
|
# Global CNN monitor instance, shared by the module-level convenience
# functions below (log_cnn_prediction, start_cnn_training_session, ...).
cnn_monitor = CNNMonitor()
|
|
|
|
def log_cnn_prediction(model_name: str, symbol: str, prediction_result: Dict[str, Any],
                       feature_matrix_shape: Tuple[int, ...], current_price: Optional[float] = None,
                       prediction_latency_ms: Optional[float] = None,
                       model_memory_usage_mb: Optional[float] = None) -> None:
    """
    Convenience function to log CNN predictions via the global monitor.

    Args:
        model_name: Name of the CNN model
        symbol: Trading symbol (e.g., 'ETH/USDT')
        prediction_result: Dictionary with prediction results from model.predict()
        feature_matrix_shape: Shape of the input feature matrix
        current_price: Current market price
        prediction_latency_ms: Time taken for prediction in milliseconds
        model_memory_usage_mb: Model memory usage in MB

    Errors are logged, never raised.
    """
    try:
        prediction = CNNPrediction(
            timestamp=datetime.now(),
            symbol=symbol,
            model_name=model_name,
            feature_matrix_shape=feature_matrix_shape,
            action=prediction_result.get('action', 0),
            action_name=prediction_result.get('action_name', 'UNKNOWN'),
            confidence=prediction_result.get('confidence', 0.0),
            action_confidence=prediction_result.get('action_confidence', 0.0),
            probabilities=prediction_result.get('probabilities', []),
            raw_logits=prediction_result.get('raw_logits', []),
            regime_probabilities=prediction_result.get('regime_probabilities'),
            volatility_prediction=prediction_result.get('volatility_prediction'),
            # Fix: these enhanced outputs were previously dropped even though
            # CNNPrediction declares fields for them.
            extrema_prediction=prediction_result.get('extrema_prediction'),
            risk_assessment=prediction_result.get('risk_assessment'),
            current_price=current_price,
            prediction_latency_ms=prediction_latency_ms,
            model_memory_usage_mb=model_memory_usage_mb
        )

        cnn_monitor.log_prediction(prediction)

    except Exception as e:
        logger.error(f"Error logging CNN prediction: {e}")
|
|
|
|
def start_cnn_training_session(model_name: str, learning_rate: float = 0.001,
                               batch_size: int = 32, epochs_planned: int = 100) -> str:
    """
    Start a new CNN training session on the global monitor.

    Returns:
        session_id: Unique identifier for the training session
        (model name plus a second-resolution timestamp).
    """
    timestamp_tag = datetime.now().strftime('%Y%m%d_%H%M%S')
    session_id = f"{model_name}_{timestamp_tag}"
    cnn_monitor.start_training_session(session_id, model_name, learning_rate, batch_size, epochs_planned)
    return session_id
|
|
|
|
def log_cnn_training_step(session_id: str, epoch: int, train_loss: float, train_accuracy: float,
                          val_loss: Optional[float] = None, val_accuracy: Optional[float] = None,
                          **additional_losses) -> None:
    """Forward one epoch's training metrics to the global monitor's session."""
    cnn_monitor.log_training_step(session_id, epoch, train_loss, train_accuracy,
                                  val_loss=val_loss, val_accuracy=val_accuracy,
                                  **additional_losses)
|
|
|
|
def end_cnn_training_session(session_id: str, final_model_path: Optional[str] = None) -> None:
    """Finish the given CNN training session on the global monitor."""
    cnn_monitor.end_training_session(session_id, final_model_path)
|
|
|
|
def get_cnn_dashboard_data() -> Dict[str, Any]:
    """Return the global monitor's dashboard payload."""
    data = cnn_monitor.get_dashboard_data()
    return data