#!/usr/bin/env python3
"""
Test script for imbalance calculation logic
"""

import time
import logging
from datetime import datetime

from core.data_provider import DataProvider

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Symbol used for every mock tick and every DataProvider lookup in this script.
_SYMBOL = 'ETH/USDT'

# Suffixes of the multi-timeframe imbalance keys read as ``imbalance_<suffix>``.
_IMBALANCE_WINDOWS = ('1s', '5s', '15s', '60s')

# Number of mock ticks generated; they are spaced one second apart.
_MOCK_TICK_COUNT = 10


def _make_mock_tick(i, timestamp):
    """Build one mock COB tick whose prices, volumes and stats scale with ``i``."""
    return {
        'symbol': _SYMBOL,
        'timestamp': timestamp,
        'bids': [
            [3800 + i, 100 + i * 10],  # Price, Volume
            [3799 + i, 50 + i * 5],
            [3798 + i, 25 + i * 2],
        ],
        'asks': [
            [3801 + i, 80 + i * 8],  # Price, Volume
            [3802 + i, 40 + i * 4],
            [3803 + i, 20 + i * 2],
        ],
        'stats': {
            'mid_price': 3800.5 + i,
            'spread_bps': 2.5 + i * 0.1,
            # With the default 10 ticks this varies from -0.5 to +0.4.
            'imbalance': (i - 5) * 0.1,
        },
        'source': 'mock',
    }


def _build_mock_ticks(now, count=_MOCK_TICK_COUNT):
    """Return ``count`` mock ticks, oldest first.

    Timestamps run from ``now - count`` up to ``now - 1`` in one-second steps.
    """
    return [_make_mock_tick(i, now - (count - i)) for i in range(count)]


def _log_imbalance_windows(record, label_prefix=''):
    """Log the 1s/5s/15s/60s imbalance values of ``record``.

    Missing keys are logged as 0, matching ``record.get(key, 0)``.
    """
    for window in _IMBALANCE_WINDOWS:
        value = record.get(f'imbalance_{window}', 0)
        logger.info(f" {label_prefix}{window}: {value:.4f}")


def test_imbalance_calculation():
    """Test the imbalance calculation logic with mock data.

    Feeds mock COB ticks into a ``DataProvider``, triggers its 1-second
    aggregation manually and logs the resulting imbalance indicators.

    This is a logging-driven check with no assertions. The closing message
    reports success only when aggregated records were actually produced;
    otherwise an error is logged instead.
    """
    logger.info("Testing imbalance calculation logic...")

    # Initialize data provider
    dp = DataProvider()

    # Create mock COB tick data, one tick per second before current_time
    current_time = time.time()
    mock_ticks = _build_mock_ticks(current_time)

    # Add mock ticks to the data provider
    for tick in mock_ticks:
        dp.cob_raw_ticks[_SYMBOL].append(tick)

    logger.info(f"Added {len(mock_ticks)} mock COB ticks")

    # Test the aggregation function
    logger.info("\n=== Testing COB Aggregation ===")

    target_second = int(current_time - 5)  # 5 seconds ago

    # Manually call the aggregation function
    dp._aggregate_cob_1s(_SYMBOL, target_second)

    # Check the results
    aggregated_data = list(dp.cob_1s_aggregated[_SYMBOL])
    if aggregated_data:
        latest = aggregated_data[-1]
        logger.info("Aggregated data created:")
        logger.info(f" Timestamp: {latest.get('timestamp')}")
        logger.info(f" Tick count: {latest.get('tick_count')}")
        logger.info(f" Current imbalance: {latest.get('imbalance', 0):.4f}")
        logger.info(f" Total volume: {latest.get('total_volume', 0):.2f}")
        logger.info(f" Bid buckets: {len(latest.get('bid_buckets', {}))}")
        logger.info(f" Ask buckets: {len(latest.get('ask_buckets', {}))}")

        # Check multi-timeframe imbalances
        _log_imbalance_windows(latest, 'Imbalance ')
    else:
        logger.warning("No aggregated data created")

    # Test multiple aggregations to build history
    logger.info("\n=== Testing Multi-timeframe Imbalances ===")

    # Aggregate the five seconds that follow the first target second.
    for offset in range(1, 6):
        dp._aggregate_cob_1s(_SYMBOL, int(current_time - 5 + offset))

    # Check the final results
    final_data = list(dp.cob_1s_aggregated[_SYMBOL])
    logger.info(f"Created {len(final_data)} aggregated records")

    if final_data:
        logger.info("Final imbalance indicators:")
        _log_imbalance_windows(final_data[-1])

    # Test the COB data quality function
    logger.info("\n=== Testing COB Data Quality Function ===")
    quality = dp.get_cob_data_quality()

    eth_quality = quality.get('imbalance_indicators', {}).get(_SYMBOL, {})
    if eth_quality:
        logger.info("COB quality indicators:")
        _log_imbalance_windows(eth_quality, 'Imbalance ')
        logger.info(f" Total volume: {eth_quality.get('total_volume', 0):.2f}")
        logger.info(f" Bucket count: {eth_quality.get('bucket_count', 0)}")

    # Only claim success when the aggregation actually yielded records;
    # an empty result previously still ended with the success message.
    if final_data:
        logger.info("\n✅ Imbalance calculation test completed successfully!")
    else:
        logger.error("\n❌ Imbalance calculation test failed: no aggregated data was created")


if __name__ == "__main__":
    test_imbalance_calculation()