220 lines
9.7 KiB
Python
220 lines
9.7 KiB
Python
"""
|
|
Improved Reward Function for RL Trading Agent
|
|
|
|
This module provides a more sophisticated reward function for the RL trading agent
|
|
that incorporates realistic trading fees, penalties for excessive trading, and
|
|
rewards for successful holding of positions.
|
|
"""
|
|
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from collections import deque
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class RewardCalculator:
|
|
def __init__(self, base_fee_rate=0.001, reward_scaling=10.0, risk_aversion=0.1):
|
|
self.base_fee_rate = base_fee_rate
|
|
self.reward_scaling = reward_scaling
|
|
self.risk_aversion = risk_aversion
|
|
self.trade_pnls = []
|
|
self.returns = []
|
|
self.trade_timestamps = []
|
|
self.frequency_threshold = 10 # Trades per minute threshold for penalty
|
|
self.max_frequency_penalty = 0.05
|
|
|
|
def record_pnl(self, pnl):
|
|
"""Record P&L for risk adjustment calculations"""
|
|
self.trade_pnls.append(pnl)
|
|
if len(self.trade_pnls) > 100:
|
|
self.trade_pnls.pop(0)
|
|
|
|
def record_trade(self, action):
|
|
"""Record trade action for frequency penalty calculations"""
|
|
from time import time
|
|
self.trade_timestamps.append(time())
|
|
if len(self.trade_timestamps) > 100:
|
|
self.trade_timestamps.pop(0)
|
|
|
|
def _calculate_frequency_penalty(self):
|
|
"""Calculate penalty for high-frequency trading"""
|
|
if len(self.trade_timestamps) < 2:
|
|
return 0.0
|
|
time_span = self.trade_timestamps[-1] - self.trade_timestamps[0]
|
|
if time_span <= 0:
|
|
return 0.0
|
|
trades_per_minute = (len(self.trade_timestamps) / time_span) * 60
|
|
if trades_per_minute > self.frequency_threshold:
|
|
penalty = min(self.max_frequency_penalty, (trades_per_minute - self.frequency_threshold) * 0.001)
|
|
return penalty
|
|
return 0.0
|
|
|
|
def _calculate_risk_adjustment(self, reward):
|
|
"""Adjust rewards based on risk (simple Sharpe ratio implementation)"""
|
|
if len(self.trade_pnls) < 5:
|
|
return reward
|
|
pnl_array = np.array(self.trade_pnls)
|
|
mean_return = np.mean(pnl_array)
|
|
std_return = np.std(pnl_array)
|
|
if std_return == 0:
|
|
return reward
|
|
sharpe = mean_return / std_return
|
|
adjustment_factor = np.clip(1.0 + 0.5 * sharpe, 0.5, 2.0)
|
|
return reward * adjustment_factor
|
|
|
|
def _calculate_holding_reward(self, position_held_time, price_change):
|
|
"""Calculate reward for holding a position"""
|
|
base_holding_reward = 0.0005 * (position_held_time / 60.0)
|
|
if price_change > 0:
|
|
return base_holding_reward * 2
|
|
elif price_change < 0:
|
|
return base_holding_reward * 0.5
|
|
return base_holding_reward
|
|
|
|
def calculate_basic_reward(self, pnl, confidence):
|
|
"""Calculate basic training reward based on P&L and confidence"""
|
|
try:
|
|
base_reward = pnl
|
|
if pnl < 0 and confidence > 0.7:
|
|
confidence_adjustment = -confidence * 2
|
|
elif pnl > 0 and confidence > 0.7:
|
|
confidence_adjustment = confidence * 1.5
|
|
else:
|
|
confidence_adjustment = 0
|
|
final_reward = base_reward + confidence_adjustment
|
|
normalized_reward = np.tanh(final_reward / 10.0)
|
|
logger.debug(f"Basic reward calculation: P&L={pnl:.4f}, confidence={confidence:.2f}, reward={normalized_reward:.4f}")
|
|
return float(normalized_reward)
|
|
except Exception as e:
|
|
logger.error(f"Error calculating basic reward: {e}")
|
|
return 0.0
|
|
|
|
def calculate_enhanced_reward(self, action, price_change, position_held_time=0, volatility=None, is_profitable=False, confidence=0.0, predicted_change=0.0, actual_change=0.0, current_pnl=0.0, symbol='UNKNOWN'):
|
|
"""Calculate enhanced reward for trading actions with shifted neutral point
|
|
|
|
Neutral reward is shifted to require profits that exceed double the fees,
|
|
which penalizes small profit trades and encourages holding for larger moves.
|
|
Current PnL is given more weight in the decision-making process.
|
|
"""
|
|
fee = self.base_fee_rate
|
|
double_fee = fee * 4 # Double the fees (2x open + 2x close = 4x base fee)
|
|
frequency_penalty = self._calculate_frequency_penalty()
|
|
|
|
if action == 0: # Buy
|
|
# Penalize buying more when already in profit
|
|
reward = -fee - frequency_penalty
|
|
if current_pnl > 0:
|
|
# Reduce incentive to close profitable positions
|
|
reward -= current_pnl * 0.2
|
|
elif action == 1: # Sell
|
|
profit_pct = price_change
|
|
|
|
# Shift neutral point - require profit > double fees to be considered positive
|
|
net_profit = profit_pct - double_fee
|
|
|
|
# Scale reward based on profit size
|
|
if net_profit > 0:
|
|
# Exponential reward for larger profits
|
|
reward = (net_profit ** 1.5) * self.reward_scaling
|
|
else:
|
|
# Linear penalty for losses
|
|
reward = net_profit * self.reward_scaling
|
|
|
|
reward -= frequency_penalty
|
|
self.record_pnl(net_profit)
|
|
|
|
# Add extra penalty for very small profits (less than 3x fees)
|
|
if 0 < profit_pct < (fee * 6):
|
|
reward -= 0.5 # Discourage tiny profit-taking
|
|
else: # Hold
|
|
if is_profitable:
|
|
# Increase reward for holding profitable positions
|
|
profit_factor = min(5.0, current_pnl * 20) # Cap at 5x
|
|
reward = self._calculate_holding_reward(position_held_time, price_change) * (1.0 + profit_factor)
|
|
|
|
# Add bonus for holding through volatility when profitable
|
|
if volatility is not None and volatility > 0.001:
|
|
reward += 0.1 * volatility * 100
|
|
else:
|
|
# Small penalty for holding losing positions
|
|
loss_factor = min(1.0, abs(current_pnl) * 10)
|
|
reward = -0.0001 * (1.0 + loss_factor)
|
|
|
|
# But reduce penalty for very recent positions (give them time)
|
|
if position_held_time < 30: # Less than 30 seconds
|
|
reward *= 0.5
|
|
|
|
# Prediction accuracy reward component
|
|
if action in [0, 1] and predicted_change != 0:
|
|
if (action == 0 and actual_change > 0) or (action == 1 and actual_change < 0):
|
|
reward += abs(actual_change) * 5.0
|
|
else:
|
|
reward -= abs(predicted_change) * 2.0
|
|
|
|
# Increase weight of current PnL in decision making (3x more than before)
|
|
reward += current_pnl * 0.3
|
|
|
|
# Volatility penalty
|
|
if volatility is not None:
|
|
reward -= abs(volatility) * 100
|
|
|
|
# Risk adjustment
|
|
if self.risk_aversion > 0 and len(self.returns) > 1:
|
|
returns_std = np.std(self.returns)
|
|
reward -= returns_std * self.risk_aversion
|
|
|
|
self.record_trade(action)
|
|
return reward
|
|
|
|
def calculate_prediction_reward(self, symbol, predicted_direction, actual_direction, confidence, predicted_change, actual_change, current_pnl=0.0, position_duration=0.0):
|
|
"""Calculate reward for prediction accuracy"""
|
|
reward = 0.0
|
|
if predicted_direction == actual_direction:
|
|
reward += 1.0 * confidence
|
|
else:
|
|
reward -= 0.5
|
|
if predicted_direction == actual_direction and abs(predicted_change) > 0.001:
|
|
reward += abs(actual_change) * 5.0
|
|
if predicted_direction != actual_direction and abs(predicted_change) > 0.001:
|
|
reward -= abs(predicted_change) * 2.0
|
|
reward += current_pnl * 0.1
|
|
# Dynamic adjustment based on recent PnL (loss cutting incentive)
|
|
if hasattr(self, 'pnl_history') and symbol in self.pnl_history and self.pnl_history[symbol]:
|
|
latest_pnl_entry = self.pnl_history[symbol][-1]
|
|
latest_pnl_value = latest_pnl_entry.get('pnl', 0.0) if isinstance(latest_pnl_entry, dict) else 0.0
|
|
if latest_pnl_value < 0 and position_duration > 60:
|
|
reward -= (abs(latest_pnl_value) * 0.2)
|
|
pnl_values = [entry.get('pnl', 0.0) for entry in self.pnl_history[symbol] if isinstance(entry, dict)]
|
|
best_pnl = max(pnl_values) if pnl_values else 0.0
|
|
if best_pnl < 0.0:
|
|
reward -= 0.1
|
|
return reward
|
|
|
|
# Example usage:
|
|
if __name__ == "__main__":
|
|
# Create calculator instance
|
|
reward_calc = RewardCalculator()
|
|
|
|
# Example reward for a buy action
|
|
buy_reward = reward_calc.calculate_enhanced_reward(action=0, price_change=0)
|
|
print(f"Buy action reward: {buy_reward:.5f}")
|
|
|
|
# Record a trade for frequency tracking
|
|
reward_calc.record_trade(0)
|
|
|
|
# Wait a bit and make another trade to test frequency penalty
|
|
import time
|
|
time.sleep(0.1)
|
|
|
|
# Example reward for a sell action with profit
|
|
sell_reward = reward_calc.calculate_enhanced_reward(action=1, price_change=0.015, position_held_time=60)
|
|
print(f"Sell action reward (with profit): {sell_reward:.5f}")
|
|
|
|
# Example reward for a hold action on profitable position
|
|
hold_reward = reward_calc.calculate_enhanced_reward(action=2, price_change=0.01, position_held_time=30, is_profitable=True)
|
|
print(f"Hold action reward (profitable): {hold_reward:.5f}")
|
|
|
|
# Example reward for a hold action on unprofitable position
|
|
hold_reward_neg = reward_calc.calculate_enhanced_reward(action=2, price_change=-0.01, position_held_time=30, is_profitable=False)
|
|
print(f"Hold action reward (unprofitable): {hold_reward_neg:.5f}") |