gogo2/test_data_integration.py
#!/usr/bin/env python3
"""
Test Data Integration

Test that the FIFO queues are properly populated from the data provider.
"""
import time

from core.data_provider import DataProvider
from core.orchestrator import TradingOrchestrator
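
# NOTE: The checks below assume the orchestrator keeps FIFO queues in nested
# dicts keyed by data type and symbol, with a matching dict of per-queue
# locks. This shape is inferred from the attribute accesses in
# test_queue_population(), not from a documented API; illustrative only:
#
#   from collections import deque
#   from threading import Lock
#
#   data_queues = {'ohlcv_1m': {'ETH/USDT': deque(...)}}    # hypothetical
#   data_queue_locks = {'ohlcv_1m': {'ETH/USDT': Lock()}}   # hypothetical
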
def test_data_provider_methods():
    """Check which data-related methods the data provider exposes."""
    print("=== Testing Data Provider Methods ===")
    try:
        data_provider = DataProvider()

        # Enumerate public callable attributes that look data-related
        methods = [
            method for method in dir(data_provider)
            if not method.startswith('_') and callable(getattr(data_provider, method))
        ]
        data_methods = [
            method for method in methods
            if any(key in method.lower() for key in ('data', 'ohlcv', 'historical', 'latest'))
        ]
        print("Available data-related methods:")
        for method in sorted(data_methods):
            print(f"  - {method}")

        # Test fetching historical data
        print("\nTesting get_historical_data:")
        try:
            df = data_provider.get_historical_data('ETH/USDT', '1m', limit=5)
            if df is not None and not df.empty:
                print(f"  ✅ Got {len(df)} rows of 1m data")
                print(f"  Columns: {list(df.columns)}")
                print(f"  Latest close: {df['close'].iloc[-1]}")
            else:
                print("  ❌ No data returned")
        except Exception as e:
            print(f"  ❌ Error: {e}")

        # Test fetching the latest candles, if the provider supports it
        if hasattr(data_provider, 'get_latest_candles'):
            print("\nTesting get_latest_candles:")
            try:
                df = data_provider.get_latest_candles('ETH/USDT', '1m', limit=5)
                if df is not None and not df.empty:
                    print(f"  ✅ Got {len(df)} rows of latest candles")
                    print(f"  Latest close: {df['close'].iloc[-1]}")
                else:
                    print("  ❌ No data returned")
            except Exception as e:
                print(f"  ❌ Error: {e}")

        return True
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False
def test_queue_population():
    """Test that the FIFO queues get populated with data."""
    print("\n=== Testing Queue Population ===")
    try:
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Wait a moment for initial population
        print("Waiting 3 seconds for initial data population...")
        time.sleep(3)

        # Check queue status
        print("\nQueue status after initialization:")
        orchestrator.log_queue_status(detailed=True)

        # Check whether each queue meets its minimum data requirement
        symbols_to_check = ['ETH/USDT', 'BTC/USDT']
        timeframes_to_check = ['1s', '1m', '1h', '1d']
        min_requirements = {'1s': 100, '1m': 50, '1h': 20, '1d': 10}

        print("\nChecking minimum data requirements:")
        for symbol in symbols_to_check:
            print(f"\n{symbol}:")
            for timeframe in timeframes_to_check:
                min_count = min_requirements.get(timeframe, 10)
                queue_key = f'ohlcv_{timeframe}'
                has_min = orchestrator.ensure_minimum_data(queue_key, symbol, min_count)

                # Cross-check against the actual queue length, under the queue's lock
                actual_count = 0
                if queue_key in orchestrator.data_queues and symbol in orchestrator.data_queues[queue_key]:
                    with orchestrator.data_queue_locks[queue_key][symbol]:
                        actual_count = len(orchestrator.data_queues[queue_key][symbol])

                status = "✅" if has_min else "❌"
                print(f"  {timeframe}: {status} {actual_count}/{min_count}")

        # Test BaseDataInput building
        print("\nTesting BaseDataInput building:")
        base_data = orchestrator.build_base_data_input('ETH/USDT')
        if base_data:
            features = base_data.get_feature_vector()
            print("  ✅ BaseDataInput built successfully")
            print(f"  Feature vector size: {len(features)}")
            print(f"  OHLCV 1s bars: {len(base_data.ohlcv_1s)}")
            print(f"  OHLCV 1m bars: {len(base_data.ohlcv_1m)}")
            print(f"  BTC bars: {len(base_data.btc_ohlcv_1s)}")
        else:
            print("  ❌ Failed to build BaseDataInput")

        return base_data is not None
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False
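
# NOTE: ensure_minimum_data(data_type, symbol, n) is assumed to report
# whether the corresponding queue already holds at least n items, and
# build_base_data_input(symbol) to return an object exposing
# get_feature_vector() plus per-timeframe bar lists (ohlcv_1s, ohlcv_1m,
# btc_ohlcv_1s). Both assumptions come from how the test above uses them.
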
def test_polling_thread():
    """Test that the polling thread is feeding the queues."""
    print("\n=== Testing Polling Thread ===")
    try:
        data_provider = DataProvider()
        orchestrator = TradingOrchestrator(data_provider)

        # Snapshot initial queue counts
        initial_status = orchestrator.get_queue_status()
        print("Initial queue counts:")
        for data_type, symbols in initial_status.items():
            for symbol, count in symbols.items():
                if count > 0:
                    print(f"  {data_type}/{symbol}: {count}")

        # Give the polling thread time to run
        print("\nWaiting 10 seconds for polling thread...")
        time.sleep(10)

        # Snapshot updated queue counts
        updated_status = orchestrator.get_queue_status()
        print("\nUpdated queue counts:")
        for data_type, symbols in updated_status.items():
            for symbol, count in symbols.items():
                if count > 0:
                    print(f"  {data_type}/{symbol}: {count}")

        # Check whether any queue grew while we waited
        growth_detected = False
        for data_type in initial_status:
            for symbol in initial_status[data_type]:
                initial_count = initial_status[data_type][symbol]
                # Guard against keys disappearing between snapshots
                updated_count = updated_status.get(data_type, {}).get(symbol, 0)
                if updated_count > initial_count:
                    print(f"  ✅ Growth detected: {data_type}/{symbol} {initial_count} -> {updated_count}")
                    growth_detected = True

        if not growth_detected:
            print("  ⚠️ No queue growth detected - polling may not be working")

        return growth_detected
    except Exception as e:
        print(f"❌ Test failed: {e}")
        return False
def main():
    """Run all data integration tests."""
    print("=== Data Integration Test Suite ===")

    test1_passed = test_data_provider_methods()
    test2_passed = test_queue_population()
    test3_passed = test_polling_thread()

    print("\n=== Results ===")
    print(f"Data provider methods: {'✅ PASSED' if test1_passed else '❌ FAILED'}")
    print(f"Queue population: {'✅ PASSED' if test2_passed else '❌ FAILED'}")
    print(f"Polling thread: {'✅ PASSED' if test3_passed else '❌ FAILED'}")

    # Overall success is gated on the first two tests only; the polling-thread
    # result is reported but not required, since queue growth within a fixed
    # window is timing-dependent.
    if test1_passed and test2_passed:
        print("\n✅ Data integration is working!")
        print("✅ FIFO queues should be populated with data")
        print("✅ Models should be able to make predictions")
    else:
        print("\n❌ Data integration issues detected")
        print("❌ Check data provider connectivity and methods")


if __name__ == "__main__":
    main()
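
# Usage sketch (assumption: run from the directory that contains the `core`
# package, e.g. the gogo2/ project root, so the imports above resolve):
#
#   python test_data_integration.py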