#!/usr/bin/env python3
"""
Simple Real-Time Tick Processor Test

This script tests the core Neural Network functionality of the Real-Time
Tick Processing Module without requiring live WebSocket connections.
"""

import logging
import sys
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Optional

# Add project root to path (must happen before the project imports below).
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from core.realtime_tick_processor import (
    RealTimeTickProcessor,
    ProcessedTickFeatures,
    TickData,
    create_realtime_tick_processor
)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _generate_mock_ticks(
    count: int = 50,
    base_price: float = 3500.0,
    end_time: Optional[datetime] = None,
    interval: timedelta = timedelta(milliseconds=100),
    seed: Optional[int] = None,
) -> List[TickData]:
    """Build a random-walk series of mock trades.

    Args:
        count: Number of ticks to generate.
        base_price: Price the random walk starts from.
        end_time: Timestamp of the last tick; defaults to the current time.
        interval: Spacing between consecutive ticks.
        seed: Optional seed for reproducible data; ``None`` draws fresh
            entropy on every call.

    Returns:
        ``count`` ``TickData`` objects with strictly increasing timestamps.
    """
    if end_time is None:
        end_time = datetime.now()
    rng = np.random.default_rng(seed)

    ticks = []
    price = base_price
    for i in range(count):
        # Each tick moves the previous price by a small relative step.
        price = price * (1 + float(rng.normal(0, 0.001)))
        # Exponential distribution for volume.
        volume = float(rng.exponential(0.1))
        side = 'buy' if rng.random() > 0.5 else 'sell'
        ticks.append(
            TickData(
                # Space the ticks out in time; identical timestamps would make
                # every time-delta based feature degenerate.
                timestamp=end_time - interval * (count - 1 - i),
                price=price,
                volume=volume,
                side=side,
                trade_id=f"trade_{i}",
            )
        )
    return ticks


def test_neural_network_functionality():
    """Test the neural network processing without WebSocket connections.

    Returns:
        True when every step succeeded, False otherwise (the failure is
        logged together with its traceback).
    """
    logger.info("=" * 80)
    logger.info("🧪 TESTING NEURAL NETWORK TICK PROCESSING")
    logger.info("=" * 80)

    try:
        # Test 1: Create tick processor
        logger.info("\n📊 TEST 1: Creating Real-Time Tick Processor")
        logger.info("-" * 40)

        symbols = ['ETH/USDT', 'BTC/USDT']
        tick_processor = create_realtime_tick_processor(symbols)

        logger.info("✅ Tick processor created successfully")
        logger.info(f" Symbols: {tick_processor.symbols}")
        logger.info(f" Device: {tick_processor.device}")
        logger.info(" Neural network input size: 9")

        # Test 2: Generate mock tick data
        logger.info("\n📈 TEST 2: Generating Mock Tick Data")
        logger.info("-" * 40)

        # 50 ticks random-walking from an ETH-like price.
        mock_ticks = _generate_mock_ticks(count=50, base_price=3500.0)

        prices = [t.price for t in mock_ticks]
        volumes = [t.volume for t in mock_ticks]
        logger.info(f"✅ Generated {len(mock_ticks)} mock ticks")
        logger.info(f" Price range: {min(prices):.2f} - {max(prices):.2f}")
        logger.info(f" Volume range: {min(volumes):.4f} - {max(volumes):.4f}")

        # Test 3: Add ticks to processor buffer
        logger.info("\n💾 TEST 3: Adding Ticks to Processor Buffer")
        logger.info("-" * 40)

        symbol = 'ETH/USDT'
        with tick_processor.data_lock:
            for tick in mock_ticks:
                tick_processor.tick_buffers[symbol].append(tick)

        buffer_size = len(tick_processor.tick_buffers[symbol])
        logger.info(f"✅ Added ticks to buffer: {buffer_size} ticks")

        # Test 4: Extract neural features
        logger.info("\n🧠 TEST 4: Neural Network Feature Extraction")
        logger.info("-" * 40)

        features = tick_processor._extract_neural_features(symbol)
        if features is None:
            logger.error("❌ Failed to extract neural features")
            return False

        logger.info("✅ Neural features extracted successfully")
        logger.info(f" Timestamp: {features.timestamp}")
        logger.info(f" Confidence: {features.confidence:.3f}")
        logger.info(f" Neural features shape: {features.neural_features.shape}")
        logger.info(f" Price features shape: {features.price_features.shape}")
        logger.info(f" Volume features shape: {features.volume_features.shape}")
        logger.info(f" Microstructure features shape: {features.microstructure_features.shape}")

        # Show sample values
        logger.info(f" Neural features sample: {features.neural_features[:5]}")
        logger.info(f" Price features sample: {features.price_features[:3]}")
        logger.info(f" Volume features sample: {features.volume_features[:3]}")

        # Test 5: Test feature conversion methods
        logger.info("\n🔧 TEST 5: Feature Conversion Methods")
        logger.info("-" * 40)

        # Test tick-to-features conversion
        tick_features = tick_processor._ticks_to_features(mock_ticks)
        logger.info(f"✅ Tick features converted: shape {tick_features.shape}")
        logger.info(f" Expected shape: ({tick_processor.processing_window}, 9)")

        # Test individual feature extraction
        price_features = tick_processor._extract_price_features(mock_ticks)
        volume_features = tick_processor._extract_volume_features(mock_ticks)
        microstructure_features = tick_processor._extract_microstructure_features(mock_ticks)

        logger.info(f"✅ Price features: {len(price_features)} features")
        logger.info(f"✅ Volume features: {len(volume_features)} features")
        logger.info(f"✅ Microstructure features: {len(microstructure_features)} features")

        # Test 6: Neural network forward pass
        logger.info("\n⚡ TEST 6: Neural Network Forward Pass")
        logger.info("-" * 40)

        # Imported here, as in the original script, rather than at module load.
        import torch

        # Test direct neural network inference (batch dimension added up front).
        tick_tensor = torch.FloatTensor(tick_features).unsqueeze(0).to(tick_processor.device)

        with torch.no_grad():
            neural_features, confidence = tick_processor.tick_nn(tick_tensor)

        logger.info("✅ Neural network forward pass successful")
        logger.info(f" Input shape: {tick_tensor.shape}")
        logger.info(f" Output features shape: {neural_features.shape}")
        logger.info(f" Confidence shape: {confidence.shape}")
        logger.info(f" Confidence value: {confidence.item():.3f}")

        return True

    except Exception as e:
        # Top-level test boundary: report the failure with traceback and let
        # the remaining tests run.
        logger.exception(f"❌ Neural network test failed: {e}")
        return False


def test_dqn_integration():
    """Test DQN integration with real-time tick features.

    Returns:
        True when every step succeeded, False otherwise (the failure is
        logged together with its traceback).
    """
    logger.info("\n🤖 TESTING DQN INTEGRATION WITH TICK FEATURES")
    logger.info("-" * 50)

    try:
        # Imported inside the try so a missing agent module is reported as a
        # failure of this test only.
        from NN.models.dqn_agent import DQNAgent

        # Create DQN agent
        state_shape = (3, 5)  # 3 timeframes, 5 features
        dqn = DQNAgent(state_shape=state_shape, n_actions=3)

        logger.info("✅ DQN agent created")
        logger.info(f" State shape: {state_shape}")
        logger.info(f" Actions: {dqn.n_actions}")
        logger.info(f" Device: {dqn.device}")
        logger.info(f" Tick feature weight: {dqn.tick_feature_weight}")

        # Test state enhancement
        test_state = np.random.rand(3, 5)
        logger.info(f" Test state shape: {test_state.shape}")

        # Simulate realistic tick features
        mock_tick_features = {
            'neural_features': np.random.rand(64) * 0.1,  # Small neural features
            'volume_features': np.array([1.2, 0.8, 0.15, 850.5, 720.3, 0.05, 0.02]),  # Realistic volume features
            'microstructure_features': np.array([12.5, 0.3, 0.001, 0.1]),  # Realistic microstructure
            'confidence': 0.85
        }

        # Update DQN with tick features
        dqn.update_realtime_tick_features(mock_tick_features)
        logger.info("✅ DQN updated with mock tick features")

        # Test enhanced action selection
        action_with_ticks = dqn.act(test_state, explore=False)
        logger.info(f"✅ DQN action with tick features: {action_with_ticks}")

        # Test without tick features
        dqn.realtime_tick_features = None
        action_without_ticks = dqn.act(test_state, explore=False)
        logger.info(f"✅ DQN action without tick features: {action_without_ticks}")

        # Test state enhancement method directly
        dqn.realtime_tick_features = mock_tick_features
        enhanced_state = dqn._enhance_state_with_tick_features(test_state)
        logger.info("✅ State enhancement test:")
        logger.info(f" Original state shape: {test_state.shape}")
        logger.info(f" Enhanced state shape: {enhanced_state.shape}")

        logger.info("✅ DQN integration test completed successfully")
        return True

    except Exception as e:
        logger.exception(f"❌ DQN integration test failed: {e}")
        return False


def test_performance_metrics():
    """Test performance and statistics functionality.

    Returns:
        True when every step succeeded, False otherwise (the failure is
        logged together with its traceback).
    """
    logger.info("\n📊 TESTING PERFORMANCE METRICS")
    logger.info("-" * 40)

    try:
        tick_processor = create_realtime_tick_processor(['ETH/USDT'])

        # Test stats without processing
        stats = tick_processor.get_processing_stats()

        logger.info("✅ Basic stats retrieved")
        logger.info(f" Symbols: {stats['symbols']}")
        logger.info(f" Streaming: {stats['streaming']}")
        logger.info(f" Tick counts: {stats['tick_counts']}")
        logger.info(f" Buffer sizes: {stats['buffer_sizes']}")
        logger.info(f" Subscribers: {stats['subscribers']}")

        # Test feature subscriber
        received_features = []

        def test_callback(symbol: str, features: ProcessedTickFeatures):
            received_features.append((symbol, features))

        tick_processor.add_feature_subscriber(test_callback)
        logger.info("✅ Feature subscriber added")

        # Test subscriber removal
        tick_processor.remove_feature_subscriber(test_callback)
        logger.info("✅ Feature subscriber removed")

        return True

    except Exception as e:
        # Include the traceback, consistent with the other tests.
        logger.exception(f"❌ Performance metrics test failed: {e}")
        return False


def main():
    """Run all tests, log a summary, and exit with status 1 on any failure."""
    logger.info("🚀 Starting Simple Real-Time Tick Processor Tests...")

    # Test neural network functionality
    nn_success = test_neural_network_functionality()

    # Test DQN integration
    dqn_success = test_dqn_integration()

    # Test performance metrics
    perf_success = test_performance_metrics()

    # Summary
    logger.info("\n" + "=" * 80)
    logger.info("🎉 SIMPLE TICK PROCESSOR TEST SUMMARY")
    logger.info("=" * 80)

    if nn_success and dqn_success and perf_success:
        logger.info("✅ ALL TESTS PASSED!")
        logger.info("")
        logger.info("📋 VERIFIED FUNCTIONALITY:")
        logger.info(" ✓ Neural network tick processing")
        logger.info(" ✓ Feature extraction (price, volume, microstructure)")
        logger.info(" ✓ DQN integration with tick features")
        logger.info(" ✓ State enhancement for RL models")
        logger.info(" ✓ Performance monitoring")
        logger.info("")
        logger.info("🎯 NEURAL DPS ALTERNATIVE READY:")
        logger.info(" • Real-time tick processing ✓")
        logger.info(" • Volume-weighted analysis ✓")
        logger.info(" • Neural feature extraction ✓")
        logger.info(" • Model integration ready ✓")
        logger.info("")
        logger.info("🚀 Your Neural DPS alternative is working correctly!")
        logger.info(" The system can now process real-time tick data with volume")
        logger.info(" information and feed enhanced features to your DQN models.")
    else:
        logger.error("❌ SOME TESTS FAILED!")
        logger.error(f" Neural Network: {'✅' if nn_success else '❌'}")
        logger.error(f" DQN Integration: {'✅' if dqn_success else '❌'}")
        logger.error(f" Performance: {'✅' if perf_success else '❌'}")
        sys.exit(1)

    logger.info("=" * 80)


if __name__ == "__main__":
    main()