Files
gogo2/test_improved_data_integration.py
Dobromir Popov c349ff6f30 fifo n1 que
2025-07-26 21:34:16 +03:00

222 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
Test Improved Data Integration
Test the enhanced data integration with fallback strategies
"""
import time
from core.orchestrator import TradingOrchestrator
from core.data_provider import DataProvider
def test_enhanced_data_population():
"""Test enhanced data population with fallback strategies"""
print("=== Testing Enhanced Data Population ===")
try:
data_provider = DataProvider()
orchestrator = TradingOrchestrator(data_provider)
# Wait for initial population
print("Waiting 5 seconds for enhanced data population...")
time.sleep(5)
# Check detailed queue status
print("\nDetailed queue status after enhanced population:")
orchestrator.log_queue_status(detailed=True)
# Check minimum data requirements
symbols_to_check = ['ETH/USDT', 'BTC/USDT']
timeframes_to_check = ['1s', '1m', '1h', '1d']
min_requirements = {'1s': 100, '1m': 50, '1h': 20, '1d': 10}
print(f"\nChecking minimum data requirements with fallback:")
all_requirements_met = True
for symbol in symbols_to_check:
print(f"\n{symbol}:")
symbol_requirements_met = True
for timeframe in timeframes_to_check:
min_count = min_requirements.get(timeframe, 10)
has_min = orchestrator.ensure_minimum_data(f'ohlcv_{timeframe}', symbol, min_count)
actual_count = 0
if f'ohlcv_{timeframe}' in orchestrator.data_queues and symbol in orchestrator.data_queues[f'ohlcv_{timeframe}']:
with orchestrator.data_queue_locks[f'ohlcv_{timeframe}'][symbol]:
actual_count = len(orchestrator.data_queues[f'ohlcv_{timeframe}'][symbol])
status = "" if has_min else ""
print(f" {timeframe}: {status} {actual_count}/{min_count}")
if not has_min:
symbol_requirements_met = False
all_requirements_met = False
# Check technical indicators
indicators_count = 0
if 'technical_indicators' in orchestrator.data_queues and symbol in orchestrator.data_queues['technical_indicators']:
with orchestrator.data_queue_locks['technical_indicators'][symbol]:
indicators_data = list(orchestrator.data_queues['technical_indicators'][symbol])
if indicators_data:
indicators_count = len(indicators_data[-1]) # Latest indicators dict
indicators_status = "" if indicators_count > 0 else ""
print(f" indicators: {indicators_status} {indicators_count} calculated")
# Test BaseDataInput building
print(f"\nTesting BaseDataInput building with fallback:")
for symbol in ['ETH/USDT', 'BTC/USDT']:
base_data = orchestrator.build_base_data_input(symbol)
if base_data:
features = base_data.get_feature_vector()
print(f"{symbol}: BaseDataInput built successfully")
print(f" Feature vector size: {len(features)}")
print(f" OHLCV 1s bars: {len(base_data.ohlcv_1s)}")
print(f" OHLCV 1m bars: {len(base_data.ohlcv_1m)}")
print(f" OHLCV 1h bars: {len(base_data.ohlcv_1h)}")
print(f" OHLCV 1d bars: {len(base_data.ohlcv_1d)}")
print(f" BTC bars: {len(base_data.btc_ohlcv_1s)}")
print(f" Technical indicators: {len(base_data.technical_indicators)}")
# Validate feature vector
if len(features) == 7850:
print(f" ✅ Feature vector has correct size (7850)")
else:
print(f" ❌ Feature vector size mismatch: {len(features)} != 7850")
else:
print(f"{symbol}: Failed to build BaseDataInput")
return all_requirements_met
except Exception as e:
print(f"❌ Test failed: {e}")
return False
def test_fallback_strategies():
"""Test specific fallback strategies"""
print("\n=== Testing Fallback Strategies ===")
try:
data_provider = DataProvider()
orchestrator = TradingOrchestrator(data_provider)
# Wait for initial population
time.sleep(3)
# Check if fallback strategies were used
print("Checking fallback strategy usage:")
# Check ETH/USDT 1s data (likely to need fallback)
eth_1s_count = 0
if 'ohlcv_1s' in orchestrator.data_queues and 'ETH/USDT' in orchestrator.data_queues['ohlcv_1s']:
with orchestrator.data_queue_locks['ohlcv_1s']['ETH/USDT']:
eth_1s_count = len(orchestrator.data_queues['ohlcv_1s']['ETH/USDT'])
if eth_1s_count >= 100:
print(f" ✅ ETH/USDT 1s data: {eth_1s_count} bars (fallback likely used)")
else:
print(f" ❌ ETH/USDT 1s data: {eth_1s_count} bars (fallback may have failed)")
# Check ETH/USDT 1h data (likely to need fallback)
eth_1h_count = 0
if 'ohlcv_1h' in orchestrator.data_queues and 'ETH/USDT' in orchestrator.data_queues['ohlcv_1h']:
with orchestrator.data_queue_locks['ohlcv_1h']['ETH/USDT']:
eth_1h_count = len(orchestrator.data_queues['ohlcv_1h']['ETH/USDT'])
if eth_1h_count >= 20:
print(f" ✅ ETH/USDT 1h data: {eth_1h_count} bars (fallback likely used)")
else:
print(f" ❌ ETH/USDT 1h data: {eth_1h_count} bars (fallback may have failed)")
# Test manual fallback strategy
print(f"\nTesting manual fallback strategy:")
missing_data = [('ohlcv_1s', 0, 100), ('ohlcv_1h', 0, 20)]
fallback_success = orchestrator._try_fallback_data_strategy('ETH/USDT', missing_data)
print(f" Manual fallback result: {'✅ SUCCESS' if fallback_success else '❌ FAILED'}")
return eth_1s_count >= 100 and eth_1h_count >= 20
except Exception as e:
print(f"❌ Test failed: {e}")
return False
def test_model_predictions():
"""Test that models can now make predictions with the improved data"""
print("\n=== Testing Model Predictions ===")
try:
data_provider = DataProvider()
orchestrator = TradingOrchestrator(data_provider)
# Wait for data population
time.sleep(5)
# Try to make predictions
print("Testing model prediction capability:")
# Test CNN prediction
try:
base_data = orchestrator.build_base_data_input('ETH/USDT')
if base_data:
print(" ✅ BaseDataInput available for CNN")
# Test feature vector
features = base_data.get_feature_vector()
if len(features) == 7850:
print(" ✅ Feature vector has correct size for CNN")
print(" ✅ CNN should be able to make predictions without rebuilding")
else:
print(f" ❌ Feature vector size issue: {len(features)} != 7850")
else:
print(" ❌ BaseDataInput not available for CNN")
except Exception as e:
print(f" ❌ CNN prediction test failed: {e}")
# Test RL prediction
try:
base_data = orchestrator.build_base_data_input('ETH/USDT')
if base_data:
print(" ✅ BaseDataInput available for RL")
# Test state features
state_features = base_data.get_feature_vector()
if len(state_features) == 7850:
print(" ✅ State features have correct size for RL")
else:
print(f" ❌ State features size issue: {len(state_features)} != 7850")
else:
print(" ❌ BaseDataInput not available for RL")
except Exception as e:
print(f" ❌ RL prediction test failed: {e}")
return base_data is not None
except Exception as e:
print(f"❌ Test failed: {e}")
return False
def main():
"""Run all enhanced data integration tests"""
print("=== Enhanced Data Integration Test Suite ===")
test1_passed = test_enhanced_data_population()
test2_passed = test_fallback_strategies()
test3_passed = test_model_predictions()
print(f"\n=== Results ===")
print(f"Enhanced data population: {'✅ PASSED' if test1_passed else '❌ FAILED'}")
print(f"Fallback strategies: {'✅ PASSED' if test2_passed else '❌ FAILED'}")
print(f"Model predictions: {'✅ PASSED' if test3_passed else '❌ FAILED'}")
if test1_passed and test2_passed and test3_passed:
print("\n✅ ALL TESTS PASSED!")
print("✅ Enhanced data integration is working!")
print("✅ Fallback strategies provide missing data")
print("✅ Models should be able to make predictions")
print("✅ No more 'Insufficient data' errors expected")
else:
print("\n⚠️ Some tests failed, but system may still work")
print("⚠️ Check specific failures above")
if __name__ == "__main__":
main()