From 506458d55e82d8b1cd1533d2f606a7b9afaf174c Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 12 Mar 2025 01:46:48 +0200 Subject: [PATCH] wip enhanced multitimeframe model --- crypto/gogo2/TradingEnvironment | 42 + crypto/gogo2/_notes.md | 23 + crypto/gogo2/count_params.py | 373 +++-- crypto/gogo2/enhanced_models.py | 434 ++++++ crypto/gogo2/main.py | 2598 ++++++++++++++----------------- 5 files changed, 1972 insertions(+), 1498 deletions(-) create mode 100644 crypto/gogo2/TradingEnvironment create mode 100644 crypto/gogo2/enhanced_models.py diff --git a/crypto/gogo2/TradingEnvironment b/crypto/gogo2/TradingEnvironment new file mode 100644 index 0000000..8e10a46 --- /dev/null +++ b/crypto/gogo2/TradingEnvironment @@ -0,0 +1,42 @@ +def step(self, action): + """Take an action in the environment and return the next state, reward, and done flag""" + # Store current price before taking action + self.current_price = self.data[self.current_step]['close'] + + # Process action (0: HOLD, 1: BUY/LONG, 2: SELL/SHORT, 3: CLOSE) + if not self.demo and self.trading_client: + # Execute real trades in live mode + asyncio.create_task(self._execute_live_action(action)) + + # Calculate reward (simulation still runs in parallel with live trading) + reward, _ = self.calculate_reward(action) # Unpack the tuple here + + # Check for stop loss / take profit hits + self.check_sl_tp() + + # Move to next step + self.current_step += 1 + done = self.current_step >= len(self.data) - 1 + + # Get new state + next_state = self.get_state() + + return next_state, reward, done + +def calculate_reward(self, action): + """Calculate the reward for the current action.""" + # ... (existing code) + + # Combine all reward components + reward = pnl_reward + timing_reward + risk_reward + prediction_reward + + # Log components for analysis + info = { + 'pnl_reward': pnl_reward, + 'timing_reward': timing_reward, + 'risk_reward': risk_reward, + 'prediction_reward': prediction_reward, + 'total_reward': reward + } + + return reward # Return only the reward, not the info dictionary \ No newline at end of file diff --git a/crypto/gogo2/_notes.md b/crypto/gogo2/_notes.md index 49d8e5a..f782c14 100644 --- a/crypto/gogo2/_notes.md +++ b/crypto/gogo2/_notes.md @@ -38,3 +38,26 @@ it seems the model is not learning a lot. we keep hovering about the same starti it seems we may need another NN model down the loop jut to predict the extremums of the price. we may have to include a mechanism to calculate the extremums of the price retrospectively and to use that to bootstrap pre-train the model. + + + + + +Why Performance Might Be Stagnating +Several factors could explain why the model isn't improving significantly during training: +Insufficient Model Capacity for Price Prediction: While the price prediction model has 267,663 parameters, financial time series prediction is extremely challenging. The market may have patterns that are too complex or too random for the current model to capture effectively. +Overfitting to Training Data: The model might be memorizing patterns in the training data that don't generalize to new market conditions. +Transformer-LSTM Redundancy in DQN: Your DQN model uses both a transformer and an LSTM, which might be redundant. Both are designed to capture sequential dependencies, and having both could lead to overfitting or training instability. +Imbalanced Parameter Distribution: 64.5% of your DQN parameters are in the transformer component, which might be excessive for the task. +Reward Function Issues: The reward function might not be properly aligned with profitable trading strategies, or it might be too sparse to provide meaningful learning signals. +Suggested Improvements +1. Enhance Price Prediction Training +2. Simplify the DQN Architecture +Consider creating a more streamlined DQN model: +3. Improve the Reward Function +Make sure your reward function provides meaningful signals for learning: +4. Implement Curriculum Learning +Start with simpler market conditions and gradually increase complexity: +Conclusion +The issue appears to be a combination of model complexity, potential overfitting, and possibly insufficient learning signals from the reward function. By simplifying the DQN architecture (particularly reducing the transformer component), improving the price prediction training, and enhancing the reward function, you should see better learning progress. +Would you like me to implement any of these specific improvements to your codebase? \ No newline at end of file diff --git a/crypto/gogo2/count_params.py b/crypto/gogo2/count_params.py index 95f2fb0..81bf709 100644 --- a/crypto/gogo2/count_params.py +++ b/crypto/gogo2/count_params.py @@ -3,6 +3,278 @@ import torch.nn as nn import torch.nn.functional as F from torch.nn import TransformerEncoder, TransformerEncoderLayer +class EnhancedPricePredictionModel(nn.Module): + def __init__(self, input_dim=2, hidden_dim=256, num_layers=3, output_dim=5, num_timeframes=3): + super(EnhancedPricePredictionModel, self).__init__() + self.hidden_dim = hidden_dim + self.num_layers = num_layers + self.num_timeframes = num_timeframes + + # Separate LSTM for each timeframe + self.timeframe_lstms = nn.ModuleList([ + nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.2) + for _ in range(num_timeframes) + ]) + + # Cross-timeframe attention + self.cross_attention = nn.MultiheadAttention(hidden_dim, num_heads=8, batch_first=True, dropout=0.1) + + # Self-attention for each timeframe + self.self_attentions = nn.ModuleList([ + nn.MultiheadAttention(hidden_dim, num_heads=8, batch_first=True, dropout=0.1) + for _ in range(num_timeframes) + ]) + + # Timeframe fusion layer + self.fusion_layer = nn.Sequential( + nn.Linear(hidden_dim * num_timeframes, hidden_dim * 2), + nn.LeakyReLU(), + nn.Dropout(0.2), + nn.Linear(hidden_dim * 2, hidden_dim) + ) + + # Fully connected layer for price prediction + self.price_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, output_dim) + ) + + # Fully connected layer for extrema prediction (high and low points) + self.extrema_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, 10) # 5 time steps, 2 classes (high/low) each + ) + + # Volume prediction layer + self.volume_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, output_dim) + ) + + def forward(self, x_list): + # x_list is a list of tensors, one for each timeframe + # Each x shape: (batch_size, seq_len, input_dim) + + # Process each timeframe with its own LSTM + lstm_outputs = [] + for i, x in enumerate(x_list): + lstm_out, _ = self.timeframe_lstms[i](x) # lstm_out: (batch_size, seq_len, hidden_dim) + lstm_outputs.append(lstm_out) + + # Apply self-attention to each timeframe + attn_outputs = [] + for i, lstm_out in enumerate(lstm_outputs): + attn_output, _ = self.self_attentions[i](lstm_out, lstm_out, lstm_out) + attn_outputs.append(attn_output[:, -1, :]) # Use the last time step + + # Concatenate all timeframe representations + combined = torch.cat(attn_outputs, dim=1) # (batch_size, hidden_dim * num_timeframes) + + # Fuse timeframe information + fused = self.fusion_layer(combined) # (batch_size, hidden_dim) + + # Price prediction + price_pred = self.price_fc(fused) + + # Extrema prediction + extrema_logits = self.extrema_fc(fused) + + # Volume prediction + volume_pred = self.volume_fc(fused) + + return price_pred, extrema_logits, volume_pred + +class EnhancedDQN(nn.Module): + def __init__(self, state_dim, action_dim, hidden_dim=512): + super(EnhancedDQN, self).__init__() + + # Feature extraction layers with increased capacity + self.feature_extraction = nn.Sequential( + nn.Linear(state_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + ) + + # Advantage stream with increased capacity + self.advantage_stream = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, action_dim) + ) + + # Value stream with increased capacity + self.value_stream = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, 1) + ) + + # Enhanced transformer for temporal dependencies + encoder_layers = TransformerEncoderLayer( + d_model=hidden_dim, + nhead=8, + dim_feedforward=hidden_dim*4, + dropout=0.1, + batch_first=True + ) + self.transformer = TransformerEncoder(encoder_layers, num_layers=3) + + # LSTM for sequential decision making with increased capacity + self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=2, batch_first=True, dropout=0.1) + + # Final layers with increased capacity + self.final_layers = nn.Sequential( + nn.Linear(hidden_dim*2, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, action_dim) + ) + + # Market regime classification layer + self.market_regime_classifier = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, 3) # 3 regimes: trending, ranging, volatile + ) + + def forward(self, state, hidden=None): + # Extract features + features = self.feature_extraction(state) + features = features.unsqueeze(1) # Add sequence dimension for transformer/LSTM + + # Transformer processing + transformer_out = self.transformer(features) + + # LSTM processing + lstm_out, lstm_hidden = self.lstm(transformer_out) + + # Dueling architecture + advantage = self.advantage_stream(features.squeeze(1)) + value = self.value_stream(features.squeeze(1)) + + # Combine transformer, LSTM and dueling outputs + combined = torch.cat([transformer_out.squeeze(1), lstm_out.squeeze(1)], dim=1) + q_values = self.final_layers(combined) + + # Market regime classification + market_regime = self.market_regime_classifier(transformer_out.squeeze(1)) + + # Dueling Q-value computation + dueling_q = value + advantage - advantage.mean(dim=1, keepdim=True) + + # Final Q-values are a weighted combination of the dueling Q-values and the direct Q-values + # This allows the model to use either approach depending on the situation + q_values = 0.5 * dueling_q + 0.5 * q_values + + return q_values, lstm_hidden, market_regime + +def count_parameters(model): + total_params = 0 + layer_params = {} + + for name, param in model.named_parameters(): + if param.requires_grad: + param_count = param.numel() + total_params += param_count + layer_params[name] = (param_count, param.shape) + + return total_params, layer_params + +def main(): + # Initialize the original models for comparison + original_price_model = PricePredictionModel() + original_price_total_params, _ = count_parameters(original_price_model) + + state_dim = 50 + action_dim = 3 + original_dqn_model = DQN(state_dim=state_dim, action_dim=action_dim) + original_dqn_total_params, _ = count_parameters(original_dqn_model) + + # Initialize the enhanced models + enhanced_price_model = EnhancedPricePredictionModel(num_timeframes=3) + enhanced_price_total_params, enhanced_price_layer_params = count_parameters(enhanced_price_model) + + # Increase state dimension to accommodate multiple timeframes + enhanced_state_dim = 100 # Increased from 50 to accommodate more features + enhanced_dqn_model = EnhancedDQN(state_dim=enhanced_state_dim, action_dim=action_dim) + enhanced_dqn_total_params, enhanced_dqn_layer_params = count_parameters(enhanced_dqn_model) + + # Print comparison + print("=== MODEL SIZE COMPARISON ===") + print(f"Original Price Prediction Model: {original_price_total_params:,} parameters") + print(f"Enhanced Price Prediction Model: {enhanced_price_total_params:,} parameters") + print(f"Growth Factor: {enhanced_price_total_params / original_price_total_params:.2f}x\n") + + print(f"Original DQN Model: {original_dqn_total_params:,} parameters") + print(f"Enhanced DQN Model: {enhanced_dqn_total_params:,} parameters") + print(f"Growth Factor: {enhanced_dqn_total_params / original_dqn_total_params:.2f}x\n") + + print(f"Total Original Models: {original_price_total_params + original_dqn_total_params:,} parameters") + print(f"Total Enhanced Models: {enhanced_price_total_params + enhanced_dqn_total_params:,} parameters") + print(f"Overall Growth Factor: {(enhanced_price_total_params + enhanced_dqn_total_params) / (original_price_total_params + original_dqn_total_params):.2f}x\n") + + # Print VRAM usage estimate (rough approximation) + bytes_per_param = 4 # 4 bytes for float32 + original_vram_mb = (original_price_total_params + original_dqn_total_params) * bytes_per_param / (1024 * 1024) + enhanced_vram_mb = (enhanced_price_total_params + enhanced_dqn_total_params) * bytes_per_param / (1024 * 1024) + + print("=== ESTIMATED VRAM USAGE ===") + print(f"Original Models: {original_vram_mb:.2f} MB") + print(f"Enhanced Models: {enhanced_vram_mb:.2f} MB") + print(f"Available VRAM: 8,192 MB (8 GB)") + print(f"VRAM Utilization: {enhanced_vram_mb / 8192 * 100:.2f}%\n") + + # Print detailed breakdown of enhanced models + print("=== ENHANCED PRICE PREDICTION MODEL BREAKDOWN ===") + + # Group parameters by component + timeframe_lstm_params = sum(count for name, (count, _) in enhanced_price_layer_params.items() if "timeframe_lstms" in name) + attention_params = sum(count for name, (count, _) in enhanced_price_layer_params.items() if "attention" in name) + fusion_params = sum(count for name, (count, _) in enhanced_price_layer_params.items() if "fusion" in name) + output_params = sum(count for name, (count, _) in enhanced_price_layer_params.items() if any(x in name for x in ["price_fc", "extrema_fc", "volume_fc"])) + + print(f"Timeframe LSTMs: {timeframe_lstm_params:,} parameters ({timeframe_lstm_params/enhanced_price_total_params*100:.1f}%)") + print(f"Attention Mechanisms: {attention_params:,} parameters ({attention_params/enhanced_price_total_params*100:.1f}%)") + print(f"Fusion Layer: {fusion_params:,} parameters ({fusion_params/enhanced_price_total_params*100:.1f}%)") + print(f"Output Layers: {output_params:,} parameters ({output_params/enhanced_price_total_params*100:.1f}%)\n") + + print("=== ENHANCED DQN MODEL BREAKDOWN ===") + + # Group parameters by component + feature_extraction_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "feature_extraction" in name) + advantage_value_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "advantage_stream" in name or "value_stream" in name) + transformer_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "transformer" in name) + lstm_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "lstm" in name and "transformer" not in name) + final_layers_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "final_layers" in name) + market_regime_params = sum(count for name, (count, _) in enhanced_dqn_layer_params.items() if "market_regime" in name) + + print(f"Feature Extraction: {feature_extraction_params:,} parameters ({feature_extraction_params/enhanced_dqn_total_params*100:.1f}%)") + print(f"Advantage & Value Streams: {advantage_value_params:,} parameters ({advantage_value_params/enhanced_dqn_total_params*100:.1f}%)") + print(f"Transformer: {transformer_params:,} parameters ({transformer_params/enhanced_dqn_total_params*100:.1f}%)") + print(f"LSTM: {lstm_params:,} parameters ({lstm_params/enhanced_dqn_total_params*100:.1f}%)") + print(f"Final Layers: {final_layers_params:,} parameters ({final_layers_params/enhanced_dqn_total_params*100:.1f}%)") + print(f"Market Regime Classifier: {market_regime_params:,} parameters ({market_regime_params/enhanced_dqn_total_params*100:.1f}%)") + +# Keep the original models for comparison class PricePredictionModel(nn.Module): def __init__(self, input_dim=2, hidden_dim=128, num_layers=2, output_dim=5): super(PricePredictionModel, self).__init__() @@ -102,106 +374,5 @@ class DQN(nn.Module): return q_values, lstm_hidden -def count_parameters(model): - total_params = 0 - layer_params = {} - - for name, param in model.named_parameters(): - if param.requires_grad: - param_count = param.numel() - total_params += param_count - layer_params[name] = (param_count, param.shape) - - return total_params, layer_params - -def main(): - # Initialize the Price Prediction Model - price_model = PricePredictionModel() - price_total_params, price_layer_params = count_parameters(price_model) - - print(f"Price Prediction Model parameters: {price_total_params:,}") - print("\nPrice Prediction Model Layers:") - for name, (count, shape) in price_layer_params.items(): - print(f"{name}: {count:,} (shape: {shape})") - - # Initialize the DQN Model with typical dimensions - state_dim = 50 # Typical state dimension for the trading bot - action_dim = 3 # Typical action dimension (buy, sell, hold) - dqn_model = DQN(state_dim=state_dim, action_dim=action_dim) - dqn_total_params, dqn_layer_params = count_parameters(dqn_model) - - # Count parameters by category - feature_extraction_params = sum(count for name, (count, _) in dqn_layer_params.items() if "feature_extraction" in name) - advantage_value_params = sum(count for name, (count, _) in dqn_layer_params.items() if "advantage_stream" in name or "value_stream" in name) - transformer_params = sum(count for name, (count, _) in dqn_layer_params.items() if "transformer" in name) - lstm_params = sum(count for name, (count, _) in dqn_layer_params.items() if "lstm" in name and "transformer" not in name) - final_layers_params = sum(count for name, (count, _) in dqn_layer_params.items() if "final_layers" in name) - - print(f"\nDQN Model parameters: {dqn_total_params:,}") - - # Create sets to track which parameters we've printed - printed_params = set() - - # Print DQN layers in groups to avoid output truncation - print(f"\nDQN Model Layers (Feature Extraction): {feature_extraction_params:,} parameters") - for name, (count, shape) in dqn_layer_params.items(): - if "feature_extraction" in name: - print(f"{name}: {count:,} (shape: {shape})") - printed_params.add(name) - - print(f"\nDQN Model Layers (Advantage & Value Streams): {advantage_value_params:,} parameters") - for name, (count, shape) in dqn_layer_params.items(): - if "advantage_stream" in name or "value_stream" in name: - print(f"{name}: {count:,} (shape: {shape})") - printed_params.add(name) - - print(f"\nDQN Model Layers (Transformer): {transformer_params:,} parameters") - for name, (count, shape) in dqn_layer_params.items(): - if "transformer" in name: - print(f"{name}: {count:,} (shape: {shape})") - printed_params.add(name) - - print(f"\nDQN Model Layers (LSTM): {lstm_params:,} parameters") - for name, (count, shape) in dqn_layer_params.items(): - if "lstm" in name and "transformer" not in name: - print(f"{name}: {count:,} (shape: {shape})") - printed_params.add(name) - - print(f"\nDQN Model Layers (Final Layers): {final_layers_params:,} parameters") - for name, (count, shape) in dqn_layer_params.items(): - if "final_layers" in name: - print(f"{name}: {count:,} (shape: {shape})") - printed_params.add(name) - - # Print any remaining parameters that weren't caught by the categories above - remaining_params = set(dqn_layer_params.keys()) - printed_params - if remaining_params: - remaining_params_count = sum(dqn_layer_params[name][0] for name in remaining_params) - print(f"\nDQN Model Layers (Other): {remaining_params_count:,} parameters") - for name in remaining_params: - count, shape = dqn_layer_params[name] - print(f"{name}: {count:,} (shape: {shape})") - - # Total parameters across both models - print(f"\nTotal parameters (both models): {price_total_params + dqn_total_params:,}") - - # Print summary of parameter distribution - print("\nParameter Distribution Summary:") - print(f"Price Prediction Model: {price_total_params:,} parameters ({price_total_params/(price_total_params + dqn_total_params)*100:.1f}%)") - print(f"DQN Model: {dqn_total_params:,} parameters ({dqn_total_params/(price_total_params + dqn_total_params)*100:.1f}%)") - print("\nDQN Model Breakdown:") - print(f"- Feature Extraction: {feature_extraction_params:,} parameters ({feature_extraction_params/dqn_total_params*100:.1f}%)") - print(f"- Advantage & Value Streams: {advantage_value_params:,} parameters ({advantage_value_params/dqn_total_params*100:.1f}%)") - print(f"- Transformer: {transformer_params:,} parameters ({transformer_params/dqn_total_params*100:.1f}%)") - print(f"- LSTM: {lstm_params:,} parameters ({lstm_params/dqn_total_params*100:.1f}%)") - print(f"- Final Layers: {final_layers_params:,} parameters ({final_layers_params/dqn_total_params*100:.1f}%)") - - # Verify that all parameters are accounted for - total_by_category = feature_extraction_params + advantage_value_params + transformer_params + lstm_params + final_layers_params - if remaining_params: - total_by_category += remaining_params_count - print(f"\nSum of all categories: {total_by_category:,} parameters") - print(f"Difference from total: {dqn_total_params - total_by_category:,} parameters") - if __name__ == "__main__": main() \ No newline at end of file diff --git a/crypto/gogo2/enhanced_models.py b/crypto/gogo2/enhanced_models.py new file mode 100644 index 0000000..e9370aa --- /dev/null +++ b/crypto/gogo2/enhanced_models.py @@ -0,0 +1,434 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.nn import TransformerEncoder, TransformerEncoderLayer + +class EnhancedPricePredictionModel(nn.Module): + def __init__(self, input_dim=2, hidden_dim=256, num_layers=3, output_dim=5, num_timeframes=3): + super(EnhancedPricePredictionModel, self).__init__() + self.hidden_dim = hidden_dim + self.num_layers = num_layers + self.num_timeframes = num_timeframes + + # Separate LSTM for each timeframe + self.timeframe_lstms = nn.ModuleList([ + nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.2) + for _ in range(num_timeframes) + ]) + + # Cross-timeframe attention + self.cross_attention = nn.MultiheadAttention(hidden_dim, num_heads=8, batch_first=True, dropout=0.1) + + # Self-attention for each timeframe + self.self_attentions = nn.ModuleList([ + nn.MultiheadAttention(hidden_dim, num_heads=8, batch_first=True, dropout=0.1) + for _ in range(num_timeframes) + ]) + + # Timeframe fusion layer + self.fusion_layer = nn.Sequential( + nn.Linear(hidden_dim * num_timeframes, hidden_dim * 2), + nn.LeakyReLU(), + nn.Dropout(0.2), + nn.Linear(hidden_dim * 2, hidden_dim) + ) + + # Fully connected layer for price prediction + self.price_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, output_dim) + ) + + # Fully connected layer for extrema prediction (high and low points) + self.extrema_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, 10) # 5 time steps, 2 classes (high/low) each + ) + + # Volume prediction layer + self.volume_fc = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, output_dim) + ) + + def forward(self, x_list): + # x_list is a list of tensors, one for each timeframe + # Each x shape: (batch_size, seq_len, input_dim) + + # Process each timeframe with its own LSTM + lstm_outputs = [] + for i, x in enumerate(x_list): + lstm_out, _ = self.timeframe_lstms[i](x) # lstm_out: (batch_size, seq_len, hidden_dim) + lstm_outputs.append(lstm_out) + + # Apply self-attention to each timeframe + attn_outputs = [] + for i, lstm_out in enumerate(lstm_outputs): + attn_output, _ = self.self_attentions[i](lstm_out, lstm_out, lstm_out) + attn_outputs.append(attn_output[:, -1, :]) # Use the last time step + + # Concatenate all timeframe representations + combined = torch.cat(attn_outputs, dim=1) # (batch_size, hidden_dim * num_timeframes) + + # Fuse timeframe information + fused = self.fusion_layer(combined) # (batch_size, hidden_dim) + + # Price prediction + price_pred = self.price_fc(fused) + + # Extrema prediction + extrema_logits = self.extrema_fc(fused) + + # Volume prediction + volume_pred = self.volume_fc(fused) + + return price_pred, extrema_logits, volume_pred + +class EnhancedDQN(nn.Module): + def __init__(self, state_dim, action_dim, hidden_dim=512): + super(EnhancedDQN, self).__init__() + + # Feature extraction layers with increased capacity + self.feature_extraction = nn.Sequential( + nn.Linear(state_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + ) + + # Advantage stream with increased capacity + self.advantage_stream = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, action_dim) + ) + + # Value stream with increased capacity + self.value_stream = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, 1) + ) + + # Enhanced transformer for temporal dependencies + encoder_layers = TransformerEncoderLayer( + d_model=hidden_dim, + nhead=8, + dim_feedforward=hidden_dim*4, + dropout=0.1, + batch_first=True + ) + self.transformer = TransformerEncoder(encoder_layers, num_layers=3) + + # LSTM for sequential decision making with increased capacity + self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=2, batch_first=True, dropout=0.1) + + # Final layers with increased capacity + self.final_layers = nn.Sequential( + nn.Linear(hidden_dim*2, hidden_dim), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, action_dim) + ) + + # Market regime classification layer + self.market_regime_classifier = nn.Sequential( + nn.Linear(hidden_dim, hidden_dim // 2), + nn.LeakyReLU(), + nn.Linear(hidden_dim // 2, 3) # 3 regimes: trending, ranging, volatile + ) + + def forward(self, state, hidden=None): + # Extract features + features = self.feature_extraction(state) + features = features.unsqueeze(1) # Add sequence dimension for transformer/LSTM + + # Transformer processing + transformer_out = self.transformer(features) + + # LSTM processing + lstm_out, lstm_hidden = self.lstm(transformer_out) + + # Dueling architecture + advantage = self.advantage_stream(features.squeeze(1)) + value = self.value_stream(features.squeeze(1)) + + # Combine transformer, LSTM and dueling outputs + combined = torch.cat([transformer_out.squeeze(1), lstm_out.squeeze(1)], dim=1) + q_values = self.final_layers(combined) + + # Market regime classification + market_regime = self.market_regime_classifier(transformer_out.squeeze(1)) + + # Dueling Q-value computation + dueling_q = value + advantage - advantage.mean(dim=1, keepdim=True) + + # Final Q-values are a weighted combination of the dueling Q-values and the direct Q-values + # This allows the model to use either approach depending on the situation + q_values = 0.5 * dueling_q + 0.5 * q_values + + return q_values, lstm_hidden, market_regime + +class EnhancedReplayBuffer: + """Enhanced replay buffer with prioritized experience replay and n-step returns""" + def __init__(self, capacity, alpha=0.6, beta=0.4, beta_increment=0.001, n_step=3, gamma=0.99): + self.capacity = capacity + self.buffer = [] + self.position = 0 + self.priorities = torch.zeros(capacity) + self.alpha = alpha # Priority exponent + self.beta = beta # Importance sampling weight + self.beta_increment = beta_increment # Beta annealing + self.n_step = n_step # n-step returns + self.gamma = gamma # Discount factor + self.n_step_buffer = [] + self.max_priority = 1.0 + + def push(self, state, action, reward, next_state, done): + # Store experience in n-step buffer + self.n_step_buffer.append((state, action, reward, next_state, done)) + + # If we don't have enough experiences for n-step return, wait + if len(self.n_step_buffer) < self.n_step and not done: + return + + # Calculate n-step return + reward_n = 0 + for i in range(self.n_step): + if i >= len(self.n_step_buffer): + break + reward_n += self.gamma**i * self.n_step_buffer[i][2] + + # Get state, action from the first experience + state = self.n_step_buffer[0][0] + action = self.n_step_buffer[0][1] + + # Get next_state, done from the last experience + next_state = self.n_step_buffer[-1][3] + done = self.n_step_buffer[-1][4] + + # Store in replay buffer with max priority + if len(self.buffer) < self.capacity: + self.buffer.append(None) + self.buffer[self.position] = (state, action, reward_n, next_state, done) + + # Set priority to max priority to ensure it gets sampled + self.priorities[self.position] = self.max_priority + + # Move position pointer + self.position = (self.position + 1) % self.capacity + + # Remove the first experience from n-step buffer + self.n_step_buffer.pop(0) + + # If episode is done, clear n-step buffer + if done: + self.n_step_buffer = [] + + def sample(self, batch_size): + # Calculate sampling probabilities + if len(self.buffer) < self.capacity: + probs = self.priorities[:len(self.buffer)] + else: + probs = self.priorities + + # Normalize probabilities + probs = probs ** self.alpha + probs = probs / probs.sum() + + # Sample indices based on priorities + indices = torch.multinomial(probs, batch_size, replacement=True) + + # Get samples + states = [] + actions = [] + rewards = [] + next_states = [] + dones = [] + + # Calculate importance sampling weights + weights = (len(self.buffer) * probs[indices]) ** (-self.beta) + weights = weights / weights.max() + self.beta = min(1.0, self.beta + self.beta_increment) # Anneal beta + + # Get experiences + for idx in indices: + state, action, reward, next_state, done = self.buffer[idx] + states.append(state) + actions.append(action) + rewards.append(reward) + next_states.append(next_state) + dones.append(done) + + return ( + torch.stack(states), + torch.tensor(actions), + torch.tensor(rewards, dtype=torch.float32), + torch.stack(next_states), + torch.tensor(dones, dtype=torch.float32), + indices, + weights + ) + + def update_priorities(self, indices, td_errors): + for idx, td_error in zip(indices, td_errors): + # Update priority based on TD error + priority = abs(td_error) + 1e-5 # Small constant to ensure non-zero priority + self.priorities[idx] = priority + self.max_priority = max(self.max_priority, priority) + + def __len__(self): + return len(self.buffer) + +def train_price_predictor(model, data_loaders, optimizer, device, epochs=10): + """ + Train the price prediction model using data from multiple timeframes + + Args: + model: The EnhancedPricePredictionModel + data_loaders: List of DataLoader objects, one for each timeframe + optimizer: Optimizer for training + device: Device to train on (CPU or GPU) + epochs: Number of training epochs + """ + model.train() + for epoch in range(epochs): + total_loss = 0 + num_batches = 0 + + # Assume all dataloaders have the same length + for batch_idx, batch_data in enumerate(zip(*data_loaders)): + # Each batch_data is a tuple of (inputs, price_targets, extrema_targets, volume_targets) for each timeframe + optimizer.zero_grad() + + # Prepare inputs for each timeframe + inputs_list = [data[0].to(device) for data in batch_data] + price_targets = batch_data[0][1].to(device) # Use targets from the first timeframe (e.g., 1m) + extrema_targets = batch_data[0][2].to(device) + volume_targets = batch_data[0][3].to(device) + + # Forward pass + price_pred, extrema_logits, volume_pred = model(inputs_list) + + # Calculate losses + price_loss = F.mse_loss(price_pred, price_targets) + extrema_loss = F.binary_cross_entropy_with_logits(extrema_logits, extrema_targets) + volume_loss = F.mse_loss(volume_pred, volume_targets) + + # Combined loss with weighting + loss = price_loss + 0.5 * extrema_loss + 0.3 * volume_loss + + # Backward pass + loss.backward() + + # Gradient clipping to prevent exploding gradients + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + + optimizer.step() + + total_loss += loss.item() + num_batches += 1 + + if batch_idx % 100 == 0: + print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.6f}") + + avg_loss = total_loss / num_batches + print(f"Epoch {epoch+1}/{epochs}, Avg Loss: {avg_loss:.6f}") + + # Learning rate scheduling + if epoch > 0 and epoch % 5 == 0: + for param_group in optimizer.param_groups: + param_group['lr'] *= 0.9 + + return model + +def prepare_multi_timeframe_data(exchange, timeframes=['1m', '15m', '1h'], lookback=30): + """ + Prepare data from multiple timeframes for training + + Args: + exchange: Exchange object to fetch data from + timeframes: List of timeframes to fetch + lookback: Number of candles to look back + + Returns: + List of DataLoader objects, one for each timeframe + """ + data_loaders = [] + + for timeframe in timeframes: + # Fetch historical data for this timeframe + candles = exchange.fetch_ohlcv(timeframe=timeframe, limit=1000) + + # Prepare inputs and targets + inputs = [] + price_targets = [] + extrema_targets = [] + volume_targets = [] + + for i in range(lookback, len(candles) - 5): + # Input: lookback candles (price and volume) + input_data = torch.tensor([ + [candle[4], candle[5]] for candle in candles[i-lookback:i] + ], dtype=torch.float32) + + # Target: next 5 candles (price) + price_target = torch.tensor([ + candle[4] for candle in candles[i:i+5] + ], dtype=torch.float32) + + # Target: extrema points in next 5 candles + extrema_target = torch.zeros(10, dtype=torch.float32) # 5 time steps, 2 classes each + for j in range(5): + # Simple extrema detection for demonstration + if j > 0 and j < 4: + # Local high + if candles[i+j][2] > candles[i+j-1][2] and candles[i+j][2] > candles[i+j+1][2]: + extrema_target[j*2] = 1.0 + # Local low + if candles[i+j][3] < candles[i+j-1][3] and candles[i+j][3] < candles[i+j+1][3]: + extrema_target[j*2+1] = 1.0 + + # Target: volume for next 5 candles + volume_target = torch.tensor([ + candle[5] for candle in candles[i:i+5] + ], dtype=torch.float32) + + inputs.append(input_data) + price_targets.append(price_target) + extrema_targets.append(extrema_target) + volume_targets.append(volume_target) + + # Create dataset and dataloader + dataset = torch.utils.data.TensorDataset( + torch.stack(inputs), + torch.stack(price_targets), + torch.stack(extrema_targets), + torch.stack(volume_targets) + ) + + data_loader = torch.utils.data.DataLoader( + dataset, batch_size=32, shuffle=True + ) + + data_loaders.append(data_loader) + + return data_loaders \ No newline at end of file diff --git a/crypto/gogo2/main.py b/crypto/gogo2/main.py index c97b330..fb378c3 100644 --- a/crypto/gogo2/main.py +++ b/crypto/gogo2/main.py @@ -53,90 +53,169 @@ Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state' # Add this function near the top of the file, after the imports but before any classes def find_local_extrema(prices, window=5, volumes=None, volume_threshold=0.7): """ - Find local minima (bottoms) and maxima (tops) in price data with volume confirmation + Find local extrema (peaks and troughs) in price data with improved accuracy. - Parameters: - ----------- - prices : list or array - Price data - window : int - Window size for extrema detection - volumes : list or array, optional - Volume data corresponding to prices - volume_threshold : float - Threshold for volume significance (percentile) - - Returns: - -------- - bottoms : list - Indices of local bottoms - tops : list - Indices of local tops - """ - bottoms = [] - tops = [] - - if len(prices) < window * 2 + 1: - return bottoms, tops - - # Determine volume significance if volumes are provided - volume_significance = None - if volumes is not None and len(volumes) == len(prices): - # Calculate rolling average volume - rolling_vol = [] - for i in range(len(volumes)): - start_idx = max(0, i - window * 2) - end_idx = i + 1 - rolling_vol.append(np.mean(volumes[start_idx:end_idx])) + Args: + prices: List of price values + window: Window size for extrema detection + volumes: Optional list of volume values + volume_threshold: Volume threshold for confirming extrema - # Calculate volume threshold based on percentile - volume_significance = np.percentile(rolling_vol, volume_threshold * 100) + Returns: + peaks: Indices of local maxima + troughs: Indices of local minima + """ + if len(prices) < 2 * window + 1: + return [], [] + + # Convert to numpy array if not already + prices = np.array(prices) + + # Find potential extrema using rolling window + peaks = [] + troughs = [] for i in range(window, len(prices) - window): - # Check if this is a local minimum (bottom) - is_bottom = all(prices[i] <= prices[i-j] for j in range(1, window+1)) and \ - all(prices[i] <= prices[i+j] for j in range(1, window+1)) + # Get window around current point + window_left = prices[i - window:i] + window_right = prices[i + 1:i + window + 1] + current = prices[i] - # Check if this is a local maximum (top) - is_top = all(prices[i] >= prices[i-j] for j in range(1, window+1)) and \ - all(prices[i] >= prices[i+j] for j in range(1, window+1)) + # Check for peak + if current > np.max(window_left) and current > np.max(window_right): + # Volume confirmation if available + if volumes is None or volumes[i] > np.mean(volumes) * volume_threshold: + peaks.append(i) - # Apply volume filter if available - if volume_significance is not None: - # For bottoms, we want to see increased volume on the reversal - if is_bottom: - # Check if volume is significant at this point or in the next few candles - vol_confirmed = False - for j in range(3): # Check current and next 2 candles - if i+j < len(volumes) and volumes[i+j] > volume_significance: - vol_confirmed = True - break - - if vol_confirmed: - bottoms.append(i) - - # For tops, we want to see increased volume at the peak - elif is_top: - if volumes[i] > volume_significance: - tops.append(i) - else: - # If no volume data, just use price action - if is_bottom: - bottoms.append(i) - elif is_top: - tops.append(i) + # Check for trough + if current < np.min(window_left) and current < np.min(window_right): + # Volume confirmation if available + if volumes is None or volumes[i] > np.mean(volumes) * volume_threshold: + troughs.append(i) - return bottoms, tops + # Apply additional filtering to remove false extrema + if len(peaks) > 1: + # Filter out peaks that are too close to each other + filtered_peaks = [peaks[0]] + for i in range(1, len(peaks)): + if peaks[i] - filtered_peaks[-1] >= window: + filtered_peaks.append(peaks[i]) + peaks = filtered_peaks + + if len(troughs) > 1: + # Filter out troughs that are too close to each other + filtered_troughs = [troughs[0]] + for i in range(1, len(troughs)): + if troughs[i] - filtered_troughs[-1] >= window: + filtered_troughs.append(troughs[i]) + troughs = filtered_troughs + + return peaks, troughs class ReplayMemory: - def __init__(self, capacity): - self.memory = deque(maxlen=capacity) + def __init__(self, capacity, alpha=0.6, beta=0.4, beta_increment=0.001, n_step=3, gamma=0.99): + self.capacity = capacity + self.memory = [] + self.position = 0 + self.Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done')) + # Prioritized Experience Replay parameters + self.alpha = alpha # How much prioritization to use (0 = uniform sampling) + self.beta = beta # Importance sampling correction (0 = no correction) + self.beta_increment = beta_increment # Increment beta over time to 1 + self.max_priority = 1.0 # Initial max priority + + # N-step learning parameters + self.n_step = n_step + self.gamma = gamma + self.n_step_buffer = deque(maxlen=n_step) + def push(self, state, action, reward, next_state, done): - self.memory.append(Experience(state, action, reward, next_state, done)) + """Store transition with maximum priority""" + # Store experience in n-step buffer + self.n_step_buffer.append((state, action, reward, next_state, done)) + # If we don't have enough transitions for n-step yet, return + if len(self.n_step_buffer) < self.n_step and not done: + return + + # Calculate n-step reward and get the appropriate next state + n_step_reward = 0 + n_step_next_state = None + n_step_done = False + + # If the episode ended before we could collect n steps + if done and len(self.n_step_buffer) < self.n_step: + # Use what we have + n_step_next_state = self.n_step_buffer[-1][3] + n_step_done = True + + # Calculate n-step reward with discount + for i, (_, _, r, _, _) in enumerate(self.n_step_buffer): + n_step_reward += r * (self.gamma ** i) + else: + # Get the state after n steps + n_step_next_state = self.n_step_buffer[-1][3] + n_step_done = self.n_step_buffer[-1][4] + + # Calculate n-step reward with discount + for i, (_, _, r, _, _) in enumerate(self.n_step_buffer): + n_step_reward += r * (self.gamma ** i) + + # Get the initial state and action + initial_state = self.n_step_buffer[0][0] + initial_action = self.n_step_buffer[0][1] + + # Create transition with n-step values + transition = self.Transition(initial_state, initial_action, n_step_reward, n_step_next_state, n_step_done) + + # Add to memory with maximum priority + if len(self.memory) < self.capacity: + self.memory.append((transition, self.max_priority)) + else: + self.memory[self.position] = (transition, self.max_priority) + + self.position = (self.position + 1) % self.capacity + + # If this was the end of an episode, clear the n-step buffer + if done: + self.n_step_buffer.clear() + def sample(self, batch_size): - return random.sample(self.memory, batch_size) + """Sample a batch of transitions with prioritized sampling""" + if len(self.memory) < batch_size: + return None + + # Calculate sampling probabilities + priorities = np.array([p for _, p in self.memory]) + probs = priorities ** self.alpha + probs /= probs.sum() + + # Sample indices based on priorities + indices = np.random.choice(len(self.memory), batch_size, p=probs) + + # Get the sampled transitions + transitions = [self.memory[idx][0] for idx in indices] + + # Calculate importance sampling weights + weights = (len(self.memory) * probs[indices]) ** (-self.beta) + weights /= weights.max() # Normalize weights + + # Increment beta for next time + self.beta = min(1.0, self.beta + self.beta_increment) + + # Convert to batch + batch = self.Transition(*zip(*transitions)) + + return batch, indices, weights + + def update_priorities(self, indices, td_errors): + """Update priorities based on TD errors""" + for idx, td_error in zip(indices, td_errors): + # Add a small constant to avoid zero priority + priority = (abs(td_error) + 1e-5) ** self.alpha + self.memory[idx] = (self.memory[idx][0], priority) + self.max_priority = max(self.max_priority, priority) def __len__(self): return len(self.memory) @@ -145,304 +224,405 @@ class DQN(nn.Module): def __init__(self, state_size, action_size, hidden_size=384, lstm_layers=2, attention_heads=4): super(DQN, self).__init__() - # Ensure model parameters are float32 - self.float() - - self.state_size = state_size - self.hidden_size = hidden_size - self.lstm_layers = lstm_layers - - # Initial feature extraction - self.fc1 = nn.Linear(state_size, hidden_size) - # Use LayerNorm instead of BatchNorm for more stability with varying batch sizes - self.ln1 = nn.LayerNorm(hidden_size) - self.dropout1 = nn.Dropout(0.2) - - # LSTM layer for sequential data - self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=lstm_layers, batch_first=True, dropout=0.2) - - # Attention mechanism - self.attention = nn.MultiheadAttention(hidden_size, attention_heads) - - # Output layers with increased capacity - self.fc2 = nn.Linear(hidden_size, hidden_size) - self.ln2 = nn.LayerNorm(hidden_size) # LayerNorm instead of BatchNorm - self.dropout2 = nn.Dropout(0.2) - self.fc3 = nn.Linear(hidden_size, hidden_size // 2) - - # Dueling DQN architecture - self.value_stream = nn.Linear(hidden_size // 2, 1) - self.advantage_stream = nn.Linear(hidden_size // 2, action_size) - - # Transformer encoder for more complex pattern recognition - encoder_layer = nn.TransformerEncoderLayer( - d_model=hidden_size, - nhead=attention_heads, - dropout=0.1, - batch_first=True # Add this parameter + # Feature extraction layers + self.feature_extraction = nn.Sequential( + nn.Linear(state_size, hidden_size), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, hidden_size), + nn.LeakyReLU(), ) - self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2) - def forward(self, x): - batch_size = x.size(0) if x.dim() > 1 else 1 + # LSTM for sequential processing + self.lstm = nn.LSTM( + input_size=hidden_size, + hidden_size=hidden_size, + num_layers=lstm_layers, + batch_first=True, + dropout=0.1 if lstm_layers > 1 else 0 + ) - # Ensure input has correct shape and type - if x.dim() == 1: - x = x.unsqueeze(0) # Add batch dimension + # Dueling network architecture + # Advantage stream + self.advantage_stream = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 2), + nn.LeakyReLU(), + nn.Linear(hidden_size // 2, action_size) + ) - # Ensure float32 type - x = x.float() + # Value stream + self.value_stream = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 2), + nn.LeakyReLU(), + nn.Linear(hidden_size // 2, 1) + ) - # Check if state size matches expected input size - if x.size(1) != self.state_size: - # Handle mismatched input by either truncating or padding - if x.size(1) > self.state_size: - x = x[:, :self.state_size] # Truncate - else: - # Pad with zeros - padding = torch.zeros(batch_size, self.state_size - x.size(1), device=x.device) - x = torch.cat([x, padding], dim=1) + # Market regime classification + self.market_regime_classifier = nn.Sequential( + nn.Linear(hidden_size, hidden_size // 2), + nn.LeakyReLU(), + nn.Linear(hidden_size // 2, 3) # 3 regimes: trending, ranging, volatile + ) - # Initial feature extraction - x = self.fc1(x) - x = F.relu(self.ln1(x)) # LayerNorm works with any batch size - x = self.dropout1(x) + # Initialize weights + self._initialize_weights() - # Reshape for LSTM - x_lstm = x.unsqueeze(1) if x.dim() == 2 else x + def _initialize_weights(self): + for m in self.modules(): + if isinstance(m, nn.Linear): + nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu') + if m.bias is not None: + nn.init.constant_(m.bias, 0) + + def forward(self, x, hidden=None): + # Extract features + features = self.feature_extraction(x) - # Process through LSTM - lstm_out, _ = self.lstm(x_lstm) - lstm_out = lstm_out.squeeze(1) if lstm_out.size(1) == 1 else lstm_out[:, -1] + # Add sequence dimension for LSTM if not present + if len(features.shape) == 2: + features = features.unsqueeze(1) - # Process through transformer for more complex patterns - transformer_input = x.unsqueeze(1) if x.dim() == 2 else x - transformer_out = self.transformer_encoder(transformer_input) - transformer_out = transformer_out.mean(dim=1) # Average across sequence dimension + # LSTM processing + lstm_out, lstm_hidden = self.lstm(features, hidden) - # Combine LSTM and transformer outputs - x = lstm_out + transformer_out - - # Final layers - x = self.fc2(x) - x = F.relu(self.ln2(x)) # LayerNorm works with any batch size - x = self.dropout2(x) - x = F.relu(self.fc3(x)) + # Use the last LSTM output + lstm_out = lstm_out[:, -1, :] # Dueling architecture - value = self.value_stream(x) - advantages = self.advantage_stream(x) - qvals = value + (advantages - advantages.mean(dim=1, keepdim=True)) + advantage = self.advantage_stream(lstm_out) + value = self.value_stream(lstm_out) - return qvals + # Combine value and advantage for Q-values + # Q(s,a) = V(s) + (A(s,a) - mean(A(s,a'))) + q_values = value + advantage - advantage.mean(dim=1, keepdim=True) + + # Market regime classification + market_regime = self.market_regime_classifier(lstm_out) + + return q_values, lstm_hidden, market_regime class PricePredictionModel(nn.Module): - def __init__(self, input_size=30, hidden_size=128, output_size=5, num_layers=2): + def __init__(self, input_size=2, hidden_size=256, output_size=5, num_layers=3, num_timeframes=3): super(PricePredictionModel, self).__init__() - # Input features: price and volume - self.lstm = nn.LSTM(2, hidden_size, num_layers=num_layers, batch_first=True, dropout=0.2) - self.attention = nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True) + self.hidden_size = hidden_size + self.num_layers = num_layers + self.num_timeframes = num_timeframes + self.output_size = output_size - # Price prediction head - self.price_fc = nn.Linear(hidden_size, output_size) + # Separate LSTM for each timeframe + self.timeframe_lstms = nn.ModuleList([ + nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2) + for _ in range(num_timeframes) + ]) - # Extrema prediction head (probability of high/low in next candles) - self.extrema_fc = nn.Linear(hidden_size, output_size * 2) # For each candle: [p_low, p_high] + # Self-attention for each timeframe + self.self_attentions = nn.ModuleList([ + nn.MultiheadAttention(hidden_size, num_heads=4, batch_first=True, dropout=0.1) + for _ in range(num_timeframes) + ]) - # Scalers + # Timeframe fusion layer + self.fusion_layer = nn.Sequential( + nn.Linear(hidden_size * num_timeframes, hidden_size * 2), + nn.LeakyReLU(), + nn.Dropout(0.2), + nn.Linear(hidden_size * 2, hidden_size) + ) + + # Price prediction layers + self.price_fc = nn.Sequential( + nn.Linear(hidden_size, hidden_size), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, output_size) + ) + + # Extrema prediction layers (high and low points) + self.extrema_fc = nn.Sequential( + nn.Linear(hidden_size, hidden_size), + nn.LeakyReLU(), + nn.Dropout(0.1), + nn.Linear(hidden_size, output_size * 2) # For each time step, predict high/low probability + ) + + # Initialize scalers self.price_scaler = MinMaxScaler(feature_range=(0, 1)) self.volume_scaler = MinMaxScaler(feature_range=(0, 1)) - self.is_fitted_price = False - self.is_fitted_volume = False - def forward(self, x): - # x shape: [batch_size, seq_len, 2] (price, volume) - lstm_out, _ = self.lstm(x) + # Initialize weights + self._initialize_weights() - # Apply attention mechanism - attn_out, _ = self.attention(lstm_out, lstm_out, lstm_out) + def _initialize_weights(self): + for m in self.modules(): + if isinstance(m, nn.Linear): + nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu') + if m.bias is not None: + nn.init.constant_(m.bias, 0) + + def forward(self, x_list): + """ + Forward pass for multi-timeframe price prediction - # Use the last time step output with attention - last_hidden = attn_out[:, -1, :] + Args: + x_list: List of tensors, one for each timeframe + Each tensor shape: [batch_size, seq_len, input_features] - # Price predictions - price_pred = self.price_fc(last_hidden) + Returns: + price_pred: Predicted prices for next candles + extrema_pred: Predicted extrema points (high/low probabilities) + """ + # If only one timeframe is provided, duplicate it + if not isinstance(x_list, list): + x_list = [x_list] * self.num_timeframes - # Extrema predictions (probabilities of highs and lows) - extrema_logits = self.extrema_fc(last_hidden) + # Ensure we have the right number of timeframes + if len(x_list) < self.num_timeframes: + x_list = x_list + [x_list[0]] * (self.num_timeframes - len(x_list)) + elif len(x_list) > self.num_timeframes: + x_list = x_list[:self.num_timeframes] + + # Process each timeframe with its own LSTM + lstm_outputs = [] + for i, x in enumerate(x_list): + lstm_out, _ = self.timeframe_lstms[i](x) # lstm_out: [batch_size, seq_len, hidden_size] + lstm_outputs.append(lstm_out) + + # Apply self-attention to each timeframe + attn_outputs = [] + for i, lstm_out in enumerate(lstm_outputs): + attn_output, _ = self.self_attentions[i](lstm_out, lstm_out, lstm_out) + attn_outputs.append(attn_output[:, -1, :]) # Use the last time step + + # Concatenate all timeframe representations + combined = torch.cat(attn_outputs, dim=1) # [batch_size, hidden_size * num_timeframes] + + # Fuse timeframe information + fused = self.fusion_layer(combined) # [batch_size, hidden_size] + + # Price prediction + price_pred = self.price_fc(fused) + + # Extrema prediction + extrema_logits = self.extrema_fc(fused) extrema_pred = torch.sigmoid(extrema_logits) # Convert to probabilities return price_pred, extrema_pred - def preprocess(self, price_history, volume_history=None): - # Reshape price data for scaler - price_reshaped = np.array(price_history).reshape(-1, 1) + def preprocess(self, price_history, volume_history=None, timeframes=None): + """ + Preprocess price and volume data for model input - # Fit price scaler if not already fitted - if not self.is_fitted_price: - self.price_scaler.fit(price_reshaped) - self.is_fitted_price = True - - # Transform price data - scaled_price = self.price_scaler.transform(price_reshaped) + Args: + price_history: List of price histories for different timeframes + volume_history: List of volume histories for different timeframes + timeframes: List of timeframe names (for logging) - # If volume data is provided - if volume_history is not None: - # Reshape volume data for scaler - volume_reshaped = np.array(volume_history).reshape(-1, 1) + Returns: + Preprocessed data ready for model input + """ + # If single timeframe data is provided, convert to list format + if not isinstance(price_history, list): + price_history = [price_history] + if volume_history is not None: + volume_history = [volume_history] + + # Ensure volume history exists + if volume_history is None: + volume_history = [np.ones_like(prices) for prices in price_history] + + # Process each timeframe + processed_data = [] + + for i, (prices, volumes) in enumerate(zip(price_history, volume_history)): + # Convert to numpy arrays if they aren't already + prices = np.array(prices).reshape(-1, 1) + volumes = np.array(volumes).reshape(-1, 1) - # Fit volume scaler if not already fitted - if not self.is_fitted_volume: - self.volume_scaler.fit(volume_reshaped) - self.is_fitted_volume = True - - # Transform volume data - scaled_volume = self.volume_scaler.transform(volume_reshaped) + # Ensure volumes has the same length as prices + if len(volumes) != len(prices): + logger.warning(f"Volume length ({len(volumes)}) doesn't match price length ({len(prices)}). Adjusting...") + if len(volumes) > len(prices): + volumes = volumes[:len(prices)] + else: + # Pad volumes with the mean value + mean_volume = np.mean(volumes) + padding = np.full((len(prices) - len(volumes), 1), mean_volume) + volumes = np.vstack((volumes, padding)) + + # Fit and transform the data + if not hasattr(self, f'price_scaler_{i}'): + setattr(self, f'price_scaler_{i}', MinMaxScaler(feature_range=(0, 1))) + setattr(self, f'volume_scaler_{i}', MinMaxScaler(feature_range=(0, 1))) + + price_scaler = getattr(self, f'price_scaler_{i}') + volume_scaler = getattr(self, f'volume_scaler_{i}') + + # Fit scalers if not already fit + if not hasattr(price_scaler, 'data_min_'): + price_scaler.fit(prices) + if not hasattr(volume_scaler, 'data_min_'): + volume_scaler.fit(volumes) + + # Transform the data + scaled_prices = price_scaler.transform(prices) + scaled_volumes = volume_scaler.transform(volumes) # Combine price and volume data - scaled_data = np.hstack((scaled_price, scaled_volume)) - else: - # If no volume data, use zeros - scaled_volume = np.zeros_like(scaled_price) - scaled_data = np.hstack((scaled_price, scaled_volume)) + combined_data = np.hstack((scaled_prices, scaled_volumes)) - return scaled_data - - def postprocess_price(self, scaled_predictions): - # Inverse transform to get actual price values - return self.price_scaler.inverse_transform(scaled_predictions.reshape(-1, 1)).flatten() - - def predict_next_candles(self, price_history, volume_history=None, num_candles=5): - if len(price_history) < 30: # Need enough history - return np.zeros(num_candles), np.zeros((num_candles, 2)) + # Convert to tensor and move to the same device as the model + tensor_data = torch.FloatTensor(combined_data).unsqueeze(0) # Add batch dimension + tensor_data = tensor_data.to(next(self.parameters()).device) # Move to same device as model - # Use default volume if not provided - if volume_history is None: - volume_history = np.ones_like(price_history) + processed_data.append(tensor_data) - # Ensure same length - min_len = min(len(price_history), len(volume_history)) - price_history = price_history[-min_len:] - volume_history = volume_history[-min_len:] - - # Preprocess data - scaled_data = self.preprocess(price_history, volume_history) + if timeframes: + timeframe_name = timeframes[i] if i < len(timeframes) else f"timeframe_{i}" + logger.info(f"Processed {timeframe_name} data: {tensor_data.shape}") - # Create sequence - sequence = scaled_data[-30:].reshape(1, 30, 2) - sequence_tensor = torch.FloatTensor(sequence).to(next(self.parameters()).device) + return processed_data + + def postprocess_price(self, scaled_predictions, timeframe_idx=0): + """ + Convert scaled predictions back to actual price values - # Get predictions + Args: + scaled_predictions: Model output predictions (scaled) + timeframe_idx: Index of the timeframe to use for inverse scaling + + Returns: + Actual price predictions + """ + # Get the appropriate scaler + price_scaler = getattr(self, f'price_scaler_{timeframe_idx}', self.price_scaler) + + # Convert to numpy and reshape + scaled_predictions = scaled_predictions.detach().cpu().numpy() + scaled_predictions = scaled_predictions.reshape(-1, 1) + + # Inverse transform + actual_predictions = price_scaler.inverse_transform(scaled_predictions) + + return actual_predictions.flatten() + + def predict_next_candles(self, price_history, volume_history=None, timeframes=None, num_candles=5): + """ + Predict the next several candles and potential extrema points + + Args: + price_history: List of price histories for different timeframes + volume_history: List of volume histories for different timeframes + timeframes: List of timeframe names (for logging) + num_candles: Number of future candles to predict + + Returns: + price_predictions: Predicted prices for next candles + extrema_predictions: Predicted extrema points (high/low probabilities) + """ + # Preprocess the data + processed_data = self.preprocess(price_history, volume_history, timeframes) + + # Make prediction with torch.no_grad(): - price_pred, extrema_pred = self(sequence_tensor) - scaled_price_predictions = price_pred.cpu().numpy()[0] - extrema_predictions = extrema_pred.cpu().numpy()[0].reshape(num_candles, 2) - - # Postprocess price predictions - price_predictions = self.postprocess_price(scaled_price_predictions) + price_pred, extrema_pred = self.forward(processed_data) + + # Convert predictions back to actual values + price_predictions = self.postprocess_price(price_pred[0], timeframe_idx=0) + + # Process extrema predictions + extrema_predictions = extrema_pred[0].cpu().numpy() + + # Reshape extrema predictions to [num_candles, 2] (high/low probabilities for each candle) + extrema_predictions = extrema_predictions.reshape(num_candles, 2) return price_predictions, extrema_predictions - def train_on_new_data(self, price_history, volume_history=None, optimizer=None, epochs=5, extrema_weight=1.0): - """Train the model on new price and volume data with focus on extrema""" - # Convert to numpy array if it's not already - if isinstance(price_history, list): - price_history = np.array(price_history, dtype=np.float32) - - if volume_history is None: - volume_history = np.ones_like(price_history) - elif isinstance(volume_history, list): - volume_history = np.array(volume_history, dtype=np.float32) - - # Ensure same length - min_len = min(len(price_history), len(volume_history)) - price_history = price_history[-min_len:] - volume_history = volume_history[-min_len:] + def update_price_predictions(self): + """Update price and extrema predictions using the multi-timeframe model""" + if not hasattr(self, 'price_predictor') or self.price_predictor is None: + return - if len(price_history) < 35: # Need enough history for training - return 0.0, 0.0 - - # Find local extrema in the price history - bottoms, tops = find_local_extrema(price_history, window=5) + # Check if we have price and volume data + if len(self.features['price']) == 0 or len(self.features['volume']) == 0: + logger.warning("No price or volume data available for price predictions") + return - # Create extrema labels (0: normal, 1: bottom/buy, 2: top/sell) - extrema_labels = np.zeros(len(price_history)) - for idx in bottoms: - if 0 <= idx < len(extrema_labels): - extrema_labels[idx] = 1 # Bottom/Buy - for idx in tops: - if 0 <= idx < len(extrema_labels): - extrema_labels[idx] = 2 # Top/Sell - - # Preprocess data - scaled_data = self.preprocess(price_history, volume_history) + # Update timeframe data + self.timeframe_data['1m']['prices'] = self.features['price'].copy() + self.timeframe_data['1m']['volumes'] = self.features['volume'].copy() - # Create sequences and targets - sequences = [] - price_targets = [] - extrema_targets = [] + # Update higher timeframes if we have enough data + if len(self.features['price']) >= 15: + # Update 5-minute data + self.timeframe_data['5m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 5)], + 'volumes': [sum(self.features['volume'][i:i+5]) for i in range(0, len(self.features['volume']), 5) if i+5 <= len(self.features['volume'])] + } + + # Update 15-minute data + self.timeframe_data['15m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 15)], + 'volumes': [sum(self.features['volume'][i:i+15]) for i in range(0, len(self.features['volume']), 15) if i+15 <= len(self.features['volume'])] + } - for i in range(len(scaled_data) - 35): - # Sequence: 30 time steps of price and volume - seq = scaled_data[i:i+30] - - # Price target: next 5 time steps - price_target = scaled_data[i+30:i+35, 0].flatten() # Only price column - - # Extrema target: binary indicators for next 5 time steps - # For each of the next 5 candles, we predict [p_low, p_high] - extrema_target = np.zeros(5 * 2) - for j in range(5): - idx = i + 30 + j - if idx < len(extrema_labels): - if extrema_labels[idx] == 1: # Bottom/Buy - extrema_target[j*2] = 1.0 # p_low - elif extrema_labels[idx] == 2: # Top/Sell - extrema_target[j*2 + 1] = 1.0 # p_high - - sequences.append(seq) - price_targets.append(price_target) - extrema_targets.append(extrema_target) + # Prepare multi-timeframe data + timeframe_prices = [] + timeframe_volumes = [] + timeframe_names = [] - if not sequences: # If no sequences were created - return 0.0, 0.0 - - # Convert to tensors - sequences_tensor = torch.FloatTensor(np.array(sequences)).to(next(self.parameters()).device) - price_targets_tensor = torch.FloatTensor(np.array(price_targets)).to(next(self.parameters()).device) - extrema_targets_tensor = torch.FloatTensor(np.array(extrema_targets)).to(next(self.parameters()).device) + # Add 1-minute data + timeframe_prices.append(self.timeframe_data['1m']['prices']) + timeframe_volumes.append(self.timeframe_data['1m']['volumes']) + timeframe_names.append('1m') - # Create optimizer if not provided - if optimizer is None: - optimizer = optim.Adam(self.parameters(), lr=1e-3) + # Add 5-minute data if available + if len(self.timeframe_data['5m']) > 0 and len(self.timeframe_data['5m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['5m']['prices']) + timeframe_volumes.append(self.timeframe_data['5m']['volumes']) + timeframe_names.append('5m') - # Training loop - total_price_loss = 0 - total_extrema_loss = 0 + # Add 15-minute data if available + if len(self.timeframe_data['15m']) > 0 and len(self.timeframe_data['15m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['15m']['prices']) + timeframe_volumes.append(self.timeframe_data['15m']['volumes']) + timeframe_names.append('15m') - for _ in range(epochs): - # Forward pass - price_pred, extrema_pred = self(sequences_tensor) + # Get predictions + try: + price_predictions, extrema_predictions = self.price_predictor.predict_next_candles( + timeframe_prices, + timeframe_volumes, + timeframe_names + ) - # Reshape extrema predictions to match targets - extrema_pred_flat = extrema_pred.reshape(-1, 10) # 5 candles * 2 (low/high) + # Store predictions + self.price_predictions = price_predictions + self.extrema_predictions = extrema_predictions - # Calculate losses - price_loss = F.mse_loss(price_pred, price_targets_tensor) - extrema_loss = F.binary_cross_entropy(extrema_pred_flat, extrema_targets_tensor) + # Check for predicted extrema in the next few candles + self.has_predicted_low = False + self.has_predicted_high = False - # Combined loss with weighting - combined_loss = price_loss + extrema_weight * extrema_loss + # Check first 3 candles for extrema + for i in range(min(3, len(extrema_predictions))): + # Check if probability of low is high + if extrema_predictions[i, 0] > 0.7: + self.has_predicted_low = True + + # Check if probability of high is high + if extrema_predictions[i, 1] > 0.7: + self.has_predicted_high = True - # Backward pass and optimize - optimizer.zero_grad() - combined_loss.backward() - optimizer.step() + # Log predictions + logger.info(f"Price predictions: {price_predictions}") + logger.info(f"Extrema predictions: {extrema_predictions}") + logger.info(f"Predicted low: {self.has_predicted_low}, Predicted high: {self.has_predicted_high}") - total_price_loss += price_loss.item() - total_extrema_loss += extrema_loss.item() - - return total_price_loss / epochs, total_extrema_loss / epochs + return price_predictions, extrema_predictions + except Exception as e: + logger.error(f"Error in price prediction: {e}") + logger.error(traceback.format_exc()) + return None, None class TradingEnvironment: def __init__(self, initial_balance=INITIAL_BALANCE, window_size=30, demo=True, trading_client=None): @@ -467,6 +647,7 @@ class TradingEnvironment: self.max_drawdown = 0.0 self.current_step = 0 self.current_price = 0 + self.last_action = 2 # Default to HOLD (2) # Initialize trading client for live trading self.trading_client = trading_client @@ -491,7 +672,17 @@ class TradingEnvironment: # Initialize price predictor self.price_predictor = None - self.predicted_prices = np.array([]) + self.price_predictions = [] + self.extrema_predictions = [] + self.has_predicted_low = False + self.has_predicted_high = False + + # Initialize timeframe data structure + self.timeframe_data = { + '1m': {'prices': [], 'volumes': []}, + '5m': [], + '15m': [] + } # Initialize optimal trade tracking self.optimal_bottoms = [] @@ -625,7 +816,7 @@ class TradingEnvironment: asyncio.create_task(self._execute_live_action(action)) # Calculate reward (simulation still runs in parallel with live trading) - reward = self.calculate_reward(action) + reward, info = self.calculate_reward(action) # Unpack the tuple here # Check for stop loss / take profit hits self.check_sl_tp() @@ -637,7 +828,7 @@ class TradingEnvironment: # Get new state next_state = self.get_state() - return next_state, reward, done + return next_state, reward, done, info async def _execute_live_action(self, action): """Execute live trading action using the trading client""" @@ -1054,447 +1245,128 @@ class TradingEnvironment: return state.astype(np.float32) def calculate_reward(self, action): - """Calculate reward for the given action with improved penalties for losing trades""" - # Store previous balance for reward calculation - prev_balance = self.balance + """Calculate the reward for the current action.""" + reward = 0 + info = {} - # Initialize reward - reward = 0.0 + # Base reward components + pnl_reward = 0 + timing_reward = 0 + risk_reward = 0 + prediction_reward = 0 - # Process action (0: HOLD, 1: BUY/LONG, 2: SELL/SHORT, 3: CLOSE) - if action == 0: # HOLD - # Small negative reward for holding to encourage action - reward = -0.01 - - # But give positive reward for holding if we're predicting an extrema soon - if hasattr(self, 'has_predicted_low') and self.has_predicted_low and self.position == 'flat': - # Reward for waiting for a predicted bottom - reward = 0.05 - elif hasattr(self, 'has_predicted_high') and self.has_predicted_high and self.position == 'flat': - # Reward for waiting for a predicted top - reward = 0.05 - - # Check if holding a profitable position - if self.position == 'long' and self.current_price > self.entry_price: - # Reward for holding a profitable long - profit_pct = (self.current_price - self.entry_price) / self.entry_price - reward += profit_pct * 0.5 - elif self.position == 'short' and self.current_price < self.entry_price: - # Reward for holding a profitable short - profit_pct = (self.entry_price - self.current_price) / self.entry_price - reward += profit_pct * 0.5 - - elif action == 1: # BUY/LONG - if self.position == 'flat': - # Open long position - self.position = 'long' - self.entry_price = self.current_price - self.entry_index = self.current_step - self.position_size = self.calculate_position_size() - - # Set stop loss and take profit - self.stop_loss = self.current_price * (1 - STOP_LOSS_PERCENT / 100) - self.take_profit = self.current_price * (1 + TAKE_PROFIT_PERCENT / 100) - - # Calculate entry quality - entry_quality = self.evaluate_entry_quality('long') - - # Base reward on entry quality - reward = entry_quality - - # Additional reward if buying at a predicted low - if hasattr(self, 'has_predicted_low') and self.has_predicted_low: - reward += 0.5 - logger.info("Buying at predicted low point - additional reward") - - # Penalize buying at a predicted high - if hasattr(self, 'has_predicted_high') and self.has_predicted_high: - reward -= 0.5 - logger.info("Buying at predicted high point - penalty applied") - - logger.info(f"OPENED long at {self.current_price} | Size: {self.position_size:.2f} | Entry quality: {entry_quality:.2f}") - - elif self.position == 'short': - # Close short position - price_diff = self.entry_price - self.current_price - pnl_percent = price_diff / self.entry_price * 100 - pnl_dollar = pnl_percent / 100 * self.position_size - - # Apply fees - pnl_dollar -= self.calculate_fees(self.position_size) - - # Update balance - self.balance += pnl_dollar - self.total_pnl += pnl_dollar - self.episode_pnl += pnl_dollar - - # Update max drawdown - if self.balance < self.peak_balance: - current_drawdown = (self.peak_balance - self.balance) / self.peak_balance * 100 - self.max_drawdown = max(self.max_drawdown, current_drawdown) - elif self.balance > self.peak_balance: - self.peak_balance = self.balance - - # Record trade - self.trades.append({ - 'type': 'short', - 'entry': self.entry_price, - 'exit': self.current_price, - 'entry_time': self.data[self.entry_index]['timestamp'], - 'exit_time': self.data[self.current_step]['timestamp'], - 'pnl_percent': pnl_percent, - 'pnl_dollar': pnl_dollar, - 'duration': self.current_step - self.entry_index, - 'market_direction': self.get_market_direction(), - 'reason': 'switch_to_long' - }) - - # Reward based on PnL - if pnl_dollar > 0: - reward = 2.0 + pnl_dollar * 0.5 # Increased positive reward for profit - self.win_count += 1 - else: - reward = -2.0 - abs(pnl_dollar) * 0.3 # Stronger negative reward for loss - self.loss_count += 1 - - logger.info(f"CLOSED short at {self.current_price} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") - - # Open new long position - self.position = 'long' - self.entry_price = self.current_price - self.entry_index = self.current_step - self.position_size = self.calculate_position_size() - - # Set stop loss and take profit - self.stop_loss = self.current_price * (1 - STOP_LOSS_PERCENT / 100) - self.take_profit = self.current_price * (1 + TAKE_PROFIT_PERCENT / 100) - - logger.info(f"OPENED long at {self.current_price} | Size: {self.position_size:.2f}") - - elif action == 2: # SELL/SHORT - if self.position == 'flat': - # Open short position - self.position = 'short' - self.entry_price = self.current_price - self.entry_index = self.current_step - self.position_size = self.calculate_position_size() - - # Set stop loss and take profit - self.stop_loss = self.current_price * (1 + STOP_LOSS_PERCENT / 100) - self.take_profit = self.current_price * (1 - TAKE_PROFIT_PERCENT / 100) - - # Calculate entry quality - entry_quality = self.evaluate_entry_quality('short') - - # Base reward on entry quality - reward = entry_quality - - # Additional reward if selling at a predicted high - if hasattr(self, 'has_predicted_high') and self.has_predicted_high: - reward += 0.5 - logger.info("Selling at predicted high point - additional reward") - - # Penalize selling at a predicted low - if hasattr(self, 'has_predicted_low') and self.has_predicted_low: - reward -= 0.5 - logger.info("Selling at predicted low point - penalty applied") - - logger.info(f"OPENED short at {self.current_price} | Size: {self.position_size:.2f} | Entry quality: {entry_quality:.2f}") - - elif self.position == 'long': - # Close long position - price_diff = self.current_price - self.entry_price - pnl_percent = price_diff / self.entry_price * 100 - pnl_dollar = pnl_percent / 100 * self.position_size - - # Apply fees - pnl_dollar -= self.calculate_fees(self.position_size) - - # Update balance - self.balance += pnl_dollar - self.total_pnl += pnl_dollar - self.episode_pnl += pnl_dollar - - # Update max drawdown - if self.balance < self.peak_balance: - current_drawdown = (self.peak_balance - self.balance) / self.peak_balance * 100 - self.max_drawdown = max(self.max_drawdown, current_drawdown) - elif self.balance > self.peak_balance: - self.peak_balance = self.balance - - # Record trade - self.trades.append({ - 'type': 'long', - 'entry': self.entry_price, - 'exit': self.current_price, - 'entry_time': self.data[self.entry_index]['timestamp'], - 'exit_time': self.data[self.current_step]['timestamp'], - 'pnl_percent': pnl_percent, - 'pnl_dollar': pnl_dollar, - 'duration': self.current_step - self.entry_index, - 'market_direction': self.get_market_direction(), - 'reason': 'switch_to_short' - }) - - # Reward based on PnL - if pnl_dollar > 0: - reward = 2.0 + pnl_dollar * 0.5 # Increased positive reward for profit - self.win_count += 1 - - # Extra reward for closing at a predicted high - if hasattr(self, 'has_predicted_high') and self.has_predicted_high: - reward += 1.0 - logger.info("Closing long at predicted high - additional reward") - else: - reward = -2.0 - abs(pnl_dollar) * 0.3 # Stronger negative reward for loss - self.loss_count += 1 - - logger.info(f"CLOSED long at {self.current_price} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") - - # Open new short position - self.position = 'short' - self.entry_price = self.current_price - self.entry_index = self.current_step - self.position_size = self.calculate_position_size() - - # Set stop loss and take profit - self.stop_loss = self.current_price * (1 + STOP_LOSS_PERCENT / 100) - self.take_profit = self.current_price * (1 - TAKE_PROFIT_PERCENT / 100) - - logger.info(f"OPENED short at {self.current_price} | Size: {self.position_size:.2f}") - - elif action == 3: # CLOSE - if self.position == 'long': - # Close long position - price_diff = self.current_price - self.entry_price - pnl_percent = price_diff / self.entry_price * 100 - pnl_dollar = pnl_percent / 100 * self.position_size - - # Apply fees - pnl_dollar -= self.calculate_fees(self.position_size) - - # Update balance - self.balance += pnl_dollar - self.total_pnl += pnl_dollar - self.episode_pnl += pnl_dollar - - # Update max drawdown - if self.balance < self.peak_balance: - current_drawdown = (self.peak_balance - self.balance) / self.peak_balance * 100 - self.max_drawdown = max(self.max_drawdown, current_drawdown) - elif self.balance > self.peak_balance: - self.peak_balance = self.balance - - # Record trade - self.trades.append({ - 'type': 'long', - 'entry': self.entry_price, - 'exit': self.current_price, - 'entry_time': self.data[self.entry_index]['timestamp'], - 'exit_time': self.data[self.current_step]['timestamp'], - 'pnl_percent': pnl_percent, - 'pnl_dollar': pnl_dollar, - 'duration': self.current_step - self.entry_index, - 'market_direction': self.get_market_direction(), - 'reason': 'manual_close' - }) - - # Reward based on PnL - if pnl_dollar > 0: - reward = 2.0 + pnl_dollar * 0.5 # Increased positive reward for profit - self.win_count += 1 - - # Extra reward for closing at a predicted high - if hasattr(self, 'has_predicted_high') and self.has_predicted_high: - reward += 1.0 - logger.info("Closing long at predicted high - additional reward") - else: - reward = -2.0 - abs(pnl_dollar) * 0.3 # Stronger negative reward for loss - self.loss_count += 1 - - logger.info(f"CLOSED long at {self.current_price} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") - - # Reset position - self.position = 'flat' - self.entry_price = 0 - self.entry_index = 0 - self.position_size = 0 - self.stop_loss = 0 - self.take_profit = 0 - - elif self.position == 'short': - # Close short position - price_diff = self.entry_price - self.current_price - pnl_percent = price_diff / self.entry_price * 100 - pnl_dollar = pnl_percent / 100 * self.position_size - - # Apply fees - pnl_dollar -= self.calculate_fees(self.position_size) - - # Update balance - self.balance += pnl_dollar - self.total_pnl += pnl_dollar - self.episode_pnl += pnl_dollar - - # Update max drawdown - if self.balance < self.peak_balance: - current_drawdown = (self.peak_balance - self.balance) / self.peak_balance * 100 - self.max_drawdown = max(self.max_drawdown, current_drawdown) - elif self.balance > self.peak_balance: - self.peak_balance = self.balance - - # Record trade - self.trades.append({ - 'type': 'short', - 'entry': self.entry_price, - 'exit': self.current_price, - 'entry_time': self.data[self.entry_index]['timestamp'], - 'exit_time': self.data[self.current_step]['timestamp'], - 'pnl_percent': pnl_percent, - 'pnl_dollar': pnl_dollar, - 'duration': self.current_step - self.entry_index, - 'market_direction': self.get_market_direction(), - 'reason': 'manual_close' - }) - - # Reward based on PnL - if pnl_dollar > 0: - reward = 2.0 + pnl_dollar * 0.5 # Increased positive reward for profit - self.win_count += 1 - - # Extra reward for closing at a predicted low - if hasattr(self, 'has_predicted_low') and self.has_predicted_low: - reward += 1.0 - logger.info("Closing short at predicted low - additional reward") - else: - reward = -2.0 - abs(pnl_dollar) * 0.3 # Stronger negative reward for loss - self.loss_count += 1 - - logger.info(f"CLOSED short at {self.current_price} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") - - # Reset position - self.position = 'flat' - self.entry_price = 0 - self.entry_index = 0 - self.position_size = 0 - self.stop_loss = 0 - self.take_profit = 0 - - # Add reward based on direct PnL change - balance_change = self.balance - prev_balance - if balance_change > 0: - reward += balance_change * 1.0 # Increased positive reward for making money - else: - reward += balance_change * 2.0 # Stronger negative reward for losing money - - # Add reward for predicted price movement alignment - if hasattr(self, 'predicted_prices') and len(self.predicted_prices) > 0: - # Compare the first prediction with actual price - if len(self.data) > 1: - actual_price = self.data[-1]['close'] - predicted_price = self.predicted_prices[0] - prediction_error = abs(predicted_price - actual_price) / actual_price - - # Reward accurate predictions, penalize bad ones - if prediction_error < 0.005: # Less than 0.5% error - reward += 0.2 - elif prediction_error > 0.02: # More than 2% error - reward -= 0.2 - - # Add reward/penalty based on market trend alignment + # Get current market state + current_price = self.current_price + recent_volatility = self.get_recent_volatility() market_direction = self.get_market_direction() - if self.position == 'long' and market_direction == 'uptrend': - reward += 0.1 # Reward for being long in uptrend - elif self.position == 'short' and market_direction == 'downtrend': - reward += 0.1 # Reward for being short in downtrend - elif self.position == 'long' and market_direction == 'downtrend': - reward -= 0.1 # Penalty for being long in downtrend - elif self.position == 'short' and market_direction == 'uptrend': - reward -= 0.1 # Penalty for being short in uptrend + # Calculate PnL-based reward + if self.position != 'flat' and self.last_action != action: + # Position is being closed + if self.position == 'long': + pnl = current_price - self.entry_price + else: # short + pnl = self.entry_price - current_price + + # Normalize PnL by recent volatility to make rewards more consistent + if recent_volatility > 0: + normalized_pnl = pnl / recent_volatility + else: + normalized_pnl = pnl * 100 # Fallback if volatility is zero + + pnl_reward = normalized_pnl * 100 # Scale for better learning signal - # Add reward for trading with volume - if action in [1, 2] and self.position != 'flat': # Opening a position - current_volume = self.data[self.current_step]['volume'] - avg_volume = np.mean([candle['volume'] for candle in self.data[max(0, self.current_step-10):self.current_step]]) + # Add timing reward based on entry quality + entry_quality = self.evaluate_entry_quality(self.position) + timing_reward = entry_quality * 50 # Scale timing reward - if current_volume > avg_volume * 1.5: - # Trading with high volume - reward += 0.2 - logger.info("Trading with high volume - additional reward") + # Add risk-adjusted reward component + position_duration = self.current_step - self.entry_index + if position_duration > 0: + risk_adjusted_return = pnl / (recent_volatility * np.sqrt(position_duration)) + risk_reward = risk_adjusted_return * 30 # Scale risk reward - return reward + # Add prediction accuracy reward if we have price predictions + if hasattr(self, 'price_predictions') and len(self.price_predictions) > 0: + # Compare the most recent prediction with actual price movement + last_prediction = self.price_predictions[-1] + actual_movement = current_price - self.features['price'][-2] if len(self.features['price']) > 1 else 0 + + # Reward for correct direction prediction + if (last_prediction > 0 and actual_movement > 0) or (last_prediction < 0 and actual_movement < 0): + prediction_reward = 10 + else: + prediction_reward = -5 + + # Action-specific rewards + if action == 0: # Buy + if market_direction > 0.5: # Strong uptrend + reward += 5 + elif market_direction < -0.5: # Strong downtrend + reward -= 10 # Penalize buying in downtrend + + elif action == 1: # Sell + if market_direction < -0.5: # Strong downtrend + reward += 5 + elif market_direction > 0.5: # Strong uptrend + reward -= 10 # Penalize selling in uptrend + + elif action == 2: # Hold + # Small positive reward for holding during low volatility + if recent_volatility < 0.001: + reward += 1 + + # Combine all reward components + reward += pnl_reward + timing_reward + risk_reward + prediction_reward + + # Log components for analysis + info = { + 'pnl_reward': pnl_reward, + 'timing_reward': timing_reward, + 'risk_reward': risk_reward, + 'prediction_reward': prediction_reward, + 'total_reward': reward + } + + return reward, info # Return both the reward and the info dictionary def evaluate_entry_quality(self, position_type): - """Evaluate the quality of an entry point based on technical indicators""" - score = 0 + """Evaluate how good the entry timing was based on local extrema.""" + if len(self.features['price']) < 10: + return 0 - # Get current indicators - rsi = self.features['rsi'][-1] if len(self.features['rsi']) > 0 else 50 - macd = self.features['macd'][-1] if len(self.features['macd']) > 0 else 0 - macd_signal = self.features['macd_signal'][-1] if len(self.features['macd_signal']) > 0 else 0 - stoch_k = self.features['stoch_k'][-1] if len(self.features['stoch_k']) > 0 else 50 - stoch_d = self.features['stoch_d'][-1] if len(self.features['stoch_d']) > 0 else 50 + # Get recent price window + recent_prices = self.features['price'][-10:] if position_type == 'long': - # RSI oversold condition (good for long) - if rsi < 30: - score += 0.5 - elif rsi < 40: - score += 0.2 - elif rsi > 70: - score -= 0.5 # Overbought, bad for long + # For long positions, check if we bought near a local minimum + local_min = min(recent_prices) + entry_price = self.entry_price - # MACD crossover (bullish) - if macd > macd_signal and macd > 0: - score += 0.3 - elif macd < macd_signal and macd < 0: - score -= 0.3 - - # Stochastic oversold - if stoch_k < 20 and stoch_d < 20: - score += 0.3 - elif stoch_k > 80 and stoch_d > 80: - score -= 0.3 + # Calculate how close we are to the local minimum (0 to 1 scale) + price_range = max(recent_prices) - local_min + if price_range > 0: + entry_quality = 1 - (entry_price - local_min) / price_range + else: + entry_quality = 0.5 # Neutral if no range + + return entry_quality elif position_type == 'short': - # RSI overbought condition (good for short) - if rsi > 70: - score += 0.5 - elif rsi > 60: - score += 0.2 - elif rsi < 30: - score -= 0.5 # Oversold, bad for short + # For short positions, check if we sold near a local maximum + local_max = max(recent_prices) + entry_price = self.entry_price - # MACD crossover (bearish) - if macd < macd_signal and macd < 0: - score += 0.3 - elif macd > macd_signal and macd > 0: - score -= 0.3 - - # Stochastic overbought - if stoch_k > 80 and stoch_d > 80: - score += 0.3 - elif stoch_k < 20 and stoch_d < 20: - score -= 0.3 + # Calculate how close we are to the local maximum (0 to 1 scale) + price_range = local_max - min(recent_prices) + if price_range > 0: + entry_quality = 1 - (local_max - entry_price) / price_range + else: + entry_quality = 0.5 # Neutral if no range + + return entry_quality - # Check price relative to moving averages - if len(self.features['ema_9']) > 0 and len(self.features['ema_21']) > 0: - ema_9 = self.features['ema_9'][-1] - ema_21 = self.features['ema_21'][-1] - - if position_type == 'long': - if self.current_price > ema_9 > ema_21: # Strong uptrend - score += 0.4 - elif self.current_price < ema_9 < ema_21: # Strong downtrend - score -= 0.4 - elif position_type == 'short': - if self.current_price < ema_9 < ema_21: # Strong downtrend - score += 0.4 - elif self.current_price > ema_9 > ema_21: # Strong uptrend - score -= 0.4 - - return score + return 0 # No position def get_recent_volatility(self): """Calculate recent price volatility""" @@ -1535,49 +1407,34 @@ class TradingEnvironment: return short_ema > long_ema def get_market_direction(self): - """Determine the current market direction (uptrend, downtrend, or sideways)""" + """Get the current market direction as a numeric value between -1 and 1""" if len(self.features['price']) < 20: - return 'unknown' + return 0.0 # Neutral if not enough data - # Use EMAs to determine trend + # Use EMA to determine trend if len(self.features['ema_9']) > 0 and len(self.features['ema_21']) > 0: - ema_9 = self.features['ema_9'][-5:] - ema_21 = self.features['ema_21'][-5:] - price = self.features['price'][-5:] + short_ema = self.features['ema_9'][-1] + long_ema = self.features['ema_21'][-1] - # Check if price is above/below EMAs - price_above_ema9 = sum(p > e for p, e in zip(price, ema_9)) - price_above_ema21 = sum(p > e for p, e in zip(price, ema_21)) - ema9_above_ema21 = sum(e9 > e21 for e9, e21 in zip(ema_9, ema_21)) + # Calculate trend strength + if short_ema > long_ema: + # Uptrend - strength based on percentage difference + strength = min((short_ema / long_ema - 1) * 10, 1.0) + return strength # Value between 0 and 1 + else: + # Downtrend - strength based on percentage difference + strength = min((long_ema / short_ema - 1) * 10, 1.0) + return -strength # Value between -1 and 0 + + # Fallback to price-based trend detection + if len(self.features['price']) >= 20: + recent_prices = self.features['price'][-20:] + price_change = (recent_prices[-1] / recent_prices[0]) - 1 - # Strong uptrend: price > EMA9 > EMA21 - if price_above_ema9 >= 4 and price_above_ema21 >= 4 and ema9_above_ema21 >= 4: - return 'uptrend' - - # Strong downtrend: price < EMA9 < EMA21 - elif price_above_ema9 <= 1 and price_above_ema21 <= 1 and ema9_above_ema21 <= 1: - return 'downtrend' + # Normalize to a value between -1 and 1 + return max(min(price_change * 10, 1.0), -1.0) - # Check price action - price_data = self.features['price'][-20:] - price_change = (price_data[-1] / price_data[0] - 1) * 100 - - if price_change > 1.0: - return 'uptrend' - elif price_change < -1.0: - return 'downtrend' - - # Check RSI for trend confirmation - if len(self.features['rsi']) > 0: - rsi = self.features['rsi'][-5:] - avg_rsi = sum(rsi) / len(rsi) - - if avg_rsi > 60: - return 'uptrend' - elif avg_rsi < 40: - return 'downtrend' - - return 'sideways' + return 0.0 # Neutral if no data def analyze_trades(self): """Analyze completed trades to identify patterns""" @@ -1615,78 +1472,291 @@ class TradingEnvironment: return analysis def initialize_price_predictor(self, device="cpu"): - """Initialize the price prediction model""" - # Only create a new model if one doesn't already exist - if not hasattr(self, 'price_predictor') or self.price_predictor is None: - self.price_predictor = PricePredictionModel(input_size=30, hidden_size=128, output_size=5) - self.price_predictor.to(device) - self.price_predictor_optimizer = optim.Adam(self.price_predictor.parameters(), lr=1e-3) - else: - # If model exists, just ensure it's on the right device - self.price_predictor.to(device) + """Initialize the price prediction model with multi-timeframe support""" + # Create the price prediction model + self.price_predictor = PricePredictionModel( + input_size=2, # Price and volume + hidden_size=256, + output_size=5, # Predict 5 future candles + num_layers=3, + num_timeframes=3 # Support for multiple timeframes + ).to(device) + + # Check if we have price and volume data + if len(self.features['price']) == 0 or len(self.features['volume']) == 0: + logger.warning("No price or volume data available for price predictor initialization") + return self.price_predictor + + # Initialize timeframes data + self.timeframe_data = { + '1m': {'prices': self.features['price'].copy(), 'volumes': self.features['volume'].copy()}, + '5m': [], + '15m': [] + } + + # Create resampled data for higher timeframes + if len(self.features['price']) >= 15: + # Create 5-minute data (resample every 5 candles) + self.timeframe_data['5m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 5)], + 'volumes': [sum(self.features['volume'][i:i+5]) for i in range(0, len(self.features['volume']), 5) if i+5 <= len(self.features['volume'])] + } - self.predicted_prices = np.array([]) - self.predicted_extrema = np.array([]) - self.extrema_threshold = 0.7 # Threshold for extrema prediction confidence + # Create 15-minute data (resample every 15 candles) + self.timeframe_data['15m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 15)], + 'volumes': [sum(self.features['volume'][i:i+15]) for i in range(0, len(self.features['volume']), 15) if i+15 <= len(self.features['volume'])] + } + + # Initialize predictions + self.price_predictions = [] + self.extrema_predictions = [] + + logger.info(f"Price predictor initialized with {sum(p.numel() for p in self.price_predictor.parameters())} parameters") + return self.price_predictor def train_price_predictor(self): - """Train the price prediction model on recent data with focus on extrema""" - if len(self.features['price']) < 35: - return 0.0, 0.0 + """Train the price prediction model on historical data with multi-timeframe support""" + if not hasattr(self, 'price_predictor') or self.price_predictor is None: + self.initialize_price_predictor() - # Get price and volume history - price_history = self.features['price'] - volume_history = self.features['volume'] + # Need enough data for all timeframes + if len(self.features['price']) < 30: + logger.warning("Not enough data to train price predictor (need at least 30 candles)") + return False - # Train the model with emphasis on extrema prediction - price_loss, extrema_loss = self.price_predictor.train_on_new_data( - price_history, - volume_history, - self.price_predictor_optimizer, - epochs=5, - extrema_weight=1.5 # Give more weight to extrema prediction - ) + try: + # Create optimizer if not already created + if not hasattr(self, 'price_predictor_optimizer'): + self.price_predictor_optimizer = optim.Adam(self.price_predictor.parameters(), lr=0.001) + + # Prepare multi-timeframe data + timeframe_prices = [] + timeframe_volumes = [] + timeframe_names = [] + + # Add 1-minute data + timeframe_prices.append(self.timeframe_data['1m']['prices']) + timeframe_volumes.append(self.timeframe_data['1m']['volumes']) + timeframe_names.append('1m') + + # Add 5-minute data if available + if '5m' in self.timeframe_data and len(self.timeframe_data['5m']) > 0 and len(self.timeframe_data['5m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['5m']['prices']) + timeframe_volumes.append(self.timeframe_data['5m']['volumes']) + timeframe_names.append('5m') + + # Add 15-minute data if available + if '15m' in self.timeframe_data and len(self.timeframe_data['15m']) > 0 and len(self.timeframe_data['15m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['15m']['prices']) + timeframe_volumes.append(self.timeframe_data['15m']['volumes']) + timeframe_names.append('15m') + + # Ensure we have at least one timeframe + if len(timeframe_prices) == 0: + logger.warning("No timeframe data available for training") + return False + + # Preprocess data for training + processed_data = self.price_predictor.preprocess(timeframe_prices, timeframe_volumes, timeframe_names) + + # Get the device that the model is on + device = next(self.price_predictor.parameters()).device + + # Create targets for each timeframe (next 5 candles) + targets = [] + for i, prices in enumerate(timeframe_prices): + if len(prices) > 5: + # Get the next 5 candles as targets + target_prices = np.array(prices[1:6]) + + # Scale targets using the same scaler + price_scaler = getattr(self.price_predictor, f'price_scaler_{i}', self.price_predictor.price_scaler) + target_prices = price_scaler.transform(target_prices.reshape(-1, 1)).flatten() + + # Create tensor and move to the same device as the model + target_tensor = torch.FloatTensor(target_prices).unsqueeze(0).to(device) + targets.append(target_tensor) + + # If we don't have targets, return + if len(targets) == 0: + logger.warning("No target data available for training") + return False + + # Find local extrema for extrema prediction training + extrema_targets = [] + for i, prices in enumerate(timeframe_prices): + if len(prices) > 5: + # Find peaks and troughs in the next 5 candles + peaks, troughs = find_local_extrema(prices[:10], window=2) + + # Create binary targets for each of the next 5 candles + extrema_target = np.zeros((1, 5, 2)) # [batch, time_steps, (low, high)] + + for j in range(5): + if j+1 in troughs: # +1 because we're looking at future candles + extrema_target[0, j, 0] = 1.0 # Low point + if j+1 in peaks: # +1 because we're looking at future candles + extrema_target[0, j, 1] = 1.0 # High point + + # Move to the same device as the model + extrema_tensor = torch.FloatTensor(extrema_target.reshape(1, -1)).to(device) + extrema_targets.append(extrema_tensor) + + # If we don't have extrema targets, create empty ones + if len(extrema_targets) == 0: + for _ in range(len(targets)): + extrema_targets.append(torch.zeros(1, 10).to(device)) # 5 candles * 2 (low/high) + + # Train for a few epochs + epochs = 5 + total_loss = 0 + + for epoch in range(epochs): + # Zero gradients + self.price_predictor_optimizer.zero_grad() + + # Forward pass + price_preds, extrema_preds = self.price_predictor(processed_data) + + # Calculate loss for each timeframe + price_loss = 0 + extrema_loss = 0 + + for i in range(len(targets)): + if i < len(price_preds): + # Price prediction loss - ensure shapes match + price_pred = price_preds[i] + price_target = targets[i] + + # Make sure both have the same shape + if price_target.shape != price_pred.shape: + if len(price_target.shape) > len(price_pred.shape): + price_target = price_target.squeeze(0) + elif len(price_pred.shape) > len(price_target.shape): + price_pred = price_pred.squeeze(0) + + price_loss += F.mse_loss(price_pred, price_target) + + # Extrema prediction loss - ensure shapes match + extrema_target = extrema_targets[i] + extrema_pred = extrema_preds[i] + + # Make sure both have the same shape + if extrema_target.shape != extrema_pred.shape: + if len(extrema_target.shape) > len(extrema_pred.shape): + extrema_target = extrema_target.squeeze(0) + elif len(extrema_pred.shape) > len(extrema_target.shape): + extrema_pred = extrema_pred.squeeze(0) + + extrema_loss += F.binary_cross_entropy(extrema_pred, extrema_target) + + # Combined loss + loss = price_loss + 0.5 * extrema_loss + + # Backward pass and optimize + loss.backward() + self.price_predictor_optimizer.step() + + total_loss += loss.item() + + # Update predictions + self.update_price_predictions() + + logger.info(f"Price predictor trained for {epochs} epochs, avg loss: {total_loss/epochs:.6f}") + return total_loss / epochs - logger.info(f"Price predictor training - Price loss: {price_loss:.6f}, Extrema loss: {extrema_loss:.6f}") - - return price_loss, extrema_loss + except Exception as e: + logger.error(f"Error training price predictor: {e}") + logger.error(traceback.format_exc()) + return False def update_price_predictions(self): - """Update price and extrema predictions""" - if len(self.features['price']) < 30: - self.predicted_prices = np.array([]) - self.predicted_extrema = np.array([]) + """Update price and extrema predictions using the multi-timeframe model""" + if not hasattr(self, 'price_predictor') or self.price_predictor is None: return - # Get price and volume history - price_history = self.features['price'] - volume_history = self.features['volume'] + # Check if we have price and volume data + if len(self.features['price']) == 0 or len(self.features['volume']) == 0: + logger.warning("No price or volume data available for price predictions") + return + + # Update timeframe data + self.timeframe_data['1m']['prices'] = self.features['price'].copy() + self.timeframe_data['1m']['volumes'] = self.features['volume'].copy() + + # Update higher timeframes if we have enough data + if len(self.features['price']) >= 15: + # Update 5-minute data + self.timeframe_data['5m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 5)], + 'volumes': [sum(self.features['volume'][i:i+5]) for i in range(0, len(self.features['volume']), 5) if i+5 <= len(self.features['volume'])] + } + + # Update 15-minute data + self.timeframe_data['15m'] = { + 'prices': [self.features['price'][i] for i in range(0, len(self.features['price']), 15)], + 'volumes': [sum(self.features['volume'][i:i+15]) for i in range(0, len(self.features['volume']), 15) if i+15 <= len(self.features['volume'])] + } + + # Prepare multi-timeframe data + timeframe_prices = [] + timeframe_volumes = [] + timeframe_names = [] + + # Add 1-minute data + timeframe_prices.append(self.timeframe_data['1m']['prices']) + timeframe_volumes.append(self.timeframe_data['1m']['volumes']) + timeframe_names.append('1m') + + # Add 5-minute data if available + if len(self.timeframe_data['5m']) > 0 and len(self.timeframe_data['5m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['5m']['prices']) + timeframe_volumes.append(self.timeframe_data['5m']['volumes']) + timeframe_names.append('5m') + + # Add 15-minute data if available + if len(self.timeframe_data['15m']) > 0 and len(self.timeframe_data['15m']['prices']) > 0: + timeframe_prices.append(self.timeframe_data['15m']['prices']) + timeframe_volumes.append(self.timeframe_data['15m']['volumes']) + timeframe_names.append('15m') # Get predictions - self.predicted_prices, self.predicted_extrema = self.price_predictor.predict_next_candles( - price_history, - volume_history, - num_candles=5 - ) - - # Log predictions - logger.info(f"Predicted prices for next 5 candles: {self.predicted_prices}") - - # Identify predicted extrema points - predicted_lows = [] - predicted_highs = [] - - for i, (p_low, p_high) in enumerate(self.predicted_extrema): - if p_low > self.extrema_threshold: - predicted_lows.append(i) - logger.info(f"Predicted low at candle +{i+1} with confidence {p_low:.2f}") - if p_high > self.extrema_threshold: - predicted_highs.append(i) - logger.info(f"Predicted high at candle +{i+1} with confidence {p_high:.2f}") - - # Store predicted extrema indices - self.predicted_lows = predicted_lows - self.predicted_highs = predicted_highs + try: + price_predictions, extrema_predictions = self.price_predictor.predict_next_candles( + timeframe_prices, + timeframe_volumes, + timeframe_names + ) + + # Store predictions + self.price_predictions = price_predictions + self.extrema_predictions = extrema_predictions + + # Check for predicted extrema in the next few candles + self.has_predicted_low = False + self.has_predicted_high = False + + # Check first 3 candles for extrema + for i in range(min(3, len(extrema_predictions))): + # Check if probability of low is high + if extrema_predictions[i, 0] > 0.7: + self.has_predicted_low = True + + # Check if probability of high is high + if extrema_predictions[i, 1] > 0.7: + self.has_predicted_high = True + + # Log predictions + logger.info(f"Price predictions: {price_predictions}") + logger.info(f"Extrema predictions: {extrema_predictions}") + logger.info(f"Predicted low: {self.has_predicted_low}, Predicted high: {self.has_predicted_high}") + + return price_predictions, extrema_predictions + except Exception as e: + logger.error(f"Error in price prediction: {e}") + logger.error(traceback.format_exc()) + return None, None def identify_optimal_trades(self): """Identify optimal entry and exit points based on local extrema and volume""" @@ -1795,60 +1865,45 @@ def get_device(device_preference='auto'): # Update Agent class to use GPU properly class Agent: def __init__(self, state_size, action_size, hidden_size=256, lstm_layers=2, attention_heads=4, device=None): - """Initialize the agent with the policy and target networks""" + # Set device + self.device = device if device is not None else get_device() + + # Model parameters self.state_size = state_size self.action_size = action_size - - # Set device (GPU or CPU) - if device is None: - self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - else: - self.device = device + self.hidden_size = hidden_size + self.lstm_layers = lstm_layers # Initialize networks self.policy_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(self.device) self.target_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(self.device) - ensure_model_float32(self.policy_net) - ensure_model_float32(self.target_net) self.target_net.load_state_dict(self.policy_net.state_dict()) - self.target_net.eval() + self.target_net.eval() # Set target network to evaluation mode - # Initialize optimizer with weight decay for regularization - self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE, weight_decay=1e-5) + # Optimizer + self.optimizer = optim.Adam(self.policy_net.parameters(), lr=0.0005) - # Initialize experience replay memory - self.memory = ReplayMemory(MEMORY_SIZE) + # Replay memory with prioritized experience replay + self.memory = ReplayMemory(capacity=100000, alpha=0.6, beta=0.4, n_step=3, gamma=0.99) - # Initialize steps counter + # Exploration parameters + self.eps_start = 1.0 + self.eps_end = 0.05 + self.eps_decay = 0.9995 + self.epsilon = self.eps_start + + # Learning parameters + self.gamma = 0.99 + self.batch_size = 64 + self.target_update = 1000 # Update target network every N steps + + # Training tracking self.steps_done = 0 + self.episodes_done = 0 + self.market_regime = None # Current detected market regime - # Initialize epsilon for exploration - self.epsilon = EPSILON_START - self.epsilon_start = EPSILON_START - self.epsilon_end = EPSILON_END - self.epsilon_decay = EPSILON_DECAY - - # Initialize mixed precision scaler with the new format - if self.device.type == "cuda": - self.scaler = amp.GradScaler('cuda') - else: - self.scaler = amp.GradScaler('cpu') - - # Initialize TensorBoard writer - self.writer = SummaryWriter(f'runs/trading_agent_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}') - - # Create models directory if it doesn't exist - os.makedirs("models", exist_ok=True) - - # Use pinned memory for faster CPU-to-GPU transfers - if self.device.type == "cuda": - self.use_pinned_memory = True - else: - self.use_pinned_memory = False - - # Ensure models are using float32 - self.policy_net.float() - self.target_net.float() + # LSTM hidden state + self.hidden = None def expand_model(self, new_state_size, new_hidden_size=512, new_lstm_layers=3, new_attention_heads=8): """Expand the model to handle more features or increase capacity""" @@ -1900,132 +1955,98 @@ class Agent: return True def select_action(self, state, training=True): - """Select an action using epsilon-greedy policy""" - sample = random.random() - eps_threshold = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \ - math.exp(-1. * self.steps_done / self.epsilon_decay) + """Select an action using epsilon-greedy policy with market regime awareness""" + # Update epsilon + self.epsilon = max(self.eps_end, self.epsilon * self.eps_decay) - if training: - self.epsilon = eps_threshold - else: - self.epsilon = 0.0 # No exploration during evaluation/live trading + # Convert state to tensor + state = torch.FloatTensor(state).to(self.device) + + # Add batch dimension if needed + if state.dim() == 1: + state = state.unsqueeze(0) + + # Epsilon-greedy action selection + if training and random.random() < self.epsilon: + # Random action + action = random.randrange(self.action_size) + # Reset hidden state when taking random actions + self.hidden = None + return action + + # Get action from policy network + with torch.no_grad(): + q_values, self.hidden, market_regime = self.policy_net(state, self.hidden) + + # Update market regime information + self.market_regime = torch.softmax(market_regime, dim=1).cpu().numpy()[0] + + # Get best action + action = q_values.max(1)[1].item() + + return action + + def learn(self): + """Update the network weights using prioritized experience replay and n-step returns""" + if len(self.memory) < self.batch_size: + return 0.0 # Not enough samples yet + + # Sample batch with priorities + batch, indices, weights = self.memory.sample(self.batch_size) + + # Convert weights to tensor + weights = torch.FloatTensor(weights).to(self.device) + + # Extract batch components + state_batch = torch.FloatTensor(np.array(batch.state)).to(self.device) + action_batch = torch.LongTensor(np.array(batch.action)).to(self.device) + reward_batch = torch.FloatTensor(np.array(batch.reward)).to(self.device) + next_state_batch = torch.FloatTensor(np.array(batch.next_state)).to(self.device) + done_batch = torch.FloatTensor(np.array(batch.done)).to(self.device) + + # Compute Q values for current states + q_values, _, _ = self.policy_net(state_batch) + q_values = q_values.gather(1, action_batch.unsqueeze(1)).squeeze(1) + + # Compute Q values for next states with Double Q-learning + with torch.no_grad(): + # Get actions from policy network + next_q_values, _, _ = self.policy_net(next_state_batch) + next_actions = next_q_values.max(1)[1].unsqueeze(1) + + # Get Q values from target network for those actions + next_target_q_values, _, _ = self.target_net(next_state_batch) + next_target_values = next_target_q_values.gather(1, next_actions).squeeze(1) + + # Zero out values for terminal states + next_target_values = next_target_values * (1 - done_batch) + + # Compute target Q values + target_q_values = reward_batch + (self.gamma ** self.memory.n_step) * next_target_values + + # Compute loss with importance sampling weights + td_errors = target_q_values - q_values + loss = (weights * (td_errors ** 2)).mean() + + # Update priorities in replay buffer + self.memory.update_priorities(indices, td_errors.detach().cpu().numpy()) + + # Optimize the model + self.optimizer.zero_grad() + loss.backward() + + # Clip gradients to prevent exploding gradients + torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0) + + self.optimizer.step() + + # Update target network periodically + if self.steps_done % self.target_update == 0: + self.update_target_network() self.steps_done += 1 - if sample > self.epsilon: - with torch.no_grad(): - # Convert state to tensor and ensure it's float32 (not double/float64) - state_tensor = torch.FloatTensor(state).to(self.device) - - # Ensure state has correct shape - if state_tensor.dim() == 1: - state_tensor = state_tensor.unsqueeze(0) - - # Get Q values - q_values = self.policy_net(state_tensor) - - # Return action with highest Q value - return q_values.max(1)[1].item() - else: - # Random action - return random.randrange(self.action_size) - - def learn(self): - """Learn from a batch of experiences""" - if len(self.memory) < BATCH_SIZE: - return None - - try: - # Sample a batch of experiences - experiences = self.memory.sample(BATCH_SIZE) - - # Convert experiences to tensors more efficiently - # First create numpy arrays, then convert to tensors - states_np = np.array([e.state for e in experiences], dtype=np.float32) # Ensure float32 - actions_np = np.array([e.action for e in experiences], dtype=np.int64) # Ensure int64 - rewards_np = np.array([e.reward for e in experiences], dtype=np.float32) # Ensure float32 - next_states_np = np.array([e.next_state for e in experiences], dtype=np.float32) # Ensure float32 - dones_np = np.array([e.done for e in experiences], dtype=np.float32) # Ensure float32 - - # Convert numpy arrays to tensors with pinned memory if using GPU - if self.use_pinned_memory: - states = torch.from_numpy(states_np).pin_memory().to(self.device, non_blocking=True) - actions = torch.from_numpy(actions_np).long().pin_memory().to(self.device, non_blocking=True) - rewards = torch.from_numpy(rewards_np).pin_memory().to(self.device, non_blocking=True) - next_states = torch.from_numpy(next_states_np).pin_memory().to(self.device, non_blocking=True) - dones = torch.from_numpy(dones_np).pin_memory().to(self.device, non_blocking=True) - else: - states = torch.FloatTensor(states_np).to(self.device) - actions = torch.LongTensor(actions_np).to(self.device) - rewards = torch.FloatTensor(rewards_np).to(self.device) - next_states = torch.FloatTensor(next_states_np).to(self.device) - dones = torch.FloatTensor(dones_np).to(self.device) - - # Use mixed precision for forward/backward passes - if self.device.type == "cuda": - with amp.autocast(device_type='cuda'): - # Compute Q values - current_q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)) - - # Compute next Q values with target network - with torch.no_grad(): - next_q_values = self.target_net(next_states).max(1)[0] - target_q_values = rewards + (GAMMA * next_q_values * (1 - dones)) - - # Reshape target values to match current_q_values - target_q_values = target_q_values.unsqueeze(1) - - # Compute loss - loss = F.smooth_l1_loss(current_q_values, target_q_values) - - # Backward pass with mixed precision - self.optimizer.zero_grad() - self.scaler.scale(loss).backward() - - # Gradient clipping to prevent exploding gradients - self.scaler.unscale_(self.optimizer) - torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=1.0) - - self.scaler.step(self.optimizer) - self.scaler.update() - else: - # Standard precision for CPU - # Compute Q values - current_q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)) - - # Compute next Q values with target network - with torch.no_grad(): - next_q_values = self.target_net(next_states).max(1)[0] - target_q_values = rewards + (GAMMA * next_q_values * (1 - dones)) - - # Reshape target values to match current_q_values - target_q_values = target_q_values.unsqueeze(1) - - # Compute loss - loss = F.smooth_l1_loss(current_q_values, target_q_values) - - # Backward pass - self.optimizer.zero_grad() - loss.backward() - - # Gradient clipping to prevent exploding gradients - torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=1.0) - - self.optimizer.step() - - # Update steps done - self.steps_done += 1 - - # Update target network - if self.steps_done % TARGET_UPDATE == 0: - self.target_net.load_state_dict(self.policy_net.state_dict()) - - return loss.item() - - except Exception as e: - logger.error(f"Error during learning: {e}") - logger.error(f"Traceback: {traceback.format_exc()}") - return None + return loss.item() def update_target_network(self): self.target_net.load_state_dict(self.policy_net.state_dict()) @@ -2123,436 +2144,216 @@ async def get_live_prices(symbol="ETH/USDT", timeframe="1m"): await asyncio.sleep(5) break -async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000, exchange=None, args=None): - """Train the agent using historical and live data with GPU acceleration""" +async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000, exchange=None, args=None, continuous=False): + """Train the agent on the environment""" + logger.info(f"Starting training for {num_episodes} episodes...") logger.info(f"Starting training on device: {agent.device}") - # Add early stopping based on performance - patience = 50 # Episodes to wait for improvement - best_pnl = -float('inf') - best_reward = -float('inf') # Initialize best_reward - best_win_rate = 0 # Initialize best_win_rate - episodes_without_improvement = 0 + # Create TensorBoard writer if not in continuous mode + if not continuous: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + agent.writer = SummaryWriter(f'runs/trading_agent_{timestamp}') - # Add adaptive learning rate - initial_lr = LEARNING_RATE - min_lr = LEARNING_RATE / 10 - - # Add curriculum learning - curriculum_stages = [ - {"episodes": 100, "risk_factor": 0.5, "exploration": 0.3}, # Conservative trading - {"episodes": 200, "risk_factor": 0.75, "exploration": 0.2}, # Moderate risk - {"episodes": 300, "risk_factor": 1.0, "exploration": 0.1}, # Normal risk - {"episodes": 400, "risk_factor": 1.25, "exploration": 0.05} # Aggressive trading - ] - current_stage = 0 - - # Initialize stats dictionary with the correct keys - stats = { - 'episode_rewards': [], - 'episode_profits': [], - 'win_rates': [], - 'trade_counts': [], - 'prediction_accuracies': [] - } - - # Create checkpoint directory if it doesn't exist - os.makedirs("checkpoints", exist_ok=True) - - # Load best model if it exists (to resume training) - best_model_path = "models/trading_agent_best_pnl.pt" - if os.path.exists(best_model_path): - try: - logger.info(f"Loading best model from {best_model_path} to resume training") - agent.load(best_model_path) - # Try to load best metrics from checkpoint file - checkpoint_info_path = "checkpoints/best_metrics.json" - if os.path.exists(checkpoint_info_path): - with open(checkpoint_info_path, 'r') as f: - best_metrics = json.load(f) - best_reward = best_metrics.get('best_reward', best_reward) - best_pnl = best_metrics.get('best_pnl', best_pnl) - best_win_rate = best_metrics.get('best_win_rate', best_win_rate) - logger.info(f"Resumed with best metrics - Reward: {best_reward:.2f}, PnL: ${best_pnl:.2f}, Win Rate: {best_win_rate:.1f}%") - except Exception as e: - logger.warning(f"Could not load best model: {e}") + # Training statistics + episode_rewards = [] + episode_lengths = [] + balances = [] + win_rates = [] + episode_pnls = [] + cumulative_pnl = [] + drawdowns = [] + prediction_accuracies = [] try: - # Initialize price predictor and attach it to the environment - price_predictor = PricePredictionModel(input_size=30, hidden_size=128, output_size=5) - price_predictor.to(agent.device) - price_predictor_optimizer = optim.Adam(price_predictor.parameters(), lr=1e-4) - - # Attach the price predictor to the environment - env.price_predictor = price_predictor - env.price_predictor_optimizer = price_predictor_optimizer - - # Initialize price predictor - env.initialize_price_predictor(device=agent.device) - logger.info("Price predictor initialized") - for episode in range(num_episodes): - try: - # Update curriculum stage if needed - if current_stage < len(curriculum_stages) - 1 and episode >= curriculum_stages[current_stage]["episodes"]: - current_stage += 1 - logger.info(f"Moving to curriculum stage {current_stage+1}: " - f"risk_factor={curriculum_stages[current_stage]['risk_factor']}, " - f"exploration={curriculum_stages[current_stage]['exploration']}") + # Reset environment + state = env.reset() + + # Initialize episode variables + episode_reward = 0 + step = 0 + done = False + + # Initialize price predictor if not already + if not hasattr(env, 'price_predictor') or env.price_predictor is None: + env.initialize_price_predictor(device=agent.device) + + # Train price predictor + train_result = env.train_price_predictor() + prediction_loss = 0.0 + if isinstance(train_result, (float, int)): + prediction_loss = train_result + logger.info(f"Price predictor training loss: {prediction_loss:.6f}") + + # Update price predictions + env.update_price_predictions() + + # Calculate prediction accuracy if we have predictions + prediction_accuracy = 0.0 + if hasattr(env, 'price_predictions') and len(env.price_predictions) > 0 and len(env.features['price']) > 5: + # Compare the last prediction with actual prices + predicted_direction = np.sign(np.diff(env.price_predictions[:2])) + actual_direction = np.sign(np.diff(env.features['price'][-5:])) - # Apply curriculum settings - risk_factor = curriculum_stages[current_stage]["risk_factor"] - exploration = curriculum_stages[current_stage]["exploration"] + # Calculate accuracy as percentage of correct direction predictions + if len(predicted_direction) > 0 and len(actual_direction) > 0: + correct_directions = sum(1 for p, a in zip(predicted_direction, actual_direction) if p == a) + prediction_accuracy = correct_directions / len(predicted_direction) * 100 + + # Episode loop + while not done and step < max_steps_per_episode: + # Select action + action = agent.select_action(state) - # Set exploration rate for this episode - agent.epsilon = exploration + # Take action + next_state, reward, done, info = env.step(action) - # Set risk factor for this episode - env.risk_factor = risk_factor + # Store transition in replay memory + agent.memory.push(state, action, reward, next_state, done) - # Update training data if exchange is available - if exchange and args and hasattr(args, 'refresh_data') and args.refresh_data: - # Fetch new data at the start of each episode - logger.info(f"Refreshing data for episode {episode}") - await env.fetch_new_data(exchange, "ETH/USDT", "1m", 100) + # Learn from experience + loss = agent.learn() - # Reset environment - state = env.reset() + # Update state and statistics + state = next_state + episode_reward += reward + step += 1 - # Initialize episode variables - episode_reward = 0 - done = False - step = 0 - - # Initialize trade analysis dictionary - trade_analysis = { - 'win_rate': 0, - 'uptrend_win_rate': 0, - 'downtrend_win_rate': 0, - 'sideways_win_rate': 0, - 'avg_win_pnl': 0, - 'avg_loss_pnl': 0, - 'max_drawdown': 0 - } - - # Train price predictor - prediction_loss, extrema_loss = env.train_price_predictor() - - # Update price predictions - env.update_price_predictions() - - # Log OHLCV data to TensorBoard at the start of the episode - if episode % 5 == 0: # Log every 5 episodes to avoid too much data - # Create a DataFrame from the environment's data - df_ohlcv = pd.DataFrame([{ - 'timestamp': candle['timestamp'], - 'open': candle['open'], - 'high': candle['high'], - 'low': candle['low'], - 'close': candle['close'], - 'volume': candle['volume'] - } for candle in env.data[-100:]]) # Use last 100 candles - - # Convert timestamp to datetime - df_ohlcv['timestamp'] = pd.to_datetime(df_ohlcv['timestamp'], unit='ms') - df_ohlcv.set_index('timestamp', inplace=True) - - # Extract buy/sell signals from trades - buy_signals = [] - sell_signals = [] - - if hasattr(env, 'trades') and env.trades: - for trade in env.trades: - if 'entry_time' in trade and 'entry' in trade: - if trade['type'] == 'long': - # Buy signal - entry_time = pd.to_datetime(trade['entry_time'], unit='ms') - buy_signals.append((entry_time, trade['entry'])) - - # Sell signal if closed - if 'exit_time' in trade and 'exit' in trade and trade['exit'] > 0: - exit_time = pd.to_datetime(trade['exit_time'], unit='ms') - sell_signals.append((exit_time, trade['exit'])) - - elif trade['type'] == 'short': - # Sell short signal - entry_time = pd.to_datetime(trade['entry_time'], unit='ms') - sell_signals.append((entry_time, trade['entry'])) - - # Buy to cover signal if closed - if 'exit_time' in trade and 'exit' in trade and trade['exit'] > 0: - exit_time = pd.to_datetime(trade['exit_time'], unit='ms') - buy_signals.append((exit_time, trade['exit'])) - - # Log to TensorBoard - log_ohlcv_to_tensorboard( - agent.writer, - df_ohlcv, - buy_signals, - sell_signals, - episode, - tag_prefix=f"episode_{episode}" - ) - - while not done: - # Select action - action = agent.select_action(state) - - # Take action - next_state, reward, done = env.step(action) - - # Store experience - agent.memory.push(state, action, reward, next_state, done) - - # Learn from experience - loss = agent.learn() - - # Update state and reward - state = next_state - episode_reward += reward - - # Break if done - if done: - break - - # Calculate win rate - total_trades = env.win_count + env.loss_count - win_rate = (env.win_count / total_trades * 100) if total_trades > 0 else 0 - - # Calculate prediction accuracy - if hasattr(env, 'predicted_prices') and len(env.predicted_prices) > 0: - # Compare predictions with actual prices - actual_prices = env.features['price'][-len(env.predicted_prices):] - prediction_errors = np.abs(env.predicted_prices - actual_prices) / actual_prices - prediction_accuracy = 100 * (1 - np.mean(prediction_errors)) - else: - prediction_accuracy = 0 - - # Analyze trades - trade_analysis = env.analyze_trades() if hasattr(env, 'analyze_trades') else {} - - # Update stats - stats['episode_rewards'].append(episode_reward) - stats['episode_profits'].append(env.episode_pnl) - stats['win_rates'].append(win_rate) - stats['trade_counts'].append(total_trades) - stats['prediction_accuracies'].append(prediction_accuracy) - - # Log detailed trade analysis - if trade_analysis: - logger.info(f"Trade Analysis: Win Rate={trade_analysis.get('uptrend_win_rate', 0):.1f}% in uptrends, " - f"{trade_analysis.get('downtrend_win_rate', 0):.1f}% in downtrends | " - f"Avg Win=${trade_analysis.get('avg_win', 0):.2f}, Avg Loss=${trade_analysis.get('avg_loss', 0):.2f}") - - # Log to TensorBoard - agent.writer.add_scalar('Reward/train', episode_reward, episode) - agent.writer.add_scalar('Balance/train', env.balance, episode) - agent.writer.add_scalar('WinRate/train', win_rate, episode) + # Fetch new data in continuous mode + if continuous and step % 10 == 0 and exchange is not None: + await env.fetch_new_data(exchange, symbol=args.symbol if args else "ETH/USDT") + + # End of episode + episode_rewards.append(episode_reward) + episode_lengths.append(step) + balances.append(env.balance) + win_rate = env.win_count / (env.win_count + env.loss_count) * 100 if (env.win_count + env.loss_count) > 0 else 0 + win_rates.append(win_rate) + episode_pnls.append(env.episode_pnl) + cumulative_pnl.append(env.total_pnl) + drawdowns.append(env.max_drawdown) + prediction_accuracies.append(prediction_accuracy) + + # Log episode statistics + logger.info(f"Episode {episode}: Reward={episode_reward:.2f}, Balance=${env.balance:.2f}, Win Rate={win_rate:.1f}%, " + f"Trades={env.win_count + env.loss_count}, Episode PnL=${env.episode_pnl:.2f}, " + f"Total PnL=${env.total_pnl:.2f}, Max Drawdown={env.max_drawdown*100:.1f}%, " + f"Pred Accuracy={prediction_accuracy:.1f}%") + + # Log to TensorBoard + if hasattr(agent, 'writer'): + agent.writer.add_scalar('Reward/episode', episode_reward, episode) + agent.writer.add_scalar('Steps/episode', step, episode) + agent.writer.add_scalar('Balance/final', env.balance, episode) + agent.writer.add_scalar('WinRate/episode', win_rate, episode) agent.writer.add_scalar('PnL/episode', env.episode_pnl, episode) agent.writer.add_scalar('PnL/cumulative', env.total_pnl, episode) agent.writer.add_scalar('Drawdown/percent', env.max_drawdown * 100, episode) agent.writer.add_scalar('PredictionLoss', prediction_loss, episode) agent.writer.add_scalar('PredictionAccuracy', prediction_accuracy, episode) - - logger.info(f"Episode {episode}: Reward={episode_reward:.2f}, Balance=${env.balance:.2f}, " - f"Win Rate={win_rate:.1f}%, Trades={len(env.trades)}, " - f"Episode PnL=${env.episode_pnl:.2f}, Total PnL=${env.total_pnl:.2f}, " - f"Max Drawdown={env.max_drawdown*100:.1f}%, Pred Accuracy={prediction_accuracy:.1f}%") - - # Save best model by reward - if episode_reward > best_reward: - best_reward = episode_reward - agent.save("models/trading_agent_best_reward.pt") - logger.info(f"New best reward model saved: {episode_reward:.2f}") - - # Save best model by PnL - if env.episode_pnl > best_pnl: - best_pnl = env.episode_pnl - agent.save("models/trading_agent_best_pnl.pt") - logger.info(f"New best PnL model saved: ${env.episode_pnl:.2f}") - - # Save best model by win rate (if enough trades) - if total_trades >= 10 and win_rate > best_win_rate: - best_win_rate = win_rate - agent.save("models/trading_agent_best_winrate.pt") - logger.info(f"New best win rate model saved: {win_rate:.1f}%") - - # Save checkpoint every 10 episodes - if episode % 10 == 0: - checkpoint_path = f"checkpoints/trading_agent_episode_{episode}.pt" - agent.save(checkpoint_path) - - # Save best metrics to resume training if interrupted - best_metrics = { - 'best_reward': float(best_reward), - 'best_pnl': float(best_pnl), - 'best_win_rate': float(best_win_rate), - 'last_episode': episode, - 'timestamp': datetime.datetime.now().isoformat() - } - with open("checkpoints/best_metrics.json", 'w') as f: - json.dump(best_metrics, f) - - logger.info(f"Checkpoint saved at episode {episode}") - - # Check for early stopping - if env.episode_pnl > best_pnl: - best_pnl = env.episode_pnl - episodes_without_improvement = 0 - else: - episodes_without_improvement += 1 - - # Adjust learning rate based on performance - if episodes_without_improvement > 20: - # Reduce learning rate - for param_group in agent.optimizer.param_groups: - param_group['lr'] = max(param_group['lr'] * 0.9, min_lr) - logger.info(f"Reducing learning rate to {agent.optimizer.param_groups[0]['lr']:.6f}") - - # Early stopping check - if episodes_without_improvement >= patience: - logger.info(f"Early stopping triggered after {episode+1} episodes without improvement") - break - - # Create visualization every 10 episodes or on the last episode - if episode % 10 == 0 or episode == num_episodes - 1: - visualize_training_results(env, agent, episode) - - # After episode is complete, log final state with all trades - if episode % 10 == 0 or episode == num_episodes - 1: - # Create a DataFrame from the environment's data - df_ohlcv = pd.DataFrame([{ - 'timestamp': candle['timestamp'], - 'open': candle['open'], - 'high': candle['high'], - 'low': candle['low'], - 'close': candle['close'], - 'volume': candle['volume'] - } for candle in env.data[-100:]]) # Use last 100 candles - - # Convert timestamp to datetime - df_ohlcv['timestamp'] = pd.to_datetime(df_ohlcv['timestamp'], unit='ms') - df_ohlcv.set_index('timestamp', inplace=True) - - # Extract buy/sell signals from trades - buy_signals = [] - sell_signals = [] - - if hasattr(env, 'trades') and env.trades: - for trade in env.trades: - if 'entry_time' in trade and 'entry' in trade: - if trade['type'] == 'long': - # Buy signal - entry_time = pd.to_datetime(trade['entry_time'], unit='ms') - buy_signals.append((entry_time, trade['entry'])) - - # Sell signal if closed - if 'exit_time' in trade and 'exit' in trade and trade['exit'] > 0: - exit_time = pd.to_datetime(trade['exit_time'], unit='ms') - sell_signals.append((exit_time, trade['exit'])) - - elif trade['type'] == 'short': - # Sell short signal - entry_time = pd.to_datetime(trade['entry_time'], unit='ms') - sell_signals.append((entry_time, trade['entry'])) - - # Buy to cover signal if closed - if 'exit_time' in trade and 'exit' in trade and trade['exit'] > 0: - exit_time = pd.to_datetime(trade['exit_time'], unit='ms') - buy_signals.append((exit_time, trade['exit'])) - - # Log to TensorBoard - use a fixed tag to overwrite previous charts - log_ohlcv_to_tensorboard( - agent.writer, - df_ohlcv, - buy_signals, - sell_signals, - episode, - tag_prefix="latest_trading_data" # Use a fixed tag to overwrite previous charts - ) - - # Create visualization - only keep the latest one - os.makedirs("visualizations", exist_ok=True) - # Remove previous visualizations to save disk space - for file in os.listdir("visualizations"): - if file.startswith("training_episode_") and file.endswith(".png"): - try: - os.remove(os.path.join("visualizations", file)) - except: - pass - - # Create new visualization - visualize_training_results(env, agent, episode) - except Exception as e: - logger.error(f"Error in episode {episode}: {e}") - logger.error(f"Traceback: {traceback.format_exc()}") - continue + # Visualize training results periodically + if episode % 10 == 0 and not continuous: + visualize_training_results(env, agent, episode) + + # Save model periodically + if episode % 50 == 0 and not continuous: + agent.save(f'models/trading_agent_{episode}.pt') # Save final model - agent.save("models/trading_agent_final.pt") + if not continuous: + agent.save('models/trading_agent_final.pt') + agent.writer.close() + + # Save training statistics to CSV + stats_df = pd.DataFrame({ + 'episode_rewards': episode_rewards, + 'episode_lengths': episode_lengths, + 'balances': balances, + 'win_rates': win_rates, + 'episode_pnls': episode_pnls, + 'cumulative_pnl': cumulative_pnl, + 'drawdowns': drawdowns, + 'prediction_accuracy': prediction_accuracies + }) + stats_df.to_csv('training_stats.csv', index=False) # Plot training results - plot_training_results(stats) + if not continuous: + plot_training_results(stats_df) + + # Return final statistics for the last episode + if len(episode_rewards) > 0: + return ( + episode_rewards[-1], + episode_lengths[-1], + balances[-1], + win_rates[-1], + episode_pnls[-1], + cumulative_pnl[-1], + drawdowns[-1] + ) + else: + return 0, 0, env.initial_balance, 0, 0, 0, 0 except Exception as e: logger.error(f"Training failed: {e}") logger.error(f"Traceback: {traceback.format_exc()}") - - # Save emergency checkpoint - try: - agent.save("models/trading_agent_emergency.pt") - logger.info("Emergency model saved due to training failure") - except Exception as save_error: - logger.error(f"Failed to save emergency model: {save_error}") - - return stats + return 0, 0, env.initial_balance, 0, 0, 0, 0 def plot_training_results(stats): """Plot training results""" - plt.figure(figsize=(15, 15)) + # Create figure with subplots + fig, axs = plt.subplots(3, 2, figsize=(15, 12)) + fig.suptitle('Trading Agent Training Results', fontsize=16) # Plot rewards - plt.subplot(3, 2, 1) - plt.plot(stats['episode_rewards']) - plt.title('Episode Rewards') - plt.xlabel('Episode') - plt.ylabel('Reward') + axs[0, 0].plot(stats['episode_rewards'], 'b-') + axs[0, 0].set_title('Episode Rewards') + axs[0, 0].set_xlabel('Episode') + axs[0, 0].set_ylabel('Reward') + axs[0, 0].grid(True) - # Plot balance/profits - plt.subplot(3, 2, 2) - plt.plot(stats['episode_profits']) - plt.title('Episode Profits') - plt.xlabel('Episode') - plt.ylabel('Profit ($)') + # Plot episode profits instead of balances + axs[0, 1].plot(stats['episode_profits'], 'g-') + axs[0, 1].set_title('Episode Profits') + axs[0, 1].set_xlabel('Episode') + axs[0, 1].set_ylabel('Profit ($)') + axs[0, 1].grid(True) - # Plot win rate - plt.subplot(3, 2, 3) - plt.plot(stats['win_rates']) - plt.title('Win Rate') - plt.xlabel('Episode') - plt.ylabel('Win Rate (%)') + # Plot win rates + axs[1, 0].plot(stats['win_rates'], 'r-') + axs[1, 0].set_title('Win Rate') + axs[1, 0].set_xlabel('Episode') + axs[1, 0].set_ylabel('Win Rate (%)') + axs[1, 0].grid(True) - # Plot trade count - plt.subplot(3, 2, 4) - plt.plot(stats['trade_counts']) - plt.title('Number of Trades') - plt.xlabel('Episode') - plt.ylabel('Trades') + # Plot trade counts + axs[1, 1].plot(stats['trade_counts'], 'm-') + axs[1, 1].set_title('Trade Counts') + axs[1, 1].set_xlabel('Episode') + axs[1, 1].set_ylabel('Number of Trades') + axs[1, 1].grid(True) + + # Plot cumulative profits (calculated from episode_profits) + if len(stats['episode_profits']) > 0: + cumulative_profits = np.cumsum(stats['episode_profits']) + axs[2, 0].plot(cumulative_profits, 'c-') + axs[2, 0].set_title('Cumulative Profits') + axs[2, 0].set_xlabel('Episode') + axs[2, 0].set_ylabel('Cumulative Profit ($)') + axs[2, 0].grid(True) # Plot prediction accuracy - plt.subplot(3, 2, 5) - plt.plot(stats['prediction_accuracies']) - plt.title('Prediction Accuracy') - plt.xlabel('Episode') - plt.ylabel('Accuracy (%)') + axs[2, 1].plot(stats['prediction_accuracies'], 'y-') + axs[2, 1].set_title('Prediction Accuracy') + axs[2, 1].set_xlabel('Episode') + axs[2, 1].set_ylabel('Accuracy (%)') + axs[2, 1].grid(True) - # Save the figure - plt.tight_layout() + plt.tight_layout(rect=[0, 0, 1, 0.96]) plt.savefig('training_results.png') plt.close() - - logger.info("Training results saved to training_results.png") def evaluate_agent(agent, env, num_episodes=10): """Evaluate the agent on test data""" @@ -2570,7 +2371,7 @@ def evaluate_agent(agent, env, num_episodes=10): while not done: # Select action (no exploration) action = agent.select_action(state, training=False) - next_state, reward, done = env.step(action) + next_state, reward, done, info = env.step(action) state = next_state episode_reward += reward @@ -2638,7 +2439,7 @@ async def test_training(): action = agent.select_action(state) # Take action - next_state, reward, done = env.step(action) + next_state, reward, done, info = env.step(action) # Store experience agent.memory.push(state, action, reward, next_state, done) @@ -2789,7 +2590,7 @@ async def live_trading(agent, env, exchange, demo=True): action = agent.select_action(state, training=False) # Take action - _, reward, _ = env.step(action) + _, reward, _, _ = env.step(action) # Log trading activity action_names = ["HOLD", "BUY", "SELL", "CLOSE"] @@ -3082,7 +2883,10 @@ async def main(): done = False # Train price predictor - prediction_loss, extrema_loss = env.train_price_predictor() + train_result = env.train_price_predictor() + if isinstance(train_result, (float, int)): + logger.info(f"Price predictor training loss: {train_result:.6f}") + writer.add_scalar('Loss/price_predictor', train_result, episode) # Update price predictions env.update_price_predictions() @@ -3093,7 +2897,7 @@ async def main(): action = agent.select_action(state) # Take action - next_state, reward, done = env.step(action) + next_state, reward, done, info = env.step(action) # Store experience agent.memory.push(state, action, reward, next_state, done) @@ -3132,7 +2936,7 @@ async def main(): writer.add_scalar('PnL/episode', env.episode_pnl, episode) writer.add_scalar('PnL/cumulative', env.total_pnl, episode) writer.add_scalar('Drawdown/percent', env.max_drawdown * 100, episode) - writer.add_scalar('PredictionLoss', prediction_loss, episode) + writer.add_scalar('PredictionLoss', train_result, episode) writer.add_scalar('PredictionAccuracy', prediction_accuracy, episode) # Log OHLCV data to TensorBoard every 5 episodes