""" Trading Executor for MEXC API Integration This module handles the execution of trading signals through the MEXC exchange API. It includes position management, risk controls, and safety features. https://github.com/mexcdevelop/mexc-api-postman/blob/main/MEXC%20V3.postman_collection.json MEXC V3.postman_collection.json """ import logging import time import os from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass from threading import Lock, RLock import threading import time import sys # Add NN directory to path for exchange interfaces sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'NN')) from core.exchanges.exchange_factory import ExchangeFactory from core.exchanges.exchange_interface import ExchangeInterface from .config import get_config from .config_sync import ConfigSynchronizer logger = logging.getLogger(__name__) @dataclass class Position: """Represents an open trading position""" symbol: str side: str # 'LONG' or 'SHORT' quantity: float entry_price: float entry_time: datetime order_id: str unrealized_pnl: float = 0.0 def calculate_pnl(self, current_price: float, leverage: float = 1.0, include_fees: bool = True, leverage_applied_by_exchange: bool = False) -> float: """Calculate unrealized P&L for the position with leverage and fees Args: current_price: Current market price leverage: Leverage multiplier (default: 1.0) include_fees: Whether to subtract fees from PnL (default: True) leverage_applied_by_exchange: Whether leverage is already applied by broker (default: False) Returns: float: Unrealized PnL including leverage and fees """ # Calculate position value position_value = self.entry_price * self.quantity # Calculate base PnL if self.side == 'LONG': base_pnl = (current_price - self.entry_price) * self.quantity else: # SHORT base_pnl = (self.entry_price - current_price) * self.quantity # Apply leverage only if not already applied by exchange if leverage_applied_by_exchange: # Broker already applies leverage, so use base PnL leveraged_pnl = base_pnl else: # Apply leverage locally leveraged_pnl = base_pnl * leverage # Calculate fees (0.1% open + 0.1% close = 0.2% total) fees = 0.0 if include_fees: # Open fee already paid open_fee = position_value * 0.001 # Close fee will be paid when position is closed close_fee = (current_price * self.quantity) * 0.001 fees = open_fee + close_fee # Final PnL after fees self.unrealized_pnl = leveraged_pnl - fees return self.unrealized_pnl @dataclass class TradeRecord: """Record of a completed trade""" symbol: str side: str quantity: float entry_price: float exit_price: float entry_time: datetime exit_time: datetime pnl: float fees: float confidence: float hold_time_seconds: float = 0.0 # Hold time in seconds leverage: float = 1.0 # Leverage used for the trade position_size_usd: float = 0.0 # Position size in USD gross_pnl: float = 0.0 # PnL before fees net_pnl: float = 0.0 # PnL after fees class TradingExecutor: """Handles trade execution through multiple exchange APIs with risk management""" def __init__(self, config_path: str = "config.yaml"): """Initialize the trading executor""" self.config = get_config(config_path) self.exchanges_config = self.config.get('exchanges', {}) self.trading_config = self.config.get('trading', {}) # Initialize exchanges using factory self.primary_exchange = ExchangeFactory.get_primary_exchange(self.exchanges_config) self.all_exchanges = ExchangeFactory.create_multiple_exchanges(self.exchanges_config) # Set primary exchange as main interface self.exchange = self.primary_exchange # Get primary exchange name and config first primary_name = self.exchanges_config.get('primary', 'deribit') primary_config = self.exchanges_config.get(primary_name, {}) # Determine trading and simulation modes trading_mode = primary_config.get('trading_mode', 'simulation') self.trading_enabled = self.trading_config.get('enabled', True) self.simulation_mode = trading_mode == 'simulation' if not self.exchange: if self.simulation_mode: logger.info("Failed to initialize primary exchange, but simulation mode is enabled - trading allowed") else: logger.error("Failed to initialize primary exchange and not in simulation mode - trading disabled") self.trading_enabled = False else: logger.info(f"Trading Executor initialized with {primary_name} as primary exchange") logger.info(f"Trading mode: {trading_mode}, Simulation: {self.simulation_mode}") # Get primary exchange name and config self.primary_name = self.exchanges_config.get('primary', 'mexc') self.primary_config = self.exchanges_config.get(self.primary_name, {}) # Set exchange config for compatibility (replaces mexc_config) self.exchange_config = self.primary_config self.mexc_config = self.primary_config # Legacy compatibility # Initialize config synchronizer with the primary exchange self.config_sync = ConfigSynchronizer( config_path=config_path, mexc_interface=self.exchange if (self.trading_enabled and self.primary_name == 'mexc') else None ) # Trading state management self.current_position = {} # symbol -> position data self.trade_history = [] self.daily_trades = 0 self.daily_pnl = 0.0 self.daily_loss = 0.0 self.last_trade_time = {} self.consecutive_losses = 0 # Track consecutive losing trades # Store trading mode for compatibility self.trading_mode = self.primary_config.get('trading_mode', 'simulation') # Safety feature: Auto-disable live trading after consecutive losses self.max_consecutive_losses = 5 # Disable live trading after 5 consecutive losses self.min_success_rate_to_reenable = 0.55 # Require 55% success rate to re-enable self.trades_to_evaluate = 20 # Evaluate last 20 trades for success rate self.original_trading_mode = self.trading_mode # Store original mode self.safety_triggered = False # Track if safety feature was triggered # Initialize session stats self.session_start_time = datetime.now() self.session_trades = 0 self.session_pnl = 0.0 # Position tracking self.positions = {} # symbol -> Position object self.trade_records = [] # List of TradeRecord objects # Simulation balance tracking self.simulation_balance = self.trading_config.get('simulation_account_usd', 100.0) self.simulation_positions = {} # symbol -> position data with real entry prices # Trading fees configuration (0.1% for both open and close - REVERTED TO NORMAL) self.trading_fees = { 'open_fee_percent': 0.001, # 0.1% fee when opening position 'close_fee_percent': 0.001, # 0.1% fee when closing position 'total_round_trip_fee': 0.002 # 0.2% total for round trip } # Dynamic profitability reward parameter - starts at 0, adjusts based on success rate self.profitability_reward_multiplier = 0.0 # Starts at 0, can be increased self.min_profitability_multiplier = 0.0 # Minimum value self.max_profitability_multiplier = 2.0 # Maximum 2x multiplier self.profitability_adjustment_step = 0.1 # Adjust by 0.1 each time # Success rate tracking for profitability adjustment self.recent_trades_window = 20 # Look at last 20 trades self.success_rate_increase_threshold = 0.60 # Increase multiplier if >60% success self.success_rate_decrease_threshold = 0.51 # Decrease multiplier if <51% success self.last_profitability_adjustment = datetime.now() logger.info(f"TradingExecutor initialized - Trading: {self.trading_enabled}, Mode: {self.trading_mode}") logger.info(f"Simulation balance: ${self.simulation_balance:.2f}") # Legacy compatibility (deprecated) self.dry_run = self.simulation_mode # Thread safety with timeout support self.lock = RLock() self.lock_timeout = 10.0 # 10 second timeout for order execution # Open order management self.max_open_orders = 2 # Maximum number of open orders allowed self.open_orders_count = 0 # Current count of open orders # Rate limiting for open orders sync (30 seconds) self.last_open_orders_sync = datetime.min self.open_orders_sync_interval = 30 # seconds # Trading symbols self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Connect to exchange if self.trading_enabled: if self.simulation_mode: logger.info("TRADING EXECUTOR: Simulation mode - trading enabled without exchange connection") else: logger.info("TRADING EXECUTOR: Attempting to connect to exchange...") if not self._connect_exchange(): logger.error("TRADING EXECUTOR: Failed initial exchange connection. Trading will be disabled.") self.trading_enabled = False else: logger.info("TRADING EXECUTOR: Trading is explicitly disabled in config.") logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}") # Get exchange-specific configuration self.exchange_config = self.primary_config # Initialize config synchronizer for automatic fee updates (only for MEXC) self.config_synchronizer = ConfigSynchronizer( config_path=config_path, mexc_interface=self.exchange if (self.trading_enabled and self.primary_name == 'mexc') else None ) # Perform initial fee sync on startup if trading is enabled and using MEXC if self.trading_enabled and self.exchange and self.primary_name == 'mexc': try: logger.info(f"TRADING EXECUTOR: Performing initial fee synchronization with {self.primary_name.upper()} API") sync_result = self.config_synchronizer.sync_trading_fees(force=True) if sync_result.get('status') == 'success': logger.info("TRADING EXECUTOR: Fee synchronization completed successfully") if sync_result.get('changes_made'): logger.info(f"TRADING EXECUTOR: Fee changes applied: {list(sync_result['changes'].keys())}") # Reload config to get updated fees self.config = get_config(config_path) self.exchange_config = self.config.get('exchanges', {}).get(self.primary_name, {}) elif sync_result.get('status') == 'warning': logger.warning("TRADING EXECUTOR: Fee sync completed with warnings") else: logger.warning(f"TRADING EXECUTOR: Fee sync failed: {sync_result.get('status')}") except Exception as e: logger.warning(f"TRADING EXECUTOR: Initial fee sync failed: {e}") elif self.trading_enabled and self.exchange: logger.info(f"TRADING EXECUTOR: Using {self.primary_name.upper()} exchange - fee sync not available") # Sync positions from exchange on startup if in live mode if not self.simulation_mode and self.exchange and self.trading_enabled: self._sync_positions_on_startup() logger.info(f"Trading Executor initialized - Exchange: {self.primary_name.upper()}, Mode: {self.trading_mode}, Enabled: {self.trading_enabled}") def _sync_positions_on_startup(self): """Sync positions from exchange on startup""" try: logger.info("TRADING EXECUTOR: Syncing positions from exchange on startup...") # Get all open positions from exchange if hasattr(self.exchange, 'get_positions'): exchange_positions = self.exchange.get_positions() if exchange_positions: for position in exchange_positions: symbol = position.get('symbol', '').replace('USDT', '/USDT') size = float(position.get('size', 0)) side = position.get('side', '').upper() entry_price = float(position.get('entry_price', 0)) if size > 0 and symbol and side in ['LONG', 'SHORT']: # Create position object pos_obj = Position( symbol=symbol, side=side, quantity=size, entry_price=entry_price, entry_time=datetime.now() ) self.positions[symbol] = pos_obj logger.info(f"POSITION SYNC: Found {side} position for {symbol}: {size} @ ${entry_price:.2f}") logger.info(f"POSITION SYNC: Synced {len(self.positions)} positions from exchange") else: logger.warning("Exchange does not support position retrieval") except Exception as e: logger.error(f"POSITION SYNC: Error syncing positions on startup: {e}") def _sync_single_position_from_exchange(self, symbol: str, exchange_position: dict): """Sync a single position from exchange to local state""" try: size = float(exchange_position.get('size', 0)) side = exchange_position.get('side', '').upper() entry_price = float(exchange_position.get('entry_price', 0)) if size > 0 and side in ['LONG', 'SHORT']: pos_obj = Position( symbol=symbol, side=side, quantity=size, entry_price=entry_price, entry_time=datetime.now() ) self.positions[symbol] = pos_obj logger.info(f"POSITION SYNC: Added {side} position for {symbol}: {size} @ ${entry_price:.2f}") return True except Exception as e: logger.error(f"Error syncing single position for {symbol}: {e}") return False def close_all_positions(self): """Emergency close all positions - both local and exchange""" logger.warning("CLOSE ALL POSITIONS: Starting emergency position closure") positions_closed = 0 # Get all positions to close (local + exchange) positions_to_close = set() # Add local positions for symbol in self.positions.keys(): positions_to_close.add(symbol) # Add exchange positions if not in simulation mode if not self.simulation_mode and self.exchange: try: exchange_positions = self.exchange.get_positions() if exchange_positions: for pos in exchange_positions: symbol = pos.get('symbol', '').replace('USDT', '/USDT') size = float(pos.get('size', 0)) if size > 0: positions_to_close.add(symbol) except Exception as e: logger.error(f"Error getting exchange positions for closure: {e}") # Close all positions for symbol in positions_to_close: try: if symbol in self.positions: position = self.positions[symbol] if position.side == 'LONG': if self._close_long_position(symbol, 1.0, position.entry_price): positions_closed += 1 elif position.side == 'SHORT': if self._close_short_position(symbol, 1.0, position.entry_price): positions_closed += 1 else: logger.warning(f"Position {symbol} found on exchange but not locally - manual intervention needed") except Exception as e: logger.error(f"Error closing position {symbol}: {e}") logger.warning(f"CLOSE ALL POSITIONS: Closed {positions_closed} positions") return positions_closed def _safe_exchange_call(self, method_name: str, *args, **kwargs): """Safely call exchange methods with null checking""" if not self.exchange: logger.warning(f"No exchange interface available for {method_name}") return None try: method = getattr(self.exchange, method_name, None) if method: return method(*args, **kwargs) else: logger.error(f"Method {method_name} not available on exchange") return None except Exception as e: logger.error(f"Error calling {method_name}: {e}") return None def _get_real_current_price(self, symbol: str) -> Optional[float]: """Get real current price from data provider - NEVER use simulated data""" try: # Try to get from data provider first (most reliable) from core.data_provider import DataProvider data_provider = DataProvider() # Try multiple timeframes to get the most recent price for timeframe in ['1m', '5m', '1h']: try: df = data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True) if df is not None and not df.empty: price = float(df['close'].iloc[-1]) if price > 0: logger.debug(f"Got real price for {symbol} from {timeframe}: ${price:.2f}") return price except Exception as tf_error: logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}") continue # Try exchange ticker if available if self.exchange: try: ticker = self.exchange.get_ticker(symbol) if ticker and 'last' in ticker: price = float(ticker['last']) if price > 0: logger.debug(f"Got real price for {symbol} from exchange: ${price:.2f}") return price except Exception as ex_error: logger.debug(f"Failed to get price from exchange: {ex_error}") # Try external API as last resort try: import requests if symbol == 'ETH/USDT': response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2) if response.status_code == 200: data = response.json() price = float(data['price']) if price > 0: logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}") return price elif symbol == 'BTC/USDT': response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2) if response.status_code == 200: data = response.json() price = float(data['price']) if price > 0: logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}") return price except Exception as api_error: logger.debug(f"Failed to get price from external API: {api_error}") logger.error(f"Failed to get real current price for {symbol} from all sources") return None except Exception as e: logger.error(f"Error getting real current price for {symbol}: {e}") return None def _connect_exchange(self) -> bool: """Connect to the primary exchange""" if not self.exchange: logger.warning("No exchange interface available") return False if self.exchange.connect(): logger.info("Successfully connected to exchange") return True else: logger.error("Failed to connect to exchange") return False def execute_signal(self, symbol: str, action: str, confidence: float, current_price: Optional[float] = None) -> bool: """Execute a trading signal Args: symbol: Trading symbol (e.g., 'ETH/USDT') action: Trading action ('BUY', 'SELL', 'HOLD') confidence: Confidence level (0.0 to 1.0) current_price: Current market price Returns: bool: True if trade executed successfully """ logger.debug(f"TRADING EXECUTOR: execute_signal called. trading_enabled: {self.trading_enabled}") if not self.trading_enabled: logger.info(f"Trading disabled - Signal: {action} {symbol} (confidence: {confidence:.2f}) - Reason: Trading executor is not enabled.") return False if action == 'HOLD': return True # PERIODIC POSITION SYNC: Every 10th signal execution, sync positions from exchange to prevent desync if not hasattr(self, '_signal_count'): self._signal_count = 0 self._signal_count += 1 if self._signal_count % 10 == 0 and not self.simulation_mode and self.exchange: logger.debug(f"PERIODIC SYNC: Checking position sync for {symbol} (signal #{self._signal_count})") try: exchange_positions = self.exchange.get_positions(symbol) if exchange_positions: for pos in exchange_positions: size = float(pos.get('size', 0)) if size > 0 and symbol not in self.positions: logger.warning(f"DESYNC DETECTED: Found position on exchange but not locally for {symbol}") self._sync_single_position_from_exchange(symbol, pos) elif symbol in self.positions: logger.warning(f"DESYNC DETECTED: Have local position but none on exchange for {symbol}") # Consider removing local position or investigating further except Exception as e: logger.debug(f"Error in periodic position sync: {e}") # Check safety conditions if not self._check_safety_conditions(symbol, action): return False # Get current price if not provided if current_price is None: # Always get real current price - never use simulated data current_price = self._get_real_current_price(symbol) if current_price is None: logger.error(f"Failed to get real current price for {symbol}") return False # Assert that current_price is not None for type checking assert current_price is not None, "current_price should not be None at this point" # --- Balance check before executing trade (skip in simulation mode) --- # Only perform balance check for live trading, not simulation if not self.simulation_mode and (action == 'BUY' or (action == 'SELL' and symbol not in self.positions) or (action == 'SHORT')): # Determine the quote asset (e.g., USDT, USDC) from the symbol if '/' in symbol: quote_asset = symbol.split('/')[1].upper() # Assuming symbol is like ETH/USDT # Keep USDT as USDT for MEXC spot trading (no conversion needed) else: # Fallback for symbols like ETHUSDT (assuming last 4 chars are quote) quote_asset = symbol[-4:].upper() # Keep USDT as USDT for MEXC spot trading (no conversion needed) # Calculate required capital for the trade # If we are selling (to open a short position), we need collateral based on the position size # For simplicity, assume required capital is the full position value in USD required_capital = self._calculate_position_size(confidence, current_price) # Get available balance for the quote asset (try USDT first, then USDC as fallback) if quote_asset == 'USDT': available_balance = self.exchange.get_balance('USDT') if available_balance < required_capital: # If USDT balance is insufficient, check USDC as fallback usdc_balance = self.exchange.get_balance('USDC') if usdc_balance >= required_capital: available_balance = usdc_balance quote_asset = 'USDC' # Use USDC instead logger.info(f"BALANCE CHECK: Using USDC fallback balance for {symbol}") else: available_balance = self.exchange.get_balance(quote_asset) logger.info(f"BALANCE CHECK: Symbol: {symbol}, Action: {action}, Required: ${required_capital:.2f} {quote_asset}, Available: ${available_balance:.2f} {quote_asset}") # Allow some small precision tolerance (1 cent) and ensure sufficient balance balance_tolerance = 0.01 if available_balance < (required_capital - balance_tolerance): logger.warning(f"Trade blocked for {symbol} {action}: Insufficient {quote_asset} balance. " f"Required: ${required_capital:.2f}, Available: ${available_balance:.2f}") return False else: logger.info(f"BALANCE CHECK PASSED: {quote_asset} balance sufficient for trade") elif self.simulation_mode: logger.debug(f"SIMULATION MODE: Skipping balance check for {symbol} {action} - allowing trade for model training") # --- End Balance check --- # Try to acquire lock with timeout to prevent hanging lock_acquired = self.lock.acquire(timeout=self.lock_timeout) if not lock_acquired: logger.error(f"TIMEOUT: Failed to acquire lock within {self.lock_timeout}s for {action} {symbol}") logger.error(f"This indicates another operation is hanging. Cancelling {action} order.") return False try: logger.info(f"LOCK ACQUIRED: Executing {action} for {symbol}") start_time = time.time() if action == 'BUY': result = self._execute_buy(symbol, confidence, current_price) elif action == 'SELL': result = self._execute_sell(symbol, confidence, current_price) elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal result = self._execute_short(symbol, confidence, current_price) else: logger.warning(f"Unknown action: {action}") result = False execution_time = time.time() - start_time logger.info(f"EXECUTION COMPLETE: {action} for {symbol} took {execution_time:.2f}s, result: {result}") return result except Exception as e: logger.error(f"Error executing {action} signal for {symbol}: {e}") return False finally: self.lock.release() logger.debug(f"LOCK RELEASED: {action} for {symbol}") def _get_open_orders_count(self) -> int: """Get current count of open orders across all symbols with rate limiting""" try: if self.simulation_mode: return 0 # Check if enough time has passed since last sync current_time = datetime.now() time_since_last_sync = (current_time - self.last_open_orders_sync).total_seconds() if time_since_last_sync < self.open_orders_sync_interval: # Return cached count if within rate limit logger.debug(f"Using cached open orders count ({self.open_orders_count}) - rate limited") return self.open_orders_count # Update last sync time self.last_open_orders_sync = current_time total_open_orders = 0 for symbol in self.symbols: try: open_orders = self.exchange.get_open_orders(symbol) total_open_orders += len(open_orders) except Exception as e: logger.warning(f"Error getting open orders for {symbol}: {e}") # Continue with other symbols continue # Update cached count self.open_orders_count = total_open_orders logger.debug(f"Updated open orders count: {total_open_orders}") return total_open_orders except Exception as e: logger.error(f"Error getting open orders count: {e}") return self.open_orders_count # Return cached value on error def _can_place_new_order(self) -> bool: """Check if we can place a new order based on open order limit""" current_count = self._get_open_orders_count() can_place = current_count < self.max_open_orders if not can_place: logger.warning(f"Cannot place new order: {current_count}/{self.max_open_orders} open orders") return can_place def sync_open_orders(self) -> Dict[str, Any]: """Synchronize open orders with exchange and update internal state with rate limiting Returns: dict: Sync result with status and order details """ try: if self.simulation_mode: return { 'status': 'success', 'message': 'Simulation mode - no real orders to sync', 'orders': [], 'count': 0 } # Check rate limiting current_time = datetime.now() time_since_last_sync = (current_time - self.last_open_orders_sync).total_seconds() if time_since_last_sync < self.open_orders_sync_interval: # Return cached result if within rate limit logger.debug(f"Open order sync rate limited - using cached data") return { 'status': 'rate_limited', 'message': f'Rate limited - last sync was {time_since_last_sync:.1f}s ago', 'orders': [], 'count': self.open_orders_count, 'cached': True } # Update last sync time self.last_open_orders_sync = current_time sync_result = { 'status': 'started', 'orders': [], 'count': 0, 'errors': [] } total_orders = 0 all_orders = [] # Sync orders for each symbol for symbol in self.symbols: try: open_orders = self.exchange.get_open_orders(symbol) if open_orders: symbol_orders = [] for order in open_orders: order_info = { 'symbol': symbol, 'order_id': order.get('orderId'), 'side': order.get('side'), 'type': order.get('type'), 'quantity': float(order.get('origQty', 0)), 'price': float(order.get('price', 0)), 'status': order.get('status'), 'time': order.get('time') } symbol_orders.append(order_info) all_orders.append(order_info) total_orders += len(symbol_orders) logger.info(f"Synced {len(symbol_orders)} open orders for {symbol}") except Exception as e: error_msg = f"Error syncing orders for {symbol}: {e}" logger.warning(error_msg) # Changed to warning since this is expected with rate limits sync_result['errors'].append(error_msg) # Update internal state self.open_orders_count = total_orders sync_result.update({ 'status': 'success', 'orders': all_orders, 'count': total_orders, 'message': f"Synced {total_orders} open orders across {len(self.symbols)} symbols" }) logger.info(f"Open order sync completed: {total_orders} orders") return sync_result except Exception as e: logger.error(f"Error in open order sync: {e}") return { 'status': 'error', 'message': str(e), 'orders': [], 'count': 0, 'errors': [str(e)] } def _cancel_open_orders(self, symbol: str) -> int: """Cancel all open orders for a symbol and return count of cancelled orders""" try: if self.simulation_mode: return 0 open_orders = self.exchange.get_open_orders(symbol) cancelled_count = 0 for order in open_orders: order_id = order.get('orderId') if order_id: try: cancel_result = self.exchange.cancel_order(symbol, str(order_id)) if cancel_result: cancelled_count += 1 logger.info(f"Cancelled order {order_id} for {symbol}") time.sleep(0.1) # Small delay between cancellations except Exception as e: logger.warning(f"Failed to cancel order {order_id}: {e}") return cancelled_count except Exception as e: logger.error(f"Error cancelling open orders for {symbol}: {e}") return 0 def _calculate_recent_success_rate(self) -> float: """Calculate success rate of recent closed trades Returns: float: Success rate (0.0 to 1.0) of recent trades """ try: if len(self.trade_records) < 5: # Need at least 5 trades return 0.0 # Get recent trades (up to the window size) recent_trades = self.trade_records[-self.recent_trades_window:] if not recent_trades: return 0.0 # Count winning trades (net PnL > 0) winning_trades = sum(1 for trade in recent_trades if trade.net_pnl > 0) success_rate = winning_trades / len(recent_trades) logger.debug(f"Recent success rate: {success_rate:.2%} ({winning_trades}/{len(recent_trades)} trades)") return success_rate except Exception as e: logger.error(f"Error calculating success rate: {e}") return 0.0 def _adjust_profitability_reward_multiplier(self): """Adjust profitability reward multiplier based on recent success rate""" try: # Only adjust every 5 minutes to avoid too frequent changes current_time = datetime.now() time_since_last_adjustment = (current_time - self.last_profitability_adjustment).total_seconds() if time_since_last_adjustment < 300: # 5 minutes return success_rate = self._calculate_recent_success_rate() # Only adjust if we have enough trades if len(self.trade_records) < 10: return old_multiplier = self.profitability_reward_multiplier # Increase multiplier if success rate > 60% if success_rate > self.success_rate_increase_threshold: self.profitability_reward_multiplier = min( self.max_profitability_multiplier, self.profitability_reward_multiplier + self.profitability_adjustment_step ) logger.info(f"🎯 SUCCESS RATE HIGH ({success_rate:.1%}) - Increased profitability multiplier: {old_multiplier:.1f} → {self.profitability_reward_multiplier:.1f}") # Decrease multiplier if success rate < 51% elif success_rate < self.success_rate_decrease_threshold: self.profitability_reward_multiplier = max( self.min_profitability_multiplier, self.profitability_reward_multiplier - self.profitability_adjustment_step ) logger.info(f"⚠️ SUCCESS RATE LOW ({success_rate:.1%}) - Decreased profitability multiplier: {old_multiplier:.1f} → {self.profitability_reward_multiplier:.1f}") else: logger.debug(f"Success rate {success_rate:.1%} in acceptable range - keeping multiplier at {self.profitability_reward_multiplier:.1f}") self.last_profitability_adjustment = current_time except Exception as e: logger.error(f"Error adjusting profitability reward multiplier: {e}") def get_profitability_reward_multiplier(self) -> float: """Get current profitability reward multiplier Returns: float: Current profitability reward multiplier """ return self.profitability_reward_multiplier def _can_reenable_live_trading(self) -> bool: """Check if trading performance has improved enough to re-enable live trading Returns: bool: True if performance meets criteria to re-enable live trading """ try: # Need enough trades to evaluate if len(self.trade_history) < self.trades_to_evaluate: logger.debug(f"Not enough trades to evaluate for re-enabling live trading: {len(self.trade_history)}/{self.trades_to_evaluate}") return False # Get the most recent trades for evaluation recent_trades = self.trade_history[-self.trades_to_evaluate:] # Calculate success rate winning_trades = sum(1 for trade in recent_trades if trade.pnl > 0.001) success_rate = winning_trades / len(recent_trades) # Calculate average PnL avg_pnl = sum(trade.pnl for trade in recent_trades) / len(recent_trades) # Calculate win/loss ratio losing_trades = sum(1 for trade in recent_trades if trade.pnl < -0.001) win_loss_ratio = winning_trades / max(1, losing_trades) # Avoid division by zero logger.info(f"SAFETY FEATURE: Performance evaluation - Success rate: {success_rate:.2%}, Avg PnL: ${avg_pnl:.2f}, Win/Loss ratio: {win_loss_ratio:.2f}") # Criteria to re-enable live trading: # 1. Success rate must exceed minimum threshold # 2. Average PnL must be positive # 3. Win/loss ratio must be at least 1.0 (equal wins and losses) if (success_rate >= self.min_success_rate_to_reenable and avg_pnl > 0 and win_loss_ratio >= 1.0): logger.info(f"SAFETY FEATURE: Performance criteria met for re-enabling live trading") return True else: logger.debug(f"SAFETY FEATURE: Performance criteria not yet met for re-enabling live trading") return False except Exception as e: logger.error(f"Error evaluating trading performance: {e}") return False except Exception as e: logger.error(f"Error evaluating trading performance: {e}") return False def _check_safety_conditions(self, symbol: str, action: str) -> bool: """Check if it's safe to execute a trade""" # Check if trading is stopped if self.exchange_config.get('emergency_stop', False): logger.warning("Emergency stop is active - no trades allowed") return False # Safety feature: Check consecutive losses and switch to simulation mode if needed if not self.simulation_mode and self.consecutive_losses >= self.max_consecutive_losses: logger.warning(f"SAFETY FEATURE ACTIVATED: {self.consecutive_losses} consecutive losses detected") logger.warning(f"Switching from live trading to simulation mode for safety") # Store original mode and switch to simulation self.original_trading_mode = self.trading_mode self.trading_mode = 'simulation' self.simulation_mode = True self.safety_triggered = True # Log the event logger.info(f"Trading mode changed to SIMULATION due to safety feature") logger.info(f"Will continue to monitor performance and re-enable live trading when success rate improves") # Continue allowing trades in simulation mode return True # Check if we should try to re-enable live trading after safety feature was triggered if self.simulation_mode and self.safety_triggered and self.original_trading_mode != 'simulation': # Check if performance has improved enough to re-enable live trading if self._can_reenable_live_trading(): logger.info(f"SAFETY FEATURE: Performance has improved, re-enabling live trading") # Switch back to original mode self.trading_mode = self.original_trading_mode self.simulation_mode = (self.trading_mode == 'simulation') self.safety_triggered = False self.consecutive_losses = 0 # Reset consecutive losses counter logger.info(f"Trading mode restored to {self.trading_mode}") # Continue with the trade return True # Check symbol allowlist allowed_symbols = self.exchange_config.get('allowed_symbols', []) if allowed_symbols and symbol not in allowed_symbols: logger.warning(f"Symbol {symbol} not in allowed list: {allowed_symbols}") return False # Check daily loss limit (use trading config as fallback) max_daily_loss = self.exchange_config.get('max_daily_loss_usd', self.trading_config.get('max_daily_loss_usd', 200.0)) if self.daily_loss >= max_daily_loss: logger.warning(f"Daily loss limit reached: ${self.daily_loss:.2f} >= ${max_daily_loss}") return False # Check trade interval - allow bypass for test scenarios min_interval = self.exchange_config.get('min_trade_interval_seconds', self.trading_config.get('min_trade_interval_seconds', 5)) last_trade = self.last_trade_time.get(symbol, datetime.min) time_since_last = (datetime.now() - last_trade).total_seconds() # Allow bypass for high confidence sells (closing positions) or test scenarios if time_since_last < min_interval: # Allow immediate sells for closing positions or very high confidence (0.9+) if action == 'SELL' and symbol in self.positions: logger.info(f"Allowing immediate SELL to close position for {symbol}") elif hasattr(self, '_test_mode') and self._test_mode: logger.info(f"Test mode: bypassing trade interval for {symbol}") else: logger.info(f"Trade interval not met for {symbol} ({time_since_last:.1f}s < {min_interval}s)") return False # Check concurrent positions max_positions = self.exchange_config.get('max_concurrent_positions', self.trading_config.get('max_concurrent_positions', 3)) if len(self.positions) >= max_positions and action == 'BUY': logger.warning(f"Maximum concurrent positions reached: {len(self.positions)}") return False # SYNC OPEN ORDERS BEFORE CHECKING LIMITS # This ensures we have accurate position data before making decisions if not self.simulation_mode: try: logger.debug(f"Syncing open orders before trade execution for {symbol}") sync_result = self.sync_open_orders() if sync_result.get('status') == 'success': logger.debug(f"Open orders synced successfully: {sync_result.get('count', 0)} orders") else: logger.warning(f"Open orders sync failed: {sync_result.get('message', 'Unknown error')}") except Exception as e: logger.warning(f"Error syncing open orders: {e}") # Check open order limit if not self._can_place_new_order(): logger.warning(f"Maximum open orders reached: {self._get_open_orders_count()}/{self.max_open_orders}") return False # Check position size limit before opening new positions if action in ['BUY', 'SHORT'] and symbol not in self.positions: if not self._check_position_size_limit(): logger.warning(f"Position size limit reached - cannot open new position for {symbol}") return False return True def _execute_buy(self, symbol: str, confidence: float, current_price: float) -> bool: """Execute a buy order with enhanced position management""" # CRITICAL: Check for existing positions (both local and exchange) if symbol in self.positions: position = self.positions[symbol] if position.side == 'SHORT': logger.info(f"Closing SHORT position in {symbol}") return self._close_short_position(symbol, confidence, current_price) else: logger.warning(f"POSITION SAFETY: Already have LONG position in {symbol} - blocking duplicate trade") return False # ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode if not self.simulation_mode and self.exchange: try: exchange_positions = self.exchange.get_positions(symbol) if exchange_positions: for pos in exchange_positions: if float(pos.get('size', 0)) > 0: logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking duplicate trade") logger.warning(f"Position details: {pos}") # Sync this position to local state self._sync_single_position_from_exchange(symbol, pos) return False except Exception as e: logger.debug(f"Error checking exchange positions for {symbol}: {e}") # Don't block trade if we can't check - but log it # Cancel any existing open orders before placing new order if not self.simulation_mode: self._cancel_open_orders(symbol) # Calculate position size position_size = self._calculate_position_size(confidence, current_price) quantity = position_size / current_price logger.info(f"Executing BUY: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]") if self.simulation_mode: # Create simulated position self.positions[symbol] = Position( symbol=symbol, side='LONG', quantity=quantity, entry_price=current_price, entry_time=datetime.now(), order_id=f"sim_{int(datetime.now().timestamp())}" ) logger.info(f"Simulated BUY order: {quantity:.6f} {symbol} at ${current_price:.2f}") return True else: # Place real order with enhanced error handling result = self._place_order_with_retry(symbol, 'BUY', 'MARKET', quantity, current_price) # Check for position check error if result and 'error' in result and result['error'] == 'existing_position': logger.error(f"BUY order blocked: {result['message']}") return False if result and 'orderId' in result: # Use actual fill information if available, otherwise fall back to order parameters filled_quantity = result.get('executedQty', quantity) fill_price = result.get('avgPrice', current_price) # Only create position if order was actually filled if result.get('filled', True): # Assume filled for backward compatibility self.positions[symbol] = Position( symbol=symbol, side='LONG', quantity=float(filled_quantity), entry_price=float(fill_price), entry_time=datetime.now(), order_id=result['orderId'] ) logger.info(f"BUY position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}") self.last_trade_time[symbol] = datetime.now() return True else: logger.error(f"BUY order placed but not filled: {result}") return False else: logger.error("Failed to place BUY order") return False def _execute_sell(self, symbol: str, confidence: float, current_price: float) -> bool: """Execute a sell order (close long position or open short position)""" if symbol in self.positions: position = self.positions[symbol] if position.side == 'LONG': logger.info(f"Closing LONG position in {symbol}") return self._close_long_position(symbol, confidence, current_price) else: logger.info(f"Already have SHORT position in {symbol}") return False else: # No position to sell, open short position logger.info(f"No position to sell in {symbol}. Opening short position") return self._execute_short(symbol, confidence, current_price) def _execute_short(self, symbol: str, confidence: float, current_price: float) -> bool: """Execute a short order (sell without holding the asset) with enhanced position management""" # CRITICAL: Check for any existing positions before opening SHORT if symbol in self.positions: logger.warning(f"POSITION SAFETY: Already have position in {symbol} - blocking SHORT trade") return False # ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode if not self.simulation_mode and self.exchange: try: exchange_positions = self.exchange.get_positions(symbol) if exchange_positions: for pos in exchange_positions: if float(pos.get('size', 0)) > 0: logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking SHORT trade") logger.warning(f"Position details: {pos}") # Sync this position to local state self._sync_single_position_from_exchange(symbol, pos) return False except Exception as e: logger.debug(f"Error checking exchange positions for SHORT {symbol}: {e}") # Cancel any existing open orders before placing new order if not self.simulation_mode: self._cancel_open_orders(symbol) # Calculate position size position_size = self._calculate_position_size(confidence, current_price) quantity = position_size / current_price logger.info(f"Executing SHORT: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]") if self.simulation_mode: # Create simulated short position self.positions[symbol] = Position( symbol=symbol, side='SHORT', quantity=quantity, entry_price=current_price, entry_time=datetime.now(), order_id=f"sim_{int(datetime.now().timestamp())}" ) logger.info(f"Simulated SHORT order: {quantity:.6f} {symbol} at ${current_price:.2f}") return True else: # Place real short order with enhanced error handling result = self._place_order_with_retry(symbol, 'SELL', 'MARKET', quantity, current_price) # Check for position check error if result and 'error' in result and result['error'] == 'existing_position': logger.error(f"SHORT order blocked: {result['message']}") return False if result and 'orderId' in result: # Use actual fill information if available, otherwise fall back to order parameters filled_quantity = result.get('executedQty', quantity) fill_price = result.get('avgPrice', current_price) # Only create position if order was actually filled if result.get('filled', True): # Assume filled for backward compatibility self.positions[symbol] = Position( symbol=symbol, side='SHORT', quantity=float(filled_quantity), entry_price=float(fill_price), entry_time=datetime.now(), order_id=result['orderId'] ) logger.info(f"SHORT position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}") self.last_trade_time[symbol] = datetime.now() return True else: logger.error(f"SHORT order placed but not filled: {result}") return False else: logger.error("Failed to place SHORT order") return False def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]: """Place order with retry logic for MEXC error handling""" # FINAL POSITION CHECK: Verify no existing position before placing order if not self.simulation_mode and self.exchange: try: exchange_positions = self.exchange.get_positions(symbol) if exchange_positions: for pos in exchange_positions: size = float(pos.get('size', 0)) if size > 0: logger.error(f"FINAL POSITION CHECK FAILED: Found existing position for {symbol} before placing order") logger.error(f"Position details: {pos}") logger.error(f"Order details: {side} {quantity} @ ${current_price}") # Sync the position to local state self._sync_single_position_from_exchange(symbol, pos) return {'error': 'existing_position', 'message': f'Position already exists for {symbol}'} except Exception as e: logger.warning(f"Error in final position check for {symbol}: {e}") # Continue with order placement if we can't check positions order_start_time = time.time() max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout) for attempt in range(max_retries): # Check if we've exceeded the maximum order time elapsed_time = time.time() - order_start_time if elapsed_time > max_order_time: logger.error(f"ORDER TIMEOUT: Order placement exceeded {max_order_time}s limit for {side} {symbol}") logger.error(f"Elapsed time: {elapsed_time:.2f}s, cancelling order to prevent lock hanging") return {} try: # For retries, use more aggressive pricing for LIMIT orders order_price = current_price if order_type.upper() == 'LIMIT' and attempt > 0: # Increase aggressiveness with each retry aggression_factor = 1 + (0.005 * (attempt + 1)) # 0.5%, 1.0%, 1.5% etc. if side.upper() == 'BUY': order_price = current_price * aggression_factor else: order_price = current_price / aggression_factor logger.info(f"Retry {attempt + 1}: Using more aggressive price ${order_price:.4f} (vs market ${current_price:.4f})") result = self.exchange.place_order(symbol, side, order_type, quantity, order_price) # Check if result contains error information if isinstance(result, dict) and 'error' in result: error_type = result.get('error') error_code = result.get('code') error_msg = result.get('message', 'Unknown error') if error_type == 'oversold' and error_code == 30005: logger.warning(f"MEXC Oversold error on attempt {attempt + 1}/{max_retries}") logger.warning(f"Error: {error_msg}") if attempt < max_retries - 1: # Calculate wait time but respect timeout limit suggested_wait = result.get('retry_after', 60) * (2 ** attempt) elapsed_time = time.time() - order_start_time remaining_time = max_order_time - elapsed_time if suggested_wait > remaining_time: logger.warning(f"Oversold retry wait ({suggested_wait}s) exceeds timeout limit") logger.warning(f"Remaining time: {remaining_time:.2f}s, skipping retry to prevent hanging") return {} logger.info(f"Waiting {suggested_wait} seconds before retry due to oversold condition...") time.sleep(suggested_wait) # Reduce quantity for next attempt to avoid oversold quantity = quantity * 0.8 # Reduce by 20% logger.info(f"Reducing quantity to {quantity:.6f} for retry") continue else: logger.error(f"Max retries reached for oversold condition") return {} elif error_type == 'direction_not_allowed': logger.error(f"Trading direction not allowed for {symbol} {side}") return {} elif error_type == 'insufficient_position': logger.error(f"Insufficient position for {symbol} {side}") return {} else: logger.error(f"MEXC API error: {error_code} - {error_msg}") if attempt < max_retries - 1: wait_time = 5 * (attempt + 1) # Wait 5, 10, 15 seconds elapsed_time = time.time() - order_start_time remaining_time = max_order_time - elapsed_time if wait_time > remaining_time: logger.warning(f"API error retry wait ({wait_time}s) exceeds timeout, cancelling") return {} time.sleep(wait_time) continue else: return {} # Success case - order placed elif isinstance(result, dict) and ('orderId' in result or 'symbol' in result): logger.info(f"Order placed successfully on attempt {attempt + 1}") # For LIMIT orders, verify that the order actually fills if order_type.upper() == 'LIMIT' and 'orderId' in result: order_id = result['orderId'] filled_result = self._wait_for_order_fill(symbol, order_id, max_wait_time=5.0) if filled_result['filled']: logger.info(f"LIMIT order {order_id} filled successfully") # Update result with fill information result.update(filled_result) return result else: logger.warning(f"LIMIT order {order_id} not filled within timeout, cancelling...") # Cancel the unfilled order try: self.exchange.cancel_order(symbol, order_id) logger.info(f"Cancelled unfilled order {order_id}") except Exception as e: logger.error(f"Failed to cancel unfilled order {order_id}: {e}") # If this was the last attempt, return failure if attempt == max_retries - 1: return {} # Try again with a more aggressive price logger.info(f"Retrying with more aggressive LIMIT pricing...") continue else: # MARKET orders or orders without orderId - assume immediate fill return result # Empty result - treat as failure else: logger.warning(f"Empty result on attempt {attempt + 1}/{max_retries}") if attempt < max_retries - 1: wait_time = 2 * (attempt + 1) # Wait 2, 4, 6 seconds elapsed_time = time.time() - order_start_time remaining_time = max_order_time - elapsed_time if wait_time > remaining_time: logger.warning(f"Empty result retry wait ({wait_time}s) exceeds timeout, cancelling") return {} time.sleep(wait_time) continue else: return {} except Exception as e: logger.error(f"Exception on order attempt {attempt + 1}/{max_retries}: {e}") if attempt < max_retries - 1: wait_time = 3 * (attempt + 1) # Wait 3, 6, 9 seconds elapsed_time = time.time() - order_start_time remaining_time = max_order_time - elapsed_time if wait_time > remaining_time: logger.warning(f"Exception retry wait ({wait_time}s) exceeds timeout, cancelling") return {} time.sleep(wait_time) continue else: return {} logger.error(f"Failed to place order after {max_retries} attempts") return {} def _wait_for_order_fill(self, symbol: str, order_id: str, max_wait_time: float = 5.0) -> Dict[str, Any]: """Wait for a LIMIT order to fill and return fill status Args: symbol: Trading symbol order_id: Order ID to monitor max_wait_time: Maximum time to wait for fill in seconds Returns: dict: {'filled': bool, 'status': str, 'executedQty': float, 'avgPrice': float} """ start_time = time.time() check_interval = 0.2 # Check every 200ms while time.time() - start_time < max_wait_time: try: order_status = self.exchange.get_order_status(symbol, order_id) if order_status and isinstance(order_status, dict): status = order_status.get('status', '').upper() executed_qty = float(order_status.get('executedQty', 0)) orig_qty = float(order_status.get('origQty', 0)) avg_price = float(order_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0 logger.debug(f"Order {order_id} status: {status}, executed: {executed_qty}/{orig_qty}") if status == 'FILLED': return { 'filled': True, 'status': status, 'executedQty': executed_qty, 'avgPrice': avg_price, 'fillTime': time.time() } elif status in ['CANCELED', 'REJECTED', 'EXPIRED']: return { 'filled': False, 'status': status, 'executedQty': executed_qty, 'avgPrice': avg_price, 'reason': f'Order {status.lower()}' } elif status == 'PARTIALLY_FILLED': # For partial fills, continue waiting but log progress fill_percentage = (executed_qty / orig_qty * 100) if orig_qty > 0 else 0 logger.debug(f"Order {order_id} partially filled: {fill_percentage:.1f}%") # Wait before next check time.sleep(check_interval) except Exception as e: logger.error(f"Error checking order status for {order_id}: {e}") time.sleep(check_interval) # Timeout - check final status try: final_status = self.exchange.get_order_status(symbol, order_id) if final_status: status = final_status.get('status', '').upper() executed_qty = float(final_status.get('executedQty', 0)) avg_price = float(final_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0 return { 'filled': status == 'FILLED', 'status': status, 'executedQty': executed_qty, 'avgPrice': avg_price, 'reason': 'timeout' if status != 'FILLED' else None } except Exception as e: logger.error(f"Error getting final order status for {order_id}: {e}") return { 'filled': False, 'status': 'UNKNOWN', 'executedQty': 0, 'avgPrice': 0, 'reason': 'timeout_and_status_check_failed' } def _close_short_position(self, symbol: str, confidence: float, current_price: float) -> bool: """Close a short position by buying""" if symbol not in self.positions: logger.warning(f"No position to close in {symbol}") return False position = self.positions[symbol] if position.side != 'SHORT': logger.warning(f"Position in {symbol} is not SHORT, cannot close with BUY") return False logger.info(f"Closing SHORT position: {position.quantity:.6f} {symbol} at ${current_price:.2f} " f"(confidence: {confidence:.2f})") if self.simulation_mode: logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Short close logged but not executed") # Calculate simulated fees in simulation mode - FIXED to include both entry and exit fees trading_fees = self.exchange_config.get('trading_fees', {}) taker_fee_rate = trading_fees.get('taker_fee', trading_fees.get('default_fee', 0.0006)) # Calculate both entry and exit fees entry_fee = position.quantity * position.entry_price * taker_fee_rate exit_fee = position.quantity * current_price * taker_fee_rate simulated_fees = entry_fee + exit_fee # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = position.quantity * position.entry_price # Calculate gross PnL (before fees) with leverage - FIXED for SHORT positions gross_pnl = (position.entry_price - current_price) * position.quantity * leverage # Calculate net PnL (after fees) net_pnl = gross_pnl - simulated_fees # Calculate hold time exit_time = datetime.now() hold_time_seconds = (exit_time - position.entry_time).total_seconds() # Create trade record with corrected PnL calculations trade_record = TradeRecord( symbol=symbol, side='SHORT', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=exit_time, pnl=net_pnl, # Store net PnL as the main PnL value fees=simulated_fees, confidence=confidence, hold_time_seconds=hold_time_seconds, leverage=leverage, position_size_usd=position_size_usd, gross_pnl=gross_pnl, net_pnl=net_pnl ) self.trade_history.append(trade_record) self.trade_records.append(trade_record) self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl # Adjust profitability reward multiplier based on recent performance self._adjust_profitability_reward_multiplier() # Update consecutive losses using net_pnl if net_pnl < -0.001: # A losing trade self.consecutive_losses += 1 elif net_pnl > 0.001: # A winning trade self.consecutive_losses = 0 else: # Breakeven trade self.consecutive_losses = 0 # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"SHORT position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${simulated_fees:.3f}") return True try: # Get order type from config order_type = self.exchange_config.get('order_type', 'market').lower() # For limit orders, set price slightly above market for immediate execution limit_price = None if order_type == 'limit': # Set buy price slightly above market to ensure immediate execution limit_price = current_price * 1.001 # 0.1% above market # Place buy order to close short if order_type == 'market': order = self.exchange.place_order( symbol=symbol, side='buy', # Buy to close short position order_type=order_type, quantity=position.quantity ) else: # For limit orders, price is required assert limit_price is not None, "limit_price required for limit orders" order = self.exchange.place_order( symbol=symbol, side='buy', # Buy to close short position order_type=order_type, quantity=position.quantity, price=limit_price ) if order: # Calculate fees using real API data when available fees = self._calculate_real_trading_fees(order, symbol, position.quantity, current_price) # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = position.quantity * position.entry_price # Calculate gross PnL (before fees) with leverage - FIXED for SHORT positions gross_pnl = (position.entry_price - current_price) * position.quantity * leverage # Calculate net PnL (after fees) net_pnl = gross_pnl - fees # Calculate hold time exit_time = datetime.now() hold_time_seconds = (exit_time - position.entry_time).total_seconds() # Create trade record with corrected PnL calculations trade_record = TradeRecord( symbol=symbol, side='SHORT', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=exit_time, pnl=net_pnl, # Store net PnL as the main PnL value fees=fees, confidence=confidence, hold_time_seconds=hold_time_seconds, leverage=leverage, position_size_usd=position_size_usd, gross_pnl=gross_pnl, net_pnl=net_pnl ) self.trade_history.append(trade_record) self.trade_records.append(trade_record) self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl # Adjust profitability reward multiplier based on recent performance self._adjust_profitability_reward_multiplier() # Update consecutive losses using net_pnl if net_pnl < -0.001: # A losing trade self.consecutive_losses += 1 elif net_pnl > 0.001: # A winning trade self.consecutive_losses = 0 else: # Breakeven trade self.consecutive_losses = 0 # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"SHORT close order executed: {order}") logger.info(f"SHORT position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${fees:.3f}") return True else: logger.error("Failed to place SHORT close order") return False except Exception as e: logger.error(f"Error closing SHORT position: {e}") return False def _close_long_position(self, symbol: str, confidence: float, current_price: float) -> bool: """Close a long position by selling""" if symbol not in self.positions: logger.warning(f"No position to close in {symbol}") return False position = self.positions[symbol] if position.side != 'LONG': logger.warning(f"Position in {symbol} is not LONG, cannot close with SELL") return False logger.info(f"Closing LONG position: {position.quantity:.6f} {symbol} at ${current_price:.2f} " f"(confidence: {confidence:.2f})") if self.simulation_mode: logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Long close logged but not executed") # Calculate simulated fees in simulation mode - FIXED to include both entry and exit fees trading_fees = self.exchange_config.get('trading_fees', {}) taker_fee_rate = trading_fees.get('taker_fee', trading_fees.get('default_fee', 0.0006)) # Calculate both entry and exit fees entry_fee = position.quantity * position.entry_price * taker_fee_rate exit_fee = position.quantity * current_price * taker_fee_rate simulated_fees = entry_fee + exit_fee # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = position.quantity * position.entry_price # Calculate gross PnL (before fees) with leverage gross_pnl = (current_price - position.entry_price) * position.quantity * leverage # Calculate net PnL (after fees) net_pnl = gross_pnl - simulated_fees # Calculate hold time exit_time = datetime.now() hold_time_seconds = (exit_time - position.entry_time).total_seconds() # Create trade record with corrected PnL calculations trade_record = TradeRecord( symbol=symbol, side='LONG', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=exit_time, pnl=net_pnl, # Store net PnL as the main PnL value fees=simulated_fees, confidence=confidence, hold_time_seconds=hold_time_seconds, leverage=leverage, position_size_usd=position_size_usd, gross_pnl=gross_pnl, net_pnl=net_pnl ) self.trade_history.append(trade_record) self.trade_records.append(trade_record) self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl # Adjust profitability reward multiplier based on recent performance self._adjust_profitability_reward_multiplier() # Update consecutive losses using net_pnl if net_pnl < -0.001: # A losing trade self.consecutive_losses += 1 elif net_pnl > 0.001: # A winning trade self.consecutive_losses = 0 else: # Breakeven trade self.consecutive_losses = 0 # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"LONG position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${simulated_fees:.3f}") return True try: # Get order type from config order_type = self.exchange_config.get('order_type', 'market').lower() # For limit orders, set price slightly below market for immediate execution limit_price = None if order_type == 'limit': # Set sell price slightly below market to ensure immediate execution limit_price = current_price * 0.999 # 0.1% below market # Place sell order to close long if order_type == 'market': order = self.exchange.place_order( symbol=symbol, side='sell', # Sell to close long position order_type=order_type, quantity=position.quantity ) else: # For limit orders, price is required assert limit_price is not None, "limit_price required for limit orders" order = self.exchange.place_order( symbol=symbol, side='sell', # Sell to close long position order_type=order_type, quantity=position.quantity, price=limit_price ) if order: # Calculate fees using real API data when available fees = self._calculate_real_trading_fees(order, symbol, position.quantity, current_price) # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = position.quantity * position.entry_price # Calculate gross PnL (before fees) with leverage gross_pnl = (current_price - position.entry_price) * position.quantity * leverage # Calculate net PnL (after fees) net_pnl = gross_pnl - fees # Calculate hold time exit_time = datetime.now() hold_time_seconds = (exit_time - position.entry_time).total_seconds() # Create trade record with corrected PnL calculations trade_record = TradeRecord( symbol=symbol, side='LONG', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=exit_time, pnl=net_pnl, # Store net PnL as the main PnL value fees=fees, confidence=confidence, hold_time_seconds=hold_time_seconds, leverage=leverage, position_size_usd=position_size_usd, gross_pnl=gross_pnl, net_pnl=net_pnl ) self.trade_history.append(trade_record) self.trade_records.append(trade_record) self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl # Adjust profitability reward multiplier based on recent performance self._adjust_profitability_reward_multiplier() # Update consecutive losses using net_pnl if net_pnl < -0.001: # A losing trade self.consecutive_losses += 1 elif net_pnl > 0.001: # A winning trade self.consecutive_losses = 0 else: # Breakeven trade self.consecutive_losses = 0 # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"LONG close order executed: {order}") logger.info(f"LONG position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${fees:.3f}") return True else: logger.error("Failed to place LONG close order") return False except Exception as e: logger.error(f"Error closing LONG position: {e}") return False def _calculate_position_size(self, confidence: float, current_price: float) -> float: """Calculate position size - limited to 20% of account balance by default""" # Get account balance (simulation or real) account_balance = self._get_account_balance_for_sizing() # Get maximum position size limit (default 20% of balance) max_position_percentage = self.mexc_config.get('max_position_percentage', 0.20) max_position_value = account_balance * max_position_percentage # Calculate desired position size based on confidence # Scale by confidence: 70-100% of max position size based on confidence (0.7-1.0 range) confidence_multiplier = max(0.7, min(1.0, confidence)) desired_position_value = max_position_value * confidence_multiplier # Apply reduction based on consecutive losses (risk management) reduction_factor = self.mexc_config.get('consecutive_loss_reduction_factor', 0.8) adjusted_reduction_factor = reduction_factor ** self.consecutive_losses final_position_value = desired_position_value * adjusted_reduction_factor logger.debug(f"Position calculation: account=${account_balance:.2f}, " f"max_position=${max_position_value:.2f} ({max_position_percentage*100:.0f}%), " f"confidence_mult={confidence_multiplier:.2f}, " f"final_position=${final_position_value:.2f}, confidence={confidence:.2f}") return final_position_value def _get_account_balance_for_sizing(self) -> float: """Get account balance for position sizing calculations""" if self.simulation_mode: return self.simulation_balance else: # For live trading, get actual USDT/USDC balance try: balances = self.get_account_balance() usdt_balance = balances.get('USDT', {}).get('total', 0) usdc_balance = balances.get('USDC', {}).get('total', 0) return max(usdt_balance, usdc_balance) except Exception as e: logger.warning(f"Failed to get live account balance: {e}, using simulation default") return self.simulation_balance def _calculate_pnl_with_fees(self, entry_price: float, exit_price: float, quantity: float, side: str) -> Dict[str, float]: """Calculate PnL including trading fees (0.1% open + 0.1% close = 0.2% total)""" try: # Calculate position value position_value = entry_price * quantity # Calculate fees open_fee = position_value * self.trading_fees['open_fee_percent'] close_fee = (exit_price * quantity) * self.trading_fees['close_fee_percent'] total_fees = open_fee + close_fee # Calculate gross PnL (before fees) if side.upper() == 'LONG': gross_pnl = (exit_price - entry_price) * quantity else: # SHORT gross_pnl = (entry_price - exit_price) * quantity # Calculate net PnL (after fees) net_pnl = gross_pnl - total_fees # Calculate percentage returns gross_pnl_percent = (gross_pnl / position_value) * 100 net_pnl_percent = (net_pnl / position_value) * 100 fee_percent = (total_fees / position_value) * 100 return { 'gross_pnl': gross_pnl, 'net_pnl': net_pnl, 'total_fees': total_fees, 'open_fee': open_fee, 'close_fee': close_fee, 'gross_pnl_percent': gross_pnl_percent, 'net_pnl_percent': net_pnl_percent, 'fee_percent': fee_percent, 'position_value': position_value } except Exception as e: logger.error(f"Error calculating PnL with fees: {e}") return { 'gross_pnl': 0.0, 'net_pnl': 0.0, 'total_fees': 0.0, 'open_fee': 0.0, 'close_fee': 0.0, 'gross_pnl_percent': 0.0, 'net_pnl_percent': 0.0, 'fee_percent': 0.0, 'position_value': 0.0 } def _calculate_pivot_points(self, symbol: str) -> Dict[str, float]: """Calculate pivot points for the symbol using real market data""" try: from core.data_provider import DataProvider data_provider = DataProvider() # Get daily data for pivot calculation df = data_provider.get_historical_data(symbol, '1d', limit=2, refresh=True) if df is None or len(df) < 2: logger.warning(f"Insufficient data for pivot calculation for {symbol}") return {} # Use previous day's data for pivot calculation prev_day = df.iloc[-2] high = float(prev_day['high']) low = float(prev_day['low']) close = float(prev_day['close']) # Calculate pivot point pivot = (high + low + close) / 3 # Calculate support and resistance levels r1 = (2 * pivot) - low s1 = (2 * pivot) - high r2 = pivot + (high - low) s2 = pivot - (high - low) r3 = high + 2 * (pivot - low) s3 = low - 2 * (high - pivot) pivots = { 'pivot': pivot, 'r1': r1, 'r2': r2, 'r3': r3, 's1': s1, 's2': s2, 's3': s3, 'prev_high': high, 'prev_low': low, 'prev_close': close } logger.debug(f"Pivot points for {symbol}: P={pivot:.2f}, R1={r1:.2f}, S1={s1:.2f}") return pivots except Exception as e: logger.error(f"Error calculating pivot points for {symbol}: {e}") return {} def _get_pivot_signal_strength(self, symbol: str, current_price: float, action: str) -> float: """Get signal strength based on proximity to pivot points""" try: pivots = self._calculate_pivot_points(symbol) if not pivots: return 1.0 # Default strength if no pivots available pivot = pivots['pivot'] r1, r2, r3 = pivots['r1'], pivots['r2'], pivots['r3'] s1, s2, s3 = pivots['s1'], pivots['s2'], pivots['s3'] # Calculate distance to nearest pivot levels distances = { 'pivot': abs(current_price - pivot), 'r1': abs(current_price - r1), 'r2': abs(current_price - r2), 'r3': abs(current_price - r3), 's1': abs(current_price - s1), 's2': abs(current_price - s2), 's3': abs(current_price - s3) } # Find nearest level nearest_level = min(distances.keys(), key=lambda k: distances[k]) nearest_distance = distances[nearest_level] nearest_price = pivots[nearest_level] # Calculate signal strength based on action and pivot context strength = 1.0 if action == 'BUY': # Stronger buy signals near support levels if nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price: strength = 1.5 # Boost buy signals at support elif nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price: strength = 0.7 # Reduce buy signals at resistance elif action == 'SELL': # Stronger sell signals near resistance levels if nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price: strength = 1.5 # Boost sell signals at resistance elif nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price: strength = 0.7 # Reduce sell signals at support logger.debug(f"Pivot signal strength for {symbol} {action}: {strength:.2f} " f"(near {nearest_level} at ${nearest_price:.2f}, current ${current_price:.2f})") return strength except Exception as e: logger.error(f"Error calculating pivot signal strength: {e}") return 1.0 def _get_current_price_from_data_provider(self, symbol: str) -> Optional[float]: """Get current price from data provider for most up-to-date information""" try: from core.data_provider import DataProvider data_provider = DataProvider() # Try to get real-time price first current_price = data_provider.get_current_price(symbol) if current_price and current_price > 0: return float(current_price) # Fallback to latest 1m candle df = data_provider.get_historical_data(symbol, '1m', limit=1, refresh=True) if df is not None and len(df) > 0: return float(df.iloc[-1]['close']) logger.warning(f"Could not get current price for {symbol} from data provider") return None except Exception as e: logger.error(f"Error getting current price from data provider for {symbol}: {e}") return None def _check_position_size_limit(self) -> bool: """Check if total open position value exceeds the maximum allowed percentage of balance""" try: # Get account balance account_balance = self._get_account_balance_for_sizing() # Get maximum position percentage (default 20%) max_position_percentage = self.mexc_config.get('max_position_percentage', 0.20) max_position_value = account_balance * max_position_percentage # Calculate total current position value total_position_value = 0.0 # ENHANCED: Also check exchange positions to ensure we don't miss any if not self.simulation_mode and self.exchange: try: exchange_positions = self.exchange.get_positions() if exchange_positions: for pos in exchange_positions: symbol = pos.get('symbol', '').replace('USDT', '/USDT') size = float(pos.get('size', 0)) entry_price = float(pos.get('entry_price', 0)) if size > 0 and symbol: # Check if this position is also in our local state if symbol not in self.positions: logger.warning(f"POSITION LIMIT: Found untracked exchange position for {symbol}: {size} @ ${entry_price:.2f}") # Add to total even if not in local state position_value = size * entry_price total_position_value += position_value logger.debug(f"Exchange position {symbol}: {size:.6f} @ ${entry_price:.2f} = ${position_value:.2f}") except Exception as e: logger.debug(f"Error checking exchange positions for limit: {e}") # Add existing local positions for symbol, position in self.positions.items(): # Get current price for the symbol try: if self.exchange: ticker = self.exchange.get_ticker(symbol) current_price = ticker['last'] if ticker and 'last' in ticker else position.entry_price else: # Simulation mode - use entry price or default current_price = position.entry_price except Exception: # Fallback to entry price if we can't get current price current_price = position.entry_price # Calculate position value position_value = position.quantity * current_price total_position_value += position_value logger.debug(f"Existing position {symbol}: {position.quantity:.6f} @ ${current_price:.2f} = ${position_value:.2f}") # Add potential value from open orders that could become positions if not self.simulation_mode and self.exchange: try: open_orders = self.exchange.get_open_orders() for order in open_orders: if order.get('side', '').lower() in ['buy', 'sell']: # Estimate the value if this order gets filled order_quantity = float(order.get('quantity', 0)) order_price = float(order.get('price', 0)) if order_price > 0: order_value = order_quantity * order_price total_position_value += order_value logger.debug(f"Open order {order.get('symbol')}: {order_quantity:.6f} @ ${order_price:.2f} = ${order_value:.2f}") except Exception as e: logger.debug(f"Error calculating open order values: {e}") # Check if we would exceed the limit if total_position_value >= max_position_value: logger.warning(f"Position size limit reached: ${total_position_value:.2f} >= ${max_position_value:.2f} ({max_position_percentage*100:.0f}% of balance)") return False logger.debug(f"Position size check passed: ${total_position_value:.2f} < ${max_position_value:.2f} ({max_position_percentage*100:.0f}% of balance)") return True except Exception as e: logger.error(f"Error checking position size limit: {e}") # Allow trade if we can't check the limit return True def update_positions(self, symbol: str, current_price: float): """Update position P&L with current market price""" if symbol in self.positions: with self.lock: # Get leverage configuration from primary exchange leverage_applied_by_exchange = False if hasattr(self, 'primary_config'): leverage_applied_by_exchange = self.primary_config.get('leverage_applied_by_exchange', False) # Get configured leverage leverage = 1.0 if hasattr(self, 'primary_config'): leverage = self.primary_config.get('leverage', 1.0) self.positions[symbol].calculate_pnl( current_price, leverage=leverage, leverage_applied_by_exchange=leverage_applied_by_exchange ) def get_positions(self) -> Dict[str, Position]: """Get current positions""" return self.positions.copy() def get_trade_history(self) -> List[TradeRecord]: """Get trade history""" return self.trade_history.copy() def get_daily_stats(self) -> Dict[str, Any]: """Get daily trading statistics with enhanced fee analysis""" total_pnl = sum(trade.pnl for trade in self.trade_history) total_fees = sum(trade.fees for trade in self.trade_history) gross_pnl = total_pnl + total_fees # P&L before fees winning_trades = len([t for t in self.trade_history if t.pnl > 0.001]) # Avoid rounding issues losing_trades = len([t for t in self.trade_history if t.pnl < -0.001]) # Avoid rounding issues total_trades = len(self.trade_history) breakeven_trades = total_trades - winning_trades - losing_trades # Calculate average trade values avg_trade_pnl = total_pnl / max(1, total_trades) avg_trade_fee = total_fees / max(1, total_trades) avg_winning_trade = sum(t.pnl for t in self.trade_history if t.pnl > 0.001) / max(1, winning_trades) avg_losing_trade = sum(t.pnl for t in self.trade_history if t.pnl < -0.001) / max(1, losing_trades) # Enhanced fee analysis from config fee_structure = self.mexc_config.get('trading_fees', {}) maker_fee_rate = fee_structure.get('maker', 0.0000) taker_fee_rate = fee_structure.get('taker', 0.0005) default_fee_rate = fee_structure.get('default', 0.0005) # Calculate fee efficiency total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history) effective_fee_rate = (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0 fee_impact_on_pnl = (total_fees / max(0.01, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0 return { 'daily_trades': self.daily_trades, 'daily_loss': self.daily_loss, 'total_pnl': total_pnl, 'gross_pnl': gross_pnl, 'total_fees': total_fees, 'winning_trades': winning_trades, 'losing_trades': losing_trades, 'breakeven_trades': breakeven_trades, 'total_trades': total_trades, 'win_rate': winning_trades / max(1, total_trades), 'avg_trade_pnl': avg_trade_pnl, 'avg_trade_fee': avg_trade_fee, 'avg_winning_trade': avg_winning_trade, 'avg_losing_trade': avg_losing_trade, 'positions_count': len(self.positions), 'fee_rates': { 'maker': f"{maker_fee_rate*100:.3f}%", 'taker': f"{taker_fee_rate*100:.3f}%", 'default': f"{default_fee_rate*100:.3f}%", 'effective': f"{effective_fee_rate*100:.3f}%" # Actual rate based on trades }, 'fee_analysis': { 'total_volume': total_volume, 'fee_impact_percent': fee_impact_on_pnl, 'is_fee_efficient': fee_impact_on_pnl < 5.0, # Less than 5% impact is good 'fee_savings_vs_market': (0.001 - effective_fee_rate) * total_volume if effective_fee_rate < 0.001 else 0 } } def emergency_stop(self): """Emergency stop all trading""" logger.warning("EMERGENCY STOP ACTIVATED") self.trading_enabled = False # Close all positions if in live mode if not self.dry_run: for symbol, position in self.positions.items(): try: if self.exchange: ticker = self.exchange.get_ticker(symbol) if ticker: self._execute_sell(symbol, 1.0, ticker['last']) else: # Simulation mode - use entry price for closing self._execute_sell(symbol, 1.0, position.entry_price) except Exception as e: logger.error(f"Error closing position {symbol} during emergency stop: {e}") def reset_daily_stats(self): """Reset daily statistics (call at start of new day)""" self.daily_trades = 0 self.daily_loss = 0.0 logger.info("Daily trading statistics reset") def get_account_balance(self) -> Dict[str, Dict[str, float]]: """Get account balance information from the primary exchange (universal method). Returns: Dict with asset balances in format: { 'USDT': {'free': 100.0, 'locked': 0.0, 'total': 100.0, 'type': 'spot'}, 'ETH': {'free': 0.5, 'locked': 0.0, 'total': 0.5, 'type': 'spot'}, ... } """ try: if not self.exchange: logger.error("Exchange interface not available") return {} # Use the universal get_all_balances method that works with all exchanges if hasattr(self.exchange, 'get_all_balances'): raw_balances = self.exchange.get_all_balances() if raw_balances: # Convert to the expected format with 'type' field combined_balances = {} for asset, balance_data in raw_balances.items(): if isinstance(balance_data, dict): combined_balances[asset] = { 'free': balance_data.get('free', 0.0), 'locked': balance_data.get('locked', 0.0), 'total': balance_data.get('total', 0.0), 'type': 'spot' # Default to spot for now } logger.info(f"Retrieved balances for {len(combined_balances)} assets from {self.primary_name}") return combined_balances else: logger.warning(f"No balances returned from {self.primary_name} exchange") return {} else: logger.error(f"Exchange {self.primary_name} does not support get_all_balances method") return {} except Exception as e: logger.error(f"Error getting account balance: {e}") return {} def _calculate_real_trading_fees(self, order_result: Dict[str, Any], symbol: str, quantity: float, price: float) -> float: """Calculate trading fees using real API data when available Args: order_result: Order result from exchange API symbol: Trading symbol quantity: Order quantity price: Execution price Returns: float: Trading fee amount in quote currency """ try: # Try to get actual fee from API response first if order_result and 'fills' in order_result: total_commission = 0.0 for fill in order_result['fills']: commission = float(fill.get('commission', 0)) total_commission += commission if total_commission > 0: logger.info(f"Using real API fee: {total_commission}") return total_commission # Fall back to config-based calculation return self._calculate_trading_fee(order_result, symbol, quantity, price) except Exception as e: logger.warning(f"Error calculating real fees: {e}, falling back to config-based") return self._calculate_trading_fee(order_result, symbol, quantity, price) def _calculate_trading_fee(self, order_result: Dict[str, Any], symbol: str, quantity: float, price: float) -> float: """Calculate trading fee based on order execution details with enhanced MEXC API support Args: order_result: Order result from exchange API symbol: Trading symbol quantity: Order quantity price: Execution price Returns: float: Trading fee amount in quote currency """ try: # 1. Try to get actual fee from API response (most accurate) # MEXC API can return fees in different formats depending on the endpoint # Check for 'fills' array (most common for filled orders) if order_result and 'fills' in order_result: total_commission = 0.0 commission_asset = None for fill in order_result['fills']: commission = float(fill.get('commission', 0)) commission_asset = fill.get('commissionAsset', '') total_commission += commission if total_commission > 0: logger.info(f"Using actual API fee from fills: {total_commission} {commission_asset}") # If commission is in different asset, we might need conversion # For now, assume it's in quote currency (USDC/USDT) return total_commission # 2. Check if order result has fee information directly fee_fields = ['fee', 'commission', 'tradeFee', 'fees'] for field in fee_fields: if order_result and field in order_result: fee = float(order_result[field]) if fee > 0: logger.info(f"Using API fee field '{field}': {fee}") return fee # 3. Check for executedQty and cummulativeQuoteQty for more accurate calculation if order_result and 'executedQty' in order_result and 'cummulativeQuoteQty' in order_result: executed_qty = float(order_result['executedQty']) executed_value = float(order_result['cummulativeQuoteQty']) if executed_qty > 0 and executed_value > 0: # Use executed values instead of provided price/quantity quantity = executed_qty price = executed_value / executed_qty logger.info(f"Using executed order data: {quantity} @ {price:.6f}") # 4. Fall back to config-based fee calculation with enhanced logic trading_fees = self.mexc_config.get('trading_fees', {}) # Determine if this was a maker or taker trade order_type = order_result.get('type', 'MARKET') if order_result else 'MARKET' order_status = order_result.get('status', 'UNKNOWN') if order_result else 'UNKNOWN' time_in_force = order_result.get('timeInForce', 'GTC') if order_result else 'GTC' # Enhanced maker/taker detection logic if order_type.upper() == 'LIMIT': # For limit orders, check execution speed and market conditions if order_status == 'FILLED': # If it's an IOC (Immediate or Cancel) order, it's likely a taker if time_in_force == 'IOC' or time_in_force == 'FOK': fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for {time_in_force} limit order: {fee_rate*100:.3f}%") else: # For GTC orders, assume taker if aggressive pricing is used # This is a heuristic based on our trading strategy fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for aggressive limit order: {fee_rate*100:.3f}%") else: # If not immediately filled, likely a maker (though we don't usually reach here) fee_rate = trading_fees.get('maker', 0.0000) logger.info(f"Using maker fee rate for pending/partial limit order: {fee_rate*100:.3f}%") elif order_type.upper() == 'LIMIT_MAKER': # LIMIT_MAKER orders are guaranteed to be makers fee_rate = trading_fees.get('maker', 0.0000) logger.info(f"Using maker fee rate for LIMIT_MAKER order: {fee_rate*100:.3f}%") else: # Market orders and other types are always takers fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for {order_type} order: {fee_rate*100:.3f}%") # Calculate fee amount trade_value = quantity * price fee_amount = trade_value * fee_rate logger.info(f"Calculated fee: ${fee_amount:.6f} ({fee_rate*100:.3f}% of ${trade_value:.2f})") return fee_amount except Exception as e: logger.warning(f"Error calculating trading fee: {e}") # Ultimate fallback using default rate default_fee_rate = self.mexc_config.get('trading_fees', {}).get('default', 0.0005) fallback_rate = self.mexc_config.get('trading_fee', default_fee_rate) # Legacy support fee_amount = quantity * price * fallback_rate logger.info(f"Using fallback fee: ${fee_amount:.6f} ({fallback_rate*100:.3f}%)") return fee_amount def get_fee_analysis(self) -> Dict[str, Any]: """Get detailed fee analysis and statistics Returns: Dict with fee breakdowns, rates, and impact analysis """ try: fee_structure = self.mexc_config.get('trading_fees', {}) maker_rate = fee_structure.get('maker', 0.0000) taker_rate = fee_structure.get('taker', 0.0005) default_rate = fee_structure.get('default', 0.0005) # Calculate total fees paid total_fees = sum(trade.fees for trade in self.trade_history) total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history) # Estimate fee breakdown (since we don't track maker vs taker separately) # Assume most of our limit orders are takers due to our pricing strategy estimated_taker_volume = total_volume * 0.9 # 90% taker assumption estimated_maker_volume = total_volume * 0.1 # 10% maker assumption estimated_taker_fees = estimated_taker_volume * taker_rate estimated_maker_fees = estimated_maker_volume * maker_rate # Fee impact analysis total_pnl = sum(trade.pnl for trade in self.trade_history) gross_pnl = total_pnl + total_fees fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0 return { 'fee_rates': { 'maker': { 'rate': maker_rate, 'rate_percent': f"{maker_rate*100:.3f}%" }, 'taker': { 'rate': taker_rate, 'rate_percent': f"{taker_rate*100:.3f}%" }, 'default': { 'rate': default_rate, 'rate_percent': f"{default_rate*100:.3f}%" } }, 'total_fees': total_fees, 'total_volume': total_volume, 'estimated_breakdown': { 'taker_fees': estimated_taker_fees, 'maker_fees': estimated_maker_fees, 'taker_volume': estimated_taker_volume, 'maker_volume': estimated_maker_volume }, 'impact_analysis': { 'fee_impact_percent': fee_impact_percent, 'pnl_after_fees': total_pnl, 'pnl_before_fees': gross_pnl, 'avg_fee_per_trade': total_fees / max(1, len(self.trade_history)) }, 'fee_efficiency': { 'volume_to_fee_ratio': total_volume / max(0.01, total_fees), 'is_efficient': fee_impact_percent < 5.0 # Less than 5% impact is good } } except Exception as e: logger.error(f"Error calculating fee analysis: {e}") return { 'error': str(e), 'fee_rates': { 'maker': {'rate': 0.0000, 'rate_percent': '0.000%'}, 'taker': {'rate': 0.0005, 'rate_percent': '0.050%'} } } def sync_fees_with_api(self, force: bool = False) -> Dict[str, Any]: """Manually trigger fee synchronization with MEXC API Args: force: Force sync even if last sync was recent Returns: dict: Sync result with status and details """ if not self.config_synchronizer: return { 'status': 'error', 'error': 'Config synchronizer not initialized' } try: logger.info("TRADING EXECUTOR: Manual fee sync requested") sync_result = self.config_synchronizer.sync_trading_fees(force=force) # If fees were updated, reload config if sync_result.get('changes_made'): logger.info("TRADING EXECUTOR: Reloading config after fee sync") self.config = get_config(self.config_synchronizer.config_path) # Update to use primary exchange config self.exchanges_config = self.config.get('exchanges', {}) self.primary_name = self.exchanges_config.get('primary', 'mexc') self.primary_config = self.exchanges_config.get(self.primary_name, {}) self.mexc_config = self.primary_config # Legacy compatibility return sync_result except Exception as e: logger.error(f"TRADING EXECUTOR: Error in manual fee sync: {e}") return { 'status': 'error', 'error': str(e) } def auto_sync_fees_if_needed(self) -> bool: """Automatically sync fees if needed (called periodically) Returns: bool: True if sync was performed successfully """ if not self.config_synchronizer: return False try: return self.config_synchronizer.auto_sync_fees() except Exception as e: logger.error(f"TRADING EXECUTOR: Error in auto fee sync: {e}") return False def get_fee_sync_status(self) -> Dict[str, Any]: """Get current fee synchronization status Returns: dict: Fee sync status and history """ if not self.config_synchronizer: return { 'sync_available': False, 'error': 'Config synchronizer not initialized' } try: status = self.config_synchronizer.get_sync_status() status['sync_available'] = True return status except Exception as e: logger.error(f"TRADING EXECUTOR: Error getting sync status: {e}") return { 'sync_available': False, 'error': str(e) } def execute_trade(self, symbol: str, action: str, quantity: float) -> bool: """Execute a trade directly (compatibility method for dashboard) Args: symbol: Trading symbol (e.g., 'ETH/USDT') action: Trading action ('BUY', 'SELL') quantity: Quantity to trade Returns: bool: True if trade executed successfully """ try: # Get current price current_price = None # Always get real current price - never use simulated data current_price = self._get_real_current_price(symbol) if current_price is None: logger.error(f"Failed to get real current price for {symbol}") return False # Calculate confidence based on manual trade (high confidence) confidence = 1.0 # Execute using the existing signal execution method return self.execute_signal(symbol, action, confidence, current_price) except Exception as e: logger.error(f"Error executing trade {action} for {symbol}: {e}") return False def get_closed_trades(self) -> List[Dict[str, Any]]: """Get closed trades in dashboard format""" try: trades = [] for trade in self.trade_history: trade_dict = { 'symbol': trade.symbol, 'side': trade.side, 'quantity': trade.quantity, 'entry_price': trade.entry_price, 'exit_price': trade.exit_price, 'entry_time': trade.entry_time, 'exit_time': trade.exit_time, 'pnl': trade.pnl, 'fees': trade.fees, 'confidence': trade.confidence, 'hold_time_seconds': trade.hold_time_seconds } trades.append(trade_dict) return trades except Exception as e: logger.error(f"Error getting closed trades: {e}") return [] def get_current_position(self, symbol: Optional[str] = None) -> Optional[Dict[str, Any]]: """Get current position for a symbol or all positions Args: symbol: Optional symbol to get position for. If None, returns first position. Returns: dict: Position information or None if no position """ try: if symbol: if symbol in self.positions: pos = self.positions[symbol] return { 'symbol': pos.symbol, 'side': pos.side, 'size': pos.quantity, 'price': pos.entry_price, 'entry_time': pos.entry_time, 'unrealized_pnl': pos.unrealized_pnl } return None else: # Return first position if no symbol specified if self.positions: first_symbol = list(self.positions.keys())[0] return self.get_current_position(first_symbol) return None except Exception as e: logger.error(f"Error getting current position: {e}") return None def get_position(self, symbol: str) -> Optional[Dict[str, Any]]: """Get position for a symbol (alias for get_current_position for compatibility) Args: symbol: Trading symbol to get position for Returns: dict: Position information with 'side' key or None if no position """ return self.get_current_position(symbol) def get_leverage(self) -> float: """Get current leverage setting""" return self.mexc_config.get('leverage', 50.0) def set_leverage(self, leverage: float) -> bool: """Set leverage (for UI control) Args: leverage: New leverage value Returns: bool: True if successful """ try: # Update in-memory config self.mexc_config['leverage'] = leverage logger.info(f"TRADING EXECUTOR: Leverage updated to {leverage}x") return True except Exception as e: logger.error(f"Error setting leverage: {e}") return False def get_account_info(self) -> Dict[str, Any]: """Get account information for UI display""" try: account_balance = self._get_account_balance_for_sizing() leverage = self.get_leverage() return { 'account_balance': account_balance, 'leverage': leverage, 'trading_mode': self.trading_mode, 'simulation_mode': self.simulation_mode, 'trading_enabled': self.trading_enabled, 'position_sizing': { 'base_percent': self.mexc_config.get('base_position_percent', 5.0), 'max_percent': self.mexc_config.get('max_position_percent', 20.0), 'min_percent': self.mexc_config.get('min_position_percent', 2.0) } } except Exception as e: logger.error(f"Error getting account info: {e}") return { 'account_balance': 100.0, 'leverage': 50.0, 'trading_mode': 'simulation', 'simulation_mode': True, 'trading_enabled': False, 'position_sizing': { 'base_percent': 5.0, 'max_percent': 20.0, 'min_percent': 2.0 } } def set_test_mode(self, enabled: bool = True): """Enable test mode to bypass safety checks for testing""" self._test_mode = enabled if enabled: logger.info("TRADING EXECUTOR: Test mode enabled - bypassing safety checks") else: logger.info("TRADING EXECUTOR: Test mode disabled - normal safety checks active") def set_trading_mode(self, mode: str) -> bool: """Set trading mode (simulation/live) and update all related settings Args: mode: Trading mode ('simulation' or 'live') Returns: bool: True if mode was set successfully """ try: if mode not in ['simulation', 'live']: logger.error(f"Invalid trading mode: {mode}. Must be 'simulation' or 'live'") return False # Store original mode if not already stored if not hasattr(self, 'original_trading_mode'): self.original_trading_mode = self.trading_mode # Update trading mode self.trading_mode = mode self.simulation_mode = (mode == 'simulation') # Update primary config if available if hasattr(self, 'primary_config') and self.primary_config: self.primary_config['trading_mode'] = mode # Log the change if mode == 'live': logger.warning("TRADING EXECUTOR: MODE CHANGED TO LIVE - Real orders will be executed!") else: logger.info("TRADING EXECUTOR: MODE CHANGED TO SIMULATION - Orders are simulated") return True except Exception as e: logger.error(f"Error setting trading mode to {mode}: {e}") return False def get_status(self) -> Dict[str, Any]: """Get trading executor status with safety feature information""" try: # Get account balance if self.simulation_mode: balance = self.simulation_balance else: balance = self.exchange.get_balance('USDT') if self.exchange else 0.0 # Get open positions positions = self.get_positions() # Calculate total fees paid total_fees = sum(trade.fees for trade in self.trade_history) total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history) # Estimate fee breakdown (since we don't track maker vs taker separately) maker_fee_rate = self.exchange_config.get('maker_fee', 0.0002) taker_fee_rate = self.exchange_config.get('taker_fee', 0.0006) avg_fee_rate = (maker_fee_rate + taker_fee_rate) / 2 # Fee impact analysis total_pnl = sum(trade.pnl for trade in self.trade_history) gross_pnl = total_pnl + total_fees fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0 # Calculate success rate for recent trades recent_trades = self.trade_history[-self.trades_to_evaluate:] if len(self.trade_history) >= self.trades_to_evaluate else self.trade_history winning_trades = sum(1 for trade in recent_trades if trade.pnl > 0.001) if recent_trades else 0 success_rate = (winning_trades / len(recent_trades)) if recent_trades else 0 # Safety feature status safety_status = { 'active': self.safety_triggered, 'consecutive_losses': self.consecutive_losses, 'max_consecutive_losses': self.max_consecutive_losses, 'original_mode': self.original_trading_mode if self.safety_triggered else self.trading_mode, 'success_rate': success_rate, 'min_success_rate_to_reenable': self.min_success_rate_to_reenable, 'trades_evaluated': len(recent_trades), 'trades_needed': self.trades_to_evaluate, 'can_reenable': self._can_reenable_live_trading() if self.safety_triggered else False } return { 'trading_enabled': self.trading_enabled, 'simulation_mode': self.simulation_mode, 'trading_mode': self.trading_mode, 'balance': balance, 'positions': len(positions), 'daily_trades': self.daily_trades, 'daily_pnl': self.daily_pnl, 'daily_loss': self.daily_loss, 'consecutive_losses': self.consecutive_losses, 'total_trades': len(self.trade_history), 'safety_feature': safety_status, 'pnl': { 'total': total_pnl, 'gross': gross_pnl, 'fees': total_fees, 'fee_impact_percent': fee_impact_percent, 'pnl_after_fees': total_pnl, 'pnl_before_fees': gross_pnl, 'avg_fee_per_trade': total_fees / max(1, len(self.trade_history)) }, 'fee_efficiency': { 'total_volume': total_volume, 'total_fees': total_fees, 'effective_fee_rate': (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0, 'expected_fee_rate': avg_fee_rate, 'fee_efficiency': (avg_fee_rate / ((total_fees / max(0.01, total_volume)) if total_volume > 0 else 1)) if avg_fee_rate > 0 else 0 } } except Exception as e: logger.error(f"Error getting trading executor status: {e}") return { 'trading_enabled': self.trading_enabled, 'simulation_mode': self.simulation_mode, 'trading_mode': self.trading_mode, 'error': str(e) } def sync_position_with_mexc(self, symbol: str, desired_state: str) -> bool: """Synchronize dashboard position state with actual MEXC account positions Args: symbol: Trading symbol (e.g., 'ETH/USDT') desired_state: Desired position state ('NO_POSITION', 'LONG', 'SHORT') Returns: bool: True if synchronization successful """ try: logger.info(f"POSITION SYNC: Starting sync for {symbol} - desired state: {desired_state}") if self.simulation_mode: logger.info("POSITION SYNC: Simulation mode - skipping MEXC account sync") return True # Step 1: Cancel all pending orders for the symbol cancelled_orders = self._cancel_open_orders(symbol) if cancelled_orders > 0: logger.info(f"POSITION SYNC: Cancelled {cancelled_orders} pending orders for {symbol}") time.sleep(1) # Wait for cancellations to process # Step 2: Get current MEXC account balances and positions current_balances = self._get_mexc_account_balances() current_holdings = self._get_current_holdings(symbol, current_balances) # Step 3: Determine current position state from MEXC account current_state = self._determine_position_state(symbol, current_holdings) logger.info(f"POSITION SYNC: Current MEXC state: {current_state}, Holdings: {current_holdings}") # Step 4: If states match, no action needed if current_state == desired_state: logger.info(f"POSITION SYNC: States already match ({current_state}) - no action needed") return True # Step 5: Execute corrective trades based on state mismatch return self._execute_corrective_trades(symbol, current_state, desired_state, current_holdings) except Exception as e: logger.error(f"POSITION SYNC ERROR: Failed to sync {symbol}: {e}") import traceback logger.error(f"POSITION SYNC: Full traceback: {traceback.format_exc()}") return False def _get_mexc_account_balances(self) -> Dict[str, Dict[str, float]]: """Get current MEXC account balances""" try: return self.exchange.get_all_balances() except Exception as e: logger.error(f"Failed to get MEXC account balances: {e}") return {} def _get_current_holdings(self, symbol: str, balances: Dict[str, Dict[str, float]]) -> Dict[str, Any]: """Extract current holdings for the symbol from account balances""" try: # Parse symbol to get base and quote assets if '/' in symbol: base_asset, quote_asset = symbol.split('/') else: # Handle symbols like ETHUSDT if symbol.upper().endswith('USDT'): base_asset = symbol[:-4] quote_asset = 'USDT' elif symbol.upper().endswith('USDC'): base_asset = symbol[:-4] quote_asset = 'USDC' else: logger.error(f"Cannot parse symbol: {symbol}") return {'base': 0.0, 'quote': 0.0, 'base_asset': 'UNKNOWN', 'quote_asset': 'UNKNOWN'} base_asset = base_asset.upper() quote_asset = quote_asset.upper() # Get balances for base and quote assets base_balance = balances.get(base_asset, {}).get('total', 0.0) quote_balance = balances.get(quote_asset, {}).get('total', 0.0) # Also check USDC if quote is USDT (MEXC uses USDC for trading) if quote_asset == 'USDT': usdc_balance = balances.get('USDC', {}).get('total', 0.0) quote_balance = max(quote_balance, usdc_balance) return { 'base': base_balance, 'quote': quote_balance, 'base_asset': base_asset, # Note: This contains string values but method returns Dict[str, float] 'quote_asset': quote_asset # We'll handle this in the calling method } except Exception as e: logger.error(f"Error getting current holdings for {symbol}: {e}") return {'base': 0.0, 'quote': 0.0, 'base_asset': 'UNKNOWN', 'quote_asset': 'UNKNOWN'} def _determine_position_state(self, symbol: str, holdings: Dict[str, Any]) -> str: """Determine position state from current holdings""" try: base_balance = holdings.get('base', 0.0) quote_balance = holdings.get('quote', 0.0) # Minimum balance thresholds (to ignore dust) min_base_threshold = 0.001 # 0.001 ETH minimum min_quote_threshold = 1.0 # $1 minimum has_base = base_balance >= min_base_threshold has_quote = quote_balance >= min_quote_threshold if has_base and not has_quote: return 'LONG' # Holding crypto asset elif not has_base and has_quote: return 'SHORT' # Holding only fiat (after selling crypto) elif has_base and has_quote: # Mixed holdings - determine which is larger try: current_price = self._get_current_price_for_sync(symbol) if current_price: base_value = base_balance * current_price if base_value > quote_balance * 1.5: # 50% threshold return 'LONG' else: return 'SHORT' except: return 'LONG' # Default to LONG if price unavailable else: return 'NO_POSITION' # No significant holdings except Exception as e: logger.error(f"Error determining position state: {e}") return 'NO_POSITION' def _get_current_price_for_sync(self, symbol: str) -> Optional[float]: """Get current price for position synchronization""" try: if self.exchange: ticker = self.exchange.get_ticker(symbol) if ticker and 'last' in ticker: return float(ticker['last']) else: # Get real current price - never use simulated data return self._get_real_current_price(symbol) return None except Exception as e: logger.error(f"Error getting current price for sync: {e}") return None def _execute_corrective_trades(self, symbol: str, current_state: str, desired_state: str, holdings: Dict[str, float]) -> bool: """Execute trades to correct position state mismatch""" try: logger.info(f"CORRECTIVE TRADE: {current_state} -> {desired_state} for {symbol}") current_price = self._get_current_price_for_sync(symbol) if not current_price: logger.error("Cannot execute corrective trades without current price") return False base_balance = holdings.get('base', 0.0) quote_balance = holdings.get('quote', 0.0) base_asset = holdings.get('base_asset', 'ETH') if desired_state == 'NO_POSITION': # Need to sell all crypto holdings if base_balance > 0.001: # Minimum to avoid dust logger.info(f"CORRECTIVE: Selling {base_balance:.6f} {base_asset} to reach NO_POSITION") result = self._place_order_with_retry(symbol, 'SELL', 'LIMIT', base_balance, current_price * 0.999) if result: # Wait for order fill and update internal position tracking time.sleep(2) if symbol in self.positions: del self.positions[symbol] logger.info(f"CORRECTIVE: Successfully sold holdings for NO_POSITION") return True else: logger.error("CORRECTIVE: Failed to sell holdings") return False else: logger.info("CORRECTIVE: Already at NO_POSITION (no crypto holdings)") return True elif desired_state == 'LONG': # Need to buy crypto with available quote currency if quote_balance < 10.0: # Minimum order value logger.warning(f"CORRECTIVE: Insufficient quote balance ({quote_balance:.2f}) for LONG position") return False # Use 95% of quote balance for the trade (leaving some for fees) trade_amount = quote_balance * 0.95 quantity = trade_amount / current_price logger.info(f"CORRECTIVE: Buying {quantity:.6f} {base_asset} with ${trade_amount:.2f} for LONG position") result = self._place_order_with_retry(symbol, 'BUY', 'LIMIT', quantity, current_price * 1.001) if result: # Update internal position tracking time.sleep(2) self.positions[symbol] = Position( symbol=symbol, side='LONG', quantity=quantity, entry_price=current_price, entry_time=datetime.now(), order_id=result.get('orderId', f"corrective_{int(time.time())}") ) logger.info(f"CORRECTIVE: Successfully established LONG position") return True else: logger.error("CORRECTIVE: Failed to buy for LONG position") return False elif desired_state == 'SHORT': # Need to sell crypto holdings to get to cash-only position if base_balance > 0.001: logger.info(f"CORRECTIVE: Selling {base_balance:.6f} {base_asset} for SHORT position") result = self._place_order_with_retry(symbol, 'SELL', 'LIMIT', base_balance, current_price * 0.999) if result: # Update internal position tracking for SHORT time.sleep(2) # For spot trading, SHORT means we sold our crypto and are holding fiat # This is effectively being "short" the crypto asset self.positions[symbol] = Position( symbol=symbol, side='SHORT', quantity=base_balance, # Track the amount we sold entry_price=current_price, entry_time=datetime.now(), order_id=result.get('orderId', f"corrective_{int(time.time())}") ) logger.info(f"CORRECTIVE: Successfully established SHORT position") return True else: logger.error("CORRECTIVE: Failed to sell for SHORT position") return False else: logger.info("CORRECTIVE: Already in SHORT position (holding fiat only)") return True else: logger.error(f"CORRECTIVE: Unknown desired state: {desired_state}") return False except Exception as e: logger.error(f"Error executing corrective trades: {e}") import traceback logger.error(f"CORRECTIVE: Full traceback: {traceback.format_exc()}") return False def recalculate_all_trade_records(self): """Recalculate all existing trade records with correct leverage and PnL""" logger.info("Recalculating all trade records with correct leverage and PnL...") updated_count = 0 for i, trade in enumerate(self.trade_history): try: # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = trade.entry_price * trade.quantity # Calculate gross PnL (before fees) with leverage if trade.side == 'LONG': gross_pnl = (trade.exit_price - trade.entry_price) * trade.quantity * leverage else: # SHORT gross_pnl = (trade.entry_price - trade.exit_price) * trade.quantity * leverage # Calculate fees (0.1% open + 0.1% close = 0.2% total) entry_value = trade.entry_price * trade.quantity exit_value = trade.exit_price * trade.quantity fees = (entry_value + exit_value) * 0.001 # Calculate net PnL (after fees) net_pnl = gross_pnl - fees # Update trade record with corrected values trade.leverage = leverage trade.position_size_usd = position_size_usd trade.gross_pnl = gross_pnl trade.net_pnl = net_pnl trade.pnl = net_pnl # Main PnL field trade.fees = fees updated_count += 1 except Exception as e: logger.error(f"Error recalculating trade record {i}: {e}") continue logger.info(f"Updated {updated_count} trade records with correct leverage and PnL calculations") # Also update trade_records list if it exists if hasattr(self, 'trade_records') and self.trade_records: logger.info("Updating trade_records list...") for i, trade in enumerate(self.trade_records): try: # Get current leverage setting leverage = self.get_leverage() # Calculate position size in USD position_size_usd = trade.entry_price * trade.quantity # Calculate gross PnL (before fees) with leverage if trade.side == 'LONG': gross_pnl = (trade.exit_price - trade.entry_price) * trade.quantity * leverage else: # SHORT gross_pnl = (trade.entry_price - trade.exit_price) * trade.quantity * leverage # Calculate fees (0.1% open + 0.1% close = 0.2% total) entry_value = trade.entry_price * trade.quantity exit_value = trade.exit_price * trade.quantity fees = (entry_value + exit_value) * 0.001 # Calculate net PnL (after fees) net_pnl = gross_pnl - fees # Update trade record with corrected values trade.leverage = leverage trade.position_size_usd = position_size_usd trade.gross_pnl = gross_pnl trade.net_pnl = net_pnl trade.pnl = net_pnl # Main PnL field trade.fees = fees except Exception as e: logger.error(f"Error recalculating trade_records entry {i}: {e}") continue logger.info("Trade record recalculation completed")