Compare commits
8 Commits
9639073a09
...
d15ebf54ca
Author | SHA1 | Date | |
---|---|---|---|
d15ebf54ca | |||
488fbacf67 | |||
b47805dafc | |||
11718bf92f | |||
29e4076638 | |||
03573cfb56 | |||
083c1272ae | |||
b9159690ef |
@ -271,15 +271,15 @@
|
|||||||
],
|
],
|
||||||
"decision": [
|
"decision": [
|
||||||
{
|
{
|
||||||
"checkpoint_id": "decision_20250702_013257",
|
"checkpoint_id": "decision_20250702_083032",
|
||||||
"model_name": "decision",
|
"model_name": "decision",
|
||||||
"model_type": "decision_fusion",
|
"model_type": "decision_fusion",
|
||||||
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013257.pt",
|
"file_path": "NN\\models\\saved\\decision\\decision_20250702_083032.pt",
|
||||||
"created_at": "2025-07-02T01:32:57.057698",
|
"created_at": "2025-07-02T08:30:32.225869",
|
||||||
"file_size_mb": 0.06720924377441406,
|
"file_size_mb": 0.06720924377441406,
|
||||||
"performance_score": 9.99999352005137,
|
"performance_score": 102.79972716525019,
|
||||||
"accuracy": null,
|
"accuracy": null,
|
||||||
"loss": 6.479948628599987e-06,
|
"loss": 2.7283549419721e-06,
|
||||||
"val_accuracy": null,
|
"val_accuracy": null,
|
||||||
"val_loss": null,
|
"val_loss": null,
|
||||||
"reward": null,
|
"reward": null,
|
||||||
@ -291,15 +291,15 @@
|
|||||||
"wandb_artifact_name": null
|
"wandb_artifact_name": null
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checkpoint_id": "decision_20250702_013256",
|
"checkpoint_id": "decision_20250702_082925",
|
||||||
"model_name": "decision",
|
"model_name": "decision",
|
||||||
"model_type": "decision_fusion",
|
"model_type": "decision_fusion",
|
||||||
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013256.pt",
|
"file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
|
||||||
"created_at": "2025-07-02T01:32:56.667169",
|
"created_at": "2025-07-02T08:29:25.899383",
|
||||||
"file_size_mb": 0.06720924377441406,
|
"file_size_mb": 0.06720924377441406,
|
||||||
"performance_score": 9.999993471487318,
|
"performance_score": 102.7997148991013,
|
||||||
"accuracy": null,
|
"accuracy": null,
|
||||||
"loss": 6.528512681061979e-06,
|
"loss": 2.8510171153430164e-06,
|
||||||
"val_accuracy": null,
|
"val_accuracy": null,
|
||||||
"val_loss": null,
|
"val_loss": null,
|
||||||
"reward": null,
|
"reward": null,
|
||||||
@ -311,15 +311,15 @@
|
|||||||
"wandb_artifact_name": null
|
"wandb_artifact_name": null
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checkpoint_id": "decision_20250702_013255",
|
"checkpoint_id": "decision_20250702_082924",
|
||||||
"model_name": "decision",
|
"model_name": "decision",
|
||||||
"model_type": "decision_fusion",
|
"model_type": "decision_fusion",
|
||||||
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
|
"file_path": "NN\\models\\saved\\decision\\decision_20250702_082924.pt",
|
||||||
"created_at": "2025-07-02T01:32:55.915359",
|
"created_at": "2025-07-02T08:29:24.538886",
|
||||||
"file_size_mb": 0.06720924377441406,
|
"file_size_mb": 0.06720924377441406,
|
||||||
"performance_score": 9.999993469737547,
|
"performance_score": 102.79971291710027,
|
||||||
"accuracy": null,
|
"accuracy": null,
|
||||||
"loss": 6.5302624539599814e-06,
|
"loss": 2.8708372390440218e-06,
|
||||||
"val_accuracy": null,
|
"val_accuracy": null,
|
||||||
"val_loss": null,
|
"val_loss": null,
|
||||||
"reward": null,
|
"reward": null,
|
||||||
@ -331,15 +331,15 @@
|
|||||||
"wandb_artifact_name": null
|
"wandb_artifact_name": null
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checkpoint_id": "decision_20250702_013255",
|
"checkpoint_id": "decision_20250702_082925",
|
||||||
"model_name": "decision",
|
"model_name": "decision",
|
||||||
"model_type": "decision_fusion",
|
"model_type": "decision_fusion",
|
||||||
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
|
"file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
|
||||||
"created_at": "2025-07-02T01:32:55.774316",
|
"created_at": "2025-07-02T08:29:25.218718",
|
||||||
"file_size_mb": 0.06720924377441406,
|
"file_size_mb": 0.06720924377441406,
|
||||||
"performance_score": 9.99999346914947,
|
"performance_score": 102.79971274601752,
|
||||||
"accuracy": null,
|
"accuracy": null,
|
||||||
"loss": 6.530850530594989e-06,
|
"loss": 2.87254807635711e-06,
|
||||||
"val_accuracy": null,
|
"val_accuracy": null,
|
||||||
"val_loss": null,
|
"val_loss": null,
|
||||||
"reward": null,
|
"reward": null,
|
||||||
@ -351,15 +351,15 @@
|
|||||||
"wandb_artifact_name": null
|
"wandb_artifact_name": null
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checkpoint_id": "decision_20250702_013255",
|
"checkpoint_id": "decision_20250702_082925",
|
||||||
"model_name": "decision",
|
"model_name": "decision",
|
||||||
"model_type": "decision_fusion",
|
"model_type": "decision_fusion",
|
||||||
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt",
|
"file_path": "NN\\models\\saved\\decision\\decision_20250702_082925.pt",
|
||||||
"created_at": "2025-07-02T01:32:55.646001",
|
"created_at": "2025-07-02T08:29:25.332228",
|
||||||
"file_size_mb": 0.06720924377441406,
|
"file_size_mb": 0.06720924377441406,
|
||||||
"performance_score": 9.99999346889822,
|
"performance_score": 102.79971263447665,
|
||||||
"accuracy": null,
|
"accuracy": null,
|
||||||
"loss": 6.531101780155828e-06,
|
"loss": 2.873663491419011e-06,
|
||||||
"val_accuracy": null,
|
"val_accuracy": null,
|
||||||
"val_loss": null,
|
"val_loss": null,
|
||||||
"reward": null,
|
"reward": null,
|
||||||
|
@ -476,16 +476,36 @@ class COBIntegration:
|
|||||||
async def _analyze_cob_patterns(self, symbol: str, cob_snapshot: COBSnapshot):
|
async def _analyze_cob_patterns(self, symbol: str, cob_snapshot: COBSnapshot):
|
||||||
"""Analyze COB data for trading patterns and signals"""
|
"""Analyze COB data for trading patterns and signals"""
|
||||||
try:
|
try:
|
||||||
# Large liquidity imbalance detection
|
# Enhanced liquidity imbalance detection with dynamic thresholds
|
||||||
if abs(cob_snapshot.liquidity_imbalance) > 0.4:
|
imbalance = abs(cob_snapshot.liquidity_imbalance)
|
||||||
|
|
||||||
|
# Dynamic threshold based on imbalance strength
|
||||||
|
if imbalance > 0.8: # Very strong imbalance (>80%)
|
||||||
|
threshold = 0.05 # 5% threshold for very strong signals
|
||||||
|
confidence_multiplier = 3.0
|
||||||
|
elif imbalance > 0.5: # Strong imbalance (>50%)
|
||||||
|
threshold = 0.1 # 10% threshold for strong signals
|
||||||
|
confidence_multiplier = 2.5
|
||||||
|
elif imbalance > 0.3: # Moderate imbalance (>30%)
|
||||||
|
threshold = 0.15 # 15% threshold for moderate signals
|
||||||
|
confidence_multiplier = 2.0
|
||||||
|
else: # Weak imbalance
|
||||||
|
threshold = 0.2 # 20% threshold for weak signals
|
||||||
|
confidence_multiplier = 1.5
|
||||||
|
|
||||||
|
# Generate signal if imbalance exceeds threshold
|
||||||
|
if abs(cob_snapshot.liquidity_imbalance) > threshold:
|
||||||
signal = {
|
signal = {
|
||||||
'timestamp': cob_snapshot.timestamp.isoformat(),
|
'timestamp': cob_snapshot.timestamp.isoformat(),
|
||||||
'type': 'liquidity_imbalance',
|
'type': 'liquidity_imbalance',
|
||||||
'side': 'buy' if cob_snapshot.liquidity_imbalance > 0 else 'sell',
|
'side': 'buy' if cob_snapshot.liquidity_imbalance > 0 else 'sell',
|
||||||
'strength': abs(cob_snapshot.liquidity_imbalance),
|
'strength': abs(cob_snapshot.liquidity_imbalance),
|
||||||
'confidence': min(1.0, abs(cob_snapshot.liquidity_imbalance) * 2)
|
'confidence': min(1.0, abs(cob_snapshot.liquidity_imbalance) * confidence_multiplier),
|
||||||
|
'threshold_used': threshold,
|
||||||
|
'signal_strength': 'very_strong' if imbalance > 0.8 else 'strong' if imbalance > 0.5 else 'moderate' if imbalance > 0.3 else 'weak'
|
||||||
}
|
}
|
||||||
self.cob_signals[symbol].append(signal)
|
self.cob_signals[symbol].append(signal)
|
||||||
|
logger.info(f"COB SIGNAL: {symbol} {signal['side'].upper()} signal generated - imbalance: {cob_snapshot.liquidity_imbalance:.3f}, confidence: {signal['confidence']:.3f}")
|
||||||
|
|
||||||
# Cleanup old signals
|
# Cleanup old signals
|
||||||
self.cob_signals[symbol] = self.cob_signals[symbol][-100:]
|
self.cob_signals[symbol] = self.cob_signals[symbol][-100:]
|
||||||
|
@ -16,13 +16,13 @@ import time
|
|||||||
import threading
|
import threading
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Dict, List, Optional, Tuple, Any
|
from typing import Dict, List, Optional, Tuple, Any, Union
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
from .config import get_config
|
from .config import get_config
|
||||||
from .data_provider import DataProvider
|
from .data_provider import DataProvider
|
||||||
from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface
|
from models import get_model_registry, ModelInterface, CNNModelInterface, RLAgentInterface, ModelRegistry
|
||||||
|
|
||||||
# Import COB integration for real-time market microstructure data
|
# Import COB integration for real-time market microstructure data
|
||||||
try:
|
try:
|
||||||
@ -45,7 +45,7 @@ class Prediction:
|
|||||||
timeframe: str # Timeframe this prediction is for
|
timeframe: str # Timeframe this prediction is for
|
||||||
timestamp: datetime
|
timestamp: datetime
|
||||||
model_name: str # Name of the model that made this prediction
|
model_name: str # Name of the model that made this prediction
|
||||||
metadata: Dict[str, Any] = None # Additional model-specific data
|
metadata: Optional[Dict[str, Any]] = None # Additional model-specific data
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TradingDecision:
|
class TradingDecision:
|
||||||
@ -62,10 +62,10 @@ class TradingOrchestrator:
|
|||||||
"""
|
"""
|
||||||
Enhanced Trading Orchestrator with full ML and COB integration
|
Enhanced Trading Orchestrator with full ML and COB integration
|
||||||
Coordinates CNN, DQN, and COB models for advanced trading decisions
|
Coordinates CNN, DQN, and COB models for advanced trading decisions
|
||||||
Features real-time COB (Change of Bid) integration for market microstructure data
|
Features real-time COB (Change of Bid) data for market microstructure data
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, data_provider: DataProvider = None, enhanced_rl_training: bool = True, model_registry: Dict = None):
|
def __init__(self, data_provider: Optional[DataProvider] = None, enhanced_rl_training: bool = True, model_registry: Optional[ModelRegistry] = None):
|
||||||
"""Initialize the enhanced orchestrator with full ML capabilities"""
|
"""Initialize the enhanced orchestrator with full ML capabilities"""
|
||||||
self.config = get_config()
|
self.config = get_config()
|
||||||
self.data_provider = data_provider or DataProvider()
|
self.data_provider = data_provider or DataProvider()
|
||||||
@ -79,18 +79,18 @@ class TradingOrchestrator:
|
|||||||
self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols
|
self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols
|
||||||
|
|
||||||
# Dynamic weights (will be adapted based on performance)
|
# Dynamic weights (will be adapted based on performance)
|
||||||
self.model_weights = {} # {model_name: weight}
|
self.model_weights: Dict[str, float] = {} # {model_name: weight}
|
||||||
self._initialize_default_weights()
|
self._initialize_default_weights()
|
||||||
|
|
||||||
# State tracking
|
# State tracking
|
||||||
self.last_decision_time = {} # {symbol: datetime}
|
self.last_decision_time: Dict[str, datetime] = {} # {symbol: datetime}
|
||||||
self.recent_decisions = {} # {symbol: List[TradingDecision]}
|
self.recent_decisions: Dict[str, List[TradingDecision]] = {} # {symbol: List[TradingDecision]}
|
||||||
self.model_performance = {} # {model_name: {'correct': int, 'total': int, 'accuracy': float}}
|
self.model_performance: Dict[str, Dict[str, Any]] = {} # {model_name: {'correct': int, 'total': int, 'accuracy': float}}
|
||||||
|
|
||||||
# Model prediction tracking for dashboard visualization
|
# Model prediction tracking for dashboard visualization
|
||||||
self.recent_dqn_predictions = {} # {symbol: List[Dict]} - Recent DQN predictions
|
self.recent_dqn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent DQN predictions
|
||||||
self.recent_cnn_predictions = {} # {symbol: List[Dict]} - Recent CNN predictions
|
self.recent_cnn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent CNN predictions
|
||||||
self.prediction_accuracy_history = {} # {symbol: List[Dict]} - Prediction accuracy tracking
|
self.prediction_accuracy_history: Dict[str, deque] = {} # {symbol: List[Dict]} - Prediction accuracy tracking
|
||||||
|
|
||||||
# Initialize prediction tracking for each symbol
|
# Initialize prediction tracking for each symbol
|
||||||
for symbol in self.symbols:
|
for symbol in self.symbols:
|
||||||
@ -99,39 +99,45 @@ class TradingOrchestrator:
|
|||||||
self.prediction_accuracy_history[symbol] = deque(maxlen=200)
|
self.prediction_accuracy_history[symbol] = deque(maxlen=200)
|
||||||
|
|
||||||
# Decision callbacks
|
# Decision callbacks
|
||||||
self.decision_callbacks = []
|
self.decision_callbacks: List[Any] = []
|
||||||
|
|
||||||
# ENHANCED: Decision Fusion System - Built into orchestrator (no separate file needed!)
|
# ENHANCED: Decision Fusion System - Built into orchestrator (no separate file needed!)
|
||||||
self.decision_fusion_enabled = True
|
self.decision_fusion_enabled: bool = True
|
||||||
self.decision_fusion_network = None
|
self.decision_fusion_network: Any = None
|
||||||
self.fusion_training_history = []
|
self.fusion_training_history: List[Any] = []
|
||||||
self.last_fusion_inputs = {}
|
self.last_fusion_inputs: Dict[str, Any] = {} # Fix: Explicitly initialize as dictionary
|
||||||
self.fusion_checkpoint_frequency = 50 # Save every 50 decisions
|
self.fusion_checkpoint_frequency: int = 50 # Save every 50 decisions
|
||||||
self.fusion_decisions_count = 0
|
self.fusion_decisions_count: int = 0
|
||||||
self.fusion_training_data = [] # Store training examples for decision model
|
self.fusion_training_data: List[Any] = [] # Store training examples for decision model
|
||||||
|
|
||||||
# COB Integration - Real-time market microstructure data
|
# COB Integration - Real-time market microstructure data
|
||||||
self.cob_integration = None
|
self.cob_integration: Optional[COBIntegration] = None # Fix: Use Optional for COBIntegration
|
||||||
self.latest_cob_data: Dict[str, Any] = {} # {symbol: COBSnapshot}
|
self.latest_cob_data: Dict[str, Any] = {} # {symbol: COBSnapshot}
|
||||||
self.latest_cob_features: Dict[str, Any] = {} # {symbol: np.ndarray} - CNN features
|
self.latest_cob_features: Dict[str, Any] = {} # {symbol: np.ndarray} - CNN features
|
||||||
self.latest_cob_state: Dict[str, Any] = {} # {symbol: np.ndarray} - DQN state features
|
self.latest_cob_state: Dict[str, Any] = {} # {symbol: np.ndarray} - DQN state features
|
||||||
self.cob_feature_history: Dict[str, List] = {symbol: [] for symbol in self.symbols} # Rolling history for models
|
self.cob_feature_history: Dict[str, List[Any]] = {symbol: [] for symbol in self.symbols} # Rolling history for models
|
||||||
|
|
||||||
# Enhanced ML Models
|
# Enhanced ML Models
|
||||||
self.rl_agent = None # DQN Agent
|
self.rl_agent: Any = None # DQN Agent
|
||||||
self.cnn_model = None # CNN Model for pattern recognition
|
self.cnn_model: Any = None # CNN Model for pattern recognition
|
||||||
self.extrema_trainer = None # Extrema/pivot trainer
|
self.extrema_trainer: Any = None # Extrema/pivot trainer
|
||||||
|
self.primary_transformer: Any = None # Transformer model
|
||||||
|
self.primary_transformer_trainer: Any = None # Transformer model trainer
|
||||||
|
self.transformer_checkpoint_info: Dict[str, Any] = {} # Transformer checkpoint info
|
||||||
|
self.cob_rl_agent: Any = None # COB RL Agent
|
||||||
|
self.decision_model: Any = None # Decision Fusion model
|
||||||
|
|
||||||
self.latest_cnn_features: Dict[str, Any] = {} # CNN hidden features
|
self.latest_cnn_features: Dict[str, Any] = {} # CNN hidden features
|
||||||
self.latest_cnn_predictions: Dict[str, Any] = {} # CNN predictions
|
self.latest_cnn_predictions: Dict[str, Any] = {} # CNN predictions
|
||||||
|
|
||||||
# Enhanced RL features
|
# Enhanced RL features
|
||||||
self.sensitivity_learning_queue = [] # For outcome-based learning
|
self.sensitivity_learning_queue: List[Any] = [] # For outcome-based learning
|
||||||
self.perfect_move_buffer = [] # Buffer for perfect move analysis
|
self.perfect_move_buffer: List[Any] = [] # Buffer for perfect move analysis
|
||||||
self.position_status = {} # Current positions
|
self.position_status: Dict[str, Any] = {} # Current positions
|
||||||
|
|
||||||
# Real-time processing
|
# Real-time processing
|
||||||
self.realtime_processing = False
|
self.realtime_processing: bool = False
|
||||||
self.realtime_tasks = []
|
self.realtime_tasks: List[Any] = []
|
||||||
|
|
||||||
logger.info("Enhanced TradingOrchestrator initialized with full ML capabilities")
|
logger.info("Enhanced TradingOrchestrator initialized with full ML capabilities")
|
||||||
logger.info(f"Enhanced RL training: {enhanced_rl_training}")
|
logger.info(f"Enhanced RL training: {enhanced_rl_training}")
|
||||||
@ -310,9 +316,10 @@ class TradingOrchestrator:
|
|||||||
self.cob_integration = COBIntegration(symbols=self.symbols)
|
self.cob_integration = COBIntegration(symbols=self.symbols)
|
||||||
|
|
||||||
# Register callbacks to receive real-time COB data
|
# Register callbacks to receive real-time COB data
|
||||||
self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
|
if self.cob_integration:
|
||||||
self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
|
self.cob_integration.add_cnn_callback(self._on_cob_cnn_features)
|
||||||
self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
|
self.cob_integration.add_dqn_callback(self._on_cob_dqn_features)
|
||||||
|
self.cob_integration.add_dashboard_callback(self._on_cob_dashboard_data)
|
||||||
|
|
||||||
# Initialize 5-minute COB data matrix system
|
# Initialize 5-minute COB data matrix system
|
||||||
self.cob_matrix_duration = 300 # 5 minutes in seconds
|
self.cob_matrix_duration = 300 # 5 minutes in seconds
|
||||||
@ -320,9 +327,9 @@ class TradingOrchestrator:
|
|||||||
self.cob_matrix_size = self.cob_matrix_duration // self.cob_matrix_resolution # 300 samples
|
self.cob_matrix_size = self.cob_matrix_duration // self.cob_matrix_resolution # 300 samples
|
||||||
|
|
||||||
# COB data matrix storage - 5 minutes of 1-second snapshots
|
# COB data matrix storage - 5 minutes of 1-second snapshots
|
||||||
self.cob_data_matrix: Dict[str, deque] = {}
|
self.cob_data_matrix: Dict[str, deque[Any]] = {}
|
||||||
self.cob_feature_matrix: Dict[str, deque] = {}
|
self.cob_feature_matrix: Dict[str, deque[Any]] = {}
|
||||||
self.cob_state_matrix: Dict[str, deque] = {}
|
self.cob_state_matrix: Dict[str, deque[Any]] = {}
|
||||||
|
|
||||||
# Initialize matrix storage for each symbol
|
# Initialize matrix storage for each symbol
|
||||||
for symbol in self.symbols:
|
for symbol in self.symbols:
|
||||||
@ -336,16 +343,16 @@ class TradingOrchestrator:
|
|||||||
self.cob_state_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
|
self.cob_state_matrix[symbol] = deque(maxlen=self.cob_matrix_size)
|
||||||
|
|
||||||
# Initialize COB data storage (legacy support)
|
# Initialize COB data storage (legacy support)
|
||||||
self.latest_cob_snapshots = {}
|
self.latest_cob_snapshots: Dict[str, Any] = {}
|
||||||
self.cob_feature_cache = {}
|
self.cob_feature_cache: Dict[str, Any] = {}
|
||||||
self.cob_state_cache = {}
|
self.cob_state_cache: Dict[str, Any] = {}
|
||||||
|
|
||||||
# COB matrix update tracking
|
# COB matrix update tracking
|
||||||
self.last_cob_matrix_update = {}
|
self.last_cob_matrix_update: Dict[str, float] = {}
|
||||||
self.cob_matrix_update_interval = 1.0 # Update every 1 second
|
self.cob_matrix_update_interval = 1.0 # Update every 1 second
|
||||||
|
|
||||||
# COB matrix statistics
|
# COB matrix statistics
|
||||||
self.cob_matrix_stats = {
|
self.cob_matrix_stats: Dict[str, Any] = {
|
||||||
'total_updates': 0,
|
'total_updates': 0,
|
||||||
'matrix_fills': {symbol: 0 for symbol in self.symbols},
|
'matrix_fills': {symbol: 0 for symbol in self.symbols},
|
||||||
'feature_generations': 0,
|
'feature_generations': 0,
|
||||||
@ -375,7 +382,8 @@ class TradingOrchestrator:
|
|||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
async def cob_main():
|
async def cob_main():
|
||||||
await self.cob_integration.start()
|
if self.cob_integration: # Additional check
|
||||||
|
await self.cob_integration.start()
|
||||||
# Keep running until stopped
|
# Keep running until stopped
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
@ -52,7 +52,7 @@ def main():
|
|||||||
|
|
||||||
# Run the dashboard
|
# Run the dashboard
|
||||||
logger.info("Starting templated dashboard server...")
|
logger.info("Starting templated dashboard server...")
|
||||||
dashboard.run_server(host='127.0.0.1', port=8051, debug=False)
|
dashboard.run_server(host='127.0.0.1', port=8052, debug=False)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error running templated dashboard: {e}")
|
logger.error(f"Error running templated dashboard: {e}")
|
||||||
|
@ -12,6 +12,7 @@ from typing import Dict, List, Optional, Tuple, Any
|
|||||||
from dataclasses import dataclass, asdict
|
from dataclasses import dataclass, asdict
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import torch
|
import torch
|
||||||
|
import random
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import wandb
|
import wandb
|
||||||
@ -150,36 +151,80 @@ class CheckpointManager:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def _calculate_performance_score(self, metrics: Dict[str, float]) -> float:
|
def _calculate_performance_score(self, metrics: Dict[str, float]) -> float:
|
||||||
|
"""Calculate performance score with improved sensitivity for training models"""
|
||||||
score = 0.0
|
score = 0.0
|
||||||
|
|
||||||
if 'accuracy' in metrics:
|
# Prioritize loss reduction for active training models
|
||||||
score += metrics['accuracy'] * 100
|
|
||||||
if 'val_accuracy' in metrics:
|
|
||||||
score += metrics['val_accuracy'] * 100
|
|
||||||
if 'loss' in metrics:
|
if 'loss' in metrics:
|
||||||
score += max(0, 10 - metrics['loss'])
|
# Invert loss so lower loss = higher score, with better scaling
|
||||||
if 'val_loss' in metrics:
|
loss_value = metrics['loss']
|
||||||
score += max(0, 10 - metrics['val_loss'])
|
if loss_value > 0:
|
||||||
if 'reward' in metrics:
|
score += max(0, 100 / (1 + loss_value)) # More sensitive to loss changes
|
||||||
score += metrics['reward']
|
else:
|
||||||
if 'pnl' in metrics:
|
score += 100 # Perfect loss
|
||||||
score += metrics['pnl']
|
|
||||||
|
|
||||||
|
# Add other metrics with appropriate weights
|
||||||
|
if 'accuracy' in metrics:
|
||||||
|
score += metrics['accuracy'] * 50 # Reduced weight to balance with loss
|
||||||
|
if 'val_accuracy' in metrics:
|
||||||
|
score += metrics['val_accuracy'] * 50
|
||||||
|
if 'val_loss' in metrics:
|
||||||
|
val_loss = metrics['val_loss']
|
||||||
|
if val_loss > 0:
|
||||||
|
score += max(0, 50 / (1 + val_loss))
|
||||||
|
if 'reward' in metrics:
|
||||||
|
score += metrics['reward'] * 10
|
||||||
|
if 'pnl' in metrics:
|
||||||
|
score += metrics['pnl'] * 5
|
||||||
|
if 'training_samples' in metrics:
|
||||||
|
# Bonus for processing more training samples
|
||||||
|
score += min(10, metrics['training_samples'] / 10)
|
||||||
|
|
||||||
|
# Ensure minimum score for any training activity
|
||||||
if score == 0.0 and metrics:
|
if score == 0.0 and metrics:
|
||||||
|
# Use the first available metric with better scaling
|
||||||
first_metric = next(iter(metrics.values()))
|
first_metric = next(iter(metrics.values()))
|
||||||
score = first_metric if first_metric > 0 else 0.1
|
if first_metric > 0:
|
||||||
|
score = max(0.1, min(10, first_metric))
|
||||||
|
else:
|
||||||
|
score = 0.1
|
||||||
|
|
||||||
return max(score, 0.1)
|
return max(score, 0.1)
|
||||||
|
|
||||||
def _should_save_checkpoint(self, model_name: str, performance_score: float) -> bool:
|
def _should_save_checkpoint(self, model_name: str, performance_score: float) -> bool:
|
||||||
|
"""Improved checkpoint saving logic with more frequent saves during training"""
|
||||||
if model_name not in self.checkpoints or not self.checkpoints[model_name]:
|
if model_name not in self.checkpoints or not self.checkpoints[model_name]:
|
||||||
return True
|
return True # Always save first checkpoint
|
||||||
|
|
||||||
|
# Allow more checkpoints during active training
|
||||||
if len(self.checkpoints[model_name]) < self.max_checkpoints:
|
if len(self.checkpoints[model_name]) < self.max_checkpoints:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
worst_score = min(cp.performance_score for cp in self.checkpoints[model_name])
|
# Get current best and worst scores
|
||||||
return performance_score > worst_score
|
scores = [cp.performance_score for cp in self.checkpoints[model_name]]
|
||||||
|
best_score = max(scores)
|
||||||
|
worst_score = min(scores)
|
||||||
|
|
||||||
|
# Save if better than worst (more frequent saves)
|
||||||
|
if performance_score > worst_score:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# For high-performing models (score > 100), be more sensitive to small improvements
|
||||||
|
if best_score > 100:
|
||||||
|
# Save if within 0.1% of best score (very sensitive for converged models)
|
||||||
|
if performance_score >= best_score * 0.999:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# Also save if we're within 10% of best score (capture near-optimal models)
|
||||||
|
if performance_score >= best_score * 0.9:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Save more frequently during active training (every 5th attempt instead of 10th)
|
||||||
|
if random.random() < 0.2: # 20% chance to save anyway
|
||||||
|
logger.info(f"Saving checkpoint for {model_name} - periodic save during active training")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def _save_model_file(self, model, file_path: Path, model_type: str) -> bool:
|
def _save_model_file(self, model, file_path: Path, model_type: str) -> bool:
|
||||||
try:
|
try:
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -59,10 +59,19 @@ class DashboardComponentManager:
|
|||||||
action_color = "text-success" if action == "BUY" else "text-danger"
|
action_color = "text-success" if action == "BUY" else "text-danger"
|
||||||
manual_indicator = " [M]" if manual else ""
|
manual_indicator = " [M]" if manual else ""
|
||||||
|
|
||||||
|
# Highlight COB signals
|
||||||
|
cob_indicator = ""
|
||||||
|
if hasattr(decision, 'type') and getattr(decision, 'type', '') == 'cob_liquidity_imbalance':
|
||||||
|
cob_indicator = " [COB]"
|
||||||
|
badge_class = "bg-info" # Use blue for COB signals
|
||||||
|
elif isinstance(decision, dict) and decision.get('type') == 'cob_liquidity_imbalance':
|
||||||
|
cob_indicator = " [COB]"
|
||||||
|
badge_class = "bg-info" # Use blue for COB signals
|
||||||
|
|
||||||
signal_div = html.Div([
|
signal_div = html.Div([
|
||||||
html.Span(f"{timestamp}", className="small text-muted me-2"),
|
html.Span(f"{timestamp}", className="small text-muted me-2"),
|
||||||
html.Span(f"{status}", className=f"badge {badge_class} me-2"),
|
html.Span(f"{status}", className=f"badge {badge_class} me-2"),
|
||||||
html.Span(f"{action}{manual_indicator}", className=f"{action_color} fw-bold me-2"),
|
html.Span(f"{action}{manual_indicator}{cob_indicator}", className=f"{action_color} fw-bold me-2"),
|
||||||
html.Span(f"({confidence:.1f}%)", className="small text-muted me-2"),
|
html.Span(f"({confidence:.1f}%)", className="small text-muted me-2"),
|
||||||
html.Span(f"${price:.2f}", className="small")
|
html.Span(f"${price:.2f}", className="small")
|
||||||
], className="mb-1")
|
], className="mb-1")
|
||||||
@ -349,7 +358,8 @@ class DashboardComponentManager:
|
|||||||
|
|
||||||
def _create_cob_ladder_panel(self, bids, asks, mid_price, symbol=""):
|
def _create_cob_ladder_panel(self, bids, asks, mid_price, symbol=""):
|
||||||
"""Creates the right panel with the compact COB ladder."""
|
"""Creates the right panel with the compact COB ladder."""
|
||||||
bucket_size = 10
|
# Use symbol-specific bucket sizes: ETH = $1, BTC = $10
|
||||||
|
bucket_size = 1.0 if "ETH" in symbol else 10.0
|
||||||
num_levels = 5
|
num_levels = 5
|
||||||
|
|
||||||
def aggregate_buckets(orders):
|
def aggregate_buckets(orders):
|
||||||
@ -667,15 +677,31 @@ class DashboardComponentManager:
|
|||||||
|
|
||||||
# Model metrics
|
# Model metrics
|
||||||
html.Div([
|
html.Div([
|
||||||
# Last prediction
|
# Last prediction with enhanced details
|
||||||
html.Div([
|
html.Div([
|
||||||
html.Span("Last: ", className="text-muted small"),
|
html.Span("Last: ", className="text-muted small"),
|
||||||
html.Span(f"{pred_action}",
|
html.Span(f"{pred_action}",
|
||||||
className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-muted'}"),
|
className=f"small fw-bold {'text-success' if pred_action == 'BUY' else 'text-danger' if pred_action == 'SELL' else 'text-warning' if 'PREDICTION' in pred_action else 'text-info'}"),
|
||||||
html.Span(f" ({pred_confidence:.1f}%)", className="text-muted small"),
|
html.Span(f" ({pred_confidence:.1f}%)", className="text-muted small"),
|
||||||
html.Span(f" @ {pred_time}", className="text-muted small")
|
html.Span(f" @ {pred_time}", className="text-muted small")
|
||||||
], className="mb-1"),
|
], className="mb-1"),
|
||||||
|
|
||||||
|
# Additional prediction details if available
|
||||||
|
*([
|
||||||
|
html.Div([
|
||||||
|
html.Span("Price: ", className="text-muted small"),
|
||||||
|
html.Span(f"${last_prediction.get('predicted_price', 0):.2f}", className="text-warning small fw-bold")
|
||||||
|
], className="mb-1")
|
||||||
|
] if last_prediction.get('predicted_price', 0) > 0 else []),
|
||||||
|
|
||||||
|
*([
|
||||||
|
html.Div([
|
||||||
|
html.Span("Change: ", className="text-muted small"),
|
||||||
|
html.Span(f"{last_prediction.get('price_change', 0):+.2f}%",
|
||||||
|
className=f"small fw-bold {'text-success' if last_prediction.get('price_change', 0) > 0 else 'text-danger'}")
|
||||||
|
], className="mb-1")
|
||||||
|
] if last_prediction.get('price_change', 0) != 0 else []),
|
||||||
|
|
||||||
# Timing information (NEW)
|
# Timing information (NEW)
|
||||||
html.Div([
|
html.Div([
|
||||||
html.Span("Timing: ", className="text-muted small"),
|
html.Span("Timing: ", className="text-muted small"),
|
||||||
|
@ -149,6 +149,10 @@ class DashboardLayoutManager:
|
|||||||
html.I(className="fas fa-trash me-1"),
|
html.I(className="fas fa-trash me-1"),
|
||||||
"Clear Session"
|
"Clear Session"
|
||||||
], id="clear-session-btn", className="btn btn-warning btn-sm w-100"),
|
], id="clear-session-btn", className="btn btn-warning btn-sm w-100"),
|
||||||
|
html.Button([
|
||||||
|
html.I(className="fas fa-save me-1"),
|
||||||
|
"Store All Models"
|
||||||
|
], id="store-models-btn", className="btn btn-info btn-sm w-100 mt-2"),
|
||||||
html.Hr(className="my-2"),
|
html.Hr(className="my-2"),
|
||||||
html.Small("System Status", className="text-muted d-block mb-1"),
|
html.Small("System Status", className="text-muted d-block mb-1"),
|
||||||
html.Div([
|
html.Div([
|
||||||
|
@ -5,8 +5,7 @@ Handles HTML template rendering with Jinja2
|
|||||||
import os
|
import os
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||||||
import dash_html_components as html
|
from dash import html, dcc
|
||||||
from dash import dcc
|
|
||||||
import plotly.graph_objects as go
|
import plotly.graph_objects as go
|
||||||
|
|
||||||
from .dashboard_model import DashboardModel, DashboardDataBuilder
|
from .dashboard_model import DashboardModel, DashboardDataBuilder
|
||||||
|
@ -3,20 +3,33 @@ Template-based Trading Dashboard
|
|||||||
Uses MVC architecture with HTML templates and data models
|
Uses MVC architecture with HTML templates and data models
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional, Any, Dict, List
|
import sys
|
||||||
from datetime import datetime
|
import os
|
||||||
|
from typing import Optional, Any, Dict, List, Deque
|
||||||
|
from datetime import datetime, timedelta
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
import pytz
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import asdict
|
||||||
|
|
||||||
import dash
|
import dash
|
||||||
from dash import dcc, html, Input, Output, State, callback_context
|
from dash import dcc, html, Input, Output, State, callback_context
|
||||||
import plotly.graph_objects as go
|
import plotly.graph_objects as go
|
||||||
import plotly.express as px
|
import plotly.express as px
|
||||||
|
|
||||||
from .dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
|
|
||||||
from .template_renderer import DashboardTemplateRenderer
|
|
||||||
from core.data_provider import DataProvider
|
from core.data_provider import DataProvider
|
||||||
from core.orchestrator import TradingOrchestrator
|
from core.orchestrator import TradingOrchestrator
|
||||||
from core.trading_executor import TradingExecutor
|
from core.trading_executor import TradingExecutor
|
||||||
|
from core.config import get_config
|
||||||
|
from core.unified_data_stream import UnifiedDataStream
|
||||||
|
from web.dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
|
||||||
|
from web.template_renderer import DashboardTemplateRenderer
|
||||||
|
from web.component_manager import DashboardComponentManager
|
||||||
|
from web.layout_manager import DashboardLayoutManager
|
||||||
|
from utils.checkpoint_manager import save_checkpoint, load_best_checkpoint
|
||||||
|
from NN.models.advanced_transformer_trading import create_trading_transformer, TradingTransformerConfig
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -29,29 +42,125 @@ class TemplatedTradingDashboard:
|
|||||||
orchestrator: Optional[TradingOrchestrator] = None,
|
orchestrator: Optional[TradingOrchestrator] = None,
|
||||||
trading_executor: Optional[TradingExecutor] = None):
|
trading_executor: Optional[TradingExecutor] = None):
|
||||||
"""Initialize the templated dashboard"""
|
"""Initialize the templated dashboard"""
|
||||||
self.data_provider = data_provider
|
self.config = get_config()
|
||||||
self.orchestrator = orchestrator
|
|
||||||
self.trading_executor = trading_executor
|
# Initialize components
|
||||||
|
self.data_provider = data_provider or DataProvider()
|
||||||
|
self.trading_executor = trading_executor or TradingExecutor()
|
||||||
|
|
||||||
# Initialize template renderer
|
# Initialize template renderer
|
||||||
self.renderer = DashboardTemplateRenderer()
|
self.renderer = DashboardTemplateRenderer()
|
||||||
|
|
||||||
# Initialize Dash app
|
# Initialize unified orchestrator with full ML capabilities
|
||||||
|
if orchestrator is None:
|
||||||
|
self.orchestrator = TradingOrchestrator(
|
||||||
|
data_provider=self.data_provider,
|
||||||
|
enhanced_rl_training=True,
|
||||||
|
model_registry={}
|
||||||
|
)
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Using unified Trading Orchestrator with full ML capabilities")
|
||||||
|
else:
|
||||||
|
self.orchestrator = orchestrator
|
||||||
|
|
||||||
|
# Initialize enhanced training system for predictions
|
||||||
|
self.training_system = None
|
||||||
|
self._initialize_enhanced_training_system()
|
||||||
|
|
||||||
|
# Initialize layout and component managers
|
||||||
|
self.layout_manager = DashboardLayoutManager(
|
||||||
|
starting_balance=self._get_initial_balance(),
|
||||||
|
trading_executor=self.trading_executor
|
||||||
|
)
|
||||||
|
self.component_manager = DashboardComponentManager()
|
||||||
|
|
||||||
|
# Initialize Universal Data Stream for the 5 timeseries architecture
|
||||||
|
self.unified_stream = UnifiedDataStream(self.data_provider, self.orchestrator)
|
||||||
|
self.stream_consumer_id = self.unified_stream.register_consumer(
|
||||||
|
consumer_name="TemplatedTradingDashboard",
|
||||||
|
callback=self._handle_unified_stream_data,
|
||||||
|
data_types=['ticks', 'ohlcv', 'training_data', 'ui_data']
|
||||||
|
)
|
||||||
|
logger.info(f"TEMPLATED DASHBOARD: Universal Data Stream initialized with consumer ID: {self.stream_consumer_id}")
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Subscribed to Universal 5 Timeseries: ETH(ticks,1m,1h,1d) + BTC(ticks)")
|
||||||
|
|
||||||
|
# Dashboard state
|
||||||
|
self.recent_decisions: list = []
|
||||||
|
self.closed_trades: list = []
|
||||||
|
self.current_prices: dict = {}
|
||||||
|
self.session_pnl = 0.0
|
||||||
|
self.total_fees = 0.0
|
||||||
|
self.current_position: Optional[float] = 0.0
|
||||||
|
self.session_trades: list = []
|
||||||
|
|
||||||
|
# Model control toggles - separate inference and training
|
||||||
|
self.dqn_inference_enabled = True # Default: enabled
|
||||||
|
self.dqn_training_enabled = True # Default: enabled
|
||||||
|
self.cnn_inference_enabled = True
|
||||||
|
self.cnn_training_enabled = True
|
||||||
|
|
||||||
|
# Leverage management - adjustable x1 to x100
|
||||||
|
self.current_leverage = 50 # Default x50 leverage
|
||||||
|
self.min_leverage = 1
|
||||||
|
self.max_leverage = 100
|
||||||
|
self.pending_trade_case_id = None # For tracking opening trades until closure
|
||||||
|
|
||||||
|
# WebSocket streaming
|
||||||
|
self.ws_price_cache: dict = {}
|
||||||
|
self.is_streaming = False
|
||||||
|
self.tick_cache: list = []
|
||||||
|
|
||||||
|
# COB data cache - enhanced with price buckets and memory system
|
||||||
|
self.cob_cache: dict = {
|
||||||
|
'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0},
|
||||||
|
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
|
||||||
|
}
|
||||||
|
self.latest_cob_data: dict = {} # Cache for COB integration data
|
||||||
|
self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display)
|
||||||
|
|
||||||
|
# COB High-frequency data handling (50-100 updates/sec)
|
||||||
|
self.cob_data_buffer: dict = {} # Buffer for high-freq data
|
||||||
|
self.cob_memory: dict = {} # Memory system like GPT - keeps last N snapshots
|
||||||
|
self.cob_price_buckets: dict = {} # Price bucket cache
|
||||||
|
self.cob_update_count = 0
|
||||||
|
self.last_cob_broadcast: Dict[str, Optional[float]] = {'ETH/USDT': None, 'BTC/USDT': None} # Rate limiting for UI updates, updated type
|
||||||
|
self.cob_data_history: Dict[str, Deque[Any]] = {
|
||||||
|
'ETH/USDT': deque(maxlen=61), # Store ~60 seconds of 1s snapshots
|
||||||
|
'BTC/USDT': deque(maxlen=61)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Initialize timezone
|
||||||
|
timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
|
||||||
|
self.timezone = pytz.timezone(timezone_name)
|
||||||
|
|
||||||
|
# Create Dash app
|
||||||
self.app = dash.Dash(__name__, external_stylesheets=[
|
self.app = dash.Dash(__name__, external_stylesheets=[
|
||||||
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css'
|
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
|
||||||
|
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
|
||||||
])
|
])
|
||||||
|
|
||||||
# Session data
|
# Suppress Dash development mode logging
|
||||||
self.session_start_time = datetime.now()
|
self.app.enable_dev_tools(debug=False, dev_tools_silence_routes_logging=True)
|
||||||
self.session_trades = []
|
|
||||||
self.session_pnl = 0.0
|
|
||||||
self.current_position = 0.0
|
|
||||||
|
|
||||||
# Setup layout and callbacks
|
# Setup layout and callbacks
|
||||||
self._setup_layout()
|
self._setup_layout()
|
||||||
self._setup_callbacks()
|
self._setup_callbacks()
|
||||||
|
|
||||||
logger.info("TEMPLATED DASHBOARD: Initialized with MVC architecture")
|
# Start data streams
|
||||||
|
self._initialize_streaming()
|
||||||
|
|
||||||
|
# Connect to orchestrator for real trading signals
|
||||||
|
self._connect_to_orchestrator()
|
||||||
|
|
||||||
|
# Initialize COB integration with high-frequency data handling
|
||||||
|
self._initialize_cob_integration()
|
||||||
|
|
||||||
|
# Start signal generation loop to ensure continuous trading signals
|
||||||
|
self._start_signal_generation_loop()
|
||||||
|
|
||||||
|
# Start training sessions if models are showing FRESH status
|
||||||
|
threading.Thread(target=self._delayed_training_check, daemon=True).start()
|
||||||
|
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Initialized with HIGH-FREQUENCY COB integration and signal generation")
|
||||||
|
|
||||||
def _setup_layout(self):
|
def _setup_layout(self):
|
||||||
"""Setup the dashboard layout using templates"""
|
"""Setup the dashboard layout using templates"""
|
||||||
@ -61,69 +170,20 @@ class TemplatedTradingDashboard:
|
|||||||
# Render layout using template
|
# Render layout using template
|
||||||
layout = self.renderer.render_dashboard(dashboard_data)
|
layout = self.renderer.render_dashboard(dashboard_data)
|
||||||
|
|
||||||
# Add custom CSS
|
# Custom CSS will be handled via external stylesheets
|
||||||
layout.children.insert(0, self._get_custom_css())
|
|
||||||
|
|
||||||
self.app.layout = layout
|
self.app.layout = layout
|
||||||
|
|
||||||
def _get_custom_css(self) -> html.Style:
|
def _get_initial_balance(self) -> float:
|
||||||
"""Get custom CSS styles"""
|
"""Get initial balance from trading executor or default"""
|
||||||
return html.Style(children="""
|
try:
|
||||||
.metric-card {
|
if self.trading_executor and hasattr(self.trading_executor, 'starting_balance'):
|
||||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
balance = getattr(self.trading_executor, 'starting_balance', None)
|
||||||
color: white;
|
if balance and balance > 0:
|
||||||
border-radius: 10px;
|
return balance
|
||||||
padding: 15px;
|
except Exception as e:
|
||||||
margin-bottom: 10px;
|
logger.warning(f"Error getting balance: {e}")
|
||||||
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
|
return 100.0 # Default balance
|
||||||
}
|
|
||||||
.metric-value {
|
|
||||||
font-size: 1.5rem;
|
|
||||||
font-weight: bold;
|
|
||||||
}
|
|
||||||
.metric-label {
|
|
||||||
font-size: 0.9rem;
|
|
||||||
opacity: 0.9;
|
|
||||||
}
|
|
||||||
.cob-ladder {
|
|
||||||
max-height: 400px;
|
|
||||||
overflow-y: auto;
|
|
||||||
font-family: 'Courier New', monospace;
|
|
||||||
font-size: 0.85rem;
|
|
||||||
}
|
|
||||||
.bid-row {
|
|
||||||
background-color: rgba(40, 167, 69, 0.1);
|
|
||||||
border-left: 3px solid #28a745;
|
|
||||||
}
|
|
||||||
.ask-row {
|
|
||||||
background-color: rgba(220, 53, 69, 0.1);
|
|
||||||
border-left: 3px solid #dc3545;
|
|
||||||
}
|
|
||||||
.training-panel {
|
|
||||||
background: #f8f9fa;
|
|
||||||
border-radius: 8px;
|
|
||||||
padding: 15px;
|
|
||||||
height: 300px;
|
|
||||||
overflow-y: auto;
|
|
||||||
}
|
|
||||||
.model-status {
|
|
||||||
padding: 8px 12px;
|
|
||||||
border-radius: 20px;
|
|
||||||
font-size: 0.8rem;
|
|
||||||
font-weight: bold;
|
|
||||||
margin: 2px;
|
|
||||||
display: inline-block;
|
|
||||||
}
|
|
||||||
.status-training { background-color: #28a745; color: white; }
|
|
||||||
.status-idle { background-color: #6c757d; color: white; }
|
|
||||||
.status-loading { background-color: #ffc107; color: black; }
|
|
||||||
.closed-trades {
|
|
||||||
max-height: 200px;
|
|
||||||
overflow-y: auto;
|
|
||||||
}
|
|
||||||
.trade-profit { color: #28a745; font-weight: bold; }
|
|
||||||
.trade-loss { color: #dc3545; font-weight: bold; }
|
|
||||||
""")
|
|
||||||
|
|
||||||
def _setup_callbacks(self):
|
def _setup_callbacks(self):
|
||||||
"""Setup dashboard callbacks"""
|
"""Setup dashboard callbacks"""
|
||||||
@ -220,7 +280,8 @@ class TemplatedTradingDashboard:
|
|||||||
def update_closed_trades(n):
|
def update_closed_trades(n):
|
||||||
"""Update closed trades table"""
|
"""Update closed trades table"""
|
||||||
try:
|
try:
|
||||||
return self._render_closed_trades()
|
# Return the table wrapped in a Div
|
||||||
|
return html.Div(self._render_closed_trades())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating closed trades: {e}")
|
logger.error(f"Error updating closed trades: {e}")
|
||||||
return html.Div("No trades")
|
return html.Div("No trades")
|
||||||
@ -523,44 +584,32 @@ class TemplatedTradingDashboard:
|
|||||||
])
|
])
|
||||||
])
|
])
|
||||||
|
|
||||||
def _render_closed_trades(self) -> html.Table:
|
def _render_closed_trades(self) -> html.Div:
|
||||||
"""Render closed trades table"""
|
"""Render closed trades table"""
|
||||||
return html.Table([
|
if not self.closed_trades:
|
||||||
html.Thead([
|
return html.Div("No closed trades yet.", className="alert alert-info mt-3")
|
||||||
html.Tr([
|
|
||||||
html.Th("Time"),
|
# Create a DataFrame from closed trades
|
||||||
html.Th("Symbol"),
|
df_trades = pd.DataFrame(self.closed_trades)
|
||||||
html.Th("Side"),
|
|
||||||
html.Th("Size"),
|
# Format columns for display
|
||||||
html.Th("Entry"),
|
df_trades['timestamp'] = pd.to_datetime(df_trades['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
html.Th("Exit"),
|
df_trades['entry_price'] = df_trades['entry_price'].apply(lambda x: f"${x:,.2f}")
|
||||||
html.Th("PnL"),
|
df_trades['exit_price'] = df_trades['exit_price'].apply(lambda x: f"${x:,.2f}")
|
||||||
html.Th("Duration")
|
df_trades['pnl'] = df_trades['pnl'].apply(lambda x: f"${x:,.2f}")
|
||||||
])
|
df_trades['profit_percentage'] = df_trades['profit_percentage'].apply(lambda x: f"{x:,.2f}%")
|
||||||
]),
|
df_trades['size'] = df_trades['size'].apply(lambda x: f"{x:,.4f}")
|
||||||
html.Tbody([
|
df_trades['fees'] = df_trades['fees'].apply(lambda x: f"${x:,.2f}")
|
||||||
html.Tr([
|
|
||||||
html.Td("14:23:45"),
|
table_header = [html.Thead(html.Tr([html.Th(col) for col in df_trades.columns]))]
|
||||||
html.Td("ETH/USDT"),
|
table_body = [html.Tbody([
|
||||||
html.Td([html.Span("BUY", className="badge bg-success")]),
|
html.Tr([html.Td(df_trades.iloc[i][col]) for col in df_trades.columns]) for i in range(len(df_trades))
|
||||||
html.Td("1.5"),
|
])]
|
||||||
html.Td("$3420.45"),
|
|
||||||
html.Td("$3428.12"),
|
return html.Div(
|
||||||
html.Td("$11.51", className="trade-profit"),
|
html.Table(table_header + table_body, className="table table-striped table-hover table-sm"),
|
||||||
html.Td("2m 34s")
|
className="table-responsive"
|
||||||
]),
|
)
|
||||||
html.Tr([
|
|
||||||
html.Td("14:21:12"),
|
|
||||||
html.Td("BTC/USDT"),
|
|
||||||
html.Td([html.Span("SELL", className="badge bg-danger")]),
|
|
||||||
html.Td("0.1"),
|
|
||||||
html.Td("$45150.23"),
|
|
||||||
html.Td("$45142.67"),
|
|
||||||
html.Td("-$0.76", className="trade-loss"),
|
|
||||||
html.Td("1m 12s")
|
|
||||||
])
|
|
||||||
])
|
|
||||||
], className="table table-sm")
|
|
||||||
|
|
||||||
def _execute_manual_trade(self, action: str):
|
def _execute_manual_trade(self, action: str):
|
||||||
"""Execute manual trade"""
|
"""Execute manual trade"""
|
||||||
@ -585,11 +634,587 @@ class TemplatedTradingDashboard:
|
|||||||
self.session_start_time = datetime.now()
|
self.session_start_time = datetime.now()
|
||||||
logger.info("SESSION: Cleared")
|
logger.info("SESSION: Cleared")
|
||||||
|
|
||||||
def run_server(self, host='127.0.0.1', port=8051, debug=False):
|
def run_server(self, host='127.0.0.1', port=8052, debug=False):
|
||||||
"""Run the dashboard server"""
|
"""Run the dashboard server"""
|
||||||
logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}")
|
logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}")
|
||||||
self.app.run_server(host=host, port=port, debug=debug)
|
self.app.run(host=host, port=port, debug=debug)
|
||||||
|
|
||||||
|
def _handle_unified_stream_data(self, data):
|
||||||
|
"""Placeholder for unified stream data handling."""
|
||||||
|
logger.debug(f"Received data from unified stream: {data}")
|
||||||
|
|
||||||
|
def _delayed_training_check(self):
|
||||||
|
"""Check and start training after a delay to allow initialization"""
|
||||||
|
try:
|
||||||
|
time.sleep(10) # Wait 10 seconds for initialization
|
||||||
|
logger.info("Checking if models need training activation...")
|
||||||
|
self._start_actual_training_if_needed()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in delayed training check: {e}")
|
||||||
|
|
||||||
|
def _initialize_enhanced_training_system(self):
|
||||||
|
"""Initialize enhanced training system for model predictions"""
|
||||||
|
try:
|
||||||
|
# Try to import and initialize enhanced training system
|
||||||
|
from enhanced_realtime_training import EnhancedRealtimeTrainingSystem
|
||||||
|
|
||||||
|
self.training_system = EnhancedRealtimeTrainingSystem(
|
||||||
|
orchestrator=self.orchestrator,
|
||||||
|
data_provider=self.data_provider,
|
||||||
|
dashboard=self
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize prediction storage
|
||||||
|
if not hasattr(self.orchestrator, 'recent_dqn_predictions'):
|
||||||
|
self.orchestrator.recent_dqn_predictions = {}
|
||||||
|
if not hasattr(self.orchestrator, 'recent_cnn_predictions'):
|
||||||
|
self.orchestrator.recent_cnn_predictions = {}
|
||||||
|
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Enhanced training system initialized for model predictions")
|
||||||
|
|
||||||
|
except ImportError:
|
||||||
|
logger.warning("TEMPLATED DASHBOARD: Enhanced training system not available - using mock predictions")
|
||||||
|
self.training_system = None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error initializing enhanced training system: {e}")
|
||||||
|
self.training_system = None
|
||||||
|
|
||||||
|
def _initialize_streaming(self):
|
||||||
|
"""Initialize data streaming"""
|
||||||
|
try:
|
||||||
|
self._start_websocket_streaming()
|
||||||
|
self._start_data_collection()
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Data streaming initialized")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error initializing streaming: {e}")
|
||||||
|
|
||||||
|
def _start_websocket_streaming(self):
|
||||||
|
"""Start WebSocket streaming for real-time data."""
|
||||||
|
ws_thread = threading.Thread(target=self._ws_worker, daemon=True)
|
||||||
|
ws_thread.start()
|
||||||
|
|
||||||
|
def _ws_worker(self):
|
||||||
|
try:
|
||||||
|
import websocket
|
||||||
|
import json # Added import
|
||||||
|
def on_message(ws, message):
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
if 'k' in data:
|
||||||
|
kline = data['k']
|
||||||
|
tick_record = {
|
||||||
|
'symbol': 'ETHUSDT',
|
||||||
|
'datetime': datetime.fromtimestamp(int(kline['t']) / 1000),
|
||||||
|
'open': float(kline['o']),
|
||||||
|
'high': float(kline['h']),
|
||||||
|
'low': float(kline['l']),
|
||||||
|
'close': float(kline['c']),
|
||||||
|
'price': float(kline['c']),
|
||||||
|
'volume': float(kline['v']),
|
||||||
|
}
|
||||||
|
self.ws_price_cache['ETHUSDT'] = tick_record['price']
|
||||||
|
self.current_prices['ETH/USDT'] = tick_record['price']
|
||||||
|
self.tick_cache.append(tick_record)
|
||||||
|
if len(self.tick_cache) > 1000:
|
||||||
|
self.tick_cache.pop(0)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"TEMPLATED DASHBOARD: WebSocket message error: {e}")
|
||||||
|
def on_error(ws, error):
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: WebSocket error: {error}")
|
||||||
|
self.is_streaming = False
|
||||||
|
def on_close(ws, close_status_code, close_msg):
|
||||||
|
logger.warning("TEMPLATED DASHBOARD: WebSocket connection closed")
|
||||||
|
self.is_streaming = False
|
||||||
|
def on_open(ws):
|
||||||
|
logger.info("TEMPLATED DASHBOARD: WebSocket connected")
|
||||||
|
self.is_streaming = True
|
||||||
|
ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s"
|
||||||
|
ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
|
||||||
|
ws.run_forever()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: WebSocket worker error: {e}")
|
||||||
|
self.is_streaming = False
|
||||||
|
|
||||||
|
def _start_data_collection(self):
|
||||||
|
"""Start background data collection"""
|
||||||
|
data_thread = threading.Thread(target=self._data_worker, daemon=True)
|
||||||
|
data_thread.start()
|
||||||
|
|
||||||
|
def _data_worker(self):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self._update_session_metrics()
|
||||||
|
time.sleep(5)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"TEMPLATED DASHBOARD: Data collection error: {e}")
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
def _update_session_metrics(self):
|
||||||
|
"""Update session P&L and total fees from closed trades."""
|
||||||
|
try:
|
||||||
|
closed_trades = []
|
||||||
|
if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'):
|
||||||
|
closed_trades = self.trading_executor.get_closed_trades()
|
||||||
|
self.closed_trades = closed_trades
|
||||||
|
if closed_trades:
|
||||||
|
self.session_pnl = sum(trade.get('pnl', 0) for trade in closed_trades)
|
||||||
|
self.total_fees = sum(trade.get('fees', 0) for trade in closed_trades)
|
||||||
|
else:
|
||||||
|
self.session_pnl = 0.0
|
||||||
|
self.total_fees = 0.0
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error updating session metrics: {e}")
|
||||||
|
|
||||||
|
def _connect_to_orchestrator(self):
|
||||||
|
"""Connect to orchestrator for real trading signals"""
|
||||||
|
try:
|
||||||
|
if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'):
|
||||||
|
import asyncio # Added import
|
||||||
|
# from dataclasses import asdict # Moved asdict to top-level import
|
||||||
|
|
||||||
|
def connect_worker():
|
||||||
|
try:
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
# No need to run_until_complete here, just register the callback
|
||||||
|
self.orchestrator.add_decision_callback(self._on_trading_decision)
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Successfully connected to orchestrator for trading signals.")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Orchestrator connection worker failed: {e}")
|
||||||
|
thread = threading.Thread(target=connect_worker, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
else:
|
||||||
|
logger.warning("TEMPLATED DASHBOARD: Orchestrator not available or doesn\'t support callbacks")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error initiating orchestrator connection: {e}")
|
||||||
|
|
||||||
|
async def _on_trading_decision(self, decision):
|
||||||
|
"""Handle trading decision from orchestrator."""
|
||||||
|
try:
|
||||||
|
action = getattr(decision, 'action', decision.get('action'))
|
||||||
|
if action == 'HOLD':
|
||||||
|
return
|
||||||
|
symbol = getattr(decision, 'symbol', decision.get('symbol', 'ETH/USDT'))
|
||||||
|
if 'ETH' not in symbol.upper():
|
||||||
|
return
|
||||||
|
dashboard_decision = asdict(decision) if not isinstance(decision, dict) else decision.copy()
|
||||||
|
dashboard_decision['timestamp'] = datetime.now()
|
||||||
|
dashboard_decision['executed'] = False
|
||||||
|
self.recent_decisions.append(dashboard_decision)
|
||||||
|
if len(self.recent_decisions) > 200:
|
||||||
|
self.recent_decisions.pop(0)
|
||||||
|
logger.info(f"TEMPLATED DASHBOARD: [ORCHESTRATOR SIGNAL] Received: {action} for {symbol}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error handling trading decision: {e}")
|
||||||
|
|
||||||
|
def _initialize_cob_integration(self):
|
||||||
|
"""Initialize simple COB integration that works without async event loops"""
|
||||||
|
try:
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Initializing simple COB integration for model feeding")
|
||||||
|
|
||||||
|
# Initialize COB data storage
|
||||||
|
self.cob_bucketed_data = {
|
||||||
|
'ETH/USDT': {},
|
||||||
|
'BTC/USDT': {}
|
||||||
|
}
|
||||||
|
self.cob_last_update: Dict[str, Optional[float]] = {
|
||||||
|
'ETH/USDT': None,
|
||||||
|
'BTC/USDT': None
|
||||||
|
} # Corrected type hint
|
||||||
|
|
||||||
|
# Start simple COB data collection
|
||||||
|
self._start_simple_cob_collection()
|
||||||
|
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Simple COB integration initialized successfully")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error initializing COB integration: {e}")
|
||||||
|
self.cob_integration = None
|
||||||
|
|
||||||
|
def _start_simple_cob_collection(self):
|
||||||
|
"""Start simple COB data collection using REST APIs (no async required)"""
|
||||||
|
try:
|
||||||
|
# threading and time already imported
|
||||||
|
|
||||||
|
def cob_collector():
|
||||||
|
"""Collect COB data using simple REST API calls"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Collect data for both symbols
|
||||||
|
for symbol in ['ETH/USDT', 'BTC/USDT']:
|
||||||
|
self._collect_simple_cob_data(symbol)
|
||||||
|
|
||||||
|
# Sleep for 1 second between collections
|
||||||
|
time.sleep(1)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: Error in COB collection: {e}")
|
||||||
|
time.sleep(5) # Wait longer on error
|
||||||
|
|
||||||
|
# Start collector in background thread
|
||||||
|
cob_thread = threading.Thread(target=cob_collector, daemon=True)
|
||||||
|
cob_thread.start()
|
||||||
|
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Simple COB data collection started")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error starting COB collection: {e}")
|
||||||
|
|
||||||
|
def _collect_simple_cob_data(self, symbol: str):
|
||||||
|
"""Collect simple COB data using Binance REST API"""
|
||||||
|
try:
|
||||||
|
import requests # Added import
|
||||||
|
# time already imported
|
||||||
|
|
||||||
|
# Use Binance REST API for order book data
|
||||||
|
binance_symbol = symbol.replace('/', '')
|
||||||
|
url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500"
|
||||||
|
|
||||||
|
response = requests.get(url, timeout=5)
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
# Process order book data
|
||||||
|
bids = []
|
||||||
|
asks = []
|
||||||
|
|
||||||
|
# Process bids (buy orders)
|
||||||
|
for bid in data['bids'][:100]: # Top 100 levels
|
||||||
|
price = float(bid[0])
|
||||||
|
size = float(bid[1])
|
||||||
|
bids.append({
|
||||||
|
'price': price,
|
||||||
|
'size': size,
|
||||||
|
'total': price * size
|
||||||
|
})
|
||||||
|
|
||||||
|
# Process asks (sell orders)
|
||||||
|
for ask in data['asks'][:100]: # Top 100 levels
|
||||||
|
price = float(ask[0])
|
||||||
|
size = float(ask[1])
|
||||||
|
asks.append({
|
||||||
|
'price': price,
|
||||||
|
'size': size,
|
||||||
|
'total': price * size
|
||||||
|
})
|
||||||
|
|
||||||
|
# Calculate statistics
|
||||||
|
if bids and asks:
|
||||||
|
best_bid = max(bids, key=lambda x: x['price'])
|
||||||
|
best_ask = min(asks, key=lambda x: x['price'])
|
||||||
|
mid_price = (best_bid['price'] + best_ask['price']) / 2
|
||||||
|
spread_bps = ((best_ask['price'] - best_bid['price']) / mid_price) * 10000 if mid_price > 0 else 0
|
||||||
|
|
||||||
|
total_bid_liquidity = sum(bid['total'] for bid in bids[:20])
|
||||||
|
total_ask_liquidity = sum(ask['total'] for ask in asks[:20])
|
||||||
|
total_liquidity = total_bid_liquidity + total_ask_liquidity
|
||||||
|
imbalance = (total_bid_liquidity - total_ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
|
||||||
|
|
||||||
|
# Create COB snapshot
|
||||||
|
cob_snapshot = {
|
||||||
|
'symbol': symbol,
|
||||||
|
'timestamp': time.time(),
|
||||||
|
'bids': bids,
|
||||||
|
'asks': asks,
|
||||||
|
'stats': {
|
||||||
|
'mid_price': mid_price,
|
||||||
|
'spread_bps': spread_bps,
|
||||||
|
'total_bid_liquidity': total_bid_liquidity,
|
||||||
|
'total_ask_liquidity': total_ask_liquidity,
|
||||||
|
'imbalance': imbalance,
|
||||||
|
'exchanges_active': ['Binance']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Store in history (keep last 15 seconds)
|
||||||
|
self.cob_data_history[symbol].append(cob_snapshot)
|
||||||
|
if len(self.cob_data_history[symbol]) > 15: # Keep 15 seconds
|
||||||
|
# Use slicing to remove old elements from deque to ensure correct behavior
|
||||||
|
while len(self.cob_data_history[symbol]) > 15:
|
||||||
|
self.cob_data_history[symbol].popleft()
|
||||||
|
|
||||||
|
# Update latest data
|
||||||
|
self.latest_cob_data[symbol] = cob_snapshot
|
||||||
|
self.cob_last_update[symbol] = time.time()
|
||||||
|
|
||||||
|
# Generate bucketed data for models
|
||||||
|
self._generate_bucketed_cob_data(symbol, cob_snapshot)
|
||||||
|
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: COB data collected for {symbol}: {len(bids)} bids, {len(asks)} asks")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: Error collecting COB data for {symbol}: {e}")
|
||||||
|
|
||||||
|
def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict):
|
||||||
|
"""Generate bucketed COB data for model feeding"""
|
||||||
|
try:
|
||||||
|
# Create price buckets (1 basis point granularity)
|
||||||
|
bucket_size_bps = 1.0
|
||||||
|
mid_price = cob_snapshot['stats']['mid_price']
|
||||||
|
|
||||||
|
# Initialize buckets
|
||||||
|
buckets = {}
|
||||||
|
|
||||||
|
# Process bids into buckets
|
||||||
|
for bid in cob_snapshot['bids']:
|
||||||
|
price_offset_bps = ((bid['price'] - mid_price) / mid_price) * 10000
|
||||||
|
bucket_key = int(price_offset_bps / bucket_size_bps)
|
||||||
|
|
||||||
|
if bucket_key not in buckets:
|
||||||
|
buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
|
||||||
|
|
||||||
|
buckets[bucket_key]['bid_volume'] += bid['total']
|
||||||
|
|
||||||
|
# Process asks into buckets
|
||||||
|
for ask in cob_snapshot['asks']:
|
||||||
|
price_offset_bps = ((ask['price'] - mid_price) / mid_price) * 10000
|
||||||
|
bucket_key = int(price_offset_bps / bucket_size_bps)
|
||||||
|
|
||||||
|
if bucket_key not in buckets:
|
||||||
|
buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
|
||||||
|
|
||||||
|
buckets[bucket_key]['ask_volume'] += ask['total']
|
||||||
|
|
||||||
|
# Store bucketed data
|
||||||
|
self.cob_bucketed_data[symbol] = {
|
||||||
|
'timestamp': cob_snapshot['timestamp'],
|
||||||
|
'mid_price': mid_price,
|
||||||
|
'buckets': buckets,
|
||||||
|
'bucket_size_bps': bucket_size_bps
|
||||||
|
}
|
||||||
|
|
||||||
|
# Feed to models
|
||||||
|
self._feed_cob_data_to_models(symbol, cob_snapshot)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}")
|
||||||
|
|
||||||
|
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
|
||||||
|
"""Calculate average imbalance over multiple time windows."""
|
||||||
|
stats = {}
|
||||||
|
now = time.time()
|
||||||
|
history = self.cob_data_history.get(symbol)
|
||||||
|
|
||||||
|
if not history:
|
||||||
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
|
|
||||||
|
periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
|
||||||
|
|
||||||
|
for name, duration in periods.items():
|
||||||
|
recent_imbalances = []
|
||||||
|
for snap in history:
|
||||||
|
# Check if snap is a valid dict with timestamp and stats
|
||||||
|
if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']:
|
||||||
|
imbalance = snap['stats'].get('imbalance')
|
||||||
|
if imbalance is not None:
|
||||||
|
recent_imbalances.append(imbalance)
|
||||||
|
|
||||||
|
if recent_imbalances:
|
||||||
|
stats[name] = sum(recent_imbalances) / len(recent_imbalances)
|
||||||
|
else:
|
||||||
|
stats[name] = 0.0
|
||||||
|
|
||||||
|
# Debug logging to verify cumulative imbalance calculation
|
||||||
|
if any(value != 0.0 for value in stats.values()):
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}")
|
||||||
|
|
||||||
|
return stats
|
||||||
|
|
||||||
|
def _feed_cob_data_to_models(self, symbol: str, cob_snapshot: dict):
|
||||||
|
"""Feed COB data to models for training and inference"""
|
||||||
|
try:
|
||||||
|
# Calculate cumulative imbalance for model feeding
|
||||||
|
cumulative_imbalance = self._calculate_cumulative_imbalance(symbol) # Assumes _calculate_cumulative_imbalance is available
|
||||||
|
|
||||||
|
history_data = {
|
||||||
|
'symbol': symbol,
|
||||||
|
'current_snapshot': cob_snapshot,
|
||||||
|
'history': list(self.cob_data_history[symbol]), # Convert deque to list for consistent slicing
|
||||||
|
'bucketed_data': self.cob_bucketed_data[symbol],
|
||||||
|
'cumulative_imbalance': cumulative_imbalance, # Add cumulative imbalance
|
||||||
|
'timestamp': cob_snapshot['timestamp']
|
||||||
|
}
|
||||||
|
|
||||||
|
# Pass to orchestrator for model feeding
|
||||||
|
if self.orchestrator and hasattr(self.orchestrator, 'feed_cob_data'):
|
||||||
|
self.orchestrator.feed_cob_data(symbol, history_data) # Assumes feed_cob_data exists in orchestrator
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: Error feeding COB data to models: {e}")
|
||||||
|
|
||||||
|
def _is_signal_generation_active(self) -> bool:
|
||||||
|
"""Check if signal generation is active (e.g., models are loaded and running)"""
|
||||||
|
# For now, return true to always generate signals
|
||||||
|
# In a real system, this would check model loading status, training status, etc.
|
||||||
|
return True # Simplified for initial integration
|
||||||
|
|
||||||
|
def _start_signal_generation_loop(self):
|
||||||
|
"""Start signal generation loop to ensure continuous trading signals"""
|
||||||
|
try:
|
||||||
|
def signal_worker():
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Signal generation worker started")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Ensure signal generation is active before processing
|
||||||
|
if self._is_signal_generation_active():
|
||||||
|
symbol = 'ETH/USDT' # Focus on ETH for now
|
||||||
|
current_price = self._get_current_price(symbol)
|
||||||
|
if current_price:
|
||||||
|
# Generate a momentum signal (simplified for demo)
|
||||||
|
signal = self._generate_momentum_signal(symbol, current_price) # Assumes _generate_momentum_signal is available
|
||||||
|
if signal:
|
||||||
|
self._process_dashboard_signal(signal) # Assumes _process_dashboard_signal is available
|
||||||
|
|
||||||
|
# Generate a DQN signal if enabled
|
||||||
|
if self.dqn_inference_enabled:
|
||||||
|
dqn_signal = self._generate_dqn_signal(symbol, current_price) # Assumes _generate_dqn_signal is available
|
||||||
|
if dqn_signal:
|
||||||
|
self._process_dashboard_signal(dqn_signal)
|
||||||
|
|
||||||
|
# Generate a CNN pivot signal if enabled
|
||||||
|
if self.cnn_inference_enabled:
|
||||||
|
cnn_signal = self._get_cnn_pivot_prediction() # Assumes _get_cnn_pivot_prediction is available
|
||||||
|
if cnn_signal:
|
||||||
|
self._process_dashboard_signal(cnn_signal)
|
||||||
|
|
||||||
|
# Update session metrics every 1 second interval to reflect new trades
|
||||||
|
self._update_session_metrics()
|
||||||
|
|
||||||
|
time.sleep(1) # Run every second for signal generation
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error in signal worker: {e}")
|
||||||
|
time.sleep(5) # Longer sleep on error
|
||||||
|
|
||||||
|
signal_thread = threading.Thread(target=signal_worker, daemon=True)
|
||||||
|
signal_thread.start()
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Signal generation loop started")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error starting signal generation loop: {e}")
|
||||||
|
|
||||||
|
def _start_actual_training_if_needed(self):
|
||||||
|
"""Start actual model training with real data collection and training loops"""
|
||||||
|
try:
|
||||||
|
if not self.orchestrator:
|
||||||
|
logger.warning("TEMPLATED DASHBOARD: No orchestrator available for training")
|
||||||
|
return
|
||||||
|
logger.info("TEMPLATED DASHBOARD: TRAINING: Starting actual training system with real data collection")
|
||||||
|
self._start_real_training_system()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error starting comprehensive training system: {e}")
|
||||||
|
|
||||||
|
def _start_real_training_system(self):
|
||||||
|
"""Start real training system with data collection and actual model training"""
|
||||||
|
try:
|
||||||
|
# Training performance metrics
|
||||||
|
self.training_performance = {
|
||||||
|
'decision': {'inference_times': [], 'training_times': [], 'total_calls': 0},
|
||||||
|
'cob_rl': {'inference_times': [], 'training_times': [], 'total_calls': 0},
|
||||||
|
'dqn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
|
||||||
|
'cnn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
|
||||||
|
'transformer': {'inference_times': [], 'training_times': [], 'total_calls': 0} # Added for transformer
|
||||||
|
}
|
||||||
|
|
||||||
|
def training_coordinator():
|
||||||
|
logger.info("TEMPLATED DASHBOARD: TRAINING: High-frequency training coordinator started")
|
||||||
|
training_iteration = 0
|
||||||
|
last_dqn_training = 0
|
||||||
|
last_cnn_training = 0
|
||||||
|
last_decision_training = 0
|
||||||
|
last_cob_rl_training = 0
|
||||||
|
last_transformer_training = 0 # For transformer
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
training_iteration += 1
|
||||||
|
current_time = time.time()
|
||||||
|
market_data = self._collect_training_data() # Assumes _collect_training_data is available
|
||||||
|
|
||||||
|
if market_data:
|
||||||
|
logger.debug(f"TEMPLATED DASHBOARD: TRAINING: Collected {len(market_data)} market data points for training")
|
||||||
|
|
||||||
|
# High-frequency training for split-second decisions
|
||||||
|
# Train decision fusion and COB RL as fast as hardware allows
|
||||||
|
if current_time - last_decision_training > 0.1: # Every 100ms
|
||||||
|
start_time = time.time()
|
||||||
|
self._perform_real_decision_training(market_data) # Assumes _perform_real_decision_training is available
|
||||||
|
training_time = time.time() - start_time
|
||||||
|
self.training_performance['decision']['training_times'].append(training_time)
|
||||||
|
self.training_performance['decision']['total_calls'] += 1
|
||||||
|
last_decision_training = current_time
|
||||||
|
|
||||||
|
# Keep only last 100 measurements
|
||||||
|
if len(self.training_performance['decision']['training_times']) > 100:
|
||||||
|
self.training_performance['decision']['training_times'] = self.training_performance['decision']['training_times'][-100:]
|
||||||
|
|
||||||
|
# Advanced Transformer Training (every 200ms for comprehensive features)
|
||||||
|
if current_time - last_transformer_training > 0.2: # Every 200ms for transformer
|
||||||
|
start_time = time.time()
|
||||||
|
self._perform_real_transformer_training(market_data) # Assumes _perform_real_transformer_training is available
|
||||||
|
training_time = time.time() - start_time
|
||||||
|
self.training_performance['transformer']['training_times'].append(training_time)
|
||||||
|
self.training_performance['transformer']['total_calls'] += 1
|
||||||
|
last_transformer_training = current_time # Update last training time
|
||||||
|
|
||||||
|
# Keep only last 100 measurements
|
||||||
|
if len(self.training_performance['transformer']['training_times']) > 100:
|
||||||
|
self.training_performance['transformer']['training_times'] = self.training_performance['transformer']['training_times'][-100:]
|
||||||
|
|
||||||
|
if current_time - last_cob_rl_training > 0.1: # Every 100ms
|
||||||
|
start_time = time.time()
|
||||||
|
self._perform_real_cob_rl_training(market_data) # Assumes _perform_real_cob_rl_training is available
|
||||||
|
training_time = time.time() - start_time
|
||||||
|
self.training_performance['cob_rl']['training_times'].append(training_time)
|
||||||
|
self.training_performance['cob_rl']['total_calls'] += 1
|
||||||
|
last_cob_rl_training = current_time
|
||||||
|
|
||||||
|
# Keep only last 100 measurements
|
||||||
|
if len(self.training_performance['cob_rl']['training_times']) > 100:
|
||||||
|
self.training_performance['cob_rl']['training_times'] = self.training_performance['cob_rl']['training_times'][-100:]
|
||||||
|
|
||||||
|
# Standard frequency for larger models
|
||||||
|
if current_time - last_dqn_training > 30:
|
||||||
|
start_time = time.time()
|
||||||
|
self._perform_real_dqn_training(market_data) # Assumes _perform_real_dqn_training is available
|
||||||
|
training_time = time.time() - start_time
|
||||||
|
self.training_performance['dqn']['training_times'].append(training_time)
|
||||||
|
self.training_performance['dqn']['total_calls'] += 1
|
||||||
|
last_dqn_training = current_time
|
||||||
|
|
||||||
|
if len(self.training_performance['dqn']['training_times']) > 50:
|
||||||
|
self.training_performance['dqn']['training_times'] = self.training_performance['dqn']['training_times'][-50:]
|
||||||
|
|
||||||
|
if current_time - last_cnn_training > 45:
|
||||||
|
start_time = time.time()
|
||||||
|
self._perform_real_cnn_training(market_data) # Assumes _perform_real_cnn_training is available
|
||||||
|
training_time = time.time() - start_time
|
||||||
|
self.training_performance['cnn']['training_times'].append(training_time)
|
||||||
|
self.training_performance['cnn']['total_calls'] += 1
|
||||||
|
last_cnn_training = current_time
|
||||||
|
|
||||||
|
if len(self.training_performance['cnn']['training_times']) > 50:
|
||||||
|
self.training_performance['cnn']['training_times'] = self.training_performance['cnn']['training_times'][-50:]
|
||||||
|
|
||||||
|
self._update_training_progress(training_iteration) # Assumes _update_training_progress is available
|
||||||
|
|
||||||
|
# Log performance metrics every 100 iterations
|
||||||
|
if training_iteration % 100 == 0:
|
||||||
|
self._log_training_performance() # Assumes _log_training_performance is available
|
||||||
|
logger.info(f"TEMPLATED DASHBOARD: TRAINING: Iteration {training_iteration} - High-frequency training active")
|
||||||
|
|
||||||
|
# Minimal sleep for maximum responsiveness
|
||||||
|
time.sleep(0.05) # 50ms sleep for 20Hz training loop
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: TRAINING: Error in training iteration {training_iteration}: {e}")
|
||||||
|
time.sleep(1) # Shorter error recovery
|
||||||
|
|
||||||
|
training_thread = threading.Thread(target=training_coordinator, daemon=True)
|
||||||
|
training_thread.start()
|
||||||
|
logger.info("TEMPLATED DASHBOARD: Real training system started")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"TEMPLATED DASHBOARD: Error starting real training system: {e}")
|
||||||
|
|
||||||
def create_templated_dashboard(data_provider: Optional[DataProvider] = None,
|
def create_templated_dashboard(data_provider: Optional[DataProvider] = None,
|
||||||
orchestrator: Optional[TradingOrchestrator] = None,
|
orchestrator: Optional[TradingOrchestrator] = None,
|
||||||
|
Reference in New Issue
Block a user