Compare commits

8 Commits: 9639073a09 ... d15ebf54ca

Author | SHA1 | Date
---|---|---
 | d15ebf54ca |
 | 488fbacf67 |
 | b47805dafc |
 | 11718bf92f |
 | 29e4076638 |
 | 03573cfb56 |
 | 083c1272ae |
 | b9159690ef |
@@ -271,15 +271,15 @@
         ],
         "decision": [
             {
-                "checkpoint_id": "decision_20250702_013257",
+                "checkpoint_id": "decision_20250702_083032",
                 "model_name": "decision",
                 "model_type": "decision_fusion",
-                "file_path": "NN\\models\\saved\\decision\\decision_20250702_013257.pt",
-                "created_at": "2025-07-02T01:32:57.057698",
+                "file_path": "NN\\models\\saved\\decision\\decision_20250702_083032.pt",
+                "created_at": "2025-07-02T08:30:32.225869",
                 "file_size_mb": 0.06720924377441406,
-                "performance_score": 9.99999352005137,
+                "performance_score": 102.79972716525019,
                 "accuracy": null,
-                "loss": 6.479948628599987e-06,
+                "loss": 2.7283549419721e-06,
                 "val_accuracy": null,
                 "val_loss": null,
                 "reward": null,
@@ -291,15 +291,15 @@
                 "wandb_artifact_name": null
             },
             {
-                "checkpoint_id": "decision_20250702_013256",
+                "checkpoint_id": "decision_20250702_082925",
                 "model_name": "decision",
                 "model_type": "decision_fusion",
-                "file_path": "NN\\models\\saved\\decision\\decision_20250702_013256.pt",
-                "created_at": "2025-07-02T01:32:56.667169",
+                "file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
+                "created_at": "2025-07-02T08:29:25.899383",
                 "file_size_mb": 0.06720924377441406,
-                "performance_score": 9.999993471487318,
+                "performance_score": 102.7997148991013,
                 "accuracy": null,
-                "loss": 6.528512681061979e-06,
+                "loss": 2.8510171153430164e-06,
                 "val_accuracy": null,
                 "val_loss": null,
                 "reward": null,
@@ -311,15 +311,15 @@
                 "wandb_artifact_name": null
             },
             {
-                "checkpoint_id": "decision_20250702_013255",
+                "checkpoint_id": "decision_20250702_082924",
                 "model_name": "decision",
                 "model_type": "decision_fusion",
-                "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
-                "created_at": "2025-07-02T01:32:55.915359",
+                "file_path": "NN\\models\\saved\\decision\\decision_20250702_082924.pt",
+                "created_at": "2025-07-02T08:29:24.538886",
                 "file_size_mb": 0.06720924377441406,
-                "performance_score": 9.999993469737547,
+                "performance_score": 102.79971291710027,
                 "accuracy": null,
-                "loss": 6.5302624539599814e-06,
+                "loss": 2.8708372390440218e-06,
                 "val_accuracy": null,
                 "val_loss": null,
                 "reward": null,
@@ -331,15 +331,15 @@
                 "wandb_artifact_name": null
             },
             {
-                "checkpoint_id": "decision_20250702_013255",
+                "checkpoint_id": "decision_20250702_082925",
                 "model_name": "decision",
                 "model_type": "decision_fusion",
-                "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
-                "created_at": "2025-07-02T01:32:55.774316",
+                "file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
+                "created_at": "2025-07-02T08:29:25.218718",
                 "file_size_mb": 0.06720924377441406,
-                "performance_score": 9.99999346914947,
+                "performance_score": 102.79971274601752,
                 "accuracy": null,
-                "loss": 6.530850530594989e-06,
+                "loss": 2.87254807635711e-06,
                 "val_accuracy": null,
                 "val_loss": null,
                 "reward": null,
@@ -351,15 +351,15 @@
                 "wandb_artifact_name": null
             },
             {
-                "checkpoint_id": "decision_20250702_013255",
+                "checkpoint_id": "decision_20250702_082925",
                 "model_name": "decision",
                 "model_type": "decision_fusion",
-                "file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
-                "created_at": "2025-07-02T01:32:55.646001",
+                "file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
+                "created_at": "2025-07-02T08:29:25.332228",
                 "file_size_mb": 0.06720924377441406,
-                "performance_score": 9.999993468898220,
+                "performance_score": 102.79971263447665,
                 "accuracy": null,
-                "loss": 6.531101780155828e-06,
+                "loss": 2.873663491419011e-06,
                 "val_accuracy": null,
                 "val_loss": null,
                 "reward": null,
@@ -476,16 +476,36 @@ class COBIntegration:
 
     async def _analyze_cob_patterns(self, symbol: str, cob_snapshot: COBSnapshot):
         """Analyze COB data for trading patterns and signals"""
        try:
-            # Large liquidity imbalance detection
-            if abs(cob_snapshot.liquidity_imbalance) > 0.4:
+            # Enhanced liquidity imbalance detection with dynamic thresholds
+            imbalance = abs(cob_snapshot.liquidity_imbalance)
+
+            # Dynamic threshold based on imbalance strength
+            if imbalance > 0.8:  # Very strong imbalance (>80%)
+                threshold = 0.05  # 5% threshold for very strong signals
+                confidence_multiplier = 3.0
+            elif imbalance > 0.5:  # Strong imbalance (>50%)
+                threshold = 0.1  # 10% threshold for strong signals
+                confidence_multiplier = 2.5
+            elif imbalance > 0.3:  # Moderate imbalance (>30%)
+                threshold = 0.15  # 15% threshold for moderate signals
+                confidence_multiplier = 2.0
+            else:  # Weak imbalance
+                threshold = 0.2  # 20% threshold for weak signals
+                confidence_multiplier = 1.5
+
+            # Generate signal if imbalance exceeds threshold
+            if abs(cob_snapshot.liquidity_imbalance) > threshold:
                 signal = {
                     'timestamp': cob_snapshot.timestamp.isoformat(),
                     'type': 'liquidity_imbalance',
                     'side': 'buy' if cob_snapshot.liquidity_imbalance > 0 else 'sell',
                     'strength': abs(cob_snapshot.liquidity_imbalance),
-                    'confidence': min(1.0, abs(cob_snapshot.liquidity_imbalance) * 2)
+                    'confidence': min(1.0, abs(cob_snapshot.liquidity_imbalance) * confidence_multiplier),
+                    'threshold_used': threshold,
+                    'signal_strength': 'very_strong' if imbalance > 0.8 else 'strong' if imbalance > 0.5 else 'moderate' if imbalance > 0.3 else 'weak'
                 }
                 self.cob_signals[symbol].append(signal)
                 logger.info(f"COB SIGNAL: {symbol} {signal['side'].upper()} signal generated - imbalance: {cob_snapshot.liquidity_imbalance:.3f}, confidence: {signal['confidence']:.3f}")
 
             # Cleanup old signals
             self.cob_signals[symbol] = self.cob_signals[symbol][-100:]
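The tiering above computes the gate from the same value it gates, so the effective floor is the "weak" tier's 0.2 threshold, down from the old single 0.4 cutoff. A minimal sketch (not part of the commit) of the tier logic as a standalone helper, handy for unit testing:

```python
def imbalance_tier(imbalance):
    """Return (threshold, confidence_multiplier, label) for an imbalance value."""
    imb = abs(imbalance)
    if imb > 0.8:
        return 0.05, 3.0, 'very_strong'
    elif imb > 0.5:
        return 0.1, 2.5, 'strong'
    elif imb > 0.3:
        return 0.15, 2.0, 'moderate'
    return 0.2, 1.5, 'weak'

for imb in (0.15, 0.25, 0.45, 0.85):
    threshold, mult, label = imbalance_tier(imb)
    print(f"{imb:.2f} -> {label:11s} fires={imb > threshold} confidence={min(1.0, imb * mult):.2f}")
# 0.15 -> weak        fires=False confidence=0.23
# 0.25 -> weak        fires=True  confidence=0.38
# 0.45 -> moderate    fires=True  confidence=0.90
# 0.85 -> very_strong fires=True  confidence=1.00
```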
@@ -16,13 +16,13 @@ import time
 import threading
 import numpy as np
 from datetime import datetime, timedelta
-from typing import Dict, List, Optional, Tuple, Any
-from dataclasses import dataclass
+from typing import Dict, List, Optional, Tuple, Any, Union
+from dataclasses import dataclass, field
 from collections import deque
 
 from .config import get_config
 from .data_provider import DataProvider
-from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface
+from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface, ModelRegistry
 
 # Import COB integration for real-time market microstructure data
 try:
@@ -45,7 +45,7 @@ class Prediction:
     timeframe: str  # Timeframe this prediction is for
     timestamp: datetime
     model_name: str  # Name of the model that made this prediction
-    metadata: Dict[str, Any] = None  # Additional model-specific data
+    metadata: Optional[Dict[str, Any]] = None  # Additional model-specific data
 
 @dataclass
 class TradingDecision:
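Context for this one-line change, under the assumption that strict type checking is the motivation: `Dict[str, Any] = None` is rejected by checkers such as mypy because None is not a Dict, while `Optional[...]` declares the None default explicitly. A small sketch with a hypothetical stand-in class:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class PredictionExample:  # hypothetical stand-in for the real Prediction dataclass
    model_name: str
    metadata: Optional[Dict[str, Any]] = None  # OK: None is now a declared value

p = PredictionExample(model_name="decision")
print(p.metadata)  # None
```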
@@ -62,10 +62,10 @@ class TradingOrchestrator:
     """
     Enhanced Trading Orchestrator with full ML and COB integration
     Coordinates CNN, DQN, and COB models for advanced trading decisions
-    Features real-time COB (Change of Bid) integration for market microstructure data
+    Features real-time COB (Change of Bid) data for market microstructure data
     """
 
-    def __init__(self, data_provider: DataProvider = None, enhanced_rl_training: bool = True, model_registry: Dict = None):
+    def __init__(self, data_provider: Optional[DataProvider] = None, enhanced_rl_training: bool = True, model_registry: Optional[ModelRegistry] = None):
         """Initialize the enhanced orchestrator with full ML capabilities"""
         self.config = get_config()
         self.data_provider = data_provider or DataProvider()
@@ -79,18 +79,18 @@ class TradingOrchestrator:
         self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])  # Enhanced to support multiple symbols
 
         # Dynamic weights (will be adapted based on performance)
-        self.model_weights = {}  # {model_name: weight}
+        self.model_weights: Dict[str, float] = {}  # {model_name: weight}
         self._initialize_default_weights()
 
         # State tracking
-        self.last_decision_time = {}  # {symbol: datetime}
-        self.recent_decisions = {}  # {symbol: List[TradingDecision]}
-        self.model_performance = {}  # {model_name: {'correct': int, 'total': int, 'accuracy': float}}
+        self.last_decision_time: Dict[str, datetime] = {}  # {symbol: datetime}
+        self.recent_decisions: Dict[str, List[TradingDecision]] = {}  # {symbol: List[TradingDecision]}
+        self.model_performance: Dict[str, Dict[str, Any]] = {}  # {model_name: {'correct': int, 'total': int, 'accuracy': float}}
 
         # Model prediction tracking for dashboard visualization
-        self.recent_dqn_predictions = {}  # {symbol: List[Dict]} - Recent DQN predictions
-        self.recent_cnn_predictions = {}  # {symbol: List[Dict]} - Recent CNN predictions
-        self.prediction_accuracy_history = {}  # {symbol: List[Dict]} - Prediction accuracy tracking
+        self.recent_dqn_predictions: Dict[str, deque] = {}  # {symbol: List[Dict]} - Recent DQN predictions
+        self.recent_cnn_predictions: Dict[str, deque] = {}  # {symbol: List[Dict]} - Recent CNN predictions
+        self.prediction_accuracy_history: Dict[str, deque] = {}  # {symbol: List[Dict]} - Prediction accuracy tracking
 
         # Initialize prediction tracking for each symbol
         for symbol in self.symbols:
@@ -99,39 +99,45 @@ class TradingOrchestrator:
             self.prediction_accuracy_history[symbol] = deque(maxlen=200)
 
         # Decision callbacks
-        self.decision_callbacks = []
+        self.decision_callbacks: List[Any] = []
 
         # ENHANCED: Decision Fusion System - Built into orchestrator (no separate file needed!)
-        self.decision_fusion_enabled = True
-        self.decision_fusion_network = None
-        self.fusion_training_history = []
-        self.last_fusion_inputs = {}
-        self.fusion_checkpoint_frequency = 50  # Save every 50 decisions
-        self.fusion_decisions_count = 0
-        self.fusion_training_data = []  # Store training examples for decision model
+        self.decision_fusion_enabled: bool = True
+        self.decision_fusion_network: Any = None
+        self.fusion_training_history: List[Any] = []
+        self.last_fusion_inputs: Dict[str, Any] = {}  # Fix: Explicitly initialize as dictionary
+        self.fusion_checkpoint_frequency: int = 50  # Save every 50 decisions
+        self.fusion_decisions_count: int = 0
+        self.fusion_training_data: List[Any] = []  # Store training examples for decision model
 
         # COB Integration - Real-time market microstructure data
-        self.cob_integration = None
+        self.cob_integration: Optional[COBIntegration] = None  # Fix: Use Optional for COBIntegration
         self.latest_cob_data: Dict[str, Any] = {}  # {symbol: COBSnapshot}
         self.latest_cob_features: Dict[str, Any] = {}  # {symbol: np.ndarray} - CNN features
         self.latest_cob_state: Dict[str, Any] = {}  # {symbol: np.ndarray} - DQN state features
-        self.cob_feature_history: Dict[str, List] = {symbol: [] for symbol in self.symbols}  # Rolling history for models
+        self.cob_feature_history: Dict[str, List[Any]] = {symbol: [] for symbol in self.symbols}  # Rolling history for models
 
         # Enhanced ML Models
-        self.rl_agent = None  # DQN Agent
-        self.cnn_model = None  # CNN Model for pattern recognition
-        self.extrema_trainer = None  # Extrema/pivot trainer
+        self.rl_agent: Any = None  # DQN Agent
+        self.cnn_model: Any = None  # CNN Model for pattern recognition
+        self.extrema_trainer: Any = None  # Extrema/pivot trainer
+        self.primary_transformer: Any = None  # Transformer model
+        self.primary_transformer_trainer: Any = None  # Transformer model trainer
+        self.transformer_checkpoint_info: Dict[str, Any] = {}  # Transformer checkpoint info
+        self.cob_rl_agent: Any = None  # COB RL Agent
+        self.decision_model: Any = None  # Decision Fusion model
 
         self.latest_cnn_features: Dict[str, Any] = {}  # CNN hidden features
         self.latest_cnn_predictions: Dict[str, Any] = {}  # CNN predictions
 
         # Enhanced RL features
-        self.sensitivity_learning_queue = []  # For outcome-based learning
-        self.perfect_move_buffer = []  # Buffer for perfect move analysis
-        self.position_status = {}  # Current positions
+        self.sensitivity_learning_queue: List[Any] = []  # For outcome-based learning
+        self.perfect_move_buffer: List[Any] = []  # Buffer for perfect move analysis
+        self.position_status: Dict[str, Any] = {}  # Current positions
 
         # Real-time processing
-        self.realtime_processing = False
-        self.realtime_tasks = []
+        self.realtime_processing: bool = False
+        self.realtime_tasks: List[Any] = []
 
         logger.info("Enhanced TradingOrchestrator initialized with full ML capabilities")
         logger.info(f"Enhanced RL training: {enhanced_rl_training}")
@@ -310,9 +316,10 @@ class TradingOrchestrator:
             self.cob_integration = COBIntegration(symbols=self.symbols)
 
             # Register callbacks to receive real-time COB data
-            self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
-            self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
-            self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
+            if self.cob_integration:
+                self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
+                self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
+                self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
 
             # Initialize 5-minute COB data matrix system
             self.cob_matrix_duration = 300  # 5 minutes in seconds
@@ -320,9 +327,9 @@ class TradingOrchestrator:
             self.cob_matrix_size = self.cob_matrix_duration // self.cob_matrix_resolution  # 300 samples
 
             # COB data matrix storage - 5 minutes of 1-second snapshots
-            self.cob_data_matrix: Dict[str, deque] = {}
-            self.cob_feature_matrix: Dict[str, deque] = {}
-            self.cob_state_matrix: Dict[str, deque] = {}
+            self.cob_data_matrix: Dict[str, deque[Any]] = {}
+            self.cob_feature_matrix: Dict[str, deque[Any]] = {}
+            self.cob_state_matrix: Dict[str, deque[Any]] = {}
 
             # Initialize matrix storage for each symbol
             for symbol in self.symbols:
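A small illustration (an assumption that the matrices above behave as plain rolling windows) of the sizing arithmetic and of why `deque(maxlen=...)` needs no manual trimming: 300 s of history at 1-second resolution is exactly 300 slots, and older snapshots fall off automatically.

```python
from collections import deque

cob_matrix_duration = 300   # 5 minutes in seconds
cob_matrix_resolution = 1   # one snapshot per second
size = cob_matrix_duration // cob_matrix_resolution  # 300 slots

window = deque(maxlen=size)
for t in range(305):
    window.append({'t': t})
print(len(window), window[0]['t'])  # 300 5 -> the oldest 5 snapshots were evicted
```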
@@ -336,16 +343,16 @@ class TradingOrchestrator:
                 self.cob_state_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
 
             # Initialize COB data storage (legacy support)
-            self.latest_cob_snapshots = {}
-            self.cob_feature_cache = {}
-            self.cob_state_cache = {}
+            self.latest_cob_snapshots: Dict[str, Any] = {}
+            self.cob_feature_cache: Dict[str, Any] = {}
+            self.cob_state_cache: Dict[str, Any] = {}
 
             # COB matrix update tracking
-            self.last_cob_matrix_update = {}
+            self.last_cob_matrix_update: Dict[str, float] = {}
             self.cob_matrix_update_interval = 1.0  # Update every 1 second
 
             # COB matrix statistics
-            self.cob_matrix_stats = {
+            self.cob_matrix_stats: Dict[str, Any] = {
                 'total_updates': 0,
                 'matrix_fills': {symbol: 0 for symbol in self.symbols},
                 'feature_generations': 0,
@@ -375,7 +382,8 @@ class TradingOrchestrator:
                 asyncio.set_event_loop(loop)
 
                 async def cob_main():
-                    await self.cob_integration.start()
+                    if self.cob_integration:  # Additional check
+                        await self.cob_integration.start()
                     # Keep running until stopped
                     while True:
                         await asyncio.sleep(1)
@@ -52,7 +52,7 @@ def main():
 
         # Run the dashboard
         logger.info("Starting templated dashboard server...")
-        dashboard.run_server(host='127.0.0.1', port=8051, debug=False)
+        dashboard.run_server(host='127.0.0.1', port=8052, debug=False)
 
     except Exception as e:
         logger.error(f"Error running templated dashboard: {e}")
@@ -12,6 +12,7 @@ from typing import Dict, List, Optional, Tuple, Any
 from dataclasses import dataclass, asdict
 from collections import defaultdict
 import torch
+import random
 
 try:
     import wandb
@@ -150,36 +151,80 @@ class CheckpointManager:
         return None
 
     def _calculate_performance_score(self, metrics: Dict[str, float]) -> float:
         """Calculate performance score with improved sensitivity for training models"""
         score = 0.0
 
-        if 'accuracy' in metrics:
-            score += metrics['accuracy'] * 100
-        if 'val_accuracy' in metrics:
-            score += metrics['val_accuracy'] * 100
+        # Prioritize loss reduction for active training models
         if 'loss' in metrics:
-            score += max(0, 10 - metrics['loss'])
-        if 'val_loss' in metrics:
-            score += max(0, 10 - metrics['val_loss'])
-        if 'reward' in metrics:
-            score += metrics['reward']
-        if 'pnl' in metrics:
-            score += metrics['pnl']
+            # Invert loss so lower loss = higher score, with better scaling
+            loss_value = metrics['loss']
+            if loss_value > 0:
+                score += max(0, 100 / (1 + loss_value))  # More sensitive to loss changes
+            else:
+                score += 100  # Perfect loss
+
+        # Add other metrics with appropriate weights
+        if 'accuracy' in metrics:
+            score += metrics['accuracy'] * 50  # Reduced weight to balance with loss
+        if 'val_accuracy' in metrics:
+            score += metrics['val_accuracy'] * 50
+        if 'val_loss' in metrics:
+            val_loss = metrics['val_loss']
+            if val_loss > 0:
+                score += max(0, 50 / (1 + val_loss))
+        if 'reward' in metrics:
+            score += metrics['reward'] * 10
+        if 'pnl' in metrics:
+            score += metrics['pnl'] * 5
+        if 'training_samples' in metrics:
+            # Bonus for processing more training samples
+            score += min(10, metrics['training_samples'] / 10)
 
         # Ensure minimum score for any training activity
         if score == 0.0 and metrics:
             # Use the first available metric with better scaling
             first_metric = next(iter(metrics.values()))
-            score = first_metric if first_metric > 0 else 0.1
+            if first_metric > 0:
+                score = max(0.1, min(10, first_metric))
+            else:
+                score = 0.1
 
         return max(score, 0.1)
 
     def _should_save_checkpoint(self, model_name: str, performance_score: float) -> bool:
         """Improved checkpoint saving logic with more frequent saves during training"""
         if model_name not in self.checkpoints or not self.checkpoints[model_name]:
-            return True
+            return True  # Always save first checkpoint
 
+        # Allow more checkpoints during active training
         if len(self.checkpoints[model_name]) < self.max_checkpoints:
             return True
 
-        worst_score = min(cp.performance_score for cp in self.checkpoints[model_name])
-        return performance_score > worst_score
+        # Get current best and worst scores
+        scores = [cp.performance_score for cp in self.checkpoints[model_name]]
+        best_score = max(scores)
+        worst_score = min(scores)
+
+        # Save if better than worst (more frequent saves)
+        if performance_score > worst_score:
+            return True
+
+        # For high-performing models (score > 100), be more sensitive to small improvements
+        if best_score > 100:
+            # Save if within 0.1% of best score (very sensitive for converged models)
+            if performance_score >= best_score * 0.999:
+                return True
+        else:
+            # Also save if we're within 10% of best score (capture near-optimal models)
+            if performance_score >= best_score * 0.9:
+                return True
+
+        # Save more frequently during active training (every 5th attempt instead of 10th)
+        if random.random() < 0.2:  # 20% chance to save anyway
+            logger.info(f"Saving checkpoint for {model_name} - periodic save during active training")
+            return True
+
+        return False
 
     def _save_model_file(self, model, file_path: Path, model_type: str) -> bool:
         try:
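A quick arithmetic check of the rescoring against the checkpoint metadata earlier in this diff (a sketch, not part of the commit): the old score saturates near 10 for tiny losses, the new loss term near 100, which matches the jump from ~9.99999 to ~102.8 in the stored scores.

```python
# Old score: max(0, 10 - loss); new loss term: 100 / (1 + loss).
old_loss, new_loss = 6.479948628599987e-06, 2.7283549419721e-06

print(10 - old_loss)         # 9.999993520051372 -> matches the old stored score 9.99999352005137
print(100 / (1 + new_loss))  # 99.99972716525...  -> the new stored score is 102.79972716525019
# The stored score exceeds the loss term by exactly 2.8, consistent with the
# new training_samples bonus min(10, samples / 10) at samples = 28 - an
# inference, since training_samples itself is not shown in this diff.
```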
File diff suppressed because it is too large
@@ -59,10 +59,19 @@ class DashboardComponentManager:
             action_color = "text-success" if action == "BUY" else "text-danger"
             manual_indicator = " [M]" if manual else ""
 
+            # Highlight COB signals
+            cob_indicator = ""
+            if hasattr(decision, 'type') and getattr(decision, 'type', '') == 'cob_liquidity_imbalance':
+                cob_indicator = " [COB]"
+                badge_class = "bg-info"  # Use blue for COB signals
+            elif isinstance(decision, dict) and decision.get('type') == 'cob_liquidity_imbalance':
+                cob_indicator = " [COB]"
+                badge_class = "bg-info"  # Use blue for COB signals
+
             signal_div = html.Div([
                 html.Span(f"{timestamp}", className="small text-muted me-2"),
                 html.Span(f"{status}", className=f"badge {badge_class} me-2"),
-                html.Span(f"{action}{manual_indicator}", className=f"{action_color} fw-bold me-2"),
+                html.Span(f"{action}{manual_indicator}{cob_indicator}", className=f"{action_color} fw-bold me-2"),
                 html.Span(f"({confidence:.1f}%)", className="small text-muted me-2"),
                 html.Span(f"${price:.2f}", className="small")
             ], className="mb-1")
@@ -349,7 +358,8 @@ class DashboardComponentManager:
 
     def _create_cob_ladder_panel(self, bids, asks, mid_price, symbol=""):
         """Creates the right panel with the compact COB ladder."""
-        bucket_size = 10
+        # Use symbol-specific bucket sizes: ETH = $1, BTC = $10
+        bucket_size = 1.0 if "ETH" in symbol else 10.0
         num_levels = 5
 
         def aggregate_buckets(orders):
@@ -667,15 +677,31 @@ class DashboardComponentManager:
 
             # Model metrics
             html.Div([
-                # Last prediction
+                # Last prediction with enhanced details
                 html.Div([
                     html.Span("Last: ", className="text-muted small"),
                     html.Span(f"{pred_action}",
-                             className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-muted'}"),
+                             className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-warning' if 'PREDICTION' in pred_action else 'text-info'}"),
                     html.Span(f" ({pred_confidence:.1f}%)", className="text-muted small"),
                     html.Span(f" @ {pred_time}", className="text-muted small")
                 ], className="mb-1"),
 
+                # Additional prediction details if available
+                *([
+                    html.Div([
+                        html.Span("Price: ", className="text-muted small"),
+                        html.Span(f"${last_prediction.get('predicted_price', 0):.2f}", className="text-warning small fw-bold")
+                    ], className="mb-1")
+                ] if last_prediction.get('predicted_price', 0) > 0 else []),
+
+                *([
+                    html.Div([
+                        html.Span("Change: ", className="text-muted small"),
+                        html.Span(f"{last_prediction.get('price_change', 0):+.2f}%",
+                                 className=f"small fw-bold {'text-success' if last_prediction.get('price_change', 0) > 0 else 'text-danger'}")
+                    ], className="mb-1")
+                ] if last_prediction.get('price_change', 0) != 0 else []),
+
+                # Timing information (NEW)
+                html.Div([
+                    html.Span("Timing: ", className="text-muted small"),
@@ -149,6 +149,10 @@ class DashboardLayoutManager:
                         html.I(className="fas fa-trash me-1"),
                         "Clear Session"
                     ], id="clear-session-btn", className="btn btn-warning btn-sm w-100"),
+                    html.Button([
+                        html.I(className="fas fa-save me-1"),
+                        "Store All Models"
+                    ], id="store-models-btn", className="btn btn-info btn-sm w-100 mt-2"),
                     html.Hr(className="my-2"),
                     html.Small("System Status", className="text-muted d-block mb-1"),
                     html.Div([
@@ -5,8 +5,7 @@ Handles HTML template rendering with Jinja2
 import os
 from typing import Dict, Any
 from jinja2 import Environment, FileSystemLoader, select_autoescape
-import dash_html_components as html
-from dash import dcc
+from dash import html, dcc
 import plotly.graph_objects as go
 
 from .dashboard_model import DashboardModel, DashboardDataBuilder
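Background on the import change: Dash 2.x folded the separate dash-html-components and dash-core-components packages into dash itself, so the two old imports collapse into one. A minimal sketch:

```python
from dash import html, dcc  # Dash >= 2.0; replaces dash_html_components / dash_core_components

layout = html.Div([dcc.Graph(id="example")])
```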
@@ -3,20 +3,33 @@ Template-based Trading Dashboard
 Uses MVC architecture with HTML templates and data models
 """
 import logging
-from typing import Optional, Any, Dict, List
-from datetime import datetime
+import sys
+import os
+from typing import Optional, Any, Dict, List, Deque
+from datetime import datetime, timedelta
+import pandas as pd
+import pytz
+import time
+import threading
+from collections import deque
+from dataclasses import asdict
 
 import dash
 from dash import dcc, html, Input, Output, State, callback_context
 import plotly.graph_objects as go
 import plotly.express as px
 
-from .dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
-from .template_renderer import DashboardTemplateRenderer
 from core.data_provider import DataProvider
 from core.orchestrator import TradingOrchestrator
 from core.trading_executor import TradingExecutor
 from core.config import get_config
+from core.unified_data_stream import UnifiedDataStream
+from web.dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
+from web.template_renderer import DashboardTemplateRenderer
+from web.component_manager import DashboardComponentManager
+from web.layout_manager import DashboardLayoutManager
+from utils.checkpoint_manager import save_checkpoint, load_best_checkpoint
+from NN.models.advanced_transformer_trading import create_trading_transformer, TradingTransformerConfig
 
 # Configure logging
 logger = logging.getLogger(__name__)
@@ -29,29 +42,125 @@ class TemplatedTradingDashboard:
                  orchestrator: Optional[TradingOrchestrator] = None,
                  trading_executor: Optional[TradingExecutor] = None):
         """Initialize the templated dashboard"""
-        self.data_provider = data_provider
-        self.orchestrator = orchestrator
-        self.trading_executor = trading_executor
         self.config = get_config()
 
+        # Initialize components
+        self.data_provider = data_provider or DataProvider()
+        self.trading_executor = trading_executor or TradingExecutor()
+
         # Initialize template renderer
         self.renderer = DashboardTemplateRenderer()
 
-        # Initialize Dash app
+        # Initialize unified orchestrator with full ML capabilities
+        if orchestrator is None:
+            self.orchestrator = TradingOrchestrator(
+                data_provider=self.data_provider,
+                enhanced_rl_training=True,
+                model_registry={}
+            )
+            logger.info("TEMPLATED DASHBOARD: Using unified Trading Orchestrator with full ML capabilities")
+        else:
+            self.orchestrator = orchestrator
+
+        # Initialize enhanced training system for predictions
+        self.training_system = None
+        self._initialize_enhanced_training_system()
+
+        # Initialize layout and component managers
+        self.layout_manager = DashboardLayoutManager(
+            starting_balance=self._get_initial_balance(),
+            trading_executor=self.trading_executor
+        )
+        self.component_manager = DashboardComponentManager()
+
+        # Initialize Universal Data Stream for the 5 timeseries architecture
+        self.unified_stream = UnifiedDataStream(self.data_provider, self.orchestrator)
+        self.stream_consumer_id = self.unified_stream.register_consumer(
+            consumer_name="TemplatedTradingDashboard",
+            callback=self._handle_unified_stream_data,
+            data_types=['ticks', 'ohlcv', 'training_data', 'ui_data']
+        )
+        logger.info(f"TEMPLATED DASHBOARD: Universal Data Stream initialized with consumer ID: {self.stream_consumer_id}")
+        logger.info("TEMPLATED DASHBOARD: Subscribed to Universal 5 Timeseries: ETH(ticks,1m,1h,1d) + BTC(ticks)")
+
+        # Dashboard state
+        self.recent_decisions: list = []
+        self.closed_trades: list = []
+        self.current_prices: dict = {}
+        self.session_pnl = 0.0
+        self.total_fees = 0.0
+        self.current_position: Optional[float] = 0.0
+        self.session_trades: list = []
+
+        # Model control toggles - separate inference and training
+        self.dqn_inference_enabled = True  # Default: enabled
+        self.dqn_training_enabled = True  # Default: enabled
+        self.cnn_inference_enabled = True
+        self.cnn_training_enabled = True
+
+        # Leverage management - adjustable x1 to x100
+        self.current_leverage = 50  # Default x50 leverage
+        self.min_leverage = 1
+        self.max_leverage = 100
+        self.pending_trade_case_id = None  # For tracking opening trades until closure
+
+        # WebSocket streaming
+        self.ws_price_cache: dict = {}
+        self.is_streaming = False
+        self.tick_cache: list = []
+
+        # COB data cache - enhanced with price buckets and memory system
+        self.cob_cache: dict = {
+            'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0},
+            'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
+        }
+        self.latest_cob_data: dict = {}  # Cache for COB integration data
+        self.cob_predictions: dict = {}  # Cache for COB predictions (both ETH and BTC for display)
+
+        # COB High-frequency data handling (50-100 updates/sec)
+        self.cob_data_buffer: dict = {}  # Buffer for high-freq data
+        self.cob_memory: dict = {}  # Memory system like GPT - keeps last N snapshots
+        self.cob_price_buckets: dict = {}  # Price bucket cache
+        self.cob_update_count = 0
+        self.last_cob_broadcast: Dict[str, Optional[float]] = {'ETH/USDT': None, 'BTC/USDT': None}  # Rate limiting for UI updates, updated type
+        self.cob_data_history: Dict[str, Deque[Any]] = {
+            'ETH/USDT': deque(maxlen=61),  # Store ~60 seconds of 1s snapshots
+            'BTC/USDT': deque(maxlen=61)
+        }
+
+        # Initialize timezone
+        timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
+        self.timezone = pytz.timezone(timezone_name)
+
+        # Create Dash app
         self.app = dash.Dash(__name__, external_stylesheets=[
-            'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css'
+            'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
+            'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
         ])
 
-        # Session data
-        self.session_start_time = datetime.now()
-        self.session_trades = []
-        self.session_pnl = 0.0
-        self.current_position = 0.0
+        # Suppress Dash development mode logging
+        self.app.enable_dev_tools(debug=False, dev_tools_silence_routes_logging=True)
 
         # Setup layout and callbacks
         self._setup_layout()
         self._setup_callbacks()
 
-        logger.info("TEMPLATED DASHBOARD: Initialized with MVC architecture")
+        # Start data streams
+        self._initialize_streaming()
+
+        # Connect to orchestrator for real trading signals
+        self._connect_to_orchestrator()
+
+        # Initialize COB integration with high-frequency data handling
+        self._initialize_cob_integration()
+
+        # Start signal generation loop to ensure continuous trading signals
+        self._start_signal_generation_loop()
+
+        # Start training sessions if models are showing FRESH status
+        threading.Thread(target=self._delayed_training_check, daemon=True).start()
+
+        logger.info("TEMPLATED DASHBOARD: Initialized with HIGH-FREQUENCY COB integration and signal generation")
 
     def _setup_layout(self):
         """Setup the dashboard layout using templates"""
@@ -61,69 +170,20 @@ class TemplatedTradingDashboard:
         # Render layout using template
         layout = self.renderer.render_dashboard(dashboard_data)
 
-        # Add custom CSS
-        layout.children.insert(0, self._get_custom_css())
+        # Custom CSS will be handled via external stylesheets
 
         self.app.layout = layout
 
-    def _get_custom_css(self) -> html.Style:
-        """Get custom CSS styles"""
-        return html.Style(children="""
-            .metric-card {
-                background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
-                color: white;
-                border-radius: 10px;
-                padding: 15px;
-                margin-bottom: 10px;
-                box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
-            }
-            .metric-value {
-                font-size: 1.5rem;
-                font-weight: bold;
-            }
-            .metric-label {
-                font-size: 0.9rem;
-                opacity: 0.9;
-            }
-            .cob-ladder {
-                max-height: 400px;
-                overflow-y: auto;
-                font-family: 'Courier New', monospace;
-                font-size: 0.85rem;
-            }
-            .bid-row {
-                background-color: rgba(40, 167, 69, 0.1);
-                border-left: 3px solid #28a745;
-            }
-            .ask-row {
-                background-color: rgba(220, 53, 69, 0.1);
-                border-left: 3px solid #dc3545;
-            }
-            .training-panel {
-                background: #f8f9fa;
-                border-radius: 8px;
-                padding: 15px;
-                height: 300px;
-                overflow-y: auto;
-            }
-            .model-status {
-                padding: 8px 12px;
-                border-radius: 20px;
-                font-size: 0.8rem;
-                font-weight: bold;
-                margin: 2px;
-                display: inline-block;
-            }
-            .status-training { background-color: #28a745; color: white; }
-            .status-idle { background-color: #6c757d; color: white; }
-            .status-loading { background-color: #ffc107; color: black; }
-            .closed-trades {
-                max-height: 200px;
-                overflow-y: auto;
-            }
-            .trade-profit { color: #28a745; font-weight: bold; }
-            .trade-loss { color: #dc3545; font-weight: bold; }
-        """)
+    def _get_initial_balance(self) -> float:
+        """Get initial balance from trading executor or default"""
+        try:
+            if self.trading_executor and hasattr(self.trading_executor, 'starting_balance'):
+                balance = getattr(self.trading_executor, 'starting_balance', None)
+                if balance and balance > 0:
+                    return balance
+        except Exception as e:
+            logger.warning(f"Error getting balance: {e}")
+        return 100.0  # Default balance
 
     def _setup_callbacks(self):
         """Setup dashboard callbacks"""
@@ -220,7 +280,8 @@ class TemplatedTradingDashboard:
         def update_closed_trades(n):
             """Update closed trades table"""
             try:
-                return self._render_closed_trades()
+                # Return the table wrapped in a Div
+                return html.Div(self._render_closed_trades())
             except Exception as e:
                 logger.error(f"Error updating closed trades: {e}")
                 return html.Div("No trades")
@@ -523,44 +584,32 @@ class TemplatedTradingDashboard:
             ])
         ])
 
-    def _render_closed_trades(self) -> html.Table:
+    def _render_closed_trades(self) -> html.Div:
         """Render closed trades table"""
-        return html.Table([
-            html.Thead([
-                html.Tr([
-                    html.Th("Time"),
-                    html.Th("Symbol"),
-                    html.Th("Side"),
-                    html.Th("Size"),
-                    html.Th("Entry"),
-                    html.Th("Exit"),
-                    html.Th("PnL"),
-                    html.Th("Duration")
-                ])
-            ]),
-            html.Tbody([
-                html.Tr([
-                    html.Td("14:23:45"),
-                    html.Td("ETH/USDT"),
-                    html.Td([html.Span("BUY", className="badge bg-success")]),
-                    html.Td("1.5"),
-                    html.Td("$3420.45"),
-                    html.Td("$3428.12"),
-                    html.Td("$11.51", className="trade-profit"),
-                    html.Td("2m 34s")
-                ]),
-                html.Tr([
-                    html.Td("14:21:12"),
-                    html.Td("BTC/USDT"),
-                    html.Td([html.Span("SELL", className="badge bg-danger")]),
-                    html.Td("0.1"),
-                    html.Td("$45150.23"),
-                    html.Td("$45142.67"),
-                    html.Td("-$0.76", className="trade-loss"),
-                    html.Td("1m 12s")
-                ])
-            ])
-        ], className="table table-sm")
+        if not self.closed_trades:
+            return html.Div("No closed trades yet.", className="alert alert-info mt-3")
+
+        # Create a DataFrame from closed trades
+        df_trades = pd.DataFrame(self.closed_trades)
+
+        # Format columns for display
+        df_trades['timestamp'] = pd.to_datetime(df_trades['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
+        df_trades['entry_price'] = df_trades['entry_price'].apply(lambda x: f"${x:,.2f}")
+        df_trades['exit_price'] = df_trades['exit_price'].apply(lambda x: f"${x:,.2f}")
+        df_trades['pnl'] = df_trades['pnl'].apply(lambda x: f"${x:,.2f}")
+        df_trades['profit_percentage'] = df_trades['profit_percentage'].apply(lambda x: f"{x:,.2f}%")
+        df_trades['size'] = df_trades['size'].apply(lambda x: f"{x:,.4f}")
+        df_trades['fees'] = df_trades['fees'].apply(lambda x: f"${x:,.2f}")
+
+        table_header = [html.Thead(html.Tr([html.Th(col) for col in df_trades.columns]))]
+        table_body = [html.Tbody([
+            html.Tr([html.Td(df_trades.iloc[i][col]) for col in df_trades.columns]) for i in range(len(df_trades))
+        ])]
+
+        return html.Div(
+            html.Table(table_header + table_body, className="table table-striped table-hover table-sm"),
+            className="table-responsive"
+        )
 
     def _execute_manual_trade(self, action: str):
         """Execute manual trade"""
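A minimal sketch of the trade records the new renderer expects; the keys mirror the columns formatted above, and the values here are invented:

```python
import pandas as pd

sample_closed_trades = [{
    'timestamp': '2025-07-02T08:29:25', 'symbol': 'ETH/USDT', 'side': 'BUY',
    'size': 1.5, 'entry_price': 3420.45, 'exit_price': 3428.12,
    'pnl': 11.51, 'profit_percentage': 0.22, 'fees': 0.12,
}]

df = pd.DataFrame(sample_closed_trades)
df['timestamp'] = pd.to_datetime(df['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
df['pnl'] = df['pnl'].apply(lambda x: f"${x:,.2f}")
print(df[['timestamp', 'symbol', 'side', 'pnl']].to_string(index=False))
```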
@@ -585,11 +634,587 @@ class TemplatedTradingDashboard:
         self.session_start_time = datetime.now()
         logger.info("SESSION: Cleared")
 
-    def run_server(self, host='127.0.0.1', port=8051, debug=False):
+    def run_server(self, host='127.0.0.1', port=8052, debug=False):
         """Run the dashboard server"""
         logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}")
-        self.app.run_server(host=host, port=port, debug=debug)
+        self.app.run(host=host, port=port, debug=debug)
+
+    def _handle_unified_stream_data(self, data):
+        """Placeholder for unified stream data handling."""
+        logger.debug(f"Received data from unified stream: {data}")
+
+    def _delayed_training_check(self):
+        """Check and start training after a delay to allow initialization"""
+        try:
+            time.sleep(10)  # Wait 10 seconds for initialization
+            logger.info("Checking if models need training activation...")
+            self._start_actual_training_if_needed()
+        except Exception as e:
+            logger.error(f"Error in delayed training check: {e}")
+
+    def _initialize_enhanced_training_system(self):
+        """Initialize enhanced training system for model predictions"""
+        try:
+            # Try to import and initialize enhanced training system
+            from enhanced_realtime_training import EnhancedRealtimeTrainingSystem
+
+            self.training_system = EnhancedRealtimeTrainingSystem(
+                orchestrator=self.orchestrator,
+                data_provider=self.data_provider,
+                dashboard=self
+            )
+
+            # Initialize prediction storage
+            if not hasattr(self.orchestrator, 'recent_dqn_predictions'):
+                self.orchestrator.recent_dqn_predictions = {}
+            if not hasattr(self.orchestrator, 'recent_cnn_predictions'):
+                self.orchestrator.recent_cnn_predictions = {}
+
+            logger.info("TEMPLATED DASHBOARD: Enhanced training system initialized for model predictions")
+
+        except ImportError:
+            logger.warning("TEMPLATED DASHBOARD: Enhanced training system not available - using mock predictions")
+            self.training_system = None
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error initializing enhanced training system: {e}")
+            self.training_system = None
+    def _initialize_streaming(self):
+        """Initialize data streaming"""
+        try:
+            self._start_websocket_streaming()
+            self._start_data_collection()
+            logger.info("TEMPLATED DASHBOARD: Data streaming initialized")
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error initializing streaming: {e}")
+
+    def _start_websocket_streaming(self):
+        """Start WebSocket streaming for real-time data."""
+        ws_thread = threading.Thread(target=self._ws_worker, daemon=True)
+        ws_thread.start()
+
+    def _ws_worker(self):
+        try:
+            import websocket
+            import json  # Added import
+            def on_message(ws, message):
+                try:
+                    data = json.loads(message)
+                    if 'k' in data:
+                        kline = data['k']
+                        tick_record = {
+                            'symbol': 'ETHUSDT',
+                            'datetime': datetime.fromtimestamp(int(kline['t']) / 1000),
+                            'open': float(kline['o']),
+                            'high': float(kline['h']),
+                            'low': float(kline['l']),
+                            'close': float(kline['c']),
+                            'price': float(kline['c']),
+                            'volume': float(kline['v']),
+                        }
+                        self.ws_price_cache['ETHUSDT'] = tick_record['price']
+                        self.current_prices['ETH/USDT'] = tick_record['price']
+                        self.tick_cache.append(tick_record)
+                        if len(self.tick_cache) > 1000:
+                            self.tick_cache.pop(0)
+                except Exception as e:
+                    logger.warning(f"TEMPLATED DASHBOARD: WebSocket message error: {e}")
+            def on_error(ws, error):
+                logger.error(f"TEMPLATED DASHBOARD: WebSocket error: {error}")
+                self.is_streaming = False
+            def on_close(ws, close_status_code, close_msg):
+                logger.warning("TEMPLATED DASHBOARD: WebSocket connection closed")
+                self.is_streaming = False
+            def on_open(ws):
+                logger.info("TEMPLATED DASHBOARD: WebSocket connected")
+                self.is_streaming = True
+            ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s"
+            ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
+            ws.run_forever()
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: WebSocket worker error: {e}")
+            self.is_streaming = False
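For reference, the shape of the Binance kline stream payload that `on_message` above parses, abridged to the fields the handler actually reads (values here are illustrative):

```python
sample_message = {
    "k": {
        "t": 1719900000000,   # kline open time, ms
        "o": "3420.45",       # open
        "h": "3428.12",       # high
        "l": "3419.80",       # low
        "c": "3425.00",       # close, used as the live price
        "v": "152.7",         # volume
    }
}
kline = sample_message["k"]
print(float(kline["c"]))  # 3425.0
```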
+    def _start_data_collection(self):
+        """Start background data collection"""
+        data_thread = threading.Thread(target=self._data_worker, daemon=True)
+        data_thread.start()
+
+    def _data_worker(self):
+        while True:
+            try:
+                self._update_session_metrics()
+                time.sleep(5)
+            except Exception as e:
+                logger.warning(f"TEMPLATED DASHBOARD: Data collection error: {e}")
+                time.sleep(10)
+
+    def _update_session_metrics(self):
+        """Update session P&L and total fees from closed trades."""
+        try:
+            closed_trades = []
+            if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'):
+                closed_trades = self.trading_executor.get_closed_trades()
+            self.closed_trades = closed_trades
+            if closed_trades:
+                self.session_pnl = sum(trade.get('pnl', 0) for trade in closed_trades)
+                self.total_fees = sum(trade.get('fees', 0) for trade in closed_trades)
+            else:
+                self.session_pnl = 0.0
+                self.total_fees = 0.0
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error updating session metrics: {e}")
+    def _connect_to_orchestrator(self):
+        """Connect to orchestrator for real trading signals"""
+        try:
+            if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'):
+                import asyncio  # Added import
+                # from dataclasses import asdict  # Moved asdict to top-level import
+
+                def connect_worker():
+                    try:
+                        loop = asyncio.new_event_loop()
+                        asyncio.set_event_loop(loop)
+                        # No need to run_until_complete here, just register the callback
+                        self.orchestrator.add_decision_callback(self._on_trading_decision)
+                        logger.info("TEMPLATED DASHBOARD: Successfully connected to orchestrator for trading signals.")
+                    except Exception as e:
+                        logger.error(f"TEMPLATED DASHBOARD: Orchestrator connection worker failed: {e}")
+                thread = threading.Thread(target=connect_worker, daemon=True)
+                thread.start()
+            else:
+                logger.warning("TEMPLATED DASHBOARD: Orchestrator not available or doesn't support callbacks")
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error initiating orchestrator connection: {e}")
+
+    async def _on_trading_decision(self, decision):
+        """Handle trading decision from orchestrator."""
+        try:
+            action = getattr(decision, 'action', decision.get('action'))
+            if action == 'HOLD':
+                return
+            symbol = getattr(decision, 'symbol', decision.get('symbol', 'ETH/USDT'))
+            if 'ETH' not in symbol.upper():
+                return
+            dashboard_decision = asdict(decision) if not isinstance(decision, dict) else decision.copy()
+            dashboard_decision['timestamp'] = datetime.now()
+            dashboard_decision['executed'] = False
+            self.recent_decisions.append(dashboard_decision)
+            if len(self.recent_decisions) > 200:
+                self.recent_decisions.pop(0)
+            logger.info(f"TEMPLATED DASHBOARD: [ORCHESTRATOR SIGNAL] Received: {action} for {symbol}")
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error handling trading decision: {e}")
+    def _initialize_cob_integration(self):
+        """Initialize simple COB integration that works without async event loops"""
+        try:
+            logger.info("TEMPLATED DASHBOARD: Initializing simple COB integration for model feeding")
+
+            # Initialize COB data storage
+            self.cob_bucketed_data = {
+                'ETH/USDT': {},
+                'BTC/USDT': {}
+            }
+            self.cob_last_update: Dict[str, Optional[float]] = {
+                'ETH/USDT': None,
+                'BTC/USDT': None
+            }  # Corrected type hint
+
+            # Start simple COB data collection
+            self._start_simple_cob_collection()
+
+            logger.info("TEMPLATED DASHBOARD: Simple COB integration initialized successfully")
+
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error initializing COB integration: {e}")
+            self.cob_integration = None
+
+    def _start_simple_cob_collection(self):
+        """Start simple COB data collection using REST APIs (no async required)"""
+        try:
+            # threading and time already imported
+
+            def cob_collector():
+                """Collect COB data using simple REST API calls"""
+                while True:
+                    try:
+                        # Collect data for both symbols
+                        for symbol in ['ETH/USDT', 'BTC/USDT']:
+                            self._collect_simple_cob_data(symbol)
+
+                        # Sleep for 1 second between collections
+                        time.sleep(1)
+                    except Exception as e:
+                        logger.debug(f"TEMPLATED DASHBOARD: Error in COB collection: {e}")
+                        time.sleep(5)  # Wait longer on error
+
+            # Start collector in background thread
+            cob_thread = threading.Thread(target=cob_collector, daemon=True)
+            cob_thread.start()
+
+            logger.info("TEMPLATED DASHBOARD: Simple COB data collection started")
+
+        except Exception as e:
+            logger.error(f"TEMPLATED DASHBOARD: Error starting COB collection: {e}")
+    def _collect_simple_cob_data(self, symbol: str):
+        """Collect simple COB data using Binance REST API"""
+        try:
+            import requests  # Added import
+            # time already imported
+
+            # Use Binance REST API for order book data
+            binance_symbol = symbol.replace('/', '')
+            url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500"
+
+            response = requests.get(url, timeout=5)
+            if response.status_code == 200:
+                data = response.json()
+
+                # Process order book data
+                bids = []
+                asks = []
+
+                # Process bids (buy orders)
+                for bid in data['bids'][:100]:  # Top 100 levels
+                    price = float(bid[0])
+                    size = float(bid[1])
+                    bids.append({
+                        'price': price,
+                        'size': size,
+                        'total': price * size
+                    })
+
+                # Process asks (sell orders)
+                for ask in data['asks'][:100]:  # Top 100 levels
+                    price = float(ask[0])
+                    size = float(ask[1])
+                    asks.append({
+                        'price': price,
+                        'size': size,
+                        'total': price * size
+                    })
+
+                # Calculate statistics
+                if bids and asks:
+                    best_bid = max(bids, key=lambda x: x['price'])
+                    best_ask = min(asks, key=lambda x: x['price'])
+                    mid_price = (best_bid['price'] + best_ask['price']) / 2
+                    spread_bps = ((best_ask['price'] - best_bid['price']) / mid_price) * 10000 if mid_price > 0 else 0
+
+                    total_bid_liquidity = sum(bid['total'] for bid in bids[:20])
+                    total_ask_liquidity = sum(ask['total'] for ask in asks[:20])
+                    total_liquidity = total_bid_liquidity + total_ask_liquidity
+                    imbalance = (total_bid_liquidity - total_ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
+
+                    # Create COB snapshot
+                    cob_snapshot = {
+                        'symbol': symbol,
+                        'timestamp': time.time(),
+                        'bids': bids,
+                        'asks': asks,
+                        'stats': {
+                            'mid_price': mid_price,
+                            'spread_bps': spread_bps,
+                            'total_bid_liquidity': total_bid_liquidity,
+                            'total_ask_liquidity': total_ask_liquidity,
+                            'imbalance': imbalance,
+                            'exchanges_active': ['Binance']
+                        }
+                    }
+
+                    # Store in history (keep last 15 seconds)
+                    self.cob_data_history[symbol].append(cob_snapshot)
+                    if len(self.cob_data_history[symbol]) > 15:  # Keep 15 seconds
+                        # Use slicing to remove old elements from deque to ensure correct behavior
+                        while len(self.cob_data_history[symbol]) > 15:
+                            self.cob_data_history[symbol].popleft()
+
+                    # Update latest data
+                    self.latest_cob_data[symbol] = cob_snapshot
+                    self.cob_last_update[symbol] = time.time()
+
+                    # Generate bucketed data for models
+                    self._generate_bucketed_cob_data(symbol, cob_snapshot)
+
+                    logger.debug(f"TEMPLATED DASHBOARD: COB data collected for {symbol}: {len(bids)} bids, {len(asks)} asks")
+
+        except Exception as e:
+            logger.debug(f"TEMPLATED DASHBOARD: Error collecting COB data for {symbol}: {e}")
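A worked example of the liquidity-imbalance statistic computed above, on a made-up two-level book:

```python
bids = [{'total': 300_000.0}, {'total': 200_000.0}]   # $500k of bid liquidity
asks = [{'total': 150_000.0}, {'total': 100_000.0}]   # $250k of ask liquidity

total_bid = sum(b['total'] for b in bids)
total_ask = sum(a['total'] for a in asks)
imbalance = (total_bid - total_ask) / (total_bid + total_ask)
print(imbalance)  # 0.333... -> bid-heavy book, positive imbalance in [-1, 1]
```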
+    def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict):
+        """Generate bucketed COB data for model feeding"""
+        try:
+            # Create price buckets (1 basis point granularity)
+            bucket_size_bps = 1.0
+            mid_price = cob_snapshot['stats']['mid_price']
+
+            # Initialize buckets
+            buckets = {}
+
+            # Process bids into buckets
+            for bid in cob_snapshot['bids']:
+                price_offset_bps = ((bid['price'] - mid_price) / mid_price) * 10000
+                bucket_key = int(price_offset_bps / bucket_size_bps)
+
+                if bucket_key not in buckets:
+                    buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
+
+                buckets[bucket_key]['bid_volume'] += bid['total']
+
+            # Process asks into buckets
+            for ask in cob_snapshot['asks']:
+                price_offset_bps = ((ask['price'] - mid_price) / mid_price) * 10000
+                bucket_key = int(price_offset_bps / bucket_size_bps)
+
+                if bucket_key not in buckets:
+                    buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
+
+                buckets[bucket_key]['ask_volume'] += ask['total']
+
+            # Store bucketed data
+            self.cob_bucketed_data[symbol] = {
+                'timestamp': cob_snapshot['timestamp'],
+                'mid_price': mid_price,
+                'buckets': buckets,
+                'bucket_size_bps': bucket_size_bps
+            }
+
+            # Feed to models
+            self._feed_cob_data_to_models(symbol, cob_snapshot)
+
+        except Exception as e:
+            logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}")
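A worked example of the 1-bps bucketing above (numbers invented): with a mid price of $3,000, one basis point is $0.30, so a bid at $2,998.50 sits 5 bps below mid and lands in bucket -5. Note that `int()` truncates toward zero, so buckets near the mid are slightly wider than one bps on each side.

```python
mid_price = 3000.0
bucket_size_bps = 1.0

price = 2998.50
price_offset_bps = ((price - mid_price) / mid_price) * 10000  # -5.0
bucket_key = int(price_offset_bps / bucket_size_bps)
print(price_offset_bps, bucket_key)  # -5.0 -5
```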
+    def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
+        """Calculate average imbalance over multiple time windows."""
+        stats = {}
+        now = time.time()
+        history = self.cob_data_history.get(symbol)
+
+        if not history:
+            return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
+
+        periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
+
+        for name, duration in periods.items():
+            recent_imbalances = []
+            for snap in history:
+                # Check if snap is a valid dict with timestamp and stats
+                if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']:
+                    imbalance = snap['stats'].get('imbalance')
+                    if imbalance is not None:
+                        recent_imbalances.append(imbalance)
+
+            if recent_imbalances:
+                stats[name] = sum(recent_imbalances) / len(recent_imbalances)
+            else:
+                stats[name] = 0.0
+
+        # Debug logging to verify cumulative imbalance calculation
+        if any(value != 0.0 for value in stats.values()):
+            logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}")
+
+        return stats
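A usage sketch for the method above, assuming `dashboard` is a constructed TemplatedTradingDashboard whose `cob_data_history` already holds around 60 seconds of 1-second snapshots:

```python
stats = dashboard._calculate_cumulative_imbalance('ETH/USDT')
print(stats)  # e.g. {'1s': 0.31, '5s': 0.28, '15s': 0.22, '60s': 0.05} (illustrative)
# A short-window average well above the 60s average points to fresh,
# one-sided order flow rather than a persistent book skew.
```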
def _feed_cob_data_to_models(self, symbol: str, cob_snapshot: dict):
|
||||
"""Feed COB data to models for training and inference"""
|
||||
try:
|
||||
# Calculate cumulative imbalance for model feeding
|
||||
cumulative_imbalance = self._calculate_cumulative_imbalance(symbol) # Assumes _calculate_cumulative_imbalance is available
|
||||
|
||||
history_data = {
|
||||
'symbol': symbol,
|
||||
'current_snapshot': cob_snapshot,
|
||||
'history': list(self.cob_data_history[symbol]), # Convert deque to list for consistent slicing
|
||||
'bucketed_data': self.cob_bucketed_data[symbol],
|
||||
'cumulative_imbalance': cumulative_imbalance, # Add cumulative imbalance
|
||||
'timestamp': cob_snapshot['timestamp']
|
||||
}
|
||||
|
||||
# Pass to orchestrator for model feeding
|
||||
if self.orchestrator and hasattr(self.orchestrator, 'feed_cob_data'):
|
||||
self.orchestrator.feed_cob_data(symbol, history_data) # Assumes feed_cob_data exists in orchestrator
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"TEMPLATED DASHBOARD: Error feeding COB data to models: {e}")
|
||||
|
||||
def _is_signal_generation_active(self) -> bool:
|
||||
"""Check if signal generation is active (e.g., models are loaded and running)"""
|
||||
# For now, return true to always generate signals
|
||||
# In a real system, this would check model loading status, training status, etc.
|
||||
return True # Simplified for initial integration
|
||||
|
||||
    def _start_signal_generation_loop(self):
        """Start signal generation loop to ensure continuous trading signals"""
        try:
            def signal_worker():
                logger.info("TEMPLATED DASHBOARD: Signal generation worker started")
                while True:
                    try:
                        # Ensure signal generation is active before processing
                        if self._is_signal_generation_active():
                            symbol = 'ETH/USDT'  # Focus on ETH for now
                            current_price = self._get_current_price(symbol)
                            if current_price:
                                # Generate a momentum signal (simplified for demo)
                                signal = self._generate_momentum_signal(symbol, current_price)
                                if signal:
                                    self._process_dashboard_signal(signal)

                                # Generate a DQN signal if enabled
                                if self.dqn_inference_enabled:
                                    dqn_signal = self._generate_dqn_signal(symbol, current_price)
                                    if dqn_signal:
                                        self._process_dashboard_signal(dqn_signal)

                                # Generate a CNN pivot signal if enabled
                                if self.cnn_inference_enabled:
                                    cnn_signal = self._get_cnn_pivot_prediction()
                                    if cnn_signal:
                                        self._process_dashboard_signal(cnn_signal)

                            # Update session metrics every second so new trades are reflected
                            self._update_session_metrics()

                        time.sleep(1)  # Run every second for signal generation

                    except Exception as e:
                        logger.error(f"TEMPLATED DASHBOARD: Error in signal worker: {e}")
                        time.sleep(5)  # Longer sleep on error

            signal_thread = threading.Thread(target=signal_worker, daemon=True)
            signal_thread.start()
            logger.info("TEMPLATED DASHBOARD: Signal generation loop started")

        except Exception as e:
            logger.error(f"TEMPLATED DASHBOARD: Error starting signal generation loop: {e}")

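    # --- Illustrative sketch (not in the original source): the same worker
    # restructured around a stop flag, so the loop can be shut down cleanly
    # instead of relying on daemon-thread teardown. The _signal_stop attribute
    # and the method name are assumptions made for illustration.
    def _example_start_stoppable_signal_loop(self):
        self._signal_stop = threading.Event()

        def worker():
            while not self._signal_stop.is_set():
                # ... generate and process signals exactly as above ...
                self._signal_stop.wait(1.0)  # interruptible 1-second cadence

        threading.Thread(target=worker, daemon=True, name="signal-worker").start()
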
    def _start_actual_training_if_needed(self):
        """Start actual model training with real data collection and training loops"""
        try:
            if not self.orchestrator:
                logger.warning("TEMPLATED DASHBOARD: No orchestrator available for training")
                return
            logger.info("TEMPLATED DASHBOARD: TRAINING: Starting actual training system with real data collection")
            self._start_real_training_system()
        except Exception as e:
            logger.error(f"TEMPLATED DASHBOARD: Error starting comprehensive training system: {e}")

    def _start_real_training_system(self):
        """Start real training system with data collection and actual model training"""
        try:
            # Training performance metrics
            self.training_performance = {
                'decision': {'inference_times': [], 'training_times': [], 'total_calls': 0},
                'cob_rl': {'inference_times': [], 'training_times': [], 'total_calls': 0},
                'dqn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
                'cnn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
                'transformer': {'inference_times': [], 'training_times': [], 'total_calls': 0}  # Added for transformer
            }

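            # --- Illustrative sketch (not in the original source): the manual
            # "keep only the last N" trimming further down becomes unnecessary
            # if these lists are bounded deques, which evict the oldest entry
            # automatically (assumes `from collections import deque`):
            #
            #   {'inference_times': deque(maxlen=100),
            #    'training_times': deque(maxlen=100),
            #    'total_calls': 0}
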
            def training_coordinator():
                logger.info("TEMPLATED DASHBOARD: TRAINING: High-frequency training coordinator started")
                training_iteration = 0
                last_dqn_training = 0
                last_cnn_training = 0
                last_decision_training = 0
                last_cob_rl_training = 0
                last_transformer_training = 0  # For transformer

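                # --- Illustrative sketch (not in the original source): every
                # per-model block below follows one pattern (throttle, train,
                # record elapsed time, trim history). A helper like this would
                # collapse the repetition; it is defined but never called here,
                # and its name and signature are assumptions.
                def _example_timed_step(name, train_fn, data, last_ts, interval, keep=100):
                    now = time.time()
                    if now - last_ts <= interval:
                        return last_ts  # not due yet; keep the old timestamp
                    train_fn(data)
                    perf = self.training_performance[name]
                    perf['training_times'].append(time.time() - now)
                    perf['total_calls'] += 1
                    del perf['training_times'][:-keep]  # keep the newest `keep` entries
                    return now
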
                while True:
                    try:
                        training_iteration += 1
                        current_time = time.time()
                        market_data = self._collect_training_data()

                        if market_data:
                            logger.debug(f"TEMPLATED DASHBOARD: TRAINING: Collected {len(market_data)} market data points for training")

                            # High-frequency training for split-second decisions:
                            # train decision fusion and COB RL as fast as hardware allows
                            if current_time - last_decision_training > 0.1:  # Every 100ms
                                start_time = time.time()
                                self._perform_real_decision_training(market_data)
                                training_time = time.time() - start_time
                                self.training_performance['decision']['training_times'].append(training_time)
                                self.training_performance['decision']['total_calls'] += 1
                                last_decision_training = current_time

                                # Keep only the last 100 measurements
                                if len(self.training_performance['decision']['training_times']) > 100:
                                    self.training_performance['decision']['training_times'] = self.training_performance['decision']['training_times'][-100:]

                            # Advanced transformer training (every 200ms for comprehensive features)
                            if current_time - last_transformer_training > 0.2:
                                start_time = time.time()
                                self._perform_real_transformer_training(market_data)
                                training_time = time.time() - start_time
                                self.training_performance['transformer']['training_times'].append(training_time)
                                self.training_performance['transformer']['total_calls'] += 1
                                last_transformer_training = current_time

                                # Keep only the last 100 measurements
                                if len(self.training_performance['transformer']['training_times']) > 100:
                                    self.training_performance['transformer']['training_times'] = self.training_performance['transformer']['training_times'][-100:]

                            if current_time - last_cob_rl_training > 0.1:  # Every 100ms
                                start_time = time.time()
                                self._perform_real_cob_rl_training(market_data)
                                training_time = time.time() - start_time
                                self.training_performance['cob_rl']['training_times'].append(training_time)
                                self.training_performance['cob_rl']['total_calls'] += 1
                                last_cob_rl_training = current_time

                                # Keep only the last 100 measurements
                                if len(self.training_performance['cob_rl']['training_times']) > 100:
                                    self.training_performance['cob_rl']['training_times'] = self.training_performance['cob_rl']['training_times'][-100:]

                            # Standard frequency for the larger models
                            if current_time - last_dqn_training > 30:
                                start_time = time.time()
                                self._perform_real_dqn_training(market_data)
                                training_time = time.time() - start_time
                                self.training_performance['dqn']['training_times'].append(training_time)
                                self.training_performance['dqn']['total_calls'] += 1
                                last_dqn_training = current_time

                                if len(self.training_performance['dqn']['training_times']) > 50:
                                    self.training_performance['dqn']['training_times'] = self.training_performance['dqn']['training_times'][-50:]

                            if current_time - last_cnn_training > 45:
                                start_time = time.time()
                                self._perform_real_cnn_training(market_data)
                                training_time = time.time() - start_time
                                self.training_performance['cnn']['training_times'].append(training_time)
                                self.training_performance['cnn']['total_calls'] += 1
                                last_cnn_training = current_time

                                if len(self.training_performance['cnn']['training_times']) > 50:
                                    self.training_performance['cnn']['training_times'] = self.training_performance['cnn']['training_times'][-50:]

                        self._update_training_progress(training_iteration)

                        # Log performance metrics every 100 iterations
                        if training_iteration % 100 == 0:
                            self._log_training_performance()
                            logger.info(f"TEMPLATED DASHBOARD: TRAINING: Iteration {training_iteration} - high-frequency training active")

                        # Minimal sleep for maximum responsiveness
                        time.sleep(0.05)  # 50ms sleep -> ~20 Hz training loop

                    except Exception as e:
                        logger.error(f"TEMPLATED DASHBOARD: TRAINING: Error in training iteration {training_iteration}: {e}")
                        time.sleep(1)  # Shorter error recovery

            training_thread = threading.Thread(target=training_coordinator, daemon=True)
            training_thread.start()
            logger.info("TEMPLATED DASHBOARD: Real training system started")

        except Exception as e:
            logger.error(f"TEMPLATED DASHBOARD: Error starting real training system: {e}")

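    # --- Illustrative sketch (not in the original source): a small reader for
    # the metrics populated above. The method name is an assumption made for
    # illustration.
    @staticmethod
    def _example_avg_training_time_ms(perf_entry: dict) -> float:
        """Average recorded training time, in milliseconds, for one model."""
        times = perf_entry.get('training_times') or []
        return (1000.0 * sum(times) / len(times)) if times else 0.0
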
def create_templated_dashboard(data_provider: Optional[DataProvider] = None,
                               orchestrator: Optional[TradingOrchestrator] = None,