""" Real-Time Tick Processing Neural Network Module This module acts as a Neural Network DPS (Data Processing System) alternative, processing raw tick data with ultra-low latency and feeding processed features to trading models in real-time. Features: - Real-time tick ingestion with volume processing - Neural network feature extraction from tick streams - Ultra-low latency processing (sub-millisecond) - Volume-weighted price analysis - Microstructure pattern detection - Real-time feature streaming to models """ import asyncio import logging import time import numpy as np import pandas as pd import torch import torch.nn as nn import torch.nn.functional as F from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any, Deque from collections import deque from threading import Thread, Lock import websockets import json from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class TickData: """Raw tick data structure""" timestamp: datetime price: float volume: float side: str # 'buy' or 'sell' trade_id: Optional[str] = None @dataclass class ProcessedTickFeatures: """Processed tick features for model consumption""" timestamp: datetime price_features: np.ndarray # Price-based features volume_features: np.ndarray # Volume-based features microstructure_features: np.ndarray # Market microstructure features neural_features: np.ndarray # Neural network extracted features confidence: float # Feature quality confidence class TickProcessingNN(nn.Module): """ Neural Network for real-time tick processing Extracts high-level features from raw tick data """ def __init__(self, input_size: int = 9, hidden_size: int = 128, output_size: int = 64): super(TickProcessingNN, self).__init__() # Tick sequence processing layers self.tick_encoder = nn.Sequential( nn.Linear(input_size, hidden_size), nn.ReLU(), nn.Dropout(0.1), nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Dropout(0.1) ) # LSTM for temporal patterns self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, num_layers=2) # Attention mechanism for important tick selection self.attention = nn.MultiheadAttention(hidden_size, num_heads=8, batch_first=True) # Feature extraction heads self.price_head = nn.Linear(hidden_size, 16) # Price pattern features self.volume_head = nn.Linear(hidden_size, 16) # Volume pattern features self.microstructure_head = nn.Linear(hidden_size, 16) # Microstructure features # Final feature fusion self.feature_fusion = nn.Sequential( nn.Linear(48, output_size), # 16+16+16 = 48 nn.ReLU(), nn.Linear(output_size, output_size) ) # Confidence estimation self.confidence_head = nn.Sequential( nn.Linear(output_size, 32), nn.ReLU(), nn.Linear(32, 1), nn.Sigmoid() ) def forward(self, tick_sequence: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: """ Process tick sequence and extract features Args: tick_sequence: [batch, sequence_length, features] Returns: features: [batch, output_size] - extracted features confidence: [batch, 1] - feature confidence """ batch_size, seq_len, _ = tick_sequence.shape # Encode each tick encoded = self.tick_encoder(tick_sequence) # [batch, seq_len, hidden_size] # LSTM processing for temporal patterns lstm_out, _ = self.lstm(encoded) # [batch, seq_len, hidden_size] # Attention to focus on important ticks attended, _ = self.attention(lstm_out, lstm_out, lstm_out) # [batch, seq_len, hidden_size] # Use the last attended output final_features = attended[:, -1, :] # [batch, hidden_size] # Extract specialized features price_features = 

class RealTimeTickProcessor:
    """
    Real-time tick processing system with neural network feature extraction.

    Acts as a DPS alternative for ultra-low latency tick processing.
    """

    def __init__(self, symbols: Optional[List[str]] = None, tick_buffer_size: int = 1000):
        """Initialize the real-time tick processor"""
        self.symbols = symbols or ['ETH/USDT', 'BTC/USDT']
        self.tick_buffer_size = tick_buffer_size

        # Tick storage buffers
        self.tick_buffers: Dict[str, Deque[TickData]] = {}
        self.processed_features: Dict[str, Deque[ProcessedTickFeatures]] = {}

        # Initialize buffers for each symbol
        for symbol in self.symbols:
            self.tick_buffers[symbol] = deque(maxlen=tick_buffer_size)
            self.processed_features[symbol] = deque(maxlen=100)  # Keep last 100 processed features

        # Neural network for feature extraction
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tick_nn = TickProcessingNN(input_size=9).to(self.device)
        self.tick_nn.eval()  # Start in evaluation mode

        # Processing parameters
        self.processing_window = 50         # Number of ticks to process at once
        self.min_ticks_for_processing = 10  # Minimum ticks before processing

        # Real-time streaming
        self.streaming = False
        self.websocket_tasks = {}
        self.processing_threads = {}

        # Performance tracking
        self.processing_times = deque(maxlen=1000)
        self.tick_counts = {symbol: 0 for symbol in self.symbols}

        # Thread safety
        self.data_lock = Lock()

        # Feature subscribers (models that want real-time features)
        self.feature_subscribers = []

        logger.info(f"RealTimeTickProcessor initialized for symbols: {self.symbols}")
        logger.info(f"Neural network device: {self.device}")
        logger.info(f"Tick buffer size: {tick_buffer_size}")

    def add_feature_subscriber(self, callback):
        """Add a callback function to receive processed features"""
        self.feature_subscribers.append(callback)
        logger.info(f"Added feature subscriber: {callback.__name__}")

    def remove_feature_subscriber(self, callback):
        """Remove a feature subscriber"""
        if callback in self.feature_subscribers:
            self.feature_subscribers.remove(callback)
            logger.info(f"Removed feature subscriber: {callback.__name__}")
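
    # Illustrative sketch (assumed usage, not original code): a subscriber is
    # any callable taking (symbol: str, features: ProcessedTickFeatures), e.g.
    #
    #     def log_neural_features(symbol: str, features: ProcessedTickFeatures) -> None:
    #         logger.info("%s confidence=%.3f", symbol, features.confidence)
    #
    #     processor.add_feature_subscriber(log_neural_features)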

    async def start_processing(self):
        """Start real-time tick processing"""
        logger.info("Starting real-time tick processing...")

        self.streaming = True

        for symbol in self.symbols:
            # Start a WebSocket stream for the symbol
            task = asyncio.create_task(self._websocket_stream(symbol))
            self.websocket_tasks[symbol] = task

            # Start a processing thread for the symbol
            thread = Thread(target=self._processing_loop, args=(symbol,), daemon=True)
            thread.start()
            self.processing_threads[symbol] = thread

        logger.info("Real-time tick processing started")

    async def stop_processing(self):
        """Stop real-time tick processing"""
        logger.info("Stopping real-time tick processing...")

        self.streaming = False

        # Cancel WebSocket tasks
        for symbol, task in self.websocket_tasks.items():
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        self.websocket_tasks.clear()

        logger.info("Real-time tick processing stopped")

    async def _websocket_stream(self, symbol: str):
        """WebSocket stream for real-time tick data"""
        binance_symbol = symbol.replace('/', '').lower()
        url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@trade"

        while self.streaming:
            try:
                async with websockets.connect(url) as websocket:
                    logger.info(f"Tick WebSocket connected for {symbol}")

                    async for message in websocket:
                        if not self.streaming:
                            break
                        try:
                            data = json.loads(message)
                            await self._process_raw_tick(symbol, data)
                        except Exception as e:
                            logger.warning(f"Error processing tick for {symbol}: {e}")

            except Exception as e:
                logger.error(f"WebSocket error for {symbol}: {e}")
                if self.streaming:
                    logger.info(f"Reconnecting tick WebSocket for {symbol} in 2 seconds...")
                    await asyncio.sleep(2)

    async def _process_raw_tick(self, symbol: str, raw_data: Dict):
        """Process raw tick data from the WebSocket"""
        try:
            # Extract tick information; 'm' is True when the buyer is the
            # market maker, i.e. the aggressor was a seller
            tick = TickData(
                timestamp=datetime.fromtimestamp(int(raw_data['T']) / 1000),
                price=float(raw_data['p']),
                volume=float(raw_data['q']),
                side='sell' if raw_data['m'] else 'buy',
                trade_id=str(raw_data['t']) if 't' in raw_data else None
            )

            # Add to buffer
            with self.data_lock:
                self.tick_buffers[symbol].append(tick)
                self.tick_counts[symbol] += 1

        except Exception as e:
            logger.error(f"Error processing raw tick for {symbol}: {e}")
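
    # Illustrative sample of the Binance @trade payload consumed above (field
    # values are made up; the keys 'T', 'p', 'q', 'm', 't' are the ones
    # _process_raw_tick actually reads):
    #
    #     {"e": "trade", "T": 1717000000123, "p": "3500.10",
    #      "q": "0.25", "m": false, "t": 123456789}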

    def _processing_loop(self, symbol: str):
        """Main processing loop for a symbol"""
        logger.info(f"Starting processing loop for {symbol}")

        while self.streaming:
            try:
                # Check if we have enough ticks to process
                with self.data_lock:
                    tick_count = len(self.tick_buffers[symbol])

                if tick_count >= self.min_ticks_for_processing:
                    start_time = time.time()

                    # Process ticks
                    features = self._extract_neural_features(symbol)

                    if features is not None:
                        # Store processed features
                        with self.data_lock:
                            self.processed_features[symbol].append(features)

                        # Notify subscribers
                        self._notify_feature_subscribers(symbol, features)

                    # Track processing time
                    processing_time = (time.time() - start_time) * 1000  # Convert to ms
                    self.processing_times.append(processing_time)

                    if len(self.processing_times) % 100 == 0:
                        avg_time = np.mean(list(self.processing_times))
                        logger.info(f"Average processing time: {avg_time:.2f}ms")

                # Small sleep to prevent CPU overload
                time.sleep(0.001)  # 1ms sleep for low-latency polling

            except Exception as e:
                logger.error(f"Error in processing loop for {symbol}: {e}")
                time.sleep(0.01)  # Longer sleep on error

    def _extract_neural_features(self, symbol: str) -> Optional[ProcessedTickFeatures]:
        """Extract neural network features from recent ticks"""
        try:
            with self.data_lock:
                # Snapshot the most recent ticks, then release the lock before
                # the comparatively slow neural network pass
                recent_ticks = list(self.tick_buffers[symbol])[-self.processing_window:]

            if len(recent_ticks) < self.min_ticks_for_processing:
                return None

            # Convert ticks to neural network input
            tick_features = self._ticks_to_features(recent_ticks)

            # Process with the neural network
            with torch.no_grad():
                tick_tensor = torch.FloatTensor(tick_features).unsqueeze(0).to(self.device)
                neural_features, confidence = self.tick_nn(tick_tensor)

                neural_features = neural_features.cpu().numpy().flatten()
                confidence = confidence.cpu().numpy().item()

            # Extract traditional features
            price_features = self._extract_price_features(recent_ticks)
            volume_features = self._extract_volume_features(recent_ticks)
            microstructure_features = self._extract_microstructure_features(recent_ticks)

            # Create processed features object
            processed = ProcessedTickFeatures(
                timestamp=recent_ticks[-1].timestamp,
                price_features=price_features,
                volume_features=volume_features,
                microstructure_features=microstructure_features,
                neural_features=neural_features,
                confidence=confidence
            )

            return processed

        except Exception as e:
            logger.error(f"Error extracting neural features for {symbol}: {e}")
            return None

    def _ticks_to_features(self, ticks: List[TickData]) -> np.ndarray:
        """Convert tick data to neural network input features"""
        features = []

        for i, tick in enumerate(ticks):
            tick_features = [
                tick.price,
                tick.volume,
                1.0 if tick.side == 'buy' else 0.0,  # Buy/sell indicator
                tick.timestamp.timestamp(),          # Timestamp
            ]

            # Add relative features if we have previous ticks
            if i > 0:
                prev_tick = ticks[i - 1]
                price_change = (tick.price - prev_tick.price) / prev_tick.price
                volume_ratio = tick.volume / (prev_tick.volume + 1e-8)
                time_delta = (tick.timestamp - prev_tick.timestamp).total_seconds()
                tick_features.extend([price_change, volume_ratio, time_delta])
            else:
                tick_features.extend([0.0, 1.0, 0.0])  # Default values for first tick

            # Add moving averages if we have enough data
            if i >= 5:
                recent_prices = [t.price for t in ticks[max(0, i - 4):i + 1]]
                recent_volumes = [t.volume for t in ticks[max(0, i - 4):i + 1]]
                price_ma = np.mean(recent_prices)
                volume_ma = np.mean(recent_volumes)
                tick_features.extend([
                    (tick.price - price_ma) / price_ma,             # Price deviation from MA
                    (tick.volume - volume_ma) / (volume_ma + 1e-8)  # Volume deviation from MA
                ])
            else:
                tick_features.extend([0.0, 0.0])

            features.append(tick_features)

        # Pad or truncate to a fixed window size
        target_length = self.processing_window
        if len(features) < target_length:
            # Pad at the front with zero rows
            padding = [[0.0] * len(features[0])] * (target_length - len(features))
            features = padding + features
        elif len(features) > target_length:
            # Keep the most recent ticks
            features = features[-target_length:]

        return np.array(features, dtype=np.float32)
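
    # Per-tick input layout produced by _ticks_to_features (9 features,
    # matching TickProcessingNN(input_size=9)):
    #   0: price          3: unix timestamp   6: time delta (s)
    #   1: volume         4: price change     7: price deviation from 5-tick MA
    #   2: buy/sell flag  5: volume ratio     8: volume deviation from 5-tick MA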

    def _extract_price_features(self, ticks: List[TickData]) -> np.ndarray:
        """Extract price-based features"""
        prices = np.array([tick.price for tick in ticks])

        features = [
            prices[-1],      # Current price
            np.mean(prices), # Average price
            np.std(prices),  # Price volatility
            np.max(prices),  # High
            np.min(prices),  # Low
            (prices[-1] - prices[0]) / prices[0] if prices[0] != 0 else 0,  # Total return
        ]

        # Price momentum features
        if len(prices) >= 10:
            short_ma = np.mean(prices[-5:])
            long_ma = np.mean(prices[-10:])
            momentum = (short_ma - long_ma) / long_ma if long_ma != 0 else 0
            features.append(momentum)
        else:
            features.append(0.0)

        return np.array(features, dtype=np.float32)

    def _extract_volume_features(self, ticks: List[TickData]) -> np.ndarray:
        """Extract volume-based features"""
        volumes = np.array([tick.volume for tick in ticks])
        buy_volumes = np.array([tick.volume for tick in ticks if tick.side == 'buy'])
        sell_volumes = np.array([tick.volume for tick in ticks if tick.side == 'sell'])

        features = [
            np.sum(volumes),   # Total volume
            np.mean(volumes),  # Average volume
            np.std(volumes),   # Volume volatility
            np.sum(buy_volumes) if len(buy_volumes) > 0 else 0,    # Buy volume
            np.sum(sell_volumes) if len(sell_volumes) > 0 else 0,  # Sell volume
        ]

        # Volume imbalance
        total_buy = np.sum(buy_volumes) if len(buy_volumes) > 0 else 0
        total_sell = np.sum(sell_volumes) if len(sell_volumes) > 0 else 0
        total_volume = total_buy + total_sell

        if total_volume > 0:
            buy_ratio = total_buy / total_volume
            volume_imbalance = buy_ratio - 0.5  # -0.5 to 0.5 range
        else:
            volume_imbalance = 0.0
        features.append(volume_imbalance)

        # VWAP (Volume Weighted Average Price) deviation
        if np.sum(volumes) > 0:
            prices = np.array([tick.price for tick in ticks])
            vwap = np.sum(prices * volumes) / np.sum(volumes)
            current_price = ticks[-1].price
            vwap_deviation = (current_price - vwap) / vwap if vwap != 0 else 0
        else:
            vwap_deviation = 0.0
        features.append(vwap_deviation)

        return np.array(features, dtype=np.float32)

    def _extract_microstructure_features(self, ticks: List[TickData]) -> np.ndarray:
        """Extract market microstructure features"""
        features = []

        # Trade frequency
        if len(ticks) >= 2:
            time_deltas = [(ticks[i].timestamp - ticks[i - 1].timestamp).total_seconds()
                           for i in range(1, len(ticks))]
            avg_time_delta = np.mean(time_deltas)
            trade_frequency = 1.0 / avg_time_delta if avg_time_delta > 0 else 0
        else:
            trade_frequency = 0.0
        features.append(trade_frequency)

        # Price impact: correlation between absolute price changes and volumes.
        # Change/volume pairs are built together so they stay aligned even if a
        # zero previous price forces a change to be skipped.
        prices = [tick.price for tick in ticks]
        volumes = [tick.volume for tick in ticks]

        if len(prices) >= 3:
            pairs = [((prices[i] - prices[i - 1]) / prices[i - 1], volumes[i])
                     for i in range(1, len(prices)) if prices[i - 1] != 0]
            if len(pairs) >= 2:
                changes, change_volumes = zip(*pairs)
                price_impact = np.corrcoef(np.abs(changes), change_volumes)[0, 1]
                if np.isnan(price_impact):
                    price_impact = 0.0
            else:
                price_impact = 0.0
        else:
            price_impact = 0.0
        features.append(price_impact)

        # Bid-ask spread proxy (using the price range of recent trades)
        if len(prices) >= 5:
            recent_prices = prices[-5:]
            spread_proxy = (np.max(recent_prices) - np.min(recent_prices)) / np.mean(recent_prices)
        else:
            spread_proxy = 0.0
        features.append(spread_proxy)

        # Order flow imbalance (trade-count based, complementing the
        # volume-based imbalance above)
        buy_count = sum(1 for tick in ticks if tick.side == 'buy')
        sell_count = len(ticks) - buy_count
        total_trades = len(ticks)

        if total_trades > 0:
            order_flow_imbalance = (buy_count - sell_count) / total_trades
        else:
            order_flow_imbalance = 0.0
        features.append(order_flow_imbalance)

        return np.array(features, dtype=np.float32)

    def _notify_feature_subscribers(self, symbol: str, features: ProcessedTickFeatures):
        """Notify all feature subscribers of new processed features"""
        for callback in self.feature_subscribers:
            try:
                callback(symbol, features)
            except Exception as e:
                logger.error(f"Error notifying feature subscriber {callback.__name__}: {e}")

    def get_latest_features(self, symbol: str) -> Optional[ProcessedTickFeatures]:
        """Get the latest processed features for a symbol"""
        with self.data_lock:
            if symbol in self.processed_features and self.processed_features[symbol]:
                return self.processed_features[symbol][-1]
        return None

    def get_processing_stats(self) -> Dict[str, Any]:
        """Get processing performance statistics"""
        stats = {
            'symbols': self.symbols,
            'streaming': self.streaming,
            'tick_counts': dict(self.tick_counts),
            'buffer_sizes': {symbol: len(self.tick_buffers[symbol]) for symbol in self.symbols},
            'feature_counts': {symbol: len(self.processed_features[symbol]) for symbol in self.symbols},
            'subscribers': len(self.feature_subscribers)
        }

        if self.processing_times:
            stats['processing_performance'] = {
                'avg_time_ms': np.mean(list(self.processing_times)),
                'min_time_ms': np.min(list(self.processing_times)),
                'max_time_ms': np.max(list(self.processing_times)),
                'std_time_ms': np.std(list(self.processing_times))
            }

        return stats
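
    # Example shape of the stats dict returned above (values illustrative):
    #
    #     {'symbols': ['ETH/USDT'], 'streaming': True,
    #      'tick_counts': {'ETH/USDT': 15230},
    #      'buffer_sizes': {'ETH/USDT': 1000},
    #      'feature_counts': {'ETH/USDT': 100},
    #      'subscribers': 1,
    #      'processing_performance': {'avg_time_ms': 0.8, ...}}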

    def train_neural_network(self, training_data: List[Tuple[np.ndarray, np.ndarray]], epochs: int = 100):
        """Train the tick processing neural network"""
        logger.info("Training tick processing neural network...")

        self.tick_nn.train()
        optimizer = torch.optim.Adam(self.tick_nn.parameters(), lr=0.001)
        criterion = nn.MSELoss()

        for epoch in range(epochs):
            total_loss = 0.0

            for batch_features, batch_targets in training_data:
                optimizer.zero_grad()

                # Convert to tensors
                features_tensor = torch.FloatTensor(batch_features).to(self.device)
                targets_tensor = torch.FloatTensor(batch_targets).to(self.device)

                # Forward pass (the confidence head is not supervised by this loss)
                outputs, _confidence = self.tick_nn(features_tensor)

                # Calculate loss
                loss = criterion(outputs, targets_tensor)

                # Backward pass
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            if epoch % 10 == 0:
                avg_loss = total_loss / len(training_data)
                logger.info(f"Epoch {epoch}/{epochs}, Average Loss: {avg_loss:.6f}")

        self.tick_nn.eval()
        logger.info("Neural network training completed")


# Integration with existing orchestrator
def integrate_with_orchestrator(orchestrator, tick_processor: RealTimeTickProcessor):
    """Integrate tick processor with enhanced orchestrator"""

    def feature_callback(symbol: str, features: ProcessedTickFeatures):
        """Callback to feed processed features to the orchestrator"""
        try:
            # Convert processed features to the format expected by the orchestrator
            feature_dict = {
                'symbol': symbol,
                'timestamp': features.timestamp,
                'neural_features': features.neural_features,
                'price_features': features.price_features,
                'volume_features': features.volume_features,
                'microstructure_features': features.microstructure_features,
                'confidence': features.confidence
            }

            # Feed to the orchestrator's real-time feature processing
            if hasattr(orchestrator, 'process_realtime_features'):
                orchestrator.process_realtime_features(feature_dict)

        except Exception as e:
            logger.error(f"Error integrating features with orchestrator: {e}")

    # Add the callback to the tick processor
    tick_processor.add_feature_subscriber(feature_callback)
    logger.info("Tick processor integrated with orchestrator")


# Factory function for easy creation
def create_realtime_tick_processor(symbols: Optional[List[str]] = None) -> RealTimeTickProcessor:
    """Create and configure a real-time tick processor"""
    if symbols is None:
        symbols = ['ETH/USDT', 'BTC/USDT']

    processor = RealTimeTickProcessor(symbols=symbols)
    logger.info(f"Created RealTimeTickProcessor for symbols: {symbols}")
    return processor
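
# Minimal end-to-end usage sketch (assumed wiring, not part of the original
# module; `_demo` is a hypothetical name). Note it connects to live Binance
# streams when run directly, so treat it as an example rather than a
# production entry point.
async def _demo() -> None:
    processor = create_realtime_tick_processor(['ETH/USDT'])
    processor.add_feature_subscriber(
        lambda symbol, feats: logger.info("%s confidence=%.2f", symbol, feats.confidence))
    await processor.start_processing()
    await asyncio.sleep(60)  # Stream for one minute
    await processor.stop_processing()
    logger.info("Stats: %s", processor.get_processing_stats())


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())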