gogo2/core/cnn_monitor.py
2025-05-31 00:47:59 +03:00

614 lines
27 KiB
Python

#!/usr/bin/env python3
"""
CNN Model Monitoring System
This module provides comprehensive monitoring and analytics for CNN models including:
- Real-time prediction tracking and logging
- Training session monitoring
- Performance metrics and visualization
- Prediction confidence analysis
- Model behavior insights
"""
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from collections import deque
import json
import os
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class CNNPrediction:
    """Single CNN prediction record with optional enhanced-model outputs."""
    timestamp: datetime
    symbol: str
    model_name: str
    feature_matrix_shape: Tuple[int, ...]
    # Core prediction results
    action: int
    action_name: str
    confidence: float
    action_confidence: float
    probabilities: List[float]
    raw_logits: List[float]
    # Enhanced prediction details (if available)
    regime_probabilities: Optional[List[float]] = None
    volatility_prediction: Optional[float] = None
    extrema_prediction: Optional[List[float]] = None
    risk_assessment: Optional[List[float]] = None
    # Context information
    current_price: Optional[float] = None
    price_change_1m: Optional[float] = None
    price_change_5m: Optional[float] = None
    volume_ratio: Optional[float] = None
    # Performance tracking
    prediction_latency_ms: Optional[float] = None
    model_memory_usage_mb: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Render this record as a JSON-serializable dict (field order kept)."""
        # Declared field order — must match the dataclass so that serialized
        # output keeps its historical key ordering.
        ordered_fields = (
            'timestamp', 'symbol', 'model_name', 'feature_matrix_shape',
            'action', 'action_name', 'confidence', 'action_confidence',
            'probabilities', 'raw_logits', 'regime_probabilities',
            'volatility_prediction', 'extrema_prediction', 'risk_assessment',
            'current_price', 'price_change_1m', 'price_change_5m',
            'volume_ratio', 'prediction_latency_ms', 'model_memory_usage_mb',
        )
        record: Dict[str, Any] = {}
        for name in ordered_fields:
            value = getattr(self, name)
            if name == 'timestamp':
                # datetime is not JSON-serializable; store ISO-8601 text.
                value = value.isoformat()
            elif name == 'feature_matrix_shape':
                # Tuples round-trip through JSON as lists; store a list.
                value = list(value)
            record[name] = value
        return record
@dataclass
class CNNTrainingSession:
    """Record of one CNN training run, including per-epoch metric histories."""
    session_id: str
    model_name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    # Training configuration
    learning_rate: float = 0.001
    batch_size: int = 32
    epochs_planned: int = 100
    epochs_completed: int = 0
    # Training metrics
    train_loss_history: List[float] = field(default_factory=list)
    train_accuracy_history: List[float] = field(default_factory=list)
    val_loss_history: List[float] = field(default_factory=list)
    val_accuracy_history: List[float] = field(default_factory=list)
    # Multi-task losses (for enhanced CNN)
    confidence_loss_history: List[float] = field(default_factory=list)
    regime_loss_history: List[float] = field(default_factory=list)
    volatility_loss_history: List[float] = field(default_factory=list)
    # Performance metrics
    best_train_accuracy: float = 0.0
    best_val_accuracy: float = 0.0
    total_samples_processed: int = 0
    avg_training_time_per_epoch: float = 0.0
    # Model checkpoints
    checkpoint_paths: List[str] = field(default_factory=list)
    best_model_path: Optional[str] = None

    def get_duration(self) -> timedelta:
        """Elapsed time; measured up to now() while the run is still active."""
        if self.end_time is None:
            return datetime.now() - self.start_time
        return self.end_time - self.start_time

    def get_current_learning_rate(self) -> float:
        """Learning rate in effect (schedulers may change it mid-run)."""
        return self.learning_rate

    def is_active(self) -> bool:
        """True until the session has been given an end time."""
        return self.end_time is None
class CNNMonitor:
    """Comprehensive CNN model monitoring system.

    Aggregates three kinds of telemetry:

    * individual predictions, indexed globally, per symbol, and per model;
    * training sessions with per-epoch metric histories;
    * derived statistics (confidence, latency, prediction frequency).

    Batches of records are periodically persisted as JSON files under
    ``save_directory``.
    """

    def __init__(self, max_predictions_history: int = 10000,
                 max_training_sessions: int = 100,
                 save_directory: str = "logs/cnn_monitoring"):
        """Initialize in-memory stores and ensure the save directory exists.

        Args:
            max_predictions_history: Cap on the global prediction history.
            max_training_sessions: Cap on retained completed sessions.
            save_directory: Directory for JSON snapshots (created if missing).
        """
        self.max_predictions_history = max_predictions_history
        self.max_training_sessions = max_training_sessions
        self.save_directory = Path(save_directory)
        self.save_directory.mkdir(parents=True, exist_ok=True)
        # Prediction tracking — bounded deques silently drop oldest entries
        self.predictions_history: deque = deque(maxlen=max_predictions_history)
        self.predictions_by_symbol: Dict[str, deque] = {}
        self.predictions_by_model: Dict[str, deque] = {}
        # Training session tracking
        self.training_sessions: Dict[str, CNNTrainingSession] = {}
        self.active_sessions: List[str] = []
        self.completed_sessions: deque = deque(maxlen=max_training_sessions)
        # Performance analytics
        self.model_performance_stats: Dict[str, Dict[str, Any]] = {}
        self.prediction_accuracy_tracking: Dict[str, List[Tuple[datetime, bool]]] = {}
        # Real-time monitoring
        self.last_prediction_time: Dict[str, datetime] = {}
        self.prediction_frequency: Dict[str, float] = {}  # predictions per minute
        logger.info(f"CNN Monitor initialized - saving to {self.save_directory}")

    def log_prediction(self, prediction: CNNPrediction) -> None:
        """Log a new CNN prediction with full details.

        Appends the record to the global, per-symbol, and per-model
        histories, updates rolling statistics, and persists a batch to
        disk every 100 predictions. Errors are logged, never raised.
        """
        try:
            # Add to main history
            self.predictions_history.append(prediction)
            # Add to symbol-specific history (lazily created, bounded)
            if prediction.symbol not in self.predictions_by_symbol:
                self.predictions_by_symbol[prediction.symbol] = deque(maxlen=1000)
            self.predictions_by_symbol[prediction.symbol].append(prediction)
            # Add to model-specific history (lazily created, bounded)
            if prediction.model_name not in self.predictions_by_model:
                self.predictions_by_model[prediction.model_name] = deque(maxlen=1000)
            self.predictions_by_model[prediction.model_name].append(prediction)
            # Update performance stats
            self._update_performance_stats(prediction)
            # Update frequency tracking
            self._update_prediction_frequency(prediction)
            # Log prediction details
            logger.info(f"CNN Prediction [{prediction.model_name}] {prediction.symbol}: "
                        f"{prediction.action_name} (confidence: {prediction.confidence:.3f}, "
                        f"action_conf: {prediction.action_confidence:.3f})")
            if prediction.regime_probabilities:
                regime_max_idx = np.argmax(prediction.regime_probabilities)
                logger.info(f" Regime: {regime_max_idx} (conf: {prediction.regime_probabilities[regime_max_idx]:.3f})")
            if prediction.volatility_prediction is not None:
                logger.info(f" Volatility: {prediction.volatility_prediction:.3f}")
            # Save to disk periodically
            if len(self.predictions_history) % 100 == 0:
                self._save_predictions_batch()
        except Exception as e:
            logger.error(f"Error logging CNN prediction: {e}")

    def start_training_session(self, session_id: str, model_name: str,
                               learning_rate: float = 0.001, batch_size: int = 32,
                               epochs_planned: int = 100) -> CNNTrainingSession:
        """Start a new training session and register it as active.

        Returns:
            The newly created CNNTrainingSession.
        """
        session = CNNTrainingSession(
            session_id=session_id,
            model_name=model_name,
            start_time=datetime.now(),
            learning_rate=learning_rate,
            batch_size=batch_size,
            epochs_planned=epochs_planned
        )
        self.training_sessions[session_id] = session
        self.active_sessions.append(session_id)
        logger.info(f"Started CNN training session: {session_id} for model {model_name}")
        logger.info(f" LR: {learning_rate}, Batch: {batch_size}, Epochs: {epochs_planned}")
        return session

    def log_training_step(self, session_id: str, epoch: int,
                          train_loss: float, train_accuracy: float,
                          val_loss: Optional[float] = None, val_accuracy: Optional[float] = None,
                          **additional_losses) -> None:
        """Log per-epoch training metrics for a session.

        Args:
            session_id: Session to update; unknown ids are warned and skipped.
            epoch: Epoch number just completed.
            train_loss / train_accuracy: Required training metrics.
            val_loss / val_accuracy: Optional validation metrics.
            **additional_losses: Enhanced-CNN multi-task losses; recognized
                keys are 'confidence_loss', 'regime_loss', 'volatility_loss'.
        """
        if session_id not in self.training_sessions:
            logger.warning(f"Training session {session_id} not found")
            return
        session = self.training_sessions[session_id]
        session.epochs_completed = epoch
        # Update metrics
        session.train_loss_history.append(train_loss)
        session.train_accuracy_history.append(train_accuracy)
        if val_loss is not None:
            session.val_loss_history.append(val_loss)
        if val_accuracy is not None:
            session.val_accuracy_history.append(val_accuracy)
        # Update additional losses for enhanced CNN
        if 'confidence_loss' in additional_losses:
            session.confidence_loss_history.append(additional_losses['confidence_loss'])
        if 'regime_loss' in additional_losses:
            session.regime_loss_history.append(additional_losses['regime_loss'])
        if 'volatility_loss' in additional_losses:
            session.volatility_loss_history.append(additional_losses['volatility_loss'])
        # Update best metrics
        session.best_train_accuracy = max(session.best_train_accuracy, train_accuracy)
        if val_accuracy is not None:
            session.best_val_accuracy = max(session.best_val_accuracy, val_accuracy)
        # Log progress
        logger.info(f"Training [{session_id}] Epoch {epoch}: "
                    f"Loss: {train_loss:.4f}, Acc: {train_accuracy:.4f}")
        if val_loss is not None and val_accuracy is not None:
            logger.info(f" Validation - Loss: {val_loss:.4f}, Acc: {val_accuracy:.4f}")

    def end_training_session(self, session_id: str, final_model_path: Optional[str] = None) -> None:
        """End a training session, move it to completed, and persist it."""
        if session_id not in self.training_sessions:
            logger.warning(f"Training session {session_id} not found")
            return
        session = self.training_sessions[session_id]
        session.end_time = datetime.now()
        session.best_model_path = final_model_path
        # Remove from active sessions
        if session_id in self.active_sessions:
            self.active_sessions.remove(session_id)
        # Add to completed sessions
        self.completed_sessions.append(session)
        duration = session.get_duration()
        logger.info(f"Completed CNN training session: {session_id}")
        logger.info(f" Duration: {duration}")
        logger.info(f" Epochs: {session.epochs_completed}/{session.epochs_planned}")
        logger.info(f" Best train accuracy: {session.best_train_accuracy:.4f}")
        logger.info(f" Best val accuracy: {session.best_val_accuracy:.4f}")
        # Save session to disk
        self._save_training_session(session)

    def get_recent_predictions(self, symbol: Optional[str] = None,
                               model_name: Optional[str] = None,
                               limit: int = 100) -> List[CNNPrediction]:
        """Get recent predictions with optional filtering.

        The most specific per-symbol/per-model deque is used as the source,
        then BOTH filters are applied unconditionally. (The previous logic
        could skip the second filter when both symbol and model_name were
        given, returning predictions for the wrong model or symbol.)
        Re-applying a filter already satisfied by the source deque is a
        cheap no-op.
        """
        if symbol and symbol in self.predictions_by_symbol:
            predictions = list(self.predictions_by_symbol[symbol])
        elif model_name and model_name in self.predictions_by_model:
            predictions = list(self.predictions_by_model[model_name])
        else:
            predictions = list(self.predictions_history)
        if symbol:
            predictions = [p for p in predictions if p.symbol == symbol]
        if model_name:
            predictions = [p for p in predictions if p.model_name == model_name]
        return predictions[-limit:]

    def get_prediction_statistics(self, symbol: Optional[str] = None,
                                  model_name: Optional[str] = None,
                                  time_window: timedelta = timedelta(hours=1)) -> Dict[str, Any]:
        """Get prediction statistics for the specified time window.

        Returns confidence/action distributions for predictions inside the
        window, plus enhanced-model stats when regime predictions exist.
        Returns ``{'total_predictions': 0}`` when the window is empty.
        """
        cutoff_time = datetime.now() - time_window
        predictions = self.get_recent_predictions(symbol, model_name, limit=10000)
        # Filter by time window
        recent_predictions = [p for p in predictions if p.timestamp >= cutoff_time]
        if not recent_predictions:
            return {'total_predictions': 0}
        # Calculate statistics
        confidences = [p.confidence for p in recent_predictions]
        action_confidences = [p.action_confidence for p in recent_predictions]
        actions = [p.action for p in recent_predictions]
        stats = {
            'total_predictions': len(recent_predictions),
            'time_window_hours': time_window.total_seconds() / 3600,
            'predictions_per_hour': len(recent_predictions) / (time_window.total_seconds() / 3600),
            'confidence_stats': {
                'mean': np.mean(confidences),
                'std': np.std(confidences),
                'min': np.min(confidences),
                'max': np.max(confidences),
                'median': np.median(confidences)
            },
            'action_confidence_stats': {
                'mean': np.mean(action_confidences),
                'std': np.std(action_confidences),
                'min': np.min(action_confidences),
                'max': np.max(action_confidences),
                'median': np.median(action_confidences)
            },
            # Action encoding per the logging convention: 0 == BUY, 1 == SELL
            'action_distribution': {
                'buy_count': sum(1 for a in actions if a == 0),
                'sell_count': sum(1 for a in actions if a == 1),
                'buy_percentage': (sum(1 for a in actions if a == 0) / len(actions)) * 100,
                'sell_percentage': (sum(1 for a in actions if a == 1) / len(actions)) * 100
            }
        }
        # Add enhanced model statistics if available
        enhanced_predictions = [p for p in recent_predictions if p.regime_probabilities is not None]
        if enhanced_predictions:
            regime_predictions = [np.argmax(p.regime_probabilities) for p in enhanced_predictions]
            volatility_predictions = [p.volatility_prediction for p in enhanced_predictions
                                      if p.volatility_prediction is not None]
            stats['enhanced_model_stats'] = {
                'enhanced_predictions_count': len(enhanced_predictions),
                # 8 regime classes assumed by the enhanced CNN head
                'regime_distribution': {i: regime_predictions.count(i) for i in range(8)},
                'volatility_stats': {
                    'mean': np.mean(volatility_predictions) if volatility_predictions else 0,
                    'std': np.std(volatility_predictions) if volatility_predictions else 0
                } if volatility_predictions else None
            }
        return stats

    def get_active_training_sessions(self) -> List[CNNTrainingSession]:
        """Get all currently active training sessions."""
        return [self.training_sessions[sid] for sid in self.active_sessions
                if sid in self.training_sessions]

    def get_training_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get a detailed summary dict of a training session, or None if unknown."""
        if session_id not in self.training_sessions:
            return None
        session = self.training_sessions[session_id]
        summary = {
            'session_id': session_id,
            'model_name': session.model_name,
            'start_time': session.start_time.isoformat(),
            'end_time': session.end_time.isoformat() if session.end_time else None,
            'duration_minutes': session.get_duration().total_seconds() / 60,
            'is_active': session.is_active(),
            'progress': {
                'epochs_completed': session.epochs_completed,
                'epochs_planned': session.epochs_planned,
                # Guard against epochs_planned == 0 (would raise ZeroDivisionError)
                'progress_percentage': (
                    (session.epochs_completed / session.epochs_planned) * 100
                    if session.epochs_planned else 0.0
                )
            },
            'performance': {
                'best_train_accuracy': session.best_train_accuracy,
                'best_val_accuracy': session.best_val_accuracy,
                'current_train_loss': session.train_loss_history[-1] if session.train_loss_history else None,
                'current_train_accuracy': session.train_accuracy_history[-1] if session.train_accuracy_history else None,
                'current_val_loss': session.val_loss_history[-1] if session.val_loss_history else None,
                'current_val_accuracy': session.val_accuracy_history[-1] if session.val_accuracy_history else None
            },
            'configuration': {
                'learning_rate': session.learning_rate,
                'batch_size': session.batch_size
            }
        }
        # Add enhanced model metrics if available
        if session.confidence_loss_history:
            summary['enhanced_metrics'] = {
                'confidence_loss': session.confidence_loss_history[-1] if session.confidence_loss_history else None,
                'regime_loss': session.regime_loss_history[-1] if session.regime_loss_history else None,
                'volatility_loss': session.volatility_loss_history[-1] if session.volatility_loss_history else None
            }
        return summary

    def _update_performance_stats(self, prediction: CNNPrediction) -> None:
        """Update rolling per-model performance statistics."""
        model_name = prediction.model_name
        if model_name not in self.model_performance_stats:
            self.model_performance_stats[model_name] = {
                'total_predictions': 0,
                'confidence_sum': 0.0,
                'action_confidence_sum': 0.0,
                'last_prediction_time': None,
                'prediction_latencies': deque(maxlen=100),
                'memory_usage': deque(maxlen=100)
            }
        stats = self.model_performance_stats[model_name]
        stats['total_predictions'] += 1
        stats['confidence_sum'] += prediction.confidence
        stats['action_confidence_sum'] += prediction.action_confidence
        stats['last_prediction_time'] = prediction.timestamp
        if prediction.prediction_latency_ms is not None:
            stats['prediction_latencies'].append(prediction.prediction_latency_ms)
        if prediction.model_memory_usage_mb is not None:
            stats['memory_usage'].append(prediction.model_memory_usage_mb)

    def _update_prediction_frequency(self, prediction: CNNPrediction) -> None:
        """Update per-model prediction frequency (predictions per minute)."""
        model_name = prediction.model_name
        current_time = prediction.timestamp
        if model_name in self.last_prediction_time:
            time_diff = (current_time - self.last_prediction_time[model_name]).total_seconds()
            if time_diff > 0:
                freq = 60.0 / time_diff  # predictions per minute
                self.prediction_frequency[model_name] = freq
        self.last_prediction_time[model_name] = current_time

    def _save_predictions_batch(self) -> None:
        """Save the most recent 100 predictions to a timestamped JSON file."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = self.save_directory / f"cnn_predictions_{timestamp}.json"
            # Get last 100 predictions
            recent_predictions = list(self.predictions_history)[-100:]
            predictions_data = [p.to_dict() for p in recent_predictions]
            with open(filename, 'w') as f:
                json.dump(predictions_data, f, indent=2)
            # Fixed: interpolate the actual file path into the log message
            logger.debug(f"Saved {len(predictions_data)} CNN predictions to {filename}")
        except Exception as e:
            logger.error(f"Error saving predictions batch: {e}")

    def _save_training_session(self, session: CNNTrainingSession) -> None:
        """Save a completed training session to a JSON file on disk."""
        try:
            filename = self.save_directory / f"training_session_{session.session_id}.json"
            session_data = {
                'session_id': session.session_id,
                'model_name': session.model_name,
                'start_time': session.start_time.isoformat(),
                'end_time': session.end_time.isoformat() if session.end_time else None,
                'duration_minutes': session.get_duration().total_seconds() / 60,
                'configuration': {
                    'learning_rate': session.learning_rate,
                    'batch_size': session.batch_size,
                    'epochs_planned': session.epochs_planned,
                    'epochs_completed': session.epochs_completed
                },
                'metrics': {
                    'train_loss_history': session.train_loss_history,
                    'train_accuracy_history': session.train_accuracy_history,
                    'val_loss_history': session.val_loss_history,
                    'val_accuracy_history': session.val_accuracy_history,
                    'confidence_loss_history': session.confidence_loss_history,
                    'regime_loss_history': session.regime_loss_history,
                    'volatility_loss_history': session.volatility_loss_history
                },
                'performance': {
                    'best_train_accuracy': session.best_train_accuracy,
                    'best_val_accuracy': session.best_val_accuracy,
                    'total_samples_processed': session.total_samples_processed
                },
                'model_info': {
                    'checkpoint_paths': session.checkpoint_paths,
                    'best_model_path': session.best_model_path
                }
            }
            with open(filename, 'w') as f:
                json.dump(session_data, f, indent=2)
            # Fixed: interpolate the actual file path into the log message
            logger.info(f"Saved training session {session.session_id} to {filename}")
        except Exception as e:
            logger.error(f"Error saving training session: {e}")

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get comprehensive data for dashboard display."""
        return {
            'recent_predictions': [p.to_dict() for p in list(self.predictions_history)[-50:]],
            'active_training_sessions': [self.get_training_session_summary(sid)
                                         for sid in self.active_sessions],
            'model_performance': self.model_performance_stats,
            'prediction_frequencies': self.prediction_frequency,
            'statistics': {
                'total_predictions_logged': len(self.predictions_history),
                'active_sessions_count': len(self.active_sessions),
                'completed_sessions_count': len(self.completed_sessions),
                'models_tracked': len(self.model_performance_stats)
            }
        }
# Global CNN monitor instance — module-level singleton shared by the
# convenience functions below; created with default limits and save path.
cnn_monitor = CNNMonitor()
def log_cnn_prediction(model_name: str, symbol: str, prediction_result: Dict[str, Any],
                       feature_matrix_shape: Tuple[int, ...], current_price: Optional[float] = None,
                       prediction_latency_ms: Optional[float] = None,
                       model_memory_usage_mb: Optional[float] = None) -> None:
    """
    Convenience function to log CNN predictions
    Args:
        model_name: Name of the CNN model
        symbol: Trading symbol (e.g., 'ETH/USDT')
        prediction_result: Dictionary with prediction results from model.predict()
        feature_matrix_shape: Shape of the input feature matrix
        current_price: Current market price
        prediction_latency_ms: Time taken for prediction in milliseconds
        model_memory_usage_mb: Model memory usage in MB
    """
    try:
        prediction = CNNPrediction(
            timestamp=datetime.now(),
            symbol=symbol,
            model_name=model_name,
            feature_matrix_shape=feature_matrix_shape,
            action=prediction_result.get('action', 0),
            action_name=prediction_result.get('action_name', 'UNKNOWN'),
            confidence=prediction_result.get('confidence', 0.0),
            action_confidence=prediction_result.get('action_confidence', 0.0),
            probabilities=prediction_result.get('probabilities', []),
            raw_logits=prediction_result.get('raw_logits', []),
            regime_probabilities=prediction_result.get('regime_probabilities'),
            volatility_prediction=prediction_result.get('volatility_prediction'),
            # Previously dropped: forward the remaining enhanced-model outputs
            # so CNNPrediction records are complete (default to None if absent).
            extrema_prediction=prediction_result.get('extrema_prediction'),
            risk_assessment=prediction_result.get('risk_assessment'),
            current_price=current_price,
            prediction_latency_ms=prediction_latency_ms,
            model_memory_usage_mb=model_memory_usage_mb
        )
        cnn_monitor.log_prediction(prediction)
    except Exception as e:
        logger.error(f"Error logging CNN prediction: {e}")
def start_cnn_training_session(model_name: str, learning_rate: float = 0.001,
                               batch_size: int = 32, epochs_planned: int = 100) -> str:
    """
    Start a new CNN training session
    Returns:
        session_id: Unique identifier for the training session
    """
    # Session id combines the model name with a second-resolution timestamp.
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    session_id = f"{model_name}_{stamp}"
    cnn_monitor.start_training_session(
        session_id, model_name, learning_rate, batch_size, epochs_planned
    )
    return session_id
def log_cnn_training_step(session_id: str, epoch: int, train_loss: float, train_accuracy: float,
                          val_loss: Optional[float] = None, val_accuracy: Optional[float] = None,
                          **additional_losses) -> None:
    """Forward one epoch's metrics to the shared monitor's session record."""
    cnn_monitor.log_training_step(
        session_id,
        epoch,
        train_loss,
        train_accuracy,
        val_loss=val_loss,
        val_accuracy=val_accuracy,
        **additional_losses,
    )
def end_cnn_training_session(session_id: str, final_model_path: Optional[str] = None) -> None:
    """Mark the given CNN training session as finished on the shared monitor."""
    cnn_monitor.end_training_session(session_id, final_model_path=final_model_path)
def get_cnn_dashboard_data() -> Dict[str, Any]:
    """Return the shared monitor's dashboard payload (predictions, sessions, stats)."""
    return cnn_monitor.get_dashboard_data()