From b3edd21f1b6f8cee8689175a2af6898bb147e1c9 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Thu, 24 Jul 2025 14:28:28 +0300
Subject: [PATCH] cnn training stats on dash

---
 config.yaml             |   8 +++
 core/cob_integration.py |   4 +-
 core/config.py          |  22 ++++++++
 core/data_provider.py   | 117 +++++++++++++++++++++++++++++++++-------
 web/clean_dashboard.py  |   4 +-
 5 files changed, 131 insertions(+), 24 deletions(-)

diff --git a/config.yaml b/config.yaml
index b6121d5..43bd547 100644
--- a/config.yaml
+++ b/config.yaml
@@ -6,6 +6,14 @@ system:
   log_level: "INFO" # DEBUG, INFO, WARNING, ERROR
   session_timeout: 3600 # Session timeout in seconds
 
+# Cold Start Mode Configuration
+cold_start:
+  enabled: true # Enable cold start mode logic
+  inference_interval: 0.5 # Inference interval (seconds) during cold start
+  training_interval: 2 # Training interval (seconds) during cold start
+  heavy_adjustments: true # Allow more aggressive parameter/training adjustments
+  log_cold_start: true # Log when in cold start mode
+
 # Exchange Configuration
 exchanges:
   primary: "bybit" # Primary exchange: mexc, deribit, binance, bybit
diff --git a/core/cob_integration.py b/core/cob_integration.py
index d14f6e4..29c5f16 100644
--- a/core/cob_integration.py
+++ b/core/cob_integration.py
@@ -25,8 +25,8 @@ import math
 from collections import defaultdict
 
 from .multi_exchange_cob_provider import MultiExchangeCOBProvider, COBSnapshot, ConsolidatedOrderBookLevel
-from .data_provider import DataProvider, MarketTick
 from .enhanced_cob_websocket import EnhancedCOBWebSocket
+# Import DataProvider and MarketTick only when needed to avoid circular import
 
 logger = logging.getLogger(__name__)
 
@@ -35,7 +35,7 @@ class COBIntegration:
     Integration layer for Multi-Exchange COB data with gogo2 trading system
     """
 
-    def __init__(self, data_provider: Optional[DataProvider] = None, symbols: Optional[List[str]] = None):
+    def __init__(self, data_provider: Optional['DataProvider'] = None, symbols: Optional[List[str]] = None):
         """
         Initialize COB Integration
diff --git a/core/config.py b/core/config.py
index 3696872..2158236 100644
--- a/core/config.py
+++ b/core/config.py
@@ -124,6 +124,15 @@ class Config:
                 'epochs': 100,
                 'validation_split': 0.2,
                 'early_stopping_patience': 10
+            },
+            'cold_start': {
+                'enabled': True,
+                'min_ticks': 100,
+                'min_candles': 100,
+                'inference_interval': 0.5,
+                'training_interval': 2,
+                'heavy_adjustments': True,
+                'log_cold_start': True
             }
         }
 
@@ -210,6 +219,19 @@ class Config:
             'early_stopping_patience': self._config.get('training', {}).get('early_stopping_patience', 10)
         }
 
+    @property
+    def cold_start(self) -> Dict[str, Any]:
+        """Get cold start mode settings"""
+        return self._config.get('cold_start', {
+            'enabled': True,
+            'min_ticks': 100,
+            'min_candles': 100,
+            'inference_interval': 0.5,
+            'training_interval': 2,
+            'heavy_adjustments': True,
+            'log_cold_start': True
+        })
+
     def get(self, key: str, default: Any = None) -> Any:
         """Get configuration value by key with optional default"""
         return self._config.get(key, default)
diff --git a/core/data_provider.py b/core/data_provider.py
index a3634bc..e8d339c 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -30,12 +30,14 @@ from dataclasses import dataclass, field
 import ta
 from threading import Thread, Lock
 from collections import deque
+import math
 
 from .config import get_config
 from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
 from .cnn_monitor import log_cnn_prediction
 from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
 from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
+from .cob_integration import COBIntegration
 
 logger = logging.getLogger(__name__)
 
@@ -203,6 +205,7 @@ class DataProvider:
         self._load_all_pivot_bounds()
 
         # Centralized data collection for models and dashboard
+        self.cob_integration = COBIntegration(data_provider=self, symbols=self.symbols)
         self.cob_data_cache: Dict[str, deque] = {}  # COB data for models
         self.training_data_cache: Dict[str, deque] = {}  # Training data for models
         self.model_data_subscribers: Dict[str, List[Callable]] = {}  # Model-specific data callbacks
@@ -229,6 +232,11 @@ class DataProvider:
         self.training_data_collection_active = False
         self.training_data_thread = None
 
+        # Price-level bucketing
+        self.bucketed_cob_data: Dict[str, Dict] = {}
+        self.bucket_sizes = [1, 10]  # $1 and $10 buckets
+        self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes}
+
         logger.info(f"DataProvider initialized for symbols: {self.symbols}")
         logger.info(f"Timeframes: {self.timeframes}")
         logger.info("Centralized data distribution enabled")
@@ -242,6 +250,21 @@ class DataProvider:
         self.retry_delay = 60  # 1 minute retry delay for 451 errors
         self.max_retries = 3
 
+        # Start COB integration
+        self.start_cob_integration()
+
+    def start_cob_integration(self):
+        """Starts the COB integration in a background thread."""
+        cob_thread = Thread(target=self._run_cob_integration, daemon=True)
+        cob_thread.start()
+
+    def _run_cob_integration(self):
+        """Runs the asyncio event loop for COB integration."""
+        try:
+            asyncio.run(self.cob_integration.start())
+        except Exception as e:
+            logger.error(f"Error running COB integration: {e}")
+
     def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame:
         """Ensure dataframe has proper datetime index"""
         if df is None or df.empty:
@@ -1397,30 +1420,14 @@ class DataProvider:
                 logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")
 
     async def start_real_time_streaming(self):
-        """Start real-time data streaming using Enhanced COB WebSocket"""
+        """Start real-time data streaming using COBIntegration"""
         if self.is_streaming:
             logger.warning("Real-time streaming already active")
             return
 
         self.is_streaming = True
-        logger.info("Starting Enhanced COB WebSocket streaming")
-
-        try:
-            # Initialize Enhanced COB WebSocket
-            self.enhanced_cob_websocket = await get_enhanced_cob_websocket(
-                symbols=self.symbols,
-                dashboard_callback=self._on_websocket_status_update
-            )
-
-            # Add COB data callback
-            self.enhanced_cob_websocket.add_cob_callback(self._on_enhanced_cob_data)
-
-            logger.info("Enhanced COB WebSocket streaming started successfully")
-
-        except Exception as e:
-            logger.error(f"Error starting Enhanced COB WebSocket: {e}")
-            # Fallback to old WebSocket method
-            await self._start_fallback_websocket_streaming()
+        logger.info("Starting real-time streaming via COBIntegration")
+        # COBIntegration is started in the constructor
 
     async def stop_real_time_streaming(self):
         """Stop real-time data streaming"""
@@ -1430,6 +1437,14 @@ class DataProvider:
         logger.info("Stopping Enhanced COB WebSocket streaming")
         self.is_streaming = False
 
+        # Stop COB Integration
+        if self.cob_integration:
+            try:
+                await self.cob_integration.stop()
+                logger.info("COB Integration stopped")
+            except Exception as e:
+                logger.error(f"Error stopping COB Integration: {e}")
+
         # Stop Enhanced COB WebSocket
         if self.enhanced_cob_websocket:
             try:
@@ -1453,6 +1468,7 @@ class DataProvider:
     async def _on_enhanced_cob_data(self, symbol: str, cob_data: Dict):
         """Handle COB data from Enhanced WebSocket"""
         try:
+            # This method will now be called by COBIntegration
             # Ensure cob_websocket_data is initialized
             if not hasattr(self, 'cob_websocket_data'):
                 self.cob_websocket_data = {}
@@ -1460,6 +1476,9 @@ class DataProvider:
             # Store the latest COB data
             self.cob_websocket_data[symbol] = cob_data
 
+            # Trigger bucketing
+            self._update_price_buckets(symbol, cob_data)
+
             # Ensure cob_data_cache is initialized
             if not hasattr(self, 'cob_data_cache'):
                 self.cob_data_cache = {}
@@ -4158,4 +4177,62 @@ class DataProvider:
                 'training_samples': len(self.training_data_cache.get(binance_symbol, []))
             }
 
-        return summary
\ No newline at end of file
+        return summary
+
+    def _update_price_buckets(self, symbol: str, cob_data: Dict):
+        """Update price-level buckets based on new COB data."""
+        try:
+            bids = cob_data.get('bids', [])
+            asks = cob_data.get('asks', [])
+
+            for size in self.bucket_sizes:
+                bid_buckets = self._calculate_buckets(bids, size)
+                ask_buckets = self._calculate_buckets(asks, size)
+
+                bucketed_data = {
+                    'symbol': symbol,
+                    'timestamp': datetime.now(),
+                    'bucket_size': size,
+                    'bids': bid_buckets,
+                    'asks': ask_buckets
+                }
+
+                if symbol not in self.bucketed_cob_data:
+                    self.bucketed_cob_data[symbol] = {}
+                self.bucketed_cob_data[symbol][size] = bucketed_data
+
+                # Distribute to subscribers
+                self._distribute_bucketed_data(symbol, size, bucketed_data)
+
+        except Exception as e:
+            logger.error(f"Error updating price buckets for {symbol}: {e}")
+
+    def _calculate_buckets(self, levels: List[Dict], bucket_size: int) -> Dict[float, float]:
+        """Calculates aggregated volume for price buckets."""
+        buckets = {}
+        for level in levels:
+            price = level.get('price', 0)
+            volume = level.get('volume', 0)
+            if price > 0 and volume > 0:
+                bucket = math.floor(price / bucket_size) * bucket_size
+                if bucket not in buckets:
+                    buckets[bucket] = 0
+                buckets[bucket] += volume
+        return buckets
+
+    def subscribe_to_bucketed_cob(self, bucket_size: int, callback: Callable):
+        """Subscribe to bucketed COB data."""
+        if bucket_size in self.bucketed_cob_callbacks:
+            self.bucketed_cob_callbacks[bucket_size].append(callback)
+            logger.info(f"New subscriber for ${bucket_size} bucketed COB data.")
+        else:
+            logger.warning(f"Bucket size {bucket_size} not supported.")
+
+    def _distribute_bucketed_data(self, symbol: str, bucket_size: int, data: Dict):
+        """Distribute bucketed data to subscribers."""
+        if bucket_size in self.bucketed_cob_callbacks:
+            for callback in self.bucketed_cob_callbacks[bucket_size]:
+                try:
+                    callback(symbol, data)
+                except Exception as e:
+                    logger.error(f"Error in bucketed COB callback: {e}")
\ No newline at end of file
diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py
index 1ad71b9..8731bdf 100644
--- a/web/clean_dashboard.py
+++ b/web/clean_dashboard.py
@@ -5859,7 +5859,7 @@ class CleanTradingDashboard:
             from core.data_models import OHLCVBar
 
             # Get data from data provider
-            df = self.data_provider.get_candles(symbol, timeframe)
+            df = self.data_provider.get_historical_data(symbol, timeframe)
             if df is None or len(df) == 0:
                 return []
 
@@ -6106,7 +6106,7 @@ class CleanTradingDashboard:
     def _get_recent_price_history(self, symbol: str, count: int) -> List[float]:
         """Get recent price history for reward calculation"""
         try:
-            df = self.data_provider.get_candles(symbol, '1s')
+            df = self.data_provider.get_historical_data(symbol, '1s')
             if df is None or len(df) == 0:
                 return []
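
Reviewer note: below is a minimal usage sketch (not part of the patch) of how a dashboard widget or model trainer might consume the new subscribe_to_bucketed_cob() API added to DataProvider above. The callback signature (symbol, bucketed_data) and the 'bids'/'asks' dicts mapping bucket price to aggregated volume follow _update_price_buckets() and _distribute_bucketed_data(); constructing DataProvider with no arguments and the on_bucketed_cob helper name are illustrative assumptions, not part of this change.

# Illustrative sketch only -- consumes the bucketed COB subscription added in
# this patch. DataProvider() with default arguments and the on_bucketed_cob
# helper are assumptions; real wiring may differ.
from core.data_provider import DataProvider

def on_bucketed_cob(symbol: str, bucketed_data: dict) -> None:
    """Print the best bid/ask bucket for each bucketed COB update."""
    bids = bucketed_data['bids']           # {bucket_price: aggregated_volume}
    asks = bucketed_data['asks']
    size = bucketed_data['bucket_size']    # 1 or 10 (USD), per DataProvider.bucket_sizes
    best_bid = max(bids) if bids else None
    best_ask = min(asks) if asks else None
    print(f"{symbol} ${size} buckets: best bid={best_bid}, best ask={best_ask}")

provider = DataProvider()
# Only bucket sizes declared in DataProvider.bucket_sizes ($1 and $10) are accepted.
provider.subscribe_to_bucketed_cob(1, on_bucketed_cob)
provider.subscribe_to_bucketed_cob(10, on_bucketed_cob)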