#!/usr/bin/env python3
import asyncio
import os
from collections import deque

import ccxt.async_support as ccxt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


# ------------------------------
# Neural Network Architecture
# ------------------------------
class TradingModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(TradingModel, self).__init__()
        # This is a simplified feed-forward model.
        # A production-grade 8B parameter model would need a distributed strategy.
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.net(x)


# ------------------------------
# Replay Buffer for Continuous Learning
# ------------------------------
class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), size=batch_size, replace=False)
        return [self.buffer[i] for i in indices]

    def __len__(self):
        return len(self.buffer)


# ------------------------------
# Feature Engineering & Indicator Calculation
# ------------------------------
def compute_indicators(candle, additional_data):
    """
    Combine basic OHLCV candle data with additional sentiment/indicator data.
    In production, you might use a dedicated library (such as TA-Lib) to calculate
    indicators like RSI and the stochastic oscillator, and support up to 100 channels.
    """
    features = []
    # Base candle features: open, high, low, close, volume
    features.extend([
        candle.get('open', 0.0),
        candle.get('high', 0.0),
        candle.get('low', 0.0),
        candle.get('close', 0.0),
        candle.get('volume', 0.0)
    ])
    # Append additional indicators (e.g., sentiment, news volume, etc.)
    features.extend(additional_data.values())
    return np.array(features, dtype=np.float32)
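

# ------------------------------
# Example: RSI from Recent Closes (illustrative sketch)
# ------------------------------
# The docstring above mentions indicators such as RSI. As a minimal,
# hedged sketch (a library such as TA-Lib would be the production
# choice), RSI over a window of close prices can be computed like this;
# `closes` is an assumed 1-D sequence of recent close prices and is not
# tracked anywhere in the original code.
def compute_rsi(closes, period=14):
    """Hypothetical helper: RSI from simple average gains/losses."""
    closes = np.asarray(closes, dtype=np.float64)
    deltas = np.diff(closes[-(period + 1):])
    if deltas.size == 0:
        return 50.0  # Not enough data; return a neutral RSI
    gains = np.clip(deltas, 0.0, None)
    losses = np.clip(-deltas, 0.0, None)
    if losses.mean() == 0.0:
        return 100.0  # No losses in the window: maximally overbought
    rs = gains.mean() / losses.mean()
    return 100.0 - 100.0 / (1.0 + rs)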


# ------------------------------
# Data Ingestion from MEXC: Live Candle Data
# ------------------------------
async def get_live_candle_data(exchange, symbol, timeframe='1m'):
    """
    Fetch the latest OHLCV candle for the given symbol and timeframe.
    MEXC (and other exchanges via ccxt) returns a list of candles:
        [timestamp, open, high, low, close, volume]
    We use limit=1 to get only the most recent candle.
    """
    try:
        ohlcv = await exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=1)
        if ohlcv and len(ohlcv) > 0:
            ts, open_, high, low, close, volume = ohlcv[-1]
            return {
                'timestamp': ts,
                'open': open_,
                'high': high,
                'low': low,
                'close': close,
                'volume': volume
            }
        return None
    except Exception as e:
        print("Error fetching candle data:", e)
        return None
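

# ------------------------------
# Optional: Fetch with Retries (illustrative sketch)
# ------------------------------
# get_live_candle_data above gives up after a single failed request. A
# minimal sketch of retrying with exponential backoff is shown below;
# the retry count and delays are assumptions, not part of the original
# design.
async def fetch_ohlcv_with_retries(exchange, symbol, timeframe='1m', retries=3):
    """Hypothetical helper: retry fetch_ohlcv with exponential backoff."""
    for attempt in range(retries):
        try:
            return await exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=1)
        except Exception as e:
            print(f"fetch_ohlcv attempt {attempt + 1} failed:", e)
            await asyncio.sleep(2 ** attempt)  # 1s, 2s, 4s between attempts
    return None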


# ------------------------------
# Simulated Sentiment Data
# ------------------------------
async def get_sentiment_data():
    """
    Simulate fetching live sentiment data.
    In production, integrate sentiment analysis from social media, news APIs, etc.
    """
    await asyncio.sleep(0.1)  # Simulated latency
    return {
        'sentiment_score': np.random.rand(),  # Example: normalized between 0 and 1
        'news_volume': np.random.rand(),
        'social_engagement': np.random.rand()
    }


# ------------------------------
# RL Agent with Continuous Learning
# ------------------------------
class ContinuousRLAgent:
    def __init__(self, model, optimizer, replay_buffer, batch_size=32):
        self.model = model
        self.optimizer = optimizer
        self.replay_buffer = replay_buffer
        self.batch_size = batch_size
        # For demonstration, we use a basic MSE loss.
        self.loss_fn = nn.MSELoss()

    def act(self, state):
        """
        Given the latest state, decide on an action.
        Action mapping: 0 => SELL, 1 => HOLD, 2 => BUY.
        """
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            output = self.model(state_tensor)
        action = torch.argmax(output, dim=1).item()
        return action
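
    # act() above is purely greedy. A common exploration variant is
    # epsilon-greedy; this sketch assumes an `epsilon` hyperparameter
    # and is illustrative, not part of the original agent.
    def act_epsilon_greedy(self, state, epsilon=0.1):
        """Hypothetical variant: explore with probability epsilon."""
        if np.random.rand() < epsilon:
            return int(np.random.randint(0, 3))  # Random SELL/HOLD/BUY
        return self.act(state)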

    def train_step(self):
        """
        Perform one training step using a sample from the replay buffer.
        Note: A true RL implementation would incorporate rewards and discounted returns.
        """
        if len(self.replay_buffer) < self.batch_size:
            return  # Not enough samples yet
        batch = self.replay_buffer.sample(self.batch_size)
        states, rewards, next_states, dones = [], [], [], []
        for state, reward, next_state, done in batch:
            states.append(state)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(done)
        states_tensor = torch.tensor(np.stack(states), dtype=torch.float32)
        targets_tensor = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        outputs = self.model(states_tensor)
        # For demonstration, we use the first output as our estimate.
        predictions = outputs[:, 0].unsqueeze(1)
        loss = self.loss_fn(predictions, targets_tensor)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
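

# train_step's docstring notes that a true RL implementation would use
# rewards and discounted returns. A minimal sketch of a one-step
# Q-learning target is shown below; `gamma` is an assumed discount
# factor, and this helper is illustrative rather than the method
# ContinuousRLAgent actually uses above.
def q_learning_targets(model, rewards, next_states, dones, gamma=0.99):
    """Hypothetical helper: bootstrap targets r + gamma * max_a Q(s', a)."""
    rewards_t = torch.tensor(rewards, dtype=torch.float32)
    next_t = torch.tensor(np.stack(next_states), dtype=torch.float32)
    dones_t = torch.tensor(dones, dtype=torch.float32)
    with torch.no_grad():
        next_q = model(next_t).max(dim=1).values  # Best next-state action value
    # Zero out the bootstrap term for terminal transitions.
    return (rewards_t + gamma * next_q * (1.0 - dones_t)).unsqueeze(1)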


# ------------------------------
# Trading Bot: Execution with MEXC API
# ------------------------------
class TradingBot:
    def __init__(self, rl_agent, exchange, symbol='BTC/USDT', trade_amount=0.001):
        self.rl_agent = rl_agent
        self.exchange = exchange
        self.symbol = symbol
        self.trade_amount = trade_amount  # Amount to trade (in base currency)

    async def execute_trade(self, action):
        """
        Convert the RL agent's decision into a market order.
        Action mapping: 0 => SELL, 1 => HOLD, 2 => BUY.
        """
        if action == 0:
            print("Executing SELL order")
            await self.place_market_order('sell')
        elif action == 2:
            print("Executing BUY order")
            await self.place_market_order('buy')
        else:
            print("Holding position - no order executed")

    async def place_market_order(self, side):
        """
        Place a market order on MEXC using ccxt.
        """
        try:
            order = await self.exchange.create_order(self.symbol, 'market', side, self.trade_amount)
            print(f"Order executed: {order}")
        except Exception as e:
            print(f"Error executing {side} order:", e)

    async def trading_loop(self, timeframe='1m'):
        """
        Main loop:
          - Fetch live candlestick and sentiment data.
          - Compute state features.
          - Let the RL agent decide on an action.
          - Execute market orders.
          - Store the experience and perform continuous training.
        """
        while True:
            candle = await get_live_candle_data(self.exchange, self.symbol, timeframe)
            if candle is None:
                await asyncio.sleep(1)
                continue
            sentiment = await get_sentiment_data()
            state = compute_indicators(candle, sentiment)
            action = self.rl_agent.act(state)
            await self.execute_trade(action)
            # Simulated reward. In production, compute the reward from trading performance.
            reward = np.random.rand()
            next_state = state  # In a real system, this would be updated after the trade.
            done = False
            self.rl_agent.replay_buffer.add((state, reward, next_state, done))
            self.rl_agent.train_step()
            # Wait to match the candle timeframe (for a 1m candle, sleep 60 seconds).
            await asyncio.sleep(60)
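

# ------------------------------
# Optional: Performance-Based Reward (illustrative sketch)
# ------------------------------
# trading_loop above uses a random placeholder reward. A minimal sketch
# of a reward derived from trading performance is shown below;
# `entry_price` and `exit_price` are hypothetical inputs that a real
# system would have to track per position.
def pnl_reward(entry_price, exit_price, side):
    """Hypothetical helper: signed fractional return of a closed position."""
    change = (exit_price - entry_price) / entry_price
    return change if side == 'buy' else -change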


# ------------------------------
# Main Execution Loop
# ------------------------------
async def main_loop():
    # Retrieve MEXC API credentials from environment variables, or replace with your keys.
    mexc_api_key = os.environ.get('MEXC_API_KEY', 'YOUR_API_KEY')
    mexc_api_secret = os.environ.get('MEXC_API_SECRET', 'YOUR_SECRET_KEY')

    # Create the MEXC exchange instance (ccxt asynchronous).
    exchange = ccxt.mexc({
        'apiKey': mexc_api_key,
        'secret': mexc_api_secret,
        'enableRateLimit': True,
    })

    # Define the model input dimensions:
    # 5 base candle features (open, high, low, close, volume) + 3 simulated sentiment features.
    input_dim = 8
    hidden_dim = 128
    output_dim = 3  # SELL, HOLD, BUY

    model = TradingModel(input_dim, hidden_dim, output_dim)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    replay_buffer = ReplayBuffer(capacity=10000)
    rl_agent = ContinuousRLAgent(model, optimizer, replay_buffer, batch_size=32)
    trading_bot = TradingBot(rl_agent, exchange, symbol='BTC/USDT', trade_amount=0.001)

    try:
        # Begin the continuous trading loop (using 1-minute candles).
        await trading_bot.trading_loop(timeframe='1m')
    except Exception as e:
        print("Error in trading loop:", e)
    finally:
        await exchange.close()


if __name__ == "__main__":
    asyncio.run(main_loop())