118 lines
4.5 KiB
Python
118 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
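Test script for the imbalance calculation logic.

Feeds mock COB ticks into DataProvider, runs the 1-second aggregation,
and logs the resulting multi-timeframe imbalance values.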
"""
|
|
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from core.data_provider import DataProvider
|
|
|
|
# Set up logging: INFO level so the test's progress messages are emitted,
# and a module-level logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
def _build_mock_cob_ticks(symbol, now, count=10):
    """Return ``count`` synthetic COB ticks for ``symbol``.

    Tick ``i`` is stamped ``count - i`` seconds before ``now`` (so with the
    default count the ticks span 10 seconds ago to 1 second ago), and its
    prices, volumes and stats all shift with ``i`` so every tick differs.
    """
    return [
        {
            'symbol': symbol,
            'timestamp': now - (count - i),
            'bids': [
                [3800 + i, 100 + i * 10],  # Price, Volume
                [3799 + i, 50 + i * 5],
                [3798 + i, 25 + i * 2],
            ],
            'asks': [
                [3801 + i, 80 + i * 8],  # Price, Volume
                [3802 + i, 40 + i * 4],
                [3803 + i, 20 + i * 2],
            ],
            'stats': {
                'mid_price': 3800.5 + i,
                'spread_bps': 2.5 + i * 0.1,
                # With count=10 this ranges from -0.5 to +0.4.
                'imbalance': (i - 5) * 0.1,
            },
            'source': 'mock',
        }
        for i in range(count)
    ]


def _log_imbalances(record, label=""):
    """Log the 1s/5s/15s/60s imbalance values stored in ``record``.

    ``label`` is inserted before the window name; a missing key is
    logged as 0.
    """
    for window in ('1s', '5s', '15s', '60s'):
        value = record.get(f'imbalance_{window}', 0)
        logger.info(f"  {label}{window}: {value:.4f}")


def test_imbalance_calculation():
    """Test the imbalance calculation logic with mock data.

    Pushes ten mock COB ticks into a fresh ``DataProvider``, calls
    ``_aggregate_cob_1s`` for the second 5 seconds ago and then for the five
    following seconds, and logs the aggregated record, its multi-timeframe
    imbalance values and the output of ``get_cob_data_quality``.

    Nothing is asserted; results are only logged. The closing message
    reports success only if at least one aggregated record exists, otherwise
    an error is logged instead.
    """
    symbol = 'ETH/USDT'
    logger.info("Testing imbalance calculation logic...")

    # Initialize data provider
    dp = DataProvider()

    # Create mock COB tick data and add it to the data provider
    current_time = time.time()
    mock_ticks = _build_mock_cob_ticks(symbol, current_time)
    raw_ticks = dp.cob_raw_ticks[symbol]
    for tick in mock_ticks:
        raw_ticks.append(tick)

    logger.info(f"Added {len(mock_ticks)} mock COB ticks")

    # Test the aggregation function
    logger.info("\n=== Testing COB Aggregation ===")
    first_second = int(current_time - 5)  # 5 seconds ago

    # Manually call the aggregation function
    dp._aggregate_cob_1s(symbol, first_second)

    # Check the results
    aggregated_data = list(dp.cob_1s_aggregated[symbol])
    if aggregated_data:
        latest = aggregated_data[-1]
        logger.info("Aggregated data created:")
        logger.info(f"  Timestamp: {latest.get('timestamp')}")
        logger.info(f"  Tick count: {latest.get('tick_count')}")
        logger.info(f"  Current imbalance: {latest.get('imbalance', 0):.4f}")
        logger.info(f"  Total volume: {latest.get('total_volume', 0):.2f}")
        logger.info(f"  Bid buckets: {len(latest.get('bid_buckets', {}))}")
        logger.info(f"  Ask buckets: {len(latest.get('ask_buckets', {}))}")

        # Check multi-timeframe imbalances
        _log_imbalances(latest, label="Imbalance ")
    else:
        logger.warning("No aggregated data created")

    # Test multiple aggregations to build history
    logger.info("\n=== Testing Multi-timeframe Imbalances ===")
    for offset in range(1, 6):
        dp._aggregate_cob_1s(symbol, int(current_time - 5 + offset))

    # Check the final results
    final_data = list(dp.cob_1s_aggregated[symbol])
    logger.info(f"Created {len(final_data)} aggregated records")

    if final_data:
        logger.info("Final imbalance indicators:")
        _log_imbalances(final_data[-1])

    # Test the COB data quality function
    logger.info("\n=== Testing COB Data Quality Function ===")
    quality = dp.get_cob_data_quality()

    eth_quality = quality.get('imbalance_indicators', {}).get(symbol, {})
    if eth_quality:
        logger.info("COB quality indicators:")
        _log_imbalances(eth_quality, label="Imbalance ")
        logger.info(f"  Total volume: {eth_quality.get('total_volume', 0):.2f}")
        logger.info(f"  Bucket count: {eth_quality.get('bucket_count', 0)}")

    # Only claim success when the aggregation actually produced records;
    # previously the success line was logged even after the warning above.
    if final_data:
        logger.info("\n✅ Imbalance calculation test completed successfully!")
    else:
        logger.error("\n❌ Imbalance calculation test produced no aggregated data")
|
|
|
|
# Run the test directly when this file is executed as a script.
if __name__ == "__main__":
    test_imbalance_calculation()