lock with timeout

This commit is contained in:
Dobromir Popov
2025-07-14 13:03:42 +03:00
parent ab232a1262
commit d7205a9745

View File

@ -14,7 +14,9 @@ import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from threading import Lock
from threading import Lock, RLock
import threading
import time
import sys
# Add NN directory to path for exchange interfaces
@ -111,8 +113,9 @@ class TradingExecutor:
# Legacy compatibility (deprecated)
self.dry_run = self.simulation_mode
# Thread safety
self.lock = Lock()
# Thread safety with timeout support
self.lock = RLock()
self.lock_timeout = 10.0 # 10 second timeout for order execution
# Connect to exchange
if self.trading_enabled:
@ -252,20 +255,37 @@ class TradingExecutor:
logger.debug(f"SIMULATION MODE: Skipping balance check for {symbol} {action} - allowing trade for model training")
# --- End Balance check ---
with self.lock:
try:
if action == 'BUY':
return self._execute_buy(symbol, confidence, current_price)
elif action == 'SELL':
return self._execute_sell(symbol, confidence, current_price)
elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal
return self._execute_short(symbol, confidence, current_price)
else:
logger.warning(f"Unknown action: {action}")
return False
except Exception as e:
logger.error(f"Error executing {action} signal for {symbol}: {e}")
return False
# Try to acquire lock with timeout to prevent hanging
lock_acquired = self.lock.acquire(timeout=self.lock_timeout)
if not lock_acquired:
logger.error(f"TIMEOUT: Failed to acquire lock within {self.lock_timeout}s for {action} {symbol}")
logger.error(f"This indicates another operation is hanging. Cancelling {action} order.")
return False
try:
logger.info(f"LOCK ACQUIRED: Executing {action} for {symbol}")
start_time = time.time()
if action == 'BUY':
result = self._execute_buy(symbol, confidence, current_price)
elif action == 'SELL':
result = self._execute_sell(symbol, confidence, current_price)
elif action == 'SHORT': # Explicitly handle SHORT if it's a direct signal
result = self._execute_short(symbol, confidence, current_price)
else:
logger.warning(f"Unknown action: {action}")
result = False
execution_time = time.time() - start_time
logger.info(f"EXECUTION COMPLETE: {action} for {symbol} took {execution_time:.2f}s, result: {result}")
return result
except Exception as e:
logger.error(f"Error executing {action} signal for {symbol}: {e}")
return False
finally:
self.lock.release()
logger.debug(f"LOCK RELEASED: {action} for {symbol}")
def _cancel_open_orders(self, symbol: str) -> bool:
"""Cancel all open orders for a symbol before placing new orders"""
@ -463,7 +483,16 @@ class TradingExecutor:
def _place_order_with_retry(self, symbol: str, side: str, order_type: str, quantity: float, current_price: float, max_retries: int = 3) -> Dict[str, Any]:
"""Place order with retry logic for MEXC error handling"""
order_start_time = time.time()
max_order_time = 8.0 # Maximum 8 seconds for order placement (leaves 2s buffer for lock timeout)
for attempt in range(max_retries):
# Check if we've exceeded the maximum order time
elapsed_time = time.time() - order_start_time
if elapsed_time > max_order_time:
logger.error(f"ORDER TIMEOUT: Order placement exceeded {max_order_time}s limit for {side} {symbol}")
logger.error(f"Elapsed time: {elapsed_time:.2f}s, cancelling order to prevent lock hanging")
return {}
try:
result = self.exchange.place_order(symbol, side, order_type, quantity, current_price)
@ -478,10 +507,18 @@ class TradingExecutor:
logger.warning(f"Error: {error_msg}")
if attempt < max_retries - 1:
# Wait with exponential backoff
wait_time = result.get('retry_after', 60) * (2 ** attempt)
logger.info(f"Waiting {wait_time} seconds before retry due to oversold condition...")
time.sleep(wait_time)
# Calculate wait time but respect timeout limit
suggested_wait = result.get('retry_after', 60) * (2 ** attempt)
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if suggested_wait > remaining_time:
logger.warning(f"Oversold retry wait ({suggested_wait}s) exceeds timeout limit")
logger.warning(f"Remaining time: {remaining_time:.2f}s, skipping retry to prevent hanging")
return {}
logger.info(f"Waiting {suggested_wait} seconds before retry due to oversold condition...")
time.sleep(suggested_wait)
# Reduce quantity for next attempt to avoid oversold
quantity = quantity * 0.8 # Reduce by 20%
@ -502,7 +539,15 @@ class TradingExecutor:
else:
logger.error(f"MEXC API error: {error_code} - {error_msg}")
if attempt < max_retries - 1:
time.sleep(5 * (attempt + 1)) # Wait 5, 10, 15 seconds
wait_time = 5 * (attempt + 1) # Wait 5, 10, 15 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"API error retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
@ -516,7 +561,15 @@ class TradingExecutor:
else:
logger.warning(f"Empty result on attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
time.sleep(2 * (attempt + 1)) # Wait 2, 4, 6 seconds
wait_time = 2 * (attempt + 1) # Wait 2, 4, 6 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Empty result retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}
@ -524,7 +577,15 @@ class TradingExecutor:
except Exception as e:
logger.error(f"Exception on order attempt {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
time.sleep(3 * (attempt + 1)) # Wait 3, 6, 9 seconds
wait_time = 3 * (attempt + 1) # Wait 3, 6, 9 seconds
elapsed_time = time.time() - order_start_time
remaining_time = max_order_time - elapsed_time
if wait_time > remaining_time:
logger.warning(f"Exception retry wait ({wait_time}s) exceeds timeout, cancelling")
return {}
time.sleep(wait_time)
continue
else:
return {}