trading risk management

This commit is contained in:
Dobromir Popov
2025-07-28 16:42:11 +03:00
parent 44821b2a89
commit db23ad10da
3 changed files with 305 additions and 90 deletions

View File

@ -260,8 +260,113 @@ class TradingExecutor:
elif self.trading_enabled and self.exchange:
logger.info(f"TRADING EXECUTOR: Using {self.primary_name.upper()} exchange - fee sync not available")
# Sync positions from exchange on startup if in live mode
if not self.simulation_mode and self.exchange and self.trading_enabled:
self._sync_positions_on_startup()
logger.info(f"Trading Executor initialized - Exchange: {self.primary_name.upper()}, Mode: {self.trading_mode}, Enabled: {self.trading_enabled}")
def _sync_positions_on_startup(self):
"""Sync positions from exchange on startup"""
try:
logger.info("TRADING EXECUTOR: Syncing positions from exchange on startup...")
# Get all open positions from exchange
if hasattr(self.exchange, 'get_positions'):
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for position in exchange_positions:
symbol = position.get('symbol', '').replace('USDT', '/USDT')
size = float(position.get('size', 0))
side = position.get('side', '').upper()
entry_price = float(position.get('entry_price', 0))
if size > 0 and symbol and side in ['LONG', 'SHORT']:
# Create position object
pos_obj = Position(
symbol=symbol,
side=side,
quantity=size,
entry_price=entry_price,
entry_time=datetime.now()
)
self.positions[symbol] = pos_obj
logger.info(f"POSITION SYNC: Found {side} position for {symbol}: {size} @ ${entry_price:.2f}")
logger.info(f"POSITION SYNC: Synced {len(self.positions)} positions from exchange")
else:
logger.warning("Exchange does not support position retrieval")
except Exception as e:
logger.error(f"POSITION SYNC: Error syncing positions on startup: {e}")
def _sync_single_position_from_exchange(self, symbol: str, exchange_position: dict):
"""Sync a single position from exchange to local state"""
try:
size = float(exchange_position.get('size', 0))
side = exchange_position.get('side', '').upper()
entry_price = float(exchange_position.get('entry_price', 0))
if size > 0 and side in ['LONG', 'SHORT']:
pos_obj = Position(
symbol=symbol,
side=side,
quantity=size,
entry_price=entry_price,
entry_time=datetime.now()
)
self.positions[symbol] = pos_obj
logger.info(f"POSITION SYNC: Added {side} position for {symbol}: {size} @ ${entry_price:.2f}")
return True
except Exception as e:
logger.error(f"Error syncing single position for {symbol}: {e}")
return False
def close_all_positions(self):
"""Emergency close all positions - both local and exchange"""
logger.warning("CLOSE ALL POSITIONS: Starting emergency position closure")
positions_closed = 0
# Get all positions to close (local + exchange)
positions_to_close = set()
# Add local positions
for symbol in self.positions.keys():
positions_to_close.add(symbol)
# Add exchange positions if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for pos in exchange_positions:
symbol = pos.get('symbol', '').replace('USDT', '/USDT')
size = float(pos.get('size', 0))
if size > 0:
positions_to_close.add(symbol)
except Exception as e:
logger.error(f"Error getting exchange positions for closure: {e}")
# Close all positions
for symbol in positions_to_close:
try:
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'LONG':
if self._close_long_position(symbol, 1.0, position.entry_price):
positions_closed += 1
elif position.side == 'SHORT':
if self._close_short_position(symbol, 1.0, position.entry_price):
positions_closed += 1
else:
logger.warning(f"Position {symbol} found on exchange but not locally - manual intervention needed")
except Exception as e:
logger.error(f"Error closing position {symbol}: {e}")
logger.warning(f"CLOSE ALL POSITIONS: Closed {positions_closed} positions")
return positions_closed
def _safe_exchange_call(self, method_name: str, *args, **kwargs):
"""Safely call exchange methods with null checking"""
if not self.exchange:
@ -374,6 +479,27 @@ class TradingExecutor:
if action == 'HOLD':
return True
# PERIODIC POSITION SYNC: Every 10th signal execution, sync positions from exchange to prevent desync
if not hasattr(self, '_signal_count'):
self._signal_count = 0
self._signal_count += 1
if self._signal_count % 10 == 0 and not self.simulation_mode and self.exchange:
logger.debug(f"PERIODIC SYNC: Checking position sync for {symbol} (signal #{self._signal_count})")
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
size = float(pos.get('size', 0))
if size > 0 and symbol not in self.positions:
logger.warning(f"DESYNC DETECTED: Found position on exchange but not locally for {symbol}")
self._sync_single_position_from_exchange(symbol, pos)
elif symbol in self.positions:
logger.warning(f"DESYNC DETECTED: Have local position but none on exchange for {symbol}")
# Consider removing local position or investigating further
except Exception as e:
logger.debug(f"Error in periodic position sync: {e}")
# Check safety conditions
if not self._check_safety_conditions(symbol, action):
return False
@ -866,17 +992,33 @@ class TradingExecutor:
return True
def _execute_buy(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a buy order"""
# Check if we have a short position to close
"""Execute a buy order with enhanced position management"""
# CRITICAL: Check for existing positions (both local and exchange)
if symbol in self.positions:
position = self.positions[symbol]
if position.side == 'SHORT':
logger.info(f"Closing SHORT position in {symbol}")
return self._close_short_position(symbol, confidence, current_price)
else:
logger.info(f"Already have LONG position in {symbol}")
logger.warning(f"POSITION SAFETY: Already have LONG position in {symbol} - blocking duplicate trade")
return False
# ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
if float(pos.get('size', 0)) > 0:
logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking duplicate trade")
logger.warning(f"Position details: {pos}")
# Sync this position to local state
self._sync_single_position_from_exchange(symbol, pos)
return False
except Exception as e:
logger.debug(f"Error checking exchange positions for {symbol}: {e}")
# Don't block trade if we can't check - but log it
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
@ -902,6 +1044,12 @@ class TradingExecutor:
else:
# Place real order with enhanced error handling
result = self._place_order_with_retry(symbol, 'BUY', 'MARKET', quantity, current_price)
# Check for position check error
if result and 'error' in result and result['error'] == 'existing_position':
logger.error(f"BUY order blocked: {result['message']}")
return False
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
@ -943,7 +1091,27 @@ class TradingExecutor:
return self._execute_short(symbol, confidence, current_price)
def _execute_short(self, symbol: str, confidence: float, current_price: float) -> bool:
"""Execute a short order (sell without holding the asset)"""
"""Execute a short order (sell without holding the asset) with enhanced position management"""
# CRITICAL: Check for any existing positions before opening SHORT
if symbol in self.positions:
logger.warning(f"POSITION SAFETY: Already have position in {symbol} - blocking SHORT trade")
return False
# ADDITIONAL SAFETY: Double-check with exchange if not in simulation mode
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
if float(pos.get('size', 0)) > 0:
logger.warning(f"POSITION SAFETY: Found existing position on exchange for {symbol} - blocking SHORT trade")
logger.warning(f"Position details: {pos}")
# Sync this position to local state
self._sync_single_position_from_exchange(symbol, pos)
return False
except Exception as e:
logger.debug(f"Error checking exchange positions for SHORT {symbol}: {e}")
# Cancel any existing open orders before placing new order
if not self.simulation_mode:
self._cancel_open_orders(symbol)
@ -969,6 +1137,12 @@ class TradingExecutor:
else:
# Place real short order with enhanced error handling
result = self._place_order_with_retry(symbol, 'SELL', 'MARKET', quantity, current_price)
# Check for position check error
if result and 'error' in result and result['error'] == 'existing_position':
logger.error(f"SHORT order blocked: {result['message']}")
return False
if result and 'orderId' in result:
# Use actual fill information if available, otherwise fall back to order parameters
filled_quantity = result.get('executedQty', quantity)
@ -996,6 +1170,25 @@ class TradingExecutor:
def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]:
"""Place order with retry logic for MEXC error handling"""
# FINAL POSITION CHECK: Verify no existing position before placing order
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions(symbol)
if exchange_positions:
for pos in exchange_positions:
size = float(pos.get('size', 0))
if size > 0:
logger.error(f"FINAL POSITION CHECK FAILED: Found existing position for {symbol} before placing order")
logger.error(f"Position details: {pos}")
logger.error(f"Order details: {side} {quantity} @ ${current_price}")
# Sync the position to local state
self._sync_single_position_from_exchange(symbol, pos)
return {'error': 'existing_position', 'message': f'Position already exists for {symbol}'}
except Exception as e:
logger.warning(f"Error in final position check for {symbol}: {e}")
# Continue with order placement if we can't check positions
order_start_time = time.time()
max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout)
@ -1808,7 +2001,27 @@ class TradingExecutor:
# Calculate total current position value
total_position_value = 0.0
# Add existing positions
# ENHANCED: Also check exchange positions to ensure we don't miss any
if not self.simulation_mode and self.exchange:
try:
exchange_positions = self.exchange.get_positions()
if exchange_positions:
for pos in exchange_positions:
symbol = pos.get('symbol', '').replace('USDT', '/USDT')
size = float(pos.get('size', 0))
entry_price = float(pos.get('entry_price', 0))
if size > 0 and symbol:
# Check if this position is also in our local state
if symbol not in self.positions:
logger.warning(f"POSITION LIMIT: Found untracked exchange position for {symbol}: {size} @ ${entry_price:.2f}")
# Add to total even if not in local state
position_value = size * entry_price
total_position_value += position_value
logger.debug(f"Exchange position {symbol}: {size:.6f} @ ${entry_price:.2f} = ${position_value:.2f}")
except Exception as e:
logger.debug(f"Error checking exchange positions for limit: {e}")
# Add existing local positions
for symbol, position in self.positions.items():
# Get current price for the symbol
try: