gogo2/test_data_provider_integration.py

#!/usr/bin/env python3
"""
Integration test for the simplified data provider with other components
"""
import time
import logging

import pandas as pd

from core.data_provider import DataProvider

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def test_integration():
    """Test integration with other components"""
    logger.info("Testing DataProvider integration...")

    # Initialize data provider
    dp = DataProvider()

    # Wait for initial data load
    logger.info("Waiting for initial data load...")
    time.sleep(15)
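    # NOTE: the fixed 15-second sleep is this script's assumption about how long
    # the provider's background maintenance needs to populate its cache; polling
    # until data appears would be more robust.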

    # Test 1: Feature matrix generation
    logger.info("\n=== Test 1: Feature Matrix Generation ===")
    try:
        feature_matrix = dp.get_feature_matrix('ETH/USDT', ['1m', '1h'], window_size=20)
        if feature_matrix is not None:
            logger.info(f"✅ Feature matrix shape: {feature_matrix.shape}")
        else:
            logger.warning("❌ Feature matrix generation failed")
    except Exception as e:
        logger.error(f"❌ Feature matrix error: {e}")

    # Test 2: Multi-symbol data access
    logger.info("\n=== Test 2: Multi-Symbol Data Access ===")
    for symbol in ['ETH/USDT', 'BTC/USDT']:
        for timeframe in ['1s', '1m', '1h', '1d']:
            data = dp.get_historical_data(symbol, timeframe, limit=10)
            if data is not None and not data.empty:
                logger.info(f"{symbol} {timeframe}: {len(data)} candles")
            else:
                logger.warning(f"{symbol} {timeframe}: No data")

    # Test 3: Data consistency checks
    logger.info("\n=== Test 3: Data Consistency ===")
    eth_1m = dp.get_historical_data('ETH/USDT', '1m', limit=100)
    if eth_1m is not None and not eth_1m.empty:
        # Check for proper OHLCV structure
        required_cols = ['open', 'high', 'low', 'close', 'volume']
        has_all_cols = all(col in eth_1m.columns for col in required_cols)
        logger.info(f"✅ OHLCV columns present: {has_all_cols}")

        # Check data types
        numeric_cols = eth_1m[required_cols].dtypes
        all_numeric = all(pd.api.types.is_numeric_dtype(dtype) for dtype in numeric_cols)
        logger.info(f"✅ All columns numeric: {all_numeric}")

        # Check for NaN values
        has_nan = eth_1m[required_cols].isna().any().any()
        logger.info(f"✅ No NaN values: {not has_nan}")

        # Check price relationships (high >= low, etc.)
        price_valid = (eth_1m['high'] >= eth_1m['low']).all()
        logger.info(f"✅ Price relationships valid: {price_valid}")

    # Test 4: Performance test
    logger.info("\n=== Test 4: Performance Test ===")
    start_time = time.time()
    for _ in range(100):
        data = dp.get_historical_data('ETH/USDT', '1m', limit=50)
    end_time = time.time()
    avg_time = (end_time - start_time) / 100 * 1000  # ms per call
    logger.info(f"✅ Average data access time: {avg_time:.2f}ms")

    # Test 5: Current price accuracy
    logger.info("\n=== Test 5: Current Price Accuracy ===")
    eth_price = dp.get_current_price('ETH/USDT')
    eth_data = dp.get_historical_data('ETH/USDT', '1s', limit=1)
    if eth_price is not None and eth_data is not None and not eth_data.empty:
        latest_close = eth_data.iloc[-1]['close']
        price_match = abs(eth_price - latest_close) < 0.01
        logger.info(f"✅ Current price matches latest candle: {price_match}")
        logger.info(f"   Current price: ${eth_price}")
        logger.info(f"   Latest close: ${latest_close}")
    else:
        logger.warning("❌ Could not fetch current price or latest 1s candle")

    # Test 6: Cache efficiency
    logger.info("\n=== Test 6: Cache Efficiency ===")
    cache_summary = dp.get_cached_data_summary()
    total_candles = 0
    for symbol_data in cache_summary['cached_data'].values():
        for tf_data in symbol_data.values():
            if isinstance(tf_data, dict) and 'candle_count' in tf_data:
                total_candles += tf_data['candle_count']
    logger.info(f"✅ Total cached candles: {total_candles}")
    logger.info(f"✅ Data maintenance active: {cache_summary['data_maintenance_active']}")

    # Test 7: Memory usage estimation
    logger.info("\n=== Test 7: Memory Usage Estimation ===")
    # Rough estimation: 8 columns * 8 bytes * total_candles
    estimated_memory_mb = (total_candles * 8 * 8) / (1024 * 1024)
    logger.info(f"✅ Estimated memory usage: {estimated_memory_mb:.2f} MB")

    # Clean shutdown
    dp.stop_automatic_data_maintenance()
    logger.info("\n✅ Integration test completed successfully!")
if __name__ == "__main__":
test_integration()