orchestrator uses BaseDataInput
This commit is contained in:
@ -161,6 +161,15 @@ class DataProvider:
|
||||
# Enhanced WebSocket integration
|
||||
self.enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None
|
||||
self.websocket_tasks = {}
|
||||
|
||||
# COB collection state guard to prevent duplicate starts
|
||||
self._cob_started: bool = False
|
||||
|
||||
# Ensure COB collection is started so BaseDataInput includes real order book data
|
||||
try:
|
||||
self.start_cob_collection()
|
||||
except Exception as _cob_init_ex:
|
||||
logger.error(f"Failed to start COB collection at init: {_cob_init_ex}")
|
||||
self.is_streaming = False
|
||||
self.data_lock = Lock()
|
||||
|
||||
@ -1133,7 +1142,7 @@ class DataProvider:
|
||||
recent_ticks = self.get_cob_raw_ticks(symbol, count=limit * 10) # Get more ticks than needed
|
||||
|
||||
if not recent_ticks:
|
||||
logger.warning(f"No tick data available for {symbol}, cannot generate 1s candles")
|
||||
logger.debug(f"No tick data available for {symbol}, cannot generate 1s candles")
|
||||
return None
|
||||
|
||||
# Group ticks by second and create OHLCV candles
|
||||
@ -1156,8 +1165,7 @@ class DataProvider:
|
||||
bid_vol = stats.get('bid_volume', 0) or 0
|
||||
ask_vol = stats.get('ask_volume', 0) or 0
|
||||
volume = float(bid_vol) + float(ask_vol)
|
||||
if volume == 0:
|
||||
volume = 1.0 # Minimal placeholder to avoid zero-volume bars
|
||||
# Do not create synthetic volume; keep zero if not available
|
||||
else:
|
||||
continue
|
||||
|
||||
@ -1212,7 +1220,7 @@ class DataProvider:
|
||||
candles.append(current_candle)
|
||||
|
||||
if not candles:
|
||||
logger.warning(f"No valid candles generated for {symbol}")
|
||||
logger.debug(f"No valid candles generated for {symbol}")
|
||||
return None
|
||||
|
||||
# Convert to DataFrame (timestamps remain UTC tz-aware)
|
||||
@ -1251,7 +1259,7 @@ class DataProvider:
|
||||
logger.info(f"Successfully generated 1s candles from WebSocket ticks for {symbol}")
|
||||
return generated_df
|
||||
else:
|
||||
logger.warning(f"Failed to generate 1s candles from ticks for {symbol}, trying Binance API")
|
||||
logger.info(f"Could not generate 1s candles from ticks for {symbol}; trying Binance API")
|
||||
|
||||
# Convert symbol format
|
||||
binance_symbol = symbol.replace('/', '').upper()
|
||||
@ -2673,7 +2681,16 @@ class DataProvider:
|
||||
def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
|
||||
"""Get latest COB data from Enhanced WebSocket"""
|
||||
try:
|
||||
return self.cob_websocket_data.get(symbol)
|
||||
# First try the websocket data cache
|
||||
if symbol in self.cob_websocket_data and self.cob_websocket_data[symbol]:
|
||||
return self.cob_websocket_data[symbol]
|
||||
|
||||
# Fallback to raw ticks
|
||||
if symbol in self.cob_raw_ticks and len(self.cob_raw_ticks[symbol]) > 0:
|
||||
return self.cob_raw_ticks[symbol][-1] # Get latest raw tick
|
||||
|
||||
# No COB data available
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting latest COB data for {symbol}: {e}")
|
||||
return None
|
||||
@ -4210,18 +4227,23 @@ class DataProvider:
|
||||
Start enhanced COB data collection with WebSocket and raw tick aggregation
|
||||
"""
|
||||
try:
|
||||
# Guard against duplicate starts
|
||||
if getattr(self, "_cob_started", False):
|
||||
return
|
||||
# Initialize COB WebSocket system
|
||||
self._initialize_enhanced_cob_websocket()
|
||||
|
||||
# Start aggregation system
|
||||
self._start_cob_tick_aggregation()
|
||||
|
||||
self._cob_started = True
|
||||
logger.info("Enhanced COB data collection started with WebSocket and tick aggregation")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting enhanced COB collection: {e}")
|
||||
# Fallback to REST-only collection
|
||||
self._start_rest_only_cob_collection()
|
||||
self._cob_started = True
|
||||
|
||||
def _initialize_enhanced_cob_websocket(self):
|
||||
"""Initialize the enhanced COB WebSocket system"""
|
||||
|
Reference in New Issue
Block a user