cnn training stats on dash
This commit is contained in:
@ -30,12 +30,14 @@ from dataclasses import dataclass, field
|
||||
import ta
|
||||
from threading import Thread, Lock
|
||||
from collections import deque
|
||||
import math
|
||||
|
||||
from .config import get_config
|
||||
from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
|
||||
from .cnn_monitor import log_cnn_prediction
|
||||
from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
|
||||
from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
|
||||
from .cob_integration import COBIntegration
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -203,6 +205,7 @@ class DataProvider:
|
||||
self._load_all_pivot_bounds()
|
||||
|
||||
# Centralized data collection for models and dashboard
|
||||
self.cob_integration = COBIntegration(data_provider=self, symbols=self.symbols)
|
||||
self.cob_data_cache: Dict[str, deque] = {} # COB data for models
|
||||
self.training_data_cache: Dict[str, deque] = {} # Training data for models
|
||||
self.model_data_subscribers: Dict[str, List[Callable]] = {} # Model-specific data callbacks
|
||||
@ -229,6 +232,11 @@ class DataProvider:
|
||||
self.training_data_collection_active = False
|
||||
self.training_data_thread = None
|
||||
|
||||
# Price-level bucketing
|
||||
self.bucketed_cob_data: Dict[str, Dict] = {}
|
||||
self.bucket_sizes = [1, 10] # $1 and $10 buckets
|
||||
self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes}
|
||||
|
||||
logger.info(f"DataProvider initialized for symbols: {self.symbols}")
|
||||
logger.info(f"Timeframes: {self.timeframes}")
|
||||
logger.info("Centralized data distribution enabled")
|
||||
@ -242,6 +250,21 @@ class DataProvider:
|
||||
self.retry_delay = 60 # 1 minute retry delay for 451 errors
|
||||
self.max_retries = 3
|
||||
|
||||
# Start COB integration
|
||||
self.start_cob_integration()
|
||||
|
||||
def start_cob_integration(self):
    """Launch the COB integration event loop on a background daemon thread.

    The daemon flag ensures the worker never blocks interpreter shutdown.
    """
    worker = Thread(target=self._run_cob_integration, daemon=True)
    worker.start()
||||
def _run_cob_integration(self):
|
||||
"""Runs the asyncio event loop for COB integration."""
|
||||
try:
|
||||
asyncio.run(self.cob_integration.start())
|
||||
except Exception as e:
|
||||
logger.error(f"Error running COB integration: {e}")
|
||||
|
||||
def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""Ensure dataframe has proper datetime index"""
|
||||
if df is None or df.empty:
|
||||
@ -1397,30 +1420,14 @@ class DataProvider:
|
||||
logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")
|
||||
|
||||
async def start_real_time_streaming(self):
    """Start real-time data streaming using COBIntegration.

    COBIntegration is already launched from the constructor (via
    start_cob_integration), so this method only flips the streaming flag.
    It is idempotent: a second call while streaming is active logs a
    warning and returns without side effects.

    NOTE(review): a garbled merge had left two consecutive docstrings
    (the second being a dead expression statement) plus the superseded
    Enhanced-COB-WebSocket startup path alongside the new COBIntegration
    path; the obsolete path and duplicate docstring are removed here.
    """
    if self.is_streaming:
        logger.warning("Real-time streaming already active")
        return

    self.is_streaming = True
    logger.info("Starting real-time streaming via COBIntegration")
    # COBIntegration is started in the constructor
||||
async def stop_real_time_streaming(self):
|
||||
"""Stop real-time data streaming"""
|
||||
@ -1430,6 +1437,14 @@ class DataProvider:
|
||||
logger.info("Stopping Enhanced COB WebSocket streaming")
|
||||
self.is_streaming = False
|
||||
|
||||
# Stop COB Integration
|
||||
if self.cob_integration:
|
||||
try:
|
||||
await self.cob_integration.stop()
|
||||
logger.info("COB Integration stopped")
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping COB Integration: {e}")
|
||||
|
||||
# Stop Enhanced COB WebSocket
|
||||
if self.enhanced_cob_websocket:
|
||||
try:
|
||||
@ -1453,6 +1468,7 @@ class DataProvider:
|
||||
async def _on_enhanced_cob_data(self, symbol: str, cob_data: Dict):
|
||||
"""Handle COB data from Enhanced WebSocket"""
|
||||
try:
|
||||
# This method will now be called by COBIntegration
|
||||
# Ensure cob_websocket_data is initialized
|
||||
if not hasattr(self, 'cob_websocket_data'):
|
||||
self.cob_websocket_data = {}
|
||||
@ -1460,6 +1476,9 @@ class DataProvider:
|
||||
# Store the latest COB data
|
||||
self.cob_websocket_data[symbol] = cob_data
|
||||
|
||||
# Trigger bucketing
|
||||
self._update_price_buckets(symbol, cob_data)
|
||||
|
||||
# Ensure cob_data_cache is initialized
|
||||
if not hasattr(self, 'cob_data_cache'):
|
||||
self.cob_data_cache = {}
|
||||
@ -4158,4 +4177,62 @@ class DataProvider:
|
||||
'training_samples': len(self.training_data_cache.get(binance_symbol, []))
|
||||
}
|
||||
|
||||
return summary
|
||||
return summary
|
||||
|
||||
def _update_price_buckets(self, symbol: str, cob_data: Dict):
|
||||
"""Update price-level buckets based on new COB data."""
|
||||
try:
|
||||
bids = cob_data.get('bids', [])
|
||||
asks = cob_data.get('asks', [])
|
||||
|
||||
for size in self.bucket_sizes:
|
||||
bid_buckets = self._calculate_buckets(bids, size)
|
||||
ask_buckets = self._calculate_buckets(asks, size)
|
||||
|
||||
bucketed_data = {
|
||||
'symbol': symbol,
|
||||
'timestamp': datetime.now(),
|
||||
'bucket_size': size,
|
||||
'bids': bid_buckets,
|
||||
'asks': ask_buckets
|
||||
}
|
||||
|
||||
if symbol not in self.bucketed_cob_data:
|
||||
self.bucketed_cob_data[symbol] = {}
|
||||
self.bucketed_cob_data[symbol][size] = bucketed_data
|
||||
|
||||
# Distribute to subscribers
|
||||
self._distribute_bucketed_data(symbol, size, bucketed_data)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating price buckets for {symbol}: {e}")
|
||||
|
||||
def _calculate_buckets(self, levels: List[Dict], bucket_size: int) -> Dict[float, float]:
|
||||
"""Calculates aggregated volume for price buckets."""
|
||||
buckets = {}
|
||||
for level in levels:
|
||||
price = level.get('price', 0)
|
||||
volume = level.get('volume', 0)
|
||||
if price > 0 and volume > 0:
|
||||
bucket = math.floor(price / bucket_size) * bucket_size
|
||||
if bucket not in buckets:
|
||||
buckets[bucket] = 0
|
||||
buckets[bucket] += volume
|
||||
return buckets
|
||||
|
||||
def subscribe_to_bucketed_cob(self, bucket_size: int, callback: Callable):
    """Register *callback* for bucketed COB data of a supported bucket size.

    Unsupported sizes (anything not pre-registered in
    self.bucketed_cob_callbacks) are rejected with a warning.
    """
    callbacks = self.bucketed_cob_callbacks.get(bucket_size)
    if callbacks is None:
        logger.warning(f"Bucket size {bucket_size} not supported.")
        return
    callbacks.append(callback)
    logger.info(f"New subscriber for ${bucket_size} bucketed COB data.")
||||
|
||||
def _distribute_bucketed_data(self, symbol: str, bucket_size: int, data: Dict):
|
||||
"""Distribute bucketed data to subscribers."""
|
||||
if bucket_size in self.bucketed_cob_callbacks:
|
||||
for callback in self.bucketed_cob_callbacks[bucket_size]:
|
||||
try:
|
||||
callback(symbol, data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in bucketed COB callback: {e}")
|
Reference in New Issue
Block a user