fixes
This commit is contained in:
@@ -156,6 +156,14 @@ class DataProvider:
|
||||
self.real_time_data = {} # {symbol: {timeframe: deque}}
|
||||
self.current_prices = {} # {symbol: float}
|
||||
|
||||
# Thread-safe data access with RLock (reentrant lock)
|
||||
from threading import RLock
|
||||
self.data_lock = RLock()
|
||||
|
||||
# Catch-up state tracking
|
||||
self.catch_up_in_progress = False
|
||||
self.catch_up_completed = False
|
||||
|
||||
# Live price cache for low-latency price updates
|
||||
self.live_price_cache: Dict[str, Tuple[float, datetime]] = {}
|
||||
self.live_price_cache_ttl = timedelta(milliseconds=500)
|
||||
@@ -583,69 +591,114 @@ class DataProvider:
|
||||
|
||||
logger.info("Initial data load completed")
|
||||
|
||||
# Catch up on missing candles if needed
|
||||
self._catch_up_missing_candles()
|
||||
# Start background candle catch-up with proper locking
|
||||
self._start_background_catch_up()
|
||||
|
||||
def _start_background_catch_up(self):
|
||||
"""
|
||||
Start background candle catch-up with proper thread safety
|
||||
This runs in a separate thread and uses locks to prevent race conditions
|
||||
"""
|
||||
import threading
|
||||
|
||||
def catch_up_worker():
|
||||
# Wait a bit for initial data to settle
|
||||
import time
|
||||
time.sleep(2)
|
||||
|
||||
logger.info("Starting background candle catch-up with thread safety")
|
||||
self._catch_up_missing_candles()
|
||||
logger.info("Background candle catch-up completed")
|
||||
|
||||
catch_up_thread = threading.Thread(
|
||||
target=catch_up_worker,
|
||||
daemon=True,
|
||||
name="CandleCatchUpWorker"
|
||||
)
|
||||
catch_up_thread.start()
|
||||
|
||||
def _catch_up_missing_candles(self):
|
||||
"""
|
||||
Catch up on missing candles at startup
|
||||
Catch up on missing candles at startup with thread-safe locking
|
||||
Fetches up to 1500 candles per timeframe if we're missing data
|
||||
"""
|
||||
logger.info("Checking for missing candles to catch up...")
|
||||
# Mark catch-up as in progress
|
||||
with self.data_lock:
|
||||
if self.catch_up_in_progress:
|
||||
logger.warning("Catch-up already in progress, skipping")
|
||||
return
|
||||
self.catch_up_in_progress = True
|
||||
|
||||
target_candles = 1500 # Target number of candles per timeframe
|
||||
|
||||
for symbol in self.symbols:
|
||||
for timeframe in self.timeframes:
|
||||
try:
|
||||
# Check current candle count
|
||||
current_df = self.cached_data[symbol][timeframe]
|
||||
current_count = len(current_df) if not current_df.empty else 0
|
||||
|
||||
if current_count >= target_candles:
|
||||
logger.debug(f"{symbol} {timeframe}: Already have {current_count} candles (target: {target_candles})")
|
||||
continue
|
||||
|
||||
# Calculate how many candles we need
|
||||
needed = target_candles - current_count
|
||||
logger.info(f"{symbol} {timeframe}: Need {needed} more candles (have {current_count}/{target_candles})")
|
||||
|
||||
# Fetch missing candles
|
||||
# Try Binance first (usually has better historical data)
|
||||
df = self._fetch_from_binance(symbol, timeframe, needed)
|
||||
|
||||
if df is None or df.empty:
|
||||
# Fallback to MEXC
|
||||
logger.debug(f"Binance fetch failed for {symbol} {timeframe}, trying MEXC...")
|
||||
df = self._fetch_from_mexc(symbol, timeframe, needed)
|
||||
|
||||
if df is not None and not df.empty:
|
||||
# Ensure proper datetime index
|
||||
df = self._ensure_datetime_index(df)
|
||||
try:
|
||||
logger.info("Checking for missing candles to catch up...")
|
||||
|
||||
target_candles = 1500 # Target number of candles per timeframe
|
||||
|
||||
for symbol in self.symbols:
|
||||
for timeframe in self.timeframes:
|
||||
try:
|
||||
# Read current count with lock
|
||||
with self.data_lock:
|
||||
current_df = self.cached_data[symbol][timeframe].copy()
|
||||
current_count = len(current_df) if not current_df.empty else 0
|
||||
|
||||
# Merge with existing data
|
||||
if not current_df.empty:
|
||||
combined_df = pd.concat([current_df, df], ignore_index=False)
|
||||
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
|
||||
combined_df = combined_df.sort_index()
|
||||
self.cached_data[symbol][timeframe] = combined_df.tail(target_candles)
|
||||
if current_count >= target_candles:
|
||||
logger.debug(f"{symbol} {timeframe}: Already have {current_count} candles (target: {target_candles})")
|
||||
continue
|
||||
|
||||
# Calculate how many candles we need
|
||||
needed = target_candles - current_count
|
||||
logger.info(f"{symbol} {timeframe}: Need {needed} more candles (have {current_count}/{target_candles})")
|
||||
|
||||
# Fetch missing candles (outside lock - network I/O)
|
||||
# Try Binance first (usually has better historical data)
|
||||
df = self._fetch_from_binance(symbol, timeframe, needed)
|
||||
|
||||
if df is None or df.empty:
|
||||
# Fallback to MEXC
|
||||
logger.debug(f"Binance fetch failed for {symbol} {timeframe}, trying MEXC...")
|
||||
df = self._fetch_from_mexc(symbol, timeframe, needed)
|
||||
|
||||
if df is not None and not df.empty:
|
||||
# Ensure proper datetime index
|
||||
df = self._ensure_datetime_index(df)
|
||||
|
||||
# Update cached data with lock
|
||||
with self.data_lock:
|
||||
current_df = self.cached_data[symbol][timeframe]
|
||||
|
||||
# Merge with existing data
|
||||
if not current_df.empty:
|
||||
combined_df = pd.concat([current_df, df], ignore_index=False)
|
||||
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
|
||||
combined_df = combined_df.sort_index()
|
||||
self.cached_data[symbol][timeframe] = combined_df.tail(target_candles)
|
||||
else:
|
||||
self.cached_data[symbol][timeframe] = df.tail(target_candles)
|
||||
|
||||
final_count = len(self.cached_data[symbol][timeframe])
|
||||
|
||||
logger.info(f"✅ {symbol} {timeframe}: Caught up! Now have {final_count} candles")
|
||||
else:
|
||||
self.cached_data[symbol][timeframe] = df.tail(target_candles)
|
||||
|
||||
final_count = len(self.cached_data[symbol][timeframe])
|
||||
logger.info(f"✅ {symbol} {timeframe}: Caught up! Now have {final_count} candles")
|
||||
else:
|
||||
logger.warning(f"❌ {symbol} {timeframe}: Could not fetch historical data from any exchange")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error catching up candles for {symbol} {timeframe}: {e}")
|
||||
|
||||
logger.info("Candle catch-up completed")
|
||||
logger.warning(f"❌ {symbol} {timeframe}: Could not fetch historical data from any exchange")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error catching up candles for {symbol} {timeframe}: {e}")
|
||||
|
||||
logger.info("Candle catch-up completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error in candle catch-up: {e}")
|
||||
finally:
|
||||
# Mark catch-up as complete
|
||||
with self.data_lock:
|
||||
self.catch_up_in_progress = False
|
||||
self.catch_up_completed = True
|
||||
|
||||
def _update_cached_data(self, symbol: str, timeframe: str):
|
||||
"""Update cached data by fetching last 2 candles"""
|
||||
"""Update cached data by fetching last 2 candles with thread-safe locking"""
|
||||
try:
|
||||
# Fetch last 2 candles
|
||||
# Fetch last 2 candles (outside lock - network I/O)
|
||||
df = self._fetch_from_binance(symbol, timeframe, 2)
|
||||
|
||||
if df is None or df.empty:
|
||||
@@ -655,21 +708,24 @@ class DataProvider:
|
||||
# Ensure proper datetime index
|
||||
df = self._ensure_datetime_index(df)
|
||||
|
||||
# Get existing cached data
|
||||
existing_df = self.cached_data[symbol][timeframe]
|
||||
|
||||
if not existing_df.empty:
|
||||
# Merge new data with existing, avoiding duplicates
|
||||
combined_df = pd.concat([existing_df, df], ignore_index=False)
|
||||
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
|
||||
combined_df = combined_df.sort_index()
|
||||
# Update cached data with lock
|
||||
with self.data_lock:
|
||||
existing_df = self.cached_data[symbol][timeframe]
|
||||
|
||||
# Keep only last 1500 candles
|
||||
self.cached_data[symbol][timeframe] = combined_df.tail(1500)
|
||||
else:
|
||||
self.cached_data[symbol][timeframe] = df
|
||||
if not existing_df.empty:
|
||||
# Merge new data with existing, avoiding duplicates
|
||||
combined_df = pd.concat([existing_df, df], ignore_index=False)
|
||||
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
|
||||
combined_df = combined_df.sort_index()
|
||||
|
||||
# Keep only last 1500 candles
|
||||
self.cached_data[symbol][timeframe] = combined_df.tail(1500)
|
||||
else:
|
||||
self.cached_data[symbol][timeframe] = df
|
||||
|
||||
candle_count = len(self.cached_data[symbol][timeframe])
|
||||
|
||||
logger.debug(f"Updated cached data for {symbol} {timeframe}: {len(self.cached_data[symbol][timeframe])} candles")
|
||||
logger.debug(f"Updated cached data for {symbol} {timeframe}: {candle_count} candles")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user