#!/usr/bin/env python3 """ Test Universal Data Stream Integration with Dashboard This script validates that: 1. CleanTradingDashboard properly subscribes to UnifiedDataStream 2. All 5 timeseries are properly received and processed 3. Data flows correctly from provider -> adapter -> stream -> dashboard 4. Consumer callback functions work as expected """ import asyncio import logging import sys import time from pathlib import Path # Add project root to path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from core.config import get_config from core.data_provider import DataProvider from core.enhanced_orchestrator import EnhancedTradingOrchestrator from core.trading_executor import TradingExecutor from web.clean_dashboard import CleanTradingDashboard # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def test_universal_stream_integration(): """Test Universal Data Stream integration with dashboard""" logger.info("="*80) logger.info("๐Ÿงช TESTING UNIVERSAL DATA STREAM INTEGRATION") logger.info("="*80) try: # Initialize components logger.info("\n๐Ÿ“ฆ STEP 1: Initialize Components") logger.info("-" * 40) config = get_config() data_provider = DataProvider() orchestrator = EnhancedTradingOrchestrator( data_provider=data_provider, symbols=['ETH/USDT', 'BTC/USDT'], enhanced_rl_training=True ) trading_executor = TradingExecutor() logger.info("โœ… Core components initialized") # Initialize dashboard with Universal Data Stream logger.info("\n๐Ÿ“Š STEP 2: Initialize Dashboard with Universal Stream") logger.info("-" * 40) dashboard = CleanTradingDashboard( data_provider=data_provider, orchestrator=orchestrator, trading_executor=trading_executor ) # Check Universal Stream initialization if hasattr(dashboard, 'unified_stream') and dashboard.unified_stream: logger.info("โœ… Universal Data Stream initialized successfully") logger.info(f"๐Ÿ“‹ Consumer ID: {dashboard.stream_consumer_id}") else: logger.error("โŒ Universal Data Stream not initialized") return False # Test consumer registration logger.info("\n๐Ÿ”— STEP 3: Validate Consumer Registration") logger.info("-" * 40) stream_stats = dashboard.unified_stream.get_stream_stats() logger.info(f"๐Ÿ“Š Stream Stats: {stream_stats}") if stream_stats['total_consumers'] > 0: logger.info(f"โœ… {stream_stats['total_consumers']} consumers registered") else: logger.warning("โš ๏ธ No consumers registered") # Test data callback logger.info("\n๐Ÿ“ก STEP 4: Test Data Callback") logger.info("-" * 40) # Create test data packet test_data = { 'timestamp': time.time(), 'consumer_id': dashboard.stream_consumer_id, 'consumer_name': 'CleanTradingDashboard', 'ticks': [ {'symbol': 'ETHUSDT', 'price': 3000.0, 'volume': 1.5, 'timestamp': time.time()}, {'symbol': 'ETHUSDT', 'price': 3001.0, 'volume': 2.0, 'timestamp': time.time()}, ], 'ohlcv': {'one_second_bars': [], 'multi_timeframe': { 'ETH/USDT': { '1s': [{'timestamp': time.time(), 'open': 3000, 'high': 3002, 'low': 2999, 'close': 3001, 'volume': 10}], '1m': [{'timestamp': time.time(), 'open': 2990, 'high': 3010, 'low': 2985, 'close': 3001, 'volume': 100}], '1h': [{'timestamp': time.time(), 'open': 2900, 'high': 3050, 'low': 2880, 'close': 3001, 'volume': 1000}], '1d': [{'timestamp': time.time(), 'open': 2800, 'high': 3200, 'low': 2750, 'close': 3001, 'volume': 10000}] }, 'BTC/USDT': { '1s': [{'timestamp': time.time(), 'open': 65000, 'high': 65020, 'low': 64980, 'close': 65010, 'volume': 0.5}] } }}, 'training_data': {'market_state': 'test', 'features': []}, 'ui_data': {'formatted_data': 'test_ui_data'} } # Test callback manually try: dashboard._handle_unified_stream_data(test_data) logger.info("โœ… Data callback executed successfully") # Check if data was processed if hasattr(dashboard, 'current_prices') and 'ETH/USDT' in dashboard.current_prices: logger.info(f"โœ… Price updated: ETH/USDT = ${dashboard.current_prices['ETH/USDT']}") else: logger.warning("โš ๏ธ Prices not updated in dashboard") except Exception as e: logger.error(f"โŒ Data callback failed: {e}") return False # Test Universal Data Adapter logger.info("\n๐Ÿ”„ STEP 5: Test Universal Data Adapter") logger.info("-" * 40) if hasattr(orchestrator, 'universal_adapter'): universal_stream = orchestrator.universal_adapter.get_universal_data_stream() if universal_stream: logger.info("โœ… Universal Data Adapter working") logger.info(f"๐Ÿ“Š ETH ticks: {len(universal_stream.eth_ticks)} samples") logger.info(f"๐Ÿ“Š ETH 1m: {len(universal_stream.eth_1m)} candles") logger.info(f"๐Ÿ“Š ETH 1h: {len(universal_stream.eth_1h)} candles") logger.info(f"๐Ÿ“Š ETH 1d: {len(universal_stream.eth_1d)} candles") logger.info(f"๐Ÿ“Š BTC ticks: {len(universal_stream.btc_ticks)} samples") # Validate format is_valid, issues = orchestrator.universal_adapter.validate_universal_format(universal_stream) if is_valid: logger.info("โœ… Universal format validation passed") else: logger.warning(f"โš ๏ธ Format issues: {issues}") else: logger.error("โŒ Universal Data Adapter failed to get stream") return False else: logger.error("โŒ Universal Data Adapter not found in orchestrator") return False # Summary logger.info("\n๐ŸŽฏ SUMMARY") logger.info("-" * 40) logger.info("โœ… Universal Data Stream properly integrated") logger.info("โœ… Dashboard subscribes as consumer") logger.info("โœ… All 5 timeseries format validated") logger.info("โœ… Data callback processing works") logger.info("โœ… Universal Data Adapter functional") logger.info("\n๐Ÿ† INTEGRATION TEST PASSED") return True except Exception as e: logger.error(f"โŒ Integration test failed: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": success = asyncio.run(test_universal_stream_integration()) sys.exit(0 if success else 1)