Files
gogo2/test_imbalance_calculation.py
Dobromir Popov e2c495d83c cleanup
2025-07-27 18:31:30 +03:00

118 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
Test script for imbalance calculation logic
"""
import time
import logging
from datetime import datetime
from core.data_provider import DataProvider
# Configure logging at INFO level so the test's progress messages are emitted,
# and create the module-level logger that the test function writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _build_mock_ticks(symbol, current_time, count=10):
    """Build ``count`` synthetic COB ticks for ``symbol``, one per second.

    Tick ``i`` is stamped ``current_time - (count - i)``, so the series runs
    from ``count`` seconds before ``current_time`` up to one second before it.
    Prices and volumes grow linearly with ``i``; the embedded ``stats``
    imbalance is ``(i - 5) * 0.1`` (-0.5 .. +0.4 for the default ten ticks).

    Args:
        symbol: Value stored under each tick's ``'symbol'`` key.
        current_time: Reference epoch time in seconds.
        count: Number of ticks to generate.

    Returns:
        A list of tick dicts with ``symbol``, ``timestamp``, ``bids``,
        ``asks``, ``stats`` and ``source`` keys.
    """
    return [
        {
            'symbol': symbol,
            'timestamp': current_time - (count - i),
            'bids': [
                [3800 + i, 100 + i * 10],  # [price, volume]
                [3799 + i, 50 + i * 5],
                [3798 + i, 25 + i * 2],
            ],
            'asks': [
                [3801 + i, 80 + i * 8],  # [price, volume]
                [3802 + i, 40 + i * 4],
                [3803 + i, 20 + i * 2],
            ],
            'stats': {
                'mid_price': 3800.5 + i,
                'spread_bps': 2.5 + i * 0.1,
                'imbalance': (i - 5) * 0.1,
            },
            'source': 'mock',
        }
        for i in range(count)
    ]


def test_imbalance_calculation():
    """Smoke-test the COB imbalance aggregation of ``DataProvider`` with mock data.

    Appends ten synthetic ETH/USDT ticks to ``dp.cob_raw_ticks``, calls
    ``dp._aggregate_cob_1s`` for several target seconds, and logs the fields
    found in the latest entry of ``dp.cob_1s_aggregated`` as well as the
    indicators returned by ``dp.get_cob_data_quality()``.

    The function only logs; it does not assert or raise on missing data. The
    closing message reports success only when at least one aggregated record
    exists, so an empty result is no longer reported as a successful run.
    """
    logger.info("Testing imbalance calculation logic...")

    symbol = 'ETH/USDT'
    windows = ('1s', '5s', '15s', '60s')  # suffixes of the imbalance_* keys

    # Initialize data provider
    dp = DataProvider()

    # Feed the mock ticks into the provider's raw tick buffer
    current_time = time.time()
    mock_ticks = _build_mock_ticks(symbol, current_time)
    for tick in mock_ticks:
        dp.cob_raw_ticks[symbol].append(tick)
    logger.info(f"Added {len(mock_ticks)} mock COB ticks")

    # Aggregate a single second in the middle of the mock series
    logger.info("\n=== Testing COB Aggregation ===")
    target_second = int(current_time - 5)  # 5 seconds ago
    dp._aggregate_cob_1s(symbol, target_second)

    aggregated_data = list(dp.cob_1s_aggregated[symbol])
    if aggregated_data:
        latest = aggregated_data[-1]
        logger.info("Aggregated data created:")
        logger.info(f" Timestamp: {latest.get('timestamp')}")
        logger.info(f" Tick count: {latest.get('tick_count')}")
        logger.info(f" Current imbalance: {latest.get('imbalance', 0):.4f}")
        logger.info(f" Total volume: {latest.get('total_volume', 0):.2f}")
        logger.info(f" Bid buckets: {len(latest.get('bid_buckets', {}))}")
        logger.info(f" Ask buckets: {len(latest.get('ask_buckets', {}))}")
        # Multi-timeframe imbalances
        for window in windows:
            value = latest.get(f'imbalance_{window}', 0)
            logger.info(f" Imbalance {window}: {value:.4f}")
    else:
        logger.warning("No aggregated data created")

    # Aggregate the following seconds to build up history
    logger.info("\n=== Testing Multi-timeframe Imbalances ===")
    for offset in range(1, 6):
        dp._aggregate_cob_1s(symbol, int(current_time - 5 + offset))

    final_data = list(dp.cob_1s_aggregated[symbol])
    logger.info(f"Created {len(final_data)} aggregated records")
    if final_data:
        latest = final_data[-1]
        logger.info("Final imbalance indicators:")
        for window in windows:
            value = latest.get(f'imbalance_{window}', 0)
            logger.info(f" {window}: {value:.4f}")

    # Query the provider's COB data quality report
    logger.info("\n=== Testing COB Data Quality Function ===")
    quality = dp.get_cob_data_quality()
    eth_quality = quality.get('imbalance_indicators', {}).get(symbol, {})
    if eth_quality:
        logger.info("COB quality indicators:")
        for window in windows:
            value = eth_quality.get(f'imbalance_{window}', 0)
            logger.info(f" Imbalance {window}: {value:.4f}")
        logger.info(f" Total volume: {eth_quality.get('total_volume', 0):.2f}")
        logger.info(f" Bucket count: {eth_quality.get('bucket_count', 0)}")

    # Report success only if aggregation actually produced something.
    if final_data:
        logger.info("\n✅ Imbalance calculation test completed successfully!")
    else:
        logger.error("\n❌ Imbalance calculation test produced no aggregated records")
# Run the test directly when this file is executed as a script.
if __name__ == "__main__":
    test_imbalance_calculation()