Files
gogo2/tests/test_multi_exchange_cob.py
2025-06-25 11:42:12 +03:00

327 lines
14 KiB
Python

"""
Test Multi-Exchange Consolidated Order Book (COB) Provider
This script demonstrates the functionality of the new multi-exchange COB data provider:
1. Real-time order book aggregation from multiple exchanges
2. Fine-grain price bucket generation
3. CNN/DQN feature generation
4. Dashboard integration
5. Market analysis and signal generation
Run this to test the COB provider with live data streams.
"""
import asyncio
import logging
import time
from datetime import datetime
from core.multi_exchange_cob_provider import MultiExchangeCOBProvider
from core.cob_integration import COBIntegration
from core.data_provider import DataProvider
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class COBTester:
"""Test harness for Multi-Exchange COB Provider"""
def __init__(self):
self.symbols = ['BTC/USDT', 'ETH/USDT']
self.data_provider = None
self.cob_integration = None
self.test_duration = 300 # 5 minutes
# Statistics tracking
self.stats = {
'cob_updates_received': 0,
'bucket_updates_received': 0,
'cnn_features_generated': 0,
'dqn_features_generated': 0,
'signals_generated': 0,
'start_time': None
}
async def run_test(self):
"""Run comprehensive COB provider test"""
logger.info("Starting Multi-Exchange COB Provider Test")
logger.info(f"Testing symbols: {self.symbols}")
logger.info(f"Test duration: {self.test_duration} seconds")
try:
# Initialize components
await self._initialize_components()
# Run test scenarios
await self._run_basic_functionality_test()
await self._run_feature_generation_test()
await self._run_dashboard_integration_test()
await self._run_signal_analysis_test()
# Monitor for specified duration
await self._monitor_live_data()
# Generate final report
self._generate_test_report()
except Exception as e:
logger.error(f"Test failed: {e}")
finally:
await self._cleanup()
async def _initialize_components(self):
"""Initialize COB provider and integration components"""
logger.info("Initializing COB components...")
# Create data provider (optional - for integration testing)
self.data_provider = DataProvider(symbols=self.symbols)
# Create COB integration
self.cob_integration = COBIntegration(
data_provider=self.data_provider,
symbols=self.symbols
)
# Register test callbacks
self.cob_integration.add_cnn_callback(self._cnn_callback)
self.cob_integration.add_dqn_callback(self._dqn_callback)
self.cob_integration.add_dashboard_callback(self._dashboard_callback)
# Start COB integration
await self.cob_integration.start()
# Allow time for connections
await asyncio.sleep(5)
self.stats['start_time'] = datetime.now()
logger.info("COB components initialized successfully")
async def _run_basic_functionality_test(self):
"""Test basic COB provider functionality"""
logger.info("Testing basic COB functionality...")
# Wait for order book data
await asyncio.sleep(10)
for symbol in self.symbols:
# Test consolidated order book retrieval
cob_snapshot = self.cob_integration.get_cob_snapshot(symbol)
if cob_snapshot:
logger.info(f"{symbol} COB Status:")
logger.info(f" Exchanges active: {cob_snapshot.exchanges_active}")
logger.info(f" Volume weighted mid: ${cob_snapshot.volume_weighted_mid:.2f}")
logger.info(f" Spread: {cob_snapshot.spread_bps:.2f} bps")
logger.info(f" Bid liquidity: ${cob_snapshot.total_bid_liquidity:,.0f}")
logger.info(f" Ask liquidity: ${cob_snapshot.total_ask_liquidity:,.0f}")
logger.info(f" Liquidity imbalance: {cob_snapshot.liquidity_imbalance:.3f}")
# Test price buckets
price_buckets = self.cob_integration.get_price_buckets(symbol)
if price_buckets:
bid_buckets = len(price_buckets.get('bids', {}))
ask_buckets = len(price_buckets.get('asks', {}))
logger.info(f" Price buckets: {bid_buckets} bids, {ask_buckets} asks")
# Test exchange breakdown
exchange_breakdown = self.cob_integration.get_exchange_breakdown(symbol)
if exchange_breakdown:
logger.info(f" Exchange breakdown:")
for exchange, data in exchange_breakdown.items():
market_share = data.get('market_share', 0) * 100
logger.info(f" {exchange}: {market_share:.1f}% market share")
else:
logger.warning(f"No COB data available for {symbol}")
logger.info("Basic functionality test completed")
async def _run_feature_generation_test(self):
"""Test CNN and DQN feature generation"""
logger.info("Testing feature generation...")
for symbol in self.symbols:
# Test CNN features
cnn_features = self.cob_integration.get_cob_features(symbol)
if cnn_features is not None:
logger.info(f"{symbol} CNN features: shape={cnn_features.shape}, "
f"min={cnn_features.min():.4f}, max={cnn_features.max():.4f}")
else:
logger.warning(f"No CNN features available for {symbol}")
# Test market depth analysis
depth_analysis = self.cob_integration.get_market_depth_analysis(symbol)
if depth_analysis:
logger.info(f"{symbol} Market Depth Analysis:")
logger.info(f" Depth levels: {depth_analysis['depth_analysis']['bid_levels']} bids, "
f"{depth_analysis['depth_analysis']['ask_levels']} asks")
dominant_exchanges = depth_analysis['depth_analysis'].get('dominant_exchanges', {})
logger.info(f" Dominant exchanges: {dominant_exchanges}")
logger.info("Feature generation test completed")
async def _run_dashboard_integration_test(self):
"""Test dashboard data generation"""
logger.info("Testing dashboard integration...")
# Dashboard integration is tested via callbacks
# Statistics are tracked in the callback functions
await asyncio.sleep(5)
logger.info("Dashboard integration test completed")
async def _run_signal_analysis_test(self):
"""Test signal generation and analysis"""
logger.info("Testing signal analysis...")
for symbol in self.symbols:
# Get recent signals
recent_signals = self.cob_integration.get_recent_signals(symbol, count=10)
logger.info(f"{symbol} recent signals: {len(recent_signals)} generated")
for signal in recent_signals[-3:]: # Show last 3 signals
logger.info(f" Signal: {signal.get('type')} - {signal.get('side')} - "
f"Confidence: {signal.get('confidence', 0):.3f}")
logger.info("Signal analysis test completed")
async def _monitor_live_data(self):
"""Monitor live data for the specified duration"""
logger.info(f"Monitoring live data for {self.test_duration} seconds...")
start_time = time.time()
last_stats_time = start_time
while time.time() - start_time < self.test_duration:
# Print periodic statistics
current_time = time.time()
if current_time - last_stats_time >= 30: # Every 30 seconds
self._print_periodic_stats()
last_stats_time = current_time
await asyncio.sleep(1)
logger.info("Live data monitoring completed")
def _print_periodic_stats(self):
"""Print periodic statistics during monitoring"""
elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
logger.info("Periodic Statistics:")
logger.info(f" Elapsed time: {elapsed:.0f} seconds")
logger.info(f" COB updates: {self.stats['cob_updates_received']}")
logger.info(f" Bucket updates: {self.stats['bucket_updates_received']}")
logger.info(f" CNN features: {self.stats['cnn_features_generated']}")
logger.info(f" DQN features: {self.stats['dqn_features_generated']}")
logger.info(f" Signals: {self.stats['signals_generated']}")
# Calculate rates
if elapsed > 0:
cob_rate = self.stats['cob_updates_received'] / elapsed
logger.info(f" COB update rate: {cob_rate:.2f}/sec")
def _generate_test_report(self):
"""Generate final test report"""
elapsed = (datetime.now() - self.stats['start_time']).total_seconds()
logger.info("=" * 60)
logger.info("MULTI-EXCHANGE COB PROVIDER TEST REPORT")
logger.info("=" * 60)
logger.info(f"Test Duration: {elapsed:.0f} seconds")
logger.info(f"Symbols Tested: {', '.join(self.symbols)}")
logger.info("")
# Data Reception Statistics
logger.info("Data Reception:")
logger.info(f" COB Updates Received: {self.stats['cob_updates_received']}")
logger.info(f" Bucket Updates Received: {self.stats['bucket_updates_received']}")
logger.info(f" Average COB Rate: {self.stats['cob_updates_received'] / elapsed:.2f}/sec")
logger.info("")
# Feature Generation Statistics
logger.info("Feature Generation:")
logger.info(f" CNN Features Generated: {self.stats['cnn_features_generated']}")
logger.info(f" DQN Features Generated: {self.stats['dqn_features_generated']}")
logger.info("")
# Signal Generation Statistics
logger.info("Signal Analysis:")
logger.info(f" Signals Generated: {self.stats['signals_generated']}")
logger.info("")
# Component Statistics
cob_stats = self.cob_integration.get_statistics()
logger.info("Component Statistics:")
logger.info(f" Active Exchanges: {', '.join(cob_stats.get('active_exchanges', []))}")
logger.info(f" Streaming Status: {cob_stats.get('is_streaming', False)}")
logger.info(f" Bucket Size: {cob_stats.get('bucket_size_bps', 0)} bps")
logger.info(f" Average Processing Time: {cob_stats.get('avg_processing_time_ms', 0):.2f} ms")
logger.info("")
# Per-Symbol Analysis
logger.info("Per-Symbol Analysis:")
for symbol in self.symbols:
cob_snapshot = self.cob_integration.get_cob_snapshot(symbol)
if cob_snapshot:
logger.info(f" {symbol}:")
logger.info(f" Active Exchanges: {len(cob_snapshot.exchanges_active)}")
logger.info(f" Spread: {cob_snapshot.spread_bps:.2f} bps")
logger.info(f" Total Liquidity: ${(cob_snapshot.total_bid_liquidity + cob_snapshot.total_ask_liquidity):,.0f}")
recent_signals = self.cob_integration.get_recent_signals(symbol)
logger.info(f" Signals Generated: {len(recent_signals)}")
logger.info("=" * 60)
logger.info("Test completed successfully!")
async def _cleanup(self):
"""Cleanup resources"""
logger.info("Cleaning up resources...")
if self.cob_integration:
await self.cob_integration.stop()
if self.data_provider and hasattr(self.data_provider, 'stop_real_time_streaming'):
await self.data_provider.stop_real_time_streaming()
logger.info("Cleanup completed")
# Callback functions for testing
def _cnn_callback(self, symbol: str, data: dict):
"""CNN feature callback for testing"""
self.stats['cnn_features_generated'] += 1
if self.stats['cnn_features_generated'] % 100 == 0:
logger.debug(f"CNN features generated: {self.stats['cnn_features_generated']}")
def _dqn_callback(self, symbol: str, data: dict):
"""DQN feature callback for testing"""
self.stats['dqn_features_generated'] += 1
if self.stats['dqn_features_generated'] % 100 == 0:
logger.debug(f"DQN features generated: {self.stats['dqn_features_generated']}")
def _dashboard_callback(self, symbol: str, data: dict):
"""Dashboard data callback for testing"""
self.stats['cob_updates_received'] += 1
# Check for signals in dashboard data
signals = data.get('recent_signals', [])
self.stats['signals_generated'] += len(signals)
async def main():
"""Main test function"""
logger.info("Multi-Exchange COB Provider Test Starting...")
try:
tester = COBTester()
await tester.run_test()
except KeyboardInterrupt:
logger.info("Test interrupted by user")
except Exception as e:
logger.error(f"Test failed with error: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())