Files
gogo2/tests/test_universal_stream_integration.py
2025-06-25 11:42:12 +03:00

177 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
Test Universal Data Stream Integration with Dashboard
This script validates that:
1. CleanTradingDashboard properly subscribes to UnifiedDataStream
2. All 5 timeseries (ETH ticks, ETH 1m/1h/1d candles, BTC ticks) are properly received and processed
3. Data flows correctly from provider -> adapter -> stream -> dashboard
4. Consumer callback functions work as expected
"""
import asyncio
import logging
import sys
import time
from pathlib import Path
# Add project root to path.
# This script may sit in the project root or in a subdirectory such as
# tests/, so start at the script's own directory and walk upward to the
# nearest ancestor that contains the `core` package imported below. If no
# ancestor has one, fall back to the script's directory (the previous
# behaviour).
_script_dir = Path(__file__).resolve().parent
project_root = next(
    (
        candidate
        for candidate in (_script_dir, *_script_dir.parents)
        if (candidate / "core").is_dir()
    ),
    _script_dir,
)
sys.path.insert(0, str(project_root))
from core.config import get_config
from core.data_provider import DataProvider
from core.enhanced_orchestrator import EnhancedTradingOrchestrator
from core.trading_executor import TradingExecutor
from web.clean_dashboard import CleanTradingDashboard
# Configure logging for the whole script: INFO and above, each record
# rendered as "time - logger name - level - message".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every step of the integration test.
logger = logging.getLogger(__name__)
def _ohlcv_bar(open_price, high, low, close, volume):
    """Return one synthetic OHLCV candle stamped with the current time."""
    return {
        'timestamp': time.time(),
        'open': open_price,
        'high': high,
        'low': low,
        'close': close,
        'volume': volume,
    }


def _build_test_packet(consumer_id):
    """Build the synthetic stream packet handed to the dashboard callback.

    The packet carries two ETHUSDT ticks, ETH/USDT candles for the 1s, 1m,
    1h and 1d timeframes, one BTC/USDT 1s candle, and placeholder training
    and UI payloads.

    Args:
        consumer_id: Value stored under the packet's 'consumer_id' key.

    Returns:
        A plain dict containing only built-in types.
    """
    return {
        'timestamp': time.time(),
        'consumer_id': consumer_id,
        'consumer_name': 'CleanTradingDashboard',
        'ticks': [
            {'symbol': 'ETHUSDT', 'price': 3000.0, 'volume': 1.5, 'timestamp': time.time()},
            {'symbol': 'ETHUSDT', 'price': 3001.0, 'volume': 2.0, 'timestamp': time.time()},
        ],
        'ohlcv': {
            'one_second_bars': [],
            'multi_timeframe': {
                'ETH/USDT': {
                    '1s': [_ohlcv_bar(3000, 3002, 2999, 3001, 10)],
                    '1m': [_ohlcv_bar(2990, 3010, 2985, 3001, 100)],
                    '1h': [_ohlcv_bar(2900, 3050, 2880, 3001, 1000)],
                    '1d': [_ohlcv_bar(2800, 3200, 2750, 3001, 10000)],
                },
                'BTC/USDT': {
                    '1s': [_ohlcv_bar(65000, 65020, 64980, 65010, 0.5)],
                },
            },
        },
        'training_data': {'market_state': 'test', 'features': []},
        'ui_data': {'formatted_data': 'test_ui_data'},
    }


async def test_universal_stream_integration() -> bool:
    """Test Universal Data Stream integration with dashboard.

    Steps:
        1. Build the data provider, orchestrator and trading executor.
        2. Build the dashboard and require a truthy ``unified_stream``.
        3. Read the stream stats and check that a consumer is registered.
        4. Feed a synthetic packet to ``_handle_unified_stream_data`` and
           check that an 'ETH/USDT' price appears in ``current_prices``.
        5. Fetch the universal stream from the orchestrator's adapter and
           run its format validation.

    Hard failures (missing stream, callback exception, missing adapter or
    stream, any unexpected exception) are logged as errors and yield
    ``False``. Soft checks (no consumers, price not updated, format issues)
    only log warnings and do not change the return value; the summary
    reports them as "Not confirmed" instead of claiming success.

    Returns:
        True when no hard failure occurred, False otherwise.
    """
    logger.info("="*80)
    logger.info("🧪 TESTING UNIVERSAL DATA STREAM INTEGRATION")
    logger.info("="*80)
    try:
        # Initialize components
        logger.info("\n📦 STEP 1: Initialize Components")
        logger.info("-" * 40)
        # The return value is not used here. The call is kept in case it has
        # side effects (e.g. loading or validating config) — TODO confirm.
        get_config()
        data_provider = DataProvider()
        orchestrator = EnhancedTradingOrchestrator(
            data_provider=data_provider,
            symbols=['ETH/USDT', 'BTC/USDT'],
            enhanced_rl_training=True
        )
        trading_executor = TradingExecutor()
        logger.info("✅ Core components initialized")

        # Initialize dashboard with Universal Data Stream
        logger.info("\n📊 STEP 2: Initialize Dashboard with Universal Stream")
        logger.info("-" * 40)
        dashboard = CleanTradingDashboard(
            data_provider=data_provider,
            orchestrator=orchestrator,
            trading_executor=trading_executor
        )

        # Check Universal Stream initialization (hard requirement)
        if not getattr(dashboard, 'unified_stream', None):
            logger.error("❌ Universal Data Stream not initialized")
            return False
        logger.info("✅ Universal Data Stream initialized successfully")
        logger.info(f"📋 Consumer ID: {dashboard.stream_consumer_id}")

        # Test consumer registration (soft check: warn only)
        logger.info("\n🔗 STEP 3: Validate Consumer Registration")
        logger.info("-" * 40)
        stream_stats = dashboard.unified_stream.get_stream_stats()
        logger.info(f"📊 Stream Stats: {stream_stats}")
        consumers_registered = stream_stats['total_consumers'] > 0
        if consumers_registered:
            logger.info(f"{stream_stats['total_consumers']} consumers registered")
        else:
            logger.warning("⚠️ No consumers registered")

        # Test data callback
        logger.info("\n📡 STEP 4: Test Data Callback")
        logger.info("-" * 40)
        test_data = _build_test_packet(dashboard.stream_consumer_id)

        # Invoke the callback directly; an exception here is a hard failure.
        try:
            dashboard._handle_unified_stream_data(test_data)
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error lost.
            logger.exception(f"❌ Data callback failed: {e}")
            return False
        logger.info("✅ Data callback executed successfully")

        # Check if data was processed (soft check: warn only)
        prices = getattr(dashboard, 'current_prices', None)
        prices_updated = prices is not None and 'ETH/USDT' in prices
        if prices_updated:
            logger.info(f"✅ Price updated: ETH/USDT = ${prices['ETH/USDT']}")
        else:
            logger.warning("⚠️ Prices not updated in dashboard")

        # Test Universal Data Adapter
        logger.info("\n🔄 STEP 5: Test Universal Data Adapter")
        logger.info("-" * 40)
        if not hasattr(orchestrator, 'universal_adapter'):
            logger.error("❌ Universal Data Adapter not found in orchestrator")
            return False
        adapter = orchestrator.universal_adapter
        universal_stream = adapter.get_universal_data_stream()
        if not universal_stream:
            logger.error("❌ Universal Data Adapter failed to get stream")
            return False
        logger.info("✅ Universal Data Adapter working")
        logger.info(f"📊 ETH ticks: {len(universal_stream.eth_ticks)} samples")
        logger.info(f"📊 ETH 1m: {len(universal_stream.eth_1m)} candles")
        logger.info(f"📊 ETH 1h: {len(universal_stream.eth_1h)} candles")
        logger.info(f"📊 ETH 1d: {len(universal_stream.eth_1d)} candles")
        logger.info(f"📊 BTC ticks: {len(universal_stream.btc_ticks)} samples")

        # Validate format (soft check: warn only)
        format_valid, issues = adapter.validate_universal_format(universal_stream)
        if format_valid:
            logger.info("✅ Universal format validation passed")
        else:
            logger.warning(f"⚠️ Format issues: {issues}")

        # Summary: a claim is reported as confirmed only when its check
        # actually succeeded above.
        logger.info("\n🎯 SUMMARY")
        logger.info("-" * 40)
        summary = (
            (True, "Universal Data Stream properly integrated"),
            (consumers_registered, "Dashboard subscribes as consumer"),
            (format_valid, "All 5 timeseries format validated"),
            (prices_updated, "Data callback processing works"),
            (True, "Universal Data Adapter functional"),
        )
        for confirmed, claim in summary:
            if confirmed:
                logger.info(f"✅ {claim}")
            else:
                logger.warning(f"⚠️ Not confirmed: {claim}")
        if all(confirmed for confirmed, _ in summary):
            logger.info("\n🏆 INTEGRATION TEST PASSED")
        else:
            logger.warning("\n⚠️ INTEGRATION TEST PASSED WITH WARNINGS")
        # Soft-check warnings do not fail the run; only hard failures do.
        return True
    except Exception as e:
        # Top-level boundary: log the message with its traceback and report
        # failure to the caller instead of propagating.
        logger.exception(f"❌ Integration test failed: {e}")
        return False
if __name__ == "__main__":
    # Run the coroutine to completion and turn its boolean verdict into the
    # process exit status: 0 on success, 1 on failure.
    passed = asyncio.run(test_universal_stream_integration())
    raise SystemExit(0 if passed else 1)