From b66fafb06f748952426aaf3da8722404a212dc55 Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Sun, 2 Feb 2025 00:19:54 +0200
Subject: [PATCH] added backtest

---
 crypto/brian/index.py | 341 +++++++++++++++++++++++-------------------
 1 file changed, 188 insertions(+), 153 deletions(-)

diff --git a/crypto/brian/index.py b/crypto/brian/index.py
index 4c33712..b108685 100644
--- a/crypto/brian/index.py
+++ b/crypto/brian/index.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import asyncio
 import os
+import time
 import ccxt.async_support as ccxt
 import torch
 import torch.nn as nn
@@ -8,14 +9,13 @@ import torch.optim as optim
 import numpy as np
 from collections import deque
 
-# ------------------------------
-# Neural Network Architecture
-# ------------------------------
+# -------------------------------------
+# Neural Network Architecture Definition
+# -------------------------------------
 class TradingModel(nn.Module):
     def __init__(self, input_dim, hidden_dim, output_dim):
         super(TradingModel, self).__init__()
-        # This is a simplified feed-forward model.
-        # A production-grade 8B parameter model would need a distributed strategy.
+        # This is a minimal feed-forward network.
         self.net = nn.Sequential(
             nn.Linear(input_dim, hidden_dim),
             nn.ReLU(),
@@ -27,9 +27,9 @@ class TradingModel(nn.Module):
     def forward(self, x):
         return self.net(x)
 
-# ------------------------------
-# Replay Buffer for Continuous Learning
-# ------------------------------
+# -------------------------------------
+# Replay Buffer for Experience Storage
+# -------------------------------------
 class ReplayBuffer:
     def __init__(self, capacity=10000):
         self.buffer = deque(maxlen=capacity)
@@ -44,17 +44,16 @@ class ReplayBuffer:
     def __len__(self):
         return len(self.buffer)
 
-# ------------------------------
-# Feature Engineering & Indicator Calculation
-# ------------------------------
+# -------------------------------------
+# A Simple Indicator and Feature Preparation Function
+# -------------------------------------
 def compute_indicators(candle, additional_data):
     """
-    Combine basic OHLCV candle data with additional sentiment/indicator data.
-    In production, you might use dedicated libraries (like TA‑Lib) to calculate RSI, stochastic,
-    and support up to 100 channels.
+    Combine OHLCV candle data with extra indicator information.
+    Base features: open, high, low, close, volume.
+    Additional channels (e.g. sentiment score, news volume, etc.) are appended.
     """
     features = []
-    # Base candle features: open, high, low, close, volume
     features.extend([
         candle.get('open', 0.0),
        candle.get('high', 0.0),
@@ -62,83 +61,37 @@ def compute_indicators(candle, additional_data):
         candle.get('close', 0.0),
         candle.get('volume', 0.0)
     ])
-
-    # Append additional indicators (e.g., sentiment, news volume, etc.)
+    # Append additional indicators (e.g., simulated sentiment here)
     for key, value in additional_data.items():
         features.append(value)
-
     return np.array(features, dtype=np.float32)
 
-# ------------------------------
-# Data Ingestion from MEXC: Live Candle Data
-# ------------------------------
-async def get_live_candle_data(exchange, symbol, timeframe='1m'):
-    """
-    Fetch the latest OHLCV candle for the given symbol and timeframe.
-    MEXC (or other exchanges via ccxt) returns a list of candles:
-      [ timestamp, open, high, low, close, volume ]
-    We use limit=1 to get the last candle.
-    """
-    try:
-        ohlcv = await exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=1)
-        if ohlcv and len(ohlcv) > 0:
-            ts, open_, high, low, close, volume = ohlcv[-1]
-            candle = {
-                'timestamp': ts,
-                'open': open_,
-                'high': high,
-                'low': low,
-                'close': close,
-                'volume': volume
-            }
-            return candle
-        return None
-    except Exception as e:
-        print("Error fetching candle data:", e)
-        return None
-
-# ------------------------------
-# Simulated Sentiment Data
-# ------------------------------
-async def get_sentiment_data():
-    """
-    Simulate fetching live sentiment data.
-    In production, integrate sentiment analysis from social media, news APIs, etc.
-    """
-    await asyncio.sleep(0.1)  # Simulated latency
-    return {
-        'sentiment_score': np.random.rand(),  # Example: normalized between 0 and 1
-        'news_volume': np.random.rand(),
-        'social_engagement': np.random.rand()
-    }
-
-# ------------------------------
-# RL Agent with Continuous Learning
-# ------------------------------
+# -------------------------------------
+# RL Agent that Uses the Neural Network
+# -------------------------------------
 class ContinuousRLAgent:
     def __init__(self, model, optimizer, replay_buffer, batch_size=32):
         self.model = model
         self.optimizer = optimizer
         self.replay_buffer = replay_buffer
         self.batch_size = batch_size
-        # For demonstration, we use a basic MSE loss.
-        self.loss_fn = nn.MSELoss()
+        self.loss_fn = nn.MSELoss()  # Using a simple MSE loss for demonstration.
 
     def act(self, state):
         """
-        Given the latest state, decide on an action.
-        Mapping: 0: SELL, 1: HOLD, 2: BUY.
+        Given state features, output an action.
+        Mapping: 0 -> SELL, 1 -> HOLD, 2 -> BUY.
         """
         state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
         with torch.no_grad():
             output = self.model(state_tensor)
-        action = torch.argmax(output, dim=1).item()
+        action = torch.argmax(output, dim=1).item()
         return action
 
     def train_step(self):
         """
-        Perform one training step using a sample from the replay buffer.
-        Note: A true RL implementation would incorporate rewards and discounted returns.
+        Sample a batch from the replay buffer and update the network.
+        (Note: A real RL algorithm will have a more complex target calculation.)
         """
         if len(self.replay_buffer) < self.batch_size:
             return  # Not enough samples yet
@@ -156,7 +109,7 @@ class ContinuousRLAgent:
         targets_tensor = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
 
         outputs = self.model(states_tensor)
-        # For demonstration, we use the first output as our estimation.
+        # For this simple demonstration we use the first output as the value estimate.
         predictions = outputs[:, 0].unsqueeze(1)
         loss = self.loss_fn(predictions, targets_tensor)
 
@@ -164,105 +117,187 @@ class ContinuousRLAgent:
         loss.backward()
         self.optimizer.step()
 
-# ------------------------------
-# Trading Bot: Execution with MEXC API
-# ------------------------------
-class TradingBot:
-    def __init__(self, rl_agent, exchange, symbol='BTC/USDT', trade_amount=0.001):
-        self.rl_agent = rl_agent
-        self.exchange = exchange
-        self.symbol = symbol
-        self.trade_amount = trade_amount  # Amount to trade (in base currency)
-
-    async def execute_trade(self, action):
-        """
-        Convert the RL agent's decision into a market order.
-        Action mapping: 0 => SELL, 1 => HOLD, 2 => BUY.
-        """
-        if action == 0:
-            print("Executing SELL order")
-            await self.place_market_order('sell')
-        elif action == 2:
-            print("Executing BUY order")
-            await self.place_market_order('buy')
-        else:
-            print("Holding position - no order executed")
-
-    async def place_market_order(self, side):
-        """
-        Place a market order on MEXC using ccxt.
-        """
+# -------------------------------------
+# Historical Data Fetching Function
+# -------------------------------------
+async def fetch_historical_data(exchange, symbol, timeframe, since, end_time, batch_size=500):
+    """
+    Fetch historical OHLCV data for the given symbol and timeframe.
+    The "since" and "end_time" parameters are in milliseconds.
+    """
+    candles = []
+    since_ms = since
+    while True:
         try:
-            order = await self.exchange.create_order(self.symbol, 'market', side, self.trade_amount, None)
-            print(f"Order executed: {order}")
+            batch = await exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since_ms, limit=batch_size)
         except Exception as e:
-            print(f"Error executing {side} order:", e)
-
-    async def trading_loop(self, timeframe='1m'):
-        """
-        Main loop:
-          - Fetch live candlestick and sentiment data.
-          - Compute state features.
-          - Let the RL agent decide an action.
-          - Execute market orders.
-          - Store experience and perform continuous training.
-        """
-        while True:
-            candle = await get_live_candle_data(self.exchange, self.symbol, timeframe)
-            if candle is None:
-                await asyncio.sleep(1)
-                continue
-
-            sentiment = await get_sentiment_data()
-            state = compute_indicators(candle, sentiment)
-            action = self.rl_agent.act(state)
-            await self.execute_trade(action)
-
-            # Simulate reward computation. In production, use trading performance to compute reward.
-            reward = np.random.rand()
-            next_state = state  # In a real system, this would be updated after the trade.
-            done = False
-            self.rl_agent.replay_buffer.add((state, reward, next_state, done))
-            self.rl_agent.train_step()
-
-            # Wait to match the candle timeframe (for a 1m candle, sleep 60 seconds)
-            await asyncio.sleep(60)
+            print("Error fetching historical data:", e)
+            break
+        if not batch:
+            break
+        # Convert each candle from a list to a dict.
+        for c in batch:
+            candle_dict = {
+                'timestamp': c[0],
+                'open': c[1],
+                'high': c[2],
+                'low': c[3],
+                'close': c[4],
+                'volume': c[5]
+            }
+            candles.append(candle_dict)
+        last_timestamp = batch[-1][0]
+        if last_timestamp >= end_time:
+            break
+        since_ms = last_timestamp + 1
+    print(f"Fetched {len(candles)} candles.")
+    return candles
 
-# ------------------------------
-# Main Execution Loop
-# ------------------------------
-async def main_loop():
-    # Retrieve MEXC API credentials from environment variables or replace with your keys.
+# -------------------------------------
+# Backtest Environment Class Definition
+# -------------------------------------
+class BacktestEnvironment:
+    def __init__(self, candles):
+        self.candles = candles
+        self.current_index = 0
+        self.position = None  # Will hold a dict once a BUY is simulated.
+
+    def reset(self):
+        self.current_index = 0
+        self.position = None
+        return self.get_state(self.current_index)
+
+    def get_state(self, index):
+        candle = self.candles[index]
+        # Simulate additional sentiment features.
+        sentiment = {
+            'sentiment_score': np.random.rand(),
+            'news_volume': np.random.rand(),
+            'social_engagement': np.random.rand()
+        }
+        return compute_indicators(candle, sentiment)
+
+    def step(self, action):
+        """
+        Simulate a trading step.
+          • Uses the current candle as state.
+          • Decides on an action:
+              - If not in a position and action is BUY (2), we buy at the next candle's open.
+              - If in position and action is SELL (0), we sell at the next candle's open.
+              - Otherwise, no trade is executed.
+          • Returns: (state, reward, next_state, done)
+        """
+        if self.current_index >= len(self.candles) - 1:
+            # End of the historical data.
+            return self.get_state(self.current_index), 0, None, True
+
+        state = self.get_state(self.current_index)
+        next_index = self.current_index + 1
+        next_state = self.get_state(next_index)
+        current_candle = self.candles[self.current_index]
+        next_candle = self.candles[next_index]
+        reward = 0.0
+
+        # Action mapping: 0 -> SELL, 1 -> HOLD, 2 -> BUY.
+        if self.position is None:
+            if action == 2:  # BUY signal: enter a long position.
+                # Buy at the next candle's open price.
+                entry_price = next_candle['open']
+                self.position = {'entry_price': entry_price, 'entry_index': self.current_index}
+                # No immediate reward on entry.
+        else:
+            if action == 0:  # SELL signal: exit the long position.
+                sell_price = next_candle['open']
+                reward = sell_price - self.position['entry_price']
+                self.position = None
+        self.current_index = next_index
+        done = (self.current_index >= len(self.candles) - 1)
+        return state, reward, next_state, done
+
+# -------------------------------------
+# Training Loop Over Historical Data (Backtest)
+# -------------------------------------
+def train_on_historical_data(env, rl_agent, num_epochs=10):
+    """
+    For each epoch, run one episode over the historical data.
+    At every step, let the agent decide an action and simulate a trade.
+    The experience (state, reward, next_state, done) is stored and used to update the network.
+    """
+    for epoch in range(num_epochs):
+        state = env.reset()
+        done = False
+        total_reward = 0.0
+        steps = 0
+        while not done:
+            action = rl_agent.act(state)
+            state_next, reward, next_state, done = env.step(action)
+            if next_state is None:
+                next_state = np.zeros_like(state)
+            rl_agent.replay_buffer.add((state, reward, next_state, done))
+            rl_agent.train_step()
+            state = next_state
+            total_reward += reward
+            steps += 1
+        print(f"Epoch {epoch+1}/{num_epochs} completed, total reward: {total_reward:.4f} over {steps} steps.")
+
+# -------------------------------------
+# Main Asynchronous Function for Backtest Training
+# -------------------------------------
+async def main_backtest():
+    # Define symbol, timeframe, and historical period.
+    symbol = 'BTC/USDT'
+    timeframe = '1m'
+    now = int(time.time() * 1000)
+    one_day_ms = 24 * 60 * 60 * 1000
+    # For example, fetch a 1-day period from 2 days ago until 1 day ago.
+    since = now - one_day_ms * 2
+    end_time = now - one_day_ms
+
+    # Initialize the exchange (using MEXC in this example).
     mexc_api_key = os.environ.get('MEXC_API_KEY', 'YOUR_API_KEY')
     mexc_api_secret = os.environ.get('MEXC_API_SECRET', 'YOUR_SECRET_KEY')
-
-    # Create the MEXC exchange instance (ccxt asynchronous)
     exchange = ccxt.mexc({
         'apiKey': mexc_api_key,
         'secret': mexc_api_secret,
         'enableRateLimit': True,
     })
 
-    # Define the model input dimensions.
-    # Base candle features (5: open, high, low, close, volume) + simulated 3 sentiment features
-    input_dim = 8
+    print("Fetching historical data...")
+    candles = await fetch_historical_data(exchange, symbol, timeframe, since, end_time)
+    if not candles:
+        print("No historical data fetched.")
+        await exchange.close()
+        return
+
+    # Initialize the backtest environment with the historical candles.
+    env = BacktestEnvironment(candles)
+
+    # Model dimensions:
+    # 5 base OHLCV features + 3 simulated sentiment features.
+    input_dim = 5 + 3
     hidden_dim = 128
-    output_dim = 3  # SELL, HOLD, BUY
+    output_dim = 3  # SELL, HOLD, BUY
 
     model = TradingModel(input_dim, hidden_dim, output_dim)
     optimizer = optim.Adam(model.parameters(), lr=1e-4)
     replay_buffer = ReplayBuffer(capacity=10000)
     rl_agent = ContinuousRLAgent(model, optimizer, replay_buffer, batch_size=32)
-    trading_bot = TradingBot(rl_agent, exchange, symbol='BTC/USDT', trade_amount=0.001)
+    # Train the RL agent via backtesting.
+    num_epochs = 10  # Adjust number of epochs as needed.
+    train_on_historical_data(env, rl_agent, num_epochs=num_epochs)
 
-    try:
-        # Begin the continuous trading loop (using 1-minute candles)
-        await trading_bot.trading_loop(timeframe='1m')
-    except Exception as e:
-        print("Error in trading loop:", e)
-    finally:
-        await exchange.close()
+    # Optionally, perform a final test episode to simulate trading with the trained model.
+    state = env.reset()
+    done = False
+    cumulative_reward = 0.0
+    while not done:
+        action = rl_agent.act(state)
+        state, reward, next_state, done = env.step(action)
+        cumulative_reward += reward
+    print("Final backtest simulation cumulative profit:", cumulative_reward)
+
+    await exchange.close()
 
 if __name__ == "__main__":
-    asyncio.run(main_loop())
\ No newline at end of file
+    asyncio.run(main_backtest())
\ No newline at end of file
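
Note on the simplified target in train_step(): the added code regresses the network's first output directly onto the immediate reward. A minimal sketch of a discounted one-step TD(0) target, reusing the (state, reward, next_state, done) tuples that BacktestEnvironment stores, could look like the method below. It is an illustration only, not part of the change: it assumes the ReplayBuffer exposes a sample(batch_size) method (the sampling lines fall outside the hunks shown) and introduces a hypothetical discount factor gamma.

    def train_step_td(self, gamma=0.99):
        # Sketch of a discounted one-step value target (TD(0)), keeping the
        # patch's convention of using output[:, 0] as the value estimate.
        if len(self.replay_buffer) < self.batch_size:
            return  # Not enough samples yet.
        batch = self.replay_buffer.sample(self.batch_size)  # assumed sampling API
        states, rewards, next_states, dones = zip(*batch)
        states_tensor = torch.tensor(np.array(states), dtype=torch.float32)
        next_states_tensor = torch.tensor(np.array(next_states), dtype=torch.float32)
        rewards_tensor = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        dones_tensor = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)
        with torch.no_grad():
            # Bootstrap from the next state's value estimate; zero it out at episode end.
            next_values = self.model(next_states_tensor)[:, 0].unsqueeze(1)
        targets = rewards_tensor + gamma * (1.0 - dones_tensor) * next_values
        predictions = self.model(states_tensor)[:, 0].unsqueeze(1)
        loss = self.loss_fn(predictions, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()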