diff --git a/.vscode/launch.json b/.vscode/launch.json
index 91914d3..a9ee94f 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -78,6 +78,21 @@
                 "COB_ETH_BUCKET_SIZE": "1"
             },
             "preLaunchTask": "Kill Stale Processes"
+        },
+        {
+            "name": "šŸ”„ Real-time RL COB Trader (1B Parameters)",
+            "type": "python",
+            "request": "launch",
+            "program": "run_realtime_rl_cob_trader.py",
+            "console": "integratedTerminal",
+            "justMyCode": false,
+            "env": {
+                "PYTHONUNBUFFERED": "1",
+                "CUDA_VISIBLE_DEVICES": "0",
+                "PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512",
+                "ENABLE_REALTIME_RL": "1"
+            },
+            "preLaunchTask": "Kill Stale Processes"
+        }
     ],
     "compounds": [
diff --git a/config.yaml b/config.yaml
index 414be4f..552aed6 100644
--- a/config.yaml
+++ b/config.yaml
@@ -196,6 +196,50 @@ memory:
   model_limit_gb: 4.0  # Per-model memory limit
   cleanup_interval: 1800  # Memory cleanup every 30 minutes
 
+# Real-time RL COB Trader Configuration
+realtime_rl:
+  # Model parameters for the 1B+ parameter network
+  model:
+    input_size: 2000  # COB feature dimensions
+    hidden_size: 4096  # Transformer hidden width
+    num_layers: 12  # Number of transformer encoder layers
+    learning_rate: 0.00001  # Very low for stability
+    weight_decay: 0.000001  # L2 regularization
+
+  # Inference configuration
+  inference_interval_ms: 200  # Inference every 200ms
+  min_confidence_threshold: 0.7  # Minimum confidence for signal accumulation
+  required_confident_predictions: 3  # Need 3 confident predictions for a trade
+
+  # Training configuration
+  training_interval_s: 1.0  # Train every second
+  batch_size: 32  # Training batch size
+  replay_buffer_size: 1000  # Store last 1000 predictions for training
+
+  # Signal accumulation
+  signal_buffer_size: 10  # Buffer size for signal accumulation
+  consensus_threshold: 3  # Need 3 signals in the same direction
+
+  # Model checkpointing
+  model_checkpoint_dir: "models/realtime_rl_cob"
+  save_interval_s: 300  # Save models every 5 minutes
+
+  # COB integration
+  symbols: ["BTC/USDT", "ETH/USDT"]  # Symbols to trade
+  cob_feature_normalization: "robust"  # Feature normalization method
+
+  # Reward engineering for RL
+  reward_structure:
+    correct_direction_base: 1.0  # Base reward for a correct direction prediction
+    confidence_scaling: true  # Scale reward by confidence
+    magnitude_bonus: 0.5  # Bonus for predicting magnitude accurately
+    overconfidence_penalty: 1.5  # Penalty multiplier for wrong high-confidence predictions
+    trade_execution_multiplier: 10.0  # Higher weight for actual trade outcomes
+
+  # Performance monitoring
+  statistics_interval_s: 60  # Print stats every minute
+  detailed_logging: true  # Enable detailed performance logging
+
 # Web Dashboard
 web:
   host: "127.0.0.1"
diff --git a/core/realtime_rl_cob_trader.py b/core/realtime_rl_cob_trader.py
new file mode 100644
index 0000000..09d3e39
--- /dev/null
+++ b/core/realtime_rl_cob_trader.py
@@ -0,0 +1,1047 @@
+#!/usr/bin/env python3
+"""
+Real-time Reinforcement Learning COB Trader
+
+A real-time RL system that:
+1. Uses COB (Consolidated Order Book) data to train a 1B+ parameter RL model
+2. Performs inference every 200ms or whenever new COB data arrives
+3. Predicts the next price move in real-time
+4. Trains continuously based on prediction success
+5. Accumulates signals weighted by confidence
+6. Issues a trade signal once 3 consecutive confident predictions agree on direction
+7. Trains with higher weight on the outcomes of executed trades
+
+Integrates with the existing gogo2 trading system architecture.
+""" + +import asyncio +import logging +import numpy as np +import torch +import torch.nn as nn +import torch.optim as optim +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any, Callable, Tuple +from collections import deque, defaultdict +from dataclasses import dataclass +import json +import time +import threading +from threading import Lock +import pickle +import os + +# Local imports +from .cob_integration import COBIntegration +from .trading_executor import TradingExecutor + +logger = logging.getLogger(__name__) + +@dataclass +class PredictionResult: + """Result of a model prediction""" + timestamp: datetime + symbol: str + predicted_direction: int # 0=DOWN, 1=SIDEWAYS, 2=UP + confidence: float + predicted_change: float # Predicted price change % + features: np.ndarray + actual_direction: Optional[int] = None # Filled later for training + actual_change: Optional[float] = None # Filled later for training + reward: Optional[float] = None # Calculated reward for RL training + +@dataclass +class SignalAccumulator: + """Accumulates signals for trade decision making""" + symbol: str + signals: deque # Recent signals + confidence_sum: float = 0.0 + successful_predictions: int = 0 + total_predictions: int = 0 + last_reset_time: datetime = None + + def __post_init__(self): + if self.signals is None: + self.signals = deque(maxlen=10) + if self.last_reset_time is None: + self.last_reset_time = datetime.now() + +class MassiveRLNetwork(nn.Module): + """ + Massive 1B+ parameter RL network optimized for real-time COB trading + """ + + def __init__(self, input_size: int = 2000, hidden_size: int = 4096, num_layers: int = 12): + super(MassiveRLNetwork, self).__init__() + + self.input_size = input_size + self.hidden_size = hidden_size + self.num_layers = num_layers + + # Massive input processing layers + self.input_projection = nn.Sequential( + nn.Linear(input_size, hidden_size), + nn.LayerNorm(hidden_size), + nn.GELU(), + nn.Dropout(0.1) + ) + + # Massive transformer-style encoder layers + self.encoder_layers = nn.ModuleList([ + nn.TransformerEncoderLayer( + d_model=hidden_size, + nhead=32, # Large number of attention heads + dim_feedforward=hidden_size * 4, # 16K feedforward + dropout=0.1, + activation='gelu', + batch_first=True + ) for _ in range(num_layers) + ]) + + # Market regime understanding layers + self.regime_encoder = nn.Sequential( + nn.Linear(hidden_size, hidden_size * 2), + nn.LayerNorm(hidden_size * 2), + nn.GELU(), + nn.Dropout(0.1), + nn.Linear(hidden_size * 2, hidden_size), + nn.LayerNorm(hidden_size), + nn.GELU() + ) + + # Price prediction head (main RL objective) + self.price_head = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 2), + nn.LayerNorm(hidden_size // 2), + nn.GELU(), + nn.Dropout(0.2), + nn.Linear(hidden_size // 2, hidden_size // 4), + nn.LayerNorm(hidden_size // 4), + nn.GELU(), + nn.Linear(hidden_size // 4, 3) # DOWN, SIDEWAYS, UP + ) + + # Value estimation head for RL + self.value_head = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 2), + nn.LayerNorm(hidden_size // 2), + nn.GELU(), + nn.Dropout(0.2), + nn.Linear(hidden_size // 2, hidden_size // 4), + nn.LayerNorm(hidden_size // 4), + nn.GELU(), + nn.Linear(hidden_size // 4, 1) + ) + + # Confidence head + self.confidence_head = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 4), + nn.LayerNorm(hidden_size // 4), + nn.GELU(), + nn.Linear(hidden_size // 4, 1), + nn.Sigmoid() + ) + + # Initialize weights + self.apply(self._init_weights) + + # Calculate total 
parameters + total_params = sum(p.numel() for p in self.parameters()) + logger.info(f"Massive RL Network initialized with {total_params:,} parameters") + + def _init_weights(self, module): + """Initialize weights with proper scaling for large models""" + if isinstance(module, nn.Linear): + torch.nn.init.xavier_uniform_(module.weight) + if module.bias is not None: + torch.nn.init.zeros_(module.bias) + elif isinstance(module, nn.LayerNorm): + torch.nn.init.ones_(module.weight) + torch.nn.init.zeros_(module.bias) + + def forward(self, x): + """Forward pass through massive network""" + batch_size = x.size(0) + + # Project input + x = self.input_projection(x) # [batch, hidden_size] + + # Add sequence dimension for transformer + x = x.unsqueeze(1) # [batch, 1, hidden_size] + + # Pass through transformer layers + for layer in self.encoder_layers: + x = layer(x) + + # Remove sequence dimension + x = x.squeeze(1) # [batch, hidden_size] + + # Apply regime encoding + x = self.regime_encoder(x) + + # Generate predictions + price_logits = self.price_head(x) + value = self.value_head(x) + confidence = self.confidence_head(x) + + return { + 'price_logits': price_logits, + 'value': value, + 'confidence': confidence, + 'features': x # Hidden features for analysis + } + +class RealtimeRLCOBTrader: + """ + Real-time RL trader using COB data + """ + + def __init__(self, + symbols: List[str] = None, + trading_executor: TradingExecutor = None, + model_checkpoint_dir: str = "models/realtime_rl_cob", + inference_interval_ms: int = 200, + min_confidence_threshold: float = 0.7, + required_confident_predictions: int = 3): + + self.symbols = symbols or ['BTC/USDT', 'ETH/USDT'] + self.trading_executor = trading_executor + self.model_checkpoint_dir = model_checkpoint_dir + self.inference_interval_ms = inference_interval_ms + self.min_confidence_threshold = min_confidence_threshold + self.required_confident_predictions = required_confident_predictions + + # Setup device + self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + logger.info(f"Using device: {self.device}") + + # Initialize models for each symbol + self.models: Dict[str, MassiveRLNetwork] = {} + self.optimizers: Dict[str, optim.AdamW] = {} + self.scalers: Dict[str, torch.cuda.amp.GradScaler] = {} + + for symbol in self.symbols: + model = MassiveRLNetwork().to(self.device) + self.models[symbol] = model + self.optimizers[symbol] = optim.AdamW( + model.parameters(), + lr=1e-5, # Low learning rate for stability + weight_decay=1e-6, + betas=(0.9, 0.999) + ) + self.scalers[symbol] = torch.cuda.amp.GradScaler() + + # COB integration + self.cob_integration = COBIntegration(symbols=self.symbols) + self.cob_integration.add_dqn_callback(self._on_cob_update) + + # Data storage for real-time training + self.prediction_history: Dict[str, deque] = {} + self.feature_buffers: Dict[str, deque] = {} + self.price_history: Dict[str, deque] = {} + + # Signal accumulation + self.signal_accumulators: Dict[str, SignalAccumulator] = {} + + # Performance tracking + self.training_stats: Dict[str, Dict] = {} + self.inference_stats: Dict[str, Dict] = {} + + # Initialize per symbol + for symbol in self.symbols: + self.prediction_history[symbol] = deque(maxlen=1000) + self.feature_buffers[symbol] = deque(maxlen=100) + self.price_history[symbol] = deque(maxlen=1000) + self.signal_accumulators[symbol] = SignalAccumulator( + symbol=symbol, + signals=deque(maxlen=self.required_confident_predictions * 2) + ) + self.training_stats[symbol] = { + 'total_predictions': 0, + 
'successful_predictions': 0, + 'total_training_steps': 0, + 'average_loss': 0.0, + 'last_training_time': None + } + self.inference_stats[symbol] = { + 'total_inferences': 0, + 'average_inference_time_ms': 0.0, + 'last_inference_time': None + } + + # Threading + self.running = False + self.inference_lock = Lock() + self.training_lock = Lock() + + # Create checkpoint directory + os.makedirs(self.model_checkpoint_dir, exist_ok=True) + + logger.info(f"RealtimeRLCOBTrader initialized for symbols: {self.symbols}") + logger.info(f"Inference interval: {self.inference_interval_ms}ms") + logger.info(f"Required confident predictions: {self.required_confident_predictions}") + + async def start(self): + """Start the real-time RL trader""" + logger.info("Starting Real-time RL COB Trader") + + self.running = True + + # Load existing models if available + self._load_models() + + # Start COB integration + await self.cob_integration.start() + + # Start inference loop + asyncio.create_task(self._inference_loop()) + + # Start training loop + asyncio.create_task(self._training_loop()) + + # Start signal processing loop + asyncio.create_task(self._signal_processing_loop()) + + # Start model saving loop + asyncio.create_task(self._model_saving_loop()) + + logger.info("Real-time RL COB Trader started successfully") + + async def stop(self): + """Stop the real-time RL trader""" + logger.info("Stopping Real-time RL COB Trader") + + self.running = False + + # Save models + self._save_models() + + # Stop COB integration + await self.cob_integration.stop() + + logger.info("Real-time RL COB Trader stopped") + + async def _on_cob_update(self, symbol: str, data: Dict): + """Handle COB updates for real-time inference""" + try: + if symbol not in self.symbols: + return + + # Extract features from COB data + features = self._extract_features(symbol, data) + if features is None: + return + + # Store in buffer + self.feature_buffers[symbol].append({ + 'timestamp': datetime.now(), + 'features': features, + 'raw_data': data + }) + + # Store price for later reward calculation + if 'state' in data: + price = self._extract_price_from_state(data['state']) + if price > 0: + self.price_history[symbol].append({ + 'timestamp': datetime.now(), + 'price': price + }) + + except Exception as e: + logger.error(f"Error handling COB update for {symbol}: {e}") + + def _extract_features(self, symbol: str, data: Dict) -> Optional[np.ndarray]: + """Extract features from COB data for model input""" + try: + # Get state from COB data + if 'state' not in data: + return None + + state = data['state'] + + # Ensure we have the right feature size (2000 features) + if isinstance(state, np.ndarray): + features = state.flatten() + else: + features = np.array(state).flatten() + + # Pad or truncate to exact size + target_size = 2000 + if len(features) < target_size: + # Pad with zeros + padded = np.zeros(target_size) + padded[:len(features)] = features + features = padded + elif len(features) > target_size: + # Truncate + features = features[:target_size] + + # Normalize features + features = self._normalize_features(features) + + return features + + except Exception as e: + logger.error(f"Error extracting features for {symbol}: {e}") + return None + + def _normalize_features(self, features: np.ndarray) -> np.ndarray: + """Normalize features for model input""" + try: + # Clip extreme values + features = np.clip(features, -10.0, 10.0) + + # Z-score normalization with robust statistics + median = np.median(features) + mad = np.median(np.abs(features - median)) 
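+            # 1.4826 ā‰ˆ 1 / Φ⁻¹(0.75): rescales the MAD so it estimates the standard
+            # deviation of normally distributed data, making the robust z-score below
+            # comparable to a conventional (mean, std) normalization while staying
+            # insensitive to order-book outliers.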
+ if mad > 1e-6: + features = (features - median) / (mad * 1.4826) + + # Final clipping + features = np.clip(features, -5.0, 5.0) + + return features.astype(np.float32) + + except Exception as e: + logger.warning(f"Error normalizing features: {e}") + return features.astype(np.float32) + + def _extract_price_from_state(self, state) -> float: + """Extract current price from state data""" + try: + # Try different ways to extract price + if isinstance(state, np.ndarray) and len(state) > 0: + # Assume first few elements might contain price info + return float(state[0]) + elif isinstance(state, (list, tuple)) and len(state) > 0: + return float(state[0]) + else: + return 0.0 + except: + return 0.0 + + async def _inference_loop(self): + """Main inference loop - runs every 200ms or when new data arrives""" + logger.info("Starting inference loop") + + while self.running: + try: + start_time = time.time() + + # Run inference for all symbols + for symbol in self.symbols: + await self._run_inference(symbol) + + # Calculate sleep time to maintain interval + elapsed_ms = (time.time() - start_time) * 1000 + sleep_ms = max(0, self.inference_interval_ms - elapsed_ms) + + if sleep_ms > 0: + await asyncio.sleep(sleep_ms / 1000) + + except Exception as e: + logger.error(f"Error in inference loop: {e}") + await asyncio.sleep(0.1) + + async def _run_inference(self, symbol: str): + """Run inference for a specific symbol""" + try: + with self.inference_lock: + # Check if we have recent features + if not self.feature_buffers[symbol]: + return + + # Get latest features + latest_data = self.feature_buffers[symbol][-1] + features = latest_data['features'] + timestamp = latest_data['timestamp'] + + # Run model inference + start_time = time.time() + prediction = self._predict(symbol, features) + inference_time_ms = (time.time() - start_time) * 1000 + + # Update inference stats + stats = self.inference_stats[symbol] + stats['total_inferences'] += 1 + stats['average_inference_time_ms'] = ( + (stats['average_inference_time_ms'] * (stats['total_inferences'] - 1) + inference_time_ms) + / stats['total_inferences'] + ) + stats['last_inference_time'] = timestamp + + # Create prediction result + result = PredictionResult( + timestamp=timestamp, + symbol=symbol, + predicted_direction=prediction['direction'], + confidence=prediction['confidence'], + predicted_change=prediction['change'], + features=features + ) + + # Store prediction for later training + self.prediction_history[symbol].append(result) + + # Add to signal accumulator if confident enough + if prediction['confidence'] >= self.min_confidence_threshold: + self._add_signal(symbol, result) + + logger.debug(f"Inference {symbol}: direction={prediction['direction']}, " + f"confidence={prediction['confidence']:.3f}, " + f"change={prediction['change']:.4f}, " + f"time={inference_time_ms:.1f}ms") + + except Exception as e: + logger.error(f"Error running inference for {symbol}: {e}") + + def _predict(self, symbol: str, features: np.ndarray) -> Dict: + """Run model prediction""" + try: + model = self.models[symbol] + model.eval() + + # Convert to tensor + features_tensor = torch.from_numpy(features).unsqueeze(0).to(self.device) + + with torch.no_grad(): + with torch.cuda.amp.autocast(): + outputs = model(features_tensor) + + # Extract predictions + price_probs = torch.softmax(outputs['price_logits'], dim=1) + direction = torch.argmax(price_probs, dim=1).item() + confidence = outputs['confidence'].item() + value = outputs['value'].item() + + # Calculate predicted change based on 
direction and confidence + if direction == 2: # UP + predicted_change = confidence * 0.001 # Max 0.1% up + elif direction == 0: # DOWN + predicted_change = -confidence * 0.001 # Max 0.1% down + else: # SIDEWAYS + predicted_change = 0.0 + + return { + 'direction': direction, + 'confidence': confidence, + 'change': predicted_change, + 'value': value + } + + except Exception as e: + logger.error(f"Error in prediction for {symbol}: {e}") + return { + 'direction': 1, # SIDEWAYS + 'confidence': 0.0, + 'change': 0.0, + 'value': 0.0 + } + + def _add_signal(self, symbol: str, prediction: PredictionResult): + """Add confident prediction to signal accumulator""" + try: + accumulator = self.signal_accumulators[symbol] + accumulator.signals.append(prediction) + accumulator.confidence_sum += prediction.confidence + accumulator.total_predictions += 1 + + logger.debug(f"Added signal for {symbol}: {len(accumulator.signals)} total signals") + + except Exception as e: + logger.error(f"Error adding signal for {symbol}: {e}") + + async def _signal_processing_loop(self): + """Process accumulated signals and generate trade decisions""" + logger.info("Starting signal processing loop") + + while self.running: + try: + for symbol in self.symbols: + await self._process_signals(symbol) + + await asyncio.sleep(0.1) # Process signals every 100ms + + except Exception as e: + logger.error(f"Error in signal processing loop: {e}") + await asyncio.sleep(1) + + async def _process_signals(self, symbol: str): + """Process signals for a specific symbol and make trade decisions""" + try: + accumulator = self.signal_accumulators[symbol] + + # Check if we have enough confident predictions + if len(accumulator.signals) < self.required_confident_predictions: + return + + # Get recent signals + recent_signals = list(accumulator.signals)[-self.required_confident_predictions:] + + # Check if all recent signals are in the same direction + directions = [signal.predicted_direction for signal in recent_signals] + confidences = [signal.confidence for signal in recent_signals] + + # Count direction consensus + direction_counts = {0: 0, 1: 0, 2: 0} # DOWN, SIDEWAYS, UP + for direction in directions: + direction_counts[direction] += 1 + + # Find dominant direction + dominant_direction = max(direction_counts, key=direction_counts.get) + consensus_count = direction_counts[dominant_direction] + + # Check if we have enough consensus + if consensus_count >= self.required_confident_predictions and dominant_direction != 1: + # We have consensus for action (not sideways) + avg_confidence = np.mean(confidences) + + # Determine action + if dominant_direction == 2: # UP + action = 'BUY' + elif dominant_direction == 0: # DOWN + action = 'SELL' + else: + return # No action for sideways + + # Execute trade signal + await self._execute_trade_signal(symbol, action, avg_confidence, recent_signals) + + # Reset accumulator after trade signal + self._reset_accumulator(symbol) + + except Exception as e: + logger.error(f"Error processing signals for {symbol}: {e}") + + async def _execute_trade_signal(self, symbol: str, action: str, confidence: float, signals: List[PredictionResult]): + """Execute a trade signal""" + try: + logger.info(f"Executing trade signal: {action} {symbol} with confidence {confidence:.3f}") + + # Get current price + current_price = 0.0 + if self.price_history[symbol]: + current_price = self.price_history[symbol][-1]['price'] + + # Execute through trading executor if available + if self.trading_executor and current_price > 0: + success = 
self.trading_executor.execute_signal( + symbol=symbol, + action=action, + confidence=confidence, + current_price=current_price + ) + + if success: + logger.info(f"Trade executed successfully: {action} {symbol}") + + # Schedule training with higher weight for trade closure + asyncio.create_task(self._train_on_trade_execution(symbol, signals, action, current_price)) + else: + logger.warning(f"Trade execution failed: {action} {symbol}") + else: + logger.info(f"No trading executor available or price unknown for {symbol}") + + except Exception as e: + logger.error(f"Error executing trade signal for {symbol}: {e}") + + def _reset_accumulator(self, symbol: str): + """Reset signal accumulator after trade execution""" + try: + accumulator = self.signal_accumulators[symbol] + accumulator.signals.clear() + accumulator.confidence_sum = 0.0 + accumulator.last_reset_time = datetime.now() + + logger.debug(f"Reset signal accumulator for {symbol}") + + except Exception as e: + logger.error(f"Error resetting accumulator for {symbol}: {e}") + + async def _training_loop(self): + """Main training loop for real-time model updates""" + logger.info("Starting training loop") + + while self.running: + try: + for symbol in self.symbols: + await self._train_symbol_model(symbol) + + await asyncio.sleep(1.0) # Train every second + + except Exception as e: + logger.error(f"Error in training loop: {e}") + await asyncio.sleep(5) + + async def _train_symbol_model(self, symbol: str): + """Train model for a specific symbol using recent predictions""" + try: + with self.training_lock: + # Check if we have enough data for training + predictions = list(self.prediction_history[symbol]) + if len(predictions) < 10: + return + + # Calculate rewards for recent predictions + self._calculate_rewards(symbol, predictions) + + # Filter predictions with calculated rewards + training_predictions = [p for p in predictions if p.reward is not None] + if len(training_predictions) < 5: + return + + # Prepare training batch + batch_size = min(32, len(training_predictions)) + batch_predictions = training_predictions[-batch_size:] + + # Train model + loss = await self._train_batch(symbol, batch_predictions) + + # Update training stats + stats = self.training_stats[symbol] + stats['total_training_steps'] += 1 + stats['average_loss'] = ( + (stats['average_loss'] * (stats['total_training_steps'] - 1) + loss) + / stats['total_training_steps'] + ) + stats['last_training_time'] = datetime.now() + + logger.debug(f"Training {symbol}: loss={loss:.6f}, batch_size={batch_size}") + + except Exception as e: + logger.error(f"Error training model for {symbol}: {e}") + + def _calculate_rewards(self, symbol: str, predictions: List[PredictionResult]): + """Calculate rewards for predictions based on actual price movements""" + try: + price_history = list(self.price_history[symbol]) + if len(price_history) < 2: + return + + for prediction in predictions: + if prediction.reward is not None: + continue # Already calculated + + # Find actual price change after prediction + pred_time = prediction.timestamp + + # Look for price data after prediction (with reasonable timeout) + future_prices = [ + p for p in price_history + if p['timestamp'] > pred_time and + (p['timestamp'] - pred_time).total_seconds() <= 60 # 1 minute timeout + ] + + if not future_prices: + continue + + # Find price at prediction time + past_prices = [ + p for p in price_history + if abs((p['timestamp'] - pred_time).total_seconds()) <= 10 # 10 second window + ] + + if not past_prices: + continue + + # 
Calculate actual price change between the prediction-time price and the first observed future price
+                pred_price = past_prices[-1]['price']
+                future_price = future_prices[0]['price']  # Use first future price
+
+                actual_change = (future_price - pred_price) / pred_price
+
+                # Determine actual direction
+                if actual_change > 0.0005:  # 0.05% threshold
+                    actual_direction = 2  # UP
+                elif actual_change < -0.0005:
+                    actual_direction = 0  # DOWN
+                else:
+                    actual_direction = 1  # SIDEWAYS
+
+                # Calculate reward based on prediction accuracy
+                reward = self._calculate_prediction_reward(
+                    prediction.predicted_direction,
+                    actual_direction,
+                    prediction.confidence,
+                    prediction.predicted_change,
+                    actual_change
+                )
+
+                # Update prediction
+                prediction.actual_direction = actual_direction
+                prediction.actual_change = actual_change
+                prediction.reward = reward
+
+                # Update training stats
+                stats = self.training_stats[symbol]
+                stats['total_predictions'] += 1
+                if reward > 0:
+                    stats['successful_predictions'] += 1
+
+        except Exception as e:
+            logger.error(f"Error calculating rewards for {symbol}: {e}")
+
+    def _calculate_prediction_reward(self,
+                                     predicted_direction: int,
+                                     actual_direction: int,
+                                     confidence: float,
+                                     predicted_change: float,
+                                     actual_change: float) -> float:
+        """Calculate reward for a prediction"""
+        try:
+            # Base reward for correct direction
+            if predicted_direction == actual_direction:
+                base_reward = 1.0
+            else:
+                base_reward = -1.0
+
+            # Scale by confidence
+            confidence_scaled_reward = base_reward * confidence
+
+            # Additional reward for magnitude accuracy
+            if predicted_direction != 1:  # Not sideways
+                magnitude_accuracy = 1.0 - abs(predicted_change - actual_change) / max(abs(actual_change), 0.001)
+                magnitude_accuracy = max(0.0, magnitude_accuracy)
+                confidence_scaled_reward += magnitude_accuracy * 0.5
+
+            # Penalty for overconfident wrong predictions (the reward is negative
+            # here, so multiplying by 1.5 deepens the penalty)
+            if base_reward < 0 and confidence > 0.8:
+                confidence_scaled_reward *= 1.5
+
+            return float(confidence_scaled_reward)
+
+        except Exception as e:
+            logger.error(f"Error calculating reward: {e}")
+            return 0.0
+
+    async def _train_batch(self, symbol: str, predictions: List[PredictionResult]) -> float:
+        """Train model on a batch of predictions"""
+        try:
+            model = self.models[symbol]
+            optimizer = self.optimizers[symbol]
+            scaler = self.scalers[symbol]
+
+            model.train()
+            optimizer.zero_grad()
+
+            # Prepare batch data
+            features = torch.stack([
+                torch.from_numpy(p.features) for p in predictions
+            ]).to(self.device)
+
+            # Targets
+            direction_targets = torch.tensor([
+                p.actual_direction for p in predictions
+            ], dtype=torch.long).to(self.device)
+
+            value_targets = torch.tensor([
+                p.reward for p in predictions
+            ], dtype=torch.float32).to(self.device)
+
+            # Forward pass with mixed precision
+            with torch.cuda.amp.autocast():
+                outputs = model(features)
+
+                # Direction and value losses (cross-entropy and MSE are autocast-safe)
+                direction_loss = nn.CrossEntropyLoss()(outputs['price_logits'], direction_targets)
+                value_loss = nn.MSELoss()(outputs['value'].squeeze(-1), value_targets)
+
+            # Confidence loss (encourage high confidence for correct predictions).
+            # BCELoss raises a RuntimeError when called inside an autocast region,
+            # so compute it outside the autocast block in full precision.
+            correct_predictions = (torch.argmax(outputs['price_logits'], dim=1) == direction_targets).float()
+            confidence_loss = nn.BCELoss()(outputs['confidence'].squeeze(-1).float(), correct_predictions)
+
+            # Combined loss
+            total_loss = direction_loss + 0.5 * value_loss + 0.3 * confidence_loss
+
+            # Backward pass with gradient scaling
+            scaler.scale(total_loss).backward()
+            scaler.unscale_(optimizer)
+            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
+            scaler.step(optimizer)
+            scaler.update()  # adapt the loss scale: shrink on inf/NaN gradients, otherwise grow slowly
+
+            return total_loss.item()
+
+        except Exception as e:
+            logger.error(f"Error training batch for {symbol}: {e}")
+            return 0.0
+
+    async def _train_on_trade_execution(self, symbol: str, signals: List[PredictionResult],
+                                        action: str, price: float):
+        """Train with higher weight when a trade is executed"""
+        try:
+            logger.info(f"Training on trade execution: {action} {symbol} at ${price:.2f}")
+
+            # Wait a bit to see the trade outcome
+            await asyncio.sleep(30)  # 30 seconds to see initial outcome
+
+            # Calculate actual outcome
+            current_prices = [p['price'] for p in list(self.price_history[symbol])[-5:]]
+            if len(current_prices) >= 2:
+                current_price = current_prices[-1]
+                entry_price = price
+
+                # Calculate P&L
+                if action == 'BUY':
+                    pnl_ratio = (current_price - entry_price) / entry_price
+                elif action == 'SELL':
+                    pnl_ratio = (entry_price - current_price) / entry_price
+                else:
+                    pnl_ratio = 0.0
+
+                # Create enhanced reward for trade execution
+                trade_reward = pnl_ratio * 10.0  # Amplify trade outcomes
+
+                # Apply enhanced training weight to the signals that led to the trade
+                for signal in signals:
+                    if signal.reward is None:
+                        # No measured outcome yet: back-fill the actual fields from the
+                        # trade result so the signal is safe to replay in _train_batch,
+                        # which builds its direction targets from actual_direction.
+                        signal.actual_change = pnl_ratio if action == 'BUY' else -pnl_ratio
+                        if signal.actual_change > 0.0005:
+                            signal.actual_direction = 2  # UP
+                        elif signal.actual_change < -0.0005:
+                            signal.actual_direction = 0  # DOWN
+                        else:
+                            signal.actual_direction = 1  # SIDEWAYS
+                        signal.reward = trade_reward
+                    else:
+                        signal.reward += trade_reward  # Add to existing reward
+
+                logger.info(f"Trade outcome for {symbol}: P&L ratio={pnl_ratio:.4f}, "
+                            f"enhanced reward={trade_reward:.4f}")
+
+                # Immediate training step with higher weight
+                if len(signals) > 0:
+                    loss = await self._train_batch(symbol, signals[-3:])  # Train on last 3 signals
+                    logger.info(f"Enhanced training loss for {symbol}: {loss:.6f}")
+
+        except Exception as e:
+            logger.error(f"Error in trade execution training for {symbol}: {e}")
+
+    async def _model_saving_loop(self):
+        """Periodically save models"""
+        logger.info("Starting model saving loop")
+
+        while self.running:
+            try:
+                await asyncio.sleep(300)  # Save every 5 minutes
+                self._save_models()
+
+            except Exception as e:
+                logger.error(f"Error in model saving loop: {e}")
+                await asyncio.sleep(60)
+
+    def _save_models(self):
+        """Save all models to disk"""
+        try:
+            for symbol in self.symbols:
+                symbol_safe = symbol.replace('/', '_')
+                model_path = os.path.join(self.model_checkpoint_dir, f"{symbol_safe}_model.pt")
+
+                # Save model state
+                torch.save({
+                    'model_state_dict': self.models[symbol].state_dict(),
+                    'optimizer_state_dict': self.optimizers[symbol].state_dict(),
+                    'training_stats': self.training_stats[symbol],
+                    'inference_stats': self.inference_stats[symbol],
+                    'timestamp': datetime.now().isoformat()
+                }, model_path)
+
+                logger.debug(f"Saved model for {symbol}")
+
+        except Exception as e:
+            logger.error(f"Error saving models: {e}")
+
+    def _load_models(self):
+        """Load existing models from disk"""
+        try:
+            for symbol in self.symbols:
+                symbol_safe = symbol.replace('/', '_')
+                model_path = os.path.join(self.model_checkpoint_dir, f"{symbol_safe}_model.pt")
+
+                if os.path.exists(model_path):
+                    checkpoint = torch.load(model_path, map_location=self.device)
+
+                    self.models[symbol].load_state_dict(checkpoint['model_state_dict'])
+                    self.optimizers[symbol].load_state_dict(checkpoint['optimizer_state_dict'])
+
+                    if 'training_stats' in checkpoint:
+                        self.training_stats[symbol].update(checkpoint['training_stats'])
+                    if 'inference_stats' in checkpoint:
+                        self.inference_stats[symbol].update(checkpoint['inference_stats'])
+
+                    logger.info(f"Loaded existing model for {symbol}")
+                else:
+                    logger.info(f"No existing model found for {symbol}, starting fresh")
+
+        except Exception as e:
+            logger.error(f"Error loading models: {e}")
+
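+    # Illustrative sketch (not part of the original change set): _save_models and
+    # _load_models each rebuild the "BTC/USDT" -> "BTC_USDT_model.pt" path by hand,
+    # so a small helper like this could centralize the checkpoint naming convention.
+    def _checkpoint_path(self, symbol: str) -> str:
+        """Checkpoint path for a symbol, e.g. models/realtime_rl_cob/BTC_USDT_model.pt"""
+        return os.path.join(self.model_checkpoint_dir, f"{symbol.replace('/', '_')}_model.pt")
+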
+    def get_performance_stats(self) -> Dict[str, Any]:
+        """Get comprehensive performance statistics"""
+        try:
+            stats = {
+                'symbols': self.symbols,
+                'training_stats': self.training_stats.copy(),
+                'inference_stats': self.inference_stats.copy(),
+                'signal_stats': {},
+                'model_info': {}
+            }
+
+            # Add signal accumulator stats
+            for symbol in self.symbols:
+                accumulator = self.signal_accumulators[symbol]
+                stats['signal_stats'][symbol] = {
+                    'current_signals': len(accumulator.signals),
+                    'confidence_sum': accumulator.confidence_sum,
+                    'total_predictions': accumulator.total_predictions,
+                    'successful_predictions': accumulator.successful_predictions,
+                    'success_rate': (
+                        accumulator.successful_predictions / max(1, accumulator.total_predictions)
+                    )
+                }
+
+            # Add model parameter info
+            for symbol in self.symbols:
+                model = self.models[symbol]
+                total_params = sum(p.numel() for p in model.parameters())
+                stats['model_info'][symbol] = {
+                    'total_parameters': total_params,
+                    'trainable_parameters': sum(p.numel() for p in model.parameters() if p.requires_grad)
+                }
+
+            return stats
+
+        except Exception as e:
+            logger.error(f"Error getting performance stats: {e}")
+            return {}
+
+# Example usage
+async def main():
+    """Example usage of RealtimeRLCOBTrader"""
+    # TradingExecutor is already imported at the top of this module
+
+    # Initialize trading executor (simulation mode)
+    trading_executor = TradingExecutor(simulation_mode=True)
+
+    # Initialize real-time RL trader
+    trader = RealtimeRLCOBTrader(
+        symbols=['BTC/USDT', 'ETH/USDT'],
+        trading_executor=trading_executor,
+        inference_interval_ms=200,
+        min_confidence_threshold=0.7,
+        required_confident_predictions=3
+    )
+
+    try:
+        # Start the trader
+        await trader.start()
+
+        # Run for demonstration
+        logger.info("Real-time RL COB Trader running...")
+        await asyncio.sleep(300)  # Run for 5 minutes
+
+        # Print performance stats
+        stats = trader.get_performance_stats()
+        logger.info(f"Performance stats: {json.dumps(stats, indent=2, default=str)}")
+
+    finally:
+        # Stop the trader
+        await trader.stop()
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    asyncio.run(main())
\ No newline at end of file
diff --git a/run_realtime_rl_cob_trader.py b/run_realtime_rl_cob_trader.py
new file mode 100644
index 0000000..61c35fd
--- /dev/null
+++ b/run_realtime_rl_cob_trader.py
@@ -0,0 +1,321 @@
+#!/usr/bin/env python3
+"""
+Real-time RL COB Trader Launcher
+
+Launch script for the real-time reinforcement learning trader that:
+1. Uses COB data for training a 1B parameter model
+2. Performs inference every 200ms
+3. Accumulates confident signals for trade execution
+4. Trains continuously in real-time based on outcomes
+
+This script provides a complete trading system integration.
+""" + +import asyncio +import logging +import signal +import sys +import json +import os +from datetime import datetime +from typing import Dict, Any + +# Local imports +from core.realtime_rl_cob_trader import RealtimeRLCOBTrader +from core.trading_executor import TradingExecutor +from core.config import load_config + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('logs/realtime_rl_cob_trader.log'), + logging.StreamHandler(sys.stdout) + ] +) + +logger = logging.getLogger(__name__) + +class RealtimeRLCOBTraderLauncher: + """ + Launcher for Real-time RL COB Trader system + """ + + def __init__(self, config_path: str = "config.yaml"): + """Initialize launcher with configuration""" + self.config = load_config(config_path) + self.trader = None + self.trading_executor = None + self.running = False + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + logger.info("RealtimeRLCOBTraderLauncher initialized") + + def _signal_handler(self, signum, frame): + """Handle shutdown signals""" + logger.info(f"Received signal {signum}, initiating graceful shutdown...") + self.running = False + + async def start(self): + """Start the real-time RL COB trading system""" + try: + logger.info("=" * 60) + logger.info("REAL-TIME RL COB TRADER SYSTEM STARTING") + logger.info("=" * 60) + + # Initialize trading executor + await self._initialize_trading_executor() + + # Initialize RL trader + await self._initialize_rl_trader() + + # Start the trading system + await self._start_trading_system() + + # Run main loop + await self._run_main_loop() + + except Exception as e: + logger.error(f"Critical error in trader launcher: {e}") + raise + finally: + await self.stop() + + async def _initialize_trading_executor(self): + """Initialize the trading executor""" + logger.info("Initializing Trading Executor...") + + # Get trading configuration + trading_config = self.config.get('trading', {}) + mexc_config = self.config.get('mexc', {}) + + # Determine if we should run in simulation mode + simulation_mode = mexc_config.get('simulation_mode', True) + + if simulation_mode: + logger.info("Running in SIMULATION mode - no real trades will be executed") + else: + logger.warning("Running in LIVE TRADING mode - real money at risk!") + + # Add safety confirmation for live trading + confirmation = input("Type 'CONFIRM_LIVE_TRADING' to proceed with live trading: ") + if confirmation != 'CONFIRM_LIVE_TRADING': + logger.info("Live trading not confirmed, switching to simulation mode") + simulation_mode = True + + # Initialize trading executor + self.trading_executor = TradingExecutor( + simulation_mode=simulation_mode, + mexc_config=mexc_config + ) + + logger.info(f"Trading Executor initialized in {'SIMULATION' if simulation_mode else 'LIVE'} mode") + + async def _initialize_rl_trader(self): + """Initialize the RL trader""" + logger.info("Initializing Real-time RL COB Trader...") + + # Get RL configuration + rl_config = self.config.get('realtime_rl', {}) + + # Trading symbols + symbols = rl_config.get('symbols', ['BTC/USDT', 'ETH/USDT']) + + # RL parameters + inference_interval_ms = rl_config.get('inference_interval_ms', 200) + min_confidence_threshold = rl_config.get('min_confidence_threshold', 0.7) + required_confident_predictions = rl_config.get('required_confident_predictions', 3) + model_checkpoint_dir = 
rl_config.get('model_checkpoint_dir', 'models/realtime_rl_cob') + + # Initialize RL trader + self.trader = RealtimeRLCOBTrader( + symbols=symbols, + trading_executor=self.trading_executor, + model_checkpoint_dir=model_checkpoint_dir, + inference_interval_ms=inference_interval_ms, + min_confidence_threshold=min_confidence_threshold, + required_confident_predictions=required_confident_predictions + ) + + logger.info(f"RL Trader initialized for symbols: {symbols}") + logger.info(f"Inference interval: {inference_interval_ms}ms") + logger.info(f"Confidence threshold: {min_confidence_threshold}") + logger.info(f"Required predictions: {required_confident_predictions}") + + async def _start_trading_system(self): + """Start the complete trading system""" + logger.info("Starting Real-time RL COB Trading System...") + + # Start RL trader (this will start COB integration internally) + await self.trader.start() + + self.running = True + + logger.info("āœ… Real-time RL COB Trading System started successfully!") + logger.info("šŸ”„ 1B parameter model training and inference active") + logger.info("šŸ“Š COB data streaming and processing") + logger.info("šŸŽÆ Signal accumulation and trade execution ready") + logger.info("⚔ Real-time training on prediction outcomes") + + async def _run_main_loop(self): + """Main monitoring and statistics loop""" + logger.info("Starting main monitoring loop...") + + last_stats_time = datetime.now() + stats_interval = 60 # Print stats every 60 seconds + + while self.running: + try: + # Sleep for a bit + await asyncio.sleep(10) + + # Print periodic statistics + current_time = datetime.now() + if (current_time - last_stats_time).total_seconds() >= stats_interval: + await self._print_performance_stats() + last_stats_time = current_time + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in main loop: {e}") + await asyncio.sleep(5) + + logger.info("Main monitoring loop stopped") + + async def _print_performance_stats(self): + """Print comprehensive performance statistics""" + try: + if not self.trader: + return + + stats = self.trader.get_performance_stats() + + logger.info("=" * 80) + logger.info("šŸ”„ REAL-TIME RL COB TRADER PERFORMANCE STATISTICS") + logger.info("=" * 80) + + # Model information + logger.info("šŸ“Š Model Information:") + for symbol, model_info in stats.get('model_info', {}).items(): + total_params = model_info.get('total_parameters', 0) + logger.info(f" {symbol}: {total_params:,} parameters ({total_params/1e9:.2f}B)") + + # Training statistics + logger.info("\n🧠 Training Statistics:") + for symbol, training_stats in stats.get('training_stats', {}).items(): + total_preds = training_stats.get('total_predictions', 0) + successful_preds = training_stats.get('successful_predictions', 0) + success_rate = (successful_preds / max(1, total_preds)) * 100 + avg_loss = training_stats.get('average_loss', 0.0) + training_steps = training_stats.get('total_training_steps', 0) + last_training = training_stats.get('last_training_time') + + logger.info(f" {symbol}:") + logger.info(f" Predictions: {total_preds} (Success: {success_rate:.1f}%)") + logger.info(f" Training Steps: {training_steps}") + logger.info(f" Average Loss: {avg_loss:.6f}") + if last_training: + logger.info(f" Last Training: {last_training}") + + # Inference statistics + logger.info("\n⚔ Inference Statistics:") + for symbol, inference_stats in stats.get('inference_stats', {}).items(): + total_inferences = inference_stats.get('total_inferences', 0) + avg_time = 
inference_stats.get('average_inference_time_ms', 0.0) + last_inference = inference_stats.get('last_inference_time') + + logger.info(f" {symbol}:") + logger.info(f" Total Inferences: {total_inferences}") + logger.info(f" Average Time: {avg_time:.1f}ms") + if last_inference: + logger.info(f" Last Inference: {last_inference}") + + # Signal statistics + logger.info("\nšŸŽÆ Signal Accumulation:") + for symbol, signal_stats in stats.get('signal_stats', {}).items(): + current_signals = signal_stats.get('current_signals', 0) + confidence_sum = signal_stats.get('confidence_sum', 0.0) + success_rate = signal_stats.get('success_rate', 0.0) * 100 + + logger.info(f" {symbol}:") + logger.info(f" Current Signals: {current_signals}") + logger.info(f" Confidence Sum: {confidence_sum:.2f}") + logger.info(f" Historical Success Rate: {success_rate:.1f}%") + + # Trading executor statistics + if self.trading_executor: + positions = self.trading_executor.get_positions() + trade_history = self.trading_executor.get_trade_history() + + logger.info("\nšŸ’° Trading Statistics:") + logger.info(f" Active Positions: {len(positions)}") + logger.info(f" Total Trades: {len(trade_history)}") + + if trade_history: + # Calculate P&L statistics + total_pnl = sum(trade.pnl for trade in trade_history) + profitable_trades = sum(1 for trade in trade_history if trade.pnl > 0) + win_rate = (profitable_trades / len(trade_history)) * 100 + + logger.info(f" Total P&L: ${total_pnl:.2f}") + logger.info(f" Win Rate: {win_rate:.1f}%") + + # Show active positions + if positions: + logger.info("\nšŸ“ Active Positions:") + for symbol, position in positions.items(): + logger.info(f" {symbol}: {position.side} {position.quantity:.6f} @ ${position.entry_price:.2f}") + + logger.info("=" * 80) + + except Exception as e: + logger.error(f"Error printing performance stats: {e}") + + async def stop(self): + """Stop the trading system gracefully""" + if not self.running: + return + + logger.info("Stopping Real-time RL COB Trading System...") + + self.running = False + + # Stop RL trader + if self.trader: + await self.trader.stop() + logger.info("āœ… RL Trader stopped") + + # Print final statistics + if self.trader: + logger.info("\nšŸ“Š Final Performance Summary:") + await self._print_performance_stats() + + logger.info("Real-time RL COB Trading System stopped successfully") + +async def main(): + """Main entry point""" + try: + # Create logs directory if it doesn't exist + os.makedirs('logs', exist_ok=True) + + # Initialize and start launcher + launcher = RealtimeRLCOBTraderLauncher() + await launcher.start() + + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down...") + except Exception as e: + logger.error(f"Critical error: {e}") + raise + +if __name__ == "__main__": + # Set event loop policy for Windows compatibility + if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + + asyncio.run(main()) \ No newline at end of file diff --git a/test_realtime_rl_cob_trader.py b/test_realtime_rl_cob_trader.py new file mode 100644 index 0000000..b8b7fb1 --- /dev/null +++ b/test_realtime_rl_cob_trader.py @@ -0,0 +1,547 @@ +#!/usr/bin/env python3 +""" +Test Script for Real-time RL COB Trader + +This script tests the real-time reinforcement learning system to ensure: +1. Proper model initialization and parameter count (~1B parameters) +2. COB data integration and feature extraction +3. Real-time inference pipeline +4. Signal accumulation and consensus +5. 
Training loop functionality +6. Trade execution integration + +Run this before deploying the live system. +""" + +import asyncio +import logging +import numpy as np +import torch +import time +import json +from datetime import datetime +from typing import Dict, Any + +# Local imports +from core.realtime_rl_cob_trader import RealtimeRLCOBTrader, MassiveRLNetwork, PredictionResult +from core.trading_executor import TradingExecutor + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger(__name__) + +class RealtimeRLTester: + """ + Comprehensive tester for Real-time RL COB Trader + """ + + def __init__(self): + self.test_results = {} + self.trader = None + self.trading_executor = None + + async def run_all_tests(self): + """Run all tests and generate report""" + logger.info("=" * 60) + logger.info("REAL-TIME RL COB TRADER TESTING SUITE") + logger.info("=" * 60) + + tests = [ + self.test_model_initialization, + self.test_model_parameter_count, + self.test_feature_extraction, + self.test_inference_performance, + self.test_signal_accumulation, + self.test_training_pipeline, + self.test_trading_integration, + self.test_performance_monitoring + ] + + for test in tests: + try: + await test() + except Exception as e: + logger.error(f"Test {test.__name__} failed: {e}") + self.test_results[test.__name__] = {'status': 'FAILED', 'error': str(e)} + + await self.generate_test_report() + + async def test_model_initialization(self): + """Test model initialization and architecture""" + logger.info("🧠 Testing Model Initialization...") + + try: + # Test model creation + model = MassiveRLNetwork(input_size=2000, hidden_size=4096, num_layers=12) + + # Check if CUDA is available + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + model = model.to(device) + + # Test forward pass + batch_size = 4 + test_input = torch.randn(batch_size, 2000).to(device) + + with torch.no_grad(): + outputs = model(test_input) + + # Verify outputs + assert 'price_logits' in outputs + assert 'value' in outputs + assert 'confidence' in outputs + assert 'features' in outputs + + assert outputs['price_logits'].shape == (batch_size, 3) # DOWN, SIDEWAYS, UP + assert outputs['value'].shape == (batch_size, 1) + assert outputs['confidence'].shape == (batch_size, 1) + + self.test_results['test_model_initialization'] = { + 'status': 'PASSED', + 'device': str(device), + 'output_shapes': {k: list(v.shape) for k, v in outputs.items()} + } + + logger.info("āœ… Model initialization test PASSED") + + except Exception as e: + self.test_results['test_model_initialization'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_model_parameter_count(self): + """Test that model has approximately 1B parameters""" + logger.info("šŸ”¢ Testing Model Parameter Count...") + + try: + model = MassiveRLNetwork(input_size=2000, hidden_size=4096, num_layers=12) + + total_params = sum(p.numel() for p in model.parameters()) + trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) + + logger.info(f"Total parameters: {total_params:,}") + logger.info(f"Trainable parameters: {trainable_params:,}") + + self.test_results['test_model_parameter_count'] = { + 'status': 'PASSED', + 'total_parameters': total_params, + 'trainable_parameters': trainable_params, + 'parameter_size_gb': (total_params * 4) / (1024**3), # 4 bytes per float32 + 'is_massive': total_params > 100_000_000 # At least 100M parameters + } + + 
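+            # Back-of-envelope check (an estimate derived from the architecture above,
+            # not a measured value): each TransformerEncoderLayer carries roughly
+            # 12 * hidden_size^2 weights (4h^2 for the Q/K/V/output projections plus
+            # 8h^2 for the 4x feedforward), i.e. ~201M at h=4096, so the 12 encoder
+            # layers alone contribute ~2.4B parameters, comfortably past the 1B target.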
logger.info(f"āœ… Model has {total_params:,} parameters ({total_params/1e9:.2f}B)") + + except Exception as e: + self.test_results['test_model_parameter_count'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_feature_extraction(self): + """Test feature extraction from COB data""" + logger.info("šŸ” Testing Feature Extraction...") + + try: + # Initialize trader + self.trading_executor = TradingExecutor(simulation_mode=True) + self.trader = RealtimeRLCOBTrader( + symbols=['BTC/USDT'], + trading_executor=self.trading_executor, + inference_interval_ms=1000 # Slower for testing + ) + + # Create mock COB data + mock_cob_data = { + 'state': np.random.randn(1500), # Mock state features + 'timestamp': datetime.now(), + 'type': 'cob_state' + } + + # Test feature extraction + features = self.trader._extract_features('BTC/USDT', mock_cob_data) + + assert features is not None + assert len(features) == 2000 # Target feature size + assert features.dtype == np.float32 + assert not np.any(np.isnan(features)) + assert not np.any(np.isinf(features)) + + # Test normalization + assert np.abs(np.mean(features)) < 1.0 # Roughly normalized + assert np.std(features) < 10.0 # Not too spread out + + self.test_results['test_feature_extraction'] = { + 'status': 'PASSED', + 'feature_size': len(features), + 'feature_range': [float(np.min(features)), float(np.max(features))], + 'feature_stats': { + 'mean': float(np.mean(features)), + 'std': float(np.std(features)), + 'median': float(np.median(features)) + } + } + + logger.info("āœ… Feature extraction test PASSED") + + except Exception as e: + self.test_results['test_feature_extraction'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_inference_performance(self): + """Test inference speed and quality""" + logger.info("⚔ Testing Inference Performance...") + + try: + if not self.trader: + self.trading_executor = TradingExecutor(simulation_mode=True) + self.trader = RealtimeRLCOBTrader( + symbols=['BTC/USDT'], + trading_executor=self.trading_executor + ) + + # Test multiple inferences + num_tests = 10 + inference_times = [] + + for i in range(num_tests): + # Create test features + test_features = np.random.randn(2000).astype(np.float32) + test_features = self.trader._normalize_features(test_features) + + # Time inference + start_time = time.time() + prediction = self.trader._predict('BTC/USDT', test_features) + inference_time = (time.time() - start_time) * 1000 + + inference_times.append(inference_time) + + # Verify prediction structure + assert 'direction' in prediction + assert 'confidence' in prediction + assert 'change' in prediction + assert 'value' in prediction + + assert 0 <= prediction['direction'] <= 2 + assert 0.0 <= prediction['confidence'] <= 1.0 + assert isinstance(prediction['change'], float) + assert isinstance(prediction['value'], float) + + avg_inference_time = np.mean(inference_times) + max_inference_time = np.max(inference_times) + + # Check if inference is fast enough (target: <50ms per inference) + inference_target_ms = 50.0 + + self.test_results['test_inference_performance'] = { + 'status': 'PASSED' if avg_inference_time < inference_target_ms else 'WARNING', + 'average_inference_time_ms': float(avg_inference_time), + 'max_inference_time_ms': float(max_inference_time), + 'target_time_ms': inference_target_ms, + 'meets_target': avg_inference_time < inference_target_ms, + 'inferences_per_second': 1000.0 / avg_inference_time + } + + logger.info(f"āœ… Average inference time: {avg_inference_time:.2f}ms") + 
logger.info(f"āœ… Max inference time: {max_inference_time:.2f}ms") + logger.info(f"āœ… Inferences per second: {1000.0/avg_inference_time:.1f}") + + except Exception as e: + self.test_results['test_inference_performance'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_signal_accumulation(self): + """Test signal accumulation and consensus logic""" + logger.info("šŸŽÆ Testing Signal Accumulation...") + + try: + if not self.trader: + self.trading_executor = TradingExecutor(simulation_mode=True) + self.trader = RealtimeRLCOBTrader( + symbols=['BTC/USDT'], + trading_executor=self.trading_executor, + required_confident_predictions=3 + ) + + symbol = 'BTC/USDT' + accumulator = self.trader.signal_accumulators[symbol] + + # Test adding signals + test_predictions = [] + for i in range(5): + prediction = PredictionResult( + timestamp=datetime.now(), + symbol=symbol, + predicted_direction=2, # UP + confidence=0.8, + predicted_change=0.001, + features=np.random.randn(2000).astype(np.float32) + ) + test_predictions.append(prediction) + self.trader._add_signal(symbol, prediction) + + # Check accumulator state + assert len(accumulator.signals) == 5 + assert accumulator.confidence_sum == 5 * 0.8 + assert accumulator.total_predictions == 5 + + # Test consensus logic (simulate processing) + recent_signals = list(accumulator.signals)[-3:] + directions = [signal.predicted_direction for signal in recent_signals] + + # All should be direction 2 (UP) + direction_counts = {0: 0, 1: 0, 2: 0} + for direction in directions: + direction_counts[direction] += 1 + + dominant_direction = max(direction_counts, key=direction_counts.get) + consensus_count = direction_counts[dominant_direction] + + assert dominant_direction == 2 + assert consensus_count == 3 + + self.test_results['test_signal_accumulation'] = { + 'status': 'PASSED', + 'signals_added': len(accumulator.signals), + 'confidence_sum': accumulator.confidence_sum, + 'consensus_direction': dominant_direction, + 'consensus_count': consensus_count + } + + logger.info("āœ… Signal accumulation test PASSED") + + except Exception as e: + self.test_results['test_signal_accumulation'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_training_pipeline(self): + """Test training pipeline functionality""" + logger.info("🧠 Testing Training Pipeline...") + + try: + if not self.trader: + self.trading_executor = TradingExecutor(simulation_mode=True) + self.trader = RealtimeRLCOBTrader( + symbols=['BTC/USDT'], + trading_executor=self.trading_executor + ) + + symbol = 'BTC/USDT' + + # Create mock training data + test_predictions = [] + for i in range(10): + prediction = PredictionResult( + timestamp=datetime.now(), + symbol=symbol, + predicted_direction=np.random.randint(0, 3), + confidence=np.random.uniform(0.5, 1.0), + predicted_change=np.random.uniform(-0.001, 0.001), + features=np.random.randn(2000).astype(np.float32), + actual_direction=np.random.randint(0, 3), + actual_change=np.random.uniform(-0.001, 0.001), + reward=np.random.uniform(-1.0, 1.0) + ) + test_predictions.append(prediction) + + # Test training batch + loss = await self.trader._train_batch(symbol, test_predictions) + + assert isinstance(loss, float) + assert not np.isnan(loss) + assert not np.isinf(loss) + assert loss >= 0.0 # Loss should be non-negative + + self.test_results['test_training_pipeline'] = { + 'status': 'PASSED', + 'training_loss': float(loss), + 'batch_size': len(test_predictions), + 'training_successful': True + } + + logger.info(f"āœ… Training pipeline test 
PASSED (loss: {loss:.6f})") + + except Exception as e: + self.test_results['test_training_pipeline'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_trading_integration(self): + """Test integration with trading executor""" + logger.info("šŸ’° Testing Trading Integration...") + + try: + # Initialize with simulation mode + trading_executor = TradingExecutor(simulation_mode=True) + + # Test signal execution + success = trading_executor.execute_signal( + symbol='BTC/USDT', + action='BUY', + confidence=0.8, + current_price=50000.0 + ) + + # In simulation mode, this should always succeed + assert success == True + + # Check positions + positions = trading_executor.get_positions() + assert 'BTC/USDT' in positions + + # Test sell signal + success = trading_executor.execute_signal( + symbol='BTC/USDT', + action='SELL', + confidence=0.8, + current_price=50100.0 + ) + + assert success == True + + # Check trade history + trade_history = trading_executor.get_trade_history() + assert len(trade_history) > 0 + + last_trade = trade_history[-1] + assert last_trade.symbol == 'BTC/USDT' + assert last_trade.pnl != 0 # Should have some P&L + + self.test_results['test_trading_integration'] = { + 'status': 'PASSED', + 'simulation_mode': True, + 'trades_executed': len(trade_history), + 'last_trade_pnl': float(last_trade.pnl) + } + + logger.info("āœ… Trading integration test PASSED") + + except Exception as e: + self.test_results['test_trading_integration'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def test_performance_monitoring(self): + """Test performance monitoring and statistics""" + logger.info("šŸ“Š Testing Performance Monitoring...") + + try: + if not self.trader: + self.trading_executor = TradingExecutor(simulation_mode=True) + self.trader = RealtimeRLCOBTrader( + symbols=['BTC/USDT', 'ETH/USDT'], + trading_executor=self.trading_executor + ) + + # Get performance stats + stats = self.trader.get_performance_stats() + + # Verify structure + assert 'symbols' in stats + assert 'training_stats' in stats + assert 'inference_stats' in stats + assert 'signal_stats' in stats + assert 'model_info' in stats + + # Check symbols + assert 'BTC/USDT' in stats['symbols'] + assert 'ETH/USDT' in stats['symbols'] + + # Check model info + for symbol in stats['symbols']: + assert symbol in stats['model_info'] + model_info = stats['model_info'][symbol] + assert 'total_parameters' in model_info + assert 'trainable_parameters' in model_info + assert model_info['total_parameters'] > 0 + + self.test_results['test_performance_monitoring'] = { + 'status': 'PASSED', + 'stats_structure_valid': True, + 'symbols_tracked': len(stats['symbols']), + 'model_info_available': len(stats['model_info']) + } + + logger.info("āœ… Performance monitoring test PASSED") + + except Exception as e: + self.test_results['test_performance_monitoring'] = {'status': 'FAILED', 'error': str(e)} + raise + + async def generate_test_report(self): + """Generate comprehensive test report""" + logger.info("=" * 60) + logger.info("REAL-TIME RL COB TRADER TEST REPORT") + logger.info("=" * 60) + + total_tests = len(self.test_results) + passed_tests = sum(1 for result in self.test_results.values() if result['status'] == 'PASSED') + failed_tests = sum(1 for result in self.test_results.values() if result['status'] == 'FAILED') + warning_tests = sum(1 for result in self.test_results.values() if result['status'] == 'WARNING') + + logger.info(f"šŸ“Š Test Summary:") + logger.info(f" Total Tests: {total_tests}") + logger.info(f" āœ… Passed: 
{passed_tests}") + logger.info(f" āš ļø Warnings: {warning_tests}") + logger.info(f" āŒ Failed: {failed_tests}") + + success_rate = (passed_tests / total_tests) * 100 if total_tests > 0 else 0 + logger.info(f" Success Rate: {success_rate:.1f}%") + + logger.info("\nšŸ“‹ Detailed Results:") + for test_name, result in self.test_results.items(): + status_icon = "āœ…" if result['status'] == 'PASSED' else "āš ļø" if result['status'] == 'WARNING' else "āŒ" + logger.info(f" {status_icon} {test_name}: {result['status']}") + + if result['status'] == 'FAILED': + logger.error(f" Error: {result.get('error', 'Unknown error')}") + + # System readiness assessment + logger.info("\nšŸŽÆ System Readiness Assessment:") + if failed_tests == 0: + if warning_tests == 0: + logger.info(" 🟢 SYSTEM READY FOR DEPLOYMENT") + logger.info(" All tests passed. The real-time RL COB trader is ready for live operation.") + else: + logger.info(" 🟔 SYSTEM READY WITH WARNINGS") + logger.info(" System is functional but some performance warnings exist.") + else: + logger.info(" šŸ”“ SYSTEM NOT READY") + logger.info(" Critical issues found. Fix errors before deployment.") + + # Save detailed report + report_data = { + 'timestamp': datetime.now().isoformat(), + 'test_summary': { + 'total_tests': total_tests, + 'passed_tests': passed_tests, + 'warning_tests': warning_tests, + 'failed_tests': failed_tests, + 'success_rate': success_rate + }, + 'test_results': self.test_results, + 'system_readiness': 'READY' if failed_tests == 0 else 'NOT_READY' + } + + report_file = f"test_reports/realtime_rl_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + import os + os.makedirs('test_reports', exist_ok=True) + + with open(report_file, 'w') as f: + json.dump(report_data, f, indent=2, default=str) + + logger.info(f"\nšŸ“„ Detailed report saved to: {report_file}") + logger.info("=" * 60) + +async def main(): + """Main test entry point""" + logger.info("Starting Real-time RL COB Trader Test Suite...") + + tester = RealtimeRLTester() + await tester.run_all_tests() + +if __name__ == "__main__": + # Set event loop policy for Windows compatibility + if hasattr(asyncio, 'WindowsProactorEventLoopPolicy'): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + + asyncio.run(main()) \ No newline at end of file