#!/usr/bin/env python3 """ Test Enhanced Order Flow Integration Tests the enhanced order flow analysis capabilities including: - Aggressive vs passive participant ratios - Institutional vs retail trade detection - Market maker vs taker flow analysis - Order flow intensity measurements - Liquidity consumption and price impact analysis - Block trade and iceberg order detection - High-frequency trading activity detection Usage: python test_enhanced_order_flow_integration.py """ import asyncio import logging import time import json from datetime import datetime, timedelta from core.bookmap_integration import BookmapIntegration # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('enhanced_order_flow_test.log') ] ) logger = logging.getLogger(__name__) class EnhancedOrderFlowTester: """Test enhanced order flow analysis features""" def __init__(self): self.bookmap = None self.symbols = ['ETHUSDT', 'BTCUSDT'] self.test_duration = 300 # 5 minutes self.metrics_history = [] async def setup_integration(self): """Initialize the Bookmap integration""" try: logger.info("Setting up Enhanced Order Flow Integration...") self.bookmap = BookmapIntegration(symbols=self.symbols) # Add callbacks for testing self.bookmap.add_cnn_callback(self._cnn_callback) self.bookmap.add_dqn_callback(self._dqn_callback) logger.info(f"Integration setup complete for symbols: {self.symbols}") return True except Exception as e: logger.error(f"Failed to setup integration: {e}") return False def _cnn_callback(self, symbol: str, features: dict): """CNN callback for testing""" logger.debug(f"CNN features received for {symbol}: {len(features.get('features', []))} dimensions") def _dqn_callback(self, symbol: str, state: dict): """DQN callback for testing""" logger.debug(f"DQN state received for {symbol}: {len(state.get('state', []))} dimensions") async def start_streaming(self): """Start real-time data streaming""" try: logger.info("Starting enhanced order flow streaming...") await self.bookmap.start_streaming() logger.info("Streaming started successfully") return True except Exception as e: logger.error(f"Failed to start streaming: {e}") return False async def monitor_order_flow(self): """Monitor and analyze order flow for test duration""" logger.info(f"Monitoring enhanced order flow for {self.test_duration} seconds...") start_time = time.time() iteration = 0 while time.time() - start_time < self.test_duration: try: iteration += 1 # Test each symbol for symbol in self.symbols: await self._analyze_symbol_flow(symbol, iteration) # Wait 10 seconds between analyses await asyncio.sleep(10) except Exception as e: logger.error(f"Error during monitoring iteration {iteration}: {e}") await asyncio.sleep(5) logger.info("Order flow monitoring completed") async def _analyze_symbol_flow(self, symbol: str, iteration: int): """Analyze order flow for a specific symbol""" try: # Get enhanced order flow metrics flow_metrics = self.bookmap.get_enhanced_order_flow_metrics(symbol) if not flow_metrics: logger.warning(f"No flow metrics available for {symbol}") return # Log key metrics aggressive_passive = flow_metrics['aggressive_passive'] institutional_retail = flow_metrics['institutional_retail'] flow_intensity = flow_metrics['flow_intensity'] price_impact = flow_metrics['price_impact'] maker_taker = flow_metrics['maker_taker_flow'] logger.info(f"\n=== {symbol} Order Flow Analysis (Iteration {iteration}) ===") logger.info(f"Aggressive Ratio: {aggressive_passive['aggressive_ratio']:.2%}") logger.info(f"Passive Ratio: {aggressive_passive['passive_ratio']:.2%}") logger.info(f"Institutional Ratio: {institutional_retail['institutional_ratio']:.2%}") logger.info(f"Retail Ratio: {institutional_retail['retail_ratio']:.2%}") logger.info(f"Flow Intensity: {flow_intensity['current_intensity']:.2f} ({flow_intensity['intensity_category']})") logger.info(f"Price Impact: {price_impact['avg_impact']:.2f} bps ({price_impact['impact_category']})") logger.info(f"Buy Pressure: {maker_taker['buy_pressure']:.2%}") logger.info(f"Sell Pressure: {maker_taker['sell_pressure']:.2%}") # Trade size analysis size_dist = flow_metrics['size_distribution'] total_trades = sum(size_dist.values()) if total_trades > 0: logger.info(f"Trade Size Distribution (last 100 trades):") logger.info(f" Micro (<$1K): {size_dist.get('micro', 0)} ({size_dist.get('micro', 0)/total_trades:.1%})") logger.info(f" Small ($1K-$10K): {size_dist.get('small', 0)} ({size_dist.get('small', 0)/total_trades:.1%})") logger.info(f" Medium ($10K-$50K): {size_dist.get('medium', 0)} ({size_dist.get('medium', 0)/total_trades:.1%})") logger.info(f" Large ($50K-$100K): {size_dist.get('large', 0)} ({size_dist.get('large', 0)/total_trades:.1%})") logger.info(f" Block (>$100K): {size_dist.get('block', 0)} ({size_dist.get('block', 0)/total_trades:.1%})") # Volume analysis if 'volume_stats' in flow_metrics and flow_metrics['volume_stats']: volume_stats = flow_metrics['volume_stats'] logger.info(f"24h Volume: {volume_stats.get('volume_24h', 0):,.0f}") logger.info(f"24h Quote Volume: ${volume_stats.get('quote_volume_24h', 0):,.0f}") # Store metrics for analysis self.metrics_history.append({ 'timestamp': datetime.now(), 'symbol': symbol, 'iteration': iteration, 'metrics': flow_metrics }) # Test CNN and DQN features await self._test_model_features(symbol) except Exception as e: logger.error(f"Error analyzing flow for {symbol}: {e}") async def _test_model_features(self, symbol: str): """Test CNN and DQN feature extraction""" try: # Test CNN features cnn_features = self.bookmap.get_cnn_features(symbol) if cnn_features is not None: logger.info(f"CNN Features: {len(cnn_features)} dimensions") logger.info(f" Order book features: {cnn_features[:80].mean():.4f} (avg)") logger.info(f" Liquidity metrics: {cnn_features[80:90].mean():.4f} (avg)") logger.info(f" Imbalance features: {cnn_features[90:95].mean():.4f} (avg)") logger.info(f" Enhanced flow features: {cnn_features[95:].mean():.4f} (avg)") # Test DQN features dqn_features = self.bookmap.get_dqn_state_features(symbol) if dqn_features is not None: logger.info(f"DQN State: {len(dqn_features)} dimensions") logger.info(f" Order book state: {dqn_features[:20].mean():.4f} (avg)") logger.info(f" Market indicators: {dqn_features[20:30].mean():.4f} (avg)") logger.info(f" Enhanced flow state: {dqn_features[30:].mean():.4f} (avg)") # Test dashboard data dashboard_data = self.bookmap.get_dashboard_data(symbol) if dashboard_data and 'enhanced_order_flow' in dashboard_data: logger.info("Dashboard data includes enhanced order flow metrics") except Exception as e: logger.error(f"Error testing model features for {symbol}: {e}") async def stop_streaming(self): """Stop data streaming""" try: logger.info("Stopping order flow streaming...") await self.bookmap.stop_streaming() logger.info("Streaming stopped") except Exception as e: logger.error(f"Error stopping streaming: {e}") def generate_summary_report(self): """Generate a summary report of the test""" try: logger.info("\n" + "="*60) logger.info("ENHANCED ORDER FLOW ANALYSIS SUMMARY") logger.info("="*60) if not self.metrics_history: logger.warning("No metrics data collected during test") return # Group by symbol symbol_data = {} for entry in self.metrics_history: symbol = entry['symbol'] if symbol not in symbol_data: symbol_data[symbol] = [] symbol_data[symbol].append(entry) # Analyze each symbol for symbol, data in symbol_data.items(): logger.info(f"\n--- {symbol} Analysis ---") logger.info(f"Data points collected: {len(data)}") if len(data) > 0: # Calculate averages avg_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in data) / len(data) avg_institutional = sum(d['metrics']['institutional_retail']['institutional_ratio'] for d in data) / len(data) avg_intensity = sum(d['metrics']['flow_intensity']['current_intensity'] for d in data) / len(data) avg_impact = sum(d['metrics']['price_impact']['avg_impact'] for d in data) / len(data) logger.info(f"Average Aggressive Ratio: {avg_aggressive:.2%}") logger.info(f"Average Institutional Ratio: {avg_institutional:.2%}") logger.info(f"Average Flow Intensity: {avg_intensity:.2f}") logger.info(f"Average Price Impact: {avg_impact:.2f} bps") # Detect trends first_half = data[:len(data)//2] if len(data) > 1 else data second_half = data[len(data)//2:] if len(data) > 1 else data if len(first_half) > 0 and len(second_half) > 0: first_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in first_half) / len(first_half) second_aggressive = sum(d['metrics']['aggressive_passive']['aggressive_ratio'] for d in second_half) / len(second_half) trend = "increasing" if second_aggressive > first_aggressive else "decreasing" logger.info(f"Aggressive trading trend: {trend}") logger.info("\n" + "="*60) logger.info("Test completed successfully!") logger.info("Enhanced order flow analysis is working correctly.") logger.info("="*60) except Exception as e: logger.error(f"Error generating summary report: {e}") async def run_enhanced_order_flow_test(): """Run the complete enhanced order flow test""" tester = EnhancedOrderFlowTester() try: # Setup logger.info("Starting Enhanced Order Flow Integration Test") logger.info("This test will demonstrate:") logger.info("- Aggressive vs Passive participant analysis") logger.info("- Institutional vs Retail trade detection") logger.info("- Order flow intensity measurements") logger.info("- Price impact and liquidity consumption analysis") logger.info("- Block trade and iceberg order detection") logger.info("- Enhanced CNN and DQN feature extraction") if not await tester.setup_integration(): logger.error("Failed to setup integration") return False # Start streaming if not await tester.start_streaming(): logger.error("Failed to start streaming") return False # Wait for initial data logger.info("Waiting 30 seconds for initial data...") await asyncio.sleep(30) # Monitor order flow await tester.monitor_order_flow() # Generate report tester.generate_summary_report() return True except Exception as e: logger.error(f"Test failed: {e}") return False finally: # Cleanup try: await tester.stop_streaming() except Exception as e: logger.error(f"Error during cleanup: {e}") if __name__ == "__main__": try: # Run the test success = asyncio.run(run_enhanced_order_flow_test()) if success: print("\n✅ Enhanced Order Flow Integration Test PASSED") print("All enhanced order flow analysis features are working correctly!") else: print("\n❌ Enhanced Order Flow Integration Test FAILED") print("Check the logs for details.") except KeyboardInterrupt: print("\n⚠️ Test interrupted by user") except Exception as e: print(f"\n💥 Test crashed: {e}")