From 477e5dca3942d3665eb5d8b612f73749b61e8c9f Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 24 May 2025 00:59:29 +0300 Subject: [PATCH] combined edits --- NN/utils/data_interface.py | 2 +- add_test_trades.py | 2 +- generate_trades.py | 2 +- main.py | 6 +- test_positions.py | 116 ++++ test_timestamps.py | 2 +- trading_main.py | 2 +- train_cnn_with_realtime.py | 2 +- train_dqn.py | 2 +- train_hybrid.py | 4 +- train_hybrid_fixed.py | 1247 ++++++++++++++++++++++++++++++++++++ train_improved_rl.py | 2 +- train_rl_with_realtime.py | 4 +- 13 files changed, 1378 insertions(+), 15 deletions(-) create mode 100644 test_positions.py create mode 100644 train_hybrid_fixed.py diff --git a/NN/utils/data_interface.py b/NN/utils/data_interface.py index 653c8be..7fd59c7 100644 --- a/NN/utils/data_interface.py +++ b/NN/utils/data_interface.py @@ -21,7 +21,7 @@ if project_root not in sys.path: sys.path.append(project_root) # Import BinanceHistoricalData from the root module -from realtime import BinanceHistoricalData +from dataprovider_realtime import BinanceHistoricalData logger = logging.getLogger(__name__) diff --git a/add_test_trades.py b/add_test_trades.py index ad43d5d..d9c5dcf 100644 --- a/add_test_trades.py +++ b/add_test_trades.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta import random import time -from realtime import RealTimeChart +from dataprovider_realtime import RealTimeChart # Create a standalone chart instance chart = RealTimeChart('BTC/USDT') diff --git a/generate_trades.py b/generate_trades.py index 0625498..433e8fb 100644 --- a/generate_trades.py +++ b/generate_trades.py @@ -1,5 +1,5 @@ from datetime import datetime -from realtime import RealTimeChart +from dataprovider_realtime import RealTimeChart chart = RealTimeChart('BTC/USDT') diff --git a/main.py b/main.py index 3761276..50653b6 100644 --- a/main.py +++ b/main.py @@ -29,7 +29,7 @@ from PIL import Image import matplotlib.pyplot as mpf import matplotlib.gridspec as gridspec import datetime -from realtime import BinanceWebSocket, BinanceHistoricalData +from dataprovider_realtime import BinanceWebSocket, BinanceHistoricalData from datetime import datetime as dt # Add Dash-related imports import dash @@ -376,7 +376,7 @@ def main(): # Initialize real-time charts and data interfaces try: - from realtime import RealTimeChart + from dataprovider_realtime import RealTimeChart # Create a real-time chart for each symbol charts = {} @@ -1152,7 +1152,7 @@ from PIL import Image import matplotlib.pyplot as mpf import matplotlib.gridspec as gridspec import datetime -from realtime import BinanceWebSocket, BinanceHistoricalData +from dataprovider_realtime import BinanceWebSocket, BinanceHistoricalData from datetime import datetime as dt # Add Dash-related imports import dash diff --git a/test_positions.py b/test_positions.py new file mode 100644 index 0000000..87afb20 --- /dev/null +++ b/test_positions.py @@ -0,0 +1,116 @@ +from NN.environments.trading_env import TradingEnvironment +import logging +import numpy as np +import pandas as pd +import os +import sys +from datetime import datetime, timedelta + +# Add the project root directory to the path +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Create a mock data interface class +class MockDataInterface: + def __init__(self, symbol, timeframes): + self.symbol = symbol + self.timeframes = timeframes + self.dataframes = {} + + # Create mock data for each 
timeframe + for tf in timeframes: + self.dataframes[tf] = self._create_mock_data(tf) + + def _create_mock_data(self, timeframe): + # Generate timestamps + end_time = datetime.now() + if timeframe == '1m': + start_time = end_time - timedelta(minutes=1000) + freq = 'T' # minute frequency + elif timeframe == '5m': + start_time = end_time - timedelta(minutes=5000) + freq = '5T' + else: # '15m' + start_time = end_time - timedelta(minutes=15000) + freq = '15T' + + dates = pd.date_range(start=start_time, end=end_time, freq=freq) + + # Create price data with some random walk behavior + np.random.seed(42) # For reproducibility + price = 1000.0 + prices = [price] + for _ in range(len(dates) - 1): + price = price * (1 + np.random.normal(0, 0.005)) # 0.5% daily volatility + prices.append(price) + + # Calculate OHLCV data + df = pd.DataFrame(index=dates) + df['close'] = prices + df['open'] = df['close'].shift(1).fillna(df['close'].iloc[0] * 0.999) + df['high'] = df['close'] * (1 + abs(np.random.normal(0, 0.001, len(df)))) + df['low'] = df['open'] * (1 - abs(np.random.normal(0, 0.001, len(df)))) + df['volume'] = np.random.normal(1000, 100, len(df)) + + return df + +# Create mock data interface +di = MockDataInterface('ETH/USDT', ['1m', '5m', '15m']) + +# Create environment +env = TradingEnvironment(di, initial_balance=1000.0, max_position=0.1) + +# Run multiple episodes to accumulate some trade history +for episode in range(3): + logger.info(f"Episode {episode+1}/3") + + # Reset environment + observation = env.reset() + + # Run episode + for step in range(100): + # Choose action: 0=Buy, 1=Sell, 2=Hold + # Use a more deliberate pattern to generate trades + if step % 10 == 0: + action = 0 # Buy + elif step % 10 == 5: + action = 1 # Sell + else: + action = 2 # Hold + + # Take action + observation, reward, done, info = env.step(action) + + # Print trade information if a trade was made + if 'trade_result' in info: + trade = info['trade_result'] + print(f"\nTrade executed:") + print(f"Action: {['BUY', 'SELL', 'HOLD'][trade['action']]}") + print(f"Price: {trade['price']:.4f}") + print(f"Position change: {trade['prev_position']:.4f} -> {trade['new_position']:.4f}") + print(f"Entry price: {trade.get('entry_price', 0):.4f}") + if trade.get('realized_pnl', 0) != 0: + print(f"Realized PnL: {trade['realized_pnl']:.4f}") + print(f"Balance: {trade['balance_before']:.2f} -> {trade['balance_after']:.2f}") + + # End episode if done + if done: + break + +# Render environment with final state +print("\n\nFinal environment state:") +env.render() + +# Print detailed information about the last 5 positions +positions = env.get_last_positions(5) +print("\nDetailed position history:") +for i, pos in enumerate(positions): + print(f"\nPosition {i+1}:") + for key, value in pos.items(): + if isinstance(value, float): + print(f" {key}: {value:.4f}") + else: + print(f" {key}: {value}") \ No newline at end of file diff --git a/test_timestamps.py b/test_timestamps.py index 6996a90..e0dc595 100644 --- a/test_timestamps.py +++ b/test_timestamps.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from realtime import RealTimeChart +from dataprovider_realtime import RealTimeChart # Create a chart instance chart = RealTimeChart('BTC/USDT') diff --git a/trading_main.py b/trading_main.py index f8a44ca..9505c84 100644 --- a/trading_main.py +++ b/trading_main.py @@ -72,7 +72,7 @@ def main(): # Initialize real-time charts and data interfaces try: - from realtime import RealTimeChart + from dataprovider_realtime import RealTimeChart # 
Create a real-time chart for each symbol charts = {} diff --git a/train_cnn_with_realtime.py b/train_cnn_with_realtime.py index 33ca713..740c189 100644 --- a/train_cnn_with_realtime.py +++ b/train_cnn_with_realtime.py @@ -35,7 +35,7 @@ logger = logging.getLogger('realtime_training') # Import the model and data interfaces from NN.models.cnn_model_pytorch import CNNModelPyTorch -from realtime import MultiTimeframeDataInterface +from dataprovider_realtime import MultiTimeframeDataInterface from NN.utils.signal_interpreter import SignalInterpreter # Global variables for graceful shutdown diff --git a/train_dqn.py b/train_dqn.py index cae85c1..9acff57 100644 --- a/train_dqn.py +++ b/train_dqn.py @@ -30,7 +30,7 @@ import train_config # Import key components from NN.models.dqn_agent import DQNAgent -from realtime import MultiTimeframeDataInterface +from dataprovider_realtime import MultiTimeframeDataInterface # Configure logging log_dir = Path("logs") diff --git a/train_hybrid.py b/train_hybrid.py index ee1d2a4..0a16b52 100644 --- a/train_hybrid.py +++ b/train_hybrid.py @@ -39,7 +39,7 @@ import train_config # Import key components from NN.models.cnn_model_pytorch import CNNModelPyTorch from NN.models.dqn_agent import DQNAgent -from realtime import MultiTimeframeDataInterface, RealTimeChart +from dataprovider_realtime import MultiTimeframeDataInterface, RealTimeChart from NN.utils.signal_interpreter import SignalInterpreter # Global variables for graceful shutdown @@ -241,7 +241,7 @@ class HybridModel: def _initialize_chart(self): """Initialize the RealTimeChart for visualization""" try: - from realtime import RealTimeChart + from dataprovider_realtime import RealTimeChart symbol = self.config['market_data']['symbol'] self.logger.info(f"Initializing RealTimeChart for {symbol}") diff --git a/train_hybrid_fixed.py b/train_hybrid_fixed.py new file mode 100644 index 0000000..c111b7b --- /dev/null +++ b/train_hybrid_fixed.py @@ -0,0 +1,1247 @@ +#!/usr/bin/env python +""" +Hybrid Training Script with Device Compatibility Fixes + +This is a fixed version of the hybrid training script that: +1. Forces CPU use to avoid CUDA/device mismatch errors +2. Adds better error handling and recovery for model initialization +3. 
Implements direct model movement to CPU + +Usage: + python train_hybrid_fixed.py --iterations 10 --sv-epochs 5 --rl-episodes 2 +""" + +import os +import sys +import logging +import argparse +import numpy as np +import torch +import time +import json +import asyncio +import signal +import threading +from datetime import datetime +from pathlib import Path +import matplotlib.pyplot as plt +from torch.utils.tensorboard import SummaryWriter +from torch import optim +import torch.nn.functional as F + +# Force CPU usage to avoid device mismatch errors +os.environ['CUDA_VISIBLE_DEVICES'] = '' +os.environ['DISABLE_MIXED_PRECISION'] = '1' +# Force PyTorch to use CPU +os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128' +os.environ['PYTORCH_JIT'] = '0' +# Disable CUDA completely in PyTorch +torch.cuda.is_available = lambda: False + +# Add project root to path if needed +project_root = os.path.dirname(os.path.abspath(__file__)) +if project_root not in sys.path: + sys.path.append(project_root) + +# Import configurations +import train_config + +# Import key components +from NN.models.cnn_model_pytorch import CNNModelPyTorch, CNNPyTorch +from NN.models.dqn_agent import DQNAgent +from dataprovider_realtime import MultiTimeframeDataInterface, RealTimeChart +from NN.utils.signal_interpreter import SignalInterpreter + +# Configure logging +log_dir = Path("logs") +log_dir.mkdir(exist_ok=True) +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +log_file = log_dir / f"hybrid_training_{timestamp}.log" + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler() + ] +) +logger = logging.getLogger('hybrid_training') + +# Global variables for graceful shutdown +running = True +training_stats = { + "supervised": { + "epochs_completed": 0, + "best_val_pnl": -float('inf'), + "best_epoch": 0, + "best_win_rate": 0 + }, + "reinforcement": { + "episodes_completed": 0, + "best_reward": -float('inf'), + "best_episode": 0, + "best_win_rate": 0 + }, + "hybrid": { + "iterations_completed": 0, + "best_combined_score": -float('inf'), + "training_started": datetime.now().isoformat(), + "last_update": datetime.now().isoformat() + } +} + +# Configure signal handler for graceful shutdown +def signal_handler(sig, frame): + global running + logger.info("Received interrupt signal. 
Finishing current training cycle and saving models...") + running = False + +# Register signal handler +signal.signal(signal.SIGINT, signal_handler) + +class HybridModel: + """ + Hybrid model that combines supervised CNN learning with RL-based decision optimization + """ + def __init__(self, config): + self.config = config + + # Force CPU for all operations + config['hardware']['device'] = 'cpu' + config['hardware']['mixed_precision'] = False + + self.device = torch.device('cpu') + self.supervised_model = None + self.rl_agent = None + self.data_interface = None + self.signal_interpreter = None + self.chart = None + + # Training stats + self.tensorboard_writer = None + self.iter_count = 0 + self.supervised_epochs = 0 + self.rl_episodes = 0 + + # Initialize logging + self.logger = logging.getLogger('hybrid_model') + + # Paths + self.models_dir = Path(config['paths']['models_dir']) + self.models_dir.mkdir(exist_ok=True, parents=True) + + def initialize(self): + """Initialize all components of the hybrid model""" + # Set up TensorBoard + tb_dir = Path(self.config['paths']['tensorboard_dir']) + tb_dir.mkdir(exist_ok=True, parents=True) + log_dir = tb_dir / f"hybrid_{timestamp}" + self.tensorboard_writer = SummaryWriter(log_dir=str(log_dir)) + self.logger.info(f"TensorBoard initialized at {log_dir}") + + # Initialize data interface + symbol = self.config['market_data']['symbol'] + timeframes = self.config['market_data']['timeframes'] + window_size = self.config['market_data']['window_size'] + + self.logger.info(f"Initializing data interface for {symbol} with timeframes {timeframes}") + self.data_interface = MultiTimeframeDataInterface( + symbol=symbol, + timeframes=timeframes + ) + + # Initialize supervised model (CNN) + self._initialize_supervised_model(window_size) + + # Initialize RL agent + self._initialize_rl_agent(window_size) + + # Initialize signal interpreter + self.signal_interpreter = SignalInterpreter(config={ + 'buy_threshold': 0.65, + 'sell_threshold': 0.65, + 'hold_threshold': 0.75, + 'trend_filter_enabled': True, + 'volume_filter_enabled': True + }) + + # Initialize chart if visualization is enabled + if self.config.get('visualization', {}).get('enabled', False): + self._initialize_chart() + + return True + + def _initialize_supervised_model(self, window_size): + """Initialize the supervised CNN model""" + try: + # Get data shape information + X_train_dict, y_train, X_val_dict, y_val, _, _ = self.data_interface.prepare_training_data( + window_size=window_size, + refresh=True + ) + + if X_train_dict is None or y_train is None: + raise ValueError("Failed to load training data") + + # Get reference timeframe (lowest timeframe) + reference_tf = min( + self.config['market_data']['timeframes'], + key=lambda x: self.data_interface.timeframe_to_seconds.get(x, 3600) + ) + + # Get feature count from the data + features_per_tf = X_train_dict[reference_tf].shape[2] + total_features = features_per_tf * len(self.config['market_data']['timeframes']) + + # Initialize model + self.logger.info(f"Initializing CNN model with {total_features} features") + + self.supervised_model = CNNModelPyTorch( + window_size=window_size, + timeframes=self.config['market_data']['timeframes'], + output_size=3, # BUY/HOLD/SELL + num_pairs=1 # Single pair for now + ) + + # Create a new model instance with the correct input shape + if hasattr(self.supervised_model, 'model'): + # The underlying model needs to be recreated with the correct input shape + input_shape = (window_size, total_features) + # Force CPU device for 
this model + self.supervised_model.device = self.device + + # Create a new CNNPyTorch model on the CPU + new_model = CNNPyTorch(input_shape, self.supervised_model.output_size) + new_model.device = self.device + new_model.to(self.device) + + # Make sure class_weights tensor is on CPU + if hasattr(new_model, 'class_weights'): + new_model.class_weights = new_model.class_weights.to(self.device) + + # Replace the model + self.supervised_model.model = new_model + + # Reinitialize the optimizer + self.supervised_model.optimizer = optim.Adam( + self.supervised_model.model.parameters(), + lr=0.0001, + weight_decay=0.01 + ) + + # Initialize the criterion (missing in the model) + self.supervised_model.criterion = torch.nn.CrossEntropyLoss() + + # Ensure model is on CPU + self.supervised_model.device = self.device + if hasattr(self.supervised_model, 'model'): + self.supervised_model.model.to(self.device) + + # Load existing model if available and not creating new model + model_path = self.models_dir / "supervised_model_best.pt" + if model_path.exists() and not self.config.get('model', {}).get('new_model', False): + self.logger.info(f"Loading existing CNN model from {model_path}") + try: + self.supervised_model.load(str(model_path)) + self.logger.info("CNN model loaded successfully") + except Exception as e: + self.logger.error(f"Error loading CNN model: {str(e)}") + self.logger.info("Starting with a new CNN model") + else: + self.logger.info("Starting with a new CNN model") + + except Exception as e: + self.logger.error(f"Error initializing supervised model: {str(e)}") + import traceback + self.logger.error(traceback.format_exc()) + raise + + def _initialize_rl_agent(self, window_size): + """Initialize the RL agent""" + try: + # Get data for RL training + X_train_dict, _, _, _, _, _ = self.data_interface.prepare_training_data( + window_size=window_size, + refresh=True + ) + + if X_train_dict is None: + raise ValueError("Failed to load training data for RL agent") + + # Get reference timeframe features + reference_tf = min( + self.config['market_data']['timeframes'], + key=lambda x: self.data_interface.timeframe_to_seconds.get(x, 3600) + ) + + # Get feature count from the data + num_features = X_train_dict[reference_tf].shape[2] + + # Initialize RL agent + self.logger.info(f"Initializing RL agent") + + # State shape for DQN agent: (timeframes, window_size, features) + state_shape = (len(self.config['market_data']['timeframes']), window_size, num_features) + + self.rl_agent = DQNAgent( + state_shape=state_shape, + n_actions=3, # BUY/HOLD/SELL + epsilon=1.0, + epsilon_min=0.01, + epsilon_decay=0.995, + learning_rate=self.config['training']['learning_rate'], + gamma=0.95, + buffer_size=10000, + batch_size=self.config['training']['batch_size'], + device=self.device # Explicitly pass CPU device + ) + + # Explicitly move agent to CPU and force it to stay there + try: + # First set the device in the agent itself + self.rl_agent.device = self.device + + # Force PyTorch to use CPU by setting device on each model + if hasattr(self.rl_agent, 'policy_net'): + self.rl_agent.policy_net.to(self.device) + # Force all layers to CPU + for parameter in self.rl_agent.policy_net.parameters(): + parameter.data = parameter.data.to(self.device) + + if hasattr(self.rl_agent, 'target_net'): + self.rl_agent.target_net.to(self.device) + # Force all layers to CPU + for parameter in self.rl_agent.target_net.parameters(): + parameter.data = parameter.data.to(self.device) + + # Move models to the specified device + 
self.rl_agent.move_models_to_device(self.device) + self.logger.info(f"RL agent models moved to {self.device}") + except Exception as e: + self.logger.warning(f"Could not move RL agent models to device: {str(e)}") + + # Load existing agent if available and not creating new model + agent_path = self.models_dir / "rl_agent_best" + if os.path.exists(f"{agent_path}_policy.pt") and not self.config.get('model', {}).get('new_model', False): + self.logger.info(f"Loading existing RL agent from {agent_path}") + try: + self.rl_agent.load(str(agent_path)) + self.logger.info("RL agent loaded successfully") + except Exception as e: + self.logger.error(f"Error loading RL agent: {str(e)}") + self.logger.info("Starting with a new RL agent") + else: + self.logger.info("Starting with a new RL agent") + + # Reset epsilon if training a new model + if self.config.get('model', {}).get('new_model', False): + if hasattr(self.rl_agent, 'epsilon_start'): + self.rl_agent.epsilon = self.rl_agent.epsilon_start + self.logger.info(f"New model requested. Reset RL agent epsilon to starting value: {self.rl_agent.epsilon:.2f}") + else: + # Fallback if epsilon_start isn't defined, assume 1.0 + self.rl_agent.epsilon = 1.0 + self.logger.info("New model requested. Reset RL agent epsilon to default starting value: 1.00") + + except Exception as e: + self.logger.error(f"Error initializing RL agent: {str(e)}") + import traceback + self.logger.error(traceback.format_exc()) + raise + + def _initialize_chart(self): + """Initialize the RealTimeChart for visualization""" + try: + symbol = self.config['market_data']['symbol'] + self.logger.info(f"Initializing RealTimeChart for {symbol}") + + self.chart = RealTimeChart(symbol=symbol) + + # Start chart server in a background thread + dashboard_port = self.config.get('visualization', {}).get('port', 8050) + self.logger.info(f"Starting web dashboard for {symbol} on port {dashboard_port}") + self.chart_thread = threading.Thread( + target=lambda: self.chart.run(host='localhost', port=dashboard_port) + ) + self.chart_thread.daemon = True # Allow the thread to exit when main program exits + self.chart_thread.start() + self.logger.info(f"Web dashboard started at http://localhost:{dashboard_port}/") + + # Also start the websocket connection for real-time data + self.websocket_thread = threading.Thread( + target=lambda: asyncio.run(self.chart.start_websocket()) + ) + self.websocket_thread.daemon = True + self.websocket_thread.start() + self.logger.info(f"WebSocket connection started for {symbol}") + + except Exception as e: + self.logger.error(f"Error initializing chart: {str(e)}") + import traceback + self.logger.error(traceback.format_exc()) + self.chart = None + + def train_hybrid(self, iterations=10, sv_epochs_per_iter=5, rl_episodes_per_iter=2): + """ + Main hybrid training loop + + Args: + iterations: Number of hybrid iterations to run + sv_epochs_per_iter: Number of supervised epochs per iteration + rl_episodes_per_iter: Number of RL episodes per iteration + + Returns: + dict: Training statistics + """ + self.logger.info(f"Starting hybrid training with {iterations} iterations") + self.logger.info(f"Each iteration includes {sv_epochs_per_iter} supervised epochs and {rl_episodes_per_iter} RL episodes") + + # Training loop + for iteration in range(iterations): + if not running: + self.logger.info("Training stopped by user") + break + + self.logger.info(f"Iteration {iteration+1}/{iterations}") + self.iter_count += 1 + + # 1. 
Supervised learning phase + self.logger.info("Starting supervised learning phase") + sv_stats = self.train_supervised(epochs=sv_epochs_per_iter) + + # 2. Reinforcement learning phase + self.logger.info("Starting reinforcement learning phase") + rl_stats = self.train_reinforcement(episodes=rl_episodes_per_iter) + + # 3. Update global training stats + self._update_training_stats(sv_stats, rl_stats) + + # 4. Save models and stats + self._save_models_and_stats() + + # 5. Log to TensorBoard + if self.tensorboard_writer: + self._log_to_tensorboard(iteration, sv_stats, rl_stats) + + self.logger.info("Hybrid training completed") + return training_stats + + def train_supervised(self, epochs=5): + """Train the supervised CNN model""" + stats = { + "epochs": epochs, + "completed": 0, + "best_val_pnl": -float('inf'), + "best_win_rate": 0, + "final_loss": 0 + } + + self.logger.info(f"Training supervised model for {epochs} epochs") + + try: + # Prepare training data + window_size = self.config['market_data']['window_size'] + X_train_dict, y_train, X_val_dict, y_val, _, _ = self.data_interface.prepare_training_data( + window_size=window_size, + refresh=True + ) + + # Get reference timeframe for consistency + reference_tf = min( + self.config['market_data']['timeframes'], + key=lambda x: self.data_interface.timeframe_to_seconds.get(x, 3600) + ) + + # Check available samples + min_samples = min(X_train_dict[tf].shape[0] for tf in self.config['market_data']['timeframes']) + self.logger.info(f"Using {min_samples} samples for training") + + # Get the feature count per timeframe + features_per_tf = X_train_dict[reference_tf].shape[2] + total_features = features_per_tf * len(self.config['market_data']['timeframes']) + self.logger.info(f"Features per timeframe: {features_per_tf}, Total features: {total_features}") + + # Log timeframe data shapes for debugging + for tf in self.config['market_data']['timeframes']: + self.logger.info(f"Timeframe {tf} data shape: {X_train_dict[tf].shape}") + + # Prepare concatenated inputs for each sample across all timeframes + # Shape will be [samples, window_size, features*num_timeframes] + X_train_combined = np.zeros((min_samples, window_size, total_features)) + + # Fill the array with data from all timeframes + for i in range(min_samples): + # For each timeframe, stack the features horizontally for the same window + for tf_idx, tf in enumerate(self.config['market_data']['timeframes']): + # Place this timeframe's features at the appropriate position + start_idx = tf_idx * features_per_tf + end_idx = (tf_idx + 1) * features_per_tf + X_train_combined[i, :, start_idx:end_idx] = X_train_dict[tf][i] + + # For validation data - ensure we have validation data by splitting training data if needed + if X_val_dict is None or y_val is None or min(X_val_dict[tf].shape[0] for tf in self.config['market_data']['timeframes']) == 0: + # No validation data provided, use a portion of training data + self.logger.info("No validation data available, using 20% of training data for validation") + train_size = int(0.8 * min_samples) + + # Split the training data + X_train_split = X_train_combined[:train_size] + y_train_split = y_train[:train_size] + + X_val_combined = X_train_combined[train_size:min_samples] + y_val_np = y_train[train_size:min_samples] + + # Update training data + X_train_combined = X_train_split + y_train_np = y_train_split + else: + # For validation data + min_val_samples = min(X_val_dict[tf].shape[0] for tf in self.config['market_data']['timeframes']) + X_val_combined = 
np.zeros((min_val_samples, window_size, features_per_tf * len(self.config['market_data']['timeframes']))) + + for i in range(min_val_samples): + for tf_idx, tf in enumerate(self.config['market_data']['timeframes']): + start_idx = tf_idx * features_per_tf + end_idx = (tf_idx + 1) * features_per_tf + X_val_combined[i, :, start_idx:end_idx] = X_val_dict[tf][i] + + y_train_np = y_train[:min_samples] + y_val_np = y_val[:min_val_samples] + + self.logger.info(f"Prepared data: X_train shape: {X_train_combined.shape}, X_val shape: {X_val_combined.shape}") + + # Reset and initialize chart for trading information + if self.chart: + # Reset trading stats on the chart + if hasattr(self.chart, 'positions'): + self.chart.positions = [] + + if hasattr(self.chart, 'accumulative_pnl'): + self.chart.accumulative_pnl = 0.0 + + if hasattr(self.chart, 'current_balance'): + self.chart.current_balance = 100.0 + + if hasattr(self.chart, 'update_trading_info'): + self.chart.update_trading_info( + action="INIT", + prediction=None, + price=0.0, + timestamp=int(time.time() * 1000) + ) + + # Create a custom training loop instead of using the model's train method + # This gives us more control over the process + self.supervised_model.model.train() + + # History to store metrics + history = { + 'loss': [], + 'val_loss': [], + 'accuracy': [], + 'val_accuracy': [], + 'val_pnl': [] + } + + # Convert data to tensors + X_train_tensor = torch.tensor(X_train_combined, dtype=torch.float32).to(self.device) + y_train_tensor = torch.tensor(y_train_np, dtype=torch.long).to(self.device) + X_val_tensor = torch.tensor(X_val_combined, dtype=torch.float32).to(self.device) + y_val_tensor = torch.tensor(y_val_np, dtype=torch.long).to(self.device) + + # Verify that model's feature dimensions match the input data + if hasattr(self.supervised_model, 'total_features'): + expected_features = X_train_combined.shape[2] + if self.supervised_model.total_features != expected_features: + self.logger.warning(f"Model features ({self.supervised_model.total_features}) don't match input features ({expected_features})") + self.logger.info(f"Updating model's total_features to match input data") + self.supervised_model.total_features = expected_features + # Rebuild the layers with correct dimensions + if hasattr(self.supervised_model, '_create_layers'): + self.supervised_model._create_layers() + self.supervised_model.to(self.device) + # Reinitialize optimizer after changing the model + self.supervised_model.optimizer = optim.Adam( + self.supervised_model.parameters(), + lr=0.0001, + weight_decay=0.01 + ) + + # Create dataloaders + train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor) + train_loader = torch.utils.data.DataLoader( + train_dataset, + batch_size=self.config['training']['batch_size'], + shuffle=True + ) + + val_dataset = torch.utils.data.TensorDataset(X_val_tensor, y_val_tensor) + val_loader = torch.utils.data.DataLoader( + val_dataset, + batch_size=self.config['training']['batch_size'] + ) + + # Training loop + for epoch in range(epochs): + # Training phase + self.supervised_model.model.train() + train_loss = 0.0 + train_correct = 0 + train_total = 0 + + for inputs, targets in train_loader: + # Zero the parameter gradients + self.supervised_model.optimizer.zero_grad() + + # Forward pass + outputs, _ = self.supervised_model.model(inputs) + loss = self.supervised_model.criterion(outputs, targets) + + # Backward pass and optimize + loss.backward() + self.supervised_model.optimizer.step() + + # Statistics + train_loss += 
loss.item() + _, predicted = torch.max(outputs.data, 1) + train_total += targets.size(0) + train_correct += (predicted == targets).sum().item() + + # Calculate training metrics + train_loss = train_loss / len(train_loader) + train_accuracy = 100 * train_correct / train_total if train_total > 0 else 0 + + # Validation phase + self.supervised_model.model.eval() + val_loss = 0.0 + val_correct = 0 + val_total = 0 + all_predictions = [] + all_targets = [] + + with torch.no_grad(): + for inputs, targets in val_loader: + # Forward pass + outputs, _ = self.supervised_model.model(inputs) + loss = self.supervised_model.criterion(outputs, targets) + + # Statistics + val_loss += loss.item() + _, predicted = torch.max(outputs.data, 1) + val_total += targets.size(0) + val_correct += (predicted == targets).sum().item() + + # Store for PnL calculation + all_predictions.append(predicted.cpu().numpy()) + all_targets.append(targets.cpu().numpy()) + + # Calculate validation metrics + val_loss = val_loss / len(val_loader) + val_accuracy = 100 * val_correct / val_total if val_total > 0 else 0 + + # Calculate PnL using the robust DataInterface method + all_predictions = np.concatenate(all_predictions) + # We need the corresponding prices for the validation set + # Fetch the raw prices used for validation data + val_prices_start_index = len(y_train_np) # Assuming validation data follows training data + val_prices_end_index = val_prices_start_index + len(y_val_np) + # Get prices from the reference timeframe dataframe prepared earlier + if hasattr(self.data_interface, 'dataframes') and reference_tf in self.data_interface.dataframes: + reference_df = self.data_interface.dataframes[reference_tf] + # Ensure indices align with the X_val_combined data length + # We need prices corresponding to the END of each window in validation + price_indices = np.arange(len(X_train_combined) + window_size -1 , len(X_train_combined) + len(X_val_combined) + window_size - 1) + + # Clamp indices to be within bounds of the reference dataframe + price_indices = np.clip(price_indices, 0, len(reference_df) - 1) + + if len(price_indices) == len(all_predictions): + actual_val_prices = reference_df['close'].iloc[price_indices].values + pnl, win_rate, _ = self.data_interface.calculate_pnl(all_predictions, actual_val_prices) + self.logger.info(f"PnL calculation (robust) - Trades based on {len(actual_val_prices)} prices. Net PnL: {pnl:.4f}, Win Rate: {win_rate:.2f}") + else: + self.logger.warning(f"Price indices length ({len(price_indices)}) doesn't match predictions length ({len(all_predictions)}). 
Cannot calculate robust PnL.") + pnl, win_rate = 0.0, 0.0 # Fallback + else: + self.logger.warning("Reference timeframe data not available for robust PnL calculation.") + pnl, win_rate = 0.0, 0.0 # Fallback + + # Update history + history['loss'].append(train_loss) + history['val_loss'].append(val_loss) + history['accuracy'].append(train_accuracy) + history['val_accuracy'].append(val_accuracy) + history['val_pnl'].append(pnl) + + # Update stats + stats["completed"] += 1 + stats["final_loss"] = val_loss + + if pnl > stats["best_val_pnl"]: + stats["best_val_pnl"] = pnl + # Save best model by PnL + model_path = self.models_dir / "supervised_model_best.pt" + self.supervised_model.save(str(model_path)) + self.logger.info(f"New best CNN model saved with PnL: {pnl:.2f}") + + if win_rate > stats["best_win_rate"]: + stats["best_win_rate"] = win_rate + + # Log epoch results + self.logger.info(f"Epoch {epoch+1}/{epochs} - Train loss: {train_loss:.4f}, " + + f"Train acc: {train_accuracy:.2f}%, Val loss: {val_loss:.4f}, " + + f"Val acc: {val_accuracy:.2f}%, PnL: {pnl:.2f}, Win rate: {win_rate:.2f}") + + # Log to TensorBoard + if self.tensorboard_writer: + self.tensorboard_writer.add_scalar('SupervisedTrain/Loss', train_loss, self.supervised_epochs + epoch) + self.tensorboard_writer.add_scalar('SupervisedTrain/Accuracy', train_accuracy, self.supervised_epochs + epoch) + self.tensorboard_writer.add_scalar('SupervisedVal/Loss', val_loss, self.supervised_epochs + epoch) + self.tensorboard_writer.add_scalar('SupervisedVal/Accuracy', val_accuracy, self.supervised_epochs + epoch) + self.tensorboard_writer.add_scalar('SupervisedVal/PnL', pnl, self.supervised_epochs + epoch) + self.tensorboard_writer.add_scalar('SupervisedVal/WinRate', win_rate * 100, self.supervised_epochs + epoch) + + # Update chart with model predictions + if self.chart and epoch % 2 == 0: # Update every other epoch + # Use the model to make predictions on some validation data for visualization + try: + # Choose a subset of validation data for visualization + viz_size = min(20, len(X_val_tensor)) + viz_indices = np.random.choice(len(X_val_tensor), viz_size, replace=False) + viz_inputs = X_val_tensor[viz_indices] + viz_targets = y_val_tensor[viz_indices] + + # Get predictions + self.supervised_model.model.eval() + with torch.no_grad(): + outputs, _ = self.supervised_model.model(viz_inputs) + probs = F.softmax(outputs, dim=1) + _, predictions = torch.max(probs, 1) + + # Display last few predictions in the chart + for i in range(min(5, viz_size)): + timestamp_ms = int(time.time() * 1000) + i * 1000 # Space them out + + # Get prediction and target + pred_idx = predictions[i].item() + target_idx = viz_targets[i].item() + action_names = ["BUY", "HOLD", "SELL"] + pred_action = action_names[pred_idx] + + # Get confidence + confidence = probs[i, pred_idx].item() + + # Add to chart + if hasattr(self.chart, 'latest_price') and self.chart.latest_price is not None: + display_price = self.chart.latest_price + else: + display_price = 20000 + np.random.randn() * 100 # Placeholder price for BTC + + # Add signal to chart + if hasattr(self.chart, 'add_nn_signal'): + self.chart.add_nn_signal( + symbol=self.config['market_data']['symbol'], + signal=pred_action, + confidence=confidence, + timestamp=timestamp_ms + ) + + # Update trading info + if hasattr(self.chart, 'update_trading_info'): + self.chart.update_trading_info( + action="EPOCH_VIZ", + prediction=f"SV Acc: {val_accuracy:.1f}%, PnL: {pnl:.1f}", + price=display_price, + timestamp=int(time.time() * 1000) + 
) + except Exception as e: + self.logger.warning(f"Error updating chart during supervised viz: {str(e)}") + + # Update supervised epochs counter + self.supervised_epochs += epochs + + except Exception as e: + self.logger.error(f"Error in supervised learning: {str(e)}") + import traceback + self.logger.error(traceback.format_exc()) + + return stats + + def train_reinforcement(self, episodes=2): + """Train the RL agent""" + stats = { + "episodes": episodes, + "completed": 0, + "best_reward": -float('inf'), + "final_reward": 0, + "avg_reward": 0 + } + + self.logger.info(f"Training RL agent for {episodes} episodes") + + try: + # Prepare data for RL training + window_size = self.config['market_data']['window_size'] + X_train_dict, y_train, _, _, _, _ = self.data_interface.prepare_training_data( + window_size=window_size, + refresh=True # Ensure we get relatively fresh data for this iteration + ) + + if X_train_dict is None or not X_train_dict or y_train is None: + self.logger.error("Failed to get training data for RL phase.") + return stats + + # Get reference timeframe + reference_tf = min( + self.config['market_data']['timeframes'], + key=lambda x: self.data_interface.timeframe_to_seconds.get(x, 3600) + ) + + # Find minimum length across all timeframes + min_length = min(len(X_train_dict[tf]) for tf in self.config['market_data']['timeframes'] if X_train_dict[tf] is not None) + if min_length <= window_size + 1: + self.logger.error(f"Not enough data samples ({min_length}) for RL training with window size {window_size}.") + return stats + self.logger.info(f"Using {min_length} samples from each timeframe for RL training preparation") + + # For DQN we need to reshape data according to state_shape=(timeframes, window_size, features) + states = [] + num_features = X_train_dict[reference_tf].shape[2] + num_timeframes = len(self.config['market_data']['timeframes']) + + for i in range(min_length - 1): # -1 to ensure we have next states + state = np.zeros((num_timeframes, window_size, num_features), dtype=np.float32) + valid_state = True + for tf_idx, tf in enumerate(self.config['market_data']['timeframes']): + if X_train_dict[tf] is None or len(X_train_dict[tf]) <= i: + valid_state = False + break + state[tf_idx] = X_train_dict[tf][i] + if valid_state: + states.append(state) + else: + # This should ideally not happen if min_length was calculated correctly + self.logger.warning(f"Skipping state preparation at index {i} due to insufficient data in a timeframe.") + + # Get actions from labels (corresponding to the prepared states) + actions = [] + # Ensure y_train is sliced correctly to match the number of prepared states + num_states = len(states) + if len(y_train) >= num_states: + y_train_sliced = y_train[:num_states] + for i in range(num_states): + # Ensure y_train_sliced[i] is a valid array/list before argmax + if isinstance(y_train_sliced[i], (np.ndarray, list)) and len(y_train_sliced[i]) > 0: + actions.append(np.argmax(y_train_sliced[i])) + else: + # Handle cases where y_train_sliced[i] might be invalid + self.logger.warning(f"Invalid label found at index {i}, using default action (HOLD=1). Label: {y_train_sliced[i]}") + actions.append(1) # Default to HOLD + else: + self.logger.error(f"Mismatch between number of states ({num_states}) and labels ({len(y_train)}). 
Cannot proceed with RL training.") + return stats + + self.logger.info(f"Prepared {len(states)} state-action pairs for RL training") + if not states: + self.logger.error("No states were prepared for RL training.") + return stats + + # --- Pre-calculate Supervised Predictions --- + self.logger.info("Pre-calculating supervised model predictions for RL states...") + sv_predictions = [] + try: + self.supervised_model.model.eval() # Set model to evaluation mode + with torch.no_grad(): + # Reshape states for supervised model: [batch, window_size, features*num_timeframes] + reshaped_states_list = [] + for state in states: + # state shape: [timeframes, window_size, features] + # Target shape: [window_size, features*num_timeframes] + reshaped_state = state.transpose(1, 0, 2).reshape(window_size, -1) + reshaped_states_list.append(reshaped_state) + + if reshaped_states_list: + reshaped_states_batch = np.array(reshaped_states_list) + states_tensor = torch.tensor(reshaped_states_batch, dtype=torch.float32).to(self.device) + + # Process in batches if necessary to avoid memory issues + sv_batch_size = 128 + num_batches = int(np.ceil(len(states_tensor) / sv_batch_size)) + for j in range(num_batches): + batch_start = j * sv_batch_size + batch_end = min((j + 1) * sv_batch_size, len(states_tensor)) + batch_tensor = states_tensor[batch_start:batch_end] + + outputs, _ = self.supervised_model.model(batch_tensor) + _, predicted_actions = torch.max(outputs.data, 1) + sv_predictions.extend(predicted_actions.cpu().numpy()) + + self.logger.info(f"Finished pre-calculating {len(sv_predictions)} supervised predictions.") + if len(sv_predictions) != len(states): + self.logger.error(f"Mismatch in supervised predictions ({len(sv_predictions)}) and states ({len(states)}). Aborting RL phase.") + return stats + except Exception as e: + self.logger.error(f"Error during supervised prediction pre-calculation: {e}") + import traceback + self.logger.error(traceback.format_exc()) + return stats # Cannot proceed without supervised predictions for consensus + + # Reset and initialize chart for trading information + if self.chart: + # Reset trading stats on the chart + if hasattr(self.chart, 'positions'): + self.chart.positions = [] + if hasattr(self.chart, 'accumulative_pnl'): + self.chart.accumulative_pnl = 0.0 + if hasattr(self.chart, 'current_balance'): + self.chart.current_balance = 100.0 + if hasattr(self.chart, 'update_trading_info'): + self.chart.update_trading_info(action="INIT", prediction=None, price=0.0, timestamp=int(time.time() * 1000)) + + # Training loop + for episode in range(episodes): + # --- Check and potentially bump epsilon --- + if self.rl_agent.epsilon <= self.rl_agent.epsilon_min + 1e-6: # Check if epsilon is at/near minimum + # Bump epsilon slightly to encourage exploration if stuck + bump_value = 0.1 + self.rl_agent.epsilon = min(self.rl_agent.epsilon_min + bump_value, self.rl_agent.epsilon_start) + self.logger.warning(f"RL agent epsilon was at minimum. Bumped to {self.rl_agent.epsilon:.4f} for episode {episode+1}") + + if not running: + self.logger.info("RL training interrupted") + break + + episode_reward = 0 + correct_actions = 0 + consensus_actions = 0 + + # Sample a segment of the data + # Ensure segment size is reasonable and doesn't exceed available states + segment_size = min(200, len(states) -1) # Max 200 steps or available data + if segment_size <= 0: + self.logger.warning(f"Not enough states ({len(states)}) to form a training segment. 
Skipping episode {episode+1}.") + continue + + start_idx = np.random.randint(0, len(states) - segment_size) if len(states) > segment_size else 0 + end_idx = start_idx + segment_size + + self.logger.info(f"RL Episode {episode+1}/{episodes}: Training on segment [{start_idx}:{end_idx}]") + + # Train on segment + for i in range(start_idx, end_idx): + state = states[i] + # Original intended action based on labels + true_action = actions[i] + # Get RL agent's predicted action + rl_pred_action = self.rl_agent.act(state) + # Get pre-calculated supervised prediction + sv_pred_action = sv_predictions[i] + + next_state = states[i + 1] + + # Calculate reward based on price change (standard reward) + try: + # Ensure indices are valid for X_train_dict + if i < len(X_train_dict[reference_tf]) and i+1 < len(X_train_dict[reference_tf]): + price_current = X_train_dict[reference_tf][i][-1, -1] # Closing price + price_next = X_train_dict[reference_tf][i+1][-1, -1] + price_change = (price_next - price_current) / price_current if price_current != 0 else 0 + else: + price_change = 0 + self.logger.warning(f"Index {i} or {i+1} out of bounds for price calculation.") + + except IndexError: + price_change = 0 + self.logger.warning(f"IndexError during price calculation at step {i}. Using price_change = 0.") + except Exception as e: + price_change = 0 + self.logger.error(f"Unexpected error during price calculation: {e}") + + # Define standard reward based on the RL agent's action and outcome + if rl_pred_action == 0: # Buy + reward = price_change * 100 + elif rl_pred_action == 2: # Sell + reward = -price_change * 100 + else: # Hold (action 1) + # Penalize holding during significant moves, slightly reward holding in stable periods + reward = -abs(price_change) * 50 if abs(price_change) > 0.0005 else abs(price_change) * 10 + + # --- Apply Consensus Modifier --- + consensus_met = (sv_pred_action == rl_pred_action) + if not consensus_met and rl_pred_action != 1: # If actions disagree and RL didn't choose HOLD + reward -= 5 # REDUCED Penalty for disagreement + # self.logger.debug(f"Step {i}: RL ({rl_pred_action}) vs SV ({sv_pred_action}) disagree. Penalty applied.") + elif consensus_met and rl_pred_action != 1: + consensus_actions += 1 # Count consensus non-hold actions + + # Check if RL action matches the true label action + if rl_pred_action == true_action: + correct_actions += 1 + + # Remember experience (using the true action from labels, but the modified reward) + done = (i == end_idx - 1) + self.rl_agent.remember(state, true_action, reward, next_state, done) + + # Replay experiences periodically + if i % 10 == 0: + self.rl_agent.replay() + + episode_reward += reward + + # Update chart with predicted trading information (no actual trades logged here) + if self.chart and i % 5 == 0: + timestamp_ms = int(time.time() * 1000) + action_names = ["BUY", "HOLD", "SELL"] + action_name = action_names[rl_pred_action] # Show RL's predicted action + + # Display price logic... 
(remains the same) + if hasattr(self.chart, 'latest_price') and self.chart.latest_price is not None: + display_price = self.chart.latest_price + else: + display_price = price_current if 'price_current' in locals() else 0 + + # Add predicted signal to chart + if hasattr(self.chart, 'add_nn_signal'): + # Indicate consensus in the signal display if possible + signal_text = f"{action_name}{'*' if consensus_met else ''}" + self.chart.add_nn_signal( + symbol=self.config['market_data']['symbol'], + signal=signal_text, # Append '*' for consensus + confidence=0.7, # Placeholder + timestamp=timestamp_ms + ) + + # Update info display + if hasattr(self.chart, 'update_trading_info'): + consensus_status = "Yes" if consensus_met else "No" + info_text = f"RL: {action_name}, SV: {action_names[sv_pred_action]}, Consensus: {consensus_status}" + self.chart.update_trading_info( + action=action_name, # Still show RL action mainly + prediction=info_text, # Add consensus info + price=display_price, + timestamp=timestamp_ms + ) + + # Calculate accuracy & consensus rate for the episode + segment_len = end_idx - start_idx + accuracy = (correct_actions / segment_len) * 100 if segment_len > 0 else 0 + consensus_rate = (consensus_actions / segment_len) * 100 if segment_len > 0 else 0 # Rate of non-hold consensus actions + + # Update the chart with final episode metrics + if self.chart: + # Keep updating the text display if needed + if hasattr(self.chart, 'update_trading_info'): + self.chart.update_trading_info( + action="RL_EP_END", + prediction=f"Reward: {episode_reward:.1f}, Acc: {accuracy:.1f}%, Cons: {consensus_rate:.1f}%", + price=getattr(self.chart, 'latest_price', 0), + timestamp=int(time.time() * 1000) + ) + + # Log results + self.logger.info(f"RL Episode {episode+1} - Reward: {episode_reward:.2f}, " + + f"Accuracy: {accuracy:.2f}%, Consensus Rate: {consensus_rate:.2f}%, Epsilon: {self.rl_agent.epsilon:.4f}") + + # Update stats + stats["completed"] += 1 + stats["final_reward"] = episode_reward + stats["avg_reward"] = self.rl_agent.avg_reward + + # Save best model based on reward + if episode_reward > stats["best_reward"]: + stats["best_reward"] = episode_reward + self.rl_agent.save(str(self.models_dir / "rl_agent_best")) + self.logger.info(f"New best RL model saved with reward: {episode_reward:.2f}") + + self.rl_episodes += episodes + + except Exception as e: + self.logger.error(f"Error in reinforcement learning: {str(e)}") + import traceback + self.logger.error(traceback.format_exc()) + + return stats + + def _update_training_stats(self, sv_stats, rl_stats): + """Update global training statistics""" + global training_stats + + # Ensure sv_stats has the necessary keys + if not isinstance(sv_stats, dict) or "completed" not in sv_stats: + self.logger.warning("Supervised training stats missing expected keys, using defaults") + sv_stats = { + "completed": 0, + "best_val_pnl": -float('inf'), + "best_win_rate": 0, + "final_loss": 0 + } + + # Ensure rl_stats has the necessary keys + if not isinstance(rl_stats, dict) or "completed" not in rl_stats: + self.logger.warning("RL training stats missing expected keys, using defaults") + rl_stats = { + "completed": 0, + "best_reward": -float('inf'), + "final_reward": 0, + "avg_reward": 0 + } + + # Update supervised stats + training_stats["supervised"]["epochs_completed"] += sv_stats.get("completed", 0) + if sv_stats.get("best_val_pnl", -float('inf')) > training_stats["supervised"]["best_val_pnl"]: + training_stats["supervised"]["best_val_pnl"] = sv_stats["best_val_pnl"] + 
training_stats["supervised"]["best_epoch"] = self.supervised_epochs + if sv_stats.get("best_win_rate", 0) > training_stats["supervised"]["best_win_rate"]: + training_stats["supervised"]["best_win_rate"] = sv_stats["best_win_rate"] + + # Update reinforcement stats + training_stats["reinforcement"]["episodes_completed"] += rl_stats.get("completed", 0) + if rl_stats.get("best_reward", -float('inf')) > training_stats["reinforcement"]["best_reward"]: + training_stats["reinforcement"]["best_reward"] = rl_stats["best_reward"] + training_stats["reinforcement"]["best_episode"] = self.rl_episodes + + # Update hybrid stats + training_stats["hybrid"]["iterations_completed"] = self.iter_count + + # Calculate a combined score (simple weighted average) + # Ensure values are valid numbers before calculation + sv_pnl_score = training_stats["supervised"]["best_val_pnl"] if isinstance(training_stats["supervised"]["best_val_pnl"], (int, float)) and np.isfinite(training_stats["supervised"]["best_val_pnl"]) else 0.0 + rl_reward_score = training_stats["reinforcement"]["best_reward"] if isinstance(training_stats["reinforcement"]["best_reward"], (int, float)) and np.isfinite(training_stats["reinforcement"]["best_reward"]) else 0.0 + + combined_score = (sv_pnl_score * 0.5) + (rl_reward_score * 0.5) + + if combined_score > training_stats["hybrid"]["best_combined_score"]: + training_stats["hybrid"]["best_combined_score"] = combined_score + + training_stats["hybrid"]["last_update"] = datetime.now().isoformat() + + # Log updated stats + self.logger.info(f"Updated training stats - Combined score: {combined_score:.2f}") + + def _save_models_and_stats(self): + """Save models and training statistics""" + # Save models (Best models are saved within their respective training methods) + # Consider saving latest models here if needed + + # Save stats to JSON + stats_path = self.models_dir / f"hybrid_stats_{timestamp}.json" + try: + with open(stats_path, 'w') as f: + # Use a custom JSON encoder for numpy types if necessary + json.dump(training_stats, f, indent=2, default=lambda x: float(x) if isinstance(x, (np.float32, np.float64)) else x) + + # Also save to a consistent filename for easy access + latest_path = self.models_dir / "hybrid_stats_latest.json" + with open(latest_path, 'w') as f: + json.dump(training_stats, f, indent=2, default=lambda x: float(x) if isinstance(x, (np.float32, np.float64)) else x) + + self.logger.info(f"Saved training stats to {stats_path} and {latest_path}") + except Exception as e: + self.logger.error(f"Error saving training stats: {e}") + + def _log_to_tensorboard(self, iteration, sv_stats, rl_stats): + """Log metrics to TensorBoard""" + if not self.tensorboard_writer: + return + + # Ensure stats are dictionaries + sv_stats = sv_stats or {} + rl_stats = rl_stats or {} + + # Log supervised metrics + self.tensorboard_writer.add_scalar('Supervised/FinalLoss_PerIter', sv_stats.get("final_loss", 0), iteration) + self.tensorboard_writer.add_scalar('Supervised/BestPnL_Overall', training_stats['supervised']['best_val_pnl'], iteration) + self.tensorboard_writer.add_scalar('Supervised/BestWinRate_Overall', training_stats['supervised']['best_win_rate'], iteration) + + # Log RL metrics + self.tensorboard_writer.add_scalar('RL/FinalReward_PerIter', rl_stats.get("final_reward", 0), iteration) + self.tensorboard_writer.add_scalar('RL/BestReward_Overall', training_stats['reinforcement']['best_reward'], iteration) + self.tensorboard_writer.add_scalar('RL/AvgReward_PerIter', rl_stats.get("avg_reward", 0), iteration) 
+        self.tensorboard_writer.add_scalar('RL/Epsilon_Current', self.rl_agent.epsilon if self.rl_agent else 0, iteration)
+
+        # Log combined metrics
+        combined_score = training_stats['hybrid']['best_combined_score']
+        self.tensorboard_writer.add_scalar('Hybrid/CombinedScore_Overall', combined_score, iteration)
+        self.tensorboard_writer.add_scalar('Hybrid/Iterations', self.iter_count, iteration)
+
+def main():
+    """Main entry point"""
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(description='Hybrid Training with CPU Compatibility Fixes')
+    parser.add_argument('--iterations', type=int, default=10, help='Number of hybrid iterations')
+    parser.add_argument('--sv-epochs', type=int, default=5, help='Supervised epochs per iteration')
+    parser.add_argument('--rl-episodes', type=int, default=2, help='RL episodes per iteration')
+    parser.add_argument('--symbol', type=str, default='BTC/USDT', help='Trading symbol')
+    parser.add_argument('--timeframes', type=str, default='1m,5m,15m', help='Comma-separated timeframes')
+    parser.add_argument('--window', type=int, default=24, help='Window size for input data')
+    parser.add_argument('--batch-size', type=int, default=64, help='Batch size for training')
+    parser.add_argument('--new-model', action='store_true', help='Start with new models instead of loading existing')
+    parser.add_argument('--no-dashboard', action='store_true', help='Disable web dashboard')
+    parser.add_argument('--dashboard-port', type=int, default=8050, help='Port for web dashboard')
+    args = parser.parse_args()
+
+    # Create custom config
+    custom_config = {
+        'market_data': {
+            'symbol': args.symbol,
+            'timeframes': args.timeframes.split(','),
+            'window_size': args.window
+        },
+        'training': {
+            'batch_size': args.batch_size,
+            'learning_rate': 0.0001,  # Conservative learning rate
+        },
+        'hardware': {
+            'device': 'cpu',  # Force CPU
+            'mixed_precision': False  # Disable mixed precision
+        },
+        'model': {
+            'new_model': args.new_model
+        },
+        'visualization': {
+            'enabled': not args.no_dashboard,
+            'port': args.dashboard_port
+        }
+    }
+
+    # Get config from train_config
+    config = train_config.get_config('hybrid', custom_config)
+
+    # Save the config for reference
+    config_dir = Path('configs')
+    config_dir.mkdir(exist_ok=True)
+    train_config.save_config(config, f"configs/hybrid_training_{timestamp}.json")
+
+    # Initialize the hybrid model
+    model = HybridModel(config)
+    if not model.initialize():
+        logger.error("Failed to initialize hybrid model")
+        return
+
+    # Show instructions for the web dashboard if enabled
+    if not args.no_dashboard:
+        dash_url = f"http://localhost:{args.dashboard_port}"
+        logger.info(f"Web dashboard is enabled at {dash_url}")
+        logger.info("You can monitor training progress, see predictions and track PnL in real-time.")
+        logger.info("Press Ctrl+C to gracefully terminate training (models will be saved).")
+
+    # Run the training
+    stats = model.train_hybrid(
+        iterations=args.iterations,
+        sv_epochs_per_iter=args.sv_epochs,
+        rl_episodes_per_iter=args.rl_episodes
+    )
+
+    # Log final results
+    logger.info("Training completed successfully")
+    logger.info(f"Best supervised PnL: {stats['supervised']['best_val_pnl']:.4f}")
+    logger.info(f"Best RL reward: {stats['reinforcement']['best_reward']:.4f}")
+    logger.info(f"Best combined score: {stats['hybrid']['best_combined_score']:.4f}")
+
+    # Close TensorBoard writer
+    if model.tensorboard_writer:
+        model.tensorboard_writer.close()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/train_improved_rl.py b/train_improved_rl.py
index 56792da..829c252 100644
--- a/train_improved_rl.py
+++ b/train_improved_rl.py
@@ -27,7 +27,7 @@ if project_root not in sys.path:
 from NN.models.dqn_agent import DQNAgent
 from NN.utils.trading_env import TradingEnvironment
 from NN.utils.data_interface import DataInterface
-from realtime import BinanceHistoricalData, RealTimeChart
+from dataprovider_realtime import BinanceHistoricalData, RealTimeChart
 
 # Configure logging
 log_filename = f'improved_rl_training_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
diff --git a/train_rl_with_realtime.py b/train_rl_with_realtime.py
index 057da74..dadd9ac 100644
--- a/train_rl_with_realtime.py
+++ b/train_rl_with_realtime.py
@@ -1055,7 +1055,7 @@ async def start_realtime_chart(symbol="ETH/USDT", port=8050, manual_mode=False):
     Returns:
         tuple: (RealTimeChart instance, websocket task)
     """
-    from realtime import RealTimeChart
+    from dataprovider_realtime import RealTimeChart
 
     try:
         logger.info(f"Initializing RealTimeChart for {symbol}")
@@ -1105,7 +1105,7 @@ async def start_realtime_chart(symbol="ETH/USDT", port=8050, manual_mode=False):
 
 def _add_trade_compat(chart, price, timestamp, amount, pnl=0.0, action="BUY"):
     """Compatibility function for adding trades to the chart"""
-    from realtime import Position
+    from dataprovider_realtime import Position
     try:
         # Create a new position