diff --git a/core/trading_executor.py b/core/trading_executor.py index d0256f5..9147a4a 100644 --- a/core/trading_executor.py +++ b/core/trading_executor.py @@ -14,7 +14,9 @@ import os from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass -from threading import Lock +from threading import Lock, RLock +import threading +import time import sys # Add NN directory to path for exchange interfaces @@ -111,8 +113,9 @@ class TradingExecutor: # Legacy compatibility (deprecated) self.dry_run = self.simulation_mode - # Thread safety - self.lock = Lock() + # Thread safety with timeout support + self.lock = RLock() + self.lock_timeout = 10.0 # 10 second timeout for order execution # Connect to exchange if self.trading_enabled: @@ -252,20 +255,37 @@ class TradingExecutor: logger.debug(f"SIMULATION MODE: Skipping balance check for {symbol} {action} - allowing trade for model training") # --- End Balance check --- - with self.lock: - try: - if action == 'BUY': - return self._execute_buy(symbol, confidence, current_price) - elif action == 'SELL': - return self._execute_sell(symbol, confidence, current_price) - elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal - return self._execute_short(symbol, confidence, current_price) - else: - logger.warning(f"Unknown action: {action}") - return False - except Exception as e: - logger.error(f"Error executing {action} signal for {symbol}: {e}") - return False + # Try to acquire lock with timeout to prevent hanging + lock_acquired = self.lock.acquire(timeout=self.lock_timeout) + if not lock_acquired: + logger.error(f"TIMEOUT: Failed to acquire lock within {self.lock_timeout}s for {action} {symbol}") + logger.error(f"This indicates another operation is hanging. Cancelling {action} order.") + return False + + try: + logger.info(f"LOCK ACQUIRED: Executing {action} for {symbol}") + start_time = time.time() + + if action == 'BUY': + result = self._execute_buy(symbol, confidence, current_price) + elif action == 'SELL': + result = self._execute_sell(symbol, confidence, current_price) + elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal + result = self._execute_short(symbol, confidence, current_price) + else: + logger.warning(f"Unknown action: {action}") + result = False + + execution_time = time.time() - start_time + logger.info(f"EXECUTION COMPLETE: {action} for {symbol} took {execution_time:.2f}s, result: {result}") + return result + + except Exception as e: + logger.error(f"Error executing {action} signal for {symbol}: {e}") + return False + finally: + self.lock.release() + logger.debug(f"LOCK RELEASED: {action} for {symbol}") def _cancel_open_orders(self, symbol: str) -> bool: """Cancel all open orders for a symbol before placing new orders""" @@ -463,7 +483,16 @@ class TradingExecutor: def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]: """Place order with retry logic for MEXC error handling""" + order_start_time = time.time() + max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout) + for attempt in range(max_retries): + # Check if we've exceeded the maximum order time + elapsed_time = time.time() - order_start_time + if elapsed_time > max_order_time: + logger.error(f"ORDER TIMEOUT: Order placement exceeded {max_order_time}s limit for {side} {symbol}") + logger.error(f"Elapsed time: {elapsed_time:.2f}s, cancelling order to prevent lock hanging") + return {} try: result = self.exchange.place_order(symbol, side, order_type, quantity, current_price) @@ -478,10 +507,18 @@ class TradingExecutor: logger.warning(f"Error: {error_msg}") if attempt < max_retries - 1: - # Wait with exponential backoff - wait_time = result.get('retry_after', 60) * (2 ** attempt) - logger.info(f"Waiting {wait_time} seconds before retry due to oversold condition...") - time.sleep(wait_time) + # Calculate wait time but respect timeout limit + suggested_wait = result.get('retry_after', 60) * (2 ** attempt) + elapsed_time = time.time() - order_start_time + remaining_time = max_order_time - elapsed_time + + if suggested_wait > remaining_time: + logger.warning(f"Oversold retry wait ({suggested_wait}s) exceeds timeout limit") + logger.warning(f"Remaining time: {remaining_time:.2f}s, skipping retry to prevent hanging") + return {} + + logger.info(f"Waiting {suggested_wait} seconds before retry due to oversold condition...") + time.sleep(suggested_wait) # Reduce quantity for next attempt to avoid oversold quantity = quantity * 0.8 # Reduce by 20% @@ -502,7 +539,15 @@ class TradingExecutor: else: logger.error(f"MEXC API error: {error_code} - {error_msg}") if attempt < max_retries - 1: - time.sleep(5 * (attempt + 1)) # Wait 5, 10, 15 seconds + wait_time = 5 * (attempt + 1) # Wait 5, 10, 15 seconds + elapsed_time = time.time() - order_start_time + remaining_time = max_order_time - elapsed_time + + if wait_time > remaining_time: + logger.warning(f"API error retry wait ({wait_time}s) exceeds timeout, cancelling") + return {} + + time.sleep(wait_time) continue else: return {} @@ -516,7 +561,15 @@ class TradingExecutor: else: logger.warning(f"Empty result on attempt {attempt + 1}/{max_retries}") if attempt < max_retries - 1: - time.sleep(2 * (attempt + 1)) # Wait 2, 4, 6 seconds + wait_time = 2 * (attempt + 1) # Wait 2, 4, 6 seconds + elapsed_time = time.time() - order_start_time + remaining_time = max_order_time - elapsed_time + + if wait_time > remaining_time: + logger.warning(f"Empty result retry wait ({wait_time}s) exceeds timeout, cancelling") + return {} + + time.sleep(wait_time) continue else: return {} @@ -524,7 +577,15 @@ class TradingExecutor: except Exception as e: logger.error(f"Exception on order attempt {attempt + 1}/{max_retries}: {e}") if attempt < max_retries - 1: - time.sleep(3 * (attempt + 1)) # Wait 3, 6, 9 seconds + wait_time = 3 * (attempt + 1) # Wait 3, 6, 9 seconds + elapsed_time = time.time() - order_start_time + remaining_time = max_order_time - elapsed_time + + if wait_time > remaining_time: + logger.warning(f"Exception retry wait ({wait_time}s) exceeds timeout, cancelling") + return {} + + time.sleep(wait_time) continue else: return {}