import numpy as np
import gym
from gym import spaces
from typing import Dict, Tuple, List
import pandas as pd
import logging

# Configure logger
logger = logging.getLogger(__name__)


class TradingEnvironment(gym.Env):
    """
    Custom trading environment for reinforcement learning
    """

    def __init__(self,
                 data: pd.DataFrame,
                 initial_balance: float = 100.0,
                 fee_rate: float = 0.0002,
                 max_steps: int = 1000,
                 window_size: int = 20,
                 risk_aversion: float = 0.2,     # Controls how much to penalize volatility
                 price_scaling: str = 'zscore',  # 'zscore', 'minmax', or 'raw'
                 reward_scaling: float = 10.0,   # Scale factor for rewards
                 episode_penalty: float = 0.1):  # Penalty for active positions at end of episode
        super(TradingEnvironment, self).__init__()

        # Copy the data so normalization does not mutate the caller's DataFrame
        self.data = data.copy()
        self.initial_balance = initial_balance
        self.fee_rate = fee_rate
        self.max_steps = max_steps
        self.window_size = window_size
        self.risk_aversion = risk_aversion
        self.price_scaling = price_scaling
        self.reward_scaling = reward_scaling
        self.episode_penalty = episode_penalty

        # Preprocess data if needed
        self._preprocess_data()

        # Action space: 0 (BUY), 1 (SELL), 2 (HOLD)
        self.action_space = spaces.Discrete(3)

        # Observation space: price data, technical indicators, and account state
        feature_dim = self.data.shape[1] + 3  # Adding position, equity, unrealized_pnl
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(feature_dim,),
            dtype=np.float32
        )

        # Initialize state
        self.reset()

    def _preprocess_data(self):
        """Preprocess data - normalize or standardize features"""
        # Store the original data for reference
        self.original_data = self.data.copy()

        # Normalize price data based on the selected method
        if self.price_scaling == 'zscore':
            # Apply z-score normalization to price and volume columns
            for col in self.data.columns:
                if col in ['open', 'high', 'low', 'close', 'volume']:
                    mean = self.data[col].mean()
                    std = self.data[col].std()
                    if std > 0:
                        self.data[col] = (self.data[col] - mean) / std
        elif self.price_scaling == 'minmax':
            # For each feature, apply min-max scaling
            for col in self.data.columns:
                min_val = self.data[col].min()
                max_val = self.data[col].max()
                if max_val > min_val:
                    self.data[col] = (self.data[col] - min_val) / (max_val - min_val)
        # 'raw': leave features unscaled

    def reset(self) -> np.ndarray:
        """Reset the environment to initial state"""
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.position = 0  # 0: no position, 1: long position, -1: short position
        self.entry_price = 0
        self.entry_time = 0
        self.total_trades = 0
        self.winning_trades = 0
        self.losing_trades = 0
        self.total_pnl = 0
        self.balance_history = [self.initial_balance]
        self.equity_history = [self.initial_balance]
        self.max_balance = self.initial_balance
        self.max_drawdown = 0

        # Trading performance metrics
        self.trade_durations = []  # Track how long trades are held
        self.returns = []          # Track returns of each trade

        # For analyzing trade clustering
        self.last_action_time = 0
        self.actions_taken = []

        return self._get_observation()

    def _get_observation(self) -> np.ndarray:
        """Get current observation state with account information"""
        # Get market data for the current step
        market_data = self.data.iloc[self.current_step].values

        # Get current price
        current_price = self.original_data.iloc[self.current_step]['close']

        # Calculate unrealized PnL
        unrealized_pnl = 0
        if self.position != 0:
            price_diff = current_price - self.entry_price
            unrealized_pnl = self.position * price_diff
        # Calculate total equity (balance + unrealized PnL)
        equity = self.balance + unrealized_pnl

        # Normalize account state
        normalized_position = self.position  # -1, 0, or 1
        normalized_equity = equity / self.initial_balance - 1.0  # Percent change from initial
        normalized_unrealized_pnl = unrealized_pnl / self.initial_balance if self.initial_balance > 0 else 0

        # Combine market data with account state
        account_state = np.array([normalized_position, normalized_equity, normalized_unrealized_pnl])
        observation = np.concatenate([market_data, account_state])

        # Handle any NaN values
        observation = np.nan_to_num(observation, nan=0.0)

        return observation

    def _calculate_reward(self, action: int) -> Tuple[float, float]:
        """
        Calculate reward based on action and outcome with improved risk-adjusted metrics

        Args:
            action: The action taken (0=BUY, 1=SELL, 2=HOLD)

        Returns:
            Tuple[float, float]: (reward, realized PnL for this step)
        """
        # Get current price
        current_price = self.original_data.iloc[self.current_step]['close']

        # Default reward is slightly negative to discourage excessive trading
        reward = -0.0001
        pnl = 0.0

        # Handle different actions based on current position
        if self.position == 0:  # No position
            if action == 0:  # BUY
                self.position = 1
                self.entry_price = current_price
                self.entry_time = self.current_step
                reward = -self.fee_rate  # Small penalty for trading cost
            elif action == 1:  # SELL (start short position)
                self.position = -1
                self.entry_price = current_price
                self.entry_time = self.current_step
                reward = -self.fee_rate  # Small penalty for trading cost
            # else action == 2 (HOLD) - keep the small negative reward

        elif self.position > 0:  # Long position
            if action == 1:  # SELL (close long)
                # Calculate profit/loss
                price_diff = current_price - self.entry_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate  # Account for entry and exit fees

                # Adjust reward based on PnL and risk
                reward = pnl * self.reward_scaling

                # Track trade performance
                self.total_trades += 1
                if pnl > 0:
                    self.winning_trades += 1
                else:
                    self.losing_trades += 1

                # Calculate trade duration
                trade_duration = self.current_step - self.entry_time
                self.trade_durations.append(trade_duration)

                # Update returns list
                self.returns.append(pnl)

                # Update balance and reset position
                self.balance *= (1 + pnl)
                self.balance_history.append(self.balance)
                self.max_balance = max(self.max_balance, self.balance)
                self.total_pnl += pnl

                # Reset position
                self.position = 0
            elif action == 0:  # BUY (while already long)
                # Penalize trying to increase an already active position
                reward = -0.001
            else:
                # action == 2 (HOLD) - calculate unrealized P&L for reward
                price_diff = current_price - self.entry_price
                unrealized_pnl = price_diff / self.entry_price
                # Small reward/penalty based on unrealized P&L
                reward = unrealized_pnl * 0.05  # Scale down to encourage holding good positions

        elif self.position < 0:  # Short position
            if action == 0:  # BUY (close short)
                # Calculate profit/loss
                price_diff = self.entry_price - current_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate  # Account for entry and exit fees

                # Adjust reward based on PnL and risk
                reward = pnl * self.reward_scaling

                # Track trade performance
                self.total_trades += 1
                if pnl > 0:
                    self.winning_trades += 1
                else:
                    self.losing_trades += 1

                # Calculate trade duration
                trade_duration = self.current_step - self.entry_time
                self.trade_durations.append(trade_duration)

                # Update returns list
                self.returns.append(pnl)

                # Update balance and reset position
                self.balance *= (1 + pnl)
                self.balance_history.append(self.balance)
                self.max_balance = max(self.max_balance, self.balance)
                self.total_pnl += pnl

                # Reset position
                self.position = 0
            elif action == 1:  # SELL (while already short)
                # Penalize trying to increase an already active position
                reward = -0.001
            else:
                # action == 2 (HOLD) - calculate unrealized P&L for reward
                price_diff = self.entry_price - current_price
                unrealized_pnl = price_diff / self.entry_price
                # Small reward/penalty based on unrealized P&L
                reward = unrealized_pnl * 0.05  # Scale down to encourage holding good positions

        # Record the action
        self.actions_taken.append(action)
        self.last_action_time = self.current_step

        # Update equity history (balance + unrealized P&L)
        current_equity = self.balance
        if self.position != 0:
            # Calculate unrealized P&L
            if self.position > 0:  # Long
                price_diff = current_price - self.entry_price
                unrealized_pnl = price_diff / self.entry_price * self.balance
            else:  # Short
                price_diff = self.entry_price - current_price
                unrealized_pnl = price_diff / self.entry_price * self.balance
            current_equity = self.balance + unrealized_pnl

        self.equity_history.append(current_equity)

        # Calculate current drawdown
        peak_equity = max(self.equity_history)
        current_drawdown = (peak_equity - current_equity) / peak_equity if peak_equity > 0 else 0
        self.max_drawdown = max(self.max_drawdown, current_drawdown)

        # Apply risk aversion factor - penalize volatility
        if len(self.returns) > 1:
            returns_std = np.std(self.returns)
            reward -= returns_std * self.risk_aversion

        return reward, pnl

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Execute one step in the environment"""
        # Calculate reward and update state
        reward, pnl = self._calculate_reward(action)

        # Move to next step
        self.current_step += 1

        # Check if episode is done
        done = self.current_step >= min(self.max_steps - 1, len(self.data) - 1)

        # Apply penalty if episode ends with open position
        if done and self.position != 0:
            reward -= self.episode_penalty

            # Force close the position at the end if still open
            current_price = self.original_data.iloc[self.current_step]['close']
            if self.position > 0:  # Long position
                price_diff = current_price - self.entry_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate
            else:  # Short position
                price_diff = self.entry_price - current_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate

            # Update balance
            self.balance *= (1 + pnl)
            self.total_pnl += pnl

            # Track trade
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            else:
                self.losing_trades += 1

            # Reset position
            self.position = 0

        # Get next observation
        observation = self._get_observation()

        # Calculate sharpe ratio and sortino ratio if possible
        sharpe_ratio = 0
        sortino_ratio = 0
        win_rate = self.winning_trades / max(1, self.total_trades)

        if len(self.returns) > 1:
            mean_return = np.mean(self.returns)
            std_return = np.std(self.returns)
            if std_return > 0:
                sharpe_ratio = mean_return / std_return

            # For sortino, we only consider downside deviation
            downside_returns = [r for r in self.returns if r < 0]
            if downside_returns:
                downside_deviation = np.std(downside_returns)
                if downside_deviation > 0:
                    sortino_ratio = mean_return / downside_deviation

        # Calculate average trade duration
        avg_trade_duration = np.mean(self.trade_durations) if self.trade_durations else 0

        # Additional info
        info = {
            'balance': self.balance,
            'position': self.position,
            'total_trades': self.total_trades,
            'win_rate': win_rate,
            'total_pnl': self.total_pnl,
            'max_drawdown': self.max_drawdown,
            'sharpe_ratio': sharpe_ratio,
            'sortino_ratio': sortino_ratio,
            'avg_trade_duration': avg_trade_duration,
            'pnl': pnl,
            'gain': (self.balance - self.initial_balance) / self.initial_balance
        }
        return observation, reward, done, info

    def render(self, mode='human'):
        """Render the environment"""
        if mode == 'human':
            print(f"Step: {self.current_step}")
            print(f"Balance: ${self.balance:.2f}")
            print(f"Position: {self.position}")
            print(f"Total Trades: {self.total_trades}")
            print(f"Win Rate: {self.winning_trades / max(1, self.total_trades):.2%}")
            print(f"Total PnL: {self.total_pnl:.2%}")  # total_pnl accumulates fractional returns, not dollars
            print(f"Max Drawdown: {self.max_drawdown:.2%}")
            print(f"Sharpe Ratio: {self._calculate_sharpe_ratio():.4f}")
            print("-" * 50)

    def _calculate_sharpe_ratio(self):
        """Calculate Sharpe ratio from returns"""
        if len(self.returns) < 2:
            return 0.0

        mean_return = np.mean(self.returns)
        std_return = np.std(self.returns)

        if std_return == 0:
            return 0.0

        return mean_return / std_return
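
# --- Minimal usage sketch (illustrative addition, not part of the original module) ---
# Assumes a DataFrame with 'open', 'high', 'low', 'close', 'volume' columns; the
# synthetic price series and random-action loop below exist only to demonstrate the
# reset/step/render cycle of TradingEnvironment under the old gym (4-tuple) API.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    prices = 100 + np.cumsum(rng.normal(0, 0.5, 500))
    demo_data = pd.DataFrame({
        'open': prices,
        'high': prices + rng.random(500),
        'low': prices - rng.random(500),
        'close': prices + rng.normal(0, 0.25, 500),
        'volume': rng.integers(1_000, 10_000, 500).astype(float),
    })

    env = TradingEnvironment(demo_data, initial_balance=100.0, max_steps=200)
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # random policy, for illustration only
        obs, reward, done, info = env.step(action)
    env.render()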