182 lines
7.2 KiB
Python
182 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test Data Integration
|
|
|
|
Test that the FIFO queues are properly populated from the data provider
|
|
"""
|
|
|
|
import time
|
|
from core.orchestrator import TradingOrchestrator
|
|
from core.data_provider import DataProvider
|
|
|
|
def test_data_provider_methods():
|
|
"""Test what methods are available in the data provider"""
|
|
print("=== Testing Data Provider Methods ===")
|
|
|
|
try:
|
|
data_provider = DataProvider()
|
|
|
|
# Check available methods
|
|
methods = [method for method in dir(data_provider) if not method.startswith('_') and callable(getattr(data_provider, method))]
|
|
data_methods = [method for method in methods if 'data' in method.lower() or 'ohlcv' in method.lower() or 'historical' in method.lower() or 'latest' in method.lower()]
|
|
|
|
print("Available data-related methods:")
|
|
for method in sorted(data_methods):
|
|
print(f" - {method}")
|
|
|
|
# Test getting historical data
|
|
print(f"\nTesting get_historical_data:")
|
|
try:
|
|
df = data_provider.get_historical_data('ETH/USDT', '1m', limit=5)
|
|
if df is not None and not df.empty:
|
|
print(f" ✅ Got {len(df)} rows of 1m data")
|
|
print(f" Columns: {list(df.columns)}")
|
|
print(f" Latest close: {df['close'].iloc[-1]}")
|
|
else:
|
|
print(f" ❌ No data returned")
|
|
except Exception as e:
|
|
print(f" ❌ Error: {e}")
|
|
|
|
# Test getting latest candles if available
|
|
if hasattr(data_provider, 'get_latest_candles'):
|
|
print(f"\nTesting get_latest_candles:")
|
|
try:
|
|
df = data_provider.get_latest_candles('ETH/USDT', '1m', limit=5)
|
|
if df is not None and not df.empty:
|
|
print(f" ✅ Got {len(df)} rows of latest candles")
|
|
print(f" Latest close: {df['close'].iloc[-1]}")
|
|
else:
|
|
print(f" ❌ No data returned")
|
|
except Exception as e:
|
|
print(f" ❌ Error: {e}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed: {e}")
|
|
return False
|
|
|
|
def test_queue_population():
|
|
"""Test that queues get populated with data"""
|
|
print("\n=== Testing Queue Population ===")
|
|
|
|
try:
|
|
data_provider = DataProvider()
|
|
orchestrator = TradingOrchestrator(data_provider)
|
|
|
|
# Wait a moment for initial population
|
|
print("Waiting 3 seconds for initial data population...")
|
|
time.sleep(3)
|
|
|
|
# Check queue status
|
|
print("\nQueue status after initialization:")
|
|
orchestrator.log_queue_status(detailed=True)
|
|
|
|
# Check if we have minimum data
|
|
symbols_to_check = ['ETH/USDT', 'BTC/USDT']
|
|
timeframes_to_check = ['1s', '1m', '1h', '1d']
|
|
min_requirements = {'1s': 100, '1m': 50, '1h': 20, '1d': 10}
|
|
|
|
print(f"\nChecking minimum data requirements:")
|
|
for symbol in symbols_to_check:
|
|
print(f"\n{symbol}:")
|
|
for timeframe in timeframes_to_check:
|
|
min_count = min_requirements.get(timeframe, 10)
|
|
has_min = orchestrator.ensure_minimum_data(f'ohlcv_{timeframe}', symbol, min_count)
|
|
actual_count = 0
|
|
if f'ohlcv_{timeframe}' in orchestrator.data_queues and symbol in orchestrator.data_queues[f'ohlcv_{timeframe}']:
|
|
with orchestrator.data_queue_locks[f'ohlcv_{timeframe}'][symbol]:
|
|
actual_count = len(orchestrator.data_queues[f'ohlcv_{timeframe}'][symbol])
|
|
|
|
status = "✅" if has_min else "❌"
|
|
print(f" {timeframe}: {status} {actual_count}/{min_count}")
|
|
|
|
# Test BaseDataInput building
|
|
print(f"\nTesting BaseDataInput building:")
|
|
base_data = orchestrator.build_base_data_input('ETH/USDT')
|
|
if base_data:
|
|
features = base_data.get_feature_vector()
|
|
print(f" ✅ BaseDataInput built successfully")
|
|
print(f" Feature vector size: {len(features)}")
|
|
print(f" OHLCV 1s bars: {len(base_data.ohlcv_1s)}")
|
|
print(f" OHLCV 1m bars: {len(base_data.ohlcv_1m)}")
|
|
print(f" BTC bars: {len(base_data.btc_ohlcv_1s)}")
|
|
else:
|
|
print(f" ❌ Failed to build BaseDataInput")
|
|
|
|
return base_data is not None
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed: {e}")
|
|
return False
|
|
|
|
def test_polling_thread():
|
|
"""Test that the polling thread is working"""
|
|
print("\n=== Testing Polling Thread ===")
|
|
|
|
try:
|
|
data_provider = DataProvider()
|
|
orchestrator = TradingOrchestrator(data_provider)
|
|
|
|
# Get initial queue counts
|
|
initial_status = orchestrator.get_queue_status()
|
|
print("Initial queue counts:")
|
|
for data_type, symbols in initial_status.items():
|
|
for symbol, count in symbols.items():
|
|
if count > 0:
|
|
print(f" {data_type}/{symbol}: {count}")
|
|
|
|
# Wait for polling thread to run
|
|
print("\nWaiting 10 seconds for polling thread...")
|
|
time.sleep(10)
|
|
|
|
# Get updated queue counts
|
|
updated_status = orchestrator.get_queue_status()
|
|
print("\nUpdated queue counts:")
|
|
for data_type, symbols in updated_status.items():
|
|
for symbol, count in symbols.items():
|
|
if count > 0:
|
|
print(f" {data_type}/{symbol}: {count}")
|
|
|
|
# Check if any queues grew
|
|
growth_detected = False
|
|
for data_type in initial_status:
|
|
for symbol in initial_status[data_type]:
|
|
initial_count = initial_status[data_type][symbol]
|
|
updated_count = updated_status[data_type][symbol]
|
|
if updated_count > initial_count:
|
|
print(f" ✅ Growth detected: {data_type}/{symbol} {initial_count} -> {updated_count}")
|
|
growth_detected = True
|
|
|
|
if not growth_detected:
|
|
print(" ⚠️ No queue growth detected - polling may not be working")
|
|
|
|
return growth_detected
|
|
|
|
except Exception as e:
|
|
print(f"❌ Test failed: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Run all data integration tests"""
|
|
print("=== Data Integration Test Suite ===")
|
|
|
|
test1_passed = test_data_provider_methods()
|
|
test2_passed = test_queue_population()
|
|
test3_passed = test_polling_thread()
|
|
|
|
print(f"\n=== Results ===")
|
|
print(f"Data provider methods: {'✅ PASSED' if test1_passed else '❌ FAILED'}")
|
|
print(f"Queue population: {'✅ PASSED' if test2_passed else '❌ FAILED'}")
|
|
print(f"Polling thread: {'✅ PASSED' if test3_passed else '❌ FAILED'}")
|
|
|
|
if test1_passed and test2_passed:
|
|
print("\n✅ Data integration is working!")
|
|
print("✅ FIFO queues should be populated with data")
|
|
print("✅ Models should be able to make predictions")
|
|
else:
|
|
print("\n❌ Data integration issues detected")
|
|
print("❌ Check data provider connectivity and methods")
|
|
|
|
if __name__ == "__main__":
|
|
main() |