trying to run enhanced training system

This commit is contained in:
Dobromir Popov
2025-08-10 15:31:56 +03:00
parent b3c5076e37
commit ade4e117bf
4 changed files with 203 additions and 763 deletions

View File

@@ -110,23 +110,4 @@ I want it more to be a part of a proper rewardfunction bias rather than a algori
THINK REALY HARD THINK REALY HARD
do we evaluate and reward/punish each model at each reference? we lost track of our model training metrics. in the dash we show: do we evaluate and reward/punish each model at each reference?
Models & Training Progress
Loaded Models (5)
DQN_AGENT - ACTIVE (0) [CKPT]
Inf
Trn
Route
Last: NONE (0.0%) @ N/A
Loss: N/A
Rate: 0.00/s | 24h: 0
Last Inf: None | Train: None
ENHANCED_CNN - ACTIVE (0) [CKPT]
Inf
Trn
Route
Last: NONE (0.0%) @ N/A
Loss: 2133105152.0000 | Best: 34.2300
Rate: 0.00/s | 24h: 0
Last Inf: None | Train: None
DQN_AGENT and ENHANCED_CNN were the models we had the training working well. we had to include the others but it seems we still havent or at least do not store their metrics and best checkpoints

View File

@@ -70,17 +70,22 @@ except ImportError:
COBIntegration = None COBIntegration = None
COBSnapshot = None COBSnapshot = None
# Import EnhancedRealtimeTrainingSystem # Import EnhancedRealtimeTrainingSystem (support multiple locations)
try: try:
from enhanced_realtime_training import EnhancedRealtimeTrainingSystem # Preferred location under NN/training
from NN.training.enhanced_realtime_training import EnhancedRealtimeTrainingSystem # type: ignore
ENHANCED_TRAINING_AVAILABLE = True ENHANCED_TRAINING_AVAILABLE = True
except ImportError: except Exception:
EnhancedRealtimeTrainingSystem = None try:
ENHANCED_TRAINING_AVAILABLE = False # Fallback flat import
logging.warning( from enhanced_realtime_training import EnhancedRealtimeTrainingSystem # type: ignore
"EnhancedRealtimeTrainingSystem not found. Real-time training features will be disabled." ENHANCED_TRAINING_AVAILABLE = True
) except Exception:
EnhancedRealtimeTrainingSystem = None # type: ignore
ENHANCED_TRAINING_AVAILABLE = False
logging.warning(
"EnhancedRealtimeTrainingSystem not found. Real-time training features will be disabled."
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -6726,6 +6731,14 @@ class TradingOrchestrator:
self.training_enabled = False self.training_enabled = False
self.enhanced_training_system = None self.enhanced_training_system = None
# Public wrapper to match dashboard expectation
def initialize_enhanced_training_system(self):
try:
return self._initialize_enhanced_training_system()
except Exception as e:
logger.error(f"Error in initialize_enhanced_training_system: {e}")
return None
def start_enhanced_training(self): def start_enhanced_training(self):
"""Start the enhanced real-time training system""" """Start the enhanced real-time training system"""
try: try:

View File

@@ -159,8 +159,7 @@ class CleanTradingDashboard:
if orchestrator is None: if orchestrator is None:
self.orchestrator = TradingOrchestrator( self.orchestrator = TradingOrchestrator(
data_provider=self.data_provider, data_provider=self.data_provider,
enhanced_rl_training=True, enhanced_rl_training=True
model_registry={}
) )
logger.debug("Using unified Trading Orchestrator with full ML capabilities") logger.debug("Using unified Trading Orchestrator with full ML capabilities")
else: else:

View File

@@ -1,814 +1,261 @@
""" """
Models Training Panel
Lightweight panel used by the dashboard to render training metrics.
No synthetic data is shown; it only renders what the orchestrator provides.
"""
import logging
from typing import Any
from dash import html
logger = logging.getLogger(__name__)
class ModelsTrainingPanel:
"""Simple training panel wrapper used by the dashboard."""
def __init__(self, orchestrator: Any):
self.orchestrator = orchestrator
def render(self):
try:
# Try to pull basic stats if orchestrator exposes them
stats = getattr(self.orchestrator, "model_statistics", {}) if self.orchestrator else {}
if not stats:
return html.Div([
html.Div("No training metrics available", className="text-muted small")
])
rows = []
for name, s in stats.items():
try:
total = getattr(s, "total_inferences", 0)
avg_ms = getattr(s, "average_inference_time_ms", 0.0)
last_pred = getattr(s, "last_prediction", None)
last_conf = getattr(s, "last_confidence", None)
rows.append(html.Tr([
html.Td(name),
html.Td(str(total)),
html.Td(f"{avg_ms:.1f}"),
html.Td(str(last_pred) if last_pred is not None else ""),
html.Td(f"{last_conf:.3f}" if last_conf is not None else "")
]))
except Exception:
continue
table = html.Table([
html.Thead(html.Tr([
html.Th("Model"), html.Th("Inferences"), html.Th("Avg ms"), html.Th("Last"), html.Th("Conf")
])),
html.Tbody(rows)
], className="table table-sm table-striped mb-0")
return html.Div(table)
except Exception as e:
logger.error(f"Error rendering training panel: {e}")
return html.Div([html.Div("Error loading training panel", className="text-danger small")])
#!/usr/bin/env python3
"""
Models & Training Progress Panel - Clean Implementation Models & Training Progress Panel - Clean Implementation
Displays real-time model status, training metrics, and performance data Displays real-time model status, training metrics, and performance data
""" """
import logging import logging
from typing import Dict, List, Optional, Any from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta from datetime import datetime
from dash import html, dcc from dash import html, dcc
import dash_bootstrap_components as dbc
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ModelsTrainingPanel: class ModelsTrainingPanel:
"""Clean implementation of the Models & Training Progress panel""" """Clean implementation of the Models & Training Progress panel.
- Uses orchestrator.get_model_statistics_summary() and model_states
- No synthetic or placeholder data; shows empty when unavailable
- Avoids duplicate class definitions and invalid imports
"""
def __init__(self, orchestrator=None): def __init__(self, orchestrator=None):
self.orchestrator = orchestrator self.orchestrator = orchestrator
self.last_update = None
def create_panel(self) -> html.Div: def create_panel(self) -> html.Div:
"""Create the main Models & Training Progress panel"""
try: try:
# Get fresh data from orchestrator data = self._gather_data()
panel_data = self._gather_panel_data()
# Build the panel components content: List[html.Div] = []
content = []
# Header with refresh button
content.append(self._create_header()) content.append(self._create_header())
# Models section content.append(self._create_models_section(data.get("models", {})))
if panel_data.get('models'):
content.append(self._create_models_section(panel_data['models']))
else:
content.append(self._create_no_models_message())
# Training status section if data.get("training_status"):
if panel_data.get('training_status'): content.append(self._create_training_status_section(data["training_status"]))
content.append(self._create_training_status_section(panel_data['training_status']))
# Performance metrics section if data.get("system_metrics"):
if panel_data.get('performance_metrics'): content.append(self._create_system_metrics_section(data["system_metrics"]))
content.append(self._create_performance_section(panel_data['performance_metrics']))
return html.Div(content, id="training-metrics") return html.Div(content, id="training-metrics")
except Exception as e: except Exception as e:
logger.error(f"Error creating models training panel: {e}") logger.error(f"Error creating models training panel: {e}")
return html.Div([ return html.Div([
html.P(f"Error loading training panel: {str(e)}", className="text-danger small") html.P(f"Error loading training panel: {str(e)}", className="text-danger small")
], id="training-metrics") ], id="training-metrics")
def _gather_panel_data(self) -> Dict[str, Any]: def _gather_data(self) -> Dict[str, Any]:
"""Gather all data needed for the panel from orchestrator and other sources""" result: Dict[str, Any] = {"models": {}, "training_status": {}, "system_metrics": {}}
data = {
'models': {},
'training_status': {},
'performance_metrics': {},
'last_update': datetime.now().strftime('%H:%M:%S')
}
if not self.orchestrator: if not self.orchestrator:
logger.warning("No orchestrator available for training panel") return result
return data
try: try:
# Get model registry information # Model statistics summary (serializable)
if hasattr(self.orchestrator, 'model_registry') and self.orchestrator.model_registry: stats_summary: Dict[str, Dict[str, Any]] = {}
registered_models = self.orchestrator.model_registry.get_all_models() if hasattr(self.orchestrator, "get_model_statistics_summary"):
for model_name, model_info in registered_models.items(): stats = self.orchestrator.get_model_statistics_summary()
data['models'][model_name] = self._extract_model_data(model_name, model_info) if isinstance(stats, dict):
stats_summary = stats
# Add decision fusion model if it exists (check multiple sources) # Model states (for best_loss and checkpoint flags)
decision_fusion_added = False model_states: Dict[str, Dict[str, Any]] = getattr(self.orchestrator, "model_states", {}) or {}
# Check if it's in the model registry # Build models block from stats_summary
if hasattr(self.orchestrator, 'model_registry') and self.orchestrator.model_registry: for model_name, s in stats_summary.items():
registered_models = self.orchestrator.model_registry.get_all_models() model_info: Dict[str, Any] = {}
if 'decision_fusion' in registered_models: try:
data['models']['decision_fusion'] = self._extract_decision_fusion_data() # Status: active if we have any inference info
decision_fusion_added = True total_inf = int(s.get("total_inferences", 0) or 0)
status = "active" if (total_inf > 0 or s.get("last_inference_time")) else "registered"
# If not in registry, check if decision fusion network exists # Last prediction
if not decision_fusion_added and hasattr(self.orchestrator, 'decision_fusion_network') and self.orchestrator.decision_fusion_network: last_pred_action = s.get("last_prediction")
data['models']['decision_fusion'] = self._extract_decision_fusion_data() last_confidence = s.get("last_confidence")
decision_fusion_added = True last_inference_time = s.get("last_inference_time")
# If still not added, check if decision fusion is enabled # Loss metrics
if not decision_fusion_added and hasattr(self.orchestrator, 'decision_fusion_enabled') and self.orchestrator.decision_fusion_enabled: loss_metrics = {
data['models']['decision_fusion'] = self._extract_decision_fusion_data() "current_loss": s.get("current_loss"),
decision_fusion_added = True "best_loss": s.get("best_loss"),
}
# Add COB RL model if it exists but wasn't captured in registry # Timing metrics
if 'cob_rl_model' not in data['models'] and hasattr(self.orchestrator, 'cob_rl_model'): timing_metrics = {
data['models']['cob_rl_model'] = self._extract_cob_rl_data() "inferences_per_second": s.get("inference_rate_per_second"),
"last_inference": last_inference_time,
"last_training": s.get("last_training_time"),
}
# Get training status # Checkpoint flags from orchestrator.model_states if present
data['training_status'] = self._extract_training_status() ckpt_loaded = False
ckpt_failed = False
if model_states and model_name in model_states:
ckpt_loaded = bool(model_states[model_name].get("checkpoint_loaded", False))
ckpt_failed = bool(model_states[model_name].get("checkpoint_failed", False))
# Get performance metrics model_info = {
data['performance_metrics'] = self._extract_performance_metrics() "name": model_name,
"status": status,
"parameters": 0, # unknown; do not synthesize
"last_prediction": {
"action": last_pred_action if last_pred_action is not None else "",
"confidence": last_confidence if last_confidence is not None else None,
"timestamp": last_inference_time if last_inference_time else "",
},
"training_enabled": True,
"inference_enabled": True,
"routing_enabled": True,
"checkpoint_loaded": ckpt_loaded,
"checkpoint_failed": ckpt_failed,
"loss_metrics": loss_metrics,
"timing_metrics": timing_metrics,
"signal_stats": {},
}
except Exception as e:
logger.debug(f"Error building model info for {model_name}: {e}")
model_info = {"name": model_name, "status": "error", "error": str(e)}
result["models"][model_name] = model_info
# Enhanced training system status
training_status: Dict[str, Any] = {}
try:
if hasattr(self.orchestrator, "get_enhanced_training_stats"):
training_status = self.orchestrator.get_enhanced_training_stats() or {}
except Exception as e:
logger.debug(f"Enhanced training stats unavailable: {e}")
result["training_status"] = training_status
# System metrics (decision fusion, cob integration)
system_metrics = {
"decision_fusion_active": bool(getattr(self.orchestrator, "decision_fusion_enabled", False)),
"cob_integration_active": bool(getattr(self.orchestrator, "cob_integration", None) is not None),
"symbols_tracking": len(getattr(getattr(self.orchestrator, "cob_integration", None), "symbols", []) or []),
}
result["system_metrics"] = system_metrics
return result
except Exception as e: except Exception as e:
logger.error(f"Error gathering panel data: {e}") logger.error(f"Error gathering panel data: {e}")
data['error'] = str(e) return result
return data
def _extract_model_data(self, model_name: str, model_info: Any) -> Dict[str, Any]:
"""Extract relevant data for a single model"""
try:
model_data = {
'name': model_name,
'status': 'unknown',
'parameters': 0,
'last_prediction': {},
'training_enabled': True,
'inference_enabled': True,
'checkpoint_loaded': False,
'loss_metrics': {},
'timing_metrics': {}
}
# Get model status from orchestrator - check if model is actually loaded and active
if hasattr(self.orchestrator, 'get_model_state'):
model_state = self.orchestrator.get_model_state(model_name)
model_data['status'] = 'active' if model_state else 'inactive'
# Check actual inference activity from logs/statistics
if hasattr(self.orchestrator, 'get_model_statistics'):
stats = self.orchestrator.get_model_statistics()
if stats and model_name in stats:
model_stats = stats[model_name]
# Check if model has recent activity (last prediction exists)
if hasattr(model_stats, 'last_prediction') and model_stats.last_prediction:
model_data['status'] = 'active'
elif hasattr(model_stats, 'inferences_per_second') and getattr(model_stats, 'inferences_per_second', 0) > 0:
model_data['status'] = 'active'
else:
model_data['status'] = 'registered' # Registered but not actively inferencing
else:
model_data['status'] = 'inactive'
# Check if model is in registry (fallback)
if hasattr(self.orchestrator, 'model_registry') and self.orchestrator.model_registry:
registered_models = self.orchestrator.model_registry.get_all_models()
if model_name in registered_models and model_data['status'] == 'unknown':
model_data['status'] = 'registered'
# Get toggle states
if hasattr(self.orchestrator, 'get_model_toggle_state'):
toggle_state = self.orchestrator.get_model_toggle_state(model_name)
if isinstance(toggle_state, dict):
model_data['training_enabled'] = toggle_state.get('training_enabled', True)
model_data['inference_enabled'] = toggle_state.get('inference_enabled', True)
model_data['routing_enabled'] = toggle_state.get('routing_enabled', True)
# Get model statistics
if hasattr(self.orchestrator, 'get_model_statistics'):
stats = self.orchestrator.get_model_statistics()
if stats and model_name in stats:
model_stats = stats[model_name]
# Handle both dict and object formats
def safe_get(obj, key, default=None):
if hasattr(obj, key):
return getattr(obj, key, default)
elif isinstance(obj, dict):
return obj.get(key, default)
else:
return default
# Extract loss metrics
model_data['loss_metrics'] = {
'current_loss': safe_get(model_stats, 'current_loss'),
'best_loss': safe_get(model_stats, 'best_loss'),
'loss_5ma': safe_get(model_stats, 'loss_5ma'),
'improvement': safe_get(model_stats, 'improvement', 0)
}
# Extract timing metrics
model_data['timing_metrics'] = {
'last_inference': safe_get(model_stats, 'last_inference'),
'last_training': safe_get(model_stats, 'last_training'),
'inferences_per_second': safe_get(model_stats, 'inferences_per_second', 0),
'predictions_24h': safe_get(model_stats, 'predictions_24h', 0)
}
# Extract last prediction
last_pred = safe_get(model_stats, 'last_prediction')
if last_pred:
model_data['last_prediction'] = {
'action': safe_get(last_pred, 'action', 'NONE'),
'confidence': safe_get(last_pred, 'confidence', 0),
'timestamp': safe_get(last_pred, 'timestamp', 'N/A'),
'predicted_price': safe_get(last_pred, 'predicted_price'),
'price_change': safe_get(last_pred, 'price_change')
}
# Extract model parameters count
model_data['parameters'] = safe_get(model_stats, 'parameters', 0)
# Check checkpoint status from orchestrator model states (more reliable)
checkpoint_loaded = False
checkpoint_failed = False
if hasattr(self.orchestrator, 'model_states'):
model_state_mapping = {
'dqn_agent': 'dqn',
'enhanced_cnn': 'cnn',
'cob_rl_model': 'cob_rl',
'extrema_trainer': 'extrema_trainer'
}
state_key = model_state_mapping.get(model_name, model_name)
if state_key in self.orchestrator.model_states:
checkpoint_loaded = self.orchestrator.model_states[state_key].get('checkpoint_loaded', False)
checkpoint_failed = self.orchestrator.model_states[state_key].get('checkpoint_failed', False)
# If not found in model states, check model stats as fallback
if not checkpoint_loaded and not checkpoint_failed:
checkpoint_loaded = safe_get(model_stats, 'checkpoint_loaded', False)
model_data['checkpoint_loaded'] = checkpoint_loaded
model_data['checkpoint_failed'] = checkpoint_failed
# Extract signal generation statistics and real performance data
model_data['signal_stats'] = {
'buy_signals': safe_get(model_stats, 'buy_signals_count', 0),
'sell_signals': safe_get(model_stats, 'sell_signals_count', 0),
'hold_signals': safe_get(model_stats, 'hold_signals_count', 0),
'total_signals': safe_get(model_stats, 'total_signals', 0),
'accuracy': safe_get(model_stats, 'accuracy', 0),
'win_rate': safe_get(model_stats, 'win_rate', 0)
}
# Do not inject synthetic performance metrics; rely only on available stats
return model_data
except Exception as e:
logger.error(f"Error extracting data for model {model_name}: {e}")
return {'name': model_name, 'status': 'error', 'error': str(e)}
def _extract_decision_fusion_data(self) -> Dict[str, Any]:
"""Extract data for the decision fusion model"""
try:
decision_data = {
'name': 'decision_fusion',
'status': 'active',
'parameters': 0,
'last_prediction': {},
'training_enabled': True,
'inference_enabled': True,
'checkpoint_loaded': False,
'loss_metrics': {},
'timing_metrics': {},
'signal_stats': {}
}
# Check if decision fusion is actually enabled and working
if hasattr(self.orchestrator, 'decision_fusion_enabled'):
decision_data['status'] = 'active' if self.orchestrator.decision_fusion_enabled else 'registered'
# Check if decision fusion network exists
if hasattr(self.orchestrator, 'decision_fusion_network') and self.orchestrator.decision_fusion_network:
decision_data['status'] = 'active'
# Get network parameters
if hasattr(self.orchestrator.decision_fusion_network, 'parameters'):
decision_data['parameters'] = sum(p.numel() for p in self.orchestrator.decision_fusion_network.parameters())
# Check decision fusion mode
if hasattr(self.orchestrator, 'decision_fusion_mode'):
decision_data['mode'] = self.orchestrator.decision_fusion_mode
if self.orchestrator.decision_fusion_mode == 'neural':
decision_data['status'] = 'active'
elif self.orchestrator.decision_fusion_mode == 'programmatic':
decision_data['status'] = 'active' # Still active, just using programmatic mode
# Get decision fusion statistics
if hasattr(self.orchestrator, 'get_decision_fusion_stats'):
stats = self.orchestrator.get_decision_fusion_stats()
if stats:
decision_data['loss_metrics']['current_loss'] = stats.get('recent_loss')
decision_data['timing_metrics']['decisions_per_second'] = stats.get('decisions_per_second', 0)
decision_data['signal_stats'] = {
'buy_decisions': stats.get('buy_decisions', 0),
'sell_decisions': stats.get('sell_decisions', 0),
'hold_decisions': stats.get('hold_decisions', 0),
'total_decisions': stats.get('total_decisions', 0),
'consensus_rate': stats.get('consensus_rate', 0)
}
# Get decision fusion network parameters
if hasattr(self.orchestrator, 'decision_fusion') and self.orchestrator.decision_fusion:
if hasattr(self.orchestrator.decision_fusion, 'parameters'):
decision_data['parameters'] = sum(p.numel() for p in self.orchestrator.decision_fusion.parameters())
# Check for decision fusion checkpoint status
if hasattr(self.orchestrator, 'model_states') and 'decision_fusion' in self.orchestrator.model_states:
df_state = self.orchestrator.model_states['decision_fusion']
decision_data['checkpoint_loaded'] = df_state.get('checkpoint_loaded', False)
return decision_data
except Exception as e:
logger.error(f"Error extracting decision fusion data: {e}")
return {'name': 'decision_fusion', 'status': 'error', 'error': str(e)}
def _extract_cob_rl_data(self) -> Dict[str, Any]:
"""Extract data for the COB RL model"""
try:
cob_data = {
'name': 'cob_rl_model',
'status': 'registered', # Usually registered but not actively inferencing
'parameters': 0,
'last_prediction': {},
'training_enabled': True,
'inference_enabled': True,
'checkpoint_loaded': False,
'loss_metrics': {},
'timing_metrics': {},
'signal_stats': {}
}
# Check if COB RL has actual statistics
if hasattr(self.orchestrator, 'get_model_statistics'):
stats = self.orchestrator.get_model_statistics()
if stats and 'cob_rl_model' in stats:
cob_stats = stats['cob_rl_model']
# Use the safe_get function from above
def safe_get(obj, key, default=None):
if hasattr(obj, key):
return getattr(obj, key, default)
elif isinstance(obj, dict):
return obj.get(key, default)
else:
return default
cob_data['parameters'] = safe_get(cob_stats, 'parameters', 356647429) # Known COB RL size
cob_data['status'] = 'active' if safe_get(cob_stats, 'inferences_per_second', 0) > 0 else 'registered'
# Extract metrics if available
cob_data['loss_metrics'] = {
'current_loss': safe_get(cob_stats, 'current_loss'),
'best_loss': safe_get(cob_stats, 'best_loss'),
}
return cob_data
except Exception as e:
logger.error(f"Error extracting COB RL data: {e}")
return {'name': 'cob_rl_model', 'status': 'error', 'error': str(e)}
def _extract_training_status(self) -> Dict[str, Any]:
"""Extract overall training status"""
try:
status = {
'active_sessions': 0,
'total_training_steps': 0,
'is_training': False,
'last_update': 'N/A'
}
# Check if enhanced training system is available
if hasattr(self.orchestrator, 'enhanced_training') and self.orchestrator.enhanced_training:
enhanced_stats = self.orchestrator.enhanced_training.get_training_statistics()
if enhanced_stats:
status.update({
'is_training': enhanced_stats.get('is_training', False),
'training_iteration': enhanced_stats.get('training_iteration', 0),
'experience_buffer_size': enhanced_stats.get('experience_buffer_size', 0),
'last_update': datetime.now().strftime('%H:%M:%S')
})
return status
except Exception as e:
logger.error(f"Error extracting training status: {e}")
return {'error': str(e)}
def _extract_performance_metrics(self) -> Dict[str, Any]:
"""Extract performance metrics"""
try:
metrics = {
'decision_fusion_active': False,
'cob_integration_active': False,
'symbols_tracking': 0,
'recent_decisions': 0
}
# Check decision fusion status
if hasattr(self.orchestrator, 'decision_fusion_enabled'):
metrics['decision_fusion_active'] = self.orchestrator.decision_fusion_enabled
# Check COB integration
if hasattr(self.orchestrator, 'cob_integration') and self.orchestrator.cob_integration:
metrics['cob_integration_active'] = True
if hasattr(self.orchestrator.cob_integration, 'symbols'):
metrics['symbols_tracking'] = len(self.orchestrator.cob_integration.symbols)
return metrics
except Exception as e:
logger.error(f"Error extracting performance metrics: {e}")
return {'error': str(e)}
def _create_header(self) -> html.Div: def _create_header(self) -> html.Div:
"""Create the panel header with title and refresh button"""
return html.Div([ return html.Div([
html.H6([ html.H6([html.I(className="fas fa-brain me-2 text-primary"), "Models & Training Progress"], className="mb-2"),
html.I(className="fas fa-brain me-2 text-primary"), html.Button([html.I(className="fas fa-sync-alt me-1"), "Refresh"], id="refresh-training-metrics-btn", className="btn btn-sm btn-outline-primary mb-2")
"Models & Training Progress"
], className="mb-2"),
html.Button([
html.I(className="fas fa-sync-alt me-1"),
"Refresh"
], id="refresh-training-metrics-btn", className="btn btn-sm btn-outline-primary mb-2")
], className="d-flex justify-content-between align-items-start") ], className="d-flex justify-content-between align-items-start")
def _create_models_section(self, models_data: Dict[str, Any]) -> html.Div: def _create_models_section(self, models: Dict[str, Any]) -> html.Div:
"""Create the models section showing each loaded model""" cards = [self._create_model_card(name, info) for name, info in models.items()]
model_cards = []
for model_name, model_data in models_data.items():
if model_data.get('error'):
# Error card
model_cards.append(html.Div([
html.Strong(f"{model_name.upper()}", className="text-danger"),
html.P(f"Error: {model_data['error']}", className="text-danger small mb-0")
], className="border border-danger rounded p-2 mb-2"))
else:
model_cards.append(self._create_model_card(model_name, model_data))
return html.Div([ return html.Div([
html.H6([ html.H6([html.I(className="fas fa-microchip me-2 text-success"), f"Loaded Models ({len(models)})"], className="mb-2"),
html.I(className="fas fa-microchip me-2 text-success"), html.Div(cards)
f"Loaded Models ({len(models_data)})"
], className="mb-2"),
html.Div(model_cards)
]) ])
def _create_model_card(self, model_name: str, model_data: Dict[str, Any]) -> html.Div: def _create_model_card(self, model_name: str, model_data: Dict[str, Any]) -> html.Div:
"""Create a card for a single model""" status = model_data.get("status", "unknown")
# Status styling if status == "active":
status = model_data.get('status', 'unknown') status_class = "text-success"; status_icon = "fas fa-check-circle"; status_text = "ACTIVE"
if status == 'active': elif status == "registered":
status_class = "text-success" status_class = "text-warning"; status_icon = "fas fa-circle"; status_text = "REGISTERED"
status_icon = "fas fa-check-circle" elif status == "inactive":
status_text = "ACTIVE" status_class = "text-muted"; status_icon = "fas fa-pause-circle"; status_text = "INACTIVE"
elif status == 'registered':
status_class = "text-warning"
status_icon = "fas fa-circle"
status_text = "REGISTERED"
elif status == 'inactive':
status_class = "text-muted"
status_icon = "fas fa-pause-circle"
status_text = "INACTIVE"
else: else:
status_class = "text-danger" status_class = "text-danger"; status_icon = "fas fa-exclamation-circle"; status_text = "UNKNOWN"
status_icon = "fas fa-exclamation-circle"
status_text = "UNKNOWN"
# Model size formatting last_pred = model_data.get("last_prediction", {})
params = model_data.get('parameters', 0) pred_action = last_pred.get("action") or "NONE"
if params > 1e9: pred_confidence = last_pred.get("confidence")
size_str = f"{params/1e9:.1f}B" pred_time = last_pred.get("timestamp") or "N/A"
elif params > 1e6:
size_str = f"{params/1e6:.1f}M"
elif params > 1e3:
size_str = f"{params/1e3:.1f}K"
else:
size_str = str(params)
# Last prediction info loss_metrics = model_data.get("loss_metrics", {})
last_pred = model_data.get('last_prediction', {}) current_loss = loss_metrics.get("current_loss")
pred_action = last_pred.get('action', 'NONE') best_loss = loss_metrics.get("best_loss")
pred_confidence = last_pred.get('confidence', 0) loss_class = (
pred_time = last_pred.get('timestamp', 'N/A') "text-success" if (isinstance(current_loss, (int, float)) and current_loss < 0.1)
else "text-warning" if (isinstance(current_loss, (int, float)) and current_loss < 0.5)
else "text-danger"
) if current_loss is not None else "text-muted"
# Loss metrics timing = model_data.get("timing_metrics", {})
loss_metrics = model_data.get('loss_metrics', {}) rate = timing.get("inferences_per_second")
current_loss = loss_metrics.get('current_loss')
loss_class = "text-success" if current_loss and current_loss < 0.1 else "text-warning" if current_loss and current_loss < 0.5 else "text-danger"
# Timing metrics
timing = model_data.get('timing_metrics', {})
return html.Div([ return html.Div([
# Header with model name and status
html.Div([ html.Div([
html.Div([ html.Div([
html.I(className=f"{status_icon} me-2 {status_class}"), html.I(className=f"{status_icon} me-2 {status_class}"),
html.Strong(f"{model_name.upper()}", className=status_class), html.Strong(f"{model_name.upper()}", className=status_class),
html.Span(f" - {status_text}", className=f"{status_class} small ms-1"), html.Span(f" - {status_text}", className=f"{status_class} small ms-1"),
html.Span(f" ({size_str})", className="text-muted small ms-2"), html.Span(" [CKPT]" if model_data.get("checkpoint_loaded") else (" [FAILED]" if model_data.get("checkpoint_failed") else " [FRESH]"), className=f"small {'text-success' if model_data.get('checkpoint_loaded') else 'text-danger' if model_data.get('checkpoint_failed') else 'text-warning'} ms-1")
# Show mode for decision fusion
*([html.Span(f" [{model_data.get('mode', 'unknown').upper()}]", className="text-info small ms-1")] if model_name == 'decision_fusion' and model_data.get('mode') else []),
html.Span(
" [CKPT]" if model_data.get('checkpoint_loaded')
else " [FAILED]" if model_data.get('checkpoint_failed')
else " [FRESH]",
className=f"small {'text-success' if model_data.get('checkpoint_loaded') else 'text-danger' if model_data.get('checkpoint_failed') else 'text-warning'} ms-1"
)
], style={"flex": "1"}), ], style={"flex": "1"}),
# Toggle switches with pattern matching IDs
html.Div([ html.Div([
html.Div([ html.Div([
html.Label("Inf", className="text-muted small me-1", style={"font-size": "10px"}), html.Label("Inf", className="text-muted small me-1", style={"fontSize": "10px"}),
dcc.Checklist( dcc.Checklist(id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'inference'}, options=[{"label": "", "value": True}], value=[True] if model_data.get('inference_enabled', True) else [], className="form-check-input me-2", style={"transform": "scale(0.7)"})
id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'inference'},
options=[{"label": "", "value": True}],
value=[True] if model_data.get('inference_enabled', True) else [],
className="form-check-input me-2",
style={"transform": "scale(0.7)"}
)
], className="d-flex align-items-center me-2"), ], className="d-flex align-items-center me-2"),
html.Div([ html.Div([
html.Label("Trn", className="text-muted small me-1", style={"font-size": "10px"}), html.Label("Trn", className="text-muted small me-1", style={"fontSize": "10px"}),
dcc.Checklist( dcc.Checklist(id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'training'}, options=[{"label": "", "value": True}], value=[True] if model_data.get('training_enabled', True) else [], className="form-check-input", style={"transform": "scale(0.7)"})
id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'training'},
options=[{"label": "", "value": True}],
value=[True] if model_data.get('training_enabled', True) else [],
className="form-check-input",
style={"transform": "scale(0.7)"}
)
], className="d-flex align-items-center me-2"), ], className="d-flex align-items-center me-2"),
html.Div([ html.Div([
html.Label("Route", className="text-muted small me-1", style={"font-size": "10px"}), html.Label("Route", className="text-muted small me-1", style={"fontSize": "10px"}),
dcc.Checklist( dcc.Checklist(id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'routing'}, options=[{"label": "", "value": True}], value=[True] if model_data.get('routing_enabled', True) else [], className="form-check-input", style={"transform": "scale(0.7)"})
id={'type': 'model-toggle', 'model': model_name, 'toggle_type': 'routing'}, ], className="d-flex align-items-center"),
options=[{"label": "", "value": True}],
value=[True] if model_data.get('routing_enabled', True) else [],
className="form-check-input",
style={"transform": "scale(0.7)"}
)
], className="d-flex align-items-center")
], className="d-flex") ], className="d-flex")
], className="d-flex align-items-center mb-2"), ], className="d-flex align-items-center mb-2"),
# Model metrics
html.Div([ html.Div([
# Last prediction
html.Div([ html.Div([
html.Span("Last: ", className="text-muted small"), html.Span("Last: ", className="text-muted small"),
html.Span(f"{pred_action}", html.Span(f"{pred_action}", className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-warning'}"),
className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-warning'}"), html.Span(f" ({pred_confidence:.1f}%)" if isinstance(pred_confidence, (int, float)) else "", className="text-muted small"),
html.Span(f" ({pred_confidence:.1f}%)", className="text-muted small"),
html.Span(f" @ {pred_time}", className="text-muted small") html.Span(f" @ {pred_time}", className="text-muted small")
], className="mb-1"), ], className="mb-1"),
# Loss information
html.Div([ html.Div([
html.Span("Loss: ", className="text-muted small"), html.Span("Loss: ", className="text-muted small"),
html.Span(f"{current_loss:.4f}" if current_loss is not None else "N/A", html.Span(f"{current_loss:.4f}" if isinstance(current_loss, (int, float)) else "", className=f"small fw-bold {loss_class}"),
className=f"small fw-bold {loss_class}"), *([html.Span(" | Best: ", className="text-muted small"), html.Span(f"{best_loss:.4f}", className="text-success small")] if isinstance(best_loss, (int, float)) else [])
*([
html.Span(" | Best: ", className="text-muted small"),
html.Span(f"{loss_metrics.get('best_loss', 0):.4f}", className="text-success small")
] if loss_metrics.get('best_loss') is not None else [])
], className="mb-1"), ], className="mb-1"),
# Timing information
html.Div([ html.Div([
html.Span("Rate: ", className="text-muted small"), html.Span("Rate: ", className="text-muted small"),
html.Span(f"{timing.get('inferences_per_second', 0):.2f}/s", className="text-info small"), html.Span(f"{rate:.2f}/s" if isinstance(rate, (int, float)) else "", className="text-info small")
html.Span(" | 24h: ", className="text-muted small"),
html.Span(f"{timing.get('predictions_24h', 0)}", className="text-primary small")
], className="mb-1"), ], className="mb-1"),
# Last activity times
html.Div([ html.Div([
html.Span("Last Inf: ", className="text-muted small"), html.Span("Last Inf: ", className="text-muted small"),
html.Span(f"{timing.get('last_inference', 'N/A')}", className="text-info small"), html.Span(f"{timing.get('last_inference', 'N/A')}", className="text-info small"),
html.Span(" | Train: ", className="text-muted small"), html.Span(" | Train: ", className="text-muted small"),
html.Span(f"{timing.get('last_training', 'N/A')}", className="text-warning small") html.Span(f"{timing.get('last_training', 'N/A')}", className="text-warning small")
], className="mb-1"), ], className="mb-1"),
# Signal generation statistics
*self._create_signal_stats_display(model_data.get('signal_stats', {})),
# Performance metrics
*self._create_performance_metrics_display(model_data)
]) ])
], className="border rounded p-2 mb-2", ], className="border rounded p-2 mb-2")
style={"backgroundColor": "rgba(255,255,255,0.05)" if status == 'active' else "rgba(128,128,128,0.1)"})
def _create_no_models_message(self) -> html.Div: def _create_training_status_section(self, status: Dict[str, Any]) -> html.Div:
"""Create message when no models are loaded""" if not status or status.get("status") == "error":
return html.Div([
html.H6([
html.I(className="fas fa-exclamation-triangle me-2 text-warning"),
"No Models Loaded"
], className="mb-2"),
html.P("No machine learning models are currently loaded. Check orchestrator status.",
className="text-muted small")
])
def _create_training_status_section(self, training_status: Dict[str, Any]) -> html.Div:
"""Create the training status section"""
if training_status.get('error'):
return html.Div([ return html.Div([
html.Hr(), html.Hr(),
html.H6([ html.H6([html.I(className="fas fa-exclamation-triangle me-2 text-danger"), "Training Status Error"], className="mb-2"),
html.I(className="fas fa-exclamation-triangle me-2 text-danger"), html.P(f"Error: {status.get('error', 'Unknown')}", className="text-danger small")
"Training Status Error"
], className="mb-2"),
html.P(f"Error: {training_status['error']}", className="text-danger small")
]) ])
is_enabled = status.get("training_enabled", False)
is_training = training_status.get('is_training', False)
return html.Div([ return html.Div([
html.Hr(), html.Hr(),
html.H6([ html.H6([html.I(className="fas fa-brain me-2 text-secondary"), "Training Status"], className="mb-2"),
html.I(className="fas fa-brain me-2 text-secondary"),
"Training Status"
], className="mb-2"),
html.Div([ html.Div([
html.Span("Status: ", className="text-muted small"), html.Span("Enabled: ", className="text-muted small"),
html.Span("ACTIVE" if is_training else "INACTIVE", html.Span("ON" if is_enabled else "OFF", className=f"small fw-bold {'text-success' if is_enabled else 'text-warning'}"),
className=f"small fw-bold {'text-success' if is_training else 'text-warning'}"),
html.Span(f" | Iteration: {training_status.get('training_iteration', 0):,}",
className="text-info small ms-2")
], className="mb-1"), ], className="mb-1"),
html.Div([
html.Span("Buffer: ", className="text-muted small"),
html.Span(f"{training_status.get('experience_buffer_size', 0):,}",
className="text-success small"),
html.Span(" | Updated: ", className="text-muted small"),
html.Span(f"{training_status.get('last_update', 'N/A')}",
className="text-muted small")
], className="mb-0")
]) ])
def _create_performance_section(self, performance_metrics: Dict[str, Any]) -> html.Div: def _create_system_metrics_section(self, metrics: Dict[str, Any]) -> html.Div:
"""Create the performance metrics section"""
if performance_metrics.get('error'):
return html.Div([
html.Hr(),
html.P(f"Performance metrics error: {performance_metrics['error']}",
className="text-danger small")
])
return html.Div([ return html.Div([
html.Hr(), html.Hr(),
html.H6([ html.H6([html.I(className="fas fa-chart-line me-2 text-primary"), "System Performance"], className="mb-2"),
html.I(className="fas fa-chart-line me-2 text-primary"),
"System Performance"
], className="mb-2"),
html.Div([ html.Div([
html.Span("Decision Fusion: ", className="text-muted small"), html.Span("Decision Fusion: ", className="text-muted small"),
html.Span("ON" if performance_metrics.get('decision_fusion_active') else "OFF", html.Span("ON" if metrics.get("decision_fusion_active") else "OFF", className=f"small {'text-success' if metrics.get('decision_fusion_active') else 'text-muted'}"),
className=f"small {'text-success' if performance_metrics.get('decision_fusion_active') else 'text-muted'}"),
html.Span(" | COB: ", className="text-muted small"), html.Span(" | COB: ", className="text-muted small"),
html.Span("ON" if performance_metrics.get('cob_integration_active') else "OFF", html.Span("ON" if metrics.get("cob_integration_active") else "OFF", className=f"small {'text-success' if metrics.get('cob_integration_active') else 'text-muted'}"),
className=f"small {'text-success' if performance_metrics.get('cob_integration_active') else 'text-muted'}")
], className="mb-1"), ], className="mb-1"),
html.Div([ html.Div([
html.Span("Tracking: ", className="text-muted small"), html.Span("Tracking: ", className="text-muted small"),
html.Span(f"{performance_metrics.get('symbols_tracking', 0)} symbols", html.Span(f"{metrics.get('symbols_tracking', 0)} symbols", className="text-info small"),
className="text-info small"), ], className="mb-0"),
html.Span(" | Decisions: ", className="text-muted small"),
html.Span(f"{performance_metrics.get('recent_decisions', 0):,}",
className="text-primary small")
], className="mb-0")
]) ])
def _create_signal_stats_display(self, signal_stats: Dict[str, Any]) -> List[html.Div]:
"""Create display elements for signal generation statistics"""
if not signal_stats or not any(signal_stats.values()):
return []
buy_signals = signal_stats.get('buy_signals', 0)
sell_signals = signal_stats.get('sell_signals', 0)
hold_signals = signal_stats.get('hold_signals', 0)
total_signals = signal_stats.get('total_signals', 0)
if total_signals == 0:
return []
# Calculate percentages - ensure all values are numeric
buy_signals = buy_signals or 0
sell_signals = sell_signals or 0
hold_signals = hold_signals or 0
total_signals = total_signals or 0
buy_pct = (buy_signals / total_signals * 100) if total_signals > 0 else 0
sell_pct = (sell_signals / total_signals * 100) if total_signals > 0 else 0
hold_pct = (hold_signals / total_signals * 100) if total_signals > 0 else 0
return [
html.Div([
html.Span("Signals: ", className="text-muted small"),
html.Span(f"B:{buy_signals}({buy_pct:.0f}%)", className="text-success small"),
html.Span(" | ", className="text-muted small"),
html.Span(f"S:{sell_signals}({sell_pct:.0f}%)", className="text-danger small"),
html.Span(" | ", className="text-muted small"),
html.Span(f"H:{hold_signals}({hold_pct:.0f}%)", className="text-warning small")
], className="mb-1"),
html.Div([
html.Span("Total: ", className="text-muted small"),
html.Span(f"{total_signals:,}", className="text-primary small fw-bold"),
*([
html.Span(" | Accuracy: ", className="text-muted small"),
html.Span(f"{signal_stats.get('accuracy', 0):.1f}%",
className=f"small fw-bold {'text-success' if signal_stats.get('accuracy', 0) > 60 else 'text-warning' if signal_stats.get('accuracy', 0) > 40 else 'text-danger'}")
] if signal_stats.get('accuracy', 0) > 0 else [])
], className="mb-1")
]
def _create_performance_metrics_display(self, model_data: Dict[str, Any]) -> List[html.Div]:
"""Create display elements for performance metrics"""
elements = []
# Win rate and accuracy
signal_stats = model_data.get('signal_stats', {})
loss_metrics = model_data.get('loss_metrics', {})
# Safely get numeric values
win_rate = signal_stats.get('win_rate', 0) or 0
accuracy = signal_stats.get('accuracy', 0) or 0
if win_rate > 0 or accuracy > 0:
elements.append(html.Div([
html.Span("Performance: ", className="text-muted small"),
*([
html.Span(f"Win: {win_rate:.1f}%",
className=f"small fw-bold {'text-success' if win_rate > 55 else 'text-warning' if win_rate > 45 else 'text-danger'}"),
html.Span(" | ", className="text-muted small")
] if win_rate > 0 else []),
*([
html.Span(f"Acc: {accuracy:.1f}%",
className=f"small fw-bold {'text-success' if accuracy > 60 else 'text-warning' if accuracy > 40 else 'text-danger'}")
] if accuracy > 0 else [])
], className="mb-1"))
# Loss improvement
if loss_metrics.get('improvement', 0) != 0:
improvement = loss_metrics.get('improvement', 0)
elements.append(html.Div([
html.Span("Improvement: ", className="text-muted small"),
html.Span(f"{improvement:+.1f}%",
className=f"small fw-bold {'text-success' if improvement > 0 else 'text-danger'}")
], className="mb-1"))
return elements