fuse decision fusion
This commit is contained in:
@ -657,6 +657,24 @@ class DQNAgent:
|
||||
done: Whether episode is done
|
||||
is_extrema: Whether this is a local extrema sample (for specialized learning)
|
||||
"""
|
||||
# Validate states before storing experience
|
||||
if state is None or next_state is None:
|
||||
logger.debug("Skipping experience storage: None state provided")
|
||||
return
|
||||
|
||||
if isinstance(state, dict) and not state:
|
||||
logger.debug("Skipping experience storage: empty state dictionary")
|
||||
return
|
||||
|
||||
if isinstance(next_state, dict) and not next_state:
|
||||
logger.debug("Skipping experience storage: empty next_state dictionary")
|
||||
return
|
||||
|
||||
# Check if states are all zeros (invalid)
|
||||
if hasattr(state, '__iter__') and all(f == 0 for f in np.array(state).flatten()):
|
||||
logger.debug("Skipping experience storage: state is all zeros")
|
||||
return
|
||||
|
||||
experience = (state, action, reward, next_state, done)
|
||||
|
||||
# Always add to main memory
|
||||
@ -761,6 +779,15 @@ class DQNAgent:
|
||||
int: Action (0=BUY, 1=SELL)
|
||||
"""
|
||||
try:
|
||||
# Validate state first - return early if empty/invalid/None
|
||||
if state is None:
|
||||
logger.warning("None state provided to act(), returning SELL action")
|
||||
return 1 # SELL action (safe default)
|
||||
|
||||
if isinstance(state, dict) and not state:
|
||||
logger.warning("Empty state dictionary provided to act(), returning SELL action")
|
||||
return 1 # SELL action (safe default)
|
||||
|
||||
# Use the DQNNetwork's act method for consistent behavior
|
||||
action_idx, confidence, action_probs = self.policy_net.act(state, explore=explore)
|
||||
|
||||
@ -790,6 +817,14 @@ class DQNAgent:
|
||||
def act_with_confidence(self, state: np.ndarray, market_regime: str = 'trending') -> Tuple[int, float, List[float]]:
|
||||
"""Choose action with confidence score adapted to market regime"""
|
||||
try:
|
||||
# Validate state first - return early if empty/invalid/None
|
||||
if state is None:
|
||||
logger.warning("None state provided to act_with_confidence(), returning safe defaults")
|
||||
return 1, 0.1, [0.0, 0.9, 0.1] # SELL action with low confidence
|
||||
|
||||
if isinstance(state, dict) and not state:
|
||||
logger.warning("Empty state dictionary provided to act_with_confidence(), returning safe defaults")
|
||||
return 1, 0.0, [0.0, 1.0] # SELL action with zero confidence
|
||||
# Convert state to tensor if needed
|
||||
if isinstance(state, np.ndarray):
|
||||
state_tensor = torch.FloatTensor(state)
|
||||
|
@ -368,6 +368,10 @@ class TradingOrchestrator:
|
||||
{}
|
||||
) # {symbol: {action, timestamp, confidence}}
|
||||
|
||||
# Decision fusion overconfidence tracking
|
||||
self.decision_fusion_overconfidence_count = 0
|
||||
self.max_overconfidence_threshold = 3 # Disable after 3 overconfidence detections
|
||||
|
||||
# Signal accumulation for trend confirmation
|
||||
self.signal_accumulator: Dict[str, List[Dict]] = (
|
||||
{}
|
||||
@ -1385,6 +1389,30 @@ class TradingOrchestrator:
|
||||
"""Check if model training is enabled"""
|
||||
return self.model_toggle_states.get(model_name, {}).get("training_enabled", True)
|
||||
|
||||
def disable_decision_fusion_temporarily(self, reason: str = "overconfidence detected"):
|
||||
"""Temporarily disable decision fusion model due to issues"""
|
||||
logger.warning(f"Disabling decision fusion model: {reason}")
|
||||
self.set_model_toggle_state("decision_fusion", inference_enabled=False, training_enabled=False)
|
||||
logger.info("Decision fusion model disabled. Will use programmatic decision combination.")
|
||||
|
||||
def enable_decision_fusion(self):
|
||||
"""Re-enable decision fusion model"""
|
||||
logger.info("Re-enabling decision fusion model")
|
||||
self.set_model_toggle_state("decision_fusion", inference_enabled=True, training_enabled=True)
|
||||
self.decision_fusion_overconfidence_count = 0 # Reset overconfidence counter
|
||||
|
||||
def get_decision_fusion_status(self) -> Dict[str, Any]:
|
||||
"""Get current decision fusion model status"""
|
||||
return {
|
||||
"enabled": self.decision_fusion_enabled,
|
||||
"mode": self.decision_fusion_mode,
|
||||
"inference_enabled": self.is_model_inference_enabled("decision_fusion"),
|
||||
"training_enabled": self.is_model_training_enabled("decision_fusion"),
|
||||
"network_available": self.decision_fusion_network is not None,
|
||||
"overconfidence_count": self.decision_fusion_overconfidence_count,
|
||||
"max_overconfidence_threshold": self.max_overconfidence_threshold
|
||||
}
|
||||
|
||||
async def start_continuous_trading(self, symbols: Optional[List[str]] = None):
|
||||
"""Start the continuous trading loop, using a decision model and trading executor"""
|
||||
if symbols is None:
|
||||
@ -1959,6 +1987,14 @@ class TradingOrchestrator:
|
||||
decision, predictions, current_price
|
||||
)
|
||||
|
||||
# Train fusion model in programmatic mode at regular intervals
|
||||
self.decision_fusion_decisions_count += 1
|
||||
if (self.decision_fusion_decisions_count % self.decision_fusion_training_interval == 0 and
|
||||
len(self.decision_fusion_training_data) >= self.decision_fusion_min_samples):
|
||||
|
||||
logger.info(f"Training decision fusion model in programmatic mode (decision #{self.decision_fusion_decisions_count})")
|
||||
asyncio.create_task(self._train_decision_fusion_programmatic())
|
||||
|
||||
# Update state
|
||||
self.last_decision_time[symbol] = current_time
|
||||
if symbol not in self.recent_decisions:
|
||||
@ -3976,7 +4012,7 @@ class TradingOrchestrator:
|
||||
"""Train COB RL model"""
|
||||
try:
|
||||
# COB RL models might have specific training methods
|
||||
if hasattr(model, "add_experience"):
|
||||
if hasattr(model, "remember"):
|
||||
action_names = ["SELL", "HOLD", "BUY"]
|
||||
action_idx = action_names.index(prediction["action"])
|
||||
|
||||
@ -3988,7 +4024,7 @@ class TradingOrchestrator:
|
||||
)
|
||||
return False
|
||||
|
||||
model.add_experience(
|
||||
model.remember(
|
||||
state=state,
|
||||
action=action_idx,
|
||||
reward=reward,
|
||||
@ -4569,12 +4605,31 @@ class TradingOrchestrator:
|
||||
if base_data is None:
|
||||
base_data = self.data_provider.build_base_data_input(symbol)
|
||||
if not base_data:
|
||||
logger.warning(f"Cannot build BaseDataInput for RL state: {symbol}")
|
||||
logger.debug(f"Cannot build BaseDataInput for RL state: {symbol}")
|
||||
return None
|
||||
|
||||
# Validate base_data has the required method
|
||||
if not hasattr(base_data, 'get_feature_vector'):
|
||||
logger.debug(f"BaseDataInput for {symbol} missing get_feature_vector method")
|
||||
return None
|
||||
|
||||
# Get unified feature vector (7850 features including all timeframes and COB data)
|
||||
feature_vector = base_data.get_feature_vector()
|
||||
|
||||
# Validate feature vector
|
||||
if feature_vector is None or len(feature_vector) == 0:
|
||||
logger.debug(f"Empty feature vector for RL state: {symbol}")
|
||||
return None
|
||||
|
||||
# Check if all features are zero (invalid state)
|
||||
if all(f == 0 for f in feature_vector):
|
||||
logger.debug(f"All features are zero for RL state: {symbol}")
|
||||
return None
|
||||
|
||||
# Convert to numpy array if needed
|
||||
if not isinstance(feature_vector, np.ndarray):
|
||||
feature_vector = np.array(feature_vector, dtype=np.float32)
|
||||
|
||||
# Return the full unified feature vector for RL agent
|
||||
# The DQN agent is now initialized with the correct size to match this
|
||||
return feature_vector
|
||||
@ -5152,6 +5207,55 @@ class TradingOrchestrator:
|
||||
logger.warning(f"Decision fusion initialization failed: {e}")
|
||||
self.decision_fusion_enabled = False
|
||||
|
||||
async def _train_decision_fusion_programmatic(self):
|
||||
"""Train decision fusion model in programmatic mode"""
|
||||
try:
|
||||
if not self.decision_fusion_network or len(self.decision_fusion_training_data) < self.decision_fusion_min_samples:
|
||||
return
|
||||
|
||||
logger.info(f"Training decision fusion model with {len(self.decision_fusion_training_data)} samples")
|
||||
|
||||
# Prepare training data
|
||||
inputs = []
|
||||
targets = []
|
||||
|
||||
for sample in self.decision_fusion_training_data[-100:]: # Use last 100 samples
|
||||
if 'input_features' in sample and 'outcome' in sample:
|
||||
inputs.append(sample['input_features'])
|
||||
# Convert outcome to target (1.0 for correct, 0.0 for incorrect)
|
||||
target = 1.0 if sample['outcome']['correct'] else 0.0
|
||||
targets.append(target)
|
||||
|
||||
if len(inputs) < 10: # Need minimum samples
|
||||
return
|
||||
|
||||
# Convert to tensors
|
||||
inputs_tensor = torch.tensor(inputs, dtype=torch.float32, device=self.device)
|
||||
targets_tensor = torch.tensor(targets, dtype=torch.float32, device=self.device)
|
||||
|
||||
# Training step
|
||||
self.decision_fusion_network.train()
|
||||
optimizer = torch.optim.Adam(self.decision_fusion_network.parameters(), lr=0.001)
|
||||
|
||||
optimizer.zero_grad()
|
||||
outputs = self.decision_fusion_network(inputs_tensor)
|
||||
loss = torch.nn.MSELoss()(outputs.squeeze(), targets_tensor)
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
# Update statistics
|
||||
current_loss = loss.item()
|
||||
self.update_model_loss("decision_fusion", current_loss)
|
||||
|
||||
logger.info(f"Decision fusion training completed: loss={current_loss:.4f}, samples={len(inputs)}")
|
||||
|
||||
# Save checkpoint periodically
|
||||
if self.decision_fusion_decisions_count % (self.decision_fusion_training_interval * 5) == 0:
|
||||
self._save_decision_fusion_checkpoint()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error training decision fusion in programmatic mode: {e}")
|
||||
|
||||
def _create_decision_fusion_input(
|
||||
self,
|
||||
symbol: str,
|
||||
@ -5293,17 +5397,44 @@ class TradingOrchestrator:
|
||||
symbol, predictions, current_price, timestamp
|
||||
)
|
||||
|
||||
# DEBUG: Log decision fusion input features
|
||||
logger.info(f"=== DECISION FUSION INPUT FEATURES ===")
|
||||
logger.info(f" Input shape: {input_features.shape}")
|
||||
logger.info(f" Input features (first 20): {input_features[0, :20].cpu().numpy()}")
|
||||
logger.info(f" Input features (last 20): {input_features[0, -20:].cpu().numpy()}")
|
||||
logger.info(f" Input features mean: {input_features.mean().item():.4f}")
|
||||
logger.info(f" Input features std: {input_features.std().item():.4f}")
|
||||
|
||||
# Get decision fusion network prediction
|
||||
with torch.no_grad():
|
||||
output = self.decision_fusion_network(input_features)
|
||||
probabilities = output.squeeze().cpu().numpy()
|
||||
|
||||
# DEBUG: Log decision fusion outputs
|
||||
logger.info(f"=== DECISION FUSION OUTPUTS ===")
|
||||
logger.info(f" Raw output shape: {output.shape}")
|
||||
logger.info(f" Probabilities: BUY={probabilities[0]:.4f}, SELL={probabilities[1]:.4f}, HOLD={probabilities[2]:.4f}")
|
||||
logger.info(f" Probability sum: {probabilities.sum():.4f}")
|
||||
|
||||
# Convert probabilities to action and confidence
|
||||
action_idx = np.argmax(probabilities)
|
||||
actions = ["BUY", "SELL", "HOLD"]
|
||||
best_action = actions[action_idx]
|
||||
best_confidence = float(probabilities[action_idx])
|
||||
|
||||
# DEBUG: Check for overconfidence
|
||||
if best_confidence > 0.95:
|
||||
self.decision_fusion_overconfidence_count += 1
|
||||
logger.warning(f"DECISION FUSION OVERCONFIDENCE DETECTED: {best_confidence:.3f} for {best_action} (count: {self.decision_fusion_overconfidence_count})")
|
||||
|
||||
if self.decision_fusion_overconfidence_count >= self.max_overconfidence_threshold:
|
||||
logger.error(f"Decision fusion overconfidence threshold reached ({self.max_overconfidence_threshold}). Disabling model.")
|
||||
self.disable_decision_fusion_temporarily("overconfidence threshold exceeded")
|
||||
# Fallback to programmatic method
|
||||
return self._combine_predictions(
|
||||
symbol, current_price, predictions, timestamp
|
||||
)
|
||||
|
||||
# Get current position P&L
|
||||
current_position_pnl = self._get_current_position_pnl(symbol, current_price)
|
||||
|
||||
@ -6402,31 +6533,44 @@ class TradingOrchestrator:
|
||||
models_trained = []
|
||||
|
||||
# Train DQN agent if available and enabled
|
||||
if self.rl_agent and hasattr(self.rl_agent, "add_experience") and self.is_model_training_enabled("dqn"):
|
||||
if self.rl_agent and hasattr(self.rl_agent, "remember") and self.is_model_training_enabled("dqn"):
|
||||
try:
|
||||
# Create state representation from base_data (same as CNN model)
|
||||
state = self._create_state_from_base_data(symbol, base_data)
|
||||
# Validate base_data before creating state
|
||||
if not base_data or not hasattr(base_data, 'get_feature_vector'):
|
||||
logger.debug(f"⚠️ Skipping DQN training for {symbol}: no valid base_data")
|
||||
else:
|
||||
# Check if base_data has actual features
|
||||
features = base_data.get_feature_vector()
|
||||
if not features or len(features) == 0 or all(f == 0 for f in features):
|
||||
logger.debug(f"⚠️ Skipping DQN training for {symbol}: no valid features in base_data")
|
||||
else:
|
||||
# Create state representation from base_data (same as CNN model)
|
||||
state = self._create_state_from_base_data(symbol, base_data)
|
||||
|
||||
# Map action to DQN action space - CONSISTENT ACTION MAPPING
|
||||
action_mapping = {"BUY": 0, "SELL": 1, "HOLD": 2}
|
||||
dqn_action = action_mapping.get(action, 2)
|
||||
# Skip training if no valid state could be created
|
||||
if state is None:
|
||||
logger.debug(f"⚠️ Skipping DQN training for {symbol}: could not create valid state")
|
||||
else:
|
||||
# Map action to DQN action space - CONSISTENT ACTION MAPPING
|
||||
action_mapping = {"BUY": 0, "SELL": 1, "HOLD": 2}
|
||||
dqn_action = action_mapping.get(action, 2)
|
||||
|
||||
# Calculate immediate reward based on confidence and execution
|
||||
immediate_reward = confidence if action != "HOLD" else 0.0
|
||||
# Calculate immediate reward based on confidence and execution
|
||||
immediate_reward = confidence if action != "HOLD" else 0.0
|
||||
|
||||
# Add experience to DQN
|
||||
self.rl_agent.add_experience(
|
||||
state=state,
|
||||
action=dqn_action,
|
||||
reward=immediate_reward,
|
||||
next_state=state, # Will be updated with actual outcome later
|
||||
done=False,
|
||||
)
|
||||
# Add experience to DQN
|
||||
self.rl_agent.remember(
|
||||
state=state,
|
||||
action=dqn_action,
|
||||
reward=immediate_reward,
|
||||
next_state=state, # Will be updated with actual outcome later
|
||||
done=False,
|
||||
)
|
||||
|
||||
models_trained.append("dqn")
|
||||
logger.debug(
|
||||
f"🧠 Added DQN experience: {action} {symbol} (reward: {immediate_reward:.3f})"
|
||||
)
|
||||
models_trained.append("dqn")
|
||||
logger.debug(
|
||||
f"🧠 Added DQN experience: {action} {symbol} (reward: {immediate_reward:.3f})"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error training DQN on decision: {e}")
|
||||
@ -6462,18 +6606,19 @@ class TradingOrchestrator:
|
||||
if self.cob_rl_agent and symbol in self.latest_cob_data and self.is_model_training_enabled("cob_rl"):
|
||||
try:
|
||||
cob_data = self.latest_cob_data[symbol]
|
||||
if hasattr(self.cob_rl_agent, "add_experience"):
|
||||
if hasattr(self.cob_rl_agent, "remember"):
|
||||
# Create COB state representation
|
||||
cob_state = self._create_cob_state_for_training(
|
||||
symbol, cob_data
|
||||
)
|
||||
|
||||
# Add COB experience
|
||||
self.cob_rl_agent.add_experience(
|
||||
self.cob_rl_agent.remember(
|
||||
state=cob_state,
|
||||
action=action,
|
||||
reward=confidence,
|
||||
symbol=symbol,
|
||||
next_state=cob_state, # Add required next_state parameter
|
||||
done=False, # Add required done parameter
|
||||
)
|
||||
|
||||
models_trained.append("cob_rl")
|
||||
@ -6638,20 +6783,25 @@ class TradingOrchestrator:
|
||||
logger.error(f"Error getting market data for training {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def _create_state_from_base_data(self, symbol: str, base_data: Any) -> np.ndarray:
|
||||
def _create_state_from_base_data(self, symbol: str, base_data: Any) -> Optional[np.ndarray]:
|
||||
"""Create state representation for DQN training from base_data (same as CNN model)"""
|
||||
try:
|
||||
# Validate base_data
|
||||
if not base_data or not hasattr(base_data, 'get_feature_vector'):
|
||||
logger.warning(f"Invalid base_data for {symbol}: {type(base_data)}")
|
||||
return np.zeros(403) # Default state size for DQN
|
||||
logger.debug(f"Invalid base_data for {symbol}: {type(base_data)}")
|
||||
return None
|
||||
|
||||
# Get feature vector from base_data (same as CNN model)
|
||||
features = base_data.get_feature_vector()
|
||||
|
||||
if not features or len(features) == 0:
|
||||
logger.warning(f"No features available from base_data for {symbol}, using default state")
|
||||
return np.zeros(403) # Default state size for DQN
|
||||
logger.debug(f"No features available from base_data for {symbol}")
|
||||
return None
|
||||
|
||||
# Check if all features are zero (invalid state)
|
||||
if all(f == 0 for f in features):
|
||||
logger.debug(f"All features are zero for {symbol}")
|
||||
return None
|
||||
|
||||
# Convert to numpy array
|
||||
state = np.array(features, dtype=np.float32)
|
||||
@ -6671,7 +6821,7 @@ class TradingOrchestrator:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating state from base_data for {symbol}: {e}")
|
||||
return np.zeros(403) # Default state size for DQN
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
25
data/ui_state.json
Normal file
25
data/ui_state.json
Normal file
@ -0,0 +1,25 @@
|
||||
{
|
||||
"model_toggle_states": {
|
||||
"dqn": {
|
||||
"inference_enabled": false,
|
||||
"training_enabled": true
|
||||
},
|
||||
"cnn": {
|
||||
"inference_enabled": true,
|
||||
"training_enabled": true
|
||||
},
|
||||
"cob_rl": {
|
||||
"inference_enabled": true,
|
||||
"training_enabled": true
|
||||
},
|
||||
"decision_fusion": {
|
||||
"inference_enabled": false,
|
||||
"training_enabled": true
|
||||
},
|
||||
"transformer": {
|
||||
"inference_enabled": true,
|
||||
"training_enabled": true
|
||||
}
|
||||
},
|
||||
"timestamp": "2025-07-29T09:07:36.747677"
|
||||
}
|
Reference in New Issue
Block a user