Files
gogo2/core/trading_executor.py

1600 lines
74 KiB
Python

"""
Trading Executor for MEXC API Integration
This module handles the execution of trading signals through the MEXC exchange API.
It includes position management, risk controls, and safety features.
https://github.com/mexcdevelop/mexc-api-postman/blob/main/MEXC%20V3.postman_collection.json
MEXC V3.postman_collection.json
"""
import logging
import time
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from threading import Lock, RLock
import threading
import time
import sys
# Add NN directory to path for exchange interfaces
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'NN'))
from NN.exchanges import MEXCInterface
from .config import get_config
from .config_sync import ConfigSynchronizer
logger = logging.getLogger(__name__)
@dataclass
class Position:
"""Represents an open trading position"""
symbol: str
side: str # 'LONG' or 'SHORT'
quantity: float
entry_price: float
entry_time: datetime
order_id: str
unrealized_pnl: float = 0.0
def calculate_pnl(self, current_price: float) -> float:
"""Calculate unrealized P&L for the position"""
if self.side == 'LONG':
self.unrealized_pnl = (current_price - self.entry_price) * self.quantity
else: # SHORT
self.unrealized_pnl = (self.entry_price - current_price) * self.quantity
return self.unrealized_pnl
@dataclass
class TradeRecord:
"""Record of a completed trade"""
symbol: str
side: str
quantity: float
entry_price: float
exit_price: float
entry_time: datetime
exit_time: datetime
pnl: float
fees: float
confidence: float
hold_time_seconds: float = 0.0 # Hold time in seconds
class TradingExecutor:
"""Handles trade execution through MEXC API with risk management"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialize the trading executor"""
self.config = get_config(config_path)
self.mexc_config = self.config.get('mexc_trading', {})
# Initialize MEXC interface
api_key = os.getenv('MEXC_API_KEY', self.mexc_config.get('api_key', ''))
api_secret = os.getenv('MEXC_SECRET_KEY', self.mexc_config.get('api_secret', ''))
# Determine trading mode from unified config
trading_mode = self.mexc_config.get('trading_mode', 'simulation')
# Map trading mode to exchange test_mode and execution mode
if trading_mode == 'simulation':
exchange_test_mode = True
self.simulation_mode = True
elif trading_mode == 'testnet':
exchange_test_mode = True
self.simulation_mode = False
elif trading_mode == 'live':
exchange_test_mode = False
self.simulation_mode = False
else:
logger.warning(f"Unknown trading_mode '{trading_mode}', defaulting to simulation")
exchange_test_mode = True
self.simulation_mode = True
self.exchange = MEXCInterface(
api_key=api_key,
api_secret=api_secret,
test_mode=exchange_test_mode,
)
# Trading state
self.positions: Dict[str, Position] = {}
self.trade_history: List[TradeRecord] = []
self.daily_trades = 0
self.daily_loss = 0.0
self.last_trade_time = {}
self.trading_enabled = self.mexc_config.get('enabled', False)
self.trading_mode = trading_mode
self.consecutive_losses = 0 # Track consecutive losing trades
logger.debug(f"TRADING EXECUTOR: Initial trading_enabled state from config: {self.trading_enabled}")
# Legacy compatibility (deprecated)
self.dry_run = self.simulation_mode
# Thread safety with timeout support
self.lock = RLock()
self.lock_timeout = 10.0 # 10 second timeout for order execution
# Connect to exchange
if self.trading_enabled:
logger.info("TRADING EXECUTOR: Attempting to connect to exchange...")
if not self._connect_exchange():
logger.error("TRADING EXECUTOR: Failed initial exchange connection. Trading will be disabled.")
self.trading_enabled = False
else:
logger.info("TRADING EXECUTOR: Trading is explicitly disabled in config.")
logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}")
# Initialize config synchronizer for automatic fee updates
self.config_synchronizer = ConfigSynchronizer(
config_path=config_path,
mexc_interface=self.exchange if self.trading_enabled else None
)
# Perform initial fee sync on startup if trading is enabled
if self.trading_enabled and self.exchange:
try:
logger.info("TRADING EXECUTOR: Performing initial fee synchronization with MEXC API")
sync_result = self.config_synchronizer.sync_trading_fees(force=True)
if sync_result.get('status') == 'success':
logger.info("TRADING EXECUTOR: Fee synchronization completed successfully")
if sync_result.get('changes_made'):
logger.info(f"TRADING EXECUTOR: Fee changes applied: {list(sync_result['changes'].keys())}")
# Reload config to get updated fees
self.config = get_config(config_path)
self.mexc_config = self.config.get('mexc_trading', {})
elif sync_result.get('status') == 'warning':
logger.warning("TRADING EXECUTOR: Fee sync completed with warnings")
else:
logger.warning(f"TRADING EXECUTOR: Fee sync failed: {sync_result.get('status')}")
except Exception as e:
logger.warning(f"TRADING EXECUTOR: Initial fee sync failed: {e}")
logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}")
def _connect_exchange(self) -> bool:
"""Connect to the MEXC exchange"""
try:
logger.debug("TRADING EXECUTOR: Calling self.exchange.connect()...")
connected = self.exchange.connect()
logger.debug(f"TRADING EXECUTOR: self.exchange.connect() returned: {connected}")
if connected:
logger.info("Successfully connected to MEXC exchange")
return True
else:
logger.error("Failed to connect to MEXC exchange: Connection returned False.")
if not self.dry_run:
logger.info("TRADING EXECUTOR: Setting trading_enabled to False due to connection failure.")
self.trading_enabled = False
return False
except Exception as e:
logger.error(f"Error connecting to MEXC exchange: {e}. Setting trading_enabled to False.")
self.trading_enabled = False
return False
def execute_signal(self, symbol: str, action: str, confidence: float,
current_price: Optional[float] = None) -> bool:
"""Execute a trading signal
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
action: Trading action ('BUY', 'SELL', 'HOLD')
confidence: Confidence level (0.0 to 1.0)
current_price: Current market price
Returns:
bool: True if trade executed successfully
"""
logger.debug(f"TRADING EXECUTOR: execute_signal called. trading_enabled: {self.trading_enabled}")
if not self.trading_enabled:
logger.info(f"Trading disabled - Signal: {action} {symbol} (confidence: {confidence:.2f}) - Reason: Trading executor is not enabled.")
return False
if action == 'HOLD':
return True
# Check safety conditions
if not self._check_safety_conditions(symbol, action):
return False
# Get current price if not provided
if current_price is None:
ticker = self.exchange.get_ticker(symbol)
if not ticker or 'last' not in ticker:
logger.error(f"Failed to get current price for {symbol} or ticker is malformed.")
return False
current_price = ticker['last']
# Assert that current_price is not None for type checking
assert current_price is not None, "current_price should not be None at this point"
# --- Balance check before executing trade (skip in simulation mode) ---
# Only perform balance check for live trading, not simulation
if not self.simulation_mode and (action == 'BUY' or (action == 'SELL' and symbol not in self.positions) or (action == 'SHORT')):
# Determine the quote asset (e.g., USDT, USDC) from the symbol
if '/' in symbol:
quote_asset = symbol.split('/')[1].upper() # Assuming symbol is like ETH/USDT
# Keep USDT as USDT for MEXC spot trading (no conversion needed)
else:
# Fallback for symbols like ETHUSDT (assuming last 4 chars are quote)
quote_asset = symbol[-4:].upper()
# Keep USDT as USDT for MEXC spot trading (no conversion needed)
# Calculate required capital for the trade
# If we are selling (to open a short position), we need collateral based on the position size
# For simplicity, assume required capital is the full position value in USD
required_capital = self._calculate_position_size(confidence, current_price)
# Get available balance for the quote asset (try USDT first, then USDC as fallback)
if quote_asset == 'USDT':
available_balance = self.exchange.get_balance('USDT')
if available_balance < required_capital:
# If USDT balance is insufficient, check USDC as fallback
usdc_balance = self.exchange.get_balance('USDC')
if usdc_balance >= required_capital:
available_balance = usdc_balance
quote_asset = 'USDC' # Use USDC instead
logger.info(f"BALANCE CHECK: Using USDC fallback balance for {symbol}")
else:
available_balance = self.exchange.get_balance(quote_asset)
logger.info(f"BALANCE CHECK: Symbol: {symbol}, Action: {action}, Required: ${required_capital:.2f} {quote_asset}, Available: ${available_balance:.2f} {quote_asset}")
# Allow some small precision tolerance (1 cent) and ensure sufficient balance
balance_tolerance = 0.01
if available_balance < (required_capital - balance_tolerance):
logger.warning(f"Trade blocked for {symbol} {action}: Insufficient {quote_asset} balance. "
f"Required: ${required_capital:.2f}, Available: ${available_balance:.2f}")
return False
else:
logger.info(f"BALANCE CHECK PASSED: {quote_asset} balance sufficient for trade")
elif self.simulation_mode:
logger.debug(f"SIMULATION MODE: Skipping balance check for {symbol} {action} - allowing trade for model training")
# --- End Balance check ---
# Try to acquire lock with timeout to prevent hanging
lock_acquired = self.lock.acquire(timeout=self.lock_timeout)
if not lock_acquired:
logger.error(f"TIMEOUT: Failed to acquire lock within {self.lock_timeout}s for {action} {symbol}")
logger.error(f"This indicates another operation is hanging. Cancelling {action} order.")
return False
try:
logger.info(f"LOCK ACQUIRED: Executing {action} for {symbol}")
start_time = time.time()
if action == 'BUY':
result = self._execute_buy(symbol, confidence, current_price)
elif action == 'SELL':
result = self._execute_sell(symbol, confidence, current_price)
elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal
result = self._execute_short(symbol, confidence, current_price)
else:
logger.warning(f"Unknown action: {action}")
result = False
execution_time = time.time() - start_time
logger.info(f"EXECUTION COMPLETE: {action} for {symbol} took {execution_time:.2f}s, result: {result}")
return result
except Exception as e:
logger.error(f"Error executing {action} signal for {symbol}: {e}")
return False
finally:
self.lock.release()
logger.debug(f"LOCK RELEASED: {action} for {symbol}")
def _cancel_open_orders(self, symbol: str) -> bool:
"""Cancel all open orders for a symbol before placing new orders"""
try:
logger.info(f"Checking for open orders to cancel for {symbol}")
open_orders = self.exchange.get_open_orders(symbol)
if open_orders and len(open_orders) > 0:
logger.info(f"Found {len(open_orders)} open orders for {symbol}, cancelling...")
for order in open_orders:
order_id = order.get('orderId')
if order_id:
try:
cancel_result = self.exchange.cancel_order(symbol, str(order_id))
if cancel_result:
logger.info(f"Successfully cancelled order {order_id} for {symbol}")
else:
logger.warning(f"Failed to cancel order {order_id} for {symbol}")
except Exception as e:
logger.error(f"Error cancelling order {order_id}: {e}")
# Wait a moment for cancellations to process
time.sleep(0.5)
return True
else:
logger.debug(f"No open orders found for {symbol}")
return True
except Exception as e:
logger.error(f"Error checking/cancelling open orders for {symbol}: {e}")
return False
def _check_safety_conditions(self, symbol: str, action: str) -> bool:
"""Check if it's safe to execute a trade"""
# Check if trading is stopped
if self.mexc_config.get('emergency_stop', False):
logger.warning("Emergency stop is active - no trades allowed")
return False
# Check symbol allowlist
allowed_symbols = self.mexc_config.get('allowed_symbols', [])
if allowed_symbols and symbol not in allowed_symbols:
logger.warning(f"Symbol {symbol} not in allowed list: {allowed_symbols}")
return False
# Check daily loss limit
max_daily_loss = self.mexc_config.get('max_daily_loss_usd', 5.0)
if self.daily_loss >= max_daily_loss:
logger.warning(f"Daily loss limit reached: ${self.daily_loss:.2f} >= ${max_daily_loss}")
return False
# Check daily trade limit
# max_daily_trades = self.mexc_config.get('max_daily_trades', 100)
# if self.daily_trades >= max_daily_trades:
# logger.warning(f"Daily trade limit reached: {self.daily_trades}")
# return False
# Check trade interval - allow bypass for test scenarios
min_interval = self.mexc_config.get('min_trade_interval_seconds', 5)
last_trade = self.last_trade_time.get(symbol, datetime.min)
time_since_last = (datetime.now() - last_trade).total_seconds()
# Allow bypass for high confidence sells (closing positions) or test scenarios
if time_since_last < min_interval:
# Allow immediate sells for closing positions or very high confidence (0.9+)
if action == 'SELL' and symbol in self.positions:
logger.info(f"Allowing immediate SELL to close position for {symbol}")
elif hasattr(self, '_test_mode') and self._test_mode:
logger.info(f"Test mode: bypassing trade interval for {symbol}")
else:
logger.info(f"Trade interval not met for {symbol} ({time_since_last:.1f}s < {min_interval}s)")
return False
# Check concurrent positions
max_positions = self.mexc_config.get('max_concurrent_positions', 1)
if len(self.positions) >= max_positions and action == 'BUY':
logger.warning(f"Maximum concurrent positions reached: {len(self.positions)}")
return False
return True
def _execute_buy(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a buy order"""
# Check if we have a short position to close
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'SHORT':
logger.info(f"Closing SHORT position in {symbol}")
return self._close_short_position(symbol, confidence, current_price)
else:
logger.info(f"Already have LONG position in {symbol}")
return False
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
# Calculate position size
position_size = self._calculate_position_size(confidence, current_price)
quantity = position_size / current_price
logger.info(f"Executing BUY: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]")
if self.simulation_mode:
# Create simulated position
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=quantity,
entry_price=current_price,
entry_time=datetime.now(),
order_id=f"sim_{int(datetime.now().timestamp())}"
)
logger.info(f"Simulated BUY order: {quantity:.6f} {symbol} at ${current_price:.2f}")
return True
else:
# Place real order with enhanced error handling
result = self._place_order_with_retry(symbol, 'BUY', 'MARKET', quantity, current_price)
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
fill_price = result.get('avgPrice', current_price)
# Only create position if order was actually filled
if result.get('filled', True): # Assume filled for backward compatibility
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=float(filled_quantity),
entry_price=float(fill_price),
entry_time=datetime.now(),
order_id=result['orderId']
)
logger.info(f"BUY position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}")
self.last_trade_time[symbol] = datetime.now()
return True
else:
logger.error(f"BUY order placed but not filled: {result}")
return False
else:
logger.error("Failed to place BUY order")
return False
def _execute_sell(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a sell order (close long position or open short position)"""
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'LONG':
logger.info(f"Closing LONG position in {symbol}")
return self._close_long_position(symbol, confidence, current_price)
else:
logger.info(f"Already have SHORT position in {symbol}")
return False
else:
# No position to sell, open short position
logger.info(f"No position to sell in {symbol}. Opening short position")
return self._execute_short(symbol, confidence, current_price)
def _execute_short(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a short order (sell without holding the asset)"""
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
# Calculate position size
position_size = self._calculate_position_size(confidence, current_price)
quantity = position_size / current_price
logger.info(f"Executing SHORT: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]")
if self.simulation_mode:
# Create simulated short position
self.positions[symbol] = Position(
symbol=symbol,
side='SHORT',
quantity=quantity,
entry_price=current_price,
entry_time=datetime.now(),
order_id=f"sim_{int(datetime.now().timestamp())}"
)
logger.info(f"Simulated SHORT order: {quantity:.6f} {symbol} at ${current_price:.2f}")
return True
else:
# Place real short order with enhanced error handling
result = self._place_order_with_retry(symbol, 'SELL', 'MARKET', quantity, current_price)
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
fill_price = result.get('avgPrice', current_price)
# Only create position if order was actually filled
if result.get('filled', True): # Assume filled for backward compatibility
self.positions[symbol] = Position(
symbol=symbol,
side='SHORT',
quantity=float(filled_quantity),
entry_price=float(fill_price),
entry_time=datetime.now(),
order_id=result['orderId']
)
logger.info(f"SHORT position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}")
self.last_trade_time[symbol] = datetime.now()
return True
else:
logger.error(f"SHORT order placed but not filled: {result}")
return False
else:
logger.error("Failed to place SHORT order")
return False
def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]:
"""Place order with retry logic for MEXC error handling"""
order_start_time = time.time()
max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout)
for attempt in range(max_retries):
# Check if we've exceeded the maximum order time
elapsed_time = time.time() - order_start_time
if elapsed_time > max_order_time:
logger.error(f"ORDER TIMEOUT: Order placement exceeded {max_order_time}s limit for {side} {symbol}")
logger.error(f"Elapsed time: {elapsed_time:.2f}s, cancelling order to prevent lock hanging")
return {}
try:
# For retries, use more aggressive pricing for LIMIT orders
order_price = current_price
if order_type.upper() == 'LIMIT' and attempt > 0:
# Increase aggressiveness with each retry
aggression_factor = 1 + (0.005 * (attempt + 1)) # 0.5%, 1.0%, 1.5% etc.
if side.upper() == 'BUY':
order_price = current_price * aggression_factor
else:
order_price = current_price / aggression_factor
logger.info(f"Retry {attempt + 1}: Using more aggressive price ${order_price:.4f} (vs market ${current_price:.4f})")
result = self.exchange.place_order(symbol, side, order_type, quantity, order_price)
# Check if result contains error information
if isinstance(result, dict) and 'error' in result:
error_type = result.get('error')
error_code = result.get('code')
error_msg = result.get('message', 'Unknown error')
if error_type == 'oversold' and error_code == 30005:
logger.warning(f"MEXC Oversold error on attempt {attempt + 1}/{max_retries}")
logger.warning(f"Error: {error_msg}")
if attempt < max_retries - 1:
# Calculate wait time but respect timeout limit
suggested_wait = result.get('retry_after', 60) * (2 ** attempt)
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if suggested_wait > remaining_time:
logger.warning(f"Oversold retry wait ({suggested_wait}s) exceeds timeout limit")
logger.warning(f"Remaining time: {remaining_time:.2f}s, skipping retry to prevent hanging")
return {}
logger.info(f"Waiting {suggested_wait} seconds before retry due to oversold condition...")
time.sleep(suggested_wait)
# Reduce quantity for next attempt to avoid oversold
quantity = quantity * 0.8 # Reduce by 20%
logger.info(f"Reducing quantity to {quantity:.6f} for retry")
continue
else:
logger.error(f"Max retries reached for oversold condition")
return {}
elif error_type == 'direction_not_allowed':
logger.error(f"Trading direction not allowed for {symbol} {side}")
return {}
elif error_type == 'insufficient_position':
logger.error(f"Insufficient position for {symbol} {side}")
return {}
else:
logger.error(f"MEXC API error: {error_code} - {error_msg}")
if attempt < max_retries - 1:
wait_time = 5 * (attempt + 1) # Wait 5, 10, 15 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"API error retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
# Success case - order placed
elif isinstance(result, dict) and ('orderId' in result or 'symbol' in result):
logger.info(f"Order placed successfully on attempt {attempt + 1}")
# For LIMIT orders, verify that the order actually fills
if order_type.upper() == 'LIMIT' and 'orderId' in result:
order_id = result['orderId']
filled_result = self._wait_for_order_fill(symbol, order_id, max_wait_time=5.0)
if filled_result['filled']:
logger.info(f"LIMIT order {order_id} filled successfully")
# Update result with fill information
result.update(filled_result)
return result
else:
logger.warning(f"LIMIT order {order_id} not filled within timeout, cancelling...")
# Cancel the unfilled order
try:
self.exchange.cancel_order(symbol, order_id)
logger.info(f"Cancelled unfilled order {order_id}")
except Exception as e:
logger.error(f"Failed to cancel unfilled order {order_id}: {e}")
# If this was the last attempt, return failure
if attempt == max_retries - 1:
return {}
# Try again with a more aggressive price
logger.info(f"Retrying with more aggressive LIMIT pricing...")
continue
else:
# MARKET orders or orders without orderId - assume immediate fill
return result
# Empty result - treat as failure
else:
logger.warning(f"Empty result on attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = 2 * (attempt + 1) # Wait 2, 4, 6 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Empty result retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
except Exception as e:
logger.error(f"Exception on order attempt {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = 3 * (attempt + 1) # Wait 3, 6, 9 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Exception retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
logger.error(f"Failed to place order after {max_retries} attempts")
return {}
def _wait_for_order_fill(self, symbol: str, order_id: str, max_wait_time: float = 5.0) -> Dict[str, Any]:
"""Wait for a LIMIT order to fill and return fill status
Args:
symbol: Trading symbol
order_id: Order ID to monitor
max_wait_time: Maximum time to wait for fill in seconds
Returns:
dict: {'filled': bool, 'status': str, 'executedQty': float, 'avgPrice': float}
"""
start_time = time.time()
check_interval = 0.2 # Check every 200ms
while time.time() - start_time < max_wait_time:
try:
order_status = self.exchange.get_order_status(symbol, order_id)
if order_status and isinstance(order_status, dict):
status = order_status.get('status', '').upper()
executed_qty = float(order_status.get('executedQty', 0))
orig_qty = float(order_status.get('origQty', 0))
avg_price = float(order_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0
logger.debug(f"Order {order_id} status: {status}, executed: {executed_qty}/{orig_qty}")
if status == 'FILLED':
return {
'filled': True,
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'fillTime': time.time()
}
elif status in ['CANCELED', 'REJECTED', 'EXPIRED']:
return {
'filled': False,
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'reason': f'Order {status.lower()}'
}
elif status == 'PARTIALLY_FILLED':
# For partial fills, continue waiting but log progress
fill_percentage = (executed_qty / orig_qty * 100) if orig_qty > 0 else 0
logger.debug(f"Order {order_id} partially filled: {fill_percentage:.1f}%")
# Wait before next check
time.sleep(check_interval)
except Exception as e:
logger.error(f"Error checking order status for {order_id}: {e}")
time.sleep(check_interval)
# Timeout - check final status
try:
final_status = self.exchange.get_order_status(symbol, order_id)
if final_status:
status = final_status.get('status', '').upper()
executed_qty = float(final_status.get('executedQty', 0))
avg_price = float(final_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0
return {
'filled': status == 'FILLED',
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'reason': 'timeout' if status != 'FILLED' else None
}
except Exception as e:
logger.error(f"Error getting final order status for {order_id}: {e}")
return {
'filled': False,
'status': 'UNKNOWN',
'executedQty': 0,
'avgPrice': 0,
'reason': 'timeout_and_status_check_failed'
}
def _close_short_position(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Close a short position by buying"""
if symbol not in self.positions:
logger.warning(f"No position to close in {symbol}")
return False
position = self.positions[symbol]
if position.side != 'SHORT':
logger.warning(f"Position in {symbol} is not SHORT, cannot close with BUY")
return False
logger.info(f"Closing SHORT position: {position.quantity:.6f} {symbol} at ${current_price:.2f} "
f"(confidence: {confidence:.2f})")
if self.simulation_mode:
logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Short close logged but not executed")
# Calculate simulated fees in simulation mode
taker_fee_rate = self.mexc_config.get('trading_fees', {}).get('taker_fee', 0.0006)
simulated_fees = position.quantity * current_price * taker_fee_rate
# Calculate P&L for short position and hold time
pnl = position.calculate_pnl(current_price)
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl,
fees=simulated_fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
)
self.trade_history.append(trade_record)
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
# Update consecutive losses
if pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"Position closed - P&L: ${pnl:.2f}")
return True
try:
# Get order type from config
order_type = self.mexc_config.get('order_type', 'market').lower()
# For limit orders, set price slightly above market for immediate execution
limit_price = None
if order_type == 'limit':
# Set buy price slightly above market to ensure immediate execution
limit_price = current_price * 1.001 # 0.1% above market
# Place buy order to close short
if order_type == 'market':
order = self.exchange.place_order(
symbol=symbol,
side='buy', # Buy to close short position
order_type=order_type,
quantity=position.quantity
)
else:
# For limit orders, price is required
assert limit_price is not None, "limit_price required for limit orders"
order = self.exchange.place_order(
symbol=symbol,
side='buy', # Buy to close short position
order_type=order_type,
quantity=position.quantity,
price=limit_price
)
if order:
# Calculate simulated fees in simulation mode
taker_fee_rate = self.mexc_config.get('trading_fees', {}).get('taker_fee', 0.0006)
simulated_fees = position.quantity * current_price * taker_fee_rate
# Calculate P&L, fees, and hold time
pnl = position.calculate_pnl(current_price)
fees = simulated_fees
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl - fees,
fees=fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
)
self.trade_history.append(trade_record)
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
# Update consecutive losses
if pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"SHORT close order executed: {order}")
logger.info(f"SHORT position closed - P&L: ${pnl - fees:.2f}")
return True
else:
logger.error("Failed to place SHORT close order")
return False
except Exception as e:
logger.error(f"Error closing SHORT position: {e}")
return False
def _close_long_position(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Close a long position by selling"""
if symbol not in self.positions:
logger.warning(f"No position to close in {symbol}")
return False
position = self.positions[symbol]
if position.side != 'LONG':
logger.warning(f"Position in {symbol} is not LONG, cannot close with SELL")
return False
logger.info(f"Closing LONG position: {position.quantity:.6f} {symbol} at ${current_price:.2f} "
f"(confidence: {confidence:.2f})")
if self.simulation_mode:
logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Long close logged but not executed")
# Calculate simulated fees in simulation mode
taker_fee_rate = self.mexc_config.get('trading_fees', {}).get('taker_fee', 0.0006)
simulated_fees = position.quantity * current_price * taker_fee_rate
# Calculate P&L for long position and hold time
pnl = position.calculate_pnl(current_price)
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
trade_record = TradeRecord(
symbol=symbol,
side='LONG',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl,
fees=simulated_fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
)
self.trade_history.append(trade_record)
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
# Update consecutive losses
if pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"Position closed - P&L: ${pnl:.2f}")
return True
try:
# Get order type from config
order_type = self.mexc_config.get('order_type', 'market').lower()
# For limit orders, set price slightly below market for immediate execution
limit_price = None
if order_type == 'limit':
# Set sell price slightly below market to ensure immediate execution
limit_price = current_price * 0.999 # 0.1% below market
# Place sell order to close long
if order_type == 'market':
order = self.exchange.place_order(
symbol=symbol,
side='sell', # Sell to close long position
order_type=order_type,
quantity=position.quantity
)
else:
# For limit orders, price is required
assert limit_price is not None, "limit_price required for limit orders"
order = self.exchange.place_order(
symbol=symbol,
side='sell', # Sell to close long position
order_type=order_type,
quantity=position.quantity,
price=limit_price
)
if order:
# Calculate simulated fees in simulation mode
taker_fee_rate = self.mexc_config.get('trading_fees', {}).get('taker_fee', 0.0006)
simulated_fees = position.quantity * current_price * taker_fee_rate
# Calculate P&L, fees, and hold time
pnl = position.calculate_pnl(current_price)
fees = simulated_fees
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record
trade_record = TradeRecord(
symbol=symbol,
side='LONG',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=pnl - fees,
fees=fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds
)
self.trade_history.append(trade_record)
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
# Update consecutive losses
if pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"LONG close order executed: {order}")
logger.info(f"LONG position closed - P&L: ${pnl - fees:.2f}")
return True
else:
logger.error("Failed to place LONG close order")
return False
except Exception as e:
logger.error(f"Error closing LONG position: {e}")
return False
def _calculate_position_size(self, confidence: float, current_price: float) -> float:
"""Calculate position size - use 100% of account balance for short-term scalping"""
# Get account balance (simulation or real)
account_balance = self._get_account_balance_for_sizing()
# Use 100% of account balance since we're holding for seconds/minutes only
# Scale by confidence: 70-100% of balance based on confidence (0.7-1.0 range)
confidence_multiplier = max(0.7, min(1.0, confidence))
position_value = account_balance * confidence_multiplier
# Apply reduction based on consecutive losses (risk management)
reduction_factor = self.mexc_config.get('consecutive_loss_reduction_factor', 0.8)
adjusted_reduction_factor = reduction_factor ** self.consecutive_losses
position_value *= adjusted_reduction_factor
logger.debug(f"Position calculation: account=${account_balance:.2f}, "
f"confidence_mult={confidence_multiplier:.2f}, "
f"position=${position_value:.2f}, confidence={confidence:.2f}")
return position_value
def _get_account_balance_for_sizing(self) -> float:
"""Get account balance for position sizing calculations"""
if self.simulation_mode:
return self.mexc_config.get('simulation_account_usd', 100.0)
else:
# For live trading, get actual USDT/USDC balance
try:
balances = self.get_account_balance()
usdt_balance = balances.get('USDT', {}).get('total', 0)
usdc_balance = balances.get('USDC', {}).get('total', 0)
return max(usdt_balance, usdc_balance)
except Exception as e:
logger.warning(f"Failed to get live account balance: {e}, using simulation default")
return self.mexc_config.get('simulation_account_usd', 100.0)
def update_positions(self, symbol: str, current_price: float):
"""Update position P&L with current market price"""
if symbol in self.positions:
with self.lock:
self.positions[symbol].calculate_pnl(current_price)
def get_positions(self) -> Dict[str, Position]:
"""Get current positions"""
return self.positions.copy()
def get_trade_history(self) -> List[TradeRecord]:
"""Get trade history"""
return self.trade_history.copy()
def get_daily_stats(self) -> Dict[str, Any]:
"""Get daily trading statistics with enhanced fee analysis"""
total_pnl = sum(trade.pnl for trade in self.trade_history)
total_fees = sum(trade.fees for trade in self.trade_history)
gross_pnl = total_pnl + total_fees # P&L before fees
winning_trades = len([t for t in self.trade_history if t.pnl > 0.001]) # Avoid rounding issues
losing_trades = len([t for t in self.trade_history if t.pnl < -0.001]) # Avoid rounding issues
total_trades = len(self.trade_history)
breakeven_trades = total_trades - winning_trades - losing_trades
# Calculate average trade values
avg_trade_pnl = total_pnl / max(1, total_trades)
avg_trade_fee = total_fees / max(1, total_trades)
avg_winning_trade = sum(t.pnl for t in self.trade_history if t.pnl > 0.001) / max(1, winning_trades)
avg_losing_trade = sum(t.pnl for t in self.trade_history if t.pnl < -0.001) / max(1, losing_trades)
# Enhanced fee analysis from config
fee_structure = self.mexc_config.get('trading_fees', {})
maker_fee_rate = fee_structure.get('maker', 0.0000)
taker_fee_rate = fee_structure.get('taker', 0.0005)
default_fee_rate = fee_structure.get('default', 0.0005)
# Calculate fee efficiency
total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history)
effective_fee_rate = (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0
fee_impact_on_pnl = (total_fees / max(0.01, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0
return {
'daily_trades': self.daily_trades,
'daily_loss': self.daily_loss,
'total_pnl': total_pnl,
'gross_pnl': gross_pnl,
'total_fees': total_fees,
'winning_trades': winning_trades,
'losing_trades': losing_trades,
'breakeven_trades': breakeven_trades,
'total_trades': total_trades,
'win_rate': winning_trades / max(1, total_trades),
'avg_trade_pnl': avg_trade_pnl,
'avg_trade_fee': avg_trade_fee,
'avg_winning_trade': avg_winning_trade,
'avg_losing_trade': avg_losing_trade,
'positions_count': len(self.positions),
'fee_rates': {
'maker': f"{maker_fee_rate*100:.3f}%",
'taker': f"{taker_fee_rate*100:.3f}%",
'default': f"{default_fee_rate*100:.3f}%",
'effective': f"{effective_fee_rate*100:.3f}%" # Actual rate based on trades
},
'fee_analysis': {
'total_volume': total_volume,
'fee_impact_percent': fee_impact_on_pnl,
'is_fee_efficient': fee_impact_on_pnl < 5.0, # Less than 5% impact is good
'fee_savings_vs_market': (0.001 - effective_fee_rate) * total_volume if effective_fee_rate < 0.001 else 0
}
}
def emergency_stop(self):
"""Emergency stop all trading"""
logger.warning("EMERGENCY STOP ACTIVATED")
self.trading_enabled = False
# Close all positions if in live mode
if not self.dry_run:
for symbol, position in self.positions.items():
try:
ticker = self.exchange.get_ticker(symbol)
if ticker:
self._execute_sell(symbol, 1.0, ticker['last'])
except Exception as e:
logger.error(f"Error closing position {symbol} during emergency stop: {e}")
def reset_daily_stats(self):
"""Reset daily statistics (call at start of new day)"""
self.daily_trades = 0
self.daily_loss = 0.0
logger.info("Daily trading statistics reset")
def get_account_balance(self) -> Dict[str, Dict[str, float]]:
"""Get account balance information from MEXC, including spot and futures.
Returns:
Dict with asset balances in format:
{
'USDT': {'free': 100.0, 'locked': 0.0, 'total': 100.0, 'type': 'spot'},
'ETH': {'free': 0.5, 'locked': 0.0, 'total': 0.5, 'type': 'spot'},
'FUTURES_USDT': {'free': 500.0, 'locked': 50.0, 'total': 550.0, 'type': 'futures'}
...
}
"""
try:
if not self.exchange:
logger.error("Exchange interface not available")
return {}
combined_balances = {}
# 1. Get Spot Account Info
spot_account_info = self.exchange.get_account_info()
if spot_account_info and 'balances' in spot_account_info:
for balance in spot_account_info['balances']:
asset = balance.get('asset', '')
free = float(balance.get('free', 0))
locked = float(balance.get('locked', 0))
if free > 0 or locked > 0:
combined_balances[asset] = {
'free': free,
'locked': locked,
'total': free + locked,
'type': 'spot'
}
else:
logger.warning("Failed to get spot account info from MEXC or no balances found.")
# 2. Get Futures Account Info (commented out until futures API is implemented)
# futures_account_info = self.exchange.get_futures_account_info()
# if futures_account_info:
# for currency, asset_data in futures_account_info.items():
# # MEXC Futures API returns 'availableBalance' and 'frozenBalance'
# free = float(asset_data.get('availableBalance', 0))
# locked = float(asset_data.get('frozenBalance', 0))
# total = free + locked # total is the sum of available and frozen
# if free > 0 or locked > 0:
# # Prefix with 'FUTURES_' to distinguish from spot, or decide on a unified key
# # For now, let's keep them distinct for clarity
# combined_balances[f'FUTURES_{currency}'] = {
# 'free': free,
# 'locked': locked,
# 'total': total,
# 'type': 'futures'
# }
# else:
# logger.warning("Failed to get futures account info from MEXC or no futures assets found.")
logger.info(f"Retrieved combined balances for {len(combined_balances)} assets.")
return combined_balances
except Exception as e:
logger.error(f"Error getting account balance: {e}")
return {}
def _calculate_trading_fee(self, order_result: Dict[str, Any], symbol: str,
quantity: float, price: float) -> float:
"""Calculate trading fee based on order execution details with enhanced MEXC API support
Args:
order_result: Order result from exchange API
symbol: Trading symbol
quantity: Order quantity
price: Execution price
Returns:
float: Trading fee amount in quote currency
"""
try:
# 1. Try to get actual fee from API response (most accurate)
# MEXC API can return fees in different formats depending on the endpoint
# Check for 'fills' array (most common for filled orders)
if order_result and 'fills' in order_result:
total_commission = 0.0
commission_asset = None
for fill in order_result['fills']:
commission = float(fill.get('commission', 0))
commission_asset = fill.get('commissionAsset', '')
total_commission += commission
if total_commission > 0:
logger.info(f"Using actual API fee from fills: {total_commission} {commission_asset}")
# If commission is in different asset, we might need conversion
# For now, assume it's in quote currency (USDC/USDT)
return total_commission
# 2. Check if order result has fee information directly
fee_fields = ['fee', 'commission', 'tradeFee', 'fees']
for field in fee_fields:
if order_result and field in order_result:
fee = float(order_result[field])
if fee > 0:
logger.info(f"Using API fee field '{field}': {fee}")
return fee
# 3. Check for executedQty and cummulativeQuoteQty for more accurate calculation
if order_result and 'executedQty' in order_result and 'cummulativeQuoteQty' in order_result:
executed_qty = float(order_result['executedQty'])
executed_value = float(order_result['cummulativeQuoteQty'])
if executed_qty > 0 and executed_value > 0:
# Use executed values instead of provided price/quantity
quantity = executed_qty
price = executed_value / executed_qty
logger.info(f"Using executed order data: {quantity} @ {price:.6f}")
# 4. Fall back to config-based fee calculation with enhanced logic
trading_fees = self.mexc_config.get('trading_fees', {})
# Determine if this was a maker or taker trade
order_type = order_result.get('type', 'MARKET') if order_result else 'MARKET'
order_status = order_result.get('status', 'UNKNOWN') if order_result else 'UNKNOWN'
time_in_force = order_result.get('timeInForce', 'GTC') if order_result else 'GTC'
# Enhanced maker/taker detection logic
if order_type.upper() == 'LIMIT':
# For limit orders, check execution speed and market conditions
if order_status == 'FILLED':
# If it's an IOC (Immediate or Cancel) order, it's likely a taker
if time_in_force == 'IOC' or time_in_force == 'FOK':
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for {time_in_force} limit order: {fee_rate*100:.3f}%")
else:
# For GTC orders, assume taker if aggressive pricing is used
# This is a heuristic based on our trading strategy
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for aggressive limit order: {fee_rate*100:.3f}%")
else:
# If not immediately filled, likely a maker (though we don't usually reach here)
fee_rate = trading_fees.get('maker', 0.0000)
logger.info(f"Using maker fee rate for pending/partial limit order: {fee_rate*100:.3f}%")
elif order_type.upper() == 'LIMIT_MAKER':
# LIMIT_MAKER orders are guaranteed to be makers
fee_rate = trading_fees.get('maker', 0.0000)
logger.info(f"Using maker fee rate for LIMIT_MAKER order: {fee_rate*100:.3f}%")
else:
# Market orders and other types are always takers
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for {order_type} order: {fee_rate*100:.3f}%")
# Calculate fee amount
trade_value = quantity * price
fee_amount = trade_value * fee_rate
logger.info(f"Calculated fee: ${fee_amount:.6f} ({fee_rate*100:.3f}% of ${trade_value:.2f})")
return fee_amount
except Exception as e:
logger.warning(f"Error calculating trading fee: {e}")
# Ultimate fallback using default rate
default_fee_rate = self.mexc_config.get('trading_fees', {}).get('default', 0.0005)
fallback_rate = self.mexc_config.get('trading_fee', default_fee_rate) # Legacy support
fee_amount = quantity * price * fallback_rate
logger.info(f"Using fallback fee: ${fee_amount:.6f} ({fallback_rate*100:.3f}%)")
return fee_amount
def get_fee_analysis(self) -> Dict[str, Any]:
"""Get detailed fee analysis and statistics
Returns:
Dict with fee breakdowns, rates, and impact analysis
"""
try:
fee_structure = self.mexc_config.get('trading_fees', {})
maker_rate = fee_structure.get('maker', 0.0000)
taker_rate = fee_structure.get('taker', 0.0005)
default_rate = fee_structure.get('default', 0.0005)
# Calculate total fees paid
total_fees = sum(trade.fees for trade in self.trade_history)
total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history)
# Estimate fee breakdown (since we don't track maker vs taker separately)
# Assume most of our limit orders are takers due to our pricing strategy
estimated_taker_volume = total_volume * 0.9 # 90% taker assumption
estimated_maker_volume = total_volume * 0.1 # 10% maker assumption
estimated_taker_fees = estimated_taker_volume * taker_rate
estimated_maker_fees = estimated_maker_volume * maker_rate
# Fee impact analysis
total_pnl = sum(trade.pnl for trade in self.trade_history)
gross_pnl = total_pnl + total_fees
fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0
return {
'fee_rates': {
'maker': {
'rate': maker_rate,
'rate_percent': f"{maker_rate*100:.3f}%"
},
'taker': {
'rate': taker_rate,
'rate_percent': f"{taker_rate*100:.3f}%"
},
'default': {
'rate': default_rate,
'rate_percent': f"{default_rate*100:.3f}%"
}
},
'total_fees': total_fees,
'total_volume': total_volume,
'estimated_breakdown': {
'taker_fees': estimated_taker_fees,
'maker_fees': estimated_maker_fees,
'taker_volume': estimated_taker_volume,
'maker_volume': estimated_maker_volume
},
'impact_analysis': {
'fee_impact_percent': fee_impact_percent,
'pnl_after_fees': total_pnl,
'pnl_before_fees': gross_pnl,
'avg_fee_per_trade': total_fees / max(1, len(self.trade_history))
},
'fee_efficiency': {
'volume_to_fee_ratio': total_volume / max(0.01, total_fees),
'is_efficient': fee_impact_percent < 5.0 # Less than 5% impact is good
}
}
except Exception as e:
logger.error(f"Error calculating fee analysis: {e}")
return {
'error': str(e),
'fee_rates': {
'maker': {'rate': 0.0000, 'rate_percent': '0.000%'},
'taker': {'rate': 0.0005, 'rate_percent': '0.050%'}
}
}
def sync_fees_with_api(self, force: bool = False) -> Dict[str, Any]:
"""Manually trigger fee synchronization with MEXC API
Args:
force: Force sync even if last sync was recent
Returns:
dict: Sync result with status and details
"""
if not self.config_synchronizer:
return {
'status': 'error',
'error': 'Config synchronizer not initialized'
}
try:
logger.info("TRADING EXECUTOR: Manual fee sync requested")
sync_result = self.config_synchronizer.sync_trading_fees(force=force)
# If fees were updated, reload config
if sync_result.get('changes_made'):
logger.info("TRADING EXECUTOR: Reloading config after fee sync")
self.config = get_config(self.config_synchronizer.config_path)
self.mexc_config = self.config.get('mexc_trading', {})
return sync_result
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error in manual fee sync: {e}")
return {
'status': 'error',
'error': str(e)
}
def auto_sync_fees_if_needed(self) -> bool:
"""Automatically sync fees if needed (called periodically)
Returns:
bool: True if sync was performed successfully
"""
if not self.config_synchronizer:
return False
try:
return self.config_synchronizer.auto_sync_fees()
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error in auto fee sync: {e}")
return False
def get_fee_sync_status(self) -> Dict[str, Any]:
"""Get current fee synchronization status
Returns:
dict: Fee sync status and history
"""
if not self.config_synchronizer:
return {
'sync_available': False,
'error': 'Config synchronizer not initialized'
}
try:
status = self.config_synchronizer.get_sync_status()
status['sync_available'] = True
return status
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error getting sync status: {e}")
return {
'sync_available': False,
'error': str(e)
}
def execute_trade(self, symbol: str, action: str, quantity: float) -> bool:
"""Execute a trade directly (compatibility method for dashboard)
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
action: Trading action ('BUY', 'SELL')
quantity: Quantity to trade
Returns:
bool: True if trade executed successfully
"""
try:
# Get current price
current_price = None
ticker = self.exchange.get_ticker(symbol)
if ticker:
current_price = ticker['last']
else:
logger.error(f"Failed to get current price for {symbol}")
return False
# Calculate confidence based on manual trade (high confidence)
confidence = 1.0
# Execute using the existing signal execution method
return self.execute_signal(symbol, action, confidence, current_price)
except Exception as e:
logger.error(f"Error executing trade {action} for {symbol}: {e}")
return False
def get_closed_trades(self) -> List[Dict[str, Any]]:
"""Get closed trades in dashboard format"""
try:
trades = []
for trade in self.trade_history:
trade_dict = {
'symbol': trade.symbol,
'side': trade.side,
'quantity': trade.quantity,
'entry_price': trade.entry_price,
'exit_price': trade.exit_price,
'entry_time': trade.entry_time,
'exit_time': trade.exit_time,
'pnl': trade.pnl,
'fees': trade.fees,
'confidence': trade.confidence,
'hold_time_seconds': trade.hold_time_seconds
}
trades.append(trade_dict)
return trades
except Exception as e:
logger.error(f"Error getting closed trades: {e}")
return []
def get_current_position(self, symbol: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Get current position for a symbol or all positions
Args:
symbol: Optional symbol to get position for. If None, returns first position.
Returns:
dict: Position information or None if no position
"""
try:
if symbol:
if symbol in self.positions:
pos = self.positions[symbol]
return {
'symbol': pos.symbol,
'side': pos.side,
'size': pos.quantity,
'price': pos.entry_price,
'entry_time': pos.entry_time,
'unrealized_pnl': pos.unrealized_pnl
}
return None
else:
# Return first position if no symbol specified
if self.positions:
first_symbol = list(self.positions.keys())[0]
return self.get_current_position(first_symbol)
return None
except Exception as e:
logger.error(f"Error getting current position: {e}")
return None
def get_leverage(self) -> float:
"""Get current leverage setting"""
return self.mexc_config.get('leverage', 50.0)
def set_leverage(self, leverage: float) -> bool:
"""Set leverage (for UI control)
Args:
leverage: New leverage value
Returns:
bool: True if successful
"""
try:
# Update in-memory config
self.mexc_config['leverage'] = leverage
logger.info(f"TRADING EXECUTOR: Leverage updated to {leverage}x")
return True
except Exception as e:
logger.error(f"Error setting leverage: {e}")
return False
def get_account_info(self) -> Dict[str, Any]:
"""Get account information for UI display"""
try:
account_balance = self._get_account_balance_for_sizing()
leverage = self.get_leverage()
return {
'account_balance': account_balance,
'leverage': leverage,
'trading_mode': self.trading_mode,
'simulation_mode': self.simulation_mode,
'trading_enabled': self.trading_enabled,
'position_sizing': {
'base_percent': self.mexc_config.get('base_position_percent', 5.0),
'max_percent': self.mexc_config.get('max_position_percent', 20.0),
'min_percent': self.mexc_config.get('min_position_percent', 2.0)
}
}
except Exception as e:
logger.error(f"Error getting account info: {e}")
return {
'account_balance': 100.0,
'leverage': 50.0,
'trading_mode': 'simulation',
'simulation_mode': True,
'trading_enabled': False,
'position_sizing': {
'base_percent': 5.0,
'max_percent': 20.0,
'min_percent': 2.0
}
}
def set_test_mode(self, enabled: bool = True):
"""Enable test mode to bypass safety checks for testing"""
self._test_mode = enabled
if enabled:
logger.info("TRADING EXECUTOR: Test mode enabled - bypassing safety checks")
else:
logger.info("TRADING EXECUTOR: Test mode disabled - normal safety checks active")