gogo2/NN/models/cob_rl_model.py
"""
COB RL Model - 1B Parameter Reinforcement Learning Network for COB Trading
This module contains the massive 1B+ parameter RL network optimized for real-time
Consolidated Order Book (COB) trading. The model processes COB features and performs
inference every 200ms for ultra-low latency trading decisions.
Architecture:
- Input: 2000-dimensional COB features
- Core: 12-layer transformer with 4096 hidden size (32 attention heads)
- Output: Price direction (DOWN/SIDEWAYS/UP), value estimation, confidence
- Parameters: ~1B total parameters for maximum market understanding
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import logging
from typing import Dict, List, Optional, Tuple, Any
from abc import ABC, abstractmethod
from models import ModelInterface
logger = logging.getLogger(__name__)
class MassiveRLNetwork(nn.Module):
"""
Massive 1B+ parameter RL network optimized for real-time COB trading
This network processes consolidated order book data and makes predictions about
future price movements with high confidence. Designed for 200ms inference cycles.
"""
def __init__(self, input_size: int = 2000, hidden_size: int = 2048, num_layers: int = 8):
super(MassiveRLNetwork, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
# Optimized input processing layers for 400M params
self.input_projection = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.LayerNorm(hidden_size),
nn.GELU(),
nn.Dropout(0.1)
)
# Efficient transformer-style encoder layers (400M target)
self.encoder_layers = nn.ModuleList([
nn.TransformerEncoderLayer(
d_model=hidden_size,
nhead=16, # Reduced attention heads for efficiency
dim_feedforward=hidden_size * 3, # 6K feedforward (reduced from 16K)
dropout=0.1,
activation='gelu',
batch_first=True
) for _ in range(num_layers)
])
# Market regime understanding layers (optimized for 400M)
self.regime_encoder = nn.Sequential(
nn.Linear(hidden_size, hidden_size + 512), # Smaller expansion
nn.LayerNorm(hidden_size + 512),
nn.GELU(),
nn.Dropout(0.1),
nn.Linear(hidden_size + 512, hidden_size),
nn.LayerNorm(hidden_size),
nn.GELU()
)
# Price prediction head (main RL objective)
self.price_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.LayerNorm(hidden_size // 2),
nn.GELU(),
nn.Dropout(0.2),
nn.Linear(hidden_size // 2, hidden_size // 4),
nn.LayerNorm(hidden_size // 4),
nn.GELU(),
nn.Linear(hidden_size // 4, 3) # DOWN, SIDEWAYS, UP
)
# Value estimation head for RL
self.value_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.LayerNorm(hidden_size // 2),
nn.GELU(),
nn.Dropout(0.2),
nn.Linear(hidden_size // 2, hidden_size // 4),
nn.LayerNorm(hidden_size // 4),
nn.GELU(),
nn.Linear(hidden_size // 4, 1)
)
# Confidence head
self.confidence_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 4),
nn.LayerNorm(hidden_size // 4),
nn.GELU(),
nn.Linear(hidden_size // 4, 1),
nn.Sigmoid()
)
# Initialize weights
self.apply(self._init_weights)
# Calculate total parameters
total_params = sum(p.numel() for p in self.parameters())
logger.info(f"COB RL Network initialized with {total_params:,} parameters")
def _init_weights(self, module):
"""Initialize weights with proper scaling for large models"""
if isinstance(module, nn.Linear):
torch.nn.init.xavier_uniform_(module.weight)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.LayerNorm):
torch.nn.init.ones_(module.weight)
torch.nn.init.zeros_(module.bias)
def forward(self, x):
"""
Forward pass through massive network
Args:
x: Input tensor of shape [batch_size, input_size] containing COB features
Returns:
Dict containing:
- price_logits: Logits for price direction (DOWN/SIDEWAYS/UP)
- value: Value estimation for RL
- confidence: Confidence score [0, 1]
- features: Hidden features for analysis
"""
batch_size = x.size(0)
# Project input
x = self.input_projection(x) # [batch, hidden_size]
# Add sequence dimension for transformer
x = x.unsqueeze(1) # [batch, 1, hidden_size]
# Pass through transformer layers
for layer in self.encoder_layers:
x = layer(x)
# Remove sequence dimension
x = x.squeeze(1) # [batch, hidden_size]
# Apply regime encoding
x = self.regime_encoder(x)
# Generate predictions
price_logits = self.price_head(x)
value = self.value_head(x)
confidence = self.confidence_head(x)
return {
'price_logits': price_logits,
'value': value,
'confidence': confidence,
'features': x # Hidden features for analysis
}
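# Shape reference for forward() outputs (with the default hidden_size=2048):
#   price_logits: [batch, 3], value: [batch, 1], confidence: [batch, 1],
#   features: [batch, 2048]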
def predict(self, cob_features: np.ndarray) -> Dict[str, Any]:
"""
High-level prediction method for COB features
Args:
cob_features: COB features as numpy array [input_size]
Returns:
Dict containing prediction results
"""
self.eval()
with torch.no_grad():
# Convert to tensor and add batch dimension
if isinstance(cob_features, np.ndarray):
x = torch.from_numpy(cob_features).float()
else:
x = cob_features.float()
if x.dim() == 1:
x = x.unsqueeze(0) # Add batch dimension
# Move to device
device = next(self.parameters()).device
x = x.to(device)
# Forward pass
outputs = self.forward(x)
# Process outputs
price_probs = F.softmax(outputs['price_logits'], dim=1)
predicted_direction = torch.argmax(price_probs, dim=1).item()
confidence = outputs['confidence'].item()
value = outputs['value'].item()
return {
'predicted_direction': predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': confidence,
'value': value,
'probabilities': price_probs.cpu().numpy()[0],
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][predicted_direction]
}
def get_model_info(self) -> Dict[str, Any]:
"""Get model architecture information"""
total_params = sum(p.numel() for p in self.parameters())
trainable_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
return {
'model_name': 'MassiveRLNetwork',
'total_parameters': total_params,
'trainable_parameters': trainable_params,
'input_size': self.input_size,
'hidden_size': self.hidden_size,
'num_layers': self.num_layers,
'architecture': 'Transformer-based RL Network',
'designed_for': 'Real-time COB trading (200ms inference)',
'output_classes': ['DOWN', 'SIDEWAYS', 'UP']
}
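# Minimal standalone sketch of the raw network (placeholder input; the
# COBRLModelInterface below is the intended entry point for training/inference):
#
#     net = MassiveRLNetwork()              # default ~400M-parameter configuration
#     dummy = torch.zeros(1, 2000)          # [batch, input_size] placeholder features
#     out = net(dummy)
#     probs = F.softmax(out['price_logits'], dim=1)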
class COBRLModelInterface(ModelInterface):
"""
Interface for the COB RL model that handles model management, training, and inference
"""
def __init__(self, model_checkpoint_dir: str = "models/realtime_rl_cob", device: str = None, name=None, **kwargs):
super().__init__(name=name) # Initialize ModelInterface with a name
self.model_checkpoint_dir = model_checkpoint_dir
self.device = torch.device(device if device else ('cuda' if torch.cuda.is_available() else 'cpu'))
# Initialize model
self.model = MassiveRLNetwork().to(self.device)
# Initialize optimizer
self.optimizer = torch.optim.AdamW(
self.model.parameters(),
lr=1e-5, # Low learning rate for stability
weight_decay=1e-6,
betas=(0.9, 0.999)
)
# Initialize scaler for mixed precision training
self.scaler = torch.cuda.amp.GradScaler() if self.device.type == 'cuda' else None
logger.info(f"COB RL Model Interface initialized on {self.device}")
def predict(self, cob_features: np.ndarray) -> Dict[str, Any]:
"""Make prediction using the model"""
self.model.eval()
with torch.no_grad():
# Convert to tensor and add batch dimension
if isinstance(cob_features, np.ndarray):
x = torch.from_numpy(cob_features).float()
else:
x = cob_features.float()
if x.dim() == 1:
x = x.unsqueeze(0) # Add batch dimension
# Move to device
x = x.to(self.device)
# Forward pass
outputs = self.model(x)
# Process outputs
price_probs = F.softmax(outputs['price_logits'], dim=1)
predicted_direction = torch.argmax(price_probs, dim=1).item()
confidence = outputs['confidence'].item()
value = outputs['value'].item()
return {
'predicted_direction': predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': confidence,
'value': value,
'probabilities': price_probs.cpu().numpy()[0],
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][predicted_direction]
}
def train_step(self, features: torch.Tensor, targets: Dict[str, torch.Tensor]) -> float:
"""
Perform one training step
Args:
features: Input COB features [batch_size, input_size]
targets: Dict containing 'direction', 'value', 'confidence' targets
Returns:
Training loss value
"""
self.model.train()
self.optimizer.zero_grad()
if self.scaler:
with torch.cuda.amp.autocast():
outputs = self.model(features)
loss = self._calculate_loss(outputs, targets)
self.scaler.scale(loss).backward()
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.scaler.step(self.optimizer)
self.scaler.update()
else:
outputs = self.model(features)
loss = self._calculate_loss(outputs, targets)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
return loss.item()
def _calculate_loss(self, outputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor]) -> torch.Tensor:
"""Calculate combined loss for RL training"""
# Direction prediction loss (cross-entropy)
direction_loss = F.cross_entropy(outputs['price_logits'], targets['direction'])
# Value estimation loss (MSE); squeeze only the last dim so a batch of 1 keeps its batch dimension
value_loss = F.mse_loss(outputs['value'].squeeze(-1), targets['value'])
# Confidence loss (BCE)
confidence_loss = F.binary_cross_entropy(outputs['confidence'].squeeze(-1), targets['confidence'])
# Combined loss with weights
total_loss = direction_loss + 0.5 * value_loss + 0.3 * confidence_loss
return total_loss
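# Example of the targets dict expected by train_step (shapes and dtypes inferred
# from _calculate_loss; the values are placeholders, and both features and targets
# must already live on interface.device):
#
#     targets = {
#         'direction': torch.tensor([2]),        # class index: 0=DOWN, 1=SIDEWAYS, 2=UP
#         'value': torch.tensor([0.5]),          # float target for the value head
#         'confidence': torch.tensor([0.8]),     # float in [0, 1] for the BCE confidence loss
#     }
#     loss = interface.train_step(features, targets)   # features: [batch, 2000] float tensor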
def save_model(self, filepath: str = None):
"""Save model checkpoint"""
if filepath is None:
import os
os.makedirs(self.model_checkpoint_dir, exist_ok=True)
filepath = f"{self.model_checkpoint_dir}/cob_rl_model_latest.pt"
checkpoint = {
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'model_info': self.model.get_model_info()
}
if self.scaler:
checkpoint['scaler_state_dict'] = self.scaler.state_dict()
torch.save(checkpoint, filepath)
logger.info(f"COB RL model saved to {filepath}")
def load_model(self, filepath: str = None):
"""Load model checkpoint"""
if filepath is None:
filepath = f"{self.model_checkpoint_dir}/cob_rl_model_latest.pt"
try:
checkpoint = torch.load(filepath, map_location=self.device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
if self.scaler and 'scaler_state_dict' in checkpoint:
self.scaler.load_state_dict(checkpoint['scaler_state_dict'])
logger.info(f"COB RL model loaded from {filepath}")
return True
except Exception as e:
logger.warning(f"Failed to load COB RL model from {filepath}: {e}")
return False
def get_model_stats(self) -> Dict[str, Any]:
"""Get model statistics"""
return self.model.get_model_info()
def get_memory_usage(self) -> float:
"""Estimate COBRLModel memory usage in MB"""
# Estimate from the parameter count, assuming float32 weights (4 bytes per parameter).
# Reference points: 1B params is roughly 4 GB; the ~400M-param default is roughly 1.6 GB.
try:
# Calculate total parameters and convert to MB
total_params = sum(p.numel() for p in self.model.parameters())
# Assuming float32 (4 bytes per parameter) and converting to MB
memory_bytes = total_params * 4
memory_mb = memory_bytes / (1024 * 1024)
return memory_mb
except Exception as e:
logger.debug(f"Could not estimate COBRLModel memory usage: {e}")
return 1600.0 # Default to 1.6 GB as an estimate if calculation fails