#!/usr/bin/env python3
"""
Test Queue Logging

Test the improved logging for FIFO queue status
"""

import sys
from datetime import datetime

from core.orchestrator import TradingOrchestrator
from core.data_provider import DataProvider
from core.data_models import OHLCVBar

# Per-field base values for the synthetic bars; the loop offset is added to
# every field so consecutive bars differ.
_ETH_BASES = {
    "open": 2500.0,
    "high": 2510.0,
    "low": 2490.0,
    "close": 2505.0,
    "volume": 1000.0,
}
_BTC_BASES = {
    "open": 50000.0,
    "high": 50100.0,
    "low": 49900.0,
    "close": 50050.0,
    "volume": 100.0,
}


def _feed_bars(orchestrator, symbol, timeframe, offsets, bases):
    """Push one synthetic OHLCV bar per offset into the orchestrator.

    Args:
        orchestrator: Object exposing ``update_data_queue(queue, symbol, item)``.
        symbol: Trading pair the bars belong to, e.g. ``"ETH/USDT"``.
        timeframe: Bar timeframe, e.g. ``"1s"``; the target queue is
            ``"ohlcv_<timeframe>"``.
        offsets: Iterable of numbers; each one is added to every base value
            to produce one bar.
        bases: Mapping with ``open``/``high``/``low``/``close``/``volume``
            base values.
    """
    queue_name = f"ohlcv_{timeframe}"
    for offset in offsets:
        bar = OHLCVBar(
            symbol=symbol,
            timestamp=datetime.now(),
            open=bases["open"] + offset,
            high=bases["high"] + offset,
            low=bases["low"] + offset,
            close=bases["close"] + offset,
            volume=bases["volume"] + offset,
            timeframe=timeframe,
        )
        orchestrator.update_data_queue(queue_name, symbol, bar)


def _report_build(orchestrator, symbol):
    """Call ``build_base_data_input`` and print whether it returned a value."""
    base_data = orchestrator.build_base_data_input(symbol)
    print(f"Result: {base_data is not None}")


def test_insufficient_data_logging():
    """Test logging when there's insufficient data.

    Returns:
        True if every step ran without raising, False otherwise.
    """
    print("=== Testing Insufficient Data Logging ===")

    # Broad catch is deliberate: this is the test boundary, and any failure
    # is reported as a boolean result to main().
    try:
        # Create orchestrator
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Log initial empty queue status
        print("\n1. Initial queue status:")
        orchestrator.log_queue_status(detailed=True)

        # Try to build BaseDataInput with no data (should show detailed warnings)
        print("\n2. Attempting to build BaseDataInput with no data:")
        _report_build(orchestrator, 'ETH/USDT')

        # Add some data but not enough
        print("\n3. Adding insufficient data (50 bars, need 100):")
        _feed_bars(orchestrator, "ETH/USDT", "1s", range(50), _ETH_BASES)

        # Log queue status after adding some data
        print("\n4. Queue status after adding 50 bars:")
        orchestrator.log_queue_status(detailed=True)

        # Try to build BaseDataInput again (should show we have 50, need 100)
        print("\n5. Attempting to build BaseDataInput with insufficient data:")
        _report_build(orchestrator, 'ETH/USDT')

        # Add enough data for ohlcv_1s but not other timeframes
        print("\n6. Adding enough 1s data (150 total) but missing other timeframes:")
        _feed_bars(orchestrator, "ETH/USDT", "1s", range(50, 150), _ETH_BASES)

        # Try again (should show 1s is OK but 1m/1h/1d are missing)
        print("\n7. Attempting to build BaseDataInput with mixed data availability:")
        _report_build(orchestrator, 'ETH/USDT')

        return True

    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False


def test_queue_status_logging():
    """Test detailed queue status logging.

    Returns:
        True if every step ran without raising, False otherwise.
    """
    print("\n=== Testing Queue Status Logging ===")

    # Broad catch is deliberate: see test_insufficient_data_logging.
    try:
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Add various types of data
        print("\n1. Adding mixed data types:")

        # Add OHLCV data
        _feed_bars(orchestrator, "ETH/USDT", "1s", range(75), _ETH_BASES)

        # Add some 1m data
        _feed_bars(orchestrator, "ETH/USDT", "1m", range(25), _ETH_BASES)

        # Add technical indicators
        indicators = {'rsi': 45.5, 'macd': 0.15, 'bb_upper': 2520.0}
        orchestrator.update_data_queue('technical_indicators', 'ETH/USDT', indicators)

        # Add BTC data
        _feed_bars(orchestrator, "BTC/USDT", "1s", range(60), _BTC_BASES)

        print("\n2. Detailed queue status:")
        orchestrator.log_queue_status(detailed=True)

        print("\n3. Simple queue status:")
        orchestrator.log_queue_status(detailed=False)

        print("\n4. Attempting to build BaseDataInput:")
        _report_build(orchestrator, 'ETH/USDT')

        return True

    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False


def main():
    """Run logging tests.

    Returns:
        0 if both tests passed, 1 otherwise, so the value can be handed to
        ``sys.exit`` and failures are visible to the calling shell or CI.
    """
    print("=== Queue Logging Test Suite ===")

    test1_passed = test_insufficient_data_logging()
    test2_passed = test_queue_status_logging()

    print("\n=== Results ===")
    print(f"Insufficient data logging: {'✅ PASSED' if test1_passed else '❌ FAILED'}")
    print(f"Queue status logging: {'✅ PASSED' if test2_passed else '❌ FAILED'}")

    if test1_passed and test2_passed:
        print("\n✅ ALL TESTS PASSED!")
        print("✅ Improved logging shows actual vs required data counts")
        print("✅ Detailed queue status provides debugging information")
        return 0

    print("\n❌ Some tests failed")
    return 1


if __name__ == "__main__":
    sys.exit(main())