diff --git a/core/cnn_monitor.py b/core/cnn_monitor.py new file mode 100644 index 0000000..224c4a9 --- /dev/null +++ b/core/cnn_monitor.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python3 +""" +CNN Model Monitoring System + +This module provides comprehensive monitoring and analytics for CNN models including: +- Real-time prediction tracking and logging +- Training session monitoring +- Performance metrics and visualization +- Prediction confidence analysis +- Model behavior insights +""" + +import logging +import numpy as np +import pandas as pd +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any, Tuple +from dataclasses import dataclass, field +from collections import deque +import json +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + +@dataclass +class CNNPrediction: + """Individual CNN prediction record""" + timestamp: datetime + symbol: str + model_name: str + feature_matrix_shape: Tuple[int, ...] + + # Core prediction results + action: int + action_name: str + confidence: float + action_confidence: float + probabilities: List[float] + raw_logits: List[float] + + # Enhanced prediction details (if available) + regime_probabilities: Optional[List[float]] = None + volatility_prediction: Optional[float] = None + extrema_prediction: Optional[List[float]] = None + risk_assessment: Optional[List[float]] = None + + # Context information + current_price: Optional[float] = None + price_change_1m: Optional[float] = None + price_change_5m: Optional[float] = None + volume_ratio: Optional[float] = None + + # Performance tracking + prediction_latency_ms: Optional[float] = None + model_memory_usage_mb: Optional[float] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + return { + 'timestamp': self.timestamp.isoformat(), + 'symbol': self.symbol, + 'model_name': self.model_name, + 'feature_matrix_shape': list(self.feature_matrix_shape), + 'action': self.action, + 'action_name': self.action_name, + 'confidence': self.confidence, + 'action_confidence': self.action_confidence, + 'probabilities': self.probabilities, + 'raw_logits': self.raw_logits, + 'regime_probabilities': self.regime_probabilities, + 'volatility_prediction': self.volatility_prediction, + 'extrema_prediction': self.extrema_prediction, + 'risk_assessment': self.risk_assessment, + 'current_price': self.current_price, + 'price_change_1m': self.price_change_1m, + 'price_change_5m': self.price_change_5m, + 'volume_ratio': self.volume_ratio, + 'prediction_latency_ms': self.prediction_latency_ms, + 'model_memory_usage_mb': self.model_memory_usage_mb + } + +@dataclass +class CNNTrainingSession: + """CNN training session record""" + session_id: str + model_name: str + start_time: datetime + end_time: Optional[datetime] = None + + # Training configuration + learning_rate: float = 0.001 + batch_size: int = 32 + epochs_planned: int = 100 + epochs_completed: int = 0 + + # Training metrics + train_loss_history: List[float] = field(default_factory=list) + train_accuracy_history: List[float] = field(default_factory=list) + val_loss_history: List[float] = field(default_factory=list) + val_accuracy_history: List[float] = field(default_factory=list) + + # Multi-task losses (for enhanced CNN) + confidence_loss_history: List[float] = field(default_factory=list) + regime_loss_history: List[float] = field(default_factory=list) + volatility_loss_history: List[float] = field(default_factory=list) + + # Performance metrics + best_train_accuracy: float = 0.0 + best_val_accuracy: float = 0.0 + total_samples_processed: int = 0 + avg_training_time_per_epoch: float = 0.0 + + # Model checkpoints + checkpoint_paths: List[str] = field(default_factory=list) + best_model_path: Optional[str] = None + + def get_duration(self) -> timedelta: + """Get training session duration""" + end = self.end_time or datetime.now() + return end - self.start_time + + def get_current_learning_rate(self) -> float: + """Get current learning rate (may change during training)""" + return self.learning_rate + + def is_active(self) -> bool: + """Check if training session is still active""" + return self.end_time is None + +class CNNMonitor: + """Comprehensive CNN model monitoring system""" + + def __init__(self, max_predictions_history: int = 10000, + max_training_sessions: int = 100, + save_directory: str = "logs/cnn_monitoring"): + + self.max_predictions_history = max_predictions_history + self.max_training_sessions = max_training_sessions + self.save_directory = Path(save_directory) + self.save_directory.mkdir(parents=True, exist_ok=True) + + # Prediction tracking + self.predictions_history: deque = deque(maxlen=max_predictions_history) + self.predictions_by_symbol: Dict[str, deque] = {} + self.predictions_by_model: Dict[str, deque] = {} + + # Training session tracking + self.training_sessions: Dict[str, CNNTrainingSession] = {} + self.active_sessions: List[str] = [] + self.completed_sessions: deque = deque(maxlen=max_training_sessions) + + # Performance analytics + self.model_performance_stats: Dict[str, Dict[str, Any]] = {} + self.prediction_accuracy_tracking: Dict[str, List[Tuple[datetime, bool]]] = {} + + # Real-time monitoring + self.last_prediction_time: Dict[str, datetime] = {} + self.prediction_frequency: Dict[str, float] = {} # predictions per minute + + logger.info(f"CNN Monitor initialized - saving to {self.save_directory}") + + def log_prediction(self, prediction: CNNPrediction) -> None: + """Log a new CNN prediction with full details""" + try: + # Add to main history + self.predictions_history.append(prediction) + + # Add to symbol-specific history + if prediction.symbol not in self.predictions_by_symbol: + self.predictions_by_symbol[prediction.symbol] = deque(maxlen=1000) + self.predictions_by_symbol[prediction.symbol].append(prediction) + + # Add to model-specific history + if prediction.model_name not in self.predictions_by_model: + self.predictions_by_model[prediction.model_name] = deque(maxlen=1000) + self.predictions_by_model[prediction.model_name].append(prediction) + + # Update performance stats + self._update_performance_stats(prediction) + + # Update frequency tracking + self._update_prediction_frequency(prediction) + + # Log prediction details + logger.info(f"CNN Prediction [{prediction.model_name}] {prediction.symbol}: " + f"{prediction.action_name} (confidence: {prediction.confidence:.3f}, " + f"action_conf: {prediction.action_confidence:.3f})") + + if prediction.regime_probabilities: + regime_max_idx = np.argmax(prediction.regime_probabilities) + logger.info(f" Regime: {regime_max_idx} (conf: {prediction.regime_probabilities[regime_max_idx]:.3f})") + + if prediction.volatility_prediction is not None: + logger.info(f" Volatility: {prediction.volatility_prediction:.3f}") + + # Save to disk periodically + if len(self.predictions_history) % 100 == 0: + self._save_predictions_batch() + + except Exception as e: + logger.error(f"Error logging CNN prediction: {e}") + + def start_training_session(self, session_id: str, model_name: str, + learning_rate: float = 0.001, batch_size: int = 32, + epochs_planned: int = 100) -> CNNTrainingSession: + """Start a new training session""" + session = CNNTrainingSession( + session_id=session_id, + model_name=model_name, + start_time=datetime.now(), + learning_rate=learning_rate, + batch_size=batch_size, + epochs_planned=epochs_planned + ) + + self.training_sessions[session_id] = session + self.active_sessions.append(session_id) + + logger.info(f"Started CNN training session: {session_id} for model {model_name}") + logger.info(f" LR: {learning_rate}, Batch: {batch_size}, Epochs: {epochs_planned}") + + return session + + def log_training_step(self, session_id: str, epoch: int, + train_loss: float, train_accuracy: float, + val_loss: Optional[float] = None, val_accuracy: Optional[float] = None, + **additional_losses) -> None: + """Log training step metrics""" + if session_id not in self.training_sessions: + logger.warning(f"Training session {session_id} not found") + return + + session = self.training_sessions[session_id] + session.epochs_completed = epoch + + # Update metrics + session.train_loss_history.append(train_loss) + session.train_accuracy_history.append(train_accuracy) + + if val_loss is not None: + session.val_loss_history.append(val_loss) + if val_accuracy is not None: + session.val_accuracy_history.append(val_accuracy) + + # Update additional losses for enhanced CNN + if 'confidence_loss' in additional_losses: + session.confidence_loss_history.append(additional_losses['confidence_loss']) + if 'regime_loss' in additional_losses: + session.regime_loss_history.append(additional_losses['regime_loss']) + if 'volatility_loss' in additional_losses: + session.volatility_loss_history.append(additional_losses['volatility_loss']) + + # Update best metrics + session.best_train_accuracy = max(session.best_train_accuracy, train_accuracy) + if val_accuracy is not None: + session.best_val_accuracy = max(session.best_val_accuracy, val_accuracy) + + # Log progress + logger.info(f"Training [{session_id}] Epoch {epoch}: " + f"Loss: {train_loss:.4f}, Acc: {train_accuracy:.4f}") + + if val_loss is not None and val_accuracy is not None: + logger.info(f" Validation - Loss: {val_loss:.4f}, Acc: {val_accuracy:.4f}") + + def end_training_session(self, session_id: str, final_model_path: Optional[str] = None) -> None: + """End a training session""" + if session_id not in self.training_sessions: + logger.warning(f"Training session {session_id} not found") + return + + session = self.training_sessions[session_id] + session.end_time = datetime.now() + session.best_model_path = final_model_path + + # Remove from active sessions + if session_id in self.active_sessions: + self.active_sessions.remove(session_id) + + # Add to completed sessions + self.completed_sessions.append(session) + + duration = session.get_duration() + logger.info(f"Completed CNN training session: {session_id}") + logger.info(f" Duration: {duration}") + logger.info(f" Epochs: {session.epochs_completed}/{session.epochs_planned}") + logger.info(f" Best train accuracy: {session.best_train_accuracy:.4f}") + logger.info(f" Best val accuracy: {session.best_val_accuracy:.4f}") + + # Save session to disk + self._save_training_session(session) + + def get_recent_predictions(self, symbol: Optional[str] = None, + model_name: Optional[str] = None, + limit: int = 100) -> List[CNNPrediction]: + """Get recent predictions with optional filtering""" + if symbol and symbol in self.predictions_by_symbol: + predictions = list(self.predictions_by_symbol[symbol]) + elif model_name and model_name in self.predictions_by_model: + predictions = list(self.predictions_by_model[model_name]) + else: + predictions = list(self.predictions_history) + + # Apply additional filtering + if symbol and not (symbol in self.predictions_by_symbol and symbol): + predictions = [p for p in predictions if p.symbol == symbol] + if model_name and not (model_name in self.predictions_by_model and model_name): + predictions = [p for p in predictions if p.model_name == model_name] + + return predictions[-limit:] + + def get_prediction_statistics(self, symbol: Optional[str] = None, + model_name: Optional[str] = None, + time_window: timedelta = timedelta(hours=1)) -> Dict[str, Any]: + """Get prediction statistics for the specified time window""" + cutoff_time = datetime.now() - time_window + predictions = self.get_recent_predictions(symbol, model_name, limit=10000) + + # Filter by time window + recent_predictions = [p for p in predictions if p.timestamp >= cutoff_time] + + if not recent_predictions: + return {'total_predictions': 0} + + # Calculate statistics + confidences = [p.confidence for p in recent_predictions] + action_confidences = [p.action_confidence for p in recent_predictions] + actions = [p.action for p in recent_predictions] + + stats = { + 'total_predictions': len(recent_predictions), + 'time_window_hours': time_window.total_seconds() / 3600, + 'predictions_per_hour': len(recent_predictions) / (time_window.total_seconds() / 3600), + + 'confidence_stats': { + 'mean': np.mean(confidences), + 'std': np.std(confidences), + 'min': np.min(confidences), + 'max': np.max(confidences), + 'median': np.median(confidences) + }, + + 'action_confidence_stats': { + 'mean': np.mean(action_confidences), + 'std': np.std(action_confidences), + 'min': np.min(action_confidences), + 'max': np.max(action_confidences), + 'median': np.median(action_confidences) + }, + + 'action_distribution': { + 'buy_count': sum(1 for a in actions if a == 0), + 'sell_count': sum(1 for a in actions if a == 1), + 'buy_percentage': (sum(1 for a in actions if a == 0) / len(actions)) * 100, + 'sell_percentage': (sum(1 for a in actions if a == 1) / len(actions)) * 100 + } + } + + # Add enhanced model statistics if available + enhanced_predictions = [p for p in recent_predictions if p.regime_probabilities is not None] + if enhanced_predictions: + regime_predictions = [np.argmax(p.regime_probabilities) for p in enhanced_predictions] + volatility_predictions = [p.volatility_prediction for p in enhanced_predictions + if p.volatility_prediction is not None] + + stats['enhanced_model_stats'] = { + 'enhanced_predictions_count': len(enhanced_predictions), + 'regime_distribution': {i: regime_predictions.count(i) for i in range(8)}, + 'volatility_stats': { + 'mean': np.mean(volatility_predictions) if volatility_predictions else 0, + 'std': np.std(volatility_predictions) if volatility_predictions else 0 + } if volatility_predictions else None + } + + return stats + + def get_active_training_sessions(self) -> List[CNNTrainingSession]: + """Get all currently active training sessions""" + return [self.training_sessions[sid] for sid in self.active_sessions + if sid in self.training_sessions] + + def get_training_session_summary(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get detailed summary of a training session""" + if session_id not in self.training_sessions: + return None + + session = self.training_sessions[session_id] + + summary = { + 'session_id': session_id, + 'model_name': session.model_name, + 'start_time': session.start_time.isoformat(), + 'end_time': session.end_time.isoformat() if session.end_time else None, + 'duration_minutes': session.get_duration().total_seconds() / 60, + 'is_active': session.is_active(), + + 'progress': { + 'epochs_completed': session.epochs_completed, + 'epochs_planned': session.epochs_planned, + 'progress_percentage': (session.epochs_completed / session.epochs_planned) * 100 + }, + + 'performance': { + 'best_train_accuracy': session.best_train_accuracy, + 'best_val_accuracy': session.best_val_accuracy, + 'current_train_loss': session.train_loss_history[-1] if session.train_loss_history else None, + 'current_train_accuracy': session.train_accuracy_history[-1] if session.train_accuracy_history else None, + 'current_val_loss': session.val_loss_history[-1] if session.val_loss_history else None, + 'current_val_accuracy': session.val_accuracy_history[-1] if session.val_accuracy_history else None + }, + + 'configuration': { + 'learning_rate': session.learning_rate, + 'batch_size': session.batch_size + } + } + + # Add enhanced model metrics if available + if session.confidence_loss_history: + summary['enhanced_metrics'] = { + 'confidence_loss': session.confidence_loss_history[-1] if session.confidence_loss_history else None, + 'regime_loss': session.regime_loss_history[-1] if session.regime_loss_history else None, + 'volatility_loss': session.volatility_loss_history[-1] if session.volatility_loss_history else None + } + + return summary + + def _update_performance_stats(self, prediction: CNNPrediction) -> None: + """Update model performance statistics""" + model_name = prediction.model_name + + if model_name not in self.model_performance_stats: + self.model_performance_stats[model_name] = { + 'total_predictions': 0, + 'confidence_sum': 0.0, + 'action_confidence_sum': 0.0, + 'last_prediction_time': None, + 'prediction_latencies': deque(maxlen=100), + 'memory_usage': deque(maxlen=100) + } + + stats = self.model_performance_stats[model_name] + stats['total_predictions'] += 1 + stats['confidence_sum'] += prediction.confidence + stats['action_confidence_sum'] += prediction.action_confidence + stats['last_prediction_time'] = prediction.timestamp + + if prediction.prediction_latency_ms is not None: + stats['prediction_latencies'].append(prediction.prediction_latency_ms) + + if prediction.model_memory_usage_mb is not None: + stats['memory_usage'].append(prediction.model_memory_usage_mb) + + def _update_prediction_frequency(self, prediction: CNNPrediction) -> None: + """Update prediction frequency tracking""" + model_name = prediction.model_name + current_time = prediction.timestamp + + if model_name in self.last_prediction_time: + time_diff = (current_time - self.last_prediction_time[model_name]).total_seconds() + if time_diff > 0: + freq = 60.0 / time_diff # predictions per minute + self.prediction_frequency[model_name] = freq + + self.last_prediction_time[model_name] = current_time + + def _save_predictions_batch(self) -> None: + """Save a batch of predictions to disk""" + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = self.save_directory / f"cnn_predictions_{timestamp}.json" + + # Get last 100 predictions + recent_predictions = list(self.predictions_history)[-100:] + predictions_data = [p.to_dict() for p in recent_predictions] + + with open(filename, 'w') as f: + json.dump(predictions_data, f, indent=2) + + logger.debug(f"Saved {len(predictions_data)} CNN predictions to {filename}") + + except Exception as e: + logger.error(f"Error saving predictions batch: {e}") + + def _save_training_session(self, session: CNNTrainingSession) -> None: + """Save completed training session to disk""" + try: + filename = self.save_directory / f"training_session_{session.session_id}.json" + + session_data = { + 'session_id': session.session_id, + 'model_name': session.model_name, + 'start_time': session.start_time.isoformat(), + 'end_time': session.end_time.isoformat() if session.end_time else None, + 'duration_minutes': session.get_duration().total_seconds() / 60, + 'configuration': { + 'learning_rate': session.learning_rate, + 'batch_size': session.batch_size, + 'epochs_planned': session.epochs_planned, + 'epochs_completed': session.epochs_completed + }, + 'metrics': { + 'train_loss_history': session.train_loss_history, + 'train_accuracy_history': session.train_accuracy_history, + 'val_loss_history': session.val_loss_history, + 'val_accuracy_history': session.val_accuracy_history, + 'confidence_loss_history': session.confidence_loss_history, + 'regime_loss_history': session.regime_loss_history, + 'volatility_loss_history': session.volatility_loss_history + }, + 'performance': { + 'best_train_accuracy': session.best_train_accuracy, + 'best_val_accuracy': session.best_val_accuracy, + 'total_samples_processed': session.total_samples_processed + }, + 'model_info': { + 'checkpoint_paths': session.checkpoint_paths, + 'best_model_path': session.best_model_path + } + } + + with open(filename, 'w') as f: + json.dump(session_data, f, indent=2) + + logger.info(f"Saved training session {session.session_id} to {filename}") + + except Exception as e: + logger.error(f"Error saving training session: {e}") + + def get_dashboard_data(self) -> Dict[str, Any]: + """Get comprehensive data for dashboard display""" + return { + 'recent_predictions': [p.to_dict() for p in list(self.predictions_history)[-50:]], + 'active_training_sessions': [self.get_training_session_summary(sid) + for sid in self.active_sessions], + 'model_performance': self.model_performance_stats, + 'prediction_frequencies': self.prediction_frequency, + 'statistics': { + 'total_predictions_logged': len(self.predictions_history), + 'active_sessions_count': len(self.active_sessions), + 'completed_sessions_count': len(self.completed_sessions), + 'models_tracked': len(self.model_performance_stats) + } + } + +# Global CNN monitor instance +cnn_monitor = CNNMonitor() + +def log_cnn_prediction(model_name: str, symbol: str, prediction_result: Dict[str, Any], + feature_matrix_shape: Tuple[int, ...], current_price: Optional[float] = None, + prediction_latency_ms: Optional[float] = None, + model_memory_usage_mb: Optional[float] = None) -> None: + """ + Convenience function to log CNN predictions + + Args: + model_name: Name of the CNN model + symbol: Trading symbol (e.g., 'ETH/USDT') + prediction_result: Dictionary with prediction results from model.predict() + feature_matrix_shape: Shape of the input feature matrix + current_price: Current market price + prediction_latency_ms: Time taken for prediction in milliseconds + model_memory_usage_mb: Model memory usage in MB + """ + try: + prediction = CNNPrediction( + timestamp=datetime.now(), + symbol=symbol, + model_name=model_name, + feature_matrix_shape=feature_matrix_shape, + action=prediction_result.get('action', 0), + action_name=prediction_result.get('action_name', 'UNKNOWN'), + confidence=prediction_result.get('confidence', 0.0), + action_confidence=prediction_result.get('action_confidence', 0.0), + probabilities=prediction_result.get('probabilities', []), + raw_logits=prediction_result.get('raw_logits', []), + regime_probabilities=prediction_result.get('regime_probabilities'), + volatility_prediction=prediction_result.get('volatility_prediction'), + current_price=current_price, + prediction_latency_ms=prediction_latency_ms, + model_memory_usage_mb=model_memory_usage_mb + ) + + cnn_monitor.log_prediction(prediction) + + except Exception as e: + logger.error(f"Error logging CNN prediction: {e}") + +def start_cnn_training_session(model_name: str, learning_rate: float = 0.001, + batch_size: int = 32, epochs_planned: int = 100) -> str: + """ + Start a new CNN training session + + Returns: + session_id: Unique identifier for the training session + """ + session_id = f"{model_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + cnn_monitor.start_training_session(session_id, model_name, learning_rate, batch_size, epochs_planned) + return session_id + +def log_cnn_training_step(session_id: str, epoch: int, train_loss: float, train_accuracy: float, + val_loss: Optional[float] = None, val_accuracy: Optional[float] = None, + **additional_losses) -> None: + """Log a training step for the specified session""" + cnn_monitor.log_training_step(session_id, epoch, train_loss, train_accuracy, + val_loss, val_accuracy, **additional_losses) + +def end_cnn_training_session(session_id: str, final_model_path: Optional[str] = None) -> None: + """End a CNN training session""" + cnn_monitor.end_training_session(session_id, final_model_path) + +def get_cnn_dashboard_data() -> Dict[str, Any]: + """Get CNN monitoring data for dashboard""" + return cnn_monitor.get_dashboard_data() \ No newline at end of file diff --git a/core/data_provider.py b/core/data_provider.py index 195a942..99cd675 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -33,6 +33,7 @@ from collections import deque from .config import get_config from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar +from .cnn_monitor import log_cnn_prediction logger = logging.getLogger(__name__) diff --git a/core/enhanced_orchestrator.py b/core/enhanced_orchestrator.py index 3a776b1..3691cd3 100644 --- a/core/enhanced_orchestrator.py +++ b/core/enhanced_orchestrator.py @@ -31,6 +31,7 @@ from .extrema_trainer import ExtremaTrainer from .trading_action import TradingAction from .negative_case_trainer import NegativeCaseTrainer from .trading_executor import TradingExecutor +from .cnn_monitor import log_cnn_prediction, start_cnn_training_session # Enhanced pivot RL trainer functionality integrated into orchestrator logger = logging.getLogger(__name__) @@ -790,19 +791,145 @@ class EnhancedTradingOrchestrator: async def _get_timeframe_prediction_universal(self, model: CNNModelInterface, feature_matrix: np.ndarray, timeframe: str, market_state: MarketState, universal_stream: UniversalDataStream) -> Tuple[Optional[np.ndarray], float]: - """Get prediction for specific timeframe using universal data format""" + """Get prediction for specific timeframe using universal data format with CNN monitoring""" try: - # Check if model supports timeframe-specific prediction + # Measure prediction timing + prediction_start_time = time.time() + + # Get current price for context + current_price = market_state.prices.get(timeframe) + + # Check if model supports timeframe-specific prediction or enhanced predict method if hasattr(model, 'predict_timeframe'): action_probs, confidence = model.predict_timeframe(feature_matrix, timeframe) + elif hasattr(model, 'predict') and hasattr(model.predict, '__call__'): + # Enhanced CNN model with detailed output + if hasattr(model, 'enhanced_predict'): + # Get detailed prediction results + prediction_result = model.enhanced_predict(feature_matrix) + action_probs = prediction_result.get('probabilities', []) + confidence = prediction_result.get('confidence', 0.0) + else: + # Standard prediction + prediction_result = model.predict(feature_matrix) + if isinstance(prediction_result, dict): + action_probs = prediction_result.get('probabilities', []) + confidence = prediction_result.get('confidence', 0.0) + else: + action_probs, confidence = prediction_result else: action_probs, confidence = model.predict(feature_matrix) + # Calculate prediction latency + prediction_latency_ms = (time.time() - prediction_start_time) * 1000 + if action_probs is not None and confidence is not None: # Enhance confidence based on universal data quality and market conditions enhanced_confidence = self._enhance_confidence_with_universal_context( confidence, timeframe, market_state, universal_stream ) + + # Log detailed CNN prediction for monitoring + try: + # Convert probabilities to list if needed + if hasattr(action_probs, 'tolist'): + prob_list = action_probs.tolist() + elif isinstance(action_probs, (list, tuple)): + prob_list = list(action_probs) + else: + prob_list = [float(action_probs)] + + # Determine action and action confidence + if len(prob_list) >= 2: + action_idx = np.argmax(prob_list) + action_name = ['SELL', 'BUY'][action_idx] if len(prob_list) == 2 else ['SELL', 'HOLD', 'BUY'][action_idx] + action_confidence = prob_list[action_idx] + else: + action_idx = 0 + action_name = 'HOLD' + action_confidence = enhanced_confidence + + # Get model memory usage if available + model_memory_mb = None + if hasattr(model, 'get_memory_usage'): + try: + memory_info = model.get_memory_usage() + if isinstance(memory_info, dict): + model_memory_mb = memory_info.get('total_size_mb', 0.0) + else: + model_memory_mb = float(memory_info) + except: + pass + + # Create detailed prediction result for monitoring + detailed_prediction = { + 'action': action_idx, + 'action_name': action_name, + 'confidence': float(enhanced_confidence), + 'action_confidence': float(action_confidence), + 'probabilities': prob_list, + 'raw_logits': prob_list # Use probabilities as proxy for logits if not available + } + + # Add enhanced model outputs if available + if hasattr(model, 'enhanced_predict') and isinstance(prediction_result, dict): + detailed_prediction.update({ + 'regime_probabilities': prediction_result.get('regime_probabilities'), + 'volatility_prediction': prediction_result.get('volatility_prediction'), + 'extrema_prediction': prediction_result.get('extrema_prediction'), + 'risk_assessment': prediction_result.get('risk_assessment') + }) + + # Calculate price changes for context + price_change_1m = None + price_change_5m = None + volume_ratio = None + + if current_price and timeframe in market_state.prices: + # Try to get historical prices for context + try: + # Get 1m and 5m price changes if available + if '1m' in market_state.prices and market_state.prices['1m'] != current_price: + price_change_1m = (current_price - market_state.prices['1m']) / market_state.prices['1m'] + if '5m' in market_state.prices and market_state.prices['5m'] != current_price: + price_change_5m = (current_price - market_state.prices['5m']) / market_state.prices['5m'] + + # Volume ratio (current vs average) + volume_ratio = market_state.volume + except: + pass + + # Log the CNN prediction with full context + log_cnn_prediction( + model_name=getattr(model, 'name', model.__class__.__name__), + symbol=market_state.symbol, + prediction_result=detailed_prediction, + feature_matrix_shape=feature_matrix.shape, + current_price=current_price, + prediction_latency_ms=prediction_latency_ms, + model_memory_usage_mb=model_memory_mb + ) + + # Enhanced logging for detailed analysis + logger.info(f"CNN [{getattr(model, 'name', 'Unknown')}] {market_state.symbol} {timeframe}: " + f"{action_name} (conf: {enhanced_confidence:.3f}, " + f"action_conf: {action_confidence:.3f}, " + f"latency: {prediction_latency_ms:.1f}ms)") + + if detailed_prediction.get('regime_probabilities'): + regime_idx = np.argmax(detailed_prediction['regime_probabilities']) + regime_conf = detailed_prediction['regime_probabilities'][regime_idx] + logger.info(f" Regime: {regime_idx} (conf: {regime_conf:.3f})") + + if detailed_prediction.get('volatility_prediction') is not None: + logger.info(f" Volatility: {detailed_prediction['volatility_prediction']:.3f}") + + if price_change_1m is not None: + logger.info(f" Context: 1m_change: {price_change_1m:.4f}, volume_ratio: {volume_ratio:.2f}") + + except Exception as e: + logger.warning(f"Error logging CNN prediction details: {e}") + return action_probs, enhanced_confidence except Exception as e: diff --git a/web/dashboard.py b/web/dashboard.py index 76e3b5c..8656ff5 100644 --- a/web/dashboard.py +++ b/web/dashboard.py @@ -51,6 +51,17 @@ from core.trading_executor import TradingExecutor from core.trading_action import TradingAction from models import get_model_registry +# Import CNN monitoring +try: + from core.cnn_monitor import get_cnn_dashboard_data + CNN_MONITORING_AVAILABLE = True + logger.info("CNN monitoring system available") +except ImportError: + CNN_MONITORING_AVAILABLE = False + logger.warning("CNN monitoring not available") + def get_cnn_dashboard_data(): + return {'statistics': {'total_predictions_logged': 0}} + # Import enhanced RL components if available try: @@ -839,45 +850,18 @@ class TradingDashboard: ], className="card", style={"width": "28%", "marginLeft": "2%"}), ], className="row g-2 mb-3"), - # # Model Data Feed Charts - Small charts showing data fed to models - # html.Div([ - # html.Div([ - # html.Div([ - # html.H6([ - # html.I(className="fas fa-database me-1"), - # "Model Data Feeds" - # ], className="card-title mb-2 small"), - # html.Div([ - # # Row of 4 small charts - # html.Div([ - # # 1m Chart - # html.Div([ - # html.P("ETH 1m OHLCV", className="text-center mb-1 tiny text-muted"), - # dcc.Graph(id="model-data-1m", style={"height": "120px"}, config={'displayModeBar': False}, className="model-data-chart") - # ], style={"width": "24%"}), - - # # 1h Chart - # html.Div([ - # html.P("ETH 1h OHLCV", className="text-center mb-1 tiny text-muted"), - # dcc.Graph(id="model-data-1h", style={"height": "120px"}, config={'displayModeBar': False}, className="model-data-chart") - # ], style={"width": "24%", "marginLeft": "1%"}), - - # # 1d Chart - # html.Div([ - # html.P("ETH 1d OHLCV", className="text-center mb-1 tiny text-muted"), - # dcc.Graph(id="model-data-1d", style={"height": "120px"}, config={'displayModeBar': False}, className="model-data-chart") - # ], style={"width": "24%", "marginLeft": "1%"}), - - # # BTC Reference Chart - # html.Div([ - # html.P("BTC Reference", className="text-center mb-1 tiny text-muted"), - # dcc.Graph(id="model-data-btc", style={"height": "120px"}, config={'displayModeBar': False}, className="model-data-chart") - # ], style={"width": "24%", "marginLeft": "1%"}) - # ], className="d-flex") - # ]) - # ], className="card-body p-2") - # ], className="card") - # ], className="mb-3"), + # CNN Model Monitoring Section + html.Div([ + html.Div([ + html.Div([ + html.H6([ + html.I(className="fas fa-brain me-2"), + "CNN Model Analysis & Predictions" + ], className="card-title mb-2"), + html.Div(id="cnn-monitoring-content", style={"height": "350px", "overflowY": "auto"}) + ], className="card-body p-2") + ], className="card") + ], className="mb-3"), # Bottom row - Session performance and system status html.Div([ @@ -1000,11 +984,7 @@ class TradingDashboard: Output('system-status-details', 'children'), Output('current-leverage', 'children'), Output('leverage-risk', 'children'), - # Model data feed charts - # Output('model-data-1m', 'figure'), - # Output('model-data-1h', 'figure'), - # Output('model-data-1d', 'figure'), - # Output('model-data-btc', 'figure') + Output('cnn-monitoring-content', 'children') ], [Input('interval-component', 'n_intervals')] )