COB info restored, better COB heatmap, restore kill processes
This commit is contained in:
@ -4198,9 +4198,9 @@ class DataProvider:
|
||||
if symbol not in self.cob_data_cache:
|
||||
self.cob_data_cache[symbol] = []
|
||||
|
||||
# Add to cache with max size limit
|
||||
# Add to cache with max size limit (30 minutes of 1s data)
|
||||
self.cob_data_cache[symbol].append(cob_snapshot)
|
||||
if len(self.cob_data_cache[symbol]) > 300: # Keep 5 minutes of 1s data
|
||||
if len(self.cob_data_cache[symbol]) > 1800:
|
||||
self.cob_data_cache[symbol].pop(0)
|
||||
|
||||
# Notify subscribers
|
||||
|
@ -430,17 +430,9 @@ class EnhancedCOBWebSocket:
|
||||
# 3. ticker - 24hr ticker statistics (includes volume)
|
||||
# 4. aggTrade - Aggregated trade data for volume analysis
|
||||
|
||||
# Configure kline stream with timezone offset if specified
|
||||
if self.timezone_offset:
|
||||
kline_stream = f"{ws_symbol}@kline_1s@{self.timezone_offset}"
|
||||
logger.info(f"Using timezone offset {self.timezone_offset} for kline stream")
|
||||
else:
|
||||
kline_stream = f"{ws_symbol}@kline_1s"
|
||||
logger.info("Using UTC timezone for kline stream")
|
||||
|
||||
# Build only supported, critical streams for stability (exclude kline_1s which may be unsupported)
|
||||
streams = [
|
||||
f"{ws_symbol}@depth@{self.update_speed}", # Order book diff depth
|
||||
kline_stream, # 1-second candlesticks (with timezone)
|
||||
f"{ws_symbol}@ticker", # 24hr ticker with volume
|
||||
f"{ws_symbol}@aggTrade" # Aggregated trades
|
||||
]
|
||||
|
@ -74,35 +74,11 @@ class StandardizedDataProvider(DataProvider):
|
||||
logger.info("StandardizedDataProvider initialized with BaseDataInput support")
|
||||
|
||||
def _initialize_cob_provider(self):
|
||||
"""Initialize COB provider for order book data"""
|
||||
"""Unify on parent EnhancedCOBWebSocket. Disable separate COB provider."""
|
||||
try:
|
||||
from .multi_exchange_cob_provider import MultiExchangeCOBProvider, ExchangeConfig, ExchangeType
|
||||
|
||||
# Configure exchanges (focusing on Binance for now)
|
||||
exchange_configs = {
|
||||
'binance': ExchangeConfig(
|
||||
exchange_type=ExchangeType.BINANCE,
|
||||
weight=1.0,
|
||||
enabled=True,
|
||||
websocket_url="wss://stream.binance.com:9443/ws/",
|
||||
symbols_mapping={symbol: symbol.replace('/', '').lower() for symbol in self.symbols}
|
||||
),
|
||||
# CoinAPI REST for supplemental depth snapshots (merged with WS streams)
|
||||
'coinapi': ExchangeConfig(
|
||||
exchange_type=ExchangeType.COINAPI,
|
||||
weight=0.6,
|
||||
enabled=True,
|
||||
rest_api_url="https://rest.coinapi.io/v1/",
|
||||
symbols_mapping={symbol: symbol.replace('/', '_').replace('USDT', 'USD') for symbol in self.symbols},
|
||||
rate_limits={"min_interval_ms": 500}
|
||||
)
|
||||
}
|
||||
|
||||
self.cob_provider = MultiExchangeCOBProvider(self.symbols, exchange_configs)
|
||||
logger.info("COB provider initialized successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize COB provider: {e}")
|
||||
self.cob_provider = None
|
||||
logger.info("Using unified EnhancedCOBWebSocket from DataProvider; external COB provider disabled")
|
||||
except Exception:
|
||||
self.cob_provider = None
|
||||
|
||||
def get_base_data_input(self, symbol: str, timestamp: Optional[datetime] = None) -> Optional[BaseDataInput]:
|
||||
@ -445,10 +421,7 @@ class StandardizedDataProvider(DataProvider):
|
||||
if hasattr(super(), 'start_real_time_processing'):
|
||||
super().start_real_time_processing()
|
||||
|
||||
# Start COB provider if available
|
||||
if self.cob_provider:
|
||||
import asyncio
|
||||
asyncio.create_task(self.cob_provider.start_streaming())
|
||||
# Unified: COB streaming handled by parent DataProvider.start_cob_collection()
|
||||
|
||||
logger.info("Started real-time processing for standardized data")
|
||||
|
||||
@ -458,10 +431,7 @@ class StandardizedDataProvider(DataProvider):
|
||||
def stop_real_time_processing(self):
|
||||
"""Stop real-time processing"""
|
||||
try:
|
||||
# Stop COB provider if available
|
||||
if self.cob_provider:
|
||||
import asyncio
|
||||
asyncio.create_task(self.cob_provider.stop_streaming())
|
||||
# Unified: No separate COB provider to stop
|
||||
|
||||
# Stop parent class processing
|
||||
if hasattr(super(), 'stop_real_time_processing'):
|
||||
|
Reference in New Issue
Block a user