artificially doule fees to promote more profitable trades

This commit is contained in:
Dobromir Popov
2025-07-17 19:22:35 +03:00
parent 6d55061e86
commit 26d440f772
3 changed files with 440 additions and 52 deletions

View File

@ -40,12 +40,40 @@ class Position:
order_id: str
unrealized_pnl: float = 0.0
def calculate_pnl(self, current_price: float) -> float:
"""Calculate unrealized P&L for the position"""
def calculate_pnl(self, current_price: float, leverage: float = 1.0, include_fees: bool = True) -> float:
"""Calculate unrealized P&L for the position with leverage and fees
Args:
current_price: Current market price
leverage: Leverage multiplier (default: 1.0)
include_fees: Whether to subtract fees from PnL (default: True)
Returns:
float: Unrealized PnL including leverage and fees
"""
# Calculate position value
position_value = self.entry_price * self.quantity
# Calculate base PnL
if self.side == 'LONG':
self.unrealized_pnl = (current_price - self.entry_price) * self.quantity
base_pnl = (current_price - self.entry_price) * self.quantity
else: # SHORT
self.unrealized_pnl = (self.entry_price - current_price) * self.quantity
base_pnl = (self.entry_price - current_price) * self.quantity
# Apply leverage
leveraged_pnl = base_pnl * leverage
# Calculate fees (0.1% open + 0.1% close = 0.2% total)
fees = 0.0
if include_fees:
# Open fee already paid
open_fee = position_value * 0.001
# Close fee will be paid when position is closed
close_fee = (current_price * self.quantity) * 0.001
fees = open_fee + close_fee
# Final PnL after fees
self.unrealized_pnl = leveraged_pnl - fees
return self.unrealized_pnl
@dataclass
@ -62,6 +90,10 @@ class TradeRecord:
fees: float
confidence: float
hold_time_seconds: float = 0.0 # Hold time in seconds
leverage: float = 1.0 # Leverage used for the trade
position_size_usd: float = 0.0 # Position size in USD
gross_pnl: float = 0.0 # PnL before fees
net_pnl: float = 0.0 # PnL after fees
class TradingExecutor:
"""Handles trade execution through multiple exchange APIs with risk management"""
@ -79,19 +111,22 @@ class TradingExecutor:
# Set primary exchange as main interface
self.exchange = self.primary_exchange
# Get primary exchange name and config first
primary_name = self.exchanges_config.get('primary', 'deribit')
primary_config = self.exchanges_config.get(primary_name, {})
# Determine trading and simulation modes
trading_mode = primary_config.get('trading_mode', 'simulation')
self.trading_enabled = self.trading_config.get('enabled', True)
self.simulation_mode = trading_mode == 'simulation'
if not self.exchange:
logger.error("Failed to initialize primary exchange")
self.trading_enabled = False
self.simulation_mode = True
if self.simulation_mode:
logger.info("Failed to initialize primary exchange, but simulation mode is enabled - trading allowed")
else:
logger.error("Failed to initialize primary exchange and not in simulation mode - trading disabled")
self.trading_enabled = False
else:
primary_name = self.exchanges_config.get('primary', 'deribit')
primary_config = self.exchanges_config.get(primary_name, {})
# Determine trading and simulation modes
trading_mode = primary_config.get('trading_mode', 'simulation')
self.trading_enabled = self.trading_config.get('enabled', True)
self.simulation_mode = trading_mode == 'simulation'
logger.info(f"Trading Executor initialized with {primary_name} as primary exchange")
logger.info(f"Trading mode: {trading_mode}, Simulation: {self.simulation_mode}")
@ -130,7 +165,19 @@ class TradingExecutor:
self.positions = {} # symbol -> Position object
self.trade_records = [] # List of TradeRecord objects
# Simulation balance tracking
self.simulation_balance = self.trading_config.get('simulation_account_usd', 100.0)
self.simulation_positions = {} # symbol -> position data with real entry prices
# Trading fees configuration (0.1% for both open and close)
self.trading_fees = {
'open_fee_percent': 0.001, # 0.1% fee when opening position
'close_fee_percent': 0.001, # 0.1% fee when closing position
'total_round_trip_fee': 0.002 # 0.2% total for round trip
}
logger.info(f"TradingExecutor initialized - Trading: {self.trading_enabled}, Mode: {self.trading_mode}")
logger.info(f"Simulation balance: ${self.simulation_balance:.2f}")
# Legacy compatibility (deprecated)
self.dry_run = self.simulation_mode
@ -152,10 +199,13 @@ class TradingExecutor:
# Connect to exchange
if self.trading_enabled:
logger.info("TRADING EXECUTOR: Attempting to connect to exchange...")
if not self._connect_exchange():
logger.error("TRADING EXECUTOR: Failed initial exchange connection. Trading will be disabled.")
self.trading_enabled = False
if self.simulation_mode:
logger.info("TRADING EXECUTOR: Simulation mode - trading enabled without exchange connection")
else:
logger.info("TRADING EXECUTOR: Attempting to connect to exchange...")
if not self._connect_exchange():
logger.error("TRADING EXECUTOR: Failed initial exchange connection. Trading will be disabled.")
self.trading_enabled = False
else:
logger.info("TRADING EXECUTOR: Trading is explicitly disabled in config.")
@ -210,6 +260,67 @@ class TradingExecutor:
logger.error(f"Error calling {method_name}: {e}")
return None
def _get_real_current_price(self, symbol: str) -> Optional[float]:
"""Get real current price from data provider - NEVER use simulated data"""
try:
# Try to get from data provider first (most reliable)
from core.data_provider import DataProvider
data_provider = DataProvider()
# Try multiple timeframes to get the most recent price
for timeframe in ['1m', '5m', '1h']:
try:
df = data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True)
if df is not None and not df.empty:
price = float(df['close'].iloc[-1])
if price > 0:
logger.debug(f"Got real price for {symbol} from {timeframe}: ${price:.2f}")
return price
except Exception as tf_error:
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
continue
# Try exchange ticker if available
if self.exchange:
try:
ticker = self.exchange.get_ticker(symbol)
if ticker and 'last' in ticker:
price = float(ticker['last'])
if price > 0:
logger.debug(f"Got real price for {symbol} from exchange: ${price:.2f}")
return price
except Exception as ex_error:
logger.debug(f"Failed to get price from exchange: {ex_error}")
# Try external API as last resort
try:
import requests
if symbol == 'ETH/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}")
return price
elif symbol == 'BTC/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}")
return price
except Exception as api_error:
logger.debug(f"Failed to get price from external API: {api_error}")
logger.error(f"Failed to get real current price for {symbol} from all sources")
return None
except Exception as e:
logger.error(f"Error getting real current price for {symbol}: {e}")
return None
def _connect_exchange(self) -> bool:
"""Connect to the primary exchange"""
if not self.exchange:
@ -250,11 +361,11 @@ class TradingExecutor:
# Get current price if not provided
if current_price is None:
ticker = self.exchange.get_ticker(symbol)
if not ticker or 'last' not in ticker:
logger.error(f"Failed to get current price for {symbol} or ticker is malformed.")
# Always get real current price - never use simulated data
current_price = self._get_real_current_price(symbol)
if current_price is None:
logger.error(f"Failed to get real current price for {symbol}")
return False
current_price = ticker['last']
# Assert that current_price is not None for type checking
assert current_price is not None, "current_price should not be None at this point"
@ -961,7 +1072,22 @@ class TradingExecutor:
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
# Get current leverage setting
leverage = self.trading_config.get('leverage', 1.0)
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
if position.side == 'SHORT':
gross_pnl = (position.entry_price - current_price) * position.quantity * leverage
else: # LONG
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - simulated_fees
# Create trade record with enhanced PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
@ -970,10 +1096,14 @@ class TradingExecutor:
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=simulated_fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
@ -1033,7 +1163,22 @@ class TradingExecutor:
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
# Get current leverage setting
leverage = self.trading_config.get('leverage', 1.0)
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
if position.side == 'SHORT':
gross_pnl = (position.entry_price - current_price) * position.quantity * leverage
else: # LONG
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - fees
# Create trade record with enhanced PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
@ -1042,10 +1187,14 @@ class TradingExecutor:
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl - fees,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
@ -1243,7 +1392,7 @@ class TradingExecutor:
def _get_account_balance_for_sizing(self) -> float:
"""Get account balance for position sizing calculations"""
if self.simulation_mode:
return self.mexc_config.get('simulation_account_usd', 100.0)
return self.simulation_balance
else:
# For live trading, get actual USDT/USDC balance
try:
@ -1253,7 +1402,179 @@ class TradingExecutor:
return max(usdt_balance, usdc_balance)
except Exception as e:
logger.warning(f"Failed to get live account balance: {e}, using simulation default")
return self.mexc_config.get('simulation_account_usd', 100.0)
return self.simulation_balance
def _calculate_pnl_with_fees(self, entry_price: float, exit_price: float, quantity: float, side: str) -> Dict[str, float]:
"""Calculate PnL including trading fees (0.1% open + 0.1% close = 0.2% total)"""
try:
# Calculate position value
position_value = entry_price * quantity
# Calculate fees
open_fee = position_value * self.trading_fees['open_fee_percent']
close_fee = (exit_price * quantity) * self.trading_fees['close_fee_percent']
total_fees = open_fee + close_fee
# Calculate gross PnL (before fees)
if side.upper() == 'LONG':
gross_pnl = (exit_price - entry_price) * quantity
else: # SHORT
gross_pnl = (entry_price - exit_price) * quantity
# Calculate net PnL (after fees)
net_pnl = gross_pnl - total_fees
# Calculate percentage returns
gross_pnl_percent = (gross_pnl / position_value) * 100
net_pnl_percent = (net_pnl / position_value) * 100
fee_percent = (total_fees / position_value) * 100
return {
'gross_pnl': gross_pnl,
'net_pnl': net_pnl,
'total_fees': total_fees,
'open_fee': open_fee,
'close_fee': close_fee,
'gross_pnl_percent': gross_pnl_percent,
'net_pnl_percent': net_pnl_percent,
'fee_percent': fee_percent,
'position_value': position_value
}
except Exception as e:
logger.error(f"Error calculating PnL with fees: {e}")
return {
'gross_pnl': 0.0,
'net_pnl': 0.0,
'total_fees': 0.0,
'open_fee': 0.0,
'close_fee': 0.0,
'gross_pnl_percent': 0.0,
'net_pnl_percent': 0.0,
'fee_percent': 0.0,
'position_value': 0.0
}
def _calculate_pivot_points(self, symbol: str) -> Dict[str, float]:
"""Calculate pivot points for the symbol using real market data"""
try:
from core.data_provider import DataProvider
data_provider = DataProvider()
# Get daily data for pivot calculation
df = data_provider.get_historical_data(symbol, '1d', limit=2, refresh=True)
if df is None or len(df) < 2:
logger.warning(f"Insufficient data for pivot calculation for {symbol}")
return {}
# Use previous day's data for pivot calculation
prev_day = df.iloc[-2]
high = float(prev_day['high'])
low = float(prev_day['low'])
close = float(prev_day['close'])
# Calculate pivot point
pivot = (high + low + close) / 3
# Calculate support and resistance levels
r1 = (2 * pivot) - low
s1 = (2 * pivot) - high
r2 = pivot + (high - low)
s2 = pivot - (high - low)
r3 = high + 2 * (pivot - low)
s3 = low - 2 * (high - pivot)
pivots = {
'pivot': pivot,
'r1': r1, 'r2': r2, 'r3': r3,
's1': s1, 's2': s2, 's3': s3,
'prev_high': high,
'prev_low': low,
'prev_close': close
}
logger.debug(f"Pivot points for {symbol}: P={pivot:.2f}, R1={r1:.2f}, S1={s1:.2f}")
return pivots
except Exception as e:
logger.error(f"Error calculating pivot points for {symbol}: {e}")
return {}
def _get_pivot_signal_strength(self, symbol: str, current_price: float, action: str) -> float:
"""Get signal strength based on proximity to pivot points"""
try:
pivots = self._calculate_pivot_points(symbol)
if not pivots:
return 1.0 # Default strength if no pivots available
pivot = pivots['pivot']
r1, r2, r3 = pivots['r1'], pivots['r2'], pivots['r3']
s1, s2, s3 = pivots['s1'], pivots['s2'], pivots['s3']
# Calculate distance to nearest pivot levels
distances = {
'pivot': abs(current_price - pivot),
'r1': abs(current_price - r1),
'r2': abs(current_price - r2),
'r3': abs(current_price - r3),
's1': abs(current_price - s1),
's2': abs(current_price - s2),
's3': abs(current_price - s3)
}
# Find nearest level
nearest_level = min(distances.keys(), key=lambda k: distances[k])
nearest_distance = distances[nearest_level]
nearest_price = pivots[nearest_level]
# Calculate signal strength based on action and pivot context
strength = 1.0
if action == 'BUY':
# Stronger buy signals near support levels
if nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price:
strength = 1.5 # Boost buy signals at support
elif nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price:
strength = 0.7 # Reduce buy signals at resistance
elif action == 'SELL':
# Stronger sell signals near resistance levels
if nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price:
strength = 1.5 # Boost sell signals at resistance
elif nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price:
strength = 0.7 # Reduce sell signals at support
logger.debug(f"Pivot signal strength for {symbol} {action}: {strength:.2f} "
f"(near {nearest_level} at ${nearest_price:.2f}, current ${current_price:.2f})")
return strength
except Exception as e:
logger.error(f"Error calculating pivot signal strength: {e}")
return 1.0
def _get_current_price_from_data_provider(self, symbol: str) -> Optional[float]:
"""Get current price from data provider for most up-to-date information"""
try:
from core.data_provider import DataProvider
data_provider = DataProvider()
# Try to get real-time price first
current_price = data_provider.get_current_price(symbol)
if current_price and current_price > 0:
return float(current_price)
# Fallback to latest 1m candle
df = data_provider.get_historical_data(symbol, '1m', limit=1, refresh=True)
if df is not None and len(df) > 0:
return float(df.iloc[-1]['close'])
logger.warning(f"Could not get current price for {symbol} from data provider")
return None
except Exception as e:
logger.error(f"Error getting current price from data provider for {symbol}: {e}")
return None
def _check_position_size_limit(self) -> bool:
"""Check if total open position value exceeds the maximum allowed percentage of balance"""
@ -1272,8 +1593,12 @@ class TradingExecutor:
for symbol, position in self.positions.items():
# Get current price for the symbol
try:
ticker = self.exchange.get_ticker(symbol) if self.exchange else None
current_price = ticker['last'] if ticker and 'last' in ticker else position.entry_price
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
current_price = ticker['last'] if ticker and 'last' in ticker else position.entry_price
else:
# Simulation mode - use entry price or default
current_price = position.entry_price
except Exception:
# Fallback to entry price if we can't get current price
current_price = position.entry_price
@ -1393,9 +1718,13 @@ class TradingExecutor:
if not self.dry_run:
for symbol, position in self.positions.items():
try:
ticker = self.exchange.get_ticker(symbol)
if ticker:
self._execute_sell(symbol, 1.0, ticker['last'])
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
if ticker:
self._execute_sell(symbol, 1.0, ticker['last'])
else:
# Simulation mode - use entry price for closing
self._execute_sell(symbol, 1.0, position.entry_price)
except Exception as e:
logger.error(f"Error closing position {symbol} during emergency stop: {e}")
@ -1746,11 +2075,10 @@ class TradingExecutor:
try:
# Get current price
current_price = None
ticker = self.exchange.get_ticker(symbol)
if ticker:
current_price = ticker['last']
else:
logger.error(f"Failed to get current price for {symbol}")
# Always get real current price - never use simulated data
current_price = self._get_real_current_price(symbol)
if current_price is None:
logger.error(f"Failed to get real current price for {symbol}")
return False
# Calculate confidence based on manual trade (high confidence)
@ -2015,9 +2343,13 @@ class TradingExecutor:
def _get_current_price_for_sync(self, symbol: str) -> Optional[float]:
"""Get current price for position synchronization"""
try:
ticker = self.exchange.get_ticker(symbol)
if ticker and 'last' in ticker:
return float(ticker['last'])
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
if ticker and 'last' in ticker:
return float(ticker['last'])
else:
# Get real current price - never use simulated data
return self._get_real_current_price(symbol)
return None
except Exception as e:
logger.error(f"Error getting current price for sync: {e}")