Files
gogo2/tests/test_enhanced_order_flow_integration.py
2025-06-25 11:42:12 +03:00

318 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Test Enhanced Order Flow Integration
Tests the enhanced order flow analysis capabilities including:
- Aggressive vs passive participant ratios
- Institutional vs retail trade detection
- Market maker vs taker flow analysis
- Order flow intensity measurements
- Liquidity consumption and price impact analysis
- Block trade and iceberg order detection
- High-frequency trading activity detection
Usage:
python test_enhanced_order_flow_integration.py
"""
import asyncio
import logging
import time
import json
from datetime import datetime, timedelta
from core.bookmap_integration import BookmapIntegration
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('enhanced_order_flow_test.log')
]
)
logger = logging.getLogger(__name__)
class EnhancedOrderFlowTester:
"""Test enhanced order flow analysis features"""
def __init__(self):
self.bookmap = None
self.symbols = ['ETHUSDT', 'BTCUSDT']
self.test_duration = 300 # 5 minutes
self.metrics_history = []
async def setup_integration(self):
"""Initialize the Bookmap integration"""
try:
logger.info("Setting up Enhanced Order Flow Integration...")
self.bookmap = BookmapIntegration(symbols=self.symbols)
# Add callbacks for testing
self.bookmap.add_cnn_callback(self._cnn_callback)
self.bookmap.add_dqn_callback(self._dqn_callback)
logger.info(f"Integration setup complete for symbols: {self.symbols}")
return True
except Exception as e:
logger.error(f"Failed to setup integration: {e}")
return False
def _cnn_callback(self, symbol: str, features: dict):
"""CNN callback for testing"""
logger.debug(f"CNN features received for {symbol}: {len(features.get('features', []))} dimensions")
def _dqn_callback(self, symbol: str, state: dict):
"""DQN callback for testing"""
logger.debug(f"DQN state received for {symbol}: {len(state.get('state', []))} dimensions")
async def start_streaming(self):
"""Start real-time data streaming"""
try:
logger.info("Starting enhanced order flow streaming...")
await self.bookmap.start_streaming()
logger.info("Streaming started successfully")
return True
except Exception as e:
logger.error(f"Failed to start streaming: {e}")
return False
async def monitor_order_flow(self):
"""Monitor and analyze order flow for test duration"""
logger.info(f"Monitoring enhanced order flow for {self.test_duration} seconds...")
start_time = time.time()
iteration = 0
while time.time() - start_time < self.test_duration:
try:
iteration += 1
# Test each symbol
for symbol in self.symbols:
await self._analyze_symbol_flow(symbol, iteration)
# Wait 10 seconds between analyses
await asyncio.sleep(10)
except Exception as e:
logger.error(f"Error during monitoring iteration {iteration}: {e}")
await asyncio.sleep(5)
logger.info("Order flow monitoring completed")
async def _analyze_symbol_flow(self, symbol: str, iteration: int):
"""Analyze order flow for a specific symbol"""
try:
# Get enhanced order flow metrics
flow_metrics = self.bookmap.get_enhanced_order_flow_metrics(symbol)
if not flow_metrics:
logger.warning(f"No flow metrics available for {symbol}")
return
# Log key metrics
aggressive_passive = flow_metrics['aggressive_passive']
institutional_retail = flow_metrics['institutional_retail']
flow_intensity = flow_metrics['flow_intensity']
price_impact = flow_metrics['price_impact']
maker_taker = flow_metrics['maker_taker_flow']
logger.info(f"\n=== {symbol} Order Flow Analysis (Iteration {iteration}) ===")
logger.info(f"Aggressive Ratio: {aggressive_passive['aggressive_ratio']:.2%}")
logger.info(f"Passive Ratio: {aggressive_passive['passive_ratio']:.2%}")
logger.info(f"Institutional Ratio: {institutional_retail['institutional_ratio']:.2%}")
logger.info(f"Retail Ratio: {institutional_retail['retail_ratio']:.2%}")
logger.info(f"Flow Intensity: {flow_intensity['current_intensity']:.2f} ({flow_intensity['intensity_category']})")
logger.info(f"Price Impact: {price_impact['avg_impact']:.2f} bps ({price_impact['impact_category']})")
logger.info(f"Buy Pressure: {maker_taker['buy_pressure']:.2%}")
logger.info(f"Sell Pressure: {maker_taker['sell_pressure']:.2%}")
# Trade size analysis
size_dist = flow_metrics['size_distribution']
total_trades = sum(size_dist.values())
if total_trades > 0:
logger.info(f"Trade Size Distribution (last 100 trades):")
logger.info(f" Micro (<$1K): {size_dist.get('micro', 0)} ({size_dist.get('micro', 0)/total_trades:.1%})")
logger.info(f" Small ($1K-$10K): {size_dist.get('small', 0)} ({size_dist.get('small', 0)/total_trades:.1%})")
logger.info(f" Medium ($10K-$50K): {size_dist.get('medium', 0)} ({size_dist.get('medium', 0)/total_trades:.1%})")
logger.info(f" Large ($50K-$100K): {size_dist.get('large', 0)} ({size_dist.get('large', 0)/total_trades:.1%})")
logger.info(f" Block (>$100K): {size_dist.get('block', 0)} ({size_dist.get('block', 0)/total_trades:.1%})")
# Volume analysis
if 'volume_stats' in flow_metrics and flow_metrics['volume_stats']:
volume_stats = flow_metrics['volume_stats']
logger.info(f"24h Volume: {volume_stats.get('volume_24h', 0):,.0f}")
logger.info(f"24h Quote Volume: ${volume_stats.get('quote_volume_24h', 0):,.0f}")
# Store metrics for analysis
self.metrics_history.append({
'timestamp': datetime.now(),
'symbol': symbol,
'iteration': iteration,
'metrics': flow_metrics
})
# Test CNN and DQN features
await self._test_model_features(symbol)
except Exception as e:
logger.error(f"Error analyzing flow for {symbol}: {e}")
async def _test_model_features(self, symbol: str):
"""Test CNN and DQN feature extraction"""
try:
# Test CNN features
cnn_features = self.bookmap.get_cnn_features(symbol)
if cnn_features is not None:
logger.info(f"CNN Features: {len(cnn_features)} dimensions")
logger.info(f" Order book features: {cnn_features[:80].mean():.4f} (avg)")
logger.info(f" Liquidity metrics: {cnn_features[80:90].mean():.4f} (avg)")
logger.info(f" Imbalance features: {cnn_features[90:95].mean():.4f} (avg)")
logger.info(f" Enhanced flow features: {cnn_features[95:].mean():.4f} (avg)")
# Test DQN features
dqn_features = self.bookmap.get_dqn_state_features(symbol)
if dqn_features is not None:
logger.info(f"DQN State: {len(dqn_features)} dimensions")
logger.info(f" Order book state: {dqn_features[:20].mean():.4f} (avg)")
logger.info(f" Market indicators: {dqn_features[20:30].mean():.4f} (avg)")
logger.info(f" Enhanced flow state: {dqn_features[30:].mean():.4f} (avg)")
# Test dashboard data
dashboard_data = self.bookmap.get_dashboard_data(symbol)
if dashboard_data and 'enhanced_order_flow' in dashboard_data:
logger.info("Dashboard data includes enhanced order flow metrics")
except Exception as e:
logger.error(f"Error testing model features for {symbol}: {e}")
async def stop_streaming(self):
"""Stop data streaming"""
try:
logger.info("Stopping order flow streaming...")
await self.bookmap.stop_streaming()
logger.info("Streaming stopped")
except Exception as e:
logger.error(f"Error stopping streaming: {e}")
def generate_summary_report(self):
"""Generate a summary report of the test"""
try:
logger.info("\n" + "="*60)
logger.info("ENHANCED ORDER FLOW ANALYSIS SUMMARY")
logger.info("="*60)
if not self.metrics_history:
logger.warning("No metrics data collected during test")
return
# Group by symbol
symbol_data = {}
for entry in self.metrics_history:
symbol = entry['symbol']
if symbol not in symbol_data:
symbol_data[symbol] = []
symbol_data[symbol].append(entry)
# Analyze each symbol
for symbol, data in symbol_data.items():
logger.info(f"\n--- {symbol} Analysis ---")
logger.info(f"Data points collected: {len(data)}")
if len(data) > 0:
# Calculate averages
avg_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in data) / len(data)
avg_institutional = sum(d['metrics']['institutional_retail']['institutional_ratio'] for d in data) / len(data)
avg_intensity = sum(d['metrics']['flow_intensity']['current_intensity'] for d in data) / len(data)
avg_impact = sum(d['metrics']['price_impact']['avg_impact'] for d in data) / len(data)
logger.info(f"Average Aggressive Ratio: {avg_aggressive:.2%}")
logger.info(f"Average Institutional Ratio: {avg_institutional:.2%}")
logger.info(f"Average Flow Intensity: {avg_intensity:.2f}")
logger.info(f"Average Price Impact: {avg_impact:.2f} bps")
# Detect trends
first_half = data[:len(data)//2] if len(data) > 1 else data
second_half = data[len(data)//2:] if len(data) > 1 else data
if len(first_half) > 0 and len(second_half) > 0:
first_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in first_half) / len(first_half)
second_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in second_half) / len(second_half)
trend = "increasing" if second_aggressive > first_aggressive else "decreasing"
logger.info(f"Aggressive trading trend: {trend}")
logger.info("\n" + "="*60)
logger.info("Test completed successfully!")
logger.info("Enhanced order flow analysis is working correctly.")
logger.info("="*60)
except Exception as e:
logger.error(f"Error generating summary report: {e}")
async def run_enhanced_order_flow_test():
"""Run the complete enhanced order flow test"""
tester = EnhancedOrderFlowTester()
try:
# Setup
logger.info("Starting Enhanced Order Flow Integration Test")
logger.info("This test will demonstrate:")
logger.info("- Aggressive vs Passive participant analysis")
logger.info("- Institutional vs Retail trade detection")
logger.info("- Order flow intensity measurements")
logger.info("- Price impact and liquidity consumption analysis")
logger.info("- Block trade and iceberg order detection")
logger.info("- Enhanced CNN and DQN feature extraction")
if not await tester.setup_integration():
logger.error("Failed to setup integration")
return False
# Start streaming
if not await tester.start_streaming():
logger.error("Failed to start streaming")
return False
# Wait for initial data
logger.info("Waiting 30 seconds for initial data...")
await asyncio.sleep(30)
# Monitor order flow
await tester.monitor_order_flow()
# Generate report
tester.generate_summary_report()
return True
except Exception as e:
logger.error(f"Test failed: {e}")
return False
finally:
# Cleanup
try:
await tester.stop_streaming()
except Exception as e:
logger.error(f"Error during cleanup: {e}")
if __name__ == "__main__":
try:
# Run the test
success = asyncio.run(run_enhanced_order_flow_test())
if success:
print("\n✅ Enhanced Order Flow Integration Test PASSED")
print("All enhanced order flow analysis features are working correctly!")
else:
print("\n❌ Enhanced Order Flow Integration Test FAILED")
print("Check the logs for details.")
except KeyboardInterrupt:
print("\n⚠️ Test interrupted by user")
except Exception as e:
print(f"\n💥 Test crashed: {e}")