From 99386dbc504ac5d52ed5c13509706be3d40599ea Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Thu, 26 Jun 2025 17:51:48 +0300 Subject: [PATCH] better testcase managment, script fix --- core/trade_data_manager.py | 140 +++++++++++++++++++++++++++++- scripts/kill_stale_processes.py | 89 +++++++++++++------ web/clean_dashboard.py | 148 ++++++++++++++++++++------------ 3 files changed, 296 insertions(+), 81 deletions(-) diff --git a/core/trade_data_manager.py b/core/trade_data_manager.py index a63a4e1..b893233 100644 --- a/core/trade_data_manager.py +++ b/core/trade_data_manager.py @@ -36,7 +36,8 @@ class TradeDataManager: def _setup_directory_structure(self): """Setup the testcases directory structure""" try: - for case_type in ['positive', 'negative']: + # Create base directories including new 'base' directory for temporary trades + for case_type in ['positive', 'negative', 'base']: for subdir in ['cases', 'sessions', 'models']: dir_path = os.path.join(self.base_dir, case_type, subdir) os.makedirs(dir_path, exist_ok=True) @@ -543,4 +544,139 @@ class TradeDataManager: return [] except Exception as e: logger.debug(f"Error getting price history: {e}") - return [] \ No newline at end of file + return [] + + def store_base_trade_for_later_classification(self, trade_record: Dict[str, Any]) -> Optional[str]: + """Store opening trade as BASE case until position is closed and P&L is known""" + try: + # Store in base directory (temporary) + case_dir = os.path.join(self.base_dir, "base") + cases_dir = os.path.join(case_dir, "cases") + + # Create unique case ID for base case + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + symbol_clean = trade_record['symbol'].replace('/', '') + base_case_id = f"base_{timestamp}_{symbol_clean}_{trade_record['side']}" + + # Store comprehensive case data as pickle + case_filepath = os.path.join(cases_dir, f"{base_case_id}.pkl") + with open(case_filepath, 'wb') as f: + pickle.dump(trade_record, f) + + # Store JSON summary + json_filepath = os.path.join(cases_dir, f"{base_case_id}.json") + json_summary = { + 'case_id': base_case_id, + 'timestamp': trade_record.get('timestamp_entry', datetime.now()).isoformat() if hasattr(trade_record.get('timestamp_entry'), 'isoformat') else str(trade_record.get('timestamp_entry')), + 'symbol': trade_record['symbol'], + 'side': trade_record['side'], + 'entry_price': trade_record['entry_price'], + 'leverage': trade_record.get('leverage', 1), + 'quantity': trade_record.get('quantity', 0), + 'trade_status': 'OPENING', + 'confidence': trade_record.get('confidence', 0), + 'trade_type': trade_record.get('trade_type', 'manual'), + 'training_ready': False, # Not ready until closed + 'feature_counts': { + 'market_state': len(trade_record.get('model_inputs_at_entry', {})), + 'cob_features': len(trade_record.get('cob_snapshot_at_entry', {})) + } + } + + with open(json_filepath, 'w') as f: + json.dump(json_summary, f, indent=2, default=str) + + logger.info(f"Stored base case for later classification: {base_case_id}") + return base_case_id + + except Exception as e: + logger.error(f"Error storing base trade: {e}") + return None + + def move_base_trade_to_outcome(self, base_case_id: str, closing_trade_record: Dict[str, Any], is_positive: bool) -> Optional[str]: + """Move base case to positive/negative based on trade outcome""" + try: + # Load the original base case + base_case_path = os.path.join(self.base_dir, "base", "cases", f"{base_case_id}.pkl") + base_json_path = os.path.join(self.base_dir, "base", "cases", f"{base_case_id}.json") + + if not os.path.exists(base_case_path): + logger.warning(f"Base case not found: {base_case_id}") + return None + + # Load opening trade data + with open(base_case_path, 'rb') as f: + opening_trade_data = pickle.load(f) + + # Combine opening and closing data + combined_trade_record = { + **opening_trade_data, # Opening snapshot + **closing_trade_record, # Closing snapshot + 'opening_data': opening_trade_data, + 'closing_data': closing_trade_record, + 'trade_complete': True + } + + # Determine target directory + case_type = "positive" if is_positive else "negative" + case_dir = os.path.join(self.base_dir, case_type) + cases_dir = os.path.join(case_dir, "cases") + + # Create new case ID for final outcome + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + symbol_clean = closing_trade_record['symbol'].replace('/', '') + pnl_leveraged = closing_trade_record.get('pnl_leveraged', 0) + final_case_id = f"{case_type}_{timestamp}_{symbol_clean}_pnl_{pnl_leveraged:.4f}".replace('.', 'p').replace('-', 'neg') + + # Store final case data + final_case_filepath = os.path.join(cases_dir, f"{final_case_id}.pkl") + with open(final_case_filepath, 'wb') as f: + pickle.dump(combined_trade_record, f) + + # Store JSON summary + final_json_filepath = os.path.join(cases_dir, f"{final_case_id}.json") + json_summary = { + 'case_id': final_case_id, + 'original_base_case_id': base_case_id, + 'timestamp_opened': str(opening_trade_data.get('timestamp_entry', '')), + 'timestamp_closed': str(closing_trade_record.get('timestamp_exit', '')), + 'symbol': closing_trade_record['symbol'], + 'side_opened': opening_trade_data['side'], + 'side_closed': closing_trade_record['side'], + 'entry_price': opening_trade_data['entry_price'], + 'exit_price': closing_trade_record['exit_price'], + 'leverage': closing_trade_record.get('leverage', 1), + 'quantity': closing_trade_record.get('quantity', 0), + 'pnl_raw': closing_trade_record.get('pnl_raw', 0), + 'pnl_leveraged': pnl_leveraged, + 'trade_type': closing_trade_record.get('trade_type', 'manual'), + 'training_ready': True, + 'complete_trade_pair': True, + 'feature_counts': { + 'opening_market_state': len(opening_trade_data.get('model_inputs_at_entry', {})), + 'opening_cob_features': len(opening_trade_data.get('cob_snapshot_at_entry', {})), + 'closing_market_state': len(closing_trade_record.get('model_inputs_at_exit', {})), + 'closing_cob_features': len(closing_trade_record.get('cob_snapshot_at_exit', {})) + } + } + + with open(final_json_filepath, 'w') as f: + json.dump(json_summary, f, indent=2, default=str) + + # Update case index + self._update_case_index(case_dir, final_case_id, json_summary, case_type) + + # Clean up base case files + try: + os.remove(base_case_path) + os.remove(base_json_path) + logger.debug(f"Cleaned up base case files: {base_case_id}") + except Exception as e: + logger.warning(f"Error cleaning up base case files: {e}") + + logger.info(f"Moved base case to {case_type}: {final_case_id}") + return final_case_id + + except Exception as e: + logger.error(f"Error moving base trade to outcome: {e}") + return None \ No newline at end of file diff --git a/scripts/kill_stale_processes.py b/scripts/kill_stale_processes.py index f413689..92ec71c 100644 --- a/scripts/kill_stale_processes.py +++ b/scripts/kill_stale_processes.py @@ -11,9 +11,27 @@ import sys import time import signal from pathlib import Path +import threading + +# Global timeout flag +timeout_reached = False + +def timeout_handler(): + """Handler for overall script timeout""" + global timeout_reached + timeout_reached = True + print("\n⚠️ WARNING: Script timeout reached (10s) - forcing exit") + os._exit(0) # Force exit def kill_stale_processes(): """Kill stale trading dashboard processes safely""" + global timeout_reached + + # Set up overall timeout (10 seconds) + timer = threading.Timer(10.0, timeout_handler) + timer.daemon = True + timer.start() + try: import psutil except ImportError: @@ -33,9 +51,15 @@ def kill_stale_processes(): try: print("Scanning for stale processes...") - # Get all Python processes + # Get all Python processes with timeout python_processes = [] + scan_start = time.time() + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + if timeout_reached or (time.time() - scan_start) > 3.0: # 3s max for scanning + print("Process scanning timeout - proceeding with found processes") + break + try: if proc.info['pid'] == current_pid: continue @@ -58,15 +82,24 @@ def kill_stale_processes(): if not python_processes: print("No stale processes found") + timer.cancel() # Cancel the timeout return True print(f"Found {len(python_processes)} target processes to terminate:") - for p in python_processes: + for p in python_processes[:5]: # Show max 5 to save time print(f" - PID {p['pid']}: {p['name']} - {p['cmdline'][:80]}...") + if len(python_processes) > 5: + print(f" ... and {len(python_processes) - 5} more") - # Graceful termination first + # Graceful termination first (with reduced wait time) print("\nAttempting graceful termination...") + termination_start = time.time() + for p in python_processes: + if timeout_reached or (time.time() - termination_start) > 2.0: + print("Termination timeout - moving to force kill") + break + try: proc = p['proc'] if proc.is_running(): @@ -75,12 +108,18 @@ def kill_stale_processes(): except Exception as e: failed_processes.append(f"Failed to terminate PID {p['pid']}: {e}") - # Wait for graceful shutdown - time.sleep(2.0) + # Wait for graceful shutdown (reduced from 2.0 to 1.0) + time.sleep(1.0) # Force kill remaining processes print("\nChecking for remaining processes...") + kill_start = time.time() + for p in python_processes: + if timeout_reached or (time.time() - kill_start) > 2.0: + print("Force kill timeout - exiting") + break + try: proc = p['proc'] if proc.is_running(): @@ -94,23 +133,18 @@ def kill_stale_processes(): except Exception as e: failed_processes.append(f"Failed to kill PID {p['pid']}: {e}") - # Results - print(f"\n=== Process Cleanup Results ===") - if killed_processes: - print(f"Successfully cleaned up {len(killed_processes)} processes:") - for msg in killed_processes: - print(f" ✓ {msg}") - + # Results (quick summary) + print(f"\n=== Quick Results ===") + print(f"✓ Cleaned up {len(killed_processes)} processes") if failed_processes: - print(f"\nFailed to clean up {len(failed_processes)} processes:") - for msg in failed_processes: - print(f" ✗ {msg}") + print(f"✗ Failed: {len(failed_processes)} processes") - print(f"\nCleanup completed. {len(killed_processes)} processes terminated.") + timer.cancel() # Cancel the timeout if we finished early return len(failed_processes) == 0 except Exception as e: print(f"Error during process cleanup: {e}") + timer.cancel() return False def kill_stale_fallback(): @@ -120,10 +154,10 @@ def kill_stale_fallback(): try: if os.name == 'nt': # Windows import subprocess - # Kill Python processes with dashboard keywords + # Kill Python processes with dashboard keywords (with timeout) result = subprocess.run([ 'taskkill', '/f', '/im', 'python.exe' - ], capture_output=True, text=True) + ], capture_output=True, text=True, timeout=5.0) if result.returncode == 0: print("Windows: Killed all Python processes") @@ -132,25 +166,32 @@ def kill_stale_fallback(): else: # Unix/Linux import subprocess - # More targeted approach for Unix - subprocess.run(['pkill', '-f', 'dashboard'], capture_output=True) - subprocess.run(['pkill', '-f', 'scalping'], capture_output=True) - subprocess.run(['pkill', '-f', 'tensorboard'], capture_output=True) + # More targeted approach for Unix (with timeouts) + subprocess.run(['pkill', '-f', 'dashboard'], capture_output=True, timeout=2.0) + subprocess.run(['pkill', '-f', 'scalping'], capture_output=True, timeout=2.0) + subprocess.run(['pkill', '-f', 'tensorboard'], capture_output=True, timeout=2.0) print("Unix: Killed dashboard-related processes") return True + except subprocess.TimeoutExpired: + print("Fallback method timed out") + return False except Exception as e: print(f"Fallback method failed: {e}") return False if __name__ == "__main__": print("=" * 50) - print("STALE PROCESS CLEANUP") + print("STALE PROCESS CLEANUP (10s timeout)") print("=" * 50) + start_time = time.time() success = kill_stale_processes() + elapsed = time.time() - start_time + exit_code = 0 if success else 1 + print(f"Completed in {elapsed:.1f}s") print("=" * 50) - sys.exit(exit_code) \ No newline at end of file + sys.exit(exit_code) \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index a208b73..87f422e 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -138,6 +138,7 @@ class CleanTradingDashboard: self.current_leverage = 50 # Default x50 leverage self.min_leverage = 1 self.max_leverage = 100 + self.pending_trade_case_id = None # For tracking opening trades until closure # WebSocket streaming self.ws_price_cache = {} @@ -2026,73 +2027,109 @@ class CleanTradingDashboard: 'training_ready': True } + # APPLY LEVERAGE TO P&L for display and storage + raw_pnl = latest_trade.pnl + leveraged_pnl = raw_pnl * self.current_leverage + + # Update trade record with leveraged P&L + trade_record['pnl_raw'] = raw_pnl + trade_record['pnl_leveraged'] = leveraged_pnl + trade_record['leverage_used'] = self.current_leverage + + # Update latest_trade P&L for display + latest_trade.pnl = leveraged_pnl + + # Add leveraged P&L to session total + self.session_pnl += leveraged_pnl + # Only add if not already in closed_trades if not any(t.get('entry_time') == trade_record['entry_time'] for t in self.closed_trades): self.closed_trades.append(trade_record) - logger.info(f"Added completed trade to closed_trades: {action} P&L ${latest_trade.pnl:.2f}") + logger.info(f"Added completed trade to closed_trades: {action} P&L ${leveraged_pnl:.2f} (raw: ${raw_pnl:.2f}, leverage: x{self.current_leverage})") + + # MOVE BASE CASE TO POSITIVE/NEGATIVE based on leveraged outcome + if hasattr(self, 'pending_trade_case_id') and self.pending_trade_case_id: + try: + # Capture closing snapshot + closing_model_inputs = self._get_comprehensive_market_state(symbol, current_price) + closing_cob_snapshot = self._capture_cob_snapshot_for_training(symbol, current_price) + + closing_trade_record = { + 'symbol': symbol, + 'side': action, + 'quantity': latest_trade.quantity, + 'exit_price': current_price, + 'leverage': self.current_leverage, + 'pnl_raw': raw_pnl, + 'pnl_leveraged': leveraged_pnl, + 'confidence': 1.0, + 'trade_type': 'manual', + 'model_inputs_at_exit': closing_model_inputs, + 'cob_snapshot_at_exit': closing_cob_snapshot, + 'timestamp_exit': datetime.now(), + 'training_ready': True, + 'trade_status': 'CLOSED' + } + + # Move from base to positive/negative based on leveraged outcome + outcome_case_id = trade_data_manager.move_base_trade_to_outcome( + self.pending_trade_case_id, + closing_trade_record, + leveraged_pnl >= 0 + ) + if outcome_case_id: + logger.info(f"Trade moved from base to {'positive' if leveraged_pnl >= 0 else 'negative'}: {outcome_case_id}") + + # TRIGGER TRAINING on completed trade pair (opening + closing) + try: + from core.training_integration import TrainingIntegration + training_integration = TrainingIntegration(self.orchestrator) + + training_success = training_integration.trigger_cold_start_training( + closing_trade_record, outcome_case_id + ) + if training_success: + logger.info(f"Retrospective RL training completed for trade pair (P&L: ${leveraged_pnl:.3f})") + else: + logger.warning(f"Retrospective RL training failed for trade pair") + except Exception as e: + logger.warning(f"Failed to trigger retrospective RL training: {e}") + + # Clear pending case ID + self.pending_trade_case_id = None + + except Exception as e: + logger.warning(f"Failed to move base case to outcome: {e}") + else: + logger.debug("No pending trade case ID found - this may be a position opening") - # Store for cold start training when trade closes using core TradeDataManager + # Store OPENING trade as BASE case (temporary) - will be moved to positive/negative when closed try: - case_id = trade_data_manager.store_trade_for_training({ + opening_trade_record = { 'symbol': symbol, 'side': action, - 'quantity': 0.01, + 'quantity': size, 'entry_price': current_price, + 'leverage': self.current_leverage, # Store leverage at entry 'pnl': 0.0, # Will be updated when position closes 'confidence': 1.0, 'trade_type': 'manual', 'model_inputs_at_entry': model_inputs, - 'training_ready': True - }) - if case_id: - logger.info(f"Trade stored for training with case ID: {case_id}") - except Exception as e: - logger.warning(f"Failed to store trade for training: {e}") - - # Store for cold start training when trade closes using core TradeDataManager - try: - case_id = trade_data_manager.store_trade_for_training(trade_record) - if case_id: - logger.info(f"Trade stored for training with case ID: {case_id}") - except Exception as e: - logger.warning(f"Failed to store trade for training: {e}") - - # Update session metrics - if action == 'BUY': - self.session_pnl += 0.0 # No immediate P&L for entry - else: # SELL - # For demo purposes, simulate small positive P&L - demo_pnl = 0.05 # $0.05 demo profit - self.session_pnl += demo_pnl - trade_record['pnl'] = demo_pnl + 'cob_snapshot_at_entry': cob_snapshot, + 'timestamp_entry': datetime.now(), + 'training_ready': False, # Not ready until closed + 'trade_status': 'OPENING' + } - # TRIGGER RETROSPECTIVE RL TRAINING (NO HOLD SIGNALS) - # Only train on BUY/SELL actions with meaningful outcomes - if action in ['BUY', 'SELL'] and case_id: - try: - from core.training_integration import TrainingIntegration - training_integration = TrainingIntegration(self.orchestrator) - - # Enhanced trade record with COB data for RL loop - enhanced_trade_record = trade_record.copy() - enhanced_trade_record.update({ - 'cob_data_available': bool(cob_snapshot), - 'training_priority': 'HIGH' if abs(demo_pnl) > 0.1 else 'NORMAL', - 'signal_type': 'BUY_SELL_ONLY', # No HOLD signals - 'model_inputs_complete': bool(model_inputs) - }) - - training_success = training_integration.trigger_cold_start_training( - enhanced_trade_record, case_id - ) - if training_success: - logger.info(f"Retrospective RL training completed for {action} trade (P&L: ${demo_pnl:.3f})") - else: - logger.warning(f"Retrospective RL training failed for {action} trade") - except Exception as e: - logger.warning(f"Failed to trigger retrospective RL training: {e}") - else: - logger.debug(f"Skipped training for {action} - only BUY/SELL signals are trained") + # Store as BASE case (temporary) using special base directory + base_case_id = trade_data_manager.store_base_trade_for_later_classification(opening_trade_record) + if base_case_id: + logger.info(f"Opening trade stored as base case: {base_case_id}") + # Store the base case ID for when we close the position + self.pending_trade_case_id = base_case_id + except Exception as e: + logger.warning(f"Failed to store opening trade as base case: {e}") + self.pending_trade_case_id = None else: decision['executed'] = False @@ -2418,8 +2455,9 @@ class CleanTradingDashboard: self.ws_price_cache = {} self.current_prices = {} - # Clear current position + # Clear current position and pending trade tracking self.current_position = None + self.pending_trade_case_id = None # Clear pending trade tracking logger.info("Session data cleared")