diff --git a/core/orchestrator.py b/core/orchestrator.py index ca351fb..620f61a 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -18,7 +18,7 @@ import threading import numpy as np import pandas as pd from datetime import datetime, timedelta -from typing import Dict, List, Optional, Any, Tuple, Union +from typing import Dict, List, Optional, Any, Tuple, Union, Deque from dataclasses import dataclass, field from collections import deque import json @@ -325,6 +325,13 @@ class TradingOrchestrator: "DECISION": ["decision_fusion", "decision"], } + # Recent inference buffer for vector supervision (configurable length) + self.recent_inference_maxlen: int = self.config.orchestrator.get( + "recent_inference_buffer", 10 + ) + # Model name -> deque of recent inference records + self.recent_inferences: Dict[str, Deque[Dict]] = {} + # Configuration - AGGRESSIVE for more training data self.confidence_threshold = self.config.orchestrator.get( "confidence_threshold", 0.15 @@ -1166,53 +1173,64 @@ class TradingOrchestrator: price_direction_pred: torch.Tensor, rewards: torch.Tensor, actions: torch.Tensor, - ) -> torch.Tensor: + target_vector: Optional[Dict[str, float]] = None, + ) -> Optional[torch.Tensor]: """ - Calculate price direction loss for CNN model + Calculate price direction loss for CNN model. + + If target_vector is provided, perform supervised regression towards the + explicit direction/confidence. Otherwise, derive weak targets from + rewards and actions. Args: - price_direction_pred: Tensor of shape [batch, 2] containing [direction, confidence] - rewards: Tensor of shape [batch] containing rewards - actions: Tensor of shape [batch] containing actions + price_direction_pred: [batch, 2] = [direction, confidence] + rewards: [batch] + actions: [batch] + target_vector: Optional dict {'direction': float, 'confidence': float} Returns: - Price direction loss tensor + Loss tensor or None. """ try: if price_direction_pred.size(1) != 2: return None batch_size = price_direction_pred.size(0) + direction_pred = price_direction_pred[:, 0] + confidence_pred = price_direction_pred[:, 1] - # Extract direction and confidence predictions - direction_pred = price_direction_pred[:, 0] # -1 to 1 - confidence_pred = price_direction_pred[:, 1] # 0 to 1 + # Supervised targets from explicit vector if available + if target_vector and isinstance(target_vector, dict): + try: + t_dir = float(target_vector.get("direction", 0.0)) + t_conf = float(target_vector.get("confidence", 0.0)) + direction_targets = torch.full( + (batch_size,), t_dir, device=price_direction_pred.device, dtype=direction_pred.dtype + ) + confidence_targets = torch.full( + (batch_size,), t_conf, device=price_direction_pred.device, dtype=confidence_pred.dtype + ) + dir_loss = nn.MSELoss()(direction_pred, direction_targets) + conf_loss = nn.MSELoss()(confidence_pred, confidence_targets) + return dir_loss + 0.3 * conf_loss + except Exception: + # Fall back to weak supervision below + pass - # Create targets based on rewards and actions + # Weak supervision from rewards/actions with torch.no_grad(): - # Direction targets: 1 if reward > 0 and action is BUY, -1 if reward > 0 and action is SELL, 0 otherwise - direction_targets = torch.zeros( - batch_size, device=price_direction_pred.device - ) + direction_targets = torch.zeros(batch_size, device=price_direction_pred.device) for i in range(batch_size): - if rewards[i] > 0.01: # Positive reward threshold - if actions[i] == 0: # BUY action - direction_targets[i] = 1.0 # UP - elif actions[i] == 1: # SELL action - direction_targets[i] = -1.0 # DOWN - # else: targets remain 0 (sideways) - - # Confidence targets: based on reward magnitude (higher reward = higher confidence) + if rewards[i] > 0.01: + if actions[i] == 0: # BUY + direction_targets[i] = 1.0 + elif actions[i] == 1: # SELL + direction_targets[i] = -1.0 confidence_targets = torch.abs(rewards).clamp(0, 1) - # Calculate losses for each component - direction_loss = nn.MSELoss()(direction_pred, direction_targets) - confidence_loss = nn.MSELoss()(confidence_pred, confidence_targets) - - # Combined loss (direction is more important than confidence) - total_loss = direction_loss + 0.3 * confidence_loss - - return total_loss + dir_loss = nn.MSELoss()(direction_pred, direction_targets) + conf_loss = nn.MSELoss()(confidence_pred, confidence_targets) + return dir_loss + 0.3 * conf_loss except Exception as e: logger.debug(f"Error calculating CNN price direction loss: {e}") @@ -1365,13 +1383,13 @@ class TradingOrchestrator: if hasattr(callback, "clear_session"): callback.clear_session() - logger.info("✅ Orchestrator session data cleared") + logger.info("Orchestrator session data cleared") logger.info("🧠 Model states preserved for continued training") logger.info("📊 Prediction history cleared") logger.info("💼 Position tracking reset") except Exception as e: - logger.error(f"❌ Error clearing orchestrator session data: {e}") + logger.error(f"Error clearing orchestrator session data: {e}") def sync_model_states_with_dashboard(self): """Sync model states with current dashboard values""" @@ -2931,6 +2949,14 @@ class TradingOrchestrator: # Store only the last inference per model (for immediate training) self.last_inference[model_name] = inference_record + # Push into in-memory recent buffer immediately + try: + if model_name not in self.recent_inferences: + self.recent_inferences[model_name] = deque(maxlen=self.recent_inference_maxlen) + self.recent_inferences[model_name].append(inference_record) + except Exception as e: + logger.debug(f"Unable to append to recent buffer for {model_name}: {e}") + # Also save to database using database manager for future training and analysis asyncio.create_task( self._save_to_database_manager_async(model_name, inference_record) @@ -3022,6 +3048,8 @@ class TradingOrchestrator: # Run database operation in thread pool to avoid blocking await asyncio.get_event_loop().run_in_executor(None, save_to_db) + # Note: in-memory recent buffer is appended in _store_inference_data_async + def get_last_inference_status(self) -> Dict[str, Any]: """Get status of last inferences for all models""" status = {} @@ -3299,6 +3327,15 @@ class TradingOrchestrator: f"Triggering immediate training for {model_name} with current price: {current_price}" ) + # Before evaluating the single record, compute a short-horizon direction vector + # from recent inferences and attach to the prediction for vector supervision. + try: + vector = self._compute_recent_direction_vector(model_name, symbol) + if vector is not None: + inference_record.setdefault("prediction", {})["price_direction"] = vector + except Exception as e: + logger.debug(f"Vector computation failed for {model_name}: {e}") + # Evaluate the previous prediction and train the model immediately await self._evaluate_and_train_on_record(inference_record, current_price) @@ -3441,7 +3478,7 @@ class TradingOrchestrator: ): # Price stayed stable was_correct = True - outcome_status = "✅ CORRECT" if was_correct else "❌ INCORRECT" + outcome_status = "CORRECT" if was_correct else "INCORRECT" # Get model statistics for enhanced logging model_stats = self.get_model_statistics(model_name) @@ -4027,6 +4064,86 @@ class TradingOrchestrator: logger.error(f"Error calculating price vector bonus: {e}") return 0.0 + def _compute_recent_direction_vector(self, model_name: str, symbol: str) -> Optional[Dict[str, float]]: + """ + Compute a price direction vector from recent stored inferences by comparing + current price with prices at the times of those inferences. + + Returns a dict: {'direction': float in [-1,1], 'confidence': float in [0,1]} + """ + try: + from statistics import median + recent = self.recent_inferences.get(model_name) + if not recent or len(recent) < 2: + return None + + # Gather tuples (delta_pct, age_seconds) for last N inferences with stored price + deltas = [] + now_price = self._get_current_price(symbol) + if now_price is None or now_price <= 0: + return None + + for rec in list(recent): + infer_price = rec.get("inference_price") + ts = rec.get("timestamp") + if isinstance(ts, str): + try: + ts = datetime.fromisoformat(ts) + except Exception: + ts = None + if infer_price is None or infer_price <= 0 or ts is None: + continue + + pct = (now_price - infer_price) / infer_price * 100.0 + age_sec = max(1.0, (datetime.now() - ts).total_seconds()) + deltas.append((pct, age_sec)) + + if not deltas: + return None + + # Weight recent observations more: weight = 1 / sqrt(age_seconds) + weighted_sum = 0.0 + weight_total = 0.0 + magnitudes = [] + for pct, age in deltas: + w = 1.0 / (age ** 0.5) + weighted_sum += pct * w + weight_total += w + magnitudes.append(abs(pct)) + + if weight_total <= 0: + return None + + avg_pct = weighted_sum / weight_total # signed percentage + + # Map avg_pct to direction in [-1, 1] using tanh on scaled percent (2% -> ~1) + scale = 2.0 + direction = float(np.tanh(avg_pct / scale)) + + # Confidence combines recency, agreement, and magnitude + # Use normalized median magnitude capped at 2% + med_mag = median(magnitudes) if magnitudes else 0.0 + mag_norm = max(0.0, min(1.0, med_mag / 2.0)) + + # Agreement: fraction of deltas with the same sign as avg_pct + if avg_pct > 0: + agree = sum(1 for pct, _ in deltas if pct > 0) / len(deltas) + elif avg_pct < 0: + agree = sum(1 for pct, _ in deltas if pct < 0) / len(deltas) + else: + agree = 0.5 + + # Recency: average weight normalized + recency = max(0.0, min(1.0, (weight_total / len(deltas)) * (1.0 / (1.0 ** 0.5)))) + + confidence = float(max(0.0, min(1.0, 0.5 * agree + 0.4 * mag_norm + 0.1 * recency))) + + return {"direction": direction, "confidence": confidence} + + except Exception as e: + logger.debug(f"Error computing recent direction vector for {model_name}: {e}") + return None + async def _train_model_on_outcome( self, record: Dict, @@ -4556,8 +4673,15 @@ class TradingOrchestrator: price_direction_pred is not None and price_direction_pred.shape[0] > 0 ): + # Supervised vector target from recent inferences if available + vector_target = None + try: + vector_target = self._compute_recent_direction_vector(model_name, symbol) + except Exception: + vector_target = None + price_direction_loss = self._calculate_cnn_price_direction_loss( - price_direction_pred, reward_tensor, action_tensor + price_direction_pred, reward_tensor, action_tensor, vector_target ) if price_direction_loss is not None: total_loss = total_loss + 0.2 * price_direction_loss @@ -7167,7 +7291,7 @@ class TradingOrchestrator: if result and result.get("success"): positions_closed += 1 logger.info( - f"✅ Closed {side} position for {symbol}: {size} units" + f"Closed {side} position for {symbol}: {size} units" ) else: logger.warning( @@ -7184,7 +7308,7 @@ class TradingOrchestrator: if positions_closed > 0: logger.info( - f"✅ Closed {positions_closed} open positions during session clear" + f"Closed {positions_closed} open positions during session clear" ) else: logger.debug("No open positions to close")