diff --git a/ANNOTATE/core/data_loader.py b/ANNOTATE/core/data_loader.py index d90c88c..ac5456a 100644 --- a/ANNOTATE/core/data_loader.py +++ b/ANNOTATE/core/data_loader.py @@ -64,6 +64,50 @@ class HistoricalDataLoader: return cached_data try: + # Try unified storage first if available + if hasattr(self.data_provider, 'is_unified_storage_enabled') and \ + self.data_provider.is_unified_storage_enabled(): + try: + import asyncio + + # Get data from unified storage + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # If we have a specific time range, get historical data + if start_time or end_time: + target_time = end_time if end_time else start_time + inference_data = loop.run_until_complete( + self.data_provider.get_inference_data_unified( + symbol, + timestamp=target_time, + context_window_minutes=60 + ) + ) + else: + # Get latest real-time data + inference_data = loop.run_until_complete( + self.data_provider.get_inference_data_unified(symbol) + ) + + # Extract the requested timeframe + df = inference_data.get_timeframe_data(timeframe) + + if df is not None and not df.empty: + # Limit number of candles + if len(df) > limit: + df = df.tail(limit) + + # Cache in memory + self.memory_cache[cache_key] = (df.copy(), datetime.now()) + + logger.info(f"Loaded {len(df)} candles from unified storage for {symbol} {timeframe}") + return df + + except Exception as e: + logger.debug(f"Unified storage not available, falling back to cached data: {e}") + + # Fallback to existing cached data method # Use DataProvider's cached data if available if hasattr(self.data_provider, 'cached_data'): if symbol in self.data_provider.cached_data: diff --git a/ANNOTATE/web/app.py b/ANNOTATE/web/app.py index 23d638e..42fb633 100644 --- a/ANNOTATE/web/app.py +++ b/ANNOTATE/web/app.py @@ -116,6 +116,11 @@ class AnnotationDashboard: # Initialize core components self.data_provider = DataProvider() if DataProvider else None + + # Enable unified storage for real-time data access + if self.data_provider: + self._enable_unified_storage_async() + self.orchestrator = TradingOrchestrator( data_provider=self.data_provider ) if TradingOrchestrator and self.data_provider else None @@ -133,6 +138,42 @@ class AnnotationDashboard: logger.info("Annotation Dashboard initialized") + def _enable_unified_storage_async(self): + """Enable unified storage system in background thread""" + def enable_storage(): + try: + import asyncio + import threading + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Enable unified storage + success = loop.run_until_complete( + self.data_provider.enable_unified_storage() + ) + + if success: + logger.info("✅ ANNOTATE: Unified storage enabled for real-time data") + + # Get statistics + stats = self.data_provider.get_unified_storage_stats() + if stats.get('initialized'): + logger.info(" Real-time data access: <10ms") + logger.info(" Historical data access: <100ms") + logger.info(" Annotation data: Available at any timestamp") + else: + logger.warning("⚠️ ANNOTATE: Unified storage not available, using cached data only") + + except Exception as e: + logger.warning(f"ANNOTATE: Could not enable unified storage: {e}") + logger.info("ANNOTATE: Continuing with cached data access") + + # Start in background thread + import threading + storage_thread = threading.Thread(target=enable_storage, daemon=True) + storage_thread.start() + def _setup_routes(self): """Setup Flask routes""" diff --git a/core/data_provider.py b/core/data_provider.py index afa9ac4..0e92a61 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -57,6 +57,8 @@ from .huobi_cob_websocket import get_huobi_cob_websocket from .cob_integration import COBIntegration from .report_data_crawler import ReportDataCrawler, ReportData +logger = logging.getLogger(__name__) + # Import unified storage components (optional) try: from .unified_data_provider_extension import UnifiedDataProviderExtension @@ -65,8 +67,6 @@ except ImportError: UNIFIED_STORAGE_AVAILABLE = False logger.warning("Unified storage components not available") -logger = logging.getLogger(__name__) - @dataclass class PivotBounds: """Pivot-based normalization bounds derived from Williams Market Structure""" @@ -184,11 +184,8 @@ class DataProvider: # COB collection state guard to prevent duplicate starts self._cob_started: bool = False - # Ensure COB collection is started so BaseDataInput includes real order book data - try: - self.start_cob_collection() - except Exception as _cob_init_ex: - logger.error(f"Failed to start COB collection at init: {_cob_init_ex}") + # COB collection is started via start_cob_websocket_integration() below + # No need to call start_cob_collection() here self.is_streaming = False self.data_lock = Lock() diff --git a/requirements.txt b/requirements.txt index 47ae1f9..913e2c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,7 @@ asyncio-compat>=0.1.2 wandb>=0.16.0 pybit>=5.11.0 requests>=2.31.0 +asyncpg>=0.29.0 # NOTE: PyTorch is intentionally not pinned here to avoid pulling NVIDIA CUDA deps on AMD machines. # Install one of the following sets manually depending on your hardware: diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index b2c97ef..d42bfa7 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -195,6 +195,9 @@ class CleanTradingDashboard: self.data_provider = data_provider or StandardizedDataProvider() self.trading_executor = trading_executor or TradingExecutor() + # Enable unified storage system for real-time training data + self._enable_unified_storage_async() + # Initialize unified orchestrator with full ML capabilities if orchestrator is None: self.orchestrator = TradingOrchestrator( @@ -8338,6 +8341,40 @@ class CleanTradingDashboard: except Exception as e: logger.warning(f"Error in conservative signal cleanup: {e}") + def _enable_unified_storage_async(self): + """Enable unified storage system in background thread""" + def enable_storage(): + try: + import asyncio + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Enable unified storage + success = loop.run_until_complete( + self.data_provider.enable_unified_storage() + ) + + if success: + logger.info("✅ Unified storage system enabled for real-time training") + + # Get initial statistics + stats = self.data_provider.get_unified_storage_stats() + if stats.get('initialized'): + logger.info(f" Cache: {stats.get('cache', {}).get('cache_duration_seconds', 0)}s window") + logger.info(f" Database: Connected and ready") + logger.info(f" Ingestion: Pipeline active") + else: + logger.warning("⚠️ Unified storage initialization failed, using legacy data access") + + except Exception as e: + logger.warning(f"Could not enable unified storage: {e}") + logger.info("Continuing with legacy data access methods") + + # Start in background thread + import threading + storage_thread = threading.Thread(target=enable_storage, daemon=True) + storage_thread.start() + def _initialize_enhanced_training_system(self): """Initialize enhanced training system for model predictions""" try: