Files
gogo2/core/trading_executor.py
2025-07-28 16:57:02 +03:00

3110 lines
146 KiB
Python

"""
Trading Executor for MEXC API Integration
This module handles the execution of trading signals through the MEXC exchange API.
It includes position management, risk controls, and safety features.
https://github.com/mexcdevelop/mexc-api-postman/blob/main/MEXC%20V3.postman_collection.json
MEXC V3.postman_collection.json
"""
import logging
import time
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from threading import Lock, RLock
import threading
import time
import sys
# Add NN directory to path for exchange interfaces
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'NN'))
from core.exchanges.exchange_factory import ExchangeFactory
from core.exchanges.exchange_interface import ExchangeInterface
from .config import get_config
from .config_sync import ConfigSynchronizer
logger = logging.getLogger(__name__)
@dataclass
class Position:
"""Represents an open trading position"""
symbol: str
side: str # 'LONG' or 'SHORT'
quantity: float
entry_price: float
entry_time: datetime
order_id: str
unrealized_pnl: float = 0.0
def calculate_pnl(self, current_price: float, leverage: float = 1.0, include_fees: bool = True, leverage_applied_by_exchange: bool = False) -> float:
"""Calculate unrealized P&L for the position with leverage and fees
Args:
current_price: Current market price
leverage: Leverage multiplier (default: 1.0)
include_fees: Whether to subtract fees from PnL (default: True)
leverage_applied_by_exchange: Whether leverage is already applied by broker (default: False)
Returns:
float: Unrealized PnL including leverage and fees
"""
# Calculate position value
position_value = self.entry_price * self.quantity
# Calculate base PnL
if self.side == 'LONG':
base_pnl = (current_price - self.entry_price) * self.quantity
else: # SHORT
base_pnl = (self.entry_price - current_price) * self.quantity
# Apply leverage only if not already applied by exchange
if leverage_applied_by_exchange:
# Broker already applies leverage, so use base PnL
leveraged_pnl = base_pnl
else:
# Apply leverage locally
leveraged_pnl = base_pnl * leverage
# Calculate fees (0.1% open + 0.1% close = 0.2% total)
fees = 0.0
if include_fees:
# Open fee already paid
open_fee = position_value * 0.001
# Close fee will be paid when position is closed
close_fee = (current_price * self.quantity) * 0.001
fees = open_fee + close_fee
# Final PnL after fees
self.unrealized_pnl = leveraged_pnl - fees
return self.unrealized_pnl
@dataclass
class TradeRecord:
"""Record of a completed trade"""
symbol: str
side: str
quantity: float
entry_price: float
exit_price: float
entry_time: datetime
exit_time: datetime
pnl: float
fees: float
confidence: float
hold_time_seconds: float = 0.0 # Hold time in seconds
leverage: float = 1.0 # Leverage used for the trade
position_size_usd: float = 0.0 # Position size in USD
gross_pnl: float = 0.0 # PnL before fees
net_pnl: float = 0.0 # PnL after fees
class TradingExecutor:
"""Handles trade execution through multiple exchange APIs with risk management"""
def __init__(self, config_path: str = "config.yaml"):
"""Initialize the trading executor"""
self.config = get_config(config_path)
self.exchanges_config = self.config.get('exchanges', {})
self.trading_config = self.config.get('trading', {})
# Initialize exchanges using factory
self.primary_exchange = ExchangeFactory.get_primary_exchange(self.exchanges_config)
self.all_exchanges = ExchangeFactory.create_multiple_exchanges(self.exchanges_config)
# Set primary exchange as main interface
self.exchange = self.primary_exchange
# Get primary exchange name and config first
primary_name = self.exchanges_config.get('primary', 'deribit')
primary_config = self.exchanges_config.get(primary_name, {})
# Determine trading and simulation modes
trading_mode = primary_config.get('trading_mode', 'simulation')
self.trading_enabled = self.trading_config.get('enabled', True)
self.simulation_mode = trading_mode == 'simulation'
if not self.exchange:
if self.simulation_mode:
logger.info("Failed to initialize primary exchange, but simulation mode is enabled - trading allowed")
else:
logger.error("Failed to initialize primary exchange and not in simulation mode - trading disabled")
self.trading_enabled = False
else:
logger.info(f"Trading Executor initialized with {primary_name} as primary exchange")
logger.info(f"Trading mode: {trading_mode}, Simulation: {self.simulation_mode}")
# Get primary exchange name and config
self.primary_name = self.exchanges_config.get('primary', 'mexc')
self.primary_config = self.exchanges_config.get(self.primary_name, {})
# Set exchange config for compatibility (replaces mexc_config)
self.exchange_config = self.primary_config
self.mexc_config = self.primary_config # Legacy compatibility
# Initialize config synchronizer with the primary exchange
self.config_sync = ConfigSynchronizer(
config_path=config_path,
mexc_interface=self.exchange if (self.trading_enabled and self.primary_name == 'mexc') else None
)
# Trading state management
self.current_position = {} # symbol -> position data
self.trade_history = []
self.daily_trades = 0
self.daily_pnl = 0.0
self.daily_loss = 0.0
self.last_trade_time = {}
self.consecutive_losses = 0 # Track consecutive losing trades
# Store trading mode for compatibility
self.trading_mode = self.primary_config.get('trading_mode', 'simulation')
# Safety feature: Auto-disable live trading after consecutive losses
self.max_consecutive_losses = 5 # Disable live trading after 5 consecutive losses
self.min_success_rate_to_reenable = 0.55 # Require 55% success rate to re-enable
self.trades_to_evaluate = 20 # Evaluate last 20 trades for success rate
self.original_trading_mode = self.trading_mode # Store original mode
self.safety_triggered = False # Track if safety feature was triggered
# Initialize session stats
self.session_start_time = datetime.now()
self.session_trades = 0
self.session_pnl = 0.0
# Position tracking
self.positions = {} # symbol -> Position object
self.trade_records = [] # List of TradeRecord objects
# Simulation balance tracking
self.simulation_balance = self.trading_config.get('simulation_account_usd', 100.0)
self.simulation_positions = {} # symbol -> position data with real entry prices
# Trading fees configuration (0.1% for both open and close - REVERTED TO NORMAL)
self.trading_fees = {
'open_fee_percent': 0.001, # 0.1% fee when opening position
'close_fee_percent': 0.001, # 0.1% fee when closing position
'total_round_trip_fee': 0.002 # 0.2% total for round trip
}
# Dynamic profitability reward parameter - starts at 0, adjusts based on success rate
self.profitability_reward_multiplier = 0.0 # Starts at 0, can be increased
self.min_profitability_multiplier = 0.0 # Minimum value
self.max_profitability_multiplier = 2.0 # Maximum 2x multiplier
self.profitability_adjustment_step = 0.1 # Adjust by 0.1 each time
# Success rate tracking for profitability adjustment
self.recent_trades_window = 20 # Look at last 20 trades
self.success_rate_increase_threshold = 0.60 # Increase multiplier if >60% success
self.success_rate_decrease_threshold = 0.51 # Decrease multiplier if <51% success
self.last_profitability_adjustment = datetime.now()
logger.info(f"TradingExecutor initialized - Trading: {self.trading_enabled}, Mode: {self.trading_mode}")
logger.info(f"Simulation balance: ${self.simulation_balance:.2f}")
# Legacy compatibility (deprecated)
self.dry_run = self.simulation_mode
# Thread safety with timeout support
self.lock = RLock()
self.lock_timeout = 10.0 # 10 second timeout for order execution
# Open order management
self.max_open_orders = 2 # Maximum number of open orders allowed
self.open_orders_count = 0 # Current count of open orders
# Rate limiting for open orders sync (30 seconds)
self.last_open_orders_sync = datetime.min
self.open_orders_sync_interval = 30 # seconds
# Trading symbols
self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])
# Connect to exchange
if self.trading_enabled:
if self.simulation_mode:
logger.info("TRADING EXECUTOR: Simulation mode - trading enabled without exchange connection")
else:
logger.info("TRADING EXECUTOR: Attempting to connect to exchange...")
if not self._connect_exchange():
logger.error("TRADING EXECUTOR: Failed initial exchange connection. Trading will be disabled.")
self.trading_enabled = False
else:
logger.info("TRADING EXECUTOR: Trading is explicitly disabled in config.")
logger.info(f"Trading Executor initialized - Mode: {self.trading_mode}, Enabled: {self.trading_enabled}")
# Get exchange-specific configuration
self.exchange_config = self.primary_config
# Initialize config synchronizer for automatic fee updates (only for MEXC)
self.config_synchronizer = ConfigSynchronizer(
config_path=config_path,
mexc_interface=self.exchange if (self.trading_enabled and self.primary_name == 'mexc') else None
)
# Perform initial fee sync on startup if trading is enabled and using MEXC
if self.trading_enabled and self.exchange and self.primary_name == 'mexc':
try:
logger.info(f"TRADING EXECUTOR: Performing initial fee synchronization with {self.primary_name.upper()} API")
sync_result = self.config_synchronizer.sync_trading_fees(force=True)
if sync_result.get('status') == 'success':
logger.info("TRADING EXECUTOR: Fee synchronization completed successfully")
if sync_result.get('changes_made'):
logger.info(f"TRADING EXECUTOR: Fee changes applied: {list(sync_result['changes'].keys())}")
# Reload config to get updated fees
self.config = get_config(config_path)
self.exchange_config = self.config.get('exchanges', {}).get(self.primary_name, {})
elif sync_result.get('status') == 'warning':
logger.warning("TRADING EXECUTOR: Fee sync completed with warnings")
else:
logger.warning(f"TRADING EXECUTOR: Fee sync failed: {sync_result.get('status')}")
except Exception as e:
logger.warning(f"TRADING EXECUTOR: Initial fee sync failed: {e}")
elif self.trading_enabled and self.exchange:
logger.info(f"TRADING EXECUTOR: Using {self.primary_name.upper()} exchange - fee sync not available")
# Sync positions from exchange on startup if in live mode
if not self.simulation_mode and self.exchange and self.trading_enabled:
self._sync_positions_on_startup()
logger.info(f"Trading Executor initialized - Exchange: {self.primary_name.upper()}, Mode: {self.trading_mode}, Enabled: {self.trading_enabled}")
def _sync_positions_on_startup(self):
"""Sync positions from exchange on startup"""
try:
logger.info("TRADING EXECUTOR: Syncing positions from exchange on startup...")
# Get all open positions from exchange
if hasattr(self.exchange, 'get_positions'):
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for position in exchange_positions:
symbol = position.get('symbol', '').replace('USDT', '/USDT')
size = float(position.get('size', 0))
side = position.get('side', '').upper()
entry_price = float(position.get('entry_price', 0))
if size > 0 and symbol and side in ['LONG', 'SHORT']:
# Create position object
pos_obj = Position(
symbol=symbol,
side=side,
quantity=size,
entry_price=entry_price,
entry_time=datetime.now()
)
self.positions[symbol] = pos_obj
logger.info(f"POSITION SYNC: Found {side} position for {symbol}: {size} @ ${entry_price:.2f}")
logger.info(f"POSITION SYNC: Synced {len(self.positions)} positions from exchange")
else:
logger.warning("Exchange does not support position retrieval")
except Exception as e:
logger.error(f"POSITION SYNC: Error syncing positions on startup: {e}")
def _sync_single_position_from_exchange(self, symbol: str, exchange_position: dict):
"""Sync a single position from exchange to local state"""
try:
size = float(exchange_position.get('size', 0))
side = exchange_position.get('side', '').upper()
entry_price = float(exchange_position.get('entry_price', 0))
if size > 0 and side in ['LONG', 'SHORT']:
pos_obj = Position(
symbol=symbol,
side=side,
quantity=size,
entry_price=entry_price,
entry_time=datetime.now()
)
self.positions[symbol] = pos_obj
logger.info(f"POSITION SYNC: Added {side} position for {symbol}: {size} @ ${entry_price:.2f}")
return True
except Exception as e:
logger.error(f"Error syncing single position for {symbol}: {e}")
return False
def close_all_positions(self):
"""Emergency close all positions - both local and exchange"""
logger.warning("CLOSE ALL POSITIONS: Starting emergency position closure")
positions_closed = 0
# Get all positions to close (local + exchange)
positions_to_close = set()
# Add local positions
for symbol in self.positions.keys():
positions_to_close.add(symbol)
# Add exchange positions if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for pos in exchange_positions:
symbol = pos.get('symbol', '').replace('USDT', '/USDT')
size = float(pos.get('size', 0))
if size > 0:
positions_to_close.add(symbol)
except Exception as e:
logger.error(f"Error getting exchange positions for closure: {e}")
# Close all positions
for symbol in positions_to_close:
try:
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'LONG':
if self._close_long_position(symbol, 1.0, position.entry_price):
positions_closed += 1
elif position.side == 'SHORT':
if self._close_short_position(symbol, 1.0, position.entry_price):
positions_closed += 1
else:
logger.warning(f"Position {symbol} found on exchange but not locally - manual intervention needed")
except Exception as e:
logger.error(f"Error closing position {symbol}: {e}")
logger.warning(f"CLOSE ALL POSITIONS: Closed {positions_closed} positions")
return positions_closed
def _safe_exchange_call(self, method_name: str, *args, **kwargs):
"""Safely call exchange methods with null checking"""
if not self.exchange:
logger.warning(f"No exchange interface available for {method_name}")
return None
try:
method = getattr(self.exchange, method_name, None)
if method:
return method(*args, **kwargs)
else:
logger.error(f"Method {method_name} not available on exchange")
return None
except Exception as e:
logger.error(f"Error calling {method_name}: {e}")
return None
def _get_real_current_price(self, symbol: str) -> Optional[float]:
"""Get real current price from data provider - NEVER use simulated data"""
try:
# Try to get from data provider first (most reliable)
from core.data_provider import DataProvider
data_provider = DataProvider()
# Try multiple timeframes to get the most recent price
for timeframe in ['1m', '5m', '1h']:
try:
df = data_provider.get_historical_data(symbol, timeframe, limit=1, refresh=True)
if df is not None and not df.empty:
price = float(df['close'].iloc[-1])
if price > 0:
logger.debug(f"Got real price for {symbol} from {timeframe}: ${price:.2f}")
return price
except Exception as tf_error:
logger.debug(f"Failed to get {timeframe} data for {symbol}: {tf_error}")
continue
# Try exchange ticker if available
if self.exchange:
try:
ticker = self.exchange.get_ticker(symbol)
if ticker and 'last' in ticker:
price = float(ticker['last'])
if price > 0:
logger.debug(f"Got real price for {symbol} from exchange: ${price:.2f}")
return price
except Exception as ex_error:
logger.debug(f"Failed to get price from exchange: {ex_error}")
# Try external API as last resort
try:
import requests
if symbol == 'ETH/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}")
return price
elif symbol == 'BTC/USDT':
response = requests.get('https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT', timeout=2)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
if price > 0:
logger.debug(f"Got real price for {symbol} from Binance API: ${price:.2f}")
return price
except Exception as api_error:
logger.debug(f"Failed to get price from external API: {api_error}")
logger.error(f"Failed to get real current price for {symbol} from all sources")
return None
except Exception as e:
logger.error(f"Error getting real current price for {symbol}: {e}")
return None
def _connect_exchange(self) -> bool:
"""Connect to the primary exchange"""
if not self.exchange:
logger.warning("No exchange interface available")
return False
if self.exchange.connect():
logger.info("Successfully connected to exchange")
return True
else:
logger.error("Failed to connect to exchange")
return False
def execute_signal(self, symbol: str, action: str, confidence: float,
current_price: Optional[float] = None) -> bool:
"""Execute a trading signal
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
action: Trading action ('BUY', 'SELL', 'HOLD')
confidence: Confidence level (0.0 to 1.0)
current_price: Current market price
Returns:
bool: True if trade executed successfully
"""
logger.debug(f"TRADING EXECUTOR: execute_signal called. trading_enabled: {self.trading_enabled}")
if not self.trading_enabled:
logger.info(f"Trading disabled - Signal: {action} {symbol} (confidence: {confidence:.2f}) - Reason: Trading executor is not enabled.")
return False
if action == 'HOLD':
return True
# PERIODIC POSITION SYNC: Every 10th signal execution, sync positions from exchange to prevent desync
if not hasattr(self, '_signal_count'):
self._signal_count = 0
self._signal_count += 1
if self._signal_count % 10 == 0 and not self.simulation_mode and self.exchange:
logger.debug(f"PERIODIC SYNC: Checking position sync for {symbol} (signal #{self._signal_count})")
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
size = float(pos.get('size', 0))
if size > 0 and symbol not in self.positions:
logger.warning(f"DESYNC DETECTED: Found position on exchange but not locally for {symbol}")
self._sync_single_position_from_exchange(symbol, pos)
elif symbol in self.positions:
logger.warning(f"DESYNC DETECTED: Have local position but none on exchange for {symbol}")
# Consider removing local position or investigating further
except Exception as e:
logger.debug(f"Error in periodic position sync: {e}")
# Check safety conditions
if not self._check_safety_conditions(symbol, action):
return False
# Get current price if not provided
if current_price is None:
# Always get real current price - never use simulated data
current_price = self._get_real_current_price(symbol)
if current_price is None:
logger.error(f"Failed to get real current price for {symbol}")
return False
# Assert that current_price is not None for type checking
assert current_price is not None, "current_price should not be None at this point"
# --- Balance check before executing trade (skip in simulation mode) ---
# Only perform balance check for live trading, not simulation
if not self.simulation_mode and (action == 'BUY' or (action == 'SELL' and symbol not in self.positions) or (action == 'SHORT')):
# Determine the quote asset (e.g., USDT, USDC) from the symbol
if '/' in symbol:
quote_asset = symbol.split('/')[1].upper() # Assuming symbol is like ETH/USDT
# Keep USDT as USDT for MEXC spot trading (no conversion needed)
else:
# Fallback for symbols like ETHUSDT (assuming last 4 chars are quote)
quote_asset = symbol[-4:].upper()
# Keep USDT as USDT for MEXC spot trading (no conversion needed)
# Calculate required capital for the trade
# If we are selling (to open a short position), we need collateral based on the position size
# For simplicity, assume required capital is the full position value in USD
required_capital = self._calculate_position_size(confidence, current_price)
# Get available balance for the quote asset (try USDT first, then USDC as fallback)
if quote_asset == 'USDT':
available_balance = self.exchange.get_balance('USDT')
if available_balance < required_capital:
# If USDT balance is insufficient, check USDC as fallback
usdc_balance = self.exchange.get_balance('USDC')
if usdc_balance >= required_capital:
available_balance = usdc_balance
quote_asset = 'USDC' # Use USDC instead
logger.info(f"BALANCE CHECK: Using USDC fallback balance for {symbol}")
else:
available_balance = self.exchange.get_balance(quote_asset)
logger.info(f"BALANCE CHECK: Symbol: {symbol}, Action: {action}, Required: ${required_capital:.2f} {quote_asset}, Available: ${available_balance:.2f} {quote_asset}")
# Allow some small precision tolerance (1 cent) and ensure sufficient balance
balance_tolerance = 0.01
if available_balance < (required_capital - balance_tolerance):
logger.warning(f"Trade blocked for {symbol} {action}: Insufficient {quote_asset} balance. "
f"Required: ${required_capital:.2f}, Available: ${available_balance:.2f}")
return False
else:
logger.info(f"BALANCE CHECK PASSED: {quote_asset} balance sufficient for trade")
elif self.simulation_mode:
logger.debug(f"SIMULATION MODE: Skipping balance check for {symbol} {action} - allowing trade for model training")
# --- End Balance check ---
# Try to acquire lock with timeout to prevent hanging
lock_acquired = self.lock.acquire(timeout=self.lock_timeout)
if not lock_acquired:
logger.error(f"TIMEOUT: Failed to acquire lock within {self.lock_timeout}s for {action} {symbol}")
logger.error(f"This indicates another operation is hanging. Cancelling {action} order.")
return False
try:
logger.info(f"LOCK ACQUIRED: Executing {action} for {symbol}")
start_time = time.time()
if action == 'BUY':
result = self._execute_buy(symbol, confidence, current_price)
elif action == 'SELL':
result = self._execute_sell(symbol, confidence, current_price)
elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal
result = self._execute_short(symbol, confidence, current_price)
else:
logger.warning(f"Unknown action: {action}")
result = False
execution_time = time.time() - start_time
logger.info(f"EXECUTION COMPLETE: {action} for {symbol} took {execution_time:.2f}s, result: {result}")
return result
except Exception as e:
logger.error(f"Error executing {action} signal for {symbol}: {e}")
return False
finally:
self.lock.release()
logger.debug(f"LOCK RELEASED: {action} for {symbol}")
def _get_open_orders_count(self) -> int:
"""Get current count of open orders across all symbols with rate limiting"""
try:
if self.simulation_mode:
return 0
# Check if enough time has passed since last sync
current_time = datetime.now()
time_since_last_sync = (current_time - self.last_open_orders_sync).total_seconds()
if time_since_last_sync < self.open_orders_sync_interval:
# Return cached count if within rate limit
logger.debug(f"Using cached open orders count ({self.open_orders_count}) - rate limited")
return self.open_orders_count
# Update last sync time
self.last_open_orders_sync = current_time
total_open_orders = 0
for symbol in self.symbols:
try:
open_orders = self.exchange.get_open_orders(symbol)
total_open_orders += len(open_orders)
except Exception as e:
logger.warning(f"Error getting open orders for {symbol}: {e}")
# Continue with other symbols
continue
# Update cached count
self.open_orders_count = total_open_orders
logger.debug(f"Updated open orders count: {total_open_orders}")
return total_open_orders
except Exception as e:
logger.error(f"Error getting open orders count: {e}")
return self.open_orders_count # Return cached value on error
def _can_place_new_order(self) -> bool:
"""Check if we can place a new order based on open order limit"""
current_count = self._get_open_orders_count()
can_place = current_count < self.max_open_orders
if not can_place:
logger.warning(f"Cannot place new order: {current_count}/{self.max_open_orders} open orders")
return can_place
def sync_open_orders(self) -> Dict[str, Any]:
"""Synchronize open orders with exchange and update internal state with rate limiting
Returns:
dict: Sync result with status and order details
"""
try:
if self.simulation_mode:
return {
'status': 'success',
'message': 'Simulation mode - no real orders to sync',
'orders': [],
'count': 0
}
# Check rate limiting
current_time = datetime.now()
time_since_last_sync = (current_time - self.last_open_orders_sync).total_seconds()
if time_since_last_sync < self.open_orders_sync_interval:
# Return cached result if within rate limit
logger.debug(f"Open order sync rate limited - using cached data")
return {
'status': 'rate_limited',
'message': f'Rate limited - last sync was {time_since_last_sync:.1f}s ago',
'orders': [],
'count': self.open_orders_count,
'cached': True
}
# Update last sync time
self.last_open_orders_sync = current_time
sync_result = {
'status': 'started',
'orders': [],
'count': 0,
'errors': []
}
total_orders = 0
all_orders = []
# Sync orders for each symbol
for symbol in self.symbols:
try:
open_orders = self.exchange.get_open_orders(symbol)
if open_orders:
symbol_orders = []
for order in open_orders:
order_info = {
'symbol': symbol,
'order_id': order.get('orderId'),
'side': order.get('side'),
'type': order.get('type'),
'quantity': float(order.get('origQty', 0)),
'price': float(order.get('price', 0)),
'status': order.get('status'),
'time': order.get('time')
}
symbol_orders.append(order_info)
all_orders.append(order_info)
total_orders += len(symbol_orders)
logger.info(f"Synced {len(symbol_orders)} open orders for {symbol}")
except Exception as e:
error_msg = f"Error syncing orders for {symbol}: {e}"
logger.warning(error_msg) # Changed to warning since this is expected with rate limits
sync_result['errors'].append(error_msg)
# Update internal state
self.open_orders_count = total_orders
sync_result.update({
'status': 'success',
'orders': all_orders,
'count': total_orders,
'message': f"Synced {total_orders} open orders across {len(self.symbols)} symbols"
})
logger.info(f"Open order sync completed: {total_orders} orders")
return sync_result
except Exception as e:
logger.error(f"Error in open order sync: {e}")
return {
'status': 'error',
'message': str(e),
'orders': [],
'count': 0,
'errors': [str(e)]
}
def _cancel_open_orders(self, symbol: str) -> int:
"""Cancel all open orders for a symbol and return count of cancelled orders"""
try:
if self.simulation_mode:
return 0
open_orders = self.exchange.get_open_orders(symbol)
cancelled_count = 0
for order in open_orders:
order_id = order.get('orderId')
if order_id:
try:
cancel_result = self.exchange.cancel_order(symbol, str(order_id))
if cancel_result:
cancelled_count += 1
logger.info(f"Cancelled order {order_id} for {symbol}")
time.sleep(0.1) # Small delay between cancellations
except Exception as e:
logger.warning(f"Failed to cancel order {order_id}: {e}")
return cancelled_count
except Exception as e:
logger.error(f"Error cancelling open orders for {symbol}: {e}")
return 0
def _calculate_recent_success_rate(self) -> float:
"""Calculate success rate of recent closed trades
Returns:
float: Success rate (0.0 to 1.0) of recent trades
"""
try:
if len(self.trade_records) < 5: # Need at least 5 trades
return 0.0
# Get recent trades (up to the window size)
recent_trades = self.trade_records[-self.recent_trades_window:]
if not recent_trades:
return 0.0
# Count winning trades (net PnL > 0)
winning_trades = sum(1 for trade in recent_trades if trade.net_pnl > 0)
success_rate = winning_trades / len(recent_trades)
logger.debug(f"Recent success rate: {success_rate:.2%} ({winning_trades}/{len(recent_trades)} trades)")
return success_rate
except Exception as e:
logger.error(f"Error calculating success rate: {e}")
return 0.0
def _adjust_profitability_reward_multiplier(self):
"""Adjust profitability reward multiplier based on recent success rate"""
try:
# Only adjust every 5 minutes to avoid too frequent changes
current_time = datetime.now()
time_since_last_adjustment = (current_time - self.last_profitability_adjustment).total_seconds()
if time_since_last_adjustment < 300: # 5 minutes
return
success_rate = self._calculate_recent_success_rate()
# Only adjust if we have enough trades
if len(self.trade_records) < 10:
return
old_multiplier = self.profitability_reward_multiplier
# Increase multiplier if success rate > 60%
if success_rate > self.success_rate_increase_threshold:
self.profitability_reward_multiplier = min(
self.max_profitability_multiplier,
self.profitability_reward_multiplier + self.profitability_adjustment_step
)
logger.info(f"🎯 SUCCESS RATE HIGH ({success_rate:.1%}) - Increased profitability multiplier: {old_multiplier:.1f}{self.profitability_reward_multiplier:.1f}")
# Decrease multiplier if success rate < 51%
elif success_rate < self.success_rate_decrease_threshold:
self.profitability_reward_multiplier = max(
self.min_profitability_multiplier,
self.profitability_reward_multiplier - self.profitability_adjustment_step
)
logger.info(f"⚠️ SUCCESS RATE LOW ({success_rate:.1%}) - Decreased profitability multiplier: {old_multiplier:.1f}{self.profitability_reward_multiplier:.1f}")
else:
logger.debug(f"Success rate {success_rate:.1%} in acceptable range - keeping multiplier at {self.profitability_reward_multiplier:.1f}")
self.last_profitability_adjustment = current_time
except Exception as e:
logger.error(f"Error adjusting profitability reward multiplier: {e}")
def get_profitability_reward_multiplier(self) -> float:
"""Get current profitability reward multiplier
Returns:
float: Current profitability reward multiplier
"""
return self.profitability_reward_multiplier
def _can_reenable_live_trading(self) -> bool:
"""Check if trading performance has improved enough to re-enable live trading
Returns:
bool: True if performance meets criteria to re-enable live trading
"""
try:
# Need enough trades to evaluate
if len(self.trade_history) < self.trades_to_evaluate:
logger.debug(f"Not enough trades to evaluate for re-enabling live trading: {len(self.trade_history)}/{self.trades_to_evaluate}")
return False
# Get the most recent trades for evaluation
recent_trades = self.trade_history[-self.trades_to_evaluate:]
# Calculate success rate
winning_trades = sum(1 for trade in recent_trades if trade.pnl > 0.001)
success_rate = winning_trades / len(recent_trades)
# Calculate average PnL
avg_pnl = sum(trade.pnl for trade in recent_trades) / len(recent_trades)
# Calculate win/loss ratio
losing_trades = sum(1 for trade in recent_trades if trade.pnl < -0.001)
win_loss_ratio = winning_trades / max(1, losing_trades) # Avoid division by zero
logger.info(f"SAFETY FEATURE: Performance evaluation - Success rate: {success_rate:.2%}, Avg PnL: ${avg_pnl:.2f}, Win/Loss ratio: {win_loss_ratio:.2f}")
# Criteria to re-enable live trading:
# 1. Success rate must exceed minimum threshold
# 2. Average PnL must be positive
# 3. Win/loss ratio must be at least 1.0 (equal wins and losses)
if (success_rate >= self.min_success_rate_to_reenable and
avg_pnl > 0 and
win_loss_ratio >= 1.0):
logger.info(f"SAFETY FEATURE: Performance criteria met for re-enabling live trading")
return True
else:
logger.debug(f"SAFETY FEATURE: Performance criteria not yet met for re-enabling live trading")
return False
except Exception as e:
logger.error(f"Error evaluating trading performance: {e}")
return False
except Exception as e:
logger.error(f"Error evaluating trading performance: {e}")
return False
def _check_safety_conditions(self, symbol: str, action: str) -> bool:
"""Check if it's safe to execute a trade"""
# Check if trading is stopped
if self.exchange_config.get('emergency_stop', False):
logger.warning("Emergency stop is active - no trades allowed")
return False
# Safety feature: Check consecutive losses and switch to simulation mode if needed
if not self.simulation_mode and self.consecutive_losses >= self.max_consecutive_losses:
logger.warning(f"SAFETY FEATURE ACTIVATED: {self.consecutive_losses} consecutive losses detected")
logger.warning(f"Switching from live trading to simulation mode for safety")
# Store original mode and switch to simulation
self.original_trading_mode = self.trading_mode
self.trading_mode = 'simulation'
self.simulation_mode = True
self.safety_triggered = True
# Log the event
logger.info(f"Trading mode changed to SIMULATION due to safety feature")
logger.info(f"Will continue to monitor performance and re-enable live trading when success rate improves")
# Continue allowing trades in simulation mode
return True
# Check if we should try to re-enable live trading after safety feature was triggered
if self.simulation_mode and self.safety_triggered and self.original_trading_mode != 'simulation':
# Check if performance has improved enough to re-enable live trading
if self._can_reenable_live_trading():
logger.info(f"SAFETY FEATURE: Performance has improved, re-enabling live trading")
# Switch back to original mode
self.trading_mode = self.original_trading_mode
self.simulation_mode = (self.trading_mode == 'simulation')
self.safety_triggered = False
self.consecutive_losses = 0 # Reset consecutive losses counter
logger.info(f"Trading mode restored to {self.trading_mode}")
# Continue with the trade
return True
# Check symbol allowlist
allowed_symbols = self.exchange_config.get('allowed_symbols', [])
if allowed_symbols and symbol not in allowed_symbols:
logger.warning(f"Symbol {symbol} not in allowed list: {allowed_symbols}")
return False
# Check daily loss limit (use trading config as fallback)
max_daily_loss = self.exchange_config.get('max_daily_loss_usd',
self.trading_config.get('max_daily_loss_usd', 200.0))
if self.daily_loss >= max_daily_loss:
logger.warning(f"Daily loss limit reached: ${self.daily_loss:.2f} >= ${max_daily_loss}")
return False
# Check trade interval - allow bypass for test scenarios
min_interval = self.exchange_config.get('min_trade_interval_seconds',
self.trading_config.get('min_trade_interval_seconds', 5))
last_trade = self.last_trade_time.get(symbol, datetime.min)
time_since_last = (datetime.now() - last_trade).total_seconds()
# Allow bypass for high confidence sells (closing positions) or test scenarios
if time_since_last < min_interval:
# Allow immediate sells for closing positions or very high confidence (0.9+)
if action == 'SELL' and symbol in self.positions:
logger.info(f"Allowing immediate SELL to close position for {symbol}")
elif hasattr(self, '_test_mode') and self._test_mode:
logger.info(f"Test mode: bypassing trade interval for {symbol}")
else:
logger.info(f"Trade interval not met for {symbol} ({time_since_last:.1f}s < {min_interval}s)")
return False
# Check concurrent positions
max_positions = self.exchange_config.get('max_concurrent_positions',
self.trading_config.get('max_concurrent_positions', 3))
if len(self.positions) >= max_positions and action == 'BUY':
logger.warning(f"Maximum concurrent positions reached: {len(self.positions)}")
return False
# SYNC OPEN ORDERS BEFORE CHECKING LIMITS
# This ensures we have accurate position data before making decisions
if not self.simulation_mode:
try:
logger.debug(f"Syncing open orders before trade execution for {symbol}")
sync_result = self.sync_open_orders()
if sync_result.get('status') == 'success':
logger.debug(f"Open orders synced successfully: {sync_result.get('count', 0)} orders")
else:
logger.warning(f"Open orders sync failed: {sync_result.get('message', 'Unknown error')}")
except Exception as e:
logger.warning(f"Error syncing open orders: {e}")
# Check open order limit
if not self._can_place_new_order():
logger.warning(f"Maximum open orders reached: {self._get_open_orders_count()}/{self.max_open_orders}")
return False
# Check position size limit before opening new positions
if action in ['BUY', 'SHORT'] and symbol not in self.positions:
if not self._check_position_size_limit():
logger.warning(f"Position size limit reached - cannot open new position for {symbol}")
return False
return True
def _execute_buy(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a buy order with enhanced position management"""
# CRITICAL: Check for existing positions (both local and exchange)
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'SHORT':
logger.info(f"Closing SHORT position in {symbol}")
return self._close_short_position(symbol, confidence, current_price)
else:
logger.warning(f"POSITION SAFETY: Already have LONG position in {symbol} - blocking duplicate trade")
return False
# ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
if float(pos.get('size', 0)) > 0:
logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking duplicate trade")
logger.warning(f"Position details: {pos}")
# Sync this position to local state
self._sync_single_position_from_exchange(symbol, pos)
return False
except Exception as e:
logger.debug(f"Error checking exchange positions for {symbol}: {e}")
# Don't block trade if we can't check - but log it
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
# Calculate position size
position_size = self._calculate_position_size(confidence, current_price)
quantity = position_size / current_price
logger.info(f"Executing BUY: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]")
if self.simulation_mode:
# Create simulated position
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=quantity,
entry_price=current_price,
entry_time=datetime.now(),
order_id=f"sim_{int(datetime.now().timestamp())}"
)
logger.info(f"Simulated BUY order: {quantity:.6f} {symbol} at ${current_price:.2f}")
return True
else:
# Place real order with enhanced error handling
result = self._place_order_with_retry(symbol, 'BUY', 'MARKET', quantity, current_price)
# Check for position check error
if result and 'error' in result and result['error'] == 'existing_position':
logger.error(f"BUY order blocked: {result['message']}")
return False
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
fill_price = result.get('avgPrice', current_price)
# Only create position if order was actually filled
if result.get('filled', True): # Assume filled for backward compatibility
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=float(filled_quantity),
entry_price=float(fill_price),
entry_time=datetime.now(),
order_id=result['orderId']
)
logger.info(f"BUY position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}")
self.last_trade_time[symbol] = datetime.now()
return True
else:
logger.error(f"BUY order placed but not filled: {result}")
return False
else:
logger.error("Failed to place BUY order")
return False
def _execute_sell(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a sell order (close long position or open short position)"""
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'LONG':
logger.info(f"Closing LONG position in {symbol}")
return self._close_long_position(symbol, confidence, current_price)
else:
logger.info(f"Already have SHORT position in {symbol}")
return False
else:
# No position to sell, open short position
logger.info(f"No position to sell in {symbol}. Opening short position")
return self._execute_short(symbol, confidence, current_price)
def _execute_short(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a short order (sell without holding the asset) with enhanced position management"""
# CRITICAL: Check for any existing positions before opening SHORT
if symbol in self.positions:
logger.warning(f"POSITION SAFETY: Already have position in {symbol} - blocking SHORT trade")
return False
# ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
if float(pos.get('size', 0)) > 0:
logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking SHORT trade")
logger.warning(f"Position details: {pos}")
# Sync this position to local state
self._sync_single_position_from_exchange(symbol, pos)
return False
except Exception as e:
logger.debug(f"Error checking exchange positions for SHORT {symbol}: {e}")
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
# Calculate position size
position_size = self._calculate_position_size(confidence, current_price)
quantity = position_size / current_price
logger.info(f"Executing SHORT: {quantity:.6f} {symbol} at ${current_price:.2f} (value: ${position_size:.2f}, confidence: {confidence:.2f}) [{'SIM' if self.simulation_mode else 'LIVE'}]")
if self.simulation_mode:
# Create simulated short position
self.positions[symbol] = Position(
symbol=symbol,
side='SHORT',
quantity=quantity,
entry_price=current_price,
entry_time=datetime.now(),
order_id=f"sim_{int(datetime.now().timestamp())}"
)
logger.info(f"Simulated SHORT order: {quantity:.6f} {symbol} at ${current_price:.2f}")
return True
else:
# Place real short order with enhanced error handling
result = self._place_order_with_retry(symbol, 'SELL', 'MARKET', quantity, current_price)
# Check for position check error
if result and 'error' in result and result['error'] == 'existing_position':
logger.error(f"SHORT order blocked: {result['message']}")
return False
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
fill_price = result.get('avgPrice', current_price)
# Only create position if order was actually filled
if result.get('filled', True): # Assume filled for backward compatibility
self.positions[symbol] = Position(
symbol=symbol,
side='SHORT',
quantity=float(filled_quantity),
entry_price=float(fill_price),
entry_time=datetime.now(),
order_id=result['orderId']
)
logger.info(f"SHORT position created: {filled_quantity:.6f} {symbol} at ${fill_price:.4f}")
self.last_trade_time[symbol] = datetime.now()
return True
else:
logger.error(f"SHORT order placed but not filled: {result}")
return False
else:
logger.error("Failed to place SHORT order")
return False
def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]:
"""Place order with retry logic for MEXC error handling"""
# FINAL POSITION CHECK: Verify no existing position before placing order
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
size = float(pos.get('size', 0))
if size > 0:
logger.error(f"FINAL POSITION CHECK FAILED: Found existing position for {symbol} before placing order")
logger.error(f"Position details: {pos}")
logger.error(f"Order details: {side} {quantity} @ ${current_price}")
# Sync the position to local state
self._sync_single_position_from_exchange(symbol, pos)
return {'error': 'existing_position', 'message': f'Position already exists for {symbol}'}
except Exception as e:
logger.warning(f"Error in final position check for {symbol}: {e}")
# Continue with order placement if we can't check positions
order_start_time = time.time()
max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout)
for attempt in range(max_retries):
# Check if we've exceeded the maximum order time
elapsed_time = time.time() - order_start_time
if elapsed_time > max_order_time:
logger.error(f"ORDER TIMEOUT: Order placement exceeded {max_order_time}s limit for {side} {symbol}")
logger.error(f"Elapsed time: {elapsed_time:.2f}s, cancelling order to prevent lock hanging")
return {}
try:
# For retries, use more aggressive pricing for LIMIT orders
order_price = current_price
if order_type.upper() == 'LIMIT' and attempt > 0:
# Increase aggressiveness with each retry
aggression_factor = 1 + (0.005 * (attempt + 1)) # 0.5%, 1.0%, 1.5% etc.
if side.upper() == 'BUY':
order_price = current_price * aggression_factor
else:
order_price = current_price / aggression_factor
logger.info(f"Retry {attempt + 1}: Using more aggressive price ${order_price:.4f} (vs market ${current_price:.4f})")
result = self.exchange.place_order(symbol, side, order_type, quantity, order_price)
# Check if result contains error information
if isinstance(result, dict) and 'error' in result:
error_type = result.get('error')
error_code = result.get('code')
error_msg = result.get('message', 'Unknown error')
if error_type == 'oversold' and error_code == 30005:
logger.warning(f"MEXC Oversold error on attempt {attempt + 1}/{max_retries}")
logger.warning(f"Error: {error_msg}")
if attempt < max_retries - 1:
# Calculate wait time but respect timeout limit
suggested_wait = result.get('retry_after', 60) * (2 ** attempt)
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if suggested_wait > remaining_time:
logger.warning(f"Oversold retry wait ({suggested_wait}s) exceeds timeout limit")
logger.warning(f"Remaining time: {remaining_time:.2f}s, skipping retry to prevent hanging")
return {}
logger.info(f"Waiting {suggested_wait} seconds before retry due to oversold condition...")
time.sleep(suggested_wait)
# Reduce quantity for next attempt to avoid oversold
quantity = quantity * 0.8 # Reduce by 20%
logger.info(f"Reducing quantity to {quantity:.6f} for retry")
continue
else:
logger.error(f"Max retries reached for oversold condition")
return {}
elif error_type == 'direction_not_allowed':
logger.error(f"Trading direction not allowed for {symbol} {side}")
return {}
elif error_type == 'insufficient_position':
logger.error(f"Insufficient position for {symbol} {side}")
return {}
else:
logger.error(f"MEXC API error: {error_code} - {error_msg}")
if attempt < max_retries - 1:
wait_time = 5 * (attempt + 1) # Wait 5, 10, 15 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"API error retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
# Success case - order placed
elif isinstance(result, dict) and ('orderId' in result or 'symbol' in result):
logger.info(f"Order placed successfully on attempt {attempt + 1}")
# For LIMIT orders, verify that the order actually fills
if order_type.upper() == 'LIMIT' and 'orderId' in result:
order_id = result['orderId']
filled_result = self._wait_for_order_fill(symbol, order_id, max_wait_time=5.0)
if filled_result['filled']:
logger.info(f"LIMIT order {order_id} filled successfully")
# Update result with fill information
result.update(filled_result)
return result
else:
logger.warning(f"LIMIT order {order_id} not filled within timeout, cancelling...")
# Cancel the unfilled order
try:
self.exchange.cancel_order(symbol, order_id)
logger.info(f"Cancelled unfilled order {order_id}")
except Exception as e:
logger.error(f"Failed to cancel unfilled order {order_id}: {e}")
# If this was the last attempt, return failure
if attempt == max_retries - 1:
return {}
# Try again with a more aggressive price
logger.info(f"Retrying with more aggressive LIMIT pricing...")
continue
else:
# MARKET orders or orders without orderId - assume immediate fill
return result
# Empty result - treat as failure
else:
logger.warning(f"Empty result on attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
wait_time = 2 * (attempt + 1) # Wait 2, 4, 6 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Empty result retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
except Exception as e:
logger.error(f"Exception on order attempt {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
wait_time = 3 * (attempt + 1) # Wait 3, 6, 9 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Exception retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
logger.error(f"Failed to place order after {max_retries} attempts")
return {}
def _wait_for_order_fill(self, symbol: str, order_id: str, max_wait_time: float = 5.0) -> Dict[str, Any]:
"""Wait for a LIMIT order to fill and return fill status
Args:
symbol: Trading symbol
order_id: Order ID to monitor
max_wait_time: Maximum time to wait for fill in seconds
Returns:
dict: {'filled': bool, 'status': str, 'executedQty': float, 'avgPrice': float}
"""
start_time = time.time()
check_interval = 0.2 # Check every 200ms
while time.time() - start_time < max_wait_time:
try:
order_status = self.exchange.get_order_status(symbol, order_id)
if order_status and isinstance(order_status, dict):
status = order_status.get('status', '').upper()
executed_qty = float(order_status.get('executedQty', 0))
orig_qty = float(order_status.get('origQty', 0))
avg_price = float(order_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0
logger.debug(f"Order {order_id} status: {status}, executed: {executed_qty}/{orig_qty}")
if status == 'FILLED':
return {
'filled': True,
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'fillTime': time.time()
}
elif status in ['CANCELED', 'REJECTED', 'EXPIRED']:
return {
'filled': False,
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'reason': f'Order {status.lower()}'
}
elif status == 'PARTIALLY_FILLED':
# For partial fills, continue waiting but log progress
fill_percentage = (executed_qty / orig_qty * 100) if orig_qty > 0 else 0
logger.debug(f"Order {order_id} partially filled: {fill_percentage:.1f}%")
# Wait before next check
time.sleep(check_interval)
except Exception as e:
logger.error(f"Error checking order status for {order_id}: {e}")
time.sleep(check_interval)
# Timeout - check final status
try:
final_status = self.exchange.get_order_status(symbol, order_id)
if final_status:
status = final_status.get('status', '').upper()
executed_qty = float(final_status.get('executedQty', 0))
avg_price = float(final_status.get('cummulativeQuoteQty', 0)) / executed_qty if executed_qty > 0 else 0
return {
'filled': status == 'FILLED',
'status': status,
'executedQty': executed_qty,
'avgPrice': avg_price,
'reason': 'timeout' if status != 'FILLED' else None
}
except Exception as e:
logger.error(f"Error getting final order status for {order_id}: {e}")
return {
'filled': False,
'status': 'UNKNOWN',
'executedQty': 0,
'avgPrice': 0,
'reason': 'timeout_and_status_check_failed'
}
def _close_short_position(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Close a short position by buying"""
if symbol not in self.positions:
logger.warning(f"No position to close in {symbol}")
return False
position = self.positions[symbol]
if position.side != 'SHORT':
logger.warning(f"Position in {symbol} is not SHORT, cannot close with BUY")
return False
logger.info(f"Closing SHORT position: {position.quantity:.6f} {symbol} at ${current_price:.2f} "
f"(confidence: {confidence:.2f})")
if self.simulation_mode:
logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Short close logged but not executed")
# Calculate simulated fees in simulation mode
trading_fees = self.exchange_config.get('trading_fees', {})
taker_fee_rate = trading_fees.get('taker_fee', trading_fees.get('default_fee', 0.0006))
simulated_fees = position.quantity * current_price * taker_fee_rate
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - simulated_fees
# Calculate hold time
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record with corrected PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=simulated_fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
self.trade_records.append(trade_record)
self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl
# Adjust profitability reward multiplier based on recent performance
self._adjust_profitability_reward_multiplier()
# Update consecutive losses using net_pnl
if net_pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif net_pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"SHORT position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${simulated_fees:.3f}")
return True
try:
# Get order type from config
order_type = self.exchange_config.get('order_type', 'market').lower()
# For limit orders, set price slightly above market for immediate execution
limit_price = None
if order_type == 'limit':
# Set buy price slightly above market to ensure immediate execution
limit_price = current_price * 1.001 # 0.1% above market
# Place buy order to close short
if order_type == 'market':
order = self.exchange.place_order(
symbol=symbol,
side='buy', # Buy to close short position
order_type=order_type,
quantity=position.quantity
)
else:
# For limit orders, price is required
assert limit_price is not None, "limit_price required for limit orders"
order = self.exchange.place_order(
symbol=symbol,
side='buy', # Buy to close short position
order_type=order_type,
quantity=position.quantity,
price=limit_price
)
if order:
# Calculate fees using real API data when available
fees = self._calculate_real_trading_fees(order, symbol, position.quantity, current_price)
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - fees
# Calculate hold time
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record with corrected PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='SHORT',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
self.trade_records.append(trade_record)
self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl
# Adjust profitability reward multiplier based on recent performance
self._adjust_profitability_reward_multiplier()
# Update consecutive losses using net_pnl
if net_pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif net_pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"SHORT close order executed: {order}")
logger.info(f"SHORT position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${fees:.3f}")
return True
else:
logger.error("Failed to place SHORT close order")
return False
except Exception as e:
logger.error(f"Error closing SHORT position: {e}")
return False
def _close_long_position(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Close a long position by selling"""
if symbol not in self.positions:
logger.warning(f"No position to close in {symbol}")
return False
position = self.positions[symbol]
if position.side != 'LONG':
logger.warning(f"Position in {symbol} is not LONG, cannot close with SELL")
return False
logger.info(f"Closing LONG position: {position.quantity:.6f} {symbol} at ${current_price:.2f} "
f"(confidence: {confidence:.2f})")
if self.simulation_mode:
logger.info(f"SIMULATION MODE ({self.trading_mode.upper()}) - Long close logged but not executed")
# Calculate simulated fees in simulation mode
trading_fees = self.exchange_config.get('trading_fees', {})
taker_fee_rate = trading_fees.get('taker_fee', trading_fees.get('default_fee', 0.0006))
simulated_fees = position.quantity * current_price * taker_fee_rate
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - simulated_fees
# Calculate hold time
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record with corrected PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='LONG',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=simulated_fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
self.trade_records.append(trade_record)
self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl
# Adjust profitability reward multiplier based on recent performance
self._adjust_profitability_reward_multiplier()
# Update consecutive losses using net_pnl
if net_pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif net_pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"LONG position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${simulated_fees:.3f}")
return True
try:
# Get order type from config
order_type = self.exchange_config.get('order_type', 'market').lower()
# For limit orders, set price slightly below market for immediate execution
limit_price = None
if order_type == 'limit':
# Set sell price slightly below market to ensure immediate execution
limit_price = current_price * 0.999 # 0.1% below market
# Place sell order to close long
if order_type == 'market':
order = self.exchange.place_order(
symbol=symbol,
side='sell', # Sell to close long position
order_type=order_type,
quantity=position.quantity
)
else:
# For limit orders, price is required
assert limit_price is not None, "limit_price required for limit orders"
order = self.exchange.place_order(
symbol=symbol,
side='sell', # Sell to close long position
order_type=order_type,
quantity=position.quantity,
price=limit_price
)
if order:
# Calculate fees using real API data when available
fees = self._calculate_real_trading_fees(order, symbol, position.quantity, current_price)
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = position.quantity * position.entry_price
# Calculate gross PnL (before fees) with leverage
gross_pnl = (current_price - position.entry_price) * position.quantity * leverage
# Calculate net PnL (after fees)
net_pnl = gross_pnl - fees
# Calculate hold time
exit_time = datetime.now()
hold_time_seconds = (exit_time - position.entry_time).total_seconds()
# Create trade record with corrected PnL calculations
trade_record = TradeRecord(
symbol=symbol,
side='LONG',
quantity=position.quantity,
entry_price=position.entry_price,
exit_price=current_price,
entry_time=position.entry_time,
exit_time=exit_time,
pnl=net_pnl, # Store net PnL as the main PnL value
fees=fees,
confidence=confidence,
hold_time_seconds=hold_time_seconds,
leverage=leverage,
position_size_usd=position_size_usd,
gross_pnl=gross_pnl,
net_pnl=net_pnl
)
self.trade_history.append(trade_record)
self.trade_records.append(trade_record)
self.daily_loss += max(0, -net_pnl) # Use net_pnl instead of pnl
# Adjust profitability reward multiplier based on recent performance
self._adjust_profitability_reward_multiplier()
# Update consecutive losses using net_pnl
if net_pnl < -0.001: # A losing trade
self.consecutive_losses += 1
elif net_pnl > 0.001: # A winning trade
self.consecutive_losses = 0
else: # Breakeven trade
self.consecutive_losses = 0
# Remove position
del self.positions[symbol]
self.last_trade_time[symbol] = datetime.now()
self.daily_trades += 1
logger.info(f"LONG close order executed: {order}")
logger.info(f"LONG position closed - Gross P&L: ${gross_pnl:.2f}, Net P&L: ${net_pnl:.2f}, Fees: ${fees:.3f}")
return True
else:
logger.error("Failed to place LONG close order")
return False
except Exception as e:
logger.error(f"Error closing LONG position: {e}")
return False
def _calculate_position_size(self, confidence: float, current_price: float) -> float:
"""Calculate position size - limited to 20% of account balance by default"""
# Get account balance (simulation or real)
account_balance = self._get_account_balance_for_sizing()
# Get maximum position size limit (default 20% of balance)
max_position_percentage = self.mexc_config.get('max_position_percentage', 0.20)
max_position_value = account_balance * max_position_percentage
# Calculate desired position size based on confidence
# Scale by confidence: 70-100% of max position size based on confidence (0.7-1.0 range)
confidence_multiplier = max(0.7, min(1.0, confidence))
desired_position_value = max_position_value * confidence_multiplier
# Apply reduction based on consecutive losses (risk management)
reduction_factor = self.mexc_config.get('consecutive_loss_reduction_factor', 0.8)
adjusted_reduction_factor = reduction_factor ** self.consecutive_losses
final_position_value = desired_position_value * adjusted_reduction_factor
logger.debug(f"Position calculation: account=${account_balance:.2f}, "
f"max_position=${max_position_value:.2f} ({max_position_percentage*100:.0f}%), "
f"confidence_mult={confidence_multiplier:.2f}, "
f"final_position=${final_position_value:.2f}, confidence={confidence:.2f}")
return final_position_value
def _get_account_balance_for_sizing(self) -> float:
"""Get account balance for position sizing calculations"""
if self.simulation_mode:
return self.simulation_balance
else:
# For live trading, get actual USDT/USDC balance
try:
balances = self.get_account_balance()
usdt_balance = balances.get('USDT', {}).get('total', 0)
usdc_balance = balances.get('USDC', {}).get('total', 0)
return max(usdt_balance, usdc_balance)
except Exception as e:
logger.warning(f"Failed to get live account balance: {e}, using simulation default")
return self.simulation_balance
def _calculate_pnl_with_fees(self, entry_price: float, exit_price: float, quantity: float, side: str) -> Dict[str, float]:
"""Calculate PnL including trading fees (0.1% open + 0.1% close = 0.2% total)"""
try:
# Calculate position value
position_value = entry_price * quantity
# Calculate fees
open_fee = position_value * self.trading_fees['open_fee_percent']
close_fee = (exit_price * quantity) * self.trading_fees['close_fee_percent']
total_fees = open_fee + close_fee
# Calculate gross PnL (before fees)
if side.upper() == 'LONG':
gross_pnl = (exit_price - entry_price) * quantity
else: # SHORT
gross_pnl = (entry_price - exit_price) * quantity
# Calculate net PnL (after fees)
net_pnl = gross_pnl - total_fees
# Calculate percentage returns
gross_pnl_percent = (gross_pnl / position_value) * 100
net_pnl_percent = (net_pnl / position_value) * 100
fee_percent = (total_fees / position_value) * 100
return {
'gross_pnl': gross_pnl,
'net_pnl': net_pnl,
'total_fees': total_fees,
'open_fee': open_fee,
'close_fee': close_fee,
'gross_pnl_percent': gross_pnl_percent,
'net_pnl_percent': net_pnl_percent,
'fee_percent': fee_percent,
'position_value': position_value
}
except Exception as e:
logger.error(f"Error calculating PnL with fees: {e}")
return {
'gross_pnl': 0.0,
'net_pnl': 0.0,
'total_fees': 0.0,
'open_fee': 0.0,
'close_fee': 0.0,
'gross_pnl_percent': 0.0,
'net_pnl_percent': 0.0,
'fee_percent': 0.0,
'position_value': 0.0
}
def _calculate_pivot_points(self, symbol: str) -> Dict[str, float]:
"""Calculate pivot points for the symbol using real market data"""
try:
from core.data_provider import DataProvider
data_provider = DataProvider()
# Get daily data for pivot calculation
df = data_provider.get_historical_data(symbol, '1d', limit=2, refresh=True)
if df is None or len(df) < 2:
logger.warning(f"Insufficient data for pivot calculation for {symbol}")
return {}
# Use previous day's data for pivot calculation
prev_day = df.iloc[-2]
high = float(prev_day['high'])
low = float(prev_day['low'])
close = float(prev_day['close'])
# Calculate pivot point
pivot = (high + low + close) / 3
# Calculate support and resistance levels
r1 = (2 * pivot) - low
s1 = (2 * pivot) - high
r2 = pivot + (high - low)
s2 = pivot - (high - low)
r3 = high + 2 * (pivot - low)
s3 = low - 2 * (high - pivot)
pivots = {
'pivot': pivot,
'r1': r1, 'r2': r2, 'r3': r3,
's1': s1, 's2': s2, 's3': s3,
'prev_high': high,
'prev_low': low,
'prev_close': close
}
logger.debug(f"Pivot points for {symbol}: P={pivot:.2f}, R1={r1:.2f}, S1={s1:.2f}")
return pivots
except Exception as e:
logger.error(f"Error calculating pivot points for {symbol}: {e}")
return {}
def _get_pivot_signal_strength(self, symbol: str, current_price: float, action: str) -> float:
"""Get signal strength based on proximity to pivot points"""
try:
pivots = self._calculate_pivot_points(symbol)
if not pivots:
return 1.0 # Default strength if no pivots available
pivot = pivots['pivot']
r1, r2, r3 = pivots['r1'], pivots['r2'], pivots['r3']
s1, s2, s3 = pivots['s1'], pivots['s2'], pivots['s3']
# Calculate distance to nearest pivot levels
distances = {
'pivot': abs(current_price - pivot),
'r1': abs(current_price - r1),
'r2': abs(current_price - r2),
'r3': abs(current_price - r3),
's1': abs(current_price - s1),
's2': abs(current_price - s2),
's3': abs(current_price - s3)
}
# Find nearest level
nearest_level = min(distances.keys(), key=lambda k: distances[k])
nearest_distance = distances[nearest_level]
nearest_price = pivots[nearest_level]
# Calculate signal strength based on action and pivot context
strength = 1.0
if action == 'BUY':
# Stronger buy signals near support levels
if nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price:
strength = 1.5 # Boost buy signals at support
elif nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price:
strength = 0.7 # Reduce buy signals at resistance
elif action == 'SELL':
# Stronger sell signals near resistance levels
if nearest_level in ['r1', 'r2', 'r3'] and current_price >= nearest_price:
strength = 1.5 # Boost sell signals at resistance
elif nearest_level in ['s1', 's2', 's3'] and current_price <= nearest_price:
strength = 0.7 # Reduce sell signals at support
logger.debug(f"Pivot signal strength for {symbol} {action}: {strength:.2f} "
f"(near {nearest_level} at ${nearest_price:.2f}, current ${current_price:.2f})")
return strength
except Exception as e:
logger.error(f"Error calculating pivot signal strength: {e}")
return 1.0
def _get_current_price_from_data_provider(self, symbol: str) -> Optional[float]:
"""Get current price from data provider for most up-to-date information"""
try:
from core.data_provider import DataProvider
data_provider = DataProvider()
# Try to get real-time price first
current_price = data_provider.get_current_price(symbol)
if current_price and current_price > 0:
return float(current_price)
# Fallback to latest 1m candle
df = data_provider.get_historical_data(symbol, '1m', limit=1, refresh=True)
if df is not None and len(df) > 0:
return float(df.iloc[-1]['close'])
logger.warning(f"Could not get current price for {symbol} from data provider")
return None
except Exception as e:
logger.error(f"Error getting current price from data provider for {symbol}: {e}")
return None
def _check_position_size_limit(self) -> bool:
"""Check if total open position value exceeds the maximum allowed percentage of balance"""
try:
# Get account balance
account_balance = self._get_account_balance_for_sizing()
# Get maximum position percentage (default 20%)
max_position_percentage = self.mexc_config.get('max_position_percentage', 0.20)
max_position_value = account_balance * max_position_percentage
# Calculate total current position value
total_position_value = 0.0
# ENHANCED: Also check exchange positions to ensure we don't miss any
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for pos in exchange_positions:
symbol = pos.get('symbol', '').replace('USDT', '/USDT')
size = float(pos.get('size', 0))
entry_price = float(pos.get('entry_price', 0))
if size > 0 and symbol:
# Check if this position is also in our local state
if symbol not in self.positions:
logger.warning(f"POSITION LIMIT: Found untracked exchange position for {symbol}: {size} @ ${entry_price:.2f}")
# Add to total even if not in local state
position_value = size * entry_price
total_position_value += position_value
logger.debug(f"Exchange position {symbol}: {size:.6f} @ ${entry_price:.2f} = ${position_value:.2f}")
except Exception as e:
logger.debug(f"Error checking exchange positions for limit: {e}")
# Add existing local positions
for symbol, position in self.positions.items():
# Get current price for the symbol
try:
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
current_price = ticker['last'] if ticker and 'last' in ticker else position.entry_price
else:
# Simulation mode - use entry price or default
current_price = position.entry_price
except Exception:
# Fallback to entry price if we can't get current price
current_price = position.entry_price
# Calculate position value
position_value = position.quantity * current_price
total_position_value += position_value
logger.debug(f"Existing position {symbol}: {position.quantity:.6f} @ ${current_price:.2f} = ${position_value:.2f}")
# Add potential value from open orders that could become positions
if not self.simulation_mode and self.exchange:
try:
open_orders = self.exchange.get_open_orders()
for order in open_orders:
if order.get('side', '').lower() in ['buy', 'sell']:
# Estimate the value if this order gets filled
order_quantity = float(order.get('quantity', 0))
order_price = float(order.get('price', 0))
if order_price > 0:
order_value = order_quantity * order_price
total_position_value += order_value
logger.debug(f"Open order {order.get('symbol')}: {order_quantity:.6f} @ ${order_price:.2f} = ${order_value:.2f}")
except Exception as e:
logger.debug(f"Error calculating open order values: {e}")
# Check if we would exceed the limit
if total_position_value >= max_position_value:
logger.warning(f"Position size limit reached: ${total_position_value:.2f} >= ${max_position_value:.2f} ({max_position_percentage*100:.0f}% of balance)")
return False
logger.debug(f"Position size check passed: ${total_position_value:.2f} < ${max_position_value:.2f} ({max_position_percentage*100:.0f}% of balance)")
return True
except Exception as e:
logger.error(f"Error checking position size limit: {e}")
# Allow trade if we can't check the limit
return True
def update_positions(self, symbol: str, current_price: float):
"""Update position P&L with current market price"""
if symbol in self.positions:
with self.lock:
# Get leverage configuration from primary exchange
leverage_applied_by_exchange = False
if hasattr(self, 'primary_config'):
leverage_applied_by_exchange = self.primary_config.get('leverage_applied_by_exchange', False)
# Get configured leverage
leverage = 1.0
if hasattr(self, 'primary_config'):
leverage = self.primary_config.get('leverage', 1.0)
self.positions[symbol].calculate_pnl(
current_price,
leverage=leverage,
leverage_applied_by_exchange=leverage_applied_by_exchange
)
def get_positions(self) -> Dict[str, Position]:
"""Get current positions"""
return self.positions.copy()
def get_trade_history(self) -> List[TradeRecord]:
"""Get trade history"""
return self.trade_history.copy()
def get_daily_stats(self) -> Dict[str, Any]:
"""Get daily trading statistics with enhanced fee analysis"""
total_pnl = sum(trade.pnl for trade in self.trade_history)
total_fees = sum(trade.fees for trade in self.trade_history)
gross_pnl = total_pnl + total_fees # P&L before fees
winning_trades = len([t for t in self.trade_history if t.pnl > 0.001]) # Avoid rounding issues
losing_trades = len([t for t in self.trade_history if t.pnl < -0.001]) # Avoid rounding issues
total_trades = len(self.trade_history)
breakeven_trades = total_trades - winning_trades - losing_trades
# Calculate average trade values
avg_trade_pnl = total_pnl / max(1, total_trades)
avg_trade_fee = total_fees / max(1, total_trades)
avg_winning_trade = sum(t.pnl for t in self.trade_history if t.pnl > 0.001) / max(1, winning_trades)
avg_losing_trade = sum(t.pnl for t in self.trade_history if t.pnl < -0.001) / max(1, losing_trades)
# Enhanced fee analysis from config
fee_structure = self.mexc_config.get('trading_fees', {})
maker_fee_rate = fee_structure.get('maker', 0.0000)
taker_fee_rate = fee_structure.get('taker', 0.0005)
default_fee_rate = fee_structure.get('default', 0.0005)
# Calculate fee efficiency
total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history)
effective_fee_rate = (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0
fee_impact_on_pnl = (total_fees / max(0.01, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0
return {
'daily_trades': self.daily_trades,
'daily_loss': self.daily_loss,
'total_pnl': total_pnl,
'gross_pnl': gross_pnl,
'total_fees': total_fees,
'winning_trades': winning_trades,
'losing_trades': losing_trades,
'breakeven_trades': breakeven_trades,
'total_trades': total_trades,
'win_rate': winning_trades / max(1, total_trades),
'avg_trade_pnl': avg_trade_pnl,
'avg_trade_fee': avg_trade_fee,
'avg_winning_trade': avg_winning_trade,
'avg_losing_trade': avg_losing_trade,
'positions_count': len(self.positions),
'fee_rates': {
'maker': f"{maker_fee_rate*100:.3f}%",
'taker': f"{taker_fee_rate*100:.3f}%",
'default': f"{default_fee_rate*100:.3f}%",
'effective': f"{effective_fee_rate*100:.3f}%" # Actual rate based on trades
},
'fee_analysis': {
'total_volume': total_volume,
'fee_impact_percent': fee_impact_on_pnl,
'is_fee_efficient': fee_impact_on_pnl < 5.0, # Less than 5% impact is good
'fee_savings_vs_market': (0.001 - effective_fee_rate) * total_volume if effective_fee_rate < 0.001 else 0
}
}
def emergency_stop(self):
"""Emergency stop all trading"""
logger.warning("EMERGENCY STOP ACTIVATED")
self.trading_enabled = False
# Close all positions if in live mode
if not self.dry_run:
for symbol, position in self.positions.items():
try:
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
if ticker:
self._execute_sell(symbol, 1.0, ticker['last'])
else:
# Simulation mode - use entry price for closing
self._execute_sell(symbol, 1.0, position.entry_price)
except Exception as e:
logger.error(f"Error closing position {symbol} during emergency stop: {e}")
def reset_daily_stats(self):
"""Reset daily statistics (call at start of new day)"""
self.daily_trades = 0
self.daily_loss = 0.0
logger.info("Daily trading statistics reset")
def get_account_balance(self) -> Dict[str, Dict[str, float]]:
"""Get account balance information from the primary exchange (universal method).
Returns:
Dict with asset balances in format:
{
'USDT': {'free': 100.0, 'locked': 0.0, 'total': 100.0, 'type': 'spot'},
'ETH': {'free': 0.5, 'locked': 0.0, 'total': 0.5, 'type': 'spot'},
...
}
"""
try:
if not self.exchange:
logger.error("Exchange interface not available")
return {}
# Use the universal get_all_balances method that works with all exchanges
if hasattr(self.exchange, 'get_all_balances'):
raw_balances = self.exchange.get_all_balances()
if raw_balances:
# Convert to the expected format with 'type' field
combined_balances = {}
for asset, balance_data in raw_balances.items():
if isinstance(balance_data, dict):
combined_balances[asset] = {
'free': balance_data.get('free', 0.0),
'locked': balance_data.get('locked', 0.0),
'total': balance_data.get('total', 0.0),
'type': 'spot' # Default to spot for now
}
logger.info(f"Retrieved balances for {len(combined_balances)} assets from {self.primary_name}")
return combined_balances
else:
logger.warning(f"No balances returned from {self.primary_name} exchange")
return {}
else:
logger.error(f"Exchange {self.primary_name} does not support get_all_balances method")
return {}
except Exception as e:
logger.error(f"Error getting account balance: {e}")
return {}
def _calculate_real_trading_fees(self, order_result: Dict[str, Any], symbol: str,
quantity: float, price: float) -> float:
"""Calculate trading fees using real API data when available
Args:
order_result: Order result from exchange API
symbol: Trading symbol
quantity: Order quantity
price: Execution price
Returns:
float: Trading fee amount in quote currency
"""
try:
# Try to get actual fee from API response first
if order_result and 'fills' in order_result:
total_commission = 0.0
for fill in order_result['fills']:
commission = float(fill.get('commission', 0))
total_commission += commission
if total_commission > 0:
logger.info(f"Using real API fee: {total_commission}")
return total_commission
# Fall back to config-based calculation
return self._calculate_trading_fee(order_result, symbol, quantity, price)
except Exception as e:
logger.warning(f"Error calculating real fees: {e}, falling back to config-based")
return self._calculate_trading_fee(order_result, symbol, quantity, price)
def _calculate_trading_fee(self, order_result: Dict[str, Any], symbol: str,
quantity: float, price: float) -> float:
"""Calculate trading fee based on order execution details with enhanced MEXC API support
Args:
order_result: Order result from exchange API
symbol: Trading symbol
quantity: Order quantity
price: Execution price
Returns:
float: Trading fee amount in quote currency
"""
try:
# 1. Try to get actual fee from API response (most accurate)
# MEXC API can return fees in different formats depending on the endpoint
# Check for 'fills' array (most common for filled orders)
if order_result and 'fills' in order_result:
total_commission = 0.0
commission_asset = None
for fill in order_result['fills']:
commission = float(fill.get('commission', 0))
commission_asset = fill.get('commissionAsset', '')
total_commission += commission
if total_commission > 0:
logger.info(f"Using actual API fee from fills: {total_commission} {commission_asset}")
# If commission is in different asset, we might need conversion
# For now, assume it's in quote currency (USDC/USDT)
return total_commission
# 2. Check if order result has fee information directly
fee_fields = ['fee', 'commission', 'tradeFee', 'fees']
for field in fee_fields:
if order_result and field in order_result:
fee = float(order_result[field])
if fee > 0:
logger.info(f"Using API fee field '{field}': {fee}")
return fee
# 3. Check for executedQty and cummulativeQuoteQty for more accurate calculation
if order_result and 'executedQty' in order_result and 'cummulativeQuoteQty' in order_result:
executed_qty = float(order_result['executedQty'])
executed_value = float(order_result['cummulativeQuoteQty'])
if executed_qty > 0 and executed_value > 0:
# Use executed values instead of provided price/quantity
quantity = executed_qty
price = executed_value / executed_qty
logger.info(f"Using executed order data: {quantity} @ {price:.6f}")
# 4. Fall back to config-based fee calculation with enhanced logic
trading_fees = self.mexc_config.get('trading_fees', {})
# Determine if this was a maker or taker trade
order_type = order_result.get('type', 'MARKET') if order_result else 'MARKET'
order_status = order_result.get('status', 'UNKNOWN') if order_result else 'UNKNOWN'
time_in_force = order_result.get('timeInForce', 'GTC') if order_result else 'GTC'
# Enhanced maker/taker detection logic
if order_type.upper() == 'LIMIT':
# For limit orders, check execution speed and market conditions
if order_status == 'FILLED':
# If it's an IOC (Immediate or Cancel) order, it's likely a taker
if time_in_force == 'IOC' or time_in_force == 'FOK':
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for {time_in_force} limit order: {fee_rate*100:.3f}%")
else:
# For GTC orders, assume taker if aggressive pricing is used
# This is a heuristic based on our trading strategy
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for aggressive limit order: {fee_rate*100:.3f}%")
else:
# If not immediately filled, likely a maker (though we don't usually reach here)
fee_rate = trading_fees.get('maker', 0.0000)
logger.info(f"Using maker fee rate for pending/partial limit order: {fee_rate*100:.3f}%")
elif order_type.upper() == 'LIMIT_MAKER':
# LIMIT_MAKER orders are guaranteed to be makers
fee_rate = trading_fees.get('maker', 0.0000)
logger.info(f"Using maker fee rate for LIMIT_MAKER order: {fee_rate*100:.3f}%")
else:
# Market orders and other types are always takers
fee_rate = trading_fees.get('taker', 0.0005)
logger.info(f"Using taker fee rate for {order_type} order: {fee_rate*100:.3f}%")
# Calculate fee amount
trade_value = quantity * price
fee_amount = trade_value * fee_rate
logger.info(f"Calculated fee: ${fee_amount:.6f} ({fee_rate*100:.3f}% of ${trade_value:.2f})")
return fee_amount
except Exception as e:
logger.warning(f"Error calculating trading fee: {e}")
# Ultimate fallback using default rate
default_fee_rate = self.mexc_config.get('trading_fees', {}).get('default', 0.0005)
fallback_rate = self.mexc_config.get('trading_fee', default_fee_rate) # Legacy support
fee_amount = quantity * price * fallback_rate
logger.info(f"Using fallback fee: ${fee_amount:.6f} ({fallback_rate*100:.3f}%)")
return fee_amount
def get_fee_analysis(self) -> Dict[str, Any]:
"""Get detailed fee analysis and statistics
Returns:
Dict with fee breakdowns, rates, and impact analysis
"""
try:
fee_structure = self.mexc_config.get('trading_fees', {})
maker_rate = fee_structure.get('maker', 0.0000)
taker_rate = fee_structure.get('taker', 0.0005)
default_rate = fee_structure.get('default', 0.0005)
# Calculate total fees paid
total_fees = sum(trade.fees for trade in self.trade_history)
total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history)
# Estimate fee breakdown (since we don't track maker vs taker separately)
# Assume most of our limit orders are takers due to our pricing strategy
estimated_taker_volume = total_volume * 0.9 # 90% taker assumption
estimated_maker_volume = total_volume * 0.1 # 10% maker assumption
estimated_taker_fees = estimated_taker_volume * taker_rate
estimated_maker_fees = estimated_maker_volume * maker_rate
# Fee impact analysis
total_pnl = sum(trade.pnl for trade in self.trade_history)
gross_pnl = total_pnl + total_fees
fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0
return {
'fee_rates': {
'maker': {
'rate': maker_rate,
'rate_percent': f"{maker_rate*100:.3f}%"
},
'taker': {
'rate': taker_rate,
'rate_percent': f"{taker_rate*100:.3f}%"
},
'default': {
'rate': default_rate,
'rate_percent': f"{default_rate*100:.3f}%"
}
},
'total_fees': total_fees,
'total_volume': total_volume,
'estimated_breakdown': {
'taker_fees': estimated_taker_fees,
'maker_fees': estimated_maker_fees,
'taker_volume': estimated_taker_volume,
'maker_volume': estimated_maker_volume
},
'impact_analysis': {
'fee_impact_percent': fee_impact_percent,
'pnl_after_fees': total_pnl,
'pnl_before_fees': gross_pnl,
'avg_fee_per_trade': total_fees / max(1, len(self.trade_history))
},
'fee_efficiency': {
'volume_to_fee_ratio': total_volume / max(0.01, total_fees),
'is_efficient': fee_impact_percent < 5.0 # Less than 5% impact is good
}
}
except Exception as e:
logger.error(f"Error calculating fee analysis: {e}")
return {
'error': str(e),
'fee_rates': {
'maker': {'rate': 0.0000, 'rate_percent': '0.000%'},
'taker': {'rate': 0.0005, 'rate_percent': '0.050%'}
}
}
def sync_fees_with_api(self, force: bool = False) -> Dict[str, Any]:
"""Manually trigger fee synchronization with MEXC API
Args:
force: Force sync even if last sync was recent
Returns:
dict: Sync result with status and details
"""
if not self.config_synchronizer:
return {
'status': 'error',
'error': 'Config synchronizer not initialized'
}
try:
logger.info("TRADING EXECUTOR: Manual fee sync requested")
sync_result = self.config_synchronizer.sync_trading_fees(force=force)
# If fees were updated, reload config
if sync_result.get('changes_made'):
logger.info("TRADING EXECUTOR: Reloading config after fee sync")
self.config = get_config(self.config_synchronizer.config_path)
# Update to use primary exchange config
self.exchanges_config = self.config.get('exchanges', {})
self.primary_name = self.exchanges_config.get('primary', 'mexc')
self.primary_config = self.exchanges_config.get(self.primary_name, {})
self.mexc_config = self.primary_config # Legacy compatibility
return sync_result
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error in manual fee sync: {e}")
return {
'status': 'error',
'error': str(e)
}
def auto_sync_fees_if_needed(self) -> bool:
"""Automatically sync fees if needed (called periodically)
Returns:
bool: True if sync was performed successfully
"""
if not self.config_synchronizer:
return False
try:
return self.config_synchronizer.auto_sync_fees()
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error in auto fee sync: {e}")
return False
def get_fee_sync_status(self) -> Dict[str, Any]:
"""Get current fee synchronization status
Returns:
dict: Fee sync status and history
"""
if not self.config_synchronizer:
return {
'sync_available': False,
'error': 'Config synchronizer not initialized'
}
try:
status = self.config_synchronizer.get_sync_status()
status['sync_available'] = True
return status
except Exception as e:
logger.error(f"TRADING EXECUTOR: Error getting sync status: {e}")
return {
'sync_available': False,
'error': str(e)
}
def execute_trade(self, symbol: str, action: str, quantity: float) -> bool:
"""Execute a trade directly (compatibility method for dashboard)
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
action: Trading action ('BUY', 'SELL')
quantity: Quantity to trade
Returns:
bool: True if trade executed successfully
"""
try:
# Get current price
current_price = None
# Always get real current price - never use simulated data
current_price = self._get_real_current_price(symbol)
if current_price is None:
logger.error(f"Failed to get real current price for {symbol}")
return False
# Calculate confidence based on manual trade (high confidence)
confidence = 1.0
# Execute using the existing signal execution method
return self.execute_signal(symbol, action, confidence, current_price)
except Exception as e:
logger.error(f"Error executing trade {action} for {symbol}: {e}")
return False
def get_closed_trades(self) -> List[Dict[str, Any]]:
"""Get closed trades in dashboard format"""
try:
trades = []
for trade in self.trade_history:
trade_dict = {
'symbol': trade.symbol,
'side': trade.side,
'quantity': trade.quantity,
'entry_price': trade.entry_price,
'exit_price': trade.exit_price,
'entry_time': trade.entry_time,
'exit_time': trade.exit_time,
'pnl': trade.pnl,
'fees': trade.fees,
'confidence': trade.confidence,
'hold_time_seconds': trade.hold_time_seconds
}
trades.append(trade_dict)
return trades
except Exception as e:
logger.error(f"Error getting closed trades: {e}")
return []
def get_current_position(self, symbol: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Get current position for a symbol or all positions
Args:
symbol: Optional symbol to get position for. If None, returns first position.
Returns:
dict: Position information or None if no position
"""
try:
if symbol:
if symbol in self.positions:
pos = self.positions[symbol]
return {
'symbol': pos.symbol,
'side': pos.side,
'size': pos.quantity,
'price': pos.entry_price,
'entry_time': pos.entry_time,
'unrealized_pnl': pos.unrealized_pnl
}
return None
else:
# Return first position if no symbol specified
if self.positions:
first_symbol = list(self.positions.keys())[0]
return self.get_current_position(first_symbol)
return None
except Exception as e:
logger.error(f"Error getting current position: {e}")
return None
def get_leverage(self) -> float:
"""Get current leverage setting"""
return self.mexc_config.get('leverage', 50.0)
def set_leverage(self, leverage: float) -> bool:
"""Set leverage (for UI control)
Args:
leverage: New leverage value
Returns:
bool: True if successful
"""
try:
# Update in-memory config
self.mexc_config['leverage'] = leverage
logger.info(f"TRADING EXECUTOR: Leverage updated to {leverage}x")
return True
except Exception as e:
logger.error(f"Error setting leverage: {e}")
return False
def get_account_info(self) -> Dict[str, Any]:
"""Get account information for UI display"""
try:
account_balance = self._get_account_balance_for_sizing()
leverage = self.get_leverage()
return {
'account_balance': account_balance,
'leverage': leverage,
'trading_mode': self.trading_mode,
'simulation_mode': self.simulation_mode,
'trading_enabled': self.trading_enabled,
'position_sizing': {
'base_percent': self.mexc_config.get('base_position_percent', 5.0),
'max_percent': self.mexc_config.get('max_position_percent', 20.0),
'min_percent': self.mexc_config.get('min_position_percent', 2.0)
}
}
except Exception as e:
logger.error(f"Error getting account info: {e}")
return {
'account_balance': 100.0,
'leverage': 50.0,
'trading_mode': 'simulation',
'simulation_mode': True,
'trading_enabled': False,
'position_sizing': {
'base_percent': 5.0,
'max_percent': 20.0,
'min_percent': 2.0
}
}
def set_test_mode(self, enabled: bool = True):
"""Enable test mode to bypass safety checks for testing"""
self._test_mode = enabled
if enabled:
logger.info("TRADING EXECUTOR: Test mode enabled - bypassing safety checks")
else:
logger.info("TRADING EXECUTOR: Test mode disabled - normal safety checks active")
def set_trading_mode(self, mode: str) -> bool:
"""Set trading mode (simulation/live) and update all related settings
Args:
mode: Trading mode ('simulation' or 'live')
Returns:
bool: True if mode was set successfully
"""
try:
if mode not in ['simulation', 'live']:
logger.error(f"Invalid trading mode: {mode}. Must be 'simulation' or 'live'")
return False
# Store original mode if not already stored
if not hasattr(self, 'original_trading_mode'):
self.original_trading_mode = self.trading_mode
# Update trading mode
self.trading_mode = mode
self.simulation_mode = (mode == 'simulation')
# Update primary config if available
if hasattr(self, 'primary_config') and self.primary_config:
self.primary_config['trading_mode'] = mode
# Log the change
if mode == 'live':
logger.warning("TRADING EXECUTOR: MODE CHANGED TO LIVE - Real orders will be executed!")
else:
logger.info("TRADING EXECUTOR: MODE CHANGED TO SIMULATION - Orders are simulated")
return True
except Exception as e:
logger.error(f"Error setting trading mode to {mode}: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""Get trading executor status with safety feature information"""
try:
# Get account balance
if self.simulation_mode:
balance = self.simulation_balance
else:
balance = self.exchange.get_balance('USDT') if self.exchange else 0.0
# Get open positions
positions = self.get_positions()
# Calculate total fees paid
total_fees = sum(trade.fees for trade in self.trade_history)
total_volume = sum(trade.quantity * trade.exit_price for trade in self.trade_history)
# Estimate fee breakdown (since we don't track maker vs taker separately)
maker_fee_rate = self.exchange_config.get('maker_fee', 0.0002)
taker_fee_rate = self.exchange_config.get('taker_fee', 0.0006)
avg_fee_rate = (maker_fee_rate + taker_fee_rate) / 2
# Fee impact analysis
total_pnl = sum(trade.pnl for trade in self.trade_history)
gross_pnl = total_pnl + total_fees
fee_impact_percent = (total_fees / max(1, abs(gross_pnl))) * 100 if gross_pnl != 0 else 0
# Calculate success rate for recent trades
recent_trades = self.trade_history[-self.trades_to_evaluate:] if len(self.trade_history) >= self.trades_to_evaluate else self.trade_history
winning_trades = sum(1 for trade in recent_trades if trade.pnl > 0.001) if recent_trades else 0
success_rate = (winning_trades / len(recent_trades)) if recent_trades else 0
# Safety feature status
safety_status = {
'active': self.safety_triggered,
'consecutive_losses': self.consecutive_losses,
'max_consecutive_losses': self.max_consecutive_losses,
'original_mode': self.original_trading_mode if self.safety_triggered else self.trading_mode,
'success_rate': success_rate,
'min_success_rate_to_reenable': self.min_success_rate_to_reenable,
'trades_evaluated': len(recent_trades),
'trades_needed': self.trades_to_evaluate,
'can_reenable': self._can_reenable_live_trading() if self.safety_triggered else False
}
return {
'trading_enabled': self.trading_enabled,
'simulation_mode': self.simulation_mode,
'trading_mode': self.trading_mode,
'balance': balance,
'positions': len(positions),
'daily_trades': self.daily_trades,
'daily_pnl': self.daily_pnl,
'daily_loss': self.daily_loss,
'consecutive_losses': self.consecutive_losses,
'total_trades': len(self.trade_history),
'safety_feature': safety_status,
'pnl': {
'total': total_pnl,
'gross': gross_pnl,
'fees': total_fees,
'fee_impact_percent': fee_impact_percent,
'pnl_after_fees': total_pnl,
'pnl_before_fees': gross_pnl,
'avg_fee_per_trade': total_fees / max(1, len(self.trade_history))
},
'fee_efficiency': {
'total_volume': total_volume,
'total_fees': total_fees,
'effective_fee_rate': (total_fees / max(0.01, total_volume)) if total_volume > 0 else 0,
'expected_fee_rate': avg_fee_rate,
'fee_efficiency': (avg_fee_rate / ((total_fees / max(0.01, total_volume)) if total_volume > 0 else 1)) if avg_fee_rate > 0 else 0
}
}
except Exception as e:
logger.error(f"Error getting trading executor status: {e}")
return {
'trading_enabled': self.trading_enabled,
'simulation_mode': self.simulation_mode,
'trading_mode': self.trading_mode,
'error': str(e)
}
def sync_position_with_mexc(self, symbol: str, desired_state: str) -> bool:
"""Synchronize dashboard position state with actual MEXC account positions
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
desired_state: Desired position state ('NO_POSITION', 'LONG', 'SHORT')
Returns:
bool: True if synchronization successful
"""
try:
logger.info(f"POSITION SYNC: Starting sync for {symbol} - desired state: {desired_state}")
if self.simulation_mode:
logger.info("POSITION SYNC: Simulation mode - skipping MEXC account sync")
return True
# Step 1: Cancel all pending orders for the symbol
cancelled_orders = self._cancel_open_orders(symbol)
if cancelled_orders > 0:
logger.info(f"POSITION SYNC: Cancelled {cancelled_orders} pending orders for {symbol}")
time.sleep(1) # Wait for cancellations to process
# Step 2: Get current MEXC account balances and positions
current_balances = self._get_mexc_account_balances()
current_holdings = self._get_current_holdings(symbol, current_balances)
# Step 3: Determine current position state from MEXC account
current_state = self._determine_position_state(symbol, current_holdings)
logger.info(f"POSITION SYNC: Current MEXC state: {current_state}, Holdings: {current_holdings}")
# Step 4: If states match, no action needed
if current_state == desired_state:
logger.info(f"POSITION SYNC: States already match ({current_state}) - no action needed")
return True
# Step 5: Execute corrective trades based on state mismatch
return self._execute_corrective_trades(symbol, current_state, desired_state, current_holdings)
except Exception as e:
logger.error(f"POSITION SYNC ERROR: Failed to sync {symbol}: {e}")
import traceback
logger.error(f"POSITION SYNC: Full traceback: {traceback.format_exc()}")
return False
def _get_mexc_account_balances(self) -> Dict[str, Dict[str, float]]:
"""Get current MEXC account balances"""
try:
return self.exchange.get_all_balances()
except Exception as e:
logger.error(f"Failed to get MEXC account balances: {e}")
return {}
def _get_current_holdings(self, symbol: str, balances: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
"""Extract current holdings for the symbol from account balances"""
try:
# Parse symbol to get base and quote assets
if '/' in symbol:
base_asset, quote_asset = symbol.split('/')
else:
# Handle symbols like ETHUSDT
if symbol.upper().endswith('USDT'):
base_asset = symbol[:-4]
quote_asset = 'USDT'
elif symbol.upper().endswith('USDC'):
base_asset = symbol[:-4]
quote_asset = 'USDC'
else:
logger.error(f"Cannot parse symbol: {symbol}")
return {'base': 0.0, 'quote': 0.0, 'base_asset': 'UNKNOWN', 'quote_asset': 'UNKNOWN'}
base_asset = base_asset.upper()
quote_asset = quote_asset.upper()
# Get balances for base and quote assets
base_balance = balances.get(base_asset, {}).get('total', 0.0)
quote_balance = balances.get(quote_asset, {}).get('total', 0.0)
# Also check USDC if quote is USDT (MEXC uses USDC for trading)
if quote_asset == 'USDT':
usdc_balance = balances.get('USDC', {}).get('total', 0.0)
quote_balance = max(quote_balance, usdc_balance)
return {
'base': base_balance,
'quote': quote_balance,
'base_asset': base_asset, # Note: This contains string values but method returns Dict[str, float]
'quote_asset': quote_asset # We'll handle this in the calling method
}
except Exception as e:
logger.error(f"Error getting current holdings for {symbol}: {e}")
return {'base': 0.0, 'quote': 0.0, 'base_asset': 'UNKNOWN', 'quote_asset': 'UNKNOWN'}
def _determine_position_state(self, symbol: str, holdings: Dict[str, Any]) -> str:
"""Determine position state from current holdings"""
try:
base_balance = holdings.get('base', 0.0)
quote_balance = holdings.get('quote', 0.0)
# Minimum balance thresholds (to ignore dust)
min_base_threshold = 0.001 # 0.001 ETH minimum
min_quote_threshold = 1.0 # $1 minimum
has_base = base_balance >= min_base_threshold
has_quote = quote_balance >= min_quote_threshold
if has_base and not has_quote:
return 'LONG' # Holding crypto asset
elif not has_base and has_quote:
return 'SHORT' # Holding only fiat (after selling crypto)
elif has_base and has_quote:
# Mixed holdings - determine which is larger
try:
current_price = self._get_current_price_for_sync(symbol)
if current_price:
base_value = base_balance * current_price
if base_value > quote_balance * 1.5: # 50% threshold
return 'LONG'
else:
return 'SHORT'
except:
return 'LONG' # Default to LONG if price unavailable
else:
return 'NO_POSITION' # No significant holdings
except Exception as e:
logger.error(f"Error determining position state: {e}")
return 'NO_POSITION'
def _get_current_price_for_sync(self, symbol: str) -> Optional[float]:
"""Get current price for position synchronization"""
try:
if self.exchange:
ticker = self.exchange.get_ticker(symbol)
if ticker and 'last' in ticker:
return float(ticker['last'])
else:
# Get real current price - never use simulated data
return self._get_real_current_price(symbol)
return None
except Exception as e:
logger.error(f"Error getting current price for sync: {e}")
return None
def _execute_corrective_trades(self, symbol: str, current_state: str, desired_state: str, holdings: Dict[str, float]) -> bool:
"""Execute trades to correct position state mismatch"""
try:
logger.info(f"CORRECTIVE TRADE: {current_state} -> {desired_state} for {symbol}")
current_price = self._get_current_price_for_sync(symbol)
if not current_price:
logger.error("Cannot execute corrective trades without current price")
return False
base_balance = holdings.get('base', 0.0)
quote_balance = holdings.get('quote', 0.0)
base_asset = holdings.get('base_asset', 'ETH')
if desired_state == 'NO_POSITION':
# Need to sell all crypto holdings
if base_balance > 0.001: # Minimum to avoid dust
logger.info(f"CORRECTIVE: Selling {base_balance:.6f} {base_asset} to reach NO_POSITION")
result = self._place_order_with_retry(symbol, 'SELL', 'LIMIT', base_balance, current_price * 0.999)
if result:
# Wait for order fill and update internal position tracking
time.sleep(2)
if symbol in self.positions:
del self.positions[symbol]
logger.info(f"CORRECTIVE: Successfully sold holdings for NO_POSITION")
return True
else:
logger.error("CORRECTIVE: Failed to sell holdings")
return False
else:
logger.info("CORRECTIVE: Already at NO_POSITION (no crypto holdings)")
return True
elif desired_state == 'LONG':
# Need to buy crypto with available quote currency
if quote_balance < 10.0: # Minimum order value
logger.warning(f"CORRECTIVE: Insufficient quote balance ({quote_balance:.2f}) for LONG position")
return False
# Use 95% of quote balance for the trade (leaving some for fees)
trade_amount = quote_balance * 0.95
quantity = trade_amount / current_price
logger.info(f"CORRECTIVE: Buying {quantity:.6f} {base_asset} with ${trade_amount:.2f} for LONG position")
result = self._place_order_with_retry(symbol, 'BUY', 'LIMIT', quantity, current_price * 1.001)
if result:
# Update internal position tracking
time.sleep(2)
self.positions[symbol] = Position(
symbol=symbol,
side='LONG',
quantity=quantity,
entry_price=current_price,
entry_time=datetime.now(),
order_id=result.get('orderId', f"corrective_{int(time.time())}")
)
logger.info(f"CORRECTIVE: Successfully established LONG position")
return True
else:
logger.error("CORRECTIVE: Failed to buy for LONG position")
return False
elif desired_state == 'SHORT':
# Need to sell crypto holdings to get to cash-only position
if base_balance > 0.001:
logger.info(f"CORRECTIVE: Selling {base_balance:.6f} {base_asset} for SHORT position")
result = self._place_order_with_retry(symbol, 'SELL', 'LIMIT', base_balance, current_price * 0.999)
if result:
# Update internal position tracking for SHORT
time.sleep(2)
# For spot trading, SHORT means we sold our crypto and are holding fiat
# This is effectively being "short" the crypto asset
self.positions[symbol] = Position(
symbol=symbol,
side='SHORT',
quantity=base_balance, # Track the amount we sold
entry_price=current_price,
entry_time=datetime.now(),
order_id=result.get('orderId', f"corrective_{int(time.time())}")
)
logger.info(f"CORRECTIVE: Successfully established SHORT position")
return True
else:
logger.error("CORRECTIVE: Failed to sell for SHORT position")
return False
else:
logger.info("CORRECTIVE: Already in SHORT position (holding fiat only)")
return True
else:
logger.error(f"CORRECTIVE: Unknown desired state: {desired_state}")
return False
except Exception as e:
logger.error(f"Error executing corrective trades: {e}")
import traceback
logger.error(f"CORRECTIVE: Full traceback: {traceback.format_exc()}")
return False
def recalculate_all_trade_records(self):
"""Recalculate all existing trade records with correct leverage and PnL"""
logger.info("Recalculating all trade records with correct leverage and PnL...")
updated_count = 0
for i, trade in enumerate(self.trade_history):
try:
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = trade.entry_price * trade.quantity
# Calculate gross PnL (before fees) with leverage
if trade.side == 'LONG':
gross_pnl = (trade.exit_price - trade.entry_price) * trade.quantity * leverage
else: # SHORT
gross_pnl = (trade.entry_price - trade.exit_price) * trade.quantity * leverage
# Calculate fees (0.1% open + 0.1% close = 0.2% total)
entry_value = trade.entry_price * trade.quantity
exit_value = trade.exit_price * trade.quantity
fees = (entry_value + exit_value) * 0.001
# Calculate net PnL (after fees)
net_pnl = gross_pnl - fees
# Update trade record with corrected values
trade.leverage = leverage
trade.position_size_usd = position_size_usd
trade.gross_pnl = gross_pnl
trade.net_pnl = net_pnl
trade.pnl = net_pnl # Main PnL field
trade.fees = fees
updated_count += 1
except Exception as e:
logger.error(f"Error recalculating trade record {i}: {e}")
continue
logger.info(f"Updated {updated_count} trade records with correct leverage and PnL calculations")
# Also update trade_records list if it exists
if hasattr(self, 'trade_records') and self.trade_records:
logger.info("Updating trade_records list...")
for i, trade in enumerate(self.trade_records):
try:
# Get current leverage setting
leverage = self.get_leverage()
# Calculate position size in USD
position_size_usd = trade.entry_price * trade.quantity
# Calculate gross PnL (before fees) with leverage
if trade.side == 'LONG':
gross_pnl = (trade.exit_price - trade.entry_price) * trade.quantity * leverage
else: # SHORT
gross_pnl = (trade.entry_price - trade.exit_price) * trade.quantity * leverage
# Calculate fees (0.1% open + 0.1% close = 0.2% total)
entry_value = trade.entry_price * trade.quantity
exit_value = trade.exit_price * trade.quantity
fees = (entry_value + exit_value) * 0.001
# Calculate net PnL (after fees)
net_pnl = gross_pnl - fees
# Update trade record with corrected values
trade.leverage = leverage
trade.position_size_usd = position_size_usd
trade.gross_pnl = gross_pnl
trade.net_pnl = net_pnl
trade.pnl = net_pnl # Main PnL field
trade.fees = fees
except Exception as e:
logger.error(f"Error recalculating trade_records entry {i}: {e}")
continue
logger.info("Trade record recalculation completed")