#!/usr/bin/env python3
"""
Test FIFO Queue System

Verify that the orchestrator's FIFO queue system works correctly
"""

import sys
import time
from datetime import datetime

from core.orchestrator import TradingOrchestrator
from core.data_provider import DataProvider
from core.data_models import OHLCVBar


def _new_orchestrator():
    """Return a fresh TradingOrchestrator backed by a new DataProvider."""
    return TradingOrchestrator(DataProvider())


def _eth_bar(offset=0, timeframe="1s"):
    """Build a synthetic ETH/USDT bar; prices and volume are shifted by ``offset``."""
    return OHLCVBar(
        symbol="ETH/USDT",
        timestamp=datetime.now(),
        open=2500.0 + offset,
        high=2510.0 + offset,
        low=2490.0 + offset,
        close=2505.0 + offset,
        volume=1000.0 + offset,
        timeframe=timeframe,
    )


def _btc_bar(offset=0):
    """Build a synthetic 1s BTC/USDT bar; prices and volume are shifted by ``offset``."""
    return OHLCVBar(
        symbol="BTC/USDT",
        timestamp=datetime.now(),
        open=50000.0 + offset,
        high=50100.0 + offset,
        low=49900.0 + offset,
        close=50050.0 + offset,
        volume=100.0 + offset,
        timeframe="1s",
    )


def _fill_eth_queues(orchestrator, counts):
    """Push ``count`` synthetic ETH bars into ``ohlcv_<timeframe>`` for each pair.

    ``counts`` is an iterable of ``(timeframe, count)`` pairs, processed in order.
    """
    for timeframe, count in counts:
        for i in range(count):
            orchestrator.update_data_queue(
                f'ohlcv_{timeframe}', 'ETH/USDT', _eth_bar(i, timeframe)
            )


def _fill_btc_queue(orchestrator, count):
    """Push ``count`` synthetic BTC bars into the ``ohlcv_1s`` queue."""
    for i in range(count):
        orchestrator.update_data_queue('ohlcv_1s', 'BTC/USDT', _btc_bar(i))


def test_fifo_queue_operations():
    """Test basic FIFO queue operations

    Adds one bar, reads it back and queries the minimum-data check.
    The results are printed, not asserted: the test returns True whenever
    no exception is raised, and False otherwise.
    """
    print("=== Testing FIFO Queue Operations ===")

    try:
        # Create orchestrator
        orchestrator = _new_orchestrator()

        # Test queue status
        status = orchestrator.get_queue_status()
        print(f"Initial queue status: {status}")

        # Add test data
        success = orchestrator.update_data_queue('ohlcv_1s', 'ETH/USDT', _eth_bar())
        print(f"Added OHLCV data: {success}")

        # Check queue status after adding data
        status = orchestrator.get_queue_status()
        print(f"Queue status after adding data: {status}")

        # Test retrieving data
        latest_data = orchestrator.get_latest_data('ohlcv_1s', 'ETH/USDT', 1)
        print(f"Retrieved latest data: {len(latest_data)} items")

        if latest_data:
            bar = latest_data[0]
            print(f"  Bar: {bar.symbol} {bar.close} @ {bar.timestamp}")

        # Test minimum data check
        has_min_data = orchestrator.ensure_minimum_data('ohlcv_1s', 'ETH/USDT', 1)
        print(f"Has minimum data (1): {has_min_data}")

        has_min_data_100 = orchestrator.ensure_minimum_data('ohlcv_1s', 'ETH/USDT', 100)
        print(f"Has minimum data (100): {has_min_data_100}")

        return True

    except Exception as e:
        # Top-level test boundary: report the failure and let the suite continue.
        print(f"❌ FIFO queue operations test failed: {e}")
        return False


def test_data_queue_filling():
    """Test filling queues with multiple data points

    Pushes 150 ETH bars into ``ohlcv_1s`` and prints the queue status and
    the sizes returned by ``get_queue_data`` with and without ``max_items``.
    Returns True whenever no exception is raised, False otherwise.
    """
    print("\n=== Testing Data Queue Filling ===")

    try:
        orchestrator = _new_orchestrator()

        # Add multiple OHLCV bars
        _fill_eth_queues(orchestrator, [('1s', 150)])  # Add 150 bars

        # Check queue status
        status = orchestrator.get_queue_status()
        print(f"Queue status after adding 150 bars: {status}")

        # Test minimum data requirements
        has_min_data = orchestrator.ensure_minimum_data('ohlcv_1s', 'ETH/USDT', 100)
        print(f"Has minimum data (100): {has_min_data}")

        # Get all data
        all_data = orchestrator.get_queue_data('ohlcv_1s', 'ETH/USDT')
        print(f"Total data in queue: {len(all_data)} items")

        # Test max_items parameter
        limited_data = orchestrator.get_queue_data('ohlcv_1s', 'ETH/USDT', max_items=50)
        print(f"Limited data (50): {len(limited_data)} items")

        return True

    except Exception as e:
        print(f"❌ Data queue filling test failed: {e}")
        return False


def test_base_data_input_building():
    """Test building BaseDataInput from FIFO queues

    Fills the ETH and BTC queues, adds one set of technical indicators and
    calls ``build_base_data_input``. Returns True when a truthy object is
    built, False when the build yields a falsy result or an exception is
    raised.
    """
    print("\n=== Testing BaseDataInput Building ===")

    try:
        orchestrator = _new_orchestrator()

        # Fill queues with sufficient data
        timeframes = ['1s', '1m', '1h', '1d']
        min_counts = [100, 50, 20, 10]

        # Add a bit more than minimum
        _fill_eth_queues(
            orchestrator,
            [(timeframe, min_count + 10)
             for timeframe, min_count in zip(timeframes, min_counts)],
        )

        # Add BTC data
        _fill_btc_queue(orchestrator, 110)

        # Add technical indicators
        test_indicators = {'rsi': 50.0, 'macd': 0.1, 'bb_upper': 2520.0, 'bb_lower': 2480.0}
        orchestrator.update_data_queue('technical_indicators', 'ETH/USDT', test_indicators)

        # Try to build BaseDataInput
        base_data = orchestrator.build_base_data_input('ETH/USDT')

        if not base_data:
            print("❌ Failed to build BaseDataInput")
            return False

        print("✅ BaseDataInput built successfully")

        # Test feature vector
        features = base_data.get_feature_vector()
        print(f"  Feature vector size: {len(features)}")
        print(f"  Symbol: {base_data.symbol}")
        print(f"  OHLCV 1s data: {len(base_data.ohlcv_1s)} bars")
        print(f"  OHLCV 1m data: {len(base_data.ohlcv_1m)} bars")
        print(f"  BTC data: {len(base_data.btc_ohlcv_1s)} bars")
        print(f"  Technical indicators: {len(base_data.technical_indicators)}")

        # Validate (the outcome is printed only; it does not affect the result)
        is_valid = base_data.validate()
        print(f"  Validation: {is_valid}")

        return True

    except Exception as e:
        print(f"❌ BaseDataInput building test failed: {e}")
        return False


def test_consistent_feature_size():
    """Test that feature vectors are always the same size

    Builds a BaseDataInput under several indicator scenarios and compares
    the lengths of the resulting feature vectors. Returns True only when
    every scenario builds and all lengths are equal.
    """
    print("\n=== Testing Consistent Feature Size ===")

    try:
        orchestrator = _new_orchestrator()

        # Fill with minimal data first
        _fill_eth_queues(orchestrator, [('1s', 100), ('1m', 50), ('1h', 20), ('1d', 10)])

        # Add BTC data
        _fill_btc_queue(orchestrator, 100)

        feature_sizes = []

        # Test multiple scenarios
        scenarios = [
            ("Minimal data", {}),
            ("With indicators", {'rsi': 50.0, 'macd': 0.1}),
            ("More indicators", {'rsi': 45.0, 'macd': 0.2, 'bb_upper': 2520.0, 'bb_lower': 2480.0, 'ema_20': 2500.0})
        ]

        for name, indicators in scenarios:
            # An empty dict means "no indicators": nothing is queued for it.
            if indicators:
                orchestrator.update_data_queue('technical_indicators', 'ETH/USDT', indicators)

            base_data = orchestrator.build_base_data_input('ETH/USDT')
            if not base_data:
                print(f"{name}: Failed to build BaseDataInput")
                return False

            features = base_data.get_feature_vector()
            feature_sizes.append(len(features))
            print(f"{name}: {len(features)} features")

        # Check consistency
        if len(set(feature_sizes)) == 1:
            print(f"✅ All feature vectors have consistent size: {feature_sizes[0]}")
            return True

        print(f"❌ Inconsistent feature sizes: {feature_sizes}")
        return False

    except Exception as e:
        print(f"❌ Consistent feature size test failed: {e}")
        return False


def main():
    """Run all FIFO queue tests

    Returns:
        True when every test function returned a truthy value, else False.
    """
    print("=== FIFO Queue System Test Suite ===\n")

    tests = [
        test_fifo_queue_operations,
        test_data_queue_filling,
        test_base_data_input_building,
        test_consistent_feature_size
    ]

    passed = 0
    total = len(tests)

    for test in tests:
        if test():
            passed += 1
        print()

    print(f"=== Test Results: {passed}/{total} passed ===")

    all_passed = passed == total
    if all_passed:
        print("✅ ALL TESTS PASSED!")
        print("✅ FIFO queue system is working correctly")
        print("✅ Consistent data flow ensured")
        print("✅ No more network rebuilding issues")
    else:
        print("❌ Some tests failed")

    return all_passed


if __name__ == "__main__":
    # Propagate the outcome so shells and CI can detect a failing run.
    sys.exit(0 if main() else 1)