import numpy as np
import gym
from gym import spaces
from typing import Dict, Tuple, List
import pandas as pd
import logging

# Configure logger
logger = logging.getLogger(__name__)


class TradingEnvironment(gym.Env):
    """
    Custom trading environment for reinforcement learning.
    """

    def __init__(self,
                 data: pd.DataFrame,
                 initial_balance: float = 100.0,
                 fee_rate: float = 0.0002,
                 max_steps: int = 1000,
                 window_size: int = 20,
                 risk_aversion: float = 0.2,     # Controls how much to penalize volatility
                 price_scaling: str = 'zscore',  # 'zscore', 'minmax', or 'raw'
                 reward_scaling: float = 10.0,   # Scale factor for rewards
                 episode_penalty: float = 0.1):  # Penalty for active positions at end of episode
        super(TradingEnvironment, self).__init__()

        self.data = data
        self.initial_balance = initial_balance
        self.fee_rate = fee_rate
        self.max_steps = max_steps
        self.window_size = window_size
        self.risk_aversion = risk_aversion
        self.price_scaling = price_scaling
        self.reward_scaling = reward_scaling
        self.episode_penalty = episode_penalty

        # Preprocess data if needed
        self._preprocess_data()

        # Action space: 0 (BUY), 1 (SELL), 2 (HOLD)
        self.action_space = spaces.Discrete(3)

        # Observation space: price data, technical indicators, and account state
        feature_dim = self.data.shape[1] + 3  # Adding position, equity, unrealized_pnl
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(feature_dim,),
            dtype=np.float32
        )

        # Initialize state
        self.reset()

    def _preprocess_data(self):
        """Preprocess data - normalize or standardize features"""
        # Store the original data for reference
        self.original_data = self.data.copy()

        # Normalize price data based on the selected method
        if self.price_scaling == 'zscore':
            # For each feature, apply z-score normalization
            for col in self.data.columns:
                if col in ['open', 'high', 'low', 'close']:
                    mean = self.data[col].mean()
                    std = self.data[col].std()
                    if std > 0:
                        self.data[col] = (self.data[col] - mean) / std
                # Normalize volume separately
                elif col == 'volume':
                    mean = self.data[col].mean()
                    std = self.data[col].std()
                    if std > 0:
                        self.data[col] = (self.data[col] - mean) / std

        elif self.price_scaling == 'minmax':
            # For each feature, apply min-max scaling
            for col in self.data.columns:
                min_val = self.data[col].min()
                max_val = self.data[col].max()
                if max_val > min_val:
                    self.data[col] = (self.data[col] - min_val) / (max_val - min_val)

    def reset(self) -> np.ndarray:
        """Reset the environment to initial state"""
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.position = 0  # 0: no position, 1: long position, -1: short position
        self.entry_price = 0
        self.entry_time = 0
        self.total_trades = 0
        self.winning_trades = 0
        self.losing_trades = 0
        self.total_pnl = 0
        self.balance_history = [self.initial_balance]
        self.equity_history = [self.initial_balance]
        self.max_balance = self.initial_balance
        self.max_drawdown = 0

        # Trading performance metrics
        self.trade_durations = []  # Track how long trades are held
        self.returns = []          # Track returns of each trade

        # For analyzing trade clustering
        self.last_action_time = 0
        self.actions_taken = []

        return self._get_observation()

    def _get_observation(self) -> np.ndarray:
        """Get current observation state with account information"""
        # Get market data for the current step
        market_data = self.data.iloc[self.current_step].values

        # Get current price
        current_price = self.original_data.iloc[self.current_step]['close']

        # Calculate unrealized PnL
        unrealized_pnl = 0
        if self.position != 0:
            price_diff = current_price - self.entry_price
            unrealized_pnl = self.position * price_diff

        # Calculate total equity (balance + unrealized PnL)
        equity = self.balance + unrealized_pnl

        # Normalize account state
        normalized_position = self.position  # -1, 0, or 1
        normalized_equity = equity / self.initial_balance - 1.0  # Percent change from initial
        normalized_unrealized_pnl = unrealized_pnl / self.initial_balance if self.initial_balance > 0 else 0

        # Combine market data with account state
        account_state = np.array([normalized_position, normalized_equity, normalized_unrealized_pnl])
        observation = np.concatenate([market_data, account_state])

        # Handle any NaN values and match the dtype declared in observation_space
        observation = np.nan_to_num(observation, nan=0.0).astype(np.float32)

        return observation

    def _calculate_reward(self, action: int) -> Tuple[float, float]:
        """
        Calculate reward based on action and outcome with improved risk-adjusted metrics

        Args:
            action: The action taken (0=BUY, 1=SELL, 2=HOLD)

        Returns:
            Tuple[float, float]: The calculated reward and the realized PnL of any trade
            closed on this step (0.0 if no trade was closed)
        """
        # Get current price
        current_price = self.original_data.iloc[self.current_step]['close']

        # Default reward is slightly negative to discourage excessive trading
        reward = -0.0001
        pnl = 0.0

        # Handle different actions based on current position
        if self.position == 0:  # No position
            if action == 0:  # BUY
                self.position = 1
                self.entry_price = current_price
                self.entry_time = self.current_step
                reward = -self.fee_rate  # Small penalty for trading cost

            elif action == 1:  # SELL (start short position)
                self.position = -1
                self.entry_price = current_price
                self.entry_time = self.current_step
                reward = -self.fee_rate  # Small penalty for trading cost

            # else action == 2 (HOLD) - keep the small negative reward

        elif self.position > 0:  # Long position
            if action == 1:  # SELL (close long)
                # Calculate profit/loss
                price_diff = current_price - self.entry_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate  # Account for entry and exit fees

                # Adjust reward based on PnL and risk
                reward = pnl * self.reward_scaling

                # Track trade performance
                self.total_trades += 1
                if pnl > 0:
                    self.winning_trades += 1
                else:
                    self.losing_trades += 1

                # Calculate trade duration
                trade_duration = self.current_step - self.entry_time
                self.trade_durations.append(trade_duration)

                # Update returns list
                self.returns.append(pnl)

                # Update balance and reset position
                self.balance *= (1 + pnl)
                self.balance_history.append(self.balance)
                self.max_balance = max(self.max_balance, self.balance)
                self.total_pnl += pnl

                # Reset position
                self.position = 0

            elif action == 0:  # BUY (while already long)
                # Penalize trying to increase an already active position
                reward = -0.001

            else:  # action == 2 (HOLD) - calculate unrealized P&L for reward
                price_diff = current_price - self.entry_price
                unrealized_pnl = price_diff / self.entry_price

                # Small reward/penalty based on unrealized P&L
                reward = unrealized_pnl * 0.05  # Scale down to encourage holding good positions

        elif self.position < 0:  # Short position
            if action == 0:  # BUY (close short)
                # Calculate profit/loss
                price_diff = self.entry_price - current_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate  # Account for entry and exit fees

                # Adjust reward based on PnL and risk
                reward = pnl * self.reward_scaling

                # Track trade performance
                self.total_trades += 1
                if pnl > 0:
                    self.winning_trades += 1
                else:
                    self.losing_trades += 1

                # Calculate trade duration
                trade_duration = self.current_step - self.entry_time
                self.trade_durations.append(trade_duration)

                # Update returns list
                self.returns.append(pnl)

                # Update balance and reset position
                self.balance *= (1 + pnl)
                self.balance_history.append(self.balance)
                self.max_balance = max(self.max_balance, self.balance)
                self.total_pnl += pnl

                # Reset position
                self.position = 0

            elif action == 1:  # SELL (while already short)
                # Penalize trying to increase an already active position
                reward = -0.001

            else:  # action == 2 (HOLD) - calculate unrealized P&L for reward
                price_diff = self.entry_price - current_price
                unrealized_pnl = price_diff / self.entry_price

                # Small reward/penalty based on unrealized P&L
                reward = unrealized_pnl * 0.05  # Scale down to encourage holding good positions

        # Record the action
        self.actions_taken.append(action)
        self.last_action_time = self.current_step

        # Update equity history (balance + unrealized P&L)
        current_equity = self.balance
        if self.position != 0:
            # Calculate unrealized P&L
            if self.position > 0:  # Long
                price_diff = current_price - self.entry_price
                unrealized_pnl = price_diff / self.entry_price * self.balance
            else:  # Short
                price_diff = self.entry_price - current_price
                unrealized_pnl = price_diff / self.entry_price * self.balance

            current_equity = self.balance + unrealized_pnl

        self.equity_history.append(current_equity)

        # Calculate current drawdown
        peak_equity = max(self.equity_history)
        current_drawdown = (peak_equity - current_equity) / peak_equity if peak_equity > 0 else 0
        self.max_drawdown = max(self.max_drawdown, current_drawdown)

        # Apply risk aversion factor - penalize volatility
        if len(self.returns) > 1:
            returns_std = np.std(self.returns)
            reward -= returns_std * self.risk_aversion

        return reward, pnl

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Execute one step in the environment"""
        # Calculate reward and update state
        reward, pnl = self._calculate_reward(action)

        # Move to next step
        self.current_step += 1

        # Check if episode is done
        done = self.current_step >= min(self.max_steps - 1, len(self.data) - 1)

        # Apply penalty if episode ends with open position
        if done and self.position != 0:
            reward -= self.episode_penalty

            # Force close the position at the end if still open
            current_price = self.original_data.iloc[self.current_step]['close']
            if self.position > 0:  # Long position
                price_diff = current_price - self.entry_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate
            else:  # Short position
                price_diff = self.entry_price - current_price
                pnl = price_diff / self.entry_price - 2 * self.fee_rate

            # Update balance
            self.balance *= (1 + pnl)
            self.total_pnl += pnl

            # Track trade
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            else:
                self.losing_trades += 1

            # Reset position
            self.position = 0

        # Get next observation
        observation = self._get_observation()

        # Calculate Sharpe ratio and Sortino ratio if possible
        sharpe_ratio = 0
        sortino_ratio = 0
        win_rate = self.winning_trades / max(1, self.total_trades)

        if len(self.returns) > 1:
            mean_return = np.mean(self.returns)
            std_return = np.std(self.returns)
            if std_return > 0:
                sharpe_ratio = mean_return / std_return

            # For Sortino, we only consider downside deviation
            downside_returns = [r for r in self.returns if r < 0]
            if downside_returns:
                downside_deviation = np.std(downside_returns)
                if downside_deviation > 0:
                    sortino_ratio = mean_return / downside_deviation

        # Calculate average trade duration
        avg_trade_duration = np.mean(self.trade_durations) if self.trade_durations else 0

        # Additional info
        info = {
            'balance': self.balance,
            'position': self.position,
            'total_trades': self.total_trades,
            'win_rate': win_rate,
            'total_pnl': self.total_pnl,
            'max_drawdown': self.max_drawdown,
            'sharpe_ratio': sharpe_ratio,
            'sortino_ratio': sortino_ratio,
            'avg_trade_duration': avg_trade_duration,
            'pnl': pnl,
            'gain': (self.balance - self.initial_balance) / self.initial_balance
        }

        return observation, reward, done, info

    def render(self, mode='human'):
        """Render the environment"""
        if mode == 'human':
            print(f"Step: {self.current_step}")
            print(f"Balance: ${self.balance:.2f}")
            print(f"Position: {self.position}")
            print(f"Total Trades: {self.total_trades}")
            print(f"Win Rate: {self.winning_trades / max(1, self.total_trades):.2%}")
            print(f"Total PnL: {self.total_pnl:.2%}")  # total_pnl accumulates fractional returns, not dollars
            print(f"Max Drawdown: {self.max_drawdown:.2%}")
            print(f"Sharpe Ratio: {self._calculate_sharpe_ratio():.4f}")
            print("-" * 50)

    def _calculate_sharpe_ratio(self):
        """Calculate Sharpe ratio from returns"""
        if len(self.returns) < 2:
            return 0.0

        mean_return = np.mean(self.returns)
        std_return = np.std(self.returns)

        if std_return == 0:
            return 0.0

        return mean_return / std_return
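

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative addition, not part of the original
# module). It builds a synthetic OHLCV DataFrame and rolls the environment
# forward with a random policy. The column names ('open', 'high', 'low',
# 'close', 'volume') and all parameter values are assumptions chosen to match
# what _preprocess_data and _get_observation expect.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(seed=42)

    # Synthetic random-walk close prices with derived OHLC columns and volume
    n = 500
    close = 100 + np.cumsum(rng.normal(0, 1, n))
    df = pd.DataFrame({
        'open': close + rng.normal(0, 0.5, n),
        'high': close + np.abs(rng.normal(0, 1, n)),
        'low': close - np.abs(rng.normal(0, 1, n)),
        'close': close,
        'volume': rng.integers(1_000, 10_000, n).astype(float),
    })

    env = TradingEnvironment(df, initial_balance=100.0, max_steps=200)
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()  # random actions, for smoke-testing only
        obs, reward, done, info = env.step(action)

    # Summarize the episode using the environment's own reporting
    env.render()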