#!/usr/bin/env python3
"""
Test Data Integration

Test that the FIFO queues are properly populated from the data provider
"""

import time

from core.orchestrator import TradingOrchestrator
from core.data_provider import DataProvider


def _print_nonzero_counts(status):
    """Print one line per queue in *status* whose count is greater than zero.

    Args:
        status: Mapping of data type -> mapping of symbol -> count, as
            returned by ``orchestrator.get_queue_status()``.
    """
    for data_type, symbols in status.items():
        for symbol, count in symbols.items():
            if count > 0:
                print(f" {data_type}/{symbol}: {count}")


def test_data_provider_methods():
    """Test what methods are available in the data provider.

    Lists the public callables of a fresh ``DataProvider`` whose names
    mention data/ohlcv/historical/latest, then tries ``get_historical_data``
    and, if present, ``get_latest_candles`` for ETH/USDT 1m candles.

    Returns:
        True if the provider could be constructed and inspected (individual
        fetch errors are printed but do not fail the test); False if an
        unexpected exception escaped.
    """
    print("=== Testing Data Provider Methods ===")

    try:
        data_provider = DataProvider()

        # Check available methods
        keywords = ('data', 'ohlcv', 'historical', 'latest')
        methods = [
            name for name in dir(data_provider)
            if not name.startswith('_') and callable(getattr(data_provider, name))
        ]
        data_methods = [
            name for name in methods
            if any(keyword in name.lower() for keyword in keywords)
        ]

        print("Available data-related methods:")
        for method in sorted(data_methods):
            print(f" - {method}")

        # Test getting historical data
        print("\nTesting get_historical_data:")
        try:
            df = data_provider.get_historical_data('ETH/USDT', '1m', limit=5)
            if df is not None and not df.empty:
                print(f" ✅ Got {len(df)} rows of 1m data")
                print(f" Columns: {list(df.columns)}")
                print(f" Latest close: {df['close'].iloc[-1]}")
            else:
                print(" ❌ No data returned")
        except Exception as e:
            # Best-effort probe: report the error and keep going.
            print(f" ❌ Error: {e}")

        # Test getting latest candles if available
        if hasattr(data_provider, 'get_latest_candles'):
            print("\nTesting get_latest_candles:")
            try:
                df = data_provider.get_latest_candles('ETH/USDT', '1m', limit=5)
                if df is not None and not df.empty:
                    print(f" ✅ Got {len(df)} rows of latest candles")
                    print(f" Latest close: {df['close'].iloc[-1]}")
                else:
                    print(" ❌ No data returned")
            except Exception as e:
                # Best-effort probe: report the error and keep going.
                print(f" ❌ Error: {e}")

        return True

    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False


def test_queue_population():
    """Test that queues get populated with data.

    Builds a ``TradingOrchestrator`` on a fresh ``DataProvider``, sleeps
    3 seconds, logs the queue status, reports ``ensure_minimum_data`` and the
    current queue length for each symbol/timeframe, and finally builds a
    base data input for ETH/USDT.

    Returns:
        True if ``build_base_data_input`` returned something other than
        ``None``; False otherwise or if an exception was raised.
    """
    print("\n=== Testing Queue Population ===")

    try:
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Wait a moment for initial population
        print("Waiting 3 seconds for initial data population...")
        time.sleep(3)

        # Check queue status
        print("\nQueue status after initialization:")
        orchestrator.log_queue_status(detailed=True)

        # Check if we have minimum data
        symbols_to_check = ['ETH/USDT', 'BTC/USDT']
        timeframes_to_check = ['1s', '1m', '1h', '1d']
        min_requirements = {'1s': 100, '1m': 50, '1h': 20, '1d': 10}

        print("\nChecking minimum data requirements:")
        for symbol in symbols_to_check:
            print(f"\n{symbol}:")
            for timeframe in timeframes_to_check:
                queue_name = f'ohlcv_{timeframe}'
                min_count = min_requirements.get(timeframe, 10)
                has_min = orchestrator.ensure_minimum_data(queue_name, symbol, min_count)

                actual_count = 0
                if (queue_name in orchestrator.data_queues
                        and symbol in orchestrator.data_queues[queue_name]):
                    # Read the length under the queue's own lock.
                    with orchestrator.data_queue_locks[queue_name][symbol]:
                        actual_count = len(orchestrator.data_queues[queue_name][symbol])

                status = "✅" if has_min else "❌"
                print(f" {timeframe}: {status} {actual_count}/{min_count}")

        # Test BaseDataInput building
        print("\nTesting BaseDataInput building:")
        base_data = orchestrator.build_base_data_input('ETH/USDT')
        # Use the same check for the printed outcome and the return value so
        # the two can never disagree.
        built = base_data is not None
        if built:
            features = base_data.get_feature_vector()
            print(" ✅ BaseDataInput built successfully")
            print(f" Feature vector size: {len(features)}")
            print(f" OHLCV 1s bars: {len(base_data.ohlcv_1s)}")
            print(f" OHLCV 1m bars: {len(base_data.ohlcv_1m)}")
            print(f" BTC bars: {len(base_data.btc_ohlcv_1s)}")
        else:
            print(" ❌ Failed to build BaseDataInput")

        return built

    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False


def test_polling_thread():
    """Test that the polling thread is working.

    Takes a queue-status snapshot, sleeps 10 seconds, takes a second
    snapshot, and reports every queue whose count increased.

    Returns:
        True if at least one queue grew between the two snapshots; False if
        none grew or an exception was raised.
    """
    print("\n=== Testing Polling Thread ===")

    try:
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Get initial queue counts
        initial_status = orchestrator.get_queue_status()
        print("Initial queue counts:")
        _print_nonzero_counts(initial_status)

        # Wait for polling thread to run
        print("\nWaiting 10 seconds for polling thread...")
        time.sleep(10)

        # Get updated queue counts
        updated_status = orchestrator.get_queue_status()
        print("\nUpdated queue counts:")
        _print_nonzero_counts(updated_status)

        # Check if any queues grew
        growth_detected = False
        for data_type, symbols in initial_status.items():
            # A queue missing from the second snapshot counts as 0 rather
            # than aborting the whole test with a KeyError.
            updated_symbols = updated_status.get(data_type, {})
            for symbol, initial_count in symbols.items():
                updated_count = updated_symbols.get(symbol, 0)
                if updated_count > initial_count:
                    print(f" ✅ Growth detected: {data_type}/{symbol} {initial_count} -> {updated_count}")
                    growth_detected = True

        if not growth_detected:
            print(" ⚠️ No queue growth detected - polling may not be working")

        return growth_detected

    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False


def main():
    """Run all data integration tests and print a summary.

    The overall verdict depends on the provider-methods and queue-population
    tests only; the polling-thread result is reported but does not affect it.
    """
    print("=== Data Integration Test Suite ===")

    test1_passed = test_data_provider_methods()
    test2_passed = test_queue_population()
    test3_passed = test_polling_thread()

    print("\n=== Results ===")
    print(f"Data provider methods: {'✅ PASSED' if test1_passed else '❌ FAILED'}")
    print(f"Queue population: {'✅ PASSED' if test2_passed else '❌ FAILED'}")
    print(f"Polling thread: {'✅ PASSED' if test3_passed else '❌ FAILED'}")

    if test1_passed and test2_passed:
        print("\n✅ Data integration is working!")
        print("✅ FIFO queues should be populated with data")
        print("✅ Models should be able to make predictions")
    else:
        print("\n❌ Data integration issues detected")
        print("❌ Check data provider connectivity and methods")


if __name__ == "__main__":
    main()