remove model name mappings

This commit is contained in:
Dobromir Popov
2025-08-10 22:59:20 +03:00
parent 8738f02d24
commit 9c1ba6dbe2

View File

@@ -18,7 +18,7 @@ import threading
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Union
from typing import Dict, List, Optional, Any, Tuple, Union, Deque
from dataclasses import dataclass, field
from collections import deque
import json
@@ -325,6 +325,13 @@ class TradingOrchestrator:
"DECISION": ["decision_fusion", "decision"],
}
# Recent inference buffer for vector supervision (configurable length)
self.recent_inference_maxlen: int = self.config.orchestrator.get(
"recent_inference_buffer", 10
)
# Model name -> deque of recent inference records
self.recent_inferences: Dict[str, Deque[Dict]] = {}
# Configuration - AGGRESSIVE for more training data
self.confidence_threshold = self.config.orchestrator.get(
"confidence_threshold", 0.15
@@ -1166,53 +1173,64 @@ class TradingOrchestrator:
price_direction_pred: torch.Tensor,
rewards: torch.Tensor,
actions: torch.Tensor,
) -> torch.Tensor:
target_vector: Optional[Dict[str, float]] = None,
) -> Optional[torch.Tensor]:
"""
Calculate price direction loss for CNN model
Calculate price direction loss for CNN model.
If target_vector is provided, perform supervised regression towards the
explicit direction/confidence. Otherwise, derive weak targets from
rewards and actions.
Args:
price_direction_pred: Tensor of shape [batch, 2] containing [direction, confidence]
rewards: Tensor of shape [batch] containing rewards
actions: Tensor of shape [batch] containing actions
price_direction_pred: [batch, 2] = [direction, confidence]
rewards: [batch]
actions: [batch]
target_vector: Optional dict {'direction': float, 'confidence': float}
Returns:
Price direction loss tensor
Loss tensor or None.
"""
try:
if price_direction_pred.size(1) != 2:
return None
batch_size = price_direction_pred.size(0)
direction_pred = price_direction_pred[:, 0]
confidence_pred = price_direction_pred[:, 1]
# Extract direction and confidence predictions
direction_pred = price_direction_pred[:, 0] # -1 to 1
confidence_pred = price_direction_pred[:, 1] # 0 to 1
# Supervised targets from explicit vector if available
if target_vector and isinstance(target_vector, dict):
try:
t_dir = float(target_vector.get("direction", 0.0))
t_conf = float(target_vector.get("confidence", 0.0))
direction_targets = torch.full(
(batch_size,), t_dir, device=price_direction_pred.device, dtype=direction_pred.dtype
)
confidence_targets = torch.full(
(batch_size,), t_conf, device=price_direction_pred.device, dtype=confidence_pred.dtype
)
dir_loss = nn.MSELoss()(direction_pred, direction_targets)
conf_loss = nn.MSELoss()(confidence_pred, confidence_targets)
return dir_loss + 0.3 * conf_loss
except Exception:
# Fall back to weak supervision below
pass
# Create targets based on rewards and actions
# Weak supervision from rewards/actions
with torch.no_grad():
# Direction targets: 1 if reward > 0 and action is BUY, -1 if reward > 0 and action is SELL, 0 otherwise
direction_targets = torch.zeros(
batch_size, device=price_direction_pred.device
)
direction_targets = torch.zeros(batch_size, device=price_direction_pred.device)
for i in range(batch_size):
if rewards[i] > 0.01: # Positive reward threshold
if actions[i] == 0: # BUY action
direction_targets[i] = 1.0 # UP
elif actions[i] == 1: # SELL action
direction_targets[i] = -1.0 # DOWN
# else: targets remain 0 (sideways)
# Confidence targets: based on reward magnitude (higher reward = higher confidence)
if rewards[i] > 0.01:
if actions[i] == 0: # BUY
direction_targets[i] = 1.0
elif actions[i] == 1: # SELL
direction_targets[i] = -1.0
confidence_targets = torch.abs(rewards).clamp(0, 1)
# Calculate losses for each component
direction_loss = nn.MSELoss()(direction_pred, direction_targets)
confidence_loss = nn.MSELoss()(confidence_pred, confidence_targets)
# Combined loss (direction is more important than confidence)
total_loss = direction_loss + 0.3 * confidence_loss
return total_loss
dir_loss = nn.MSELoss()(direction_pred, direction_targets)
conf_loss = nn.MSELoss()(confidence_pred, confidence_targets)
return dir_loss + 0.3 * conf_loss
except Exception as e:
logger.debug(f"Error calculating CNN price direction loss: {e}")
@@ -1365,13 +1383,13 @@ class TradingOrchestrator:
if hasattr(callback, "clear_session"):
callback.clear_session()
logger.info("Orchestrator session data cleared")
logger.info("Orchestrator session data cleared")
logger.info("🧠 Model states preserved for continued training")
logger.info("📊 Prediction history cleared")
logger.info("💼 Position tracking reset")
except Exception as e:
logger.error(f"Error clearing orchestrator session data: {e}")
logger.error(f"Error clearing orchestrator session data: {e}")
def sync_model_states_with_dashboard(self):
"""Sync model states with current dashboard values"""
@@ -2931,6 +2949,14 @@ class TradingOrchestrator:
# Store only the last inference per model (for immediate training)
self.last_inference[model_name] = inference_record
# Push into in-memory recent buffer immediately
try:
if model_name not in self.recent_inferences:
self.recent_inferences[model_name] = deque(maxlen=self.recent_inference_maxlen)
self.recent_inferences[model_name].append(inference_record)
except Exception as e:
logger.debug(f"Unable to append to recent buffer for {model_name}: {e}")
# Also save to database using database manager for future training and analysis
asyncio.create_task(
self._save_to_database_manager_async(model_name, inference_record)
@@ -3022,6 +3048,8 @@ class TradingOrchestrator:
# Run database operation in thread pool to avoid blocking
await asyncio.get_event_loop().run_in_executor(None, save_to_db)
# Note: in-memory recent buffer is appended in _store_inference_data_async
def get_last_inference_status(self) -> Dict[str, Any]:
"""Get status of last inferences for all models"""
status = {}
@@ -3299,6 +3327,15 @@ class TradingOrchestrator:
f"Triggering immediate training for {model_name} with current price: {current_price}"
)
# Before evaluating the single record, compute a short-horizon direction vector
# from recent inferences and attach to the prediction for vector supervision.
try:
vector = self._compute_recent_direction_vector(model_name, symbol)
if vector is not None:
inference_record.setdefault("prediction", {})["price_direction"] = vector
except Exception as e:
logger.debug(f"Vector computation failed for {model_name}: {e}")
# Evaluate the previous prediction and train the model immediately
await self._evaluate_and_train_on_record(inference_record, current_price)
@@ -3441,7 +3478,7 @@ class TradingOrchestrator:
): # Price stayed stable
was_correct = True
outcome_status = "CORRECT" if was_correct else "INCORRECT"
outcome_status = "CORRECT" if was_correct else "INCORRECT"
# Get model statistics for enhanced logging
model_stats = self.get_model_statistics(model_name)
@@ -4027,6 +4064,86 @@ class TradingOrchestrator:
logger.error(f"Error calculating price vector bonus: {e}")
return 0.0
def _compute_recent_direction_vector(self, model_name: str, symbol: str) -> Optional[Dict[str, float]]:
"""
Compute a price direction vector from recent stored inferences by comparing
current price with prices at the times of those inferences.
Returns a dict: {'direction': float in [-1,1], 'confidence': float in [0,1]}
"""
try:
from statistics import median
recent = self.recent_inferences.get(model_name)
if not recent or len(recent) < 2:
return None
# Gather tuples (delta_pct, age_seconds) for last N inferences with stored price
deltas = []
now_price = self._get_current_price(symbol)
if now_price is None or now_price <= 0:
return None
for rec in list(recent):
infer_price = rec.get("inference_price")
ts = rec.get("timestamp")
if isinstance(ts, str):
try:
ts = datetime.fromisoformat(ts)
except Exception:
ts = None
if infer_price is None or infer_price <= 0 or ts is None:
continue
pct = (now_price - infer_price) / infer_price * 100.0
age_sec = max(1.0, (datetime.now() - ts).total_seconds())
deltas.append((pct, age_sec))
if not deltas:
return None
# Weight recent observations more: weight = 1 / sqrt(age_seconds)
weighted_sum = 0.0
weight_total = 0.0
magnitudes = []
for pct, age in deltas:
w = 1.0 / (age ** 0.5)
weighted_sum += pct * w
weight_total += w
magnitudes.append(abs(pct))
if weight_total <= 0:
return None
avg_pct = weighted_sum / weight_total # signed percentage
# Map avg_pct to direction in [-1, 1] using tanh on scaled percent (2% -> ~1)
scale = 2.0
direction = float(np.tanh(avg_pct / scale))
# Confidence combines recency, agreement, and magnitude
# Use normalized median magnitude capped at 2%
med_mag = median(magnitudes) if magnitudes else 0.0
mag_norm = max(0.0, min(1.0, med_mag / 2.0))
# Agreement: fraction of deltas with the same sign as avg_pct
if avg_pct > 0:
agree = sum(1 for pct, _ in deltas if pct > 0) / len(deltas)
elif avg_pct < 0:
agree = sum(1 for pct, _ in deltas if pct < 0) / len(deltas)
else:
agree = 0.5
# Recency: average weight normalized
recency = max(0.0, min(1.0, (weight_total / len(deltas)) * (1.0 / (1.0 ** 0.5))))
confidence = float(max(0.0, min(1.0, 0.5 * agree + 0.4 * mag_norm + 0.1 * recency)))
return {"direction": direction, "confidence": confidence}
except Exception as e:
logger.debug(f"Error computing recent direction vector for {model_name}: {e}")
return None
async def _train_model_on_outcome(
self,
record: Dict,
@@ -4556,8 +4673,15 @@ class TradingOrchestrator:
price_direction_pred is not None
and price_direction_pred.shape[0] > 0
):
# Supervised vector target from recent inferences if available
vector_target = None
try:
vector_target = self._compute_recent_direction_vector(model_name, symbol)
except Exception:
vector_target = None
price_direction_loss = self._calculate_cnn_price_direction_loss(
price_direction_pred, reward_tensor, action_tensor
price_direction_pred, reward_tensor, action_tensor, vector_target
)
if price_direction_loss is not None:
total_loss = total_loss + 0.2 * price_direction_loss
@@ -7167,7 +7291,7 @@ class TradingOrchestrator:
if result and result.get("success"):
positions_closed += 1
logger.info(
f"Closed {side} position for {symbol}: {size} units"
f"Closed {side} position for {symbol}: {size} units"
)
else:
logger.warning(
@@ -7184,7 +7308,7 @@ class TradingOrchestrator:
if positions_closed > 0:
logger.info(
f"Closed {positions_closed} open positions during session clear"
f"Closed {positions_closed} open positions during session clear"
)
else:
logger.debug("No open positions to close")