""" Trading Executor for MEXC API Integration This module handles the execution of trading signals through the MEXC exchange API. It includes position management, risk controls, and safety features. """ import logging import time import os from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass from threading import Lock import sys # Add NN directory to path for exchange interfaces sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'NN')) from NN.exchanges import MEXCInterface from .config import get_config from .config_sync import ConfigSynchronizer logger = logging.getLogger(__name__) @dataclass class Position: """Represents an open trading position""" symbol: str side: str # 'LONG' or 'SHORT' quantity: float entry_price: float entry_time: datetime order_id: str unrealized_pnl: float = 0.0 def calculate_pnl(self, current_price: float) -> float: """Calculate unrealized P&L for the position""" if self.side == 'LONG': self.unrealized_pnl = (current_price - self.entry_price) * self.quantity else: # SHORT self.unrealized_pnl = (self.entry_price - current_price) * self.quantity return self.unrealized_pnl @dataclass class TradeRecord: """Record of a completed trade""" symbol: str side: str quantity: float entry_price: float exit_price: float entry_time: datetime exit_time: datetime pnl: float fees: float confidence: float class TradingExecutor: """Handles trade execution through MEXC API with risk management""" def __init__(self, config_path: str = "config.yaml"): """Initialize the trading executor""" self.config = get_config(config_path) self.mexc_config = self.config.get('mexc_trading', {}) # Initialize MEXC interface api_key = os.getenv('MEXC_API_KEY', self.mexc_config.get('api_key', '')) api_secret = os.getenv('MEXC_SECRET_KEY', self.mexc_config.get('api_secret', '')) # Determine trading mode from unified config trading_mode = self.mexc_config.get('trading_mode', 'simulation') # Map trading mode to exchange test_mode and execution mode if trading_mode == 'simulation': exchange_test_mode = True self.simulation_mode = True elif trading_mode == 'testnet': exchange_test_mode = True self.simulation_mode = False elif trading_mode == 'live': exchange_test_mode = False self.simulation_mode = False else: logger.warning(f"Unknown trading_mode '{trading_mode}', defaulting to simulation") exchange_test_mode = True self.simulation_mode = True self.exchange = MEXCInterface( api_key=api_key, api_secret=api_secret, test_mode=exchange_test_mode ) # Trading state self.positions: Dict[str, Position] = {} self.trade_history: List[TradeRecord] = [] self.daily_trades = 0 self.daily_loss = 0.0 self.last_trade_time = {} self.trading_enabled = self.mexc_config.get('enabled', False) self.trading_mode = trading_mode # Legacy compatibility (deprecated) self.dry_run = self.simulation_mode # Thread safety self.lock = Lock() # Connect to exchange if self.trading_enabled: self._connect_exchange() logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}") # Initialize config synchronizer for automatic fee updates self.config_synchronizer = ConfigSynchronizer( config_path=config_path, mexc_interface=self.exchange if self.trading_enabled else None ) # Perform initial fee sync on startup if trading is enabled if self.trading_enabled and self.exchange: try: logger.info("TRADING EXECUTOR: Performing initial fee synchronization with MEXC API") sync_result = self.config_synchronizer.sync_trading_fees(force=True) if sync_result.get('status') == 'success': logger.info("TRADING EXECUTOR: Fee synchronization completed successfully") if sync_result.get('changes_made'): logger.info(f"TRADING EXECUTOR: Fee changes applied: {list(sync_result['changes'].keys())}") # Reload config to get updated fees self.config = get_config(config_path) self.mexc_config = self.config.get('mexc_trading', {}) elif sync_result.get('status') == 'warning': logger.warning("TRADING EXECUTOR: Fee sync completed with warnings") else: logger.warning(f"TRADING EXECUTOR: Fee sync failed: {sync_result.get('status')}") except Exception as e: logger.warning(f"TRADING EXECUTOR: Initial fee sync failed: {e}") logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}") def _connect_exchange(self) -> bool: """Connect to the MEXC exchange""" try: connected = self.exchange.connect() if connected: logger.info("Successfully connected to MEXC exchange") return True else: logger.error("Failed to connect to MEXC exchange") if not self.dry_run: self.trading_enabled = False return False except Exception as e: logger.error(f"Error connecting to MEXC exchange: {e}") self.trading_enabled = False return False def execute_signal(self, symbol: str, action: str, confidence: float, current_price: float = None) -> bool: """Execute a trading signal Args: symbol: Trading symbol (e.g., 'ETH/USDT') action: Trading action ('BUY', 'SELL', 'HOLD') confidence: Confidence level (0.0 to 1.0) current_price: Current market price Returns: bool: True if trade executed successfully """ if not self.trading_enabled: logger.info(f"Trading disabled - Signal: {action} {symbol} (confidence: {confidence:.2f})") return False if action == 'HOLD': return True # Check safety conditions if not self._check_safety_conditions(symbol, action): return False # Get current price if not provided if current_price is None: ticker = self.exchange.get_ticker(symbol) if not ticker: logger.error(f"Failed to get current price for {symbol}") return False current_price = ticker['last'] with self.lock: try: if action == 'BUY': return self._execute_buy(symbol, confidence, current_price) elif action == 'SELL': return self._execute_sell(symbol, confidence, current_price) else: logger.warning(f"Unknown action: {action}") return False except Exception as e: logger.error(f"Error executing {action} signal for {symbol}: {e}") return False def _check_safety_conditions(self, symbol: str, action: str) -> bool: """Check if it's safe to execute a trade""" # Check if trading is stopped if self.mexc_config.get('emergency_stop', False): logger.warning("Emergency stop is active - no trades allowed") return False # Check symbol allowlist allowed_symbols = self.mexc_config.get('allowed_symbols', []) if allowed_symbols and symbol not in allowed_symbols: logger.warning(f"Symbol {symbol} not in allowed list: {allowed_symbols}") return False # Check daily loss limit max_daily_loss = self.mexc_config.get('max_daily_loss_usd', 5.0) if self.daily_loss >= max_daily_loss: logger.warning(f"Daily loss limit reached: ${self.daily_loss:.2f} >= ${max_daily_loss}") return False # Check daily trade limit max_daily_trades = self.mexc_config.get('max_trades_per_hour', 2) * 24 if self.daily_trades >= max_daily_trades: logger.warning(f"Daily trade limit reached: {self.daily_trades}") return False # Check trade interval min_interval = self.mexc_config.get('min_trade_interval_seconds', 300) last_trade = self.last_trade_time.get(symbol, datetime.min) if (datetime.now() - last_trade).total_seconds() < min_interval: logger.info(f"Trade interval not met for {symbol}") return False # Check concurrent positions max_positions = self.mexc_config.get('max_concurrent_positions', 1) if len(self.positions) >= max_positions and action == 'BUY': logger.warning(f"Maximum concurrent positions reached: {len(self.positions)}") return False return True def _execute_buy(self, symbol: str, confidence: float, current_price: float) -> bool: """Execute a buy order""" # Check if we already have a position if symbol in self.positions: logger.info(f"Already have position in {symbol}") return False # Calculate position size position_value = self._calculate_position_size(confidence, current_price) quantity = position_value / current_price logger.info(f"Executing BUY: {quantity:.6f} {symbol} at ${current_price:.2f} " f"(value: ${position_value:.2f}, confidence: {confidence:.2f})") if self.simulation_mode: logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Trade logged but not executed") # Create mock position for tracking self.positions[symbol] = Position( symbol=symbol, side='LONG', quantity=quantity, entry_price=current_price, entry_time=datetime.now(), order_id=f"sim_{int(time.time())}" ) self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 return True try: # Get order type from config order_type = self.mexc_config.get('order_type', 'market').lower() # For limit orders, set price slightly above market for immediate execution limit_price = None if order_type == 'limit': # Set buy price slightly above market to ensure immediate execution limit_price = current_price * 1.001 # 0.1% above market # Place buy order order = self.exchange.place_order( symbol=symbol, side='buy', order_type=order_type, quantity=quantity, price=limit_price ) if order: # Create position record self.positions[symbol] = Position( symbol=symbol, side='LONG', quantity=quantity, entry_price=current_price, entry_time=datetime.now(), order_id=order.get('orderId', 'unknown') ) self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"BUY order executed: {order}") return True else: logger.error("Failed to place BUY order") return False except Exception as e: logger.error(f"Error executing BUY order: {e}") return False def _execute_sell(self, symbol: str, confidence: float, current_price: float) -> bool: """Execute a sell order""" # Check if we have a position to sell if symbol not in self.positions: logger.info(f"No position to sell in {symbol}") return False position = self.positions[symbol] logger.info(f"Executing SELL: {position.quantity:.6f} {symbol} at ${current_price:.2f} " f"(confidence: {confidence:.2f})") if self.simulation_mode: logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Trade logged but not executed") # Calculate P&L pnl = position.calculate_pnl(current_price) # Create trade record trade_record = TradeRecord( symbol=symbol, side='LONG', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=datetime.now(), pnl=pnl, fees=0.0, confidence=confidence ) self.trade_history.append(trade_record) self.daily_loss += max(0, -pnl) # Add to daily loss if negative # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"Position closed - P&L: ${pnl:.2f}") return True try: # Get order type from config order_type = self.mexc_config.get('order_type', 'market').lower() # For limit orders, set price slightly below market for immediate execution limit_price = None if order_type == 'limit': # Set sell price slightly below market to ensure immediate execution limit_price = current_price * 0.999 # 0.1% below market # Place sell order order = self.exchange.place_order( symbol=symbol, side='sell', order_type=order_type, quantity=position.quantity, price=limit_price ) if order: # Calculate P&L pnl = position.calculate_pnl(current_price) fees = self._calculate_trading_fee(order, symbol, position.quantity, current_price) # Create trade record trade_record = TradeRecord( symbol=symbol, side='LONG', quantity=position.quantity, entry_price=position.entry_price, exit_price=current_price, entry_time=position.entry_time, exit_time=datetime.now(), pnl=pnl - fees, fees=fees, confidence=confidence ) self.trade_history.append(trade_record) self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative # Remove position del self.positions[symbol] self.last_trade_time[symbol] = datetime.now() self.daily_trades += 1 logger.info(f"SELL order executed: {order}") logger.info(f"Position closed - P&L: ${pnl - fees:.2f}") return True else: logger.error("Failed to place SELL order") return False except Exception as e: logger.error(f"Error executing SELL order: {e}") return False def _calculate_position_size(self, confidence: float, current_price: float) -> float: """Calculate position size based on configuration and confidence""" max_value = self.mexc_config.get('max_position_value_usd', 1.0) min_value = self.mexc_config.get('min_position_value_usd', 0.1) # Scale position size by confidence base_value = max_value * confidence position_value = max(min_value, min(base_value, max_value)) return position_value def update_positions(self, symbol: str, current_price: float): """Update position P&L with current market price""" if symbol in self.positions: with self.lock: self.positions[symbol].calculate_pnl(current_price) def get_positions(self) -> Dict[str, Position]: """Get current positions""" return self.positions.copy() def get_trade_history(self) -> List[TradeRecord]: """Get trade history""" return self.trade_history.copy() def get_daily_stats(self) -> Dict[str, Any]: """Get daily trading statistics with enhanced fee analysis""" total_pnl = sum(trade.pnl for trade in self.trade_history) total_fees = sum(trade.fees for trade in self.trade_history) gross_pnl = total_pnl + total_fees # P&L before fees winning_trades = len([t for t in self.trade_history if t.pnl > 0]) losing_trades = len([t for t in self.trade_history if t.pnl < 0]) total_trades = len(self.trade_history) # Calculate average trade values avg_trade_pnl = total_pnl / max(1, total_trades) avg_trade_fee = total_fees / max(1, total_trades) avg_winning_trade = sum(t.pnl for t in self.trade_history if t.pnl > 0) / max(1, winning_trades) avg_losing_trade = sum(t.pnl for t in self.trade_history if t.pnl < 0) / max(1, losing_trades) # Enhanced fee analysis from config fee_structure = self.mexc_config.get('trading_fees', {}) maker_fee_rate = fee_structure.get('maker', 0.0000) taker_fee_rate = fee_structure.get('taker', 0.0005) default_fee_rate = fee_structure.get('default', 0.0005) # Calculate fee efficiency total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history) effective_fee_rate = (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0 fee_impact_on_pnl = (total_fees / max(0.01, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0 return { 'daily_trades': self.daily_trades, 'daily_loss': self.daily_loss, 'total_pnl': total_pnl, 'gross_pnl': gross_pnl, 'total_fees': total_fees, 'winning_trades': winning_trades, 'losing_trades': losing_trades, 'total_trades': total_trades, 'win_rate': winning_trades / max(1, total_trades), 'avg_trade_pnl': avg_trade_pnl, 'avg_trade_fee': avg_trade_fee, 'avg_winning_trade': avg_winning_trade, 'avg_losing_trade': avg_losing_trade, 'positions_count': len(self.positions), 'fee_rates': { 'maker': f"{maker_fee_rate*100:.3f}%", 'taker': f"{taker_fee_rate*100:.3f}%", 'default': f"{default_fee_rate*100:.3f}%", 'effective': f"{effective_fee_rate*100:.3f}%" # Actual rate based on trades }, 'fee_analysis': { 'total_volume': total_volume, 'fee_impact_percent': fee_impact_on_pnl, 'is_fee_efficient': fee_impact_on_pnl < 5.0, # Less than 5% impact is good 'fee_savings_vs_market': (0.001 - effective_fee_rate) * total_volume if effective_fee_rate < 0.001 else 0 } } def emergency_stop(self): """Emergency stop all trading""" logger.warning("EMERGENCY STOP ACTIVATED") self.trading_enabled = False # Close all positions if in live mode if not self.dry_run: for symbol, position in self.positions.items(): try: ticker = self.exchange.get_ticker(symbol) if ticker: self._execute_sell(symbol, 1.0, ticker['last']) except Exception as e: logger.error(f"Error closing position {symbol} during emergency stop: {e}") def reset_daily_stats(self): """Reset daily statistics (call at start of new day)""" self.daily_trades = 0 self.daily_loss = 0.0 logger.info("Daily trading statistics reset") def get_account_balance(self) -> Dict[str, Dict[str, float]]: """Get account balance information from MEXC Returns: Dict with asset balances in format: { 'USDT': {'free': 100.0, 'locked': 0.0}, 'ETH': {'free': 0.5, 'locked': 0.0}, ... } """ try: if not self.exchange: logger.error("Exchange interface not available") return {} # Get account info from MEXC account_info = self.exchange.get_account_info() if not account_info: logger.error("Failed to get account info from MEXC") return {} balances = {} for balance in account_info.get('balances', []): asset = balance.get('asset', '') free = float(balance.get('free', 0)) locked = float(balance.get('locked', 0)) # Only include assets with non-zero balance if free > 0 or locked > 0: balances[asset] = { 'free': free, 'locked': locked, 'total': free + locked } logger.info(f"Retrieved balances for {len(balances)} assets") return balances except Exception as e: logger.error(f"Error getting account balance: {e}") return {} def _calculate_trading_fee(self, order_result: Dict[str, Any], symbol: str, quantity: float, price: float) -> float: """Calculate trading fee based on order execution details with enhanced MEXC API support Args: order_result: Order result from exchange API symbol: Trading symbol quantity: Order quantity price: Execution price Returns: float: Trading fee amount in quote currency """ try: # 1. Try to get actual fee from API response (most accurate) # MEXC API can return fees in different formats depending on the endpoint # Check for 'fills' array (most common for filled orders) if order_result and 'fills' in order_result: total_commission = 0.0 commission_asset = None for fill in order_result['fills']: commission = float(fill.get('commission', 0)) commission_asset = fill.get('commissionAsset', '') total_commission += commission if total_commission > 0: logger.info(f"Using actual API fee from fills: {total_commission} {commission_asset}") # If commission is in different asset, we might need conversion # For now, assume it's in quote currency (USDC/USDT) return total_commission # 2. Check if order result has fee information directly fee_fields = ['fee', 'commission', 'tradeFee', 'fees'] for field in fee_fields: if order_result and field in order_result: fee = float(order_result[field]) if fee > 0: logger.info(f"Using API fee field '{field}': {fee}") return fee # 3. Check for executedQty and cummulativeQuoteQty for more accurate calculation if order_result and 'executedQty' in order_result and 'cummulativeQuoteQty' in order_result: executed_qty = float(order_result['executedQty']) executed_value = float(order_result['cummulativeQuoteQty']) if executed_qty > 0 and executed_value > 0: # Use executed values instead of provided price/quantity quantity = executed_qty price = executed_value / executed_qty logger.info(f"Using executed order data: {quantity} @ {price:.6f}") # 4. Fall back to config-based fee calculation with enhanced logic trading_fees = self.mexc_config.get('trading_fees', {}) # Determine if this was a maker or taker trade order_type = order_result.get('type', 'MARKET') if order_result else 'MARKET' order_status = order_result.get('status', 'UNKNOWN') if order_result else 'UNKNOWN' time_in_force = order_result.get('timeInForce', 'GTC') if order_result else 'GTC' # Enhanced maker/taker detection logic if order_type.upper() == 'LIMIT': # For limit orders, check execution speed and market conditions if order_status == 'FILLED': # If it's an IOC (Immediate or Cancel) order, it's likely a taker if time_in_force == 'IOC' or time_in_force == 'FOK': fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for {time_in_force} limit order: {fee_rate*100:.3f}%") else: # For GTC orders, assume taker if aggressive pricing is used # This is a heuristic based on our trading strategy fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for aggressive limit order: {fee_rate*100:.3f}%") else: # If not immediately filled, likely a maker (though we don't usually reach here) fee_rate = trading_fees.get('maker', 0.0000) logger.info(f"Using maker fee rate for pending/partial limit order: {fee_rate*100:.3f}%") elif order_type.upper() == 'LIMIT_MAKER': # LIMIT_MAKER orders are guaranteed to be makers fee_rate = trading_fees.get('maker', 0.0000) logger.info(f"Using maker fee rate for LIMIT_MAKER order: {fee_rate*100:.3f}%") else: # Market orders and other types are always takers fee_rate = trading_fees.get('taker', 0.0005) logger.info(f"Using taker fee rate for {order_type} order: {fee_rate*100:.3f}%") # Calculate fee amount trade_value = quantity * price fee_amount = trade_value * fee_rate logger.info(f"Calculated fee: ${fee_amount:.6f} ({fee_rate*100:.3f}% of ${trade_value:.2f})") return fee_amount except Exception as e: logger.warning(f"Error calculating trading fee: {e}") # Ultimate fallback using default rate default_fee_rate = self.mexc_config.get('trading_fees', {}).get('default', 0.0005) fallback_rate = self.mexc_config.get('trading_fee', default_fee_rate) # Legacy support fee_amount = quantity * price * fallback_rate logger.info(f"Using fallback fee: ${fee_amount:.6f} ({fallback_rate*100:.3f}%)") return fee_amount def get_fee_analysis(self) -> Dict[str, Any]: """Get detailed fee analysis and statistics Returns: Dict with fee breakdowns, rates, and impact analysis """ try: fee_structure = self.mexc_config.get('trading_fees', {}) maker_rate = fee_structure.get('maker', 0.0000) taker_rate = fee_structure.get('taker', 0.0005) default_rate = fee_structure.get('default', 0.0005) # Calculate total fees paid total_fees = sum(trade.fees for trade in self.trade_history) total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history) # Estimate fee breakdown (since we don't track maker vs taker separately) # Assume most of our limit orders are takers due to our pricing strategy estimated_taker_volume = total_volume * 0.9 # 90% taker assumption estimated_maker_volume = total_volume * 0.1 # 10% maker assumption estimated_taker_fees = estimated_taker_volume * taker_rate estimated_maker_fees = estimated_maker_volume * maker_rate # Fee impact analysis total_pnl = sum(trade.pnl for trade in self.trade_history) gross_pnl = total_pnl + total_fees fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0 return { 'fee_rates': { 'maker': { 'rate': maker_rate, 'rate_percent': f"{maker_rate*100:.3f}%" }, 'taker': { 'rate': taker_rate, 'rate_percent': f"{taker_rate*100:.3f}%" }, 'default': { 'rate': default_rate, 'rate_percent': f"{default_rate*100:.3f}%" } }, 'total_fees': total_fees, 'total_volume': total_volume, 'estimated_breakdown': { 'taker_fees': estimated_taker_fees, 'maker_fees': estimated_maker_fees, 'taker_volume': estimated_taker_volume, 'maker_volume': estimated_maker_volume }, 'impact_analysis': { 'fee_impact_percent': fee_impact_percent, 'pnl_after_fees': total_pnl, 'pnl_before_fees': gross_pnl, 'avg_fee_per_trade': total_fees / max(1, len(self.trade_history)) }, 'fee_efficiency': { 'volume_to_fee_ratio': total_volume / max(0.01, total_fees), 'is_efficient': fee_impact_percent < 5.0 # Less than 5% impact is good } } except Exception as e: logger.error(f"Error calculating fee analysis: {e}") return { 'error': str(e), 'fee_rates': { 'maker': {'rate': 0.0000, 'rate_percent': '0.000%'}, 'taker': {'rate': 0.0005, 'rate_percent': '0.050%'} } } def sync_fees_with_api(self, force: bool = False) -> Dict[str, Any]: """Manually trigger fee synchronization with MEXC API Args: force: Force sync even if last sync was recent Returns: dict: Sync result with status and details """ if not self.config_synchronizer: return { 'status': 'error', 'error': 'Config synchronizer not initialized' } try: logger.info("TRADING EXECUTOR: Manual fee sync requested") sync_result = self.config_synchronizer.sync_trading_fees(force=force) # If fees were updated, reload config if sync_result.get('changes_made'): logger.info("TRADING EXECUTOR: Reloading config after fee sync") self.config = get_config(self.config_synchronizer.config_path) self.mexc_config = self.config.get('mexc_trading', {}) return sync_result except Exception as e: logger.error(f"TRADING EXECUTOR: Error in manual fee sync: {e}") return { 'status': 'error', 'error': str(e) } def auto_sync_fees_if_needed(self) -> bool: """Automatically sync fees if needed (called periodically) Returns: bool: True if sync was performed successfully """ if not self.config_synchronizer: return False try: return self.config_synchronizer.auto_sync_fees() except Exception as e: logger.error(f"TRADING EXECUTOR: Error in auto fee sync: {e}") return False def get_fee_sync_status(self) -> Dict[str, Any]: """Get current fee synchronization status Returns: dict: Fee sync status and history """ if not self.config_synchronizer: return { 'sync_available': False, 'error': 'Config synchronizer not initialized' } try: status = self.config_synchronizer.get_sync_status() status['sync_available'] = True return status except Exception as e: logger.error(f"TRADING EXECUTOR: Error getting sync status: {e}") return { 'sync_available': False, 'error': str(e) }