add decision fusion. training but not enabled.

reports cleanup
This commit is contained in:
Dobromir Popov
2025-07-28 18:22:13 +03:00
parent 233bb9935c
commit bc4b72c6de
57 changed files with 504 additions and 9903 deletions

View File

@ -371,6 +371,7 @@ class TradingOrchestrator:
self._initialize_cob_integration()
self._start_cob_integration_sync() # Start COB integration
self._initialize_decision_fusion() # Initialize fusion system
self._initialize_transformer_model() # Initialize transformer model
self._initialize_enhanced_training_system() # Initialize real-time training
def _initialize_ml_models(self):
@ -380,10 +381,11 @@ class TradingOrchestrator:
# Initialize model state tracking (SSOT) - Updated with current training progress
self.model_states = {
'dqn': {'initial_loss': 0.4120, 'current_loss': 0.0234, 'best_loss': 0.0234, 'checkpoint_loaded': True},
'cnn': {'initial_loss': 0.4120, 'current_loss': 0.0000, 'best_loss': 0.0000, 'checkpoint_loaded': True},
'dqn': {'initial_loss':None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': True},
'cnn': {'initial_loss': None, 'current_loss': None, 'best_loss':None, 'checkpoint_loaded': True},
'cob_rl': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False},
'decision': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False},
'transformer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False},
'extrema_trainer': {'initial_loss': None, 'current_loss': None, 'best_loss': None, 'checkpoint_loaded': False}
}
@ -1404,13 +1406,25 @@ class TradingOrchestrator:
logger.debug(f"No fallback prediction available for {symbol}")
return None
# Combine predictions
decision = self._combine_predictions(
symbol=symbol,
price=current_price,
predictions=predictions,
timestamp=current_time
)
# Choose decision method based on configuration
if (self.decision_fusion_enabled and
self.decision_fusion_mode == 'neural' and
self.decision_fusion_network is not None):
# Use neural decision fusion
decision = self._make_decision_fusion_decision(
symbol=symbol,
predictions=predictions,
current_price=current_price,
timestamp=current_time
)
else:
# Use programmatic decision combination
decision = self._combine_predictions(
symbol=symbol,
price=current_price,
predictions=predictions,
timestamp=current_time
)
# Update state
self.last_decision_time[symbol] = current_time
@ -3607,31 +3621,393 @@ class TradingOrchestrator:
if not self.decision_fusion_enabled:
return
# Create decision fusion network
# Create enhanced decision fusion network
class DecisionFusionNet(nn.Module):
def __init__(self, input_size=32, hidden_size=64):
def __init__(self, input_size=128, hidden_size=256):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
# Enhanced architecture for complex decision making
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, 3) # BUY, SELL, HOLD
self.dropout = nn.Dropout(0.2)
self.fc3 = nn.Linear(hidden_size, hidden_size // 2)
self.fc4 = nn.Linear(hidden_size // 2, 3) # BUY, SELL, HOLD
self.dropout = nn.Dropout(0.3)
self.batch_norm1 = nn.BatchNorm1d(hidden_size)
self.batch_norm2 = nn.BatchNorm1d(hidden_size)
self.batch_norm3 = nn.BatchNorm1d(hidden_size // 2)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = torch.relu(self.batch_norm1(self.fc1(x)))
x = self.dropout(x)
x = torch.relu(self.fc2(x))
x = torch.relu(self.batch_norm2(self.fc2(x)))
x = self.dropout(x)
return torch.softmax(self.fc3(x), dim=1)
x = torch.relu(self.batch_norm3(self.fc3(x)))
x = self.dropout(x)
return torch.softmax(self.fc4(x), dim=1)
def save(self, filepath: str):
"""Save the decision fusion network"""
torch.save({
'model_state_dict': self.state_dict(),
'input_size': self.input_size,
'hidden_size': self.hidden_size
}, filepath)
logger.info(f"Decision fusion network saved to {filepath}")
def load(self, filepath: str):
"""Load the decision fusion network"""
checkpoint = torch.load(filepath, map_location=self.device if hasattr(self, 'device') else 'cpu')
self.load_state_dict(checkpoint['model_state_dict'])
logger.info(f"Decision fusion network loaded from {filepath}")
self.decision_fusion_network = DecisionFusionNet()
# Get decision fusion configuration
decision_fusion_config = self.config.orchestrator.get('decision_fusion', {})
input_size = decision_fusion_config.get('input_size', 128)
hidden_size = decision_fusion_config.get('hidden_size', 256)
self.decision_fusion_network = DecisionFusionNet(input_size=input_size, hidden_size=hidden_size)
# Move decision fusion network to the device
self.decision_fusion_network.to(self.device)
# Initialize decision fusion mode
self.decision_fusion_mode = decision_fusion_config.get('mode', 'neural')
self.decision_fusion_enabled = decision_fusion_config.get('enabled', True)
self.decision_fusion_history_length = decision_fusion_config.get('history_length', 20)
self.decision_fusion_training_interval = decision_fusion_config.get('training_interval', 100)
self.decision_fusion_min_samples = decision_fusion_config.get('min_samples_for_training', 50)
# Initialize decision fusion training data
self.decision_fusion_training_data = []
self.decision_fusion_decisions_count = 0
# Try to load existing checkpoint
try:
from utils.checkpoint_manager import load_best_checkpoint
result = load_best_checkpoint("decision", "decision")
if result:
file_path, metadata = result
self.decision_fusion_network.load(file_path)
self.model_states['decision']['checkpoint_loaded'] = True
self.model_states['decision']['checkpoint_filename'] = metadata.checkpoint_id
logger.info(f"Decision fusion network loaded from checkpoint: {metadata.checkpoint_id}")
else:
logger.info("No existing decision fusion checkpoint found, starting fresh")
except Exception as e:
logger.warning(f"Error loading decision fusion checkpoint: {e}")
logger.info("Decision fusion network starting fresh")
logger.info(f"Decision fusion network initialized on device: {self.device}")
logger.info(f"Decision fusion mode: {self.decision_fusion_mode}")
except Exception as e:
logger.warning(f"Decision fusion initialization failed: {e}")
self.decision_fusion_enabled = False
def _create_decision_fusion_input(self, symbol: str, predictions: List[Prediction],
current_price: float, timestamp: datetime) -> torch.Tensor:
"""Create input features for the decision fusion network"""
try:
features = []
# 1. Market data features (standard input)
market_data = self._get_current_market_data(symbol)
if market_data:
# Price features
features.extend([
current_price,
market_data.get('volume', 0.0),
market_data.get('rsi', 50.0) / 100.0, # Normalize RSI
market_data.get('macd', 0.0),
market_data.get('bollinger_upper', current_price) / current_price - 1.0,
market_data.get('bollinger_lower', current_price) / current_price - 1.0,
])
else:
# Fallback features
features.extend([current_price, 0.0, 0.5, 0.0, 0.0, 0.0])
# 2. Model prediction features (up to 20 recent decisions per model)
model_names = ['dqn', 'cnn', 'transformer', 'cob_rl']
for model_name in model_names:
model_stats = self.model_statistics.get(model_name)
if model_stats:
# Model performance metrics
features.extend([
model_stats.accuracy or 0.0,
model_stats.average_loss or 0.0,
model_stats.best_loss or 0.0,
model_stats.total_inferences or 0.0,
model_stats.total_trainings or 0.0,
])
# Recent predictions (up to 20)
recent_predictions = list(model_stats.predictions_history)[-self.decision_fusion_history_length:]
for pred in recent_predictions:
# Action encoding: BUY=0, SELL=1, HOLD=2
action_encoding = {'BUY': 0.0, 'SELL': 1.0, 'HOLD': 2.0}.get(pred['action'], 2.0)
features.extend([action_encoding, pred['confidence']])
# Pad with zeros if less than 20 predictions
padding_needed = self.decision_fusion_history_length - len(recent_predictions)
features.extend([0.0, 0.0] * padding_needed)
else:
# No model stats available
features.extend([0.0, 0.0, 0.0, 0.0, 0.0] + [0.0, 0.0] * self.decision_fusion_history_length)
# 3. Current predictions features
for pred in predictions:
action_encoding = {'BUY': 0.0, 'SELL': 1.0, 'HOLD': 2.0}.get(pred.action, 2.0)
features.extend([action_encoding, pred.confidence])
# 4. Position and P&L features
current_position_pnl = self._get_current_position_pnl(symbol, current_price)
has_position = self._has_open_position(symbol)
features.extend([
current_position_pnl,
1.0 if has_position else 0.0,
self.entry_aggressiveness,
self.exit_aggressiveness,
])
# 5. Time-based features
features.extend([
timestamp.hour / 24.0, # Hour of day (0-1)
timestamp.minute / 60.0, # Minute of hour (0-1)
timestamp.weekday() / 7.0, # Day of week (0-1)
])
# Ensure we have the expected input size
expected_size = self.decision_fusion_network.input_size
if len(features) < expected_size:
features.extend([0.0] * (expected_size - len(features)))
elif len(features) > expected_size:
features = features[:expected_size]
return torch.tensor(features, dtype=torch.float32, device=self.device).unsqueeze(0)
except Exception as e:
logger.error(f"Error creating decision fusion input: {e}")
# Return zero tensor as fallback
return torch.zeros(1, self.decision_fusion_network.input_size, device=self.device)
def _make_decision_fusion_decision(self, symbol: str, predictions: List[Prediction],
current_price: float, timestamp: datetime) -> TradingDecision:
"""Use the decision fusion network to make trading decisions"""
try:
# Create input features
input_features = self._create_decision_fusion_input(symbol, predictions, current_price, timestamp)
# Get decision fusion network prediction
with torch.no_grad():
output = self.decision_fusion_network(input_features)
probabilities = output.squeeze().cpu().numpy()
# Convert probabilities to action and confidence
action_idx = np.argmax(probabilities)
actions = ['BUY', 'SELL', 'HOLD']
best_action = actions[action_idx]
best_confidence = float(probabilities[action_idx])
# Get current position P&L
current_position_pnl = self._get_current_position_pnl(symbol, current_price)
# Create reasoning
reasoning = {
'method': 'decision_fusion_neural',
'predictions_count': len(predictions),
'models_used': [pred.model_name for pred in predictions],
'fusion_probabilities': {
'BUY': float(probabilities[0]),
'SELL': float(probabilities[1]),
'HOLD': float(probabilities[2])
},
'input_features_size': input_features.shape[1],
'decision_fusion_mode': self.decision_fusion_mode
}
# Apply P&L feedback
best_action, best_confidence = self._apply_pnl_feedback(
best_action, best_confidence, current_position_pnl, symbol, reasoning
)
# Get memory usage
memory_usage = {}
try:
if hasattr(self.model_registry, 'get_memory_stats'):
memory_usage = self.model_registry.get_memory_stats()
except Exception:
pass
# Create final decision
decision = TradingDecision(
action=best_action,
confidence=best_confidence,
symbol=symbol,
price=current_price,
timestamp=timestamp,
reasoning=reasoning,
memory_usage=memory_usage.get('models', {}) if memory_usage else {},
entry_aggressiveness=self.entry_aggressiveness,
exit_aggressiveness=self.exit_aggressiveness,
current_position_pnl=current_position_pnl
)
# Add to training data for future training
self._add_decision_fusion_training_sample(decision, predictions, current_price)
# Trigger training on decision
self._trigger_training_on_decision(decision, current_price)
return decision
except Exception as e:
logger.error(f"Error in decision fusion decision: {e}")
# Fallback to programmatic method
return self._combine_predictions(symbol, current_price, predictions, timestamp)
def _add_decision_fusion_training_sample(self, decision: TradingDecision,
predictions: List[Prediction], current_price: float):
"""Add decision fusion training sample"""
try:
# Create training sample
training_sample = {
'input_features': self._create_decision_fusion_input(
decision.symbol, predictions, current_price, decision.timestamp
),
'target_action': decision.action,
'target_confidence': decision.confidence,
'timestamp': decision.timestamp,
'price': current_price
}
self.decision_fusion_training_data.append(training_sample)
self.decision_fusion_decisions_count += 1
# Train decision fusion network periodically
if (self.decision_fusion_decisions_count % self.decision_fusion_training_interval == 0 and
len(self.decision_fusion_training_data) >= self.decision_fusion_min_samples):
self._train_decision_fusion_network()
except Exception as e:
logger.error(f"Error adding decision fusion training sample: {e}")
def _train_decision_fusion_network(self):
"""Train the decision fusion network on collected data"""
try:
if len(self.decision_fusion_training_data) < self.decision_fusion_min_samples:
return
logger.info(f"Training decision fusion network with {len(self.decision_fusion_training_data)} samples")
# Prepare training data
inputs = []
targets = []
for sample in self.decision_fusion_training_data:
inputs.append(sample['input_features'])
# Create target (one-hot encoding)
action_idx = {'BUY': 0, 'SELL': 1, 'HOLD': 2}[sample['target_action']]
target = torch.zeros(3, device=self.device)
target[action_idx] = 1.0
targets.append(target)
# Stack tensors
inputs = torch.cat(inputs, dim=0)
targets = torch.stack(targets, dim=0)
# Train the network
optimizer = torch.optim.Adam(self.decision_fusion_network.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
self.decision_fusion_network.train()
optimizer.zero_grad()
outputs = self.decision_fusion_network(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
logger.info(f"Decision fusion training completed. Loss: {loss.item():.4f}")
# Clear training data after training
self.decision_fusion_training_data = []
except Exception as e:
logger.error(f"Error training decision fusion network: {e}")
except Exception as e:
logger.warning(f"Decision fusion initialization failed: {e}")
self.decision_fusion_enabled = False
def _initialize_transformer_model(self):
"""Initialize the transformer model for advanced sequence modeling"""
try:
from NN.models.advanced_transformer_trading import create_trading_transformer, TradingTransformerConfig
# Create transformer configuration
config = TradingTransformerConfig(
d_model=512,
n_heads=8,
n_layers=8,
seq_len=100,
n_actions=3,
use_multi_scale_attention=True,
use_market_regime_detection=True,
use_uncertainty_estimation=True,
use_deep_attention=True,
use_residual_connections=True,
use_layer_norm_variants=True
)
# Create transformer model and trainer
self.primary_transformer, self.primary_transformer_trainer = create_trading_transformer(config)
# Try to load existing checkpoint
try:
from utils.checkpoint_manager import load_best_checkpoint
result = load_best_checkpoint("transformer", "transformer")
if result:
file_path, metadata = result
self.primary_transformer_trainer.load_model(file_path)
self.model_states['transformer'] = {
'initial_loss': None,
'current_loss': metadata.performance_metrics.get('loss', None),
'best_loss': metadata.performance_metrics.get('loss', None),
'checkpoint_loaded': True,
'checkpoint_filename': metadata.checkpoint_id
}
logger.info(f"Transformer model loaded from checkpoint: {metadata.checkpoint_id}")
else:
logger.info("No existing transformer checkpoint found, starting fresh")
self.model_states['transformer'] = {
'initial_loss': None,
'current_loss': None,
'best_loss': None,
'checkpoint_loaded': False,
'checkpoint_filename': 'none (fresh start)'
}
except Exception as e:
logger.warning(f"Error loading transformer checkpoint: {e}")
logger.info("Transformer model starting fresh")
self.model_states['transformer'] = {
'initial_loss': None,
'current_loss': None,
'best_loss': None,
'checkpoint_loaded': False,
'checkpoint_filename': 'none (fresh start)'
}
logger.info("Transformer model initialized")
except Exception as e:
logger.warning(f"Transformer model initialization failed: {e}")
self.primary_transformer = None
self.primary_transformer_trainer = None
def _initialize_enhanced_training_system(self):
"""Initialize the enhanced real-time training system"""
try: