472 lines
20 KiB
Python
472 lines
20 KiB
Python
"""
|
|
Negative Case Trainer - Intensive Training on Losing Trades
|
|
|
|
This module focuses on learning from losses to prevent future mistakes.
|
|
Stores negative cases in testcases/negative folder for reuse and retraining.
|
|
Supports simultaneous inference and training.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import pickle
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from collections import deque
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
logger = logging.getLogger(name=__name__)  # module logger used by NegativeCaseTrainer
|
|
|
|
@dataclass
class NegativeCase:
    """A losing trade captured for intensive retraining.

    Instances are built by ``NegativeCaseTrainer.add_losing_trade``, pickled
    under ``<storage_dir>/cases/<case_id>.pkl`` and summarised in the JSON
    case index. Field order is the positional constructor order; do not
    reorder.
    """

    case_id: str  # unique id, e.g. "loss_20240101_120000_BTCUSDT"
    timestamp: datetime  # trade time as supplied by the caller (not validated)
    symbol: str
    action: str  # 'BUY' or 'SELL'
    entry_price: float
    exit_price: float
    loss_amount: float  # abs(pnl) when built by add_losing_trade
    loss_percentage: float  # loss_amount as a percentage of the trade value
    confidence_used: float
    market_state_before: Dict[str, Any]
    market_state_after: Dict[str, Any]
    # Ticks around the trade; the window is chosen by the caller
    # (presumably ~15 minutes — TODO confirm against the producer).
    tick_data: List[Dict[str, Any]]
    technical_indicators: Dict[str, float]
    # Suggested action in hindsight: 'HOLD', 'BUY' or 'SELL'
    # (the values NegativeCaseTrainer._analyze_optimal_action returns).
    what_should_have_been_done: str
    lesson_learned: str
    training_priority: int  # 1-5, 5 being highest priority
    retraining_count: int = 0  # number of completed retraining sessions
    last_retrained: Optional[datetime] = None  # end of the latest retraining session
|
|
|
|
@dataclass
class TrainingSession:
    """Bookkeeping record for one intensive retraining run over negative cases."""

    session_id: str  # unique label for the run
    start_time: datetime  # when the run began
    cases_trained: List[str]  # ids of the NegativeCase objects covered by the run
    epochs_completed: int  # progress counter, advanced once per epoch by the trainer
    loss_improvement: float  # fraction, logged as a percentage (0.25 -> 25.0%)
    accuracy_improvement: float  # fraction, logged as a percentage (0.10 -> 10.0%)
    inference_paused: bool = False  # set while a critical training step holds inference back
    training_active: bool = True  # NOTE(review): not updated anywhere in this module
|
|
|
|
class NegativeCaseTrainer:
    """
    Intensive trainer focused on learning from losing trades.

    Features:
    - Stores all losing trades as negative cases
    - Intensive retraining on losses
    - Simultaneous inference and training
    - Persistent storage in testcases/negative
    - Priority-based training (bigger losses = higher priority)

    Storage layout under ``storage_dir``:
    - ``cases/<case_id>.pkl``       pickled NegativeCase objects
    - ``sessions/<session_id>.json`` per-session training summaries
    - ``models/``                   created here but not written by this class
    - ``case_index.json``           ``{"cases": [...], "last_updated": ...}``

    Threading: a daemon thread started in ``__init__`` consumes
    ``training_queue`` highest-priority first. ``training_lock`` guards the
    queue and ``stored_cases``; ``_storage_lock`` serialises writes to case
    files and the index so callers of ``add_losing_trade`` and the
    background thread cannot clobber each other. ``_storage_lock`` is never
    acquired while holding ``training_lock``.

    NOTE: training is currently simulated (sleeps plus random improvement
    figures); replace the body of ``_start_intensive_training_session``
    with real model training.
    """

    # Relative price move beyond which a loss is blamed on trading the wrong
    # direction (suggest the opposite action) rather than on trading at all
    # (suggest HOLD).
    opposite_move_threshold: float = 0.005

    def __init__(self, storage_dir: str = "testcases/negative"):
        """Set up storage, load previously stored cases and start the trainer thread.

        Args:
            storage_dir: Root directory for case files, session files and the index.
        """
        self.storage_dir = storage_dir
        self.stored_cases: List[NegativeCase] = []
        # NOTE: a full deque silently drops its oldest entry on append.
        self.training_queue = deque(maxlen=1000)
        self.training_lock = threading.Lock()
        self.inference_lock = threading.Lock()
        # Reentrant because add_losing_trade holds it across id generation and
        # _store_case, which acquires it again.
        self._storage_lock = threading.RLock()

        # Training configuration
        self.max_concurrent_training = 3  # Max parallel training sessions (not enforced: one worker thread)
        self.intensive_training_epochs = 50  # Base epochs per negative case (scaled by priority)
        self.priority_multiplier = 2.0  # Training time multiplier for high priority cases

        # Simultaneous inference/training control
        self.inference_active = True
        self.training_active = False
        self.current_training_sessions: List[TrainingSession] = []

        # Performance tracking
        self.total_cases_processed = 0
        self.total_training_time = 0.0
        self.accuracy_improvements: List[float] = []

        # Initialize storage
        self._initialize_storage()
        self._load_existing_cases()

        # Start background training thread
        self.training_thread = threading.Thread(target=self._background_training_loop, daemon=True)
        self.training_thread.start()

        logger.info(f"NegativeCaseTrainer initialized with {len(self.stored_cases)} existing cases")
        logger.info(f"Storage directory: {self.storage_dir}")
        logger.info("Background training thread started")

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------

    def _index_path(self) -> str:
        """Return the path of the JSON case index."""
        return os.path.join(self.storage_dir, "case_index.json")

    def _case_path(self, case_id: str) -> str:
        """Return the path of the pickle file that stores one case."""
        return os.path.join(self.storage_dir, "cases", f"{case_id}.pkl")

    @staticmethod
    def _isoformat(value: Any) -> str:
        """Render a timestamp for JSON: ``isoformat()`` when available, else ``str()``."""
        return value.isoformat() if hasattr(value, "isoformat") else str(value)

    def _read_index(self) -> Dict[str, Any]:
        """Read the case index; a missing file is treated as an empty index.

        A corrupt (non-JSON) index raises instead, so callers never overwrite
        it with an empty one and lose every entry.
        """
        try:
            with open(self._index_path(), 'r') as f:
                index_data = json.load(f)
        except FileNotFoundError:
            index_data = {}
        index_data.setdefault('cases', [])
        return index_data

    def _write_index(self, index_data: Dict[str, Any]) -> None:
        """Replace the index atomically (write a temp file, then ``os.replace``)."""
        index_file = self._index_path()
        tmp_file = f"{index_file}.tmp"
        with open(tmp_file, 'w') as f:
            json.dump(index_data, f, indent=2)
        os.replace(tmp_file, index_file)

    def _write_case_file(self, case: "NegativeCase") -> None:
        """Pickle a case to its file atomically (temp file + ``os.replace``)."""
        case_file = self._case_path(case.case_id)
        tmp_file = f"{case_file}.tmp"
        with open(tmp_file, 'wb') as f:
            pickle.dump(case, f)
        os.replace(tmp_file, case_file)

    def _initialize_storage(self):
        """Create the storage directories and an empty index if none exists."""
        try:
            os.makedirs(self.storage_dir, exist_ok=True)
            for subdir in ("cases", "sessions", "models"):
                os.makedirs(os.path.join(self.storage_dir, subdir), exist_ok=True)

            # Create index file if it doesn't exist
            if not os.path.exists(self._index_path()):
                self._write_index({"cases": [], "last_updated": datetime.now().isoformat()})

            logger.info(f"Storage initialized at {self.storage_dir}")

        except Exception as e:
            logger.error(f"Error initializing storage: {e}")

    def _load_existing_cases(self):
        """Load every case listed in the index from its pickle file.

        Entries whose pickle is missing are skipped. Entries without a
        ``case_id``, or whose pickle fails to load, are skipped with a warning,
        so one bad entry does not stop the rest from loading.

        SECURITY NOTE: ``pickle.load`` can execute arbitrary code; only point
        ``storage_dir`` at trusted, locally written data.
        """
        try:
            index_data = self._read_index()

            for case_info in index_data['cases']:
                case_id = case_info.get('case_id') if isinstance(case_info, dict) else None
                if not case_id:
                    logger.warning(f"Skipping malformed index entry: {case_info!r}")
                    continue

                case_file = self._case_path(case_id)
                if not os.path.exists(case_file):
                    continue
                try:
                    with open(case_file, 'rb') as f:
                        self.stored_cases.append(pickle.load(f))
                except Exception as e:
                    logger.warning(f"Error loading case {case_id}: {e}")

            logger.info(f"Loaded {len(self.stored_cases)} existing negative cases")

        except Exception as e:
            logger.error(f"Error loading existing cases: {e}")

    # ------------------------------------------------------------------
    # Case creation
    # ------------------------------------------------------------------

    @staticmethod
    def _loss_percentage(trade_info: Dict[str, Any]) -> float:
        """Return ``abs(pnl) / abs(value) * 100``.

        A missing ``value`` defaults to 1 (so the result is ``abs(pnl) * 100``);
        callers should supply it. A zero/None ``value`` yields 0.0 instead of
        raising ZeroDivisionError, and a negative notional no longer produces a
        negative percentage.
        """
        value = trade_info.get('value', 1)
        if not value:
            return 0.0
        return abs(trade_info.get('pnl', 0)) / abs(value) * 100

    @staticmethod
    def _priority_from_loss_percentage(loss_percentage: float) -> int:
        """Map a loss percentage to a training priority from 1 (minimal) to 5 (critical)."""
        if loss_percentage > 10:
            return 5  # Critical loss
        if loss_percentage > 5:
            return 4  # High loss
        if loss_percentage > 2:
            return 3  # Medium loss
        if loss_percentage > 1:
            return 2  # Small loss
        return 1  # Minimal loss

    def _generate_case_id(self, symbol: str) -> str:
        """Build ``loss_<YYYYmmdd_HHMMSS>_<symbol without '/'>``, unique on disk.

        If a case file with that id already exists (e.g. two losses on the same
        symbol within one second), ``_1``, ``_2``, ... is appended so the
        earlier case is not overwritten. Call while holding ``_storage_lock``
        and write the case file before releasing it, so the id stays reserved.
        """
        base = f"loss_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{symbol.replace('/', '')}"
        case_id, suffix = base, 1
        while os.path.exists(self._case_path(case_id)):
            case_id = f"{base}_{suffix}"
            suffix += 1
        return case_id

    def add_losing_trade(self, trade_info: Dict[str, Any], market_data: Dict[str, Any]) -> str:
        """
        Add a losing trade as a negative case for intensive training.

        Args:
            trade_info: Trade information. Required keys: 'symbol', 'timestamp',
                'action', 'price'. Optional: 'pnl', 'value', 'confidence'.
            market_data: Market context. Optional keys: 'exit_price',
                'state_before', 'state_after', 'tick_data', 'technical_indicators'.

        Returns:
            case_id: Unique identifier for the negative case, or "" if the case
            could not be created (the error is logged).
        """
        try:
            # Calculate loss metrics
            loss_amount = abs(trade_info.get('pnl', 0))
            loss_percentage = self._loss_percentage(trade_info)
            priority = self._priority_from_loss_percentage(loss_percentage)

            # Analyze what should have been done
            what_should_have_been_done = self._analyze_optimal_action(trade_info, market_data)
            lesson_learned = self._generate_lesson(trade_info, market_data, what_should_have_been_done)

            # Hold the storage lock from id generation through the file write so
            # concurrent callers cannot pick the same id.
            with self._storage_lock:
                case_id = self._generate_case_id(trade_info['symbol'])
                negative_case = NegativeCase(
                    case_id=case_id,
                    timestamp=trade_info['timestamp'],
                    symbol=trade_info['symbol'],
                    action=trade_info['action'],
                    entry_price=trade_info['price'],
                    exit_price=market_data.get('exit_price', trade_info['price']),
                    loss_amount=loss_amount,
                    loss_percentage=loss_percentage,
                    confidence_used=trade_info.get('confidence', 0.5),
                    market_state_before=market_data.get('state_before', {}),
                    market_state_after=market_data.get('state_after', {}),
                    tick_data=market_data.get('tick_data', []),
                    technical_indicators=market_data.get('technical_indicators', {}),
                    what_should_have_been_done=what_should_have_been_done,
                    lesson_learned=lesson_learned,
                    training_priority=priority,
                )
                self._store_case(negative_case)

            # Queue for training; the background loop picks by priority
            with self.training_lock:
                self.training_queue.append(negative_case)
                self.stored_cases.append(negative_case)

            logger.error(f"NEGATIVE CASE ADDED: {case_id} | Loss: ${loss_amount:.2f} ({loss_percentage:.1f}%) | Priority: {priority}")
            logger.error(f"Lesson: {lesson_learned}")

            return case_id

        except Exception as e:
            logger.error(f"Error adding losing trade: {e}")
            return ""

    def _analyze_optimal_action(self, trade_info: Dict[str, Any], market_data: Dict[str, Any]) -> str:
        """Return the action that would have avoided the loss: 'HOLD', 'BUY' or 'SELL'.

        A move against the position larger than ``opposite_move_threshold``
        suggests the opposite action; smaller moves, a zero entry price, or
        any other combination suggest 'HOLD'.
        """
        try:
            entry_price = trade_info['price']
            exit_price = market_data.get('exit_price', entry_price)
            action = trade_info['action']

            if not entry_price:
                return 'HOLD'  # no relative move can be computed

            price_change = (exit_price - entry_price) / entry_price
            threshold = self.opposite_move_threshold

            if action == 'BUY' and price_change < 0:
                # Bought but price went down
                return 'SELL' if -price_change > threshold else 'HOLD'
            if action == 'SELL' and price_change > 0:
                # Sold but price went up
                return 'BUY' if price_change > threshold else 'HOLD'
            return 'HOLD'  # Should have done nothing

        except Exception as e:
            logger.error(f"Error analyzing optimal action: {e}")
            return 'HOLD'

    def _generate_lesson(self, trade_info: Dict[str, Any], market_data: Dict[str, Any], optimal_action: str) -> str:
        """Produce a one-line, human-readable lesson for a losing trade."""
        try:
            action = trade_info['action']
            symbol = trade_info['symbol']
            loss_pct = self._loss_percentage(trade_info)
            confidence = trade_info.get('confidence', 0.5)

            if optimal_action == 'HOLD':
                return f"Should have HELD {symbol} instead of {action}. Confidence {confidence:.1%} was too high for {loss_pct:.1f}% loss."
            if optimal_action == 'BUY' and action == 'SELL':
                return f"Should have BOUGHT {symbol} instead of SELLING. Market moved opposite to prediction."
            if optimal_action == 'SELL' and action == 'BUY':
                return f"Should have SOLD {symbol} instead of BUYING. Market moved opposite to prediction."
            return f"Confidence {confidence:.1%} was too high for {loss_pct:.1f}% loss on {action} {symbol}."

        except Exception as e:
            logger.error(f"Error generating lesson: {e}")
            return "Learn from this loss to improve future decisions."

    def _store_case(self, case: "NegativeCase"):
        """Pickle the case and append its summary to the index (errors are logged)."""
        try:
            with self._storage_lock:
                self._write_case_file(case)

                index_data = self._read_index()
                index_data['cases'].append({
                    'case_id': case.case_id,
                    'timestamp': self._isoformat(case.timestamp),
                    'symbol': case.symbol,
                    # Plain Python numbers keep the index JSON-serialisable
                    'loss_amount': float(case.loss_amount),
                    'loss_percentage': float(case.loss_percentage),
                    'training_priority': int(case.training_priority),
                    'retraining_count': case.retraining_count,
                })
                index_data['last_updated'] = datetime.now().isoformat()
                self._write_index(index_data)

            logger.info(f"Stored negative case: {case.case_id}")

        except Exception as e:
            logger.error(f"Error storing case: {e}")

    def _persist_retraining_info(self, case: "NegativeCase") -> None:
        """Write a case's updated retraining metadata back to storage.

        Rewrites the case pickle (carrying ``retraining_count`` and
        ``last_retrained``) and updates ``retraining_count`` in its index
        entry. Best-effort: failures are logged, not raised.
        """
        try:
            with self._storage_lock:
                self._write_case_file(case)
                index_data = self._read_index()
                for entry in index_data['cases']:
                    if isinstance(entry, dict) and entry.get('case_id') == case.case_id:
                        entry['retraining_count'] = case.retraining_count
                index_data['last_updated'] = datetime.now().isoformat()
                self._write_index(index_data)
        except Exception as e:
            logger.error(f"Error persisting retraining info for {case.case_id}: {e}")

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------

    def _pop_highest_priority_case(self) -> Optional["NegativeCase"]:
        """Remove and return the highest-priority queued case, or None if empty.

        Among equal priorities the earliest-queued case wins.
        """
        with self.training_lock:
            if not self.training_queue:
                return None
            idx, case = max(enumerate(self.training_queue), key=lambda item: item[1].training_priority)
            del self.training_queue[idx]
            return case

    def _background_training_loop(self):
        """Background loop for intensive training on negative cases (runs forever)."""
        logger.info("Background training loop started")

        while True:
            try:
                case_to_train = self._pop_highest_priority_case()
                if case_to_train is None:
                    # Sleep WITHOUT holding training_lock so new cases can be queued
                    time.sleep(5)
                    continue

                self._start_intensive_training_session(case_to_train)

                # Brief pause between training sessions
                time.sleep(2)

            except Exception as e:
                logger.error(f"Error in background training loop: {e}")
                time.sleep(10)  # Wait longer on error

    def _start_intensive_training_session(self, case: "NegativeCase"):
        """Run one (simulated) intensive training session for a negative case.

        Epochs = intensive_training_epochs * training_priority * priority_multiplier.
        For priority >= 4, every 10th epoch holds ``inference_lock`` for 0.1 s,
        which blocks ``can_inference_proceed`` callers for that time. The
        session is always removed from ``current_training_sessions`` afterwards,
        even on error.
        """
        session = None
        try:
            session_id = f"session_{case.case_id}_{int(time.time())}"

            session = TrainingSession(
                session_id=session_id,
                start_time=datetime.now(),
                cases_trained=[case.case_id],
                epochs_completed=0,
                loss_improvement=0.0,
                accuracy_improvement=0.0,
            )

            self.current_training_sessions.append(session)
            self.training_active = True

            logger.warning(f"INTENSIVE TRAINING STARTED: {session_id}")
            logger.warning(f"Training on loss case: {case.case_id} (Priority: {case.training_priority})")

            epochs = int(self.intensive_training_epochs * case.training_priority * self.priority_multiplier)

            # Simulate intensive training (replace with actual model training)
            for epoch in range(epochs):
                critical_step = case.training_priority >= 4 and epoch % 10 == 0

                # Pause inference during critical training phases
                if critical_step:
                    with self.inference_lock:
                        session.inference_paused = True
                        try:
                            time.sleep(0.1)  # Brief pause for critical training
                        finally:
                            session.inference_paused = False

                session.epochs_completed = epoch + 1

                # Log progress for high priority cases
                if critical_step:
                    logger.warning(f"Intensive training progress: {epoch}/{epochs} epochs ({case.case_id})")

                time.sleep(0.05)  # Simulate training time

            # Update and persist case retraining info
            case.retraining_count += 1
            case.last_retrained = datetime.now()
            self._persist_retraining_info(case)

            # Calculate improvements (simulated)
            session.loss_improvement = np.random.uniform(0.1, 0.5)  # 10-50% improvement
            session.accuracy_improvement = np.random.uniform(0.05, 0.2)  # 5-20% improvement

            self._store_training_session(session)

            # Update statistics
            self.total_cases_processed += 1
            self.total_training_time += (datetime.now() - session.start_time).total_seconds()
            self.accuracy_improvements.append(session.accuracy_improvement)

            logger.warning(f"INTENSIVE TRAINING COMPLETED: {session_id}")
            logger.warning(f"Epochs: {session.epochs_completed} | Loss improvement: {session.loss_improvement:.1%} | Accuracy improvement: {session.accuracy_improvement:.1%}")

        except Exception as e:
            logger.error(f"Error in intensive training session: {e}")

        finally:
            # Never leak a session or leave training_active stuck on error
            if session is not None and session in self.current_training_sessions:
                self.current_training_sessions.remove(session)
            if not self.current_training_sessions:
                self.training_active = False

    def _store_training_session(self, session: "TrainingSession"):
        """Write a session summary to ``sessions/<session_id>.json`` (errors are logged)."""
        try:
            session_file = os.path.join(self.storage_dir, "sessions", f"{session.session_id}.json")
            session_data = {
                'session_id': session.session_id,
                'start_time': session.start_time.isoformat(),
                'end_time': datetime.now().isoformat(),
                'cases_trained': session.cases_trained,
                'epochs_completed': session.epochs_completed,
                'loss_improvement': float(session.loss_improvement),
                'accuracy_improvement': float(session.accuracy_improvement),
            }

            with open(session_file, 'w') as f:
                json.dump(session_data, f, indent=2)

        except Exception as e:
            logger.error(f"Error storing training session: {e}")

    # ------------------------------------------------------------------
    # Public queries / controls
    # ------------------------------------------------------------------

    def can_inference_proceed(self) -> bool:
        """Check whether inference can proceed (not blocked by critical training).

        The trainer sets and clears ``inference_paused`` while holding
        ``inference_lock``. In practice this call therefore blocks for the
        length of a critical pause and then returns True.
        """
        with self.inference_lock:
            return not any(session.inference_paused for session in self.current_training_sessions)

    def get_training_stats(self) -> Dict[str, Any]:
        """Return a snapshot of training statistics ({} on error)."""
        try:
            avg_accuracy_improvement = float(np.mean(self.accuracy_improvements)) if self.accuracy_improvements else 0.0

            return {
                'total_negative_cases': len(self.stored_cases),
                'cases_in_queue': len(self.training_queue),
                'total_cases_processed': self.total_cases_processed,
                'total_training_time': self.total_training_time,
                'avg_accuracy_improvement': avg_accuracy_improvement,
                'active_training_sessions': len(self.current_training_sessions),
                'training_active': self.training_active,
                'high_priority_cases': sum(1 for c in self.stored_cases if c.training_priority >= 4),
                'storage_directory': self.storage_dir,
            }

        except Exception as e:
            logger.error(f"Error getting training stats: {e}")
            return {}

    def get_recent_lessons(self, count: int = 5) -> List[str]:
        """Return lessons from the ``count`` most recent cases, newest first ([] on error)."""
        try:
            recent_cases = sorted(self.stored_cases, key=lambda x: x.timestamp, reverse=True)[:count]
            return [case.lesson_learned for case in recent_cases]
        except Exception as e:
            logger.error(f"Error getting recent lessons: {e}")
            return []

    def retrain_all_cases(self):
        """Queue every stored case that is not already queued (periodic retraining)."""
        try:
            logger.warning("RETRAINING ALL NEGATIVE CASES - This may take a while...")

            with self.training_lock:
                # Identity check: avoids O(n^2) deep dataclass equality comparisons
                queued = {id(c) for c in self.training_queue}
                to_add = [c for c in self.stored_cases if id(c) not in queued]
                maxlen = self.training_queue.maxlen
                evicted = 0 if maxlen is None else max(0, len(self.training_queue) + len(to_add) - maxlen)
                self.training_queue.extend(to_add)

            if evicted:
                logger.warning(f"Training queue full: {evicted} oldest queued cases were dropped")
            logger.warning(f"Added {len(to_add)} cases to retraining queue")

        except Exception as e:
            logger.error(f"Error retraining all cases: {e}")