From 14d70f938e8f4ceed0f37938785b2ab4ac12608c Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Mon, 10 Mar 2025 12:13:17 +0200
Subject: [PATCH] enhancements

---
 crypto/gogo2/_notes.md |   19 +
 crypto/gogo2/main.py   | 1364 +++++++++++++++++++++-------------
 2 files changed, 749 insertions(+), 634 deletions(-)
 create mode 100644 crypto/gogo2/_notes.md

diff --git a/crypto/gogo2/_notes.md b/crypto/gogo2/_notes.md
new file mode 100644
index 0000000..0cb4fd6
--- /dev/null
+++ b/crypto/gogo2/_notes.md
@@ -0,0 +1,19 @@
+
+
+ensure we use the GPU if available to train faster. during training we need an RL loop that looks at streaming data, plus retrospective backtesting/training on its predictions. since the start of training we're only losing. implement a robust penalty and analysis when closing a losing trade, and improve the reward function.
+
+
+
+
+2025-03-10 12:11:28,651 - INFO - Initialized environment with 500 candles
+C:\Users\popov\miniforge3\Lib\site-packages\torch\nn\modules\transformer.py:385: UserWarning: enable_nested_tensor is True, but self.use_nested_tensor is False because encoder_layer.self_attn.batch_first was not True(use batch_first for better inference performance)
+  warnings.warn(
+main.py:1105: FutureWarning: `torch.cuda.amp.GradScaler(args...)` is deprecated. Please use `torch.amp.GradScaler('cuda', args...)` instead.
+  self.scaler = amp.GradScaler()
+C:\Users\popov\miniforge3\Lib\site-packages\torch\amp\grad_scaler.py:132: UserWarning: torch.cuda.amp.GradScaler is enabled, but CUDA is not available. Disabling.
+  warnings.warn(
+2025-03-10 12:11:30,927 - INFO - Starting training for 1000 episodes...
+2025-03-10 12:11:30,927 - INFO - Starting training on device: cpu
+2025-03-10 12:11:30,928 - ERROR - Training failed: 'TradingEnvironment' object has no attribute 'initialize_price_predictor'
+2025-03-10 12:11:30,928 - INFO - Exchange connection closed
+Backend tkagg is interactive backend. Turning interactive mode on.
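
The AttributeError in this log arises because the new train_agent() still calls env.initialize_price_predictor(), env.update_price_predictions(), env.train_price_predictor() and env.identify_optimal_trades(), while the rewritten TradingEnvironment in this patch no longer defines them. A minimal sketch of re-attaching those hooks to the new class, reusing the implementations this patch removes (and assuming PricePredictionModel still exposes predict_next_candles() and train_on_new_data(), and that find_local_extrema() is the module-level helper added near the top of main.py):

```python
# Sketch only: methods to add back onto TradingEnvironment so train_agent() runs.
# Assumes PricePredictionModel.predict_next_candles()/train_on_new_data() exist as
# referenced elsewhere in main.py, and numpy/optim are already imported there.

def initialize_price_predictor(self, device):
    """Create the price prediction model and its optimizer on the training device."""
    self.price_predictor = PricePredictionModel().to(device)
    self.price_predictor_optimizer = optim.Adam(self.price_predictor.parameters(), lr=1e-4)

def update_price_predictions(self):
    """Refresh the 5-candle forecast from the most recent prices."""
    if self.price_predictor is not None and len(self.features['price']) > 30:
        self.predicted_prices = self.price_predictor.predict_next_candles(
            self.features['price'][-100:], num_candles=5
        )

def train_price_predictor(self):
    """Fit the predictor on the latest price history; returns the average training loss."""
    if self.price_predictor is not None and len(self.features['price']) > 35:
        return self.price_predictor.train_on_new_data(
            self.features['price'], self.price_predictor_optimizer
        )
    return 0.0

def identify_optimal_trades(self):
    """Mark local bottoms/tops as optimal buy/sell signals for reward shaping."""
    if len(self.features['price']) < 20:
        return
    bottoms, tops = find_local_extrema(self.features['price'], window=5)
    self.optimal_bottoms = list(bottoms)
    self.optimal_tops = list(tops)
    self.optimal_signals = np.zeros(len(self.features['price']))
    for i in self.optimal_bottoms:
        self.optimal_signals[i] = 1    # buy signal at a local bottom
    for i in self.optimal_tops:
        self.optimal_signals[i] = -1   # sell signal at a local top
```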
\ No newline at end of file diff --git a/crypto/gogo2/main.py b/crypto/gogo2/main.py index 78c46af..1f8093c 100644 --- a/crypto/gogo2/main.py +++ b/crypto/gogo2/main.py @@ -20,6 +20,7 @@ from torch.utils.tensorboard import SummaryWriter import torch.cuda.amp as amp # Add this import at the top from sklearn.preprocessing import MinMaxScaler import copy +import argparse # Configure logging logging.basicConfig( @@ -52,6 +53,28 @@ TARGET_UPDATE = 10 # Update target network every 10 episodes # Experience replay tuple Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done']) +# Add this function near the top of the file, after the imports but before any classes +def find_local_extrema(prices, window=5): + """Find local minima (bottoms) and maxima (tops) in price data""" + bottoms = [] + tops = [] + + if len(prices) < window * 2 + 1: + return bottoms, tops + + for i in range(window, len(prices) - window): + # Check if this is a local minimum (bottom) + if all(prices[i] <= prices[i-j] for j in range(1, window+1)) and \ + all(prices[i] <= prices[i+j] for j in range(1, window+1)): + bottoms.append(i) + + # Check if this is a local maximum (top) + if all(prices[i] >= prices[i-j] for j in range(1, window+1)) and \ + all(prices[i] >= prices[i+j] for j in range(1, window+1)): + tops.append(i) + + return bottoms, tops + class ReplayMemory: def __init__(self, capacity): self.memory = deque(maxlen=capacity) @@ -212,78 +235,64 @@ class PricePredictionModel(nn.Module): targets = [] for i in range(len(scaled_data) - 35): + # Sequence: 30 time steps seq = scaled_data[i:i+30] - target = scaled_data[i+30:i+35] + # Target: next 5 time steps + target = scaled_data[i+30:i+35].flatten() + sequences.append(seq) targets.append(target) - - if not sequences: + + if not sequences: # If no sequences were created return 0.0 - sequences = np.array(sequences).reshape(-1, 30, 1) - targets = np.array(targets).reshape(-1, 5) - # Convert to tensors - sequences_tensor = torch.FloatTensor(sequences).to(next(self.parameters()).device) - targets_tensor = torch.FloatTensor(targets).to(next(self.parameters()).device) + sequences_tensor = torch.FloatTensor(np.array(sequences).reshape(-1, 30, 1)).to(next(self.parameters()).device) + targets_tensor = torch.FloatTensor(np.array(targets)).to(next(self.parameters()).device) - # Train + # Training loop total_loss = 0 for _ in range(epochs): - optimizer.zero_grad() + # Forward pass predictions = self(sequences_tensor) + + # Calculate loss loss = F.mse_loss(predictions, targets_tensor) + + # Backward pass and optimize + optimizer.zero_grad() loss.backward() optimizer.step() - total_loss += loss.item() + total_loss += loss.item() + return total_loss / epochs class TradingEnvironment: - def __init__(self, exchange, symbol="ETH/USDT", timeframe="1m", leverage=MAX_LEVERAGE, - initial_balance=INITIAL_BALANCE, window_size=60, is_demo=True): - self.exchange = exchange - self.symbol = symbol - self.timeframe = timeframe - self.leverage = leverage - self.balance = initial_balance + def __init__(self, initial_balance=INITIAL_BALANCE, window_size=30, demo=True): + """Initialize the trading environment""" self.initial_balance = initial_balance + self.balance = initial_balance self.window_size = window_size - self.is_demo = is_demo - - self.position = None # 'long', 'short', or None - self.entry_price = 0.0 - self.position_size = 0.0 - self.stop_loss = 0.0 - self.take_profit = 0.0 - - self.data = deque(maxlen=window_size) + self.demo = demo + self.data = [] + self.position 
= 'flat' # 'flat', 'long', or 'short' + self.position_size = 0 + self.entry_price = 0 + self.entry_index = 0 + self.stop_loss = 0 + self.take_profit = 0 self.trades = [] - self.current_step = 0 - - # Action space: 0 = hold, 1 = buy, 2 = sell, 3 = close - self.action_space = 4 - - self._initialize_features() - - # Add price prediction model - self.price_predictor = None - self.predicted_prices = [] - - # Add statistics tracking - self.episode_pnl = 0.0 - self.total_pnl = 0.0 self.win_count = 0 self.loss_count = 0 - self.trade_count = 0 - self.max_drawdown = 0.0 + self.total_pnl = 0.0 + self.episode_pnl = 0.0 self.peak_balance = initial_balance + self.max_drawdown = 0.0 + self.current_step = 0 + self.current_price = 0 - # For backtesting optimal trades - self.optimal_trades = [] - - def _initialize_features(self): - """Initialize technical indicators and features""" + # Initialize features self.features = { 'price': [], 'volume': [], @@ -292,172 +301,339 @@ class TradingEnvironment: 'macd_signal': [], 'macd_hist': [], 'bollinger_upper': [], - 'bollinger_lower': [], 'bollinger_mid': [], - 'atr': [], - 'ema_fast': [], - 'ema_slow': [], + 'bollinger_lower': [], 'stoch_k': [], 'stoch_d': [], - 'mom': [] # Momentum + 'ema_9': [], + 'ema_21': [], + 'atr': [] } - async def fetch_initial_data(self): - """Fetch historical data to initialize the environment""" - logger.info(f"Fetching initial {self.window_size} candles for {self.symbol}...") - try: - # Try to use fetch_ohlcv directly - try: - # Check if exchange has async methods - if hasattr(self.exchange, 'has') and self.exchange.has.get('fetchOHLCVAsync', False): - ohlcv = await self.exchange.fetchOHLCVAsync( - self.symbol, - timeframe=self.timeframe, - limit=self.window_size - ) - else: - # Use synchronous method in an executor - loop = asyncio.get_event_loop() - ohlcv = await loop.run_in_executor( - None, - lambda: self.exchange.fetch_ohlcv( - self.symbol, - timeframe=self.timeframe, - limit=self.window_size - ) - ) - except AttributeError: - # Fallback to synchronous method - loop = asyncio.get_event_loop() - ohlcv = await loop.run_in_executor( - None, - lambda: self.exchange.fetch_ohlcv( - self.symbol, - timeframe=self.timeframe, - limit=self.window_size - ) - ) - - for candle in ohlcv: - timestamp, open_price, high, low, close, volume = candle - self.data.append({ - 'timestamp': timestamp, - 'open': open_price, - 'high': high, - 'low': low, - 'close': close, - 'volume': volume - }) - - self._update_features() - logger.info(f"Successfully fetched {len(self.data)} initial candles") - - except Exception as e: - logger.error(f"Error fetching initial data: {e}") - raise - - def _update_features(self): - """Calculate technical indicators from price data""" - if len(self.data) < 14: # Need minimum data for indicators + # Initialize price predictor + self.price_predictor = None + self.predicted_prices = np.array([]) + + # Initialize optimal trade tracking + self.optimal_bottoms = [] + self.optimal_tops = [] + self.optimal_signals = np.array([]) + + def reset(self): + """Reset the environment to initial state""" + self.balance = self.initial_balance + self.position = 'flat' + self.position_size = 0 + self.entry_price = 0 + self.entry_index = 0 + self.stop_loss = 0 + self.take_profit = 0 + self.trades = [] + self.win_count = 0 + self.loss_count = 0 + self.episode_pnl = 0.0 + self.peak_balance = self.initial_balance + self.max_drawdown = 0.0 + self.current_step = 0 + + # Keep data but reset current position + if len(self.data) > self.window_size: + 
self.current_step = self.window_size + self.current_price = self.data[self.current_step]['close'] + + return self.get_state() + + def add_data(self, candle): + """Add a new candle to the data""" + self.data.append(candle) + self._update_features() + self.current_price = candle['close'] + + def _initialize_features(self): + """Initialize technical indicators and features""" + if len(self.data) < 30: return - df = pd.DataFrame(list(self.data)) + # Convert data to pandas DataFrame for easier calculation + df = pd.DataFrame(self.data) # Basic price and volume self.features['price'] = df['close'].values self.features['volume'] = df['volume'].values - # EMAs - self.features['ema_fast'] = df['close'].ewm(span=12, adjust=False).mean().values - self.features['ema_slow'] = df['close'].ewm(span=26, adjust=False).mean().values - - # RSI + # Calculate RSI (14 periods) delta = df['close'].diff() gain = delta.where(delta > 0, 0).rolling(window=14).mean() loss = -delta.where(delta < 0, 0).rolling(window=14).mean() rs = gain / loss - rsi = 100 - (100 / (1 + rs)) - self.features['rsi'] = rsi.fillna(50).values + self.features['rsi'] = 100 - (100 / (1 + rs)).fillna(50).values - # MACD + # Calculate MACD ema12 = df['close'].ewm(span=12, adjust=False).mean() ema26 = df['close'].ewm(span=26, adjust=False).mean() macd = ema12 - ema26 - macd_signal = macd.ewm(span=9, adjust=False).mean() - macd_hist = macd - macd_signal - + signal = macd.ewm(span=9, adjust=False).mean() self.features['macd'] = macd.values - self.features['macd_signal'] = macd_signal.values - self.features['macd_hist'] = macd_hist.values + self.features['macd_signal'] = signal.values + self.features['macd_hist'] = (macd - signal).values - # Bollinger Bands + # Calculate Bollinger Bands sma20 = df['close'].rolling(window=20).mean() std20 = df['close'].rolling(window=20).std() - upper_band = sma20 + (std20 * 2) - lower_band = sma20 - (std20 * 2) + self.features['bollinger_upper'] = (sma20 + 2 * std20).values + self.features['bollinger_mid'] = sma20.values + self.features['bollinger_lower'] = (sma20 - 2 * std20).values - self.features['bollinger_upper'] = upper_band.fillna(method='bfill').values - self.features['bollinger_mid'] = sma20.fillna(method='bfill').values - self.features['bollinger_lower'] = lower_band.fillna(method='bfill').values + # Calculate Stochastic Oscillator + low_14 = df['low'].rolling(window=14).min() + high_14 = df['high'].rolling(window=14).max() + k = 100 * ((df['close'] - low_14) / (high_14 - low_14)) + self.features['stoch_k'] = k.values + self.features['stoch_d'] = k.rolling(window=3).mean().values - # ATR (Average True Range) + # Calculate EMAs + self.features['ema_9'] = df['close'].ewm(span=9, adjust=False).mean().values + self.features['ema_21'] = df['close'].ewm(span=21, adjust=False).mean().values + + # Calculate ATR high_low = df['high'] - df['low'] high_close = (df['high'] - df['close'].shift()).abs() low_close = (df['low'] - df['close'].shift()).abs() - ranges = pd.concat([high_low, high_close, low_close], axis=1) - true_range = ranges.max(axis=1) - atr = true_range.rolling(window=14).mean() + tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) + self.features['atr'] = tr.rolling(window=14).mean().fillna(0).values + + def _update_features(self): + """Update technical indicators with new data""" + self._initialize_features() # Recalculate all features + + async def fetch_initial_data(self, exchange, symbol="ETH/USDT", timeframe="1m", limit=1000): + """Fetch initial historical data for the 
environment""" + try: + logger.info(f"Fetching initial data for {symbol}") + + # Use the refactored fetch method + data = await fetch_ohlcv_data(exchange, symbol, timeframe, limit) + + # Update environment with fetched data + if data: + self.data = data + self._initialize_features() + logger.info(f"Initialized environment with {len(data)} candles") + else: + logger.warning("No initial data received") + + return len(data) > 0 + except Exception as e: + logger.error(f"Error fetching initial data: {e}") + return False + + def step(self, action): + """Take an action in the environment""" + # Store previous balance for reward calculation + prev_balance = self.balance - self.features['atr'] = atr.fillna(method='bfill').values + # Update current price + if self.current_step < len(self.data) - 1: + self.current_step += 1 + self.current_price = self.data[self.current_step]['close'] + else: + # End of data + return self.get_state(), 0, True - # Stochastic Oscillator - low_min = df['low'].rolling(window=14).min() - high_max = df['high'].rolling(window=14).max() + # Check for stop loss or take profit + self._check_sl_tp() - k = 100 * ((df['close'] - low_min) / (high_max - low_min)) - d = k.rolling(window=3).mean() + # Calculate reward based on action + reward = self.calculate_reward(action) - self.features['stoch_k'] = k.fillna(50).values - self.features['stoch_d'] = d.fillna(50).values + # Check if we've reached the end of the data + done = self.current_step >= len(self.data) - 1 - # Momentum - self.features['mom'] = df['close'].diff(periods=10).values - - async def _update_with_new_data(self, candle): - """Update environment with new candle data""" - self.data.append(candle) - self._update_features() - self._check_position() - - def _check_position(self): + return self.get_state(), reward, done + + def _check_sl_tp(self): """Check if stop loss or take profit has been hit""" - if self.position is None or len(self.features['price']) == 0: + if self.position == 'flat': return - current_price = self.features['price'][-1] - if self.position == 'long': # Check stop loss - if current_price <= self.stop_loss: - logger.info(f"STOP LOSS triggered at {current_price} (long position)") - self._close_position(current_price, 'stop_loss') + if self.current_price <= self.stop_loss: + # Stop loss hit + pnl_percent = (self.stop_loss - self.entry_price) / self.entry_price * 100 + pnl_dollar = pnl_percent / 100 * self.position_size + + # Apply fees + pnl_dollar -= self.calculate_fees(self.position_size) + + # Update balance + self.balance += pnl_dollar + self.total_pnl += pnl_dollar + self.episode_pnl += pnl_dollar + + # Update max drawdown + if self.balance > self.peak_balance: + self.peak_balance = self.balance + drawdown = (self.peak_balance - self.balance) / self.peak_balance + self.max_drawdown = max(self.max_drawdown, drawdown) + + # Record trade + self.trades.append({ + 'type': 'long', + 'entry': self.entry_price, + 'exit': self.stop_loss, + 'pnl_percent': pnl_percent, + 'pnl_dollar': pnl_dollar, + 'duration': self.current_step - self.entry_index, + 'market_direction': self.get_market_direction(), + 'reason': 'stop_loss' + }) + + # Update win/loss count + self.loss_count += 1 + + logger.info(f"STOP LOSS hit for long at {self.stop_loss} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") + + # Reset position + self.position = 'flat' + self.entry_price = 0 + self.entry_index = 0 + self.position_size = 0 + self.stop_loss = 0 + self.take_profit = 0 # Check take profit - elif current_price >= self.take_profit: - 
logger.info(f"TAKE PROFIT triggered at {current_price} (long position)") - self._close_position(current_price, 'take_profit') + elif self.current_price >= self.take_profit: + # Take profit hit + pnl_percent = (self.take_profit - self.entry_price) / self.entry_price * 100 + pnl_dollar = pnl_percent / 100 * self.position_size + + # Apply fees + pnl_dollar -= self.calculate_fees(self.position_size) + + # Update balance + self.balance += pnl_dollar + self.total_pnl += pnl_dollar + self.episode_pnl += pnl_dollar + + # Update max drawdown + if self.balance > self.peak_balance: + self.peak_balance = self.balance + + # Record trade + self.trades.append({ + 'type': 'long', + 'entry': self.entry_price, + 'exit': self.take_profit, + 'pnl_percent': pnl_percent, + 'pnl_dollar': pnl_dollar, + 'duration': self.current_step - self.entry_index, + 'market_direction': self.get_market_direction(), + 'reason': 'take_profit' + }) + + # Update win/loss count + self.win_count += 1 + + logger.info(f"TAKE PROFIT hit for long at {self.take_profit} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") + + # Reset position + self.position = 'flat' + self.entry_price = 0 + self.entry_index = 0 + self.position_size = 0 + self.stop_loss = 0 + self.take_profit = 0 elif self.position == 'short': # Check stop loss - if current_price >= self.stop_loss: - logger.info(f"STOP LOSS triggered at {current_price} (short position)") - self._close_position(current_price, 'stop_loss') + if self.current_price >= self.stop_loss: + # Stop loss hit + pnl_percent = (self.entry_price - self.stop_loss) / self.entry_price * 100 + pnl_dollar = pnl_percent / 100 * self.position_size + + # Apply fees + pnl_dollar -= self.calculate_fees(self.position_size) + + # Update balance + self.balance += pnl_dollar + self.total_pnl += pnl_dollar + self.episode_pnl += pnl_dollar + + # Update max drawdown + if self.balance > self.peak_balance: + self.peak_balance = self.balance + drawdown = (self.peak_balance - self.balance) / self.peak_balance + self.max_drawdown = max(self.max_drawdown, drawdown) + + # Record trade + self.trades.append({ + 'type': 'short', + 'entry': self.entry_price, + 'exit': self.stop_loss, + 'pnl_percent': pnl_percent, + 'pnl_dollar': pnl_dollar, + 'duration': self.current_step - self.entry_index, + 'market_direction': self.get_market_direction(), + 'reason': 'stop_loss' + }) + + # Update win/loss count + self.loss_count += 1 + + logger.info(f"STOP LOSS hit for short at {self.stop_loss} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") + + # Reset position + self.position = 'flat' + self.entry_price = 0 + self.entry_index = 0 + self.position_size = 0 + self.stop_loss = 0 + self.take_profit = 0 # Check take profit - elif current_price <= self.take_profit: - logger.info(f"TAKE PROFIT triggered at {current_price} (short position)") - self._close_position(current_price, 'take_profit') + elif self.current_price <= self.take_profit: + # Take profit hit + pnl_percent = (self.entry_price - self.take_profit) / self.entry_price * 100 + pnl_dollar = pnl_percent / 100 * self.position_size + # Apply fees + pnl_dollar -= self.calculate_fees(self.position_size) + + # Update balance + self.balance += pnl_dollar + self.total_pnl += pnl_dollar + self.episode_pnl += pnl_dollar + + # Update max drawdown + if self.balance > self.peak_balance: + self.peak_balance = self.balance + + # Record trade + self.trades.append({ + 'type': 'short', + 'entry': self.entry_price, + 'exit': self.take_profit, + 'pnl_percent': pnl_percent, + 'pnl_dollar': pnl_dollar, + 'duration': 
self.current_step - self.entry_index, + 'market_direction': self.get_market_direction(), + 'reason': 'take_profit' + }) + + # Update win/loss count + self.win_count += 1 + + logger.info(f"TAKE PROFIT hit for short at {self.take_profit} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") + + # Reset position + self.position = 'flat' + self.entry_price = 0 + self.entry_index = 0 + self.position_size = 0 + self.stop_loss = 0 + self.take_profit = 0 + def get_state(self): """Create state representation for the agent""" if len(self.data) < 30 or len(self.features['price']) == 0: @@ -469,22 +645,22 @@ class TradingEnvironment: # Price features (normalize recent prices by the latest price) latest_price = self.features['price'][-1] - price_features = self.features['price'][-10:] / latest_price - 1.0 + price_features = np.array(self.features['price'][-10:]) / latest_price - 1.0 state_components.append(price_features) # Volume features (normalize by max volume) max_vol = max(self.features['volume'][-20:]) if len(self.features['volume']) >= 20 else 1 - vol_features = self.features['volume'][-5:] / max_vol + vol_features = np.array(self.features['volume'][-5:]) / max_vol state_components.append(vol_features) # Technical indicators - rsi = self.features['rsi'][-3:] / 100.0 # Scale to 0-1 + rsi = np.array(self.features['rsi'][-3:]) / 100.0 # Scale to 0-1 state_components.append(rsi) # MACD (normalize) - macd_vals = self.features['macd'][-3:] - macd_signal = self.features['macd_signal'][-3:] - macd_hist = self.features['macd_hist'][-3:] + macd_vals = np.array(self.features['macd'][-3:]) + macd_signal = np.array(self.features['macd_signal'][-3:]) + macd_hist = np.array(self.features['macd_hist'][-3:]) macd_scale = max(abs(np.max(macd_vals)), abs(np.min(macd_vals)), 1e-5) macd_norm = macd_vals / macd_scale macd_signal_norm = macd_signal / macd_scale @@ -493,18 +669,18 @@ class TradingEnvironment: state_components.extend([macd_norm, macd_signal_norm, macd_hist_norm]) # Bollinger position (where is price relative to bands) - bb_upper = self.features['bollinger_upper'][-3:] - bb_lower = self.features['bollinger_lower'][-3:] - bb_mid = self.features['bollinger_mid'][-3:] - price = self.features['price'][-3:] + bb_upper = np.array(self.features['bollinger_upper'][-3:]) + bb_lower = np.array(self.features['bollinger_lower'][-3:]) + bb_mid = np.array(self.features['bollinger_mid'][-3:]) + price = np.array(self.features['price'][-3:]) # Calculate position of price within Bollinger Bands (0 to 1) bb_pos = [(p - l) / (u - l) if u != l else 0.5 for p, u, l in zip(price, bb_upper, bb_lower)] - state_components.append(bb_pos) + state_components.append(np.array(bb_pos)) # Stochastic oscillator - state_components.append(self.features['stoch_k'][-3:] / 100.0) - state_components.append(self.features['stoch_d'][-3:] / 100.0) + state_components.append(np.array(self.features['stoch_k'][-3:]) / 100.0) + state_components.append(np.array(self.features['stoch_d'][-3:]) / 100.0) # Add predicted prices (if available) if hasattr(self, 'predicted_prices') and len(self.predicted_prices) > 0: @@ -519,9 +695,12 @@ class TradingEnvironment: state_components.append(np.zeros(3)) # Add extrema signals (if available) - if hasattr(self, 'optimal_signals'): + if hasattr(self, 'optimal_signals') and len(self.optimal_signals) > 0: # Get recent signals - recent_signals = self.optimal_signals[-5:] + idx = len(self.optimal_signals) - 5 + if idx < 0: + idx = 0 + recent_signals = self.optimal_signals[idx:idx+5] # Pad if needed if len(recent_signals) < 5: 
recent_signals = np.pad(recent_signals, (0, 5 - len(recent_signals)), 'constant') @@ -548,7 +727,7 @@ class TradingEnvironment: state_components.append(position_info) # Combine all features - state = np.concatenate(state_components) + state = np.concatenate([comp.flatten() for comp in state_components]) # Replace any NaN values state = np.nan_to_num(state, nan=0.0) @@ -563,304 +742,9 @@ class TradingEnvironment: state = np.concatenate([state, padding]) return state - - def step(self, action): - """Execute trading action and return next state, reward, done""" - reward = 0.0 - done = False - - # Get current price - if len(self.features['price']) == 0: - return self.get_state(), reward, done - - current_price = self.features['price'][-1] - - # Execute action - if action == 0: # Hold - pass - elif action == 1: # Buy (go long) - if self.position is None: - reward = self._open_long_position() - else: - reward = -0.1 # Penalty for invalid action - elif action == 2: # Sell (go short) - if self.position is None: - reward = self._open_short_position() - else: - reward = -0.1 # Penalty for invalid action - elif action == 3: # Close position - if self.position is not None: - reward = self._close_position(current_price, 'agent_decision') - else: - reward = -0.1 # Penalty for invalid action - - self.current_step += 1 - - # Check if episode should end - if self.current_step >= 10000 or self.balance <= 0.1 * self.initial_balance: - done = True - - # Update statistics - if reward != 0: # If a trade was closed - self.episode_pnl += reward - self.total_pnl += reward - - if reward > 0: - self.win_count += 1 - else: - self.loss_count += 1 - - self.trade_count += 1 - - # Update peak balance and drawdown - if self.balance > self.peak_balance: - self.peak_balance = self.balance - - drawdown = (self.peak_balance - self.balance) / self.peak_balance - if drawdown > self.max_drawdown: - self.max_drawdown = drawdown - - return self.get_state(), reward, done - - def _open_long_position(self): - """Open a long position""" - current_price = self.features['price'][-1] - - # Calculate position size (90% of balance for fees) - position_value = self.balance * 0.9 - position_size = position_value * self.leverage / current_price - - # Set stop loss and take profit - stop_loss = current_price * (1 - STOP_LOSS_PERCENT / 100) - take_profit = current_price * (1 + TAKE_PROFIT_PERCENT / 100) - - if not self.is_demo: - try: - # Create real order on MEXC - order = self.exchange.create_market_buy_order( - self.symbol, - position_size, - params={'leverage': self.leverage} - ) - logger.info(f"Opened long position: {order}") - except Exception as e: - logger.error(f"Error opening long position: {e}") - return -0.2 # Penalty for failed order - - # Update state - self.position = 'long' - self.entry_price = current_price - self.position_size = position_size - self.stop_loss = stop_loss - self.take_profit = take_profit - - # Record trade - self.trades.append({ - 'type': 'long', - 'entry_time': datetime.datetime.now().isoformat(), - 'entry_price': current_price, - 'position_size': position_size, - 'stop_loss': stop_loss, - 'take_profit': take_profit, - 'balance_before': self.balance - }) - - logger.info(f"OPENED LONG at {current_price} | Stop loss: {stop_loss} | Take profit: {take_profit}") - - return 0.1 # Small reward for taking action - - def _open_short_position(self): - """Open a short position""" - current_price = self.features['price'][-1] - - # Calculate position size (90% of balance for fees) - position_value = self.balance * 0.9 - 
position_size = position_value * self.leverage / current_price - - # Set stop loss and take profit - stop_loss = current_price * (1 + STOP_LOSS_PERCENT / 100) - take_profit = current_price * (1 - TAKE_PROFIT_PERCENT / 100) - - if not self.is_demo: - try: - # Create real order on MEXC - order = self.exchange.create_market_sell_order( - self.symbol, - position_size, - params={'leverage': self.leverage} - ) - logger.info(f"Opened short position: {order}") - except Exception as e: - logger.error(f"Error opening short position: {e}") - return -0.2 # Penalty for failed order - - # Update state - self.position = 'short' - self.entry_price = current_price - self.position_size = position_size - self.stop_loss = stop_loss - self.take_profit = take_profit - - # Record trade - self.trades.append({ - 'type': 'short', - 'entry_time': datetime.datetime.now().isoformat(), - 'entry_price': current_price, - 'position_size': position_size, - 'stop_loss': stop_loss, - 'take_profit': take_profit, - 'balance_before': self.balance - }) - - logger.info(f"OPENED SHORT at {current_price} | Stop loss: {stop_loss} | Take profit: {take_profit}") - - return 0.1 # Small reward for taking action - - def _close_position(self, current_price, reason): - """Close the current position""" - if self.position is None: - return 0.0 - - pnl = 0.0 - - if self.position == 'long': - pnl = (current_price - self.entry_price) / self.entry_price - elif self.position == 'short': - pnl = (self.entry_price - current_price) / self.entry_price - - # Apply leverage to PnL - pnl = pnl * self.leverage - - # Account for 0.1% trading fee - position_value = self.position_size * self.entry_price - fee = position_value * 0.001 - pnl_dollar = position_value * pnl - fee - - new_balance = self.balance + pnl_dollar - - if not self.is_demo: - try: - # Execute real order - if self.position == 'long': - order = self.exchange.create_market_sell_order(self.symbol, self.position_size) - else: - order = self.exchange.create_market_buy_order(self.symbol, self.position_size) - logger.info(f"Closed position: {order}") - except Exception as e: - logger.error(f"Error closing position: {e}") - # Still calculate PnL but add penalty - pnl -= 0.001 - - # Update trade record - last_trade = self.trades[-1] - last_trade.update({ - 'exit_time': datetime.datetime.now().isoformat(), - 'exit_price': current_price, - 'exit_reason': reason, - 'pnl_percent': pnl * 100, - 'pnl_dollar': pnl_dollar, - 'balance_after': new_balance - }) - - logger.info(f"CLOSED {self.position} at {current_price} | PnL: {pnl*100:.2f}% | ${pnl_dollar:.2f}") - - # Reset position - self.balance = new_balance - self.position = None - self.entry_price = 0.0 - self.position_size = 0.0 - self.stop_loss = 0.0 - self.take_profit = 0.0 - - # Calculate reward (scaled PnL) - reward = pnl * 100 # Scale for better learning signal - - return reward - - def reset(self): - """Reset the environment for a new episode""" - self.balance = self.initial_balance - self.position = None - self.entry_price = 0.0 - self.position_size = 0.0 - self.stop_loss = 0.0 - self.take_profit = 0.0 - self.current_step = 0 - - # Reset episode statistics - self.episode_pnl = 0.0 - - # Find optimal trades for bootstrapping - self.find_optimal_trades() - - # Update price predictions - self.update_price_predictions() - - return self.get_state() - - def initialize_price_predictor(self, device): - """Initialize the price prediction model""" - self.price_predictor = PricePredictionModel().to(device) - self.price_predictor_optimizer = 
optim.Adam(self.price_predictor.parameters(), lr=1e-4) - - def update_price_predictions(self): - """Update price predictions based on current data""" - if self.price_predictor is not None and len(self.features['price']) > 30: - self.predicted_prices = self.price_predictor.predict_next_candles( - self.features['price'][-100:], num_candles=5 - ) - - def train_price_predictor(self): - """Train the price prediction model on new data""" - if self.price_predictor is not None and len(self.features['price']) > 35: - loss = self.price_predictor.train_on_new_data( - self.features['price'], self.price_predictor_optimizer - ) - return loss - return 0.0 - - def find_optimal_trades(self): - """Find optimal buy/sell points for bootstrapping""" - if len(self.features['price']) < 30: - return - - prices = np.array(self.features['price']) - window = 10 # Window to look for local minima/maxima - - self.optimal_trades = np.zeros(len(prices)) - - for i in range(window, len(prices) - window): - # Check for local minimum (buy signal) - if prices[i] == min(prices[i-window:i+window+1]): - self.optimal_trades[i] = 1 # Buy signal - - # Check for local maximum (sell signal) - if prices[i] == max(prices[i-window:i+window+1]): - self.optimal_trades[i] = 2 # Sell signal - - def identify_optimal_trades(self): - """Identify optimal entry and exit points based on local extrema""" - if len(self.features['price']) < 20: - return - - # Find local bottoms and tops - bottoms, tops = find_local_extrema(self.features['price'], window=5) - - # Store optimal trade points - self.optimal_bottoms = [i for i in bottoms] # Buy points - self.optimal_tops = [i for i in tops] # Sell points - - # Create optimal trade signals - self.optimal_signals = np.zeros(len(self.features['price'])) - for i in self.optimal_bottoms: - self.optimal_signals[i] = 1 # Buy signal - for i in self.optimal_tops: - self.optimal_signals[i] = -1 # Sell signal - - logger.info(f"Identified {len(self.optimal_bottoms)} optimal buy points and {len(self.optimal_tops)} optimal sell points") - + def calculate_reward(self, action): - """Calculate reward for the given action""" + """Calculate reward for the given action with improved penalties for losing trades""" reward = 0 # Base reward for actions @@ -881,7 +765,11 @@ class TradingEnvironment: if hasattr(self, 'optimal_bottoms') and current_idx in self.optimal_bottoms: reward += 2.0 # Bonus for buying at a bottom else: - reward += 0.1 # Small reward for opening a position + # Check if we're buying in a downtrend (bad) + if self.is_downtrend(): + reward -= 0.5 # Penalty for buying in downtrend + else: + reward += 0.1 # Small reward for opening a position logger.info(f"OPENED LONG at {self.entry_price} | Stop loss: {self.stop_loss} | Take profit: {self.take_profit}") @@ -898,34 +786,43 @@ class TradingEnvironment: self.total_pnl += pnl_dollar # Record trade + trade_duration = len(self.features['price']) - self.entry_index self.trades.append({ 'type': 'short', 'entry': self.entry_price, 'exit': self.current_price, 'pnl_percent': pnl_percent, - 'pnl_dollar': pnl_dollar + 'pnl_dollar': pnl_dollar, + 'duration': trade_duration, + 'market_direction': self.get_market_direction() }) - # Reward based on PnL + # Reward based on PnL with stronger penalties for losses if pnl_dollar > 0: reward += 1.0 + pnl_dollar / 10 # Positive reward for profit self.win_count += 1 else: - reward -= 1.0 # Negative reward for loss + # Stronger penalty for losses, scaled by the size of the loss + loss_penalty = 1.0 + abs(pnl_dollar) / 5 + reward -= 
loss_penalty self.loss_count += 1 + + # Extra penalty for closing a losing trade too quickly + if trade_duration < 5: + reward -= 0.5 # Penalty for very short losing trades logger.info(f"CLOSED short at {self.current_price} | PnL: {pnl_percent:.2f}% | ${pnl_dollar:.2f}") # Now open long self.position = 'long' self.entry_price = self.current_price + self.entry_index = len(self.features['price']) - 1 self.position_size = self.calculate_position_size() self.stop_loss = self.entry_price * (1 - STOP_LOSS_PERCENT/100) self.take_profit = self.entry_price * (1 + TAKE_PROFIT_PERCENT/100) # Check if this is an optimal buy point - current_idx = len(self.features['price']) - 1 - if hasattr(self, 'optimal_bottoms') and current_idx in self.optimal_bottoms: + if hasattr(self, 'optimal_bottoms') and self.entry_index in self.optimal_bottoms: reward += 2.0 # Bonus for buying at a bottom logger.info(f"OPENED LONG at {self.entry_price} | Stop loss: {self.stop_loss} | Take profit: {self.take_profit}") @@ -1100,41 +997,115 @@ class TradingEnvironment: return reward + def is_downtrend(self): + """Check if the market is in a downtrend""" + if len(self.features['price']) < 20: + return False + + # Use EMA to determine trend + short_ema = self.features['ema_9'][-1] + long_ema = self.features['ema_21'][-1] + + # Downtrend if short EMA is below long EMA + return short_ema < long_ema + + def is_uptrend(self): + """Check if the market is in an uptrend""" + if len(self.features['price']) < 20: + return False + + # Use EMA to determine trend + short_ema = self.features['ema_9'][-1] + long_ema = self.features['ema_21'][-1] + + # Uptrend if short EMA is above long EMA + return short_ema > long_ema + + def get_market_direction(self): + """Get the current market direction""" + if self.is_uptrend(): + return "uptrend" + elif self.is_downtrend(): + return "downtrend" + else: + return "sideways" + + def analyze_trades(self): + """Analyze completed trades to identify patterns""" + if not self.trades: + return {} + + analysis = { + 'total_trades': len(self.trades), + 'winning_trades': sum(1 for t in self.trades if t.get('pnl_dollar', 0) > 0), + 'losing_trades': sum(1 for t in self.trades if t.get('pnl_dollar', 0) <= 0), + 'avg_win': 0, + 'avg_loss': 0, + 'avg_duration': 0, + 'uptrend_win_rate': 0, + 'downtrend_win_rate': 0, + 'sideways_win_rate': 0 + } + + # Calculate averages + wins = [t.get('pnl_dollar', 0) for t in self.trades if t.get('pnl_dollar', 0) > 0] + losses = [t.get('pnl_dollar', 0) for t in self.trades if t.get('pnl_dollar', 0) <= 0] + durations = [t.get('duration', 0) for t in self.trades] + + analysis['avg_win'] = sum(wins) / len(wins) if wins else 0 + analysis['avg_loss'] = sum(losses) / len(losses) if losses else 0 + analysis['avg_duration'] = sum(durations) / len(durations) if durations else 0 + + # Calculate win rates by market direction + for direction in ['uptrend', 'downtrend', 'sideways']: + direction_trades = [t for t in self.trades if t.get('market_direction') == direction] + if direction_trades: + wins_in_direction = sum(1 for t in direction_trades if t.get('pnl_dollar', 0) > 0) + analysis[f'{direction}_win_rate'] = wins_in_direction / len(direction_trades) * 100 + + return analysis + +# Ensure GPU usage if available +def get_device(): + """Get the best available device (CUDA GPU or CPU)""" + if torch.cuda.is_available(): + device = torch.device("cuda") + logger.info(f"Using GPU: {torch.cuda.get_device_name(0)}") + # Set up for mixed precision training + torch.backends.cudnn.benchmark = True + else: + 
device = torch.device("cpu") + logger.info("GPU not available, using CPU") + return device + +# Update Agent class to use GPU properly class Agent: def __init__(self, state_size, action_size, hidden_size=256, lstm_layers=2, attention_heads=4, - device="cuda" if torch.cuda.is_available() else "cpu"): + device=None): + if device is None: + self.device = get_device() + else: + self.device = device + self.state_size = state_size self.action_size = action_size - self.device = device self.memory = ReplayMemory(MEMORY_SIZE) + self.steps_done = 0 - # Configure for RTX 4060 (8GB VRAM) - if device == "cuda": - torch.backends.cudnn.benchmark = True # Optimize for fixed input sizes - logger.info(f"Using GPU: {torch.cuda.get_device_name(0)}") - logger.info(f"Available VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB") - - # Q-Networks with configurable size - self.policy_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(device) - self.target_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(device) + # Initialize policy and target networks + self.policy_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(self.device) + self.target_net = DQN(state_size, action_size, hidden_size, lstm_layers, attention_heads).to(self.device) self.target_net.load_state_dict(self.policy_net.state_dict()) self.target_net.eval() - # Print model size - total_params = sum(p.numel() for p in self.policy_net.parameters()) - logger.info(f"Model size: {total_params:,} parameters") + # Initialize optimizer with weight decay for regularization + self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE, weight_decay=1e-5) - self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE) - - # Mixed precision training + # Initialize gradient scaler for mixed precision training self.scaler = amp.GradScaler() - self.use_amp = device == "cuda" # Only use mixed precision on GPU - self.epsilon = EPSILON_START - self.steps_done = 0 - - # TensorBoard logging - self.writer = SummaryWriter(log_dir='runs/trading_agent') + # TensorBoard writer + self.writer = SummaryWriter() # Create models directory if it doesn't exist os.makedirs("models", exist_ok=True) @@ -1227,7 +1198,7 @@ class Agent: dones = torch.FloatTensor([exp.done for exp in experiences]).to(self.device) # Use mixed precision for forward/backward passes - if self.use_amp: + if self.device.type == "cuda": with amp.autocast(): # Compute Q values current_q_values = self.policy_net(states).gather(1, actions) @@ -1258,7 +1229,6 @@ class Agent: # Compute next state values using target network with torch.no_grad(): next_q_values = self.target_net(next_states).max(1)[0] - target_q_values = rewards + (GAMMA * next_q_values * (1 - dones)) # Reshape target values to match current_q_values target_q_values = target_q_values.unsqueeze(1) @@ -1346,8 +1316,8 @@ async def get_live_prices(symbol="ETH/USDT", timeframe="1m"): break async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000): - """Train the agent using historical and live data""" - logger.info("Starting training...") + """Train the agent using historical and live data with GPU acceleration""" + logger.info(f"Starting training on device: {agent.device}") stats = { 'episode_rewards': [], @@ -1357,24 +1327,32 @@ async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000) 'episode_pnls': [], 'cumulative_pnl': [], 'drawdowns': [], - 'prediction_losses': [] + 
'prediction_accuracy': [], + 'trade_analysis': [] } best_reward = -float('inf') best_pnl = -float('inf') - # Initialize price predictor - env.initialize_price_predictor(agent.device) - try: + # Initialize price predictor + env.initialize_price_predictor(agent.device) + for episode in range(num_episodes): try: + # Reset environment state = env.reset() episode_reward = 0 + env.episode_pnl = 0.0 # Reset episode PnL + + # Identify optimal trade points for this episode + env.identify_optimal_trades() # Train price predictor prediction_loss = env.train_price_predictor() - stats['prediction_losses'].append(prediction_loss) + + # Update price predictions + env.update_price_predictions() for step in range(max_steps_per_episode): # Select action @@ -1389,7 +1367,7 @@ async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000) state = next_state episode_reward += reward - # Learn from experience + # Learn from experience with mixed precision try: loss = agent.learn() if loss is not None: @@ -1397,20 +1375,37 @@ async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000) except Exception as e: logger.error(f"Learning error in episode {episode}, step {step}: {e}") + # Update price predictions periodically + if step % 10 == 0: + env.update_price_predictions() + if done: break # Update target network if episode % TARGET_UPDATE == 0: - agent.update_target_network() + agent.target_net.load_state_dict(agent.policy_net.state_dict()) - # Calculate statistics + # Calculate win rate if len(env.trades) > 0: wins = sum(1 for trade in env.trades if trade.get('pnl_percent', 0) > 0) win_rate = wins / len(env.trades) * 100 else: win_rate = 0 + # Analyze trades + trade_analysis = env.analyze_trades() + stats['trade_analysis'].append(trade_analysis) + + # Calculate prediction accuracy + prediction_accuracy = 0.0 + if hasattr(env, 'predicted_prices') and len(env.predicted_prices) > 0: + if len(env.data) > 5: + actual_prices = [candle['close'] for candle in env.data[-5:]] + predicted = env.predicted_prices[:min(5, len(actual_prices))] + errors = [abs(p - a) / a for p, a in zip(predicted, actual_prices[:len(predicted)])] + prediction_accuracy = 100 * (1 - sum(errors) / len(errors)) + # Log statistics stats['episode_rewards'].append(episode_reward) stats['episode_lengths'].append(step + 1) @@ -1419,6 +1414,13 @@ async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000) stats['episode_pnls'].append(env.episode_pnl) stats['cumulative_pnl'].append(env.total_pnl) stats['drawdowns'].append(env.max_drawdown * 100) + stats['prediction_accuracy'].append(prediction_accuracy) + + # Log detailed trade analysis + if trade_analysis: + logger.info(f"Trade Analysis: Win Rate={trade_analysis.get('uptrend_win_rate', 0):.1f}% in uptrends, " + f"{trade_analysis.get('downtrend_win_rate', 0):.1f}% in downtrends | " + f"Avg Win=${trade_analysis.get('avg_win', 0):.2f}, Avg Loss=${trade_analysis.get('avg_loss', 0):.2f}") # Log to TensorBoard agent.writer.add_scalar('Reward/train', episode_reward, episode) @@ -1428,11 +1430,12 @@ async def train_agent(agent, env, num_episodes=1000, max_steps_per_episode=1000) agent.writer.add_scalar('PnL/cumulative', env.total_pnl, episode) agent.writer.add_scalar('Drawdown/percent', env.max_drawdown * 100, episode) agent.writer.add_scalar('PredictionLoss', prediction_loss, episode) + agent.writer.add_scalar('PredictionAccuracy', prediction_accuracy, episode) logger.info(f"Episode {episode}: Reward={episode_reward:.2f}, Balance=${env.balance:.2f}, " 
f"Win Rate={win_rate:.1f}%, Trades={len(env.trades)}, " f"Episode PnL=${env.episode_pnl:.2f}, Total PnL=${env.total_pnl:.2f}, " - f"Max Drawdown={env.max_drawdown*100:.1f}%") + f"Max Drawdown={env.max_drawdown*100:.1f}%, Pred Accuracy={prediction_accuracy:.1f}%") # Save best model by reward if episode_reward > best_reward: @@ -1579,11 +1582,11 @@ async def test_training(): timeframe="1m", leverage=MAX_LEVERAGE, initial_balance=100, # Small balance for testing - is_demo=True # Always use demo mode for testing + demo=True # Always use demo mode for testing ) # Fetch initial data - await env.fetch_initial_data() + await env.fetch_initial_data(exchange, "ETH/USDT", "1m", 1000) # Create agent agent = Agent(state_size=STATE_SIZE, action_size=env.action_space) @@ -1638,136 +1641,229 @@ async def test_training(): finally: await exchange.close() +async def initialize_exchange(): + """Initialize the exchange connection""" + try: + # Try to initialize with async support first + try: + exchange = ccxt.pro.mexc({ + 'apiKey': MEXC_API_KEY, + 'secret': MEXC_SECRET_KEY, + 'enableRateLimit': True + }) + logger.info(f"Exchange initialized with async support: {exchange.id}") + except (AttributeError, ImportError): + # Fall back to standard CCXT + exchange = ccxt.mexc({ + 'apiKey': MEXC_API_KEY, + 'secret': MEXC_SECRET_KEY, + 'enableRateLimit': True + }) + logger.info(f"Exchange initialized with standard CCXT: {exchange.id}") + + return exchange + except Exception as e: + logger.error(f"Failed to initialize exchange: {e}") + raise + +async def get_historical_data(exchange, symbol="ETH/USDT", timeframe="1m", limit=1000): + """Fetch historical OHLCV data from the exchange""" + try: + logger.info(f"Fetching historical data for {symbol}, timeframe {timeframe}, limit {limit}") + + # Use the refactored fetch method + data = await fetch_ohlcv_data(exchange, symbol, timeframe, limit) + + if not data: + logger.warning("No historical data received") + + return data + except Exception as e: + logger.error(f"Failed to fetch historical data: {e}") + return [] + +async def live_trading(agent, env, exchange, demo=True): + """Run live trading with the trained agent""" + logger.info(f"Starting live trading (demo mode: {demo})") + + try: + # Subscribe to websocket for real-time data + symbol = "ETH/USDT" + timeframe = "1m" + + # Initialize with historical data + success = await env.fetch_initial_data(exchange, symbol, timeframe, 100) + if not success: + logger.error("Failed to initialize with historical data") + return + + # Main trading loop + while True: + # Wait for the next candle (1 minute) + await asyncio.sleep(5) # Check every 5 seconds + + # Fetch latest candle + latest_candle = await get_latest_candle(exchange, symbol) + + if not latest_candle: + logger.warning("No latest candle received, skipping update") + continue + + # Update environment with new data + env.add_data(latest_candle) + + # Get current state + state = env.get_state() + + # Select action (no exploration in live trading) + action = agent.select_action(state, training=False) + + # Take action + _, reward, _ = env.step(action) + + # Log trading activity + action_names = ["HOLD", "BUY", "SELL", "CLOSE"] + logger.info(f"Price: ${latest_candle['close']:.2f} | Action: {action_names[action]}") + + # Log performance metrics + if env.trades: + wins = sum(1 for t in env.trades if t.get('pnl_percent', 0) > 0) + win_rate = wins / len(env.trades) * 100 + total_pnl = sum(t.get('pnl_dollar', 0) for t in env.trades) + + logger.info(f"Balance: ${env.balance:.2f} | 
Trades: {len(env.trades)} | " + f"Win Rate: {win_rate:.1f}% | Total PnL: ${total_pnl:.2f}") + + # Analyze recent trades + trade_analysis = env.analyze_trades() + if trade_analysis: + logger.info(f"Recent Performance: Win Rate={trade_analysis.get('uptrend_win_rate', 0):.1f}% in uptrends, " + f"{trade_analysis.get('downtrend_win_rate', 0):.1f}% in downtrends") + + except KeyboardInterrupt: + logger.info("Live trading stopped by user") + except Exception as e: + logger.error(f"Error in live trading: {e}") + raise + +async def get_latest_candle(exchange, symbol): + """Get the latest candle data""" + try: + # Use the refactored fetch method with limit=1 + data = await fetch_ohlcv_data(exchange, symbol, "1m", 1) + + if data and len(data) > 0: + return data[0] + else: + logger.warning("No candle data received") + return None + except Exception as e: + logger.error(f"Failed to fetch latest candle: {e}") + return None + +async def fetch_ohlcv_data(exchange, symbol, timeframe, limit): + """Fetch OHLCV data with proper handling for both async and standard CCXT""" + try: + # Check if exchange has fetchOHLCV method + if not hasattr(exchange, 'fetchOHLCV'): + logger.error("Exchange does not support OHLCV data fetching") + return [] + + # Handle different CCXT versions + if hasattr(exchange, 'has') and exchange.has.get('fetchOHLCVAsync', False): + # Use async method if available + ohlcv = await exchange.fetchOHLCV(symbol, timeframe, limit=limit) + else: + # Use synchronous method with run_in_executor + loop = asyncio.get_event_loop() + ohlcv = await loop.run_in_executor( + None, + lambda: exchange.fetch_ohlcv(symbol, timeframe, limit=limit) + ) + + # Convert to list of dictionaries + data = [] + for candle in ohlcv: + timestamp, open_price, high, low, close, volume = candle + data.append({ + 'timestamp': timestamp, + 'open': open_price, + 'high': high, + 'low': low, + 'close': close, + 'volume': volume + }) + + logger.info(f"Fetched {len(data)} candles for {symbol} ({timeframe})") + return data + + except Exception as e: + logger.error(f"Error fetching OHLCV data: {e}") + return [] + async def main(): - # Parse command line arguments - import argparse - parser = argparse.ArgumentParser(description='ETH/USD Trading Bot with RL') - parser.add_argument('--mode', type=str, default='train', choices=['train', 'eval', 'live', 'test'], - help='Operation mode: train, eval, live, or test') - parser.add_argument('--episodes', type=int, default=1000, help='Number of episodes for training/evaluation') - parser.add_argument('--demo', action='store_true', help='Run in demo mode (no real trading)') + """Main function to run the trading bot""" + parser = argparse.ArgumentParser(description='Crypto Trading Bot') + parser.add_argument('--mode', type=str, default='train', choices=['train', 'evaluate', 'live'], + help='Mode to run the bot in') + parser.add_argument('--episodes', type=int, default=1000, help='Number of episodes to train') + parser.add_argument('--demo', action='store_true', help='Run in demo mode (no real trades)') args = parser.parse_args() - if args.mode == 'test': - # Run training tests - success = await test_training() - if success: - logger.info("All tests passed!") - else: - logger.error("Tests failed!") - return + # Get device (GPU or CPU) + device = get_device() - # Initialize exchange with async capabilities - try: - # Try the newer CCXT approach first - exchange = ccxt.mexc({ - 'apiKey': MEXC_API_KEY, - 'secret': MEXC_SECRET_KEY, - 'enableRateLimit': True, - 'asyncio_loop': asyncio.get_event_loop() 
- }) - except Exception as e: - logger.warning(f"Could not initialize exchange with asyncio_loop: {e}") - # Fallback to standard exchange - exchange = ccxt.mexc({ - 'apiKey': MEXC_API_KEY, - 'secret': MEXC_SECRET_KEY, - 'enableRateLimit': True, - }) + exchange = None try: + # Initialize exchange + exchange = await initialize_exchange() + # Create environment - env = TradingEnvironment( - exchange=exchange, - symbol="ETH/USDT", - timeframe="1m", - leverage=MAX_LEVERAGE, - initial_balance=INITIAL_BALANCE, - is_demo=args.demo or args.mode != 'live' - ) + env = TradingEnvironment(initial_balance=INITIAL_BALANCE, window_size=30, demo=args.demo) # Fetch initial data - await env.fetch_initial_data() + await env.fetch_initial_data(exchange, "ETH/USDT", "1m", 1000) # Create agent - agent = Agent(state_size=STATE_SIZE, action_size=env.action_space) - - # Try to load existing model - model_loaded = agent.load() - if not model_loaded and args.mode in ['eval', 'live']: - logger.warning("No pre-trained model found. Consider training first!") + agent = Agent(STATE_SIZE, 4, hidden_size=384, lstm_layers=2, attention_heads=4, device=device) if args.mode == 'train': - # Training mode - logger.info("Starting training mode") - await train_agent(agent, env, num_episodes=args.episodes) + # Train the agent + logger.info(f"Starting training for {args.episodes} episodes...") + stats = await train_agent(agent, env, num_episodes=args.episodes) - elif args.mode == 'eval': - # Evaluation mode - logger.info("Starting evaluation mode") - eval_reward, eval_profit, win_rate = evaluate_agent(agent, env, num_episodes=args.episodes) + elif args.mode == 'evaluate': + # Load trained model + agent.load("models/trading_agent_best_pnl.pt") + + # Evaluate the agent + logger.info("Evaluating agent...") + avg_reward, avg_profit, win_rate = evaluate_agent(agent, env) elif args.mode == 'live': - # Live trading mode - logger.info("Starting live trading mode with real-time data") - logger.info(f"Demo mode: {args.demo}") + # Load trained model + agent.load("models/trading_agent_best_pnl.pt") + + # Run live trading + logger.info("Starting live trading...") + await live_trading(agent, env, exchange, demo=args.demo) - # Live trading loop - async for candle in get_live_prices("ETH/USDT", "1m"): - # Update environment with new data - await env._update_with_new_data(candle) - - # Only trade if we have enough data - if len(env.data) >= env.window_size: - # Get current state - state = env.get_state() - - # Select action (no exploration in live trading) - action = agent.select_action(state, training=False) - - # Convert action number to readable format - action_names = ["HOLD", "BUY", "SELL", "CLOSE"] - logger.info(f"Price: ${candle['close']:.2f} | Action: {action_names[action]}") - - # Take action - _, reward, _ = env.step(action) - - # Print statistics - if len(env.trades) > 0: - wins = sum(1 for trade in env.trades if trade.get('pnl_percent', 0) > 0) - win_rate = wins / len(env.trades) * 100 - total_pnl = sum(trade.get('pnl_dollar', 0) for trade in env.trades) - logger.info(f"Balance: ${env.balance:.2f} | Trades: {len(env.trades)} | " - f"Win Rate: {win_rate:.1f}% | Total PnL: ${total_pnl:.2f}") - finally: - # Clean up exchange connection - await exchange.close() + # Clean up exchange connection - safely close if possible + if exchange: + try: + # Some CCXT exchanges have close method, others don't + if hasattr(exchange, 'close'): + await exchange.close() + elif hasattr(exchange, 'client') and hasattr(exchange.client, 'close'): + await 
exchange.client.close() + logger.info("Exchange connection closed") + except Exception as e: + logger.warning(f"Could not properly close exchange connection: {e}") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - logger.info("Program terminated by user") - -# Add these functions to identify tops and bottoms -def find_local_extrema(prices, window=5): - """Find local minima (bottoms) and maxima (tops) in price data""" - bottoms = [] - tops = [] - - if len(prices) < window * 2 + 1: - return bottoms, tops - - for i in range(window, len(prices) - window): - # Check if this is a local minimum (bottom) - if all(prices[i] <= prices[i-j] for j in range(1, window+1)) and \ - all(prices[i] <= prices[i+j] for j in range(1, window+1)): - bottoms.append(i) - - # Check if this is a local maximum (top) - if all(prices[i] >= prices[i-j] for j in range(1, window+1)) and \ - all(prices[i] >= prices[i+j] for j in range(1, window+1)): - tops.append(i) - - return bottoms, tops \ No newline at end of file + logger.info("Program terminated by user") \ No newline at end of file
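
Usage sketch: assuming the entry point stays crypto/gogo2/main.py, the argparse interface added in main() would be driven with invocations along the lines of `python main.py --mode train --episodes 1000`, `python main.py --mode evaluate`, or `python main.py --mode live --demo`; per the diff, the evaluate and live modes load `models/trading_agent_best_pnl.pt` before running, and `--demo` suppresses real order placement.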