#!/usr/bin/env python3
"""
Test script for COB data quality and imbalance indicators
"""

import time
import logging

from core.data_provider import DataProvider

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Symbols inspected by the "recent aggregated data" and "real-time updates"
# checks. Kept in one place so the two checks cannot drift apart.
MONITORED_SYMBOLS = ('ETH/USDT', 'BTC/USDT')

# Seconds to wait after creating the provider before the first check runs.
INITIAL_WAIT_SECONDS = 20

# Seconds between the two quality snapshots compared in the real-time check.
MONITOR_WINDOW_SECONDS = 30

# Number of most recent aggregated records printed per symbol.
RECENT_RECORD_COUNT = 5


def _log_cached_data_summary(dp: DataProvider) -> None:
    """Log candle count and latest price for every cached symbol/timeframe."""
    logger.info("\n=== Test 1: Cached Data Summary ===")
    cache_summary = dp.get_cached_data_summary()

    for symbol, timeframes in cache_summary['cached_data'].items():
        logger.info(f"\n{symbol}:")
        for timeframe, info in timeframes.items():
            if 'candle_count' in info and info['candle_count'] > 0:
                logger.info(f" {timeframe}: {info['candle_count']} candles, latest: ${info['latest_price']}")
            else:
                # No candles for this timeframe: report whatever status the
                # provider attached, or a generic placeholder.
                logger.info(f" {timeframe}: {info.get('status', 'no data')}")


def _log_cob_quality(dp: DataProvider) -> None:
    """Log raw-tick, aggregated, imbalance and freshness info per COB symbol."""
    logger.info("\n=== Test 2: COB Data Quality ===")
    cob_quality = dp.get_cob_data_quality()

    for symbol in cob_quality['symbols']:
        logger.info(f"\n{symbol} COB Data:")

        # Raw ticks
        raw_info = cob_quality['raw_ticks'].get(symbol, {})
        logger.info(f" Raw ticks: {raw_info.get('count', 0)} ticks")
        if raw_info.get('age_seconds') is not None:
            logger.info(f" Raw data age: {raw_info['age_seconds']:.1f} seconds")

        # Aggregated 1s data
        agg_info = cob_quality['aggregated_1s'].get(symbol, {})
        logger.info(f" Aggregated 1s: {agg_info.get('count', 0)} records")
        if agg_info.get('age_seconds') is not None:
            logger.info(f" Aggregated data age: {agg_info['age_seconds']:.1f} seconds")

        # Imbalance indicators (skipped entirely when the provider has none)
        imbalance_info = cob_quality['imbalance_indicators'].get(symbol, {})
        if imbalance_info:
            logger.info(f" Imbalance 1s: {imbalance_info.get('imbalance_1s', 0):.4f}")
            logger.info(f" Imbalance 5s: {imbalance_info.get('imbalance_5s', 0):.4f}")
            logger.info(f" Imbalance 15s: {imbalance_info.get('imbalance_15s', 0):.4f}")
            logger.info(f" Imbalance 60s: {imbalance_info.get('imbalance_60s', 0):.4f}")
            logger.info(f" Total volume: {imbalance_info.get('total_volume', 0):.2f}")
            logger.info(f" Price buckets: {imbalance_info.get('bucket_count', 0)}")

        # Data freshness
        freshness = cob_quality['data_freshness'].get(symbol, 'unknown')
        logger.info(f" Data freshness: {freshness}")


def _log_recent_aggregates(dp: DataProvider) -> None:
    """Log the most recent 1s-aggregated COB records for each monitored symbol."""
    logger.info("\n=== Test 3: Recent COB Aggregated Data ===")

    for symbol in MONITORED_SYMBOLS:
        recent_cob = dp.get_cob_1s_aggregated(symbol, count=RECENT_RECORD_COUNT)
        logger.info(f"\n{symbol} - Last {RECENT_RECORD_COUNT} aggregated records:")

        # Slice defensively in case the provider returns more than requested.
        for position, record in enumerate(recent_cob[-RECENT_RECORD_COUNT:], start=1):
            timestamp = record.get('timestamp', 0)
            imbalance_1s = record.get('imbalance_1s', 0)
            imbalance_5s = record.get('imbalance_5s', 0)
            total_volume = record.get('total_volume', 0)
            bucket_count = len(record.get('bid_buckets', {})) + len(record.get('ask_buckets', {}))

            logger.info(f" [{position}] Time: {timestamp}, Imb1s: {imbalance_1s:.4f}, "
                        f"Imb5s: {imbalance_5s:.4f}, Vol: {total_volume:.2f}, Buckets: {bucket_count}")


def _monitor_realtime_updates(dp: DataProvider) -> None:
    """Compare two quality snapshots taken MONITOR_WINDOW_SECONDS apart."""
    logger.info(f"\n=== Test 4: Real-time Updates ({MONITOR_WINDOW_SECONDS} seconds) ===")
    logger.info("Monitoring COB data updates...")

    initial_quality = dp.get_cob_data_quality()
    time.sleep(MONITOR_WINDOW_SECONDS)
    updated_quality = dp.get_cob_data_quality()

    for symbol in MONITORED_SYMBOLS:
        initial_count = initial_quality['raw_ticks'].get(symbol, {}).get('count', 0)
        updated_count = updated_quality['raw_ticks'].get(symbol, {}).get('count', 0)
        new_ticks = updated_count - initial_count

        initial_agg = initial_quality['aggregated_1s'].get(symbol, {}).get('count', 0)
        updated_agg = updated_quality['aggregated_1s'].get(symbol, {}).get('count', 0)
        new_agg = updated_agg - initial_agg

        logger.info(f"{symbol}: +{new_ticks} raw ticks, +{new_agg} aggregated records")

        # Show latest imbalances
        latest_imbalances = updated_quality['imbalance_indicators'].get(symbol, {})
        if latest_imbalances:
            logger.info(f" Latest imbalances: 1s={latest_imbalances.get('imbalance_1s', 0):.4f}, "
                        f"5s={latest_imbalances.get('imbalance_5s', 0):.4f}, "
                        f"15s={latest_imbalances.get('imbalance_15s', 0):.4f}, "
                        f"60s={latest_imbalances.get('imbalance_60s', 0):.4f}")


def test_cob_data_quality():
    """Test COB data quality and imbalance indicators.

    Creates a ``DataProvider``, waits ``INITIAL_WAIT_SECONDS`` for it to load
    data, then runs four logging-only checks in order: cached data summary,
    COB data quality, recent aggregated records, and a real-time update
    comparison. Results are written to the module logger; nothing is returned.

    ``stop_automatic_data_maintenance()`` is called in a ``finally`` clause so
    the provider is shut down even when one of the checks raises. The
    exception itself still propagates to the caller.
    """
    logger.info("Testing COB data quality and imbalance indicators...")

    # Initialize data provider
    dp = DataProvider()

    try:
        # Wait for initial data load and COB connection
        logger.info("Waiting for initial data load and COB connection...")
        time.sleep(INITIAL_WAIT_SECONDS)

        _log_cached_data_summary(dp)
        _log_cob_quality(dp)
        _log_recent_aggregates(dp)
        _monitor_realtime_updates(dp)
    finally:
        # Clean shutdown, reached on both the success and the failure path.
        logger.info("\n=== Shutting Down ===")
        dp.stop_automatic_data_maintenance()

    logger.info("COB data quality test completed")


if __name__ == "__main__":
    test_cob_data_quality()