added backtest

Dobromir Popov 2025-02-02 00:19:54 +02:00
parent 015b79c395
commit b66fafb06f


#!/usr/bin/env python3
import asyncio
import os
import time
import ccxt.async_support as ccxt
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
# -------------------------------------
# Neural Network Architecture Definition
# -------------------------------------
class TradingModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(TradingModel, self).__init__()
        # This is a minimal feed-forward network.
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            # Hidden stack assumed (these layers are elided in this view);
            # the head must map hidden_dim -> output_dim for the three actions.
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )
    def forward(self, x):
        return self.net(x)
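
# Example (illustrative): with the dimensions used in main_backtest below,
# the network maps a (batch, 8) feature tensor to (batch, 3) action scores:
#   model = TradingModel(input_dim=8, hidden_dim=128, output_dim=3)
#   scores = model(torch.zeros(1, 8))  # -> shape (1, 3)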
# -------------------------------------
# Replay Buffer for Experience Storage
# -------------------------------------
class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
    def add(self, experience):
        # Store one (state, reward, next_state, done) experience tuple.
        self.buffer.append(experience)

    def sample(self, batch_size):
        # An assumed implementation (elided in this view): draw a uniform
        # random batch without replacement.
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        return [self.buffer[idx] for idx in indices]
    def __len__(self):
        return len(self.buffer)
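
# Each stored experience is a (state, reward, next_state, done) tuple, as added
# by train_on_historical_data below; sample() feeds random batches to train_step().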
# -------------------------------------
# A Simple Indicator and Feature Preparation Function
# -------------------------------------
def compute_indicators(candle, additional_data):
"""
Combine basic OHLCV candle data with additional sentiment/indicator data.
In production, you might use dedicated libraries (like TALib) to calculate RSI, stochastic,
and support up to 100 channels.
Combine OHLCV candle data with extra indicator information.
Base features: open, high, low, close, volume.
Additional channels (e.g. sentiment score, news volume, etc.) are appended.
"""
features = []
# Base candle features: open, high, low, close, volume
features.extend([
candle.get('open', 0.0),
candle.get('high', 0.0),
        candle.get('low', 0.0),
        candle.get('close', 0.0),
        candle.get('volume', 0.0)
    ])
    # Append additional indicators (e.g., simulated sentiment here).
    for key, value in additional_data.items():
        features.append(value)
    return np.array(features, dtype=np.float32)
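
# Example (illustrative): a candle dict plus three extra channels produces an
# 8-dimensional feature vector, matching input_dim = 5 + 3 in main_backtest:
#   candle = {'open': 100.0, 'high': 101.0, 'low': 99.5, 'close': 100.5, 'volume': 12.3}
#   extra = {'sentiment_score': 0.4, 'news_volume': 0.2, 'social_engagement': 0.7}
#   compute_indicators(candle, extra)  # -> array of shape (8,)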
# -------------------------------------
# RL Agent that Uses the Neural Network
# -------------------------------------
class ContinuousRLAgent:
    def __init__(self, model, optimizer, replay_buffer, batch_size=32):
        self.model = model
        self.optimizer = optimizer
        self.replay_buffer = replay_buffer
        self.batch_size = batch_size
        self.loss_fn = nn.MSELoss()  # Using a simple MSE loss for demonstration.
    def act(self, state):
        """
        Given state features, output an action.
        Mapping: 0 -> SELL, 1 -> HOLD, 2 -> BUY.
        """
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            output = self.model(state_tensor)
        # Greedy action selection (assumed): pick the index of the largest output.
        action = int(torch.argmax(output, dim=1).item())
        return action
    def train_step(self):
        """
        Sample a batch from the replay buffer and update the network.
        (Note: a real RL algorithm would use a more complex target calculation.)
        """
        if len(self.replay_buffer) < self.batch_size:
            return  # Not enough samples yet
        # Sampling step (an assumed reconstruction; this block is elided here):
        # draw a batch and build the state tensor used below.
        batch = self.replay_buffer.sample(self.batch_size)
        states, rewards, next_states, dones = zip(*batch)
        states_tensor = torch.tensor(np.array(states), dtype=torch.float32)
        targets_tensor = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        outputs = self.model(states_tensor)
        # For this simple demonstration we use the first output as the value estimate.
        predictions = outputs[:, 0].unsqueeze(1)
        loss = self.loss_fn(predictions, targets_tensor)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
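
# Note: regressing the first network output toward the raw reward is a
# simplification; a DQN-style update would instead use the target
# reward + gamma * max_a Q(next_state, a) with a discount factor gamma.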
# -------------------------------------
# Historical Data Fetching Function
# -------------------------------------
async def fetch_historical_data(exchange, symbol, timeframe, since, end_time, batch_size=500):
"""
Convert the RL agent's decision into a market order.
Action mapping: 0 => SELL, 1 => HOLD, 2 => BUY.
"""
if action == 0:
print("Executing SELL order")
await self.place_market_order('sell')
elif action == 2:
print("Executing BUY order")
await self.place_market_order('buy')
else:
print("Holding position - no order executed")
async def place_market_order(self, side):
"""
Place a market order on MEXC using ccxt.
"""
try:
order = await self.exchange.create_order(self.symbol, 'market', side, self.trade_amount, None)
print(f"Order executed: {order}")
except Exception as e:
print(f"Error executing {side} order:", e)
async def trading_loop(self, timeframe='1m'):
"""
Main loop:
- Fetch live candlestick and sentiment data.
- Compute state features.
- Let the RL agent decide an action.
- Execute market orders.
- Store experience and perform continuous training.
    """
    Fetch historical OHLCV data for the given symbol and timeframe.
    The "since" and "end_time" parameters are in milliseconds.
    """
    candles = []
    since_ms = since
    while True:
        try:
            batch = await exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since_ms, limit=batch_size)
        except Exception as e:
            print("Error fetching historical data:", e)
            break
        if not batch:
            break
        # Convert each candle from a list to a dict.
        for c in batch:
            candle_dict = {
                'timestamp': c[0],
                'open': c[1],
                'high': c[2],
                'low': c[3],
                'close': c[4],
                'volume': c[5]
            }
            candles.append(candle_dict)
        last_timestamp = batch[-1][0]
        if last_timestamp >= end_time:
            break
        since_ms = last_timestamp + 1
    print(f"Fetched {len(candles)} candles.")
    return candles
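
# Example (illustrative): fetch one day of 1m candles ending one day ago,
# mirroring the window set up in main_backtest below:
#   now = int(time.time() * 1000)
#   candles = await fetch_historical_data(exchange, 'BTC/USDT', '1m',
#                                         since=now - 2 * 86_400_000,
#                                         end_time=now - 86_400_000)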
# -------------------------------------
# Backtest Environment Class Definition
# -------------------------------------
class BacktestEnvironment:
    def __init__(self, candles):
        self.candles = candles
        self.current_index = 0
        self.position = None  # Will hold a dict once a BUY is simulated.
    def reset(self):
        self.current_index = 0
        self.position = None
        return self.get_state(self.current_index)

    def get_state(self, index):
        candle = self.candles[index]
        # Simulate additional sentiment features.
        sentiment = {
            'sentiment_score': np.random.rand(),
            'news_volume': np.random.rand(),
            'social_engagement': np.random.rand()
        }
        return compute_indicators(candle, sentiment)
    def step(self, action):
        """
        Simulate one trading step, using the current candle as the state.
        Action semantics:
          - If not in a position and action is BUY (2), buy at the next candle's open.
          - If in a position and action is SELL (0), sell at the next candle's open.
          - Otherwise, no trade is executed.
        Returns: (state, reward, next_state, done)
        """
        if self.current_index >= len(self.candles) - 1:
            # End of the historical data.
            return self.get_state(self.current_index), 0.0, None, True

        state = self.get_state(self.current_index)
        next_index = self.current_index + 1
        next_state = self.get_state(next_index)
        next_candle = self.candles[next_index]
        reward = 0.0

        # Action mapping: 0 -> SELL, 1 -> HOLD, 2 -> BUY.
        if self.position is None:
            if action == 2:  # BUY signal: enter a long position.
                # Buy at the next candle's open price; no immediate reward on entry.
                self.position = {'entry_price': next_candle['open'], 'entry_index': self.current_index}
        else:
            if action == 0:  # SELL signal: exit the long position.
                sell_price = next_candle['open']
                reward = sell_price - self.position['entry_price']
                self.position = None

        self.current_index = next_index
        done = (self.current_index >= len(self.candles) - 1)
        return state, reward, next_state, done
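
    # Worked example (illustrative): a BUY at step i enters at candles[i+1]['open']
    # (say 100.0); a later SELL exits at that step's next open (say 101.5), so the
    # SELL transition yields reward = 101.5 - 100.0 = 1.5. HOLD steps yield 0.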
# -------------------------------------
# Training Loop Over Historical Data (Backtest)
# -------------------------------------
def train_on_historical_data(env, rl_agent, num_epochs=10):
"""
For each epoch, run through the historical data episode.
At every step, let the agent decide an action and simulate a trade.
The experience (state, reward, next_state, done) is stored and used to update the network.
"""
for epoch in range(num_epochs):
state = env.reset()
done = False
self.rl_agent.replay_buffer.add((state, reward, next_state, done))
self.rl_agent.train_step()
total_reward = 0.0
steps = 0
while not done:
action = rl_agent.act(state)
state_next, reward, next_state, done = env.step(action)
if next_state is None:
next_state = np.zeros_like(state)
rl_agent.replay_buffer.add((state, reward, next_state, done))
rl_agent.train_step()
state = next_state
total_reward += reward
steps += 1
print(f"Epoch {epoch+1}/{num_epochs} completed, total reward: {total_reward:.4f} over {steps} steps.")
# -------------------------------------
# Main Asynchronous Function for Backtest Training
# -------------------------------------
async def main_backtest():
    # Define symbol, timeframe, and historical period.
    symbol = 'BTC/USDT'
    timeframe = '1m'
    now = int(time.time() * 1000)
    one_day_ms = 24 * 60 * 60 * 1000
    # For example, fetch a one-day period from 2 days ago until 1 day ago.
    since = now - one_day_ms * 2
    end_time = now - one_day_ms
    # Initialize the exchange (using MEXC in this example).
    mexc_api_key = os.environ.get('MEXC_API_KEY', 'YOUR_API_KEY')
    mexc_api_secret = os.environ.get('MEXC_API_SECRET', 'YOUR_SECRET_KEY')
    # Create the MEXC exchange instance (ccxt asynchronous).
    exchange = ccxt.mexc({
        'apiKey': mexc_api_key,
        'secret': mexc_api_secret,
        'enableRateLimit': True,
    })
print("Fetching historical data...")
candles = await fetch_historical_data(exchange, symbol, timeframe, since, end_time)
if not candles:
print("No historical data fetched.")
await exchange.close()
return
# Initialize the backtest environment with the historical candles.
env = BacktestEnvironment(candles)
# Model dimensions:
# 5 base OHLCV features + 3 simulated sentiment features.
input_dim = 5 + 3
hidden_dim = 128
output_dim = 3 # SELL, HOLD, BUY
    model = TradingModel(input_dim, hidden_dim, output_dim)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)  # lr is an assumed default
    replay_buffer = ReplayBuffer(capacity=10000)
    rl_agent = ContinuousRLAgent(model, optimizer, replay_buffer, batch_size=32)
    # Train the RL agent via backtesting.
    num_epochs = 10  # Adjust the number of epochs as needed.
    train_on_historical_data(env, rl_agent, num_epochs=num_epochs)

    # Optionally, run a final test episode to simulate trading with the trained model.
    state = env.reset()
    done = False
    cumulative_reward = 0.0
    while not done:
        action = rl_agent.act(state)
        _, reward, next_state, done = env.step(action)
        if next_state is not None:
            state = next_state
        cumulative_reward += reward
    print("Final backtest simulation cumulative profit:", cumulative_reward)
    await exchange.close()
if __name__ == "__main__":
    asyncio.run(main_backtest())