gogo2/core/realtime_tick_processor.py
"""
Real-Time Tick Processing Neural Network Module
This module acts as a Neural Network DPS (Data Processing System) alternative,
processing raw tick data with ultra-low latency and feeding processed features
to trading models in real-time.
Features:
- Real-time tick ingestion with volume processing
- Neural network feature extraction from tick streams
- Ultra-low latency processing (sub-millisecond)
- Volume-weighted price analysis
- Microstructure pattern detection
- Real-time feature streaming to models
"""
import asyncio
import logging
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any, Deque
from collections import deque
from threading import Thread, Lock
import websockets
import json
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class TickData:
"""Raw tick data structure"""
timestamp: datetime
price: float
volume: float
side: str # 'buy' or 'sell'
trade_id: Optional[str] = None
@dataclass
class ProcessedTickFeatures:
"""Processed tick features for model consumption"""
timestamp: datetime
price_features: np.ndarray # Price-based features
volume_features: np.ndarray # Volume-based features
microstructure_features: np.ndarray # Market microstructure features
neural_features: np.ndarray # Neural network extracted features
confidence: float # Feature quality confidence
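# Array dimensions as produced by RealTimeTickProcessor below: price_features
# has 7 values, volume_features 7, microstructure_features 4, and
# neural_features 64 (the TickProcessingNN output_size).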
class TickProcessingNN(nn.Module):
"""
Neural Network for real-time tick processing
Extracts high-level features from raw tick data
"""
def __init__(self, input_size: int = 9, hidden_size: int = 128, output_size: int = 64):
super(TickProcessingNN, self).__init__()
# Tick sequence processing layers
self.tick_encoder = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.1)
)
# LSTM for temporal patterns
self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, num_layers=2)
# Attention mechanism for important tick selection
self.attention = nn.MultiheadAttention(hidden_size, num_heads=8, batch_first=True)
# Feature extraction heads
self.price_head = nn.Linear(hidden_size, 16) # Price pattern features
self.volume_head = nn.Linear(hidden_size, 16) # Volume pattern features
self.microstructure_head = nn.Linear(hidden_size, 16) # Microstructure features
# Final feature fusion
self.feature_fusion = nn.Sequential(
nn.Linear(48, output_size), # 16+16+16 = 48
nn.ReLU(),
nn.Linear(output_size, output_size)
)
# Confidence estimation
self.confidence_head = nn.Sequential(
nn.Linear(output_size, 32),
nn.ReLU(),
nn.Linear(32, 1),
nn.Sigmoid()
)
def forward(self, tick_sequence: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Process tick sequence and extract features
Args:
tick_sequence: [batch, sequence_length, features]
Returns:
features: [batch, output_size] - extracted features
confidence: [batch, 1] - feature confidence
"""
batch_size, seq_len, _ = tick_sequence.shape
# Encode each tick
encoded = self.tick_encoder(tick_sequence) # [batch, seq_len, hidden_size]
# LSTM processing for temporal patterns
lstm_out, _ = self.lstm(encoded) # [batch, seq_len, hidden_size]
# Attention to focus on important ticks
attended, _ = self.attention(lstm_out, lstm_out, lstm_out) # [batch, seq_len, hidden_size]
# Use the last attended output
final_features = attended[:, -1, :] # [batch, hidden_size]
# Extract specialized features
price_features = self.price_head(final_features)
volume_features = self.volume_head(final_features)
microstructure_features = self.microstructure_head(final_features)
# Fuse all features
combined_features = torch.cat([price_features, volume_features, microstructure_features], dim=1)
final_features = self.feature_fusion(combined_features)
# Estimate confidence
confidence = self.confidence_head(final_features)
return final_features, confidence
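# Illustrative shape check (a sketch, not part of the runtime path): a batch
# of 4 sequences of 50 ticks with 9 features each yields a [4, 64] feature
# tensor and a [4, 1] confidence tensor:
#
#     net = TickProcessingNN(input_size=9)
#     feats, conf = net(torch.randn(4, 50, 9))
#     assert feats.shape == (4, 64) and conf.shape == (4, 1)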
class RealTimeTickProcessor:
"""
Real-time tick processing system with neural network feature extraction
Acts as a DPS alternative for ultra-low latency tick processing
"""
    def __init__(self, symbols: Optional[List[str]] = None, tick_buffer_size: int = 1000):
"""Initialize the real-time tick processor"""
self.symbols = symbols or ['ETH/USDT', 'BTC/USDT']
self.tick_buffer_size = tick_buffer_size
# Tick storage buffers
self.tick_buffers: Dict[str, Deque[TickData]] = {}
self.processed_features: Dict[str, Deque[ProcessedTickFeatures]] = {}
# Initialize buffers for each symbol
for symbol in self.symbols:
self.tick_buffers[symbol] = deque(maxlen=tick_buffer_size)
self.processed_features[symbol] = deque(maxlen=100) # Keep last 100 processed features
# Neural network for feature extraction
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.tick_nn = TickProcessingNN(input_size=9).to(self.device)
self.tick_nn.eval() # Start in evaluation mode
# Processing parameters
self.processing_window = 50 # Number of ticks to process at once
self.min_ticks_for_processing = 10 # Minimum ticks before processing
# Real-time streaming
self.streaming = False
self.websocket_tasks = {}
self.processing_threads = {}
# Performance tracking
self.processing_times = deque(maxlen=1000)
self.tick_counts = {symbol: 0 for symbol in self.symbols}
# Thread safety
self.data_lock = Lock()
# Feature subscribers (models that want real-time features)
self.feature_subscribers = []
logger.info(f"RealTimeTickProcessor initialized for symbols: {self.symbols}")
logger.info(f"Neural network device: {self.device}")
logger.info(f"Tick buffer size: {tick_buffer_size}")
def add_feature_subscriber(self, callback):
"""Add a callback function to receive processed features"""
self.feature_subscribers.append(callback)
logger.info(f"Added feature subscriber: {callback.__name__}")
def remove_feature_subscriber(self, callback):
"""Remove a feature subscriber"""
if callback in self.feature_subscribers:
self.feature_subscribers.remove(callback)
logger.info(f"Removed feature subscriber: {callback.__name__}")
async def start_processing(self):
"""Start real-time tick processing"""
logger.info("Starting real-time tick processing...")
self.streaming = True
# Start WebSocket streams for each symbol
for symbol in self.symbols:
task = asyncio.create_task(self._websocket_stream(symbol))
self.websocket_tasks[symbol] = task
# Start processing thread for each symbol
thread = Thread(target=self._processing_loop, args=(symbol,), daemon=True)
thread.start()
self.processing_threads[symbol] = thread
logger.info("Real-time tick processing started")
async def stop_processing(self):
"""Stop real-time tick processing"""
logger.info("Stopping real-time tick processing...")
self.streaming = False
# Cancel WebSocket tasks
for symbol, task in self.websocket_tasks.items():
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
        self.websocket_tasks.clear()
        # Processing threads poll self.streaming, so they exit on their own; give them a moment
        for thread in self.processing_threads.values():
            thread.join(timeout=1.0)
        self.processing_threads.clear()
        logger.info("Real-time tick processing stopped")
async def _websocket_stream(self, symbol: str):
"""WebSocket stream for real-time tick data"""
binance_symbol = symbol.replace('/', '').lower()
url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@trade"
while self.streaming:
try:
async with websockets.connect(url) as websocket:
logger.info(f"Tick WebSocket connected for {symbol}")
async for message in websocket:
if not self.streaming:
break
try:
data = json.loads(message)
await self._process_raw_tick(symbol, data)
except Exception as e:
logger.warning(f"Error processing tick for {symbol}: {e}")
except Exception as e:
logger.error(f"WebSocket error for {symbol}: {e}")
if self.streaming:
logger.info(f"Reconnecting tick WebSocket for {symbol} in 2 seconds...")
await asyncio.sleep(2)
async def _process_raw_tick(self, symbol: str, raw_data: Dict):
"""Process raw tick data from WebSocket"""
try:
# Extract tick information
            tick = TickData(
                timestamp=datetime.fromtimestamp(int(raw_data['T']) / 1000),
                price=float(raw_data['p']),
                volume=float(raw_data['q']),
                side='sell' if raw_data['m'] else 'buy',  # m=True: buyer is the market maker, so the aggressor sold
                trade_id=str(raw_data['t']) if 't' in raw_data else None  # Binance sends 't' as an int
            )
# Add to buffer
with self.data_lock:
self.tick_buffers[symbol].append(tick)
self.tick_counts[symbol] += 1
except Exception as e:
logger.error(f"Error processing raw tick for {symbol}: {e}")
def _processing_loop(self, symbol: str):
"""Main processing loop for a symbol"""
logger.info(f"Starting processing loop for {symbol}")
        processed_count = 0  # counter for periodic latency logging (a full deque's len() never changes)
        while self.streaming:
try:
# Check if we have enough ticks to process
with self.data_lock:
tick_count = len(self.tick_buffers[symbol])
if tick_count >= self.min_ticks_for_processing:
start_time = time.time()
# Process ticks
features = self._extract_neural_features(symbol)
if features is not None:
# Store processed features
with self.data_lock:
self.processed_features[symbol].append(features)
# Notify subscribers
self._notify_feature_subscribers(symbol, features)
# Track processing time
processing_time = (time.time() - start_time) * 1000 # Convert to ms
self.processing_times.append(processing_time)
                    processed_count += 1
                    if processed_count % 100 == 0:
                        avg_time = np.mean(list(self.processing_times))
                        logger.info(f"Average processing time (last {len(self.processing_times)} windows): {avg_time:.2f}ms")
# Small sleep to prevent CPU overload
time.sleep(0.001) # 1ms sleep for ultra-low latency
except Exception as e:
logger.error(f"Error in processing loop for {symbol}: {e}")
time.sleep(0.01) # Longer sleep on error
def _extract_neural_features(self, symbol: str) -> Optional[ProcessedTickFeatures]:
"""Extract neural network features from recent ticks"""
try:
with self.data_lock:
# Get recent ticks
recent_ticks = list(self.tick_buffers[symbol])[-self.processing_window:]
if len(recent_ticks) < self.min_ticks_for_processing:
return None
# Convert ticks to neural network input
tick_features = self._ticks_to_features(recent_ticks)
# Process with neural network
with torch.no_grad():
tick_tensor = torch.FloatTensor(tick_features).unsqueeze(0).to(self.device)
neural_features, confidence = self.tick_nn(tick_tensor)
neural_features = neural_features.cpu().numpy().flatten()
confidence = confidence.cpu().numpy().item()
# Extract traditional features
price_features = self._extract_price_features(recent_ticks)
volume_features = self._extract_volume_features(recent_ticks)
microstructure_features = self._extract_microstructure_features(recent_ticks)
# Create processed features object
processed = ProcessedTickFeatures(
timestamp=recent_ticks[-1].timestamp,
price_features=price_features,
volume_features=volume_features,
microstructure_features=microstructure_features,
neural_features=neural_features,
confidence=confidence
)
return processed
except Exception as e:
logger.error(f"Error extracting neural features for {symbol}: {e}")
return None
def _ticks_to_features(self, ticks: List[TickData]) -> np.ndarray:
"""Convert tick data to neural network input features"""
features = []
for i, tick in enumerate(ticks):
tick_features = [
tick.price,
tick.volume,
1.0 if tick.side == 'buy' else 0.0, # Buy/sell indicator
tick.timestamp.timestamp(), # Timestamp
]
# Add relative features if we have previous ticks
if i > 0:
prev_tick = ticks[i-1]
price_change = (tick.price - prev_tick.price) / prev_tick.price
volume_ratio = tick.volume / (prev_tick.volume + 1e-8)
time_delta = (tick.timestamp - prev_tick.timestamp).total_seconds()
tick_features.extend([
price_change,
volume_ratio,
time_delta
])
else:
tick_features.extend([0.0, 1.0, 0.0]) # Default values for first tick
# Add moving averages if we have enough data
if i >= 5:
recent_prices = [t.price for t in ticks[max(0, i-4):i+1]]
recent_volumes = [t.volume for t in ticks[max(0, i-4):i+1]]
price_ma = np.mean(recent_prices)
volume_ma = np.mean(recent_volumes)
tick_features.extend([
(tick.price - price_ma) / price_ma, # Price deviation from MA
(tick.volume - volume_ma) / (volume_ma + 1e-8) # Volume deviation from MA
])
else:
tick_features.extend([0.0, 0.0])
features.append(tick_features)
# Pad or truncate to fixed size
target_length = self.processing_window
if len(features) < target_length:
# Pad with zeros
padding = [[0.0] * len(features[0])] * (target_length - len(features))
features = padding + features
elif len(features) > target_length:
# Take the most recent ticks
features = features[-target_length:]
return np.array(features, dtype=np.float32)
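    # Per-tick feature layout produced by _ticks_to_features (9 values,
    # matching TickProcessingNN(input_size=9)):
    #   [price, volume, buy/sell flag, epoch seconds,
    #    1-tick return, volume ratio, inter-trade seconds,
    #    deviation from 5-tick price MA, deviation from 5-tick volume MA]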
def _extract_price_features(self, ticks: List[TickData]) -> np.ndarray:
"""Extract price-based features"""
prices = np.array([tick.price for tick in ticks])
features = [
prices[-1], # Current price
np.mean(prices), # Average price
np.std(prices), # Price volatility
np.max(prices), # High
np.min(prices), # Low
(prices[-1] - prices[0]) / prices[0] if prices[0] != 0 else 0, # Total return
]
# Price momentum features
if len(prices) >= 10:
short_ma = np.mean(prices[-5:])
long_ma = np.mean(prices[-10:])
momentum = (short_ma - long_ma) / long_ma if long_ma != 0 else 0
features.append(momentum)
else:
features.append(0.0)
return np.array(features, dtype=np.float32)
def _extract_volume_features(self, ticks: List[TickData]) -> np.ndarray:
"""Extract volume-based features"""
volumes = np.array([tick.volume for tick in ticks])
buy_volumes = np.array([tick.volume for tick in ticks if tick.side == 'buy'])
sell_volumes = np.array([tick.volume for tick in ticks if tick.side == 'sell'])
features = [
np.sum(volumes), # Total volume
np.mean(volumes), # Average volume
np.std(volumes), # Volume volatility
np.sum(buy_volumes) if len(buy_volumes) > 0 else 0, # Buy volume
np.sum(sell_volumes) if len(sell_volumes) > 0 else 0, # Sell volume
]
# Volume imbalance
total_buy = np.sum(buy_volumes) if len(buy_volumes) > 0 else 0
total_sell = np.sum(sell_volumes) if len(sell_volumes) > 0 else 0
total_volume = total_buy + total_sell
if total_volume > 0:
buy_ratio = total_buy / total_volume
volume_imbalance = buy_ratio - 0.5 # -0.5 to 0.5 range
else:
volume_imbalance = 0.0
features.append(volume_imbalance)
# VWAP (Volume Weighted Average Price)
if np.sum(volumes) > 0:
prices = np.array([tick.price for tick in ticks])
vwap = np.sum(prices * volumes) / np.sum(volumes)
current_price = ticks[-1].price
vwap_deviation = (current_price - vwap) / vwap if vwap != 0 else 0
else:
vwap_deviation = 0.0
features.append(vwap_deviation)
return np.array(features, dtype=np.float32)
def _extract_microstructure_features(self, ticks: List[TickData]) -> np.ndarray:
"""Extract market microstructure features"""
features = []
# Trade frequency
if len(ticks) >= 2:
time_deltas = [(ticks[i].timestamp - ticks[i-1].timestamp).total_seconds()
for i in range(1, len(ticks))]
avg_time_delta = np.mean(time_deltas)
trade_frequency = 1.0 / avg_time_delta if avg_time_delta > 0 else 0
else:
trade_frequency = 0.0
features.append(trade_frequency)
# Price impact features
prices = [tick.price for tick in ticks]
volumes = [tick.volume for tick in ticks]
if len(prices) >= 3:
            # Pair each price change with the volume of the trade that caused it,
            # skipping changes where the previous price is zero (the original
            # list-comprehension version could misalign changes and volumes)
            price_changes = []
            corresponding_volumes = []
            for i in range(1, len(prices)):
                if prices[i-1] != 0:
                    price_changes.append((prices[i] - prices[i-1]) / prices[i-1])
                    corresponding_volumes.append(volumes[i])
            if len(price_changes) >= 2:
                # Simple price impact proxy: correlation of |return| with trade volume
                price_impact = np.corrcoef(np.abs(price_changes), corresponding_volumes)[0, 1]
                if np.isnan(price_impact):
                    price_impact = 0.0
            else:
                price_impact = 0.0
else:
price_impact = 0.0
features.append(price_impact)
# Bid-ask spread proxy (using price volatility)
if len(prices) >= 5:
recent_prices = prices[-5:]
spread_proxy = (np.max(recent_prices) - np.min(recent_prices)) / np.mean(recent_prices)
else:
spread_proxy = 0.0
features.append(spread_proxy)
# Order flow imbalance (already calculated in volume features, but different perspective)
buy_count = sum(1 for tick in ticks if tick.side == 'buy')
sell_count = len(ticks) - buy_count
total_trades = len(ticks)
if total_trades > 0:
order_flow_imbalance = (buy_count - sell_count) / total_trades
else:
order_flow_imbalance = 0.0
features.append(order_flow_imbalance)
return np.array(features, dtype=np.float32)
def _notify_feature_subscribers(self, symbol: str, features: ProcessedTickFeatures):
"""Notify all feature subscribers of new processed features"""
for callback in self.feature_subscribers:
try:
callback(symbol, features)
except Exception as e:
logger.error(f"Error notifying feature subscriber {callback.__name__}: {e}")
def get_latest_features(self, symbol: str) -> Optional[ProcessedTickFeatures]:
"""Get the latest processed features for a symbol"""
with self.data_lock:
if symbol in self.processed_features and self.processed_features[symbol]:
return self.processed_features[symbol][-1]
return None
def get_processing_stats(self) -> Dict[str, Any]:
"""Get processing performance statistics"""
stats = {
'symbols': self.symbols,
'streaming': self.streaming,
'tick_counts': dict(self.tick_counts),
'buffer_sizes': {symbol: len(self.tick_buffers[symbol]) for symbol in self.symbols},
'feature_counts': {symbol: len(self.processed_features[symbol]) for symbol in self.symbols},
'subscribers': len(self.feature_subscribers)
}
if self.processing_times:
stats['processing_performance'] = {
'avg_time_ms': np.mean(list(self.processing_times)),
'min_time_ms': np.min(list(self.processing_times)),
'max_time_ms': np.max(list(self.processing_times)),
'std_time_ms': np.std(list(self.processing_times))
}
return stats
def train_neural_network(self, training_data: List[Tuple[np.ndarray, np.ndarray]], epochs: int = 100):
"""Train the tick processing neural network"""
logger.info("Training tick processing neural network...")
self.tick_nn.train()
optimizer = torch.optim.Adam(self.tick_nn.parameters(), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(epochs):
total_loss = 0.0
for batch_features, batch_targets in training_data:
optimizer.zero_grad()
# Convert to tensors
features_tensor = torch.FloatTensor(batch_features).to(self.device)
targets_tensor = torch.FloatTensor(batch_targets).to(self.device)
# Forward pass
outputs, confidence = self.tick_nn(features_tensor)
# Calculate loss
loss = criterion(outputs, targets_tensor)
# Backward pass
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 10 == 0:
avg_loss = total_loss / len(training_data)
logger.info(f"Epoch {epoch}/{epochs}, Average Loss: {avg_loss:.6f}")
self.tick_nn.eval()
logger.info("Neural network training completed")
# Integration with existing orchestrator
def integrate_with_orchestrator(orchestrator, tick_processor: RealTimeTickProcessor):
"""Integrate tick processor with enhanced orchestrator"""
def feature_callback(symbol: str, features: ProcessedTickFeatures):
"""Callback to feed processed features to orchestrator"""
try:
# Convert processed features to format expected by orchestrator
feature_dict = {
'symbol': symbol,
'timestamp': features.timestamp,
'neural_features': features.neural_features,
'price_features': features.price_features,
'volume_features': features.volume_features,
'microstructure_features': features.microstructure_features,
'confidence': features.confidence
}
# Feed to orchestrator's real-time feature processing
if hasattr(orchestrator, 'process_realtime_features'):
orchestrator.process_realtime_features(feature_dict)
except Exception as e:
logger.error(f"Error integrating features with orchestrator: {e}")
# Add the callback to tick processor
tick_processor.add_feature_subscriber(feature_callback)
logger.info("Tick processor integrated with orchestrator")
# Factory function for easy creation
def create_realtime_tick_processor(symbols: Optional[List[str]] = None) -> RealTimeTickProcessor:
"""Create and configure a real-time tick processor"""
if symbols is None:
symbols = ['ETH/USDT', 'BTC/USDT']
processor = RealTimeTickProcessor(symbols=symbols)
logger.info(f"Created RealTimeTickProcessor for symbols: {symbols}")
return processor
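# Minimal usage sketch. Assumes live network access to Binance; the symbol
# choice and the 30-second run below are arbitrary demo values.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def print_features(symbol: str, features: ProcessedTickFeatures):
        """Example subscriber: log the confidence of each processed batch"""
        logger.info(f"{symbol} @ {features.timestamp}: confidence={features.confidence:.3f}")

    async def _demo():
        processor = create_realtime_tick_processor(['ETH/USDT'])
        processor.add_feature_subscriber(print_features)
        await processor.start_processing()
        await asyncio.sleep(30)  # let ticks accumulate and features stream
        await processor.stop_processing()
        logger.info(f"Stats: {processor.get_processing_stats()}")

    asyncio.run(_demo())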