import numpy as np import pandas as pd class NNFXStrategy: def __init__(self, ssl_period=140, t3_fast_length=40, t3_slow_length=90, adx_len=100, di_len=110, adx_ema_length=80, atr_length=120, atr_stop_loss_multiplier=10, atr_take_profit_multiplier=20): self.ssl_period = ssl_period self.t3_fast_length = t3_fast_length self.t3_slow_length = t3_slow_length self.adx_len = adx_len self.di_len = di_len self.adx_ema_length = adx_ema_length self.atr_length = atr_length self.atr_stop_loss_multiplier = atr_stop_loss_multiplier self.atr_take_profit_multiplier = atr_take_profit_multiplier def sma(self, series, period): return series.rolling(window=period).mean() def t3(self, series, length, b=0.7): e1 = series.ewm(span=length).mean() e2 = e1.ewm(span=length).mean() e3 = e2.ewm(span=length).mean() e4 = e3.ewm(span=length).mean() e5 = e4.ewm(span=length).mean() e6 = e5.ewm(span=length).mean() c1 = -b * b * b c2 = 3 * b * b + 3 * b * b * b c3 = -6 * b * b - 3 * b - 3 * b * b * b c4 = 1 + 3 * b + b * b * b + 3 * b * b return c1 * e6 + c2 * e5 + c3 * e4 + c4 * e3 def adx(self, high, low, close, di_len, adx_len): plus_dm = high.diff().clip(lower=0) minus_dm = low.diff().clip(upper=0).abs() tr = np.maximum.reduce([high - low, (high - close.shift()).abs(), (low - close.shift()).abs()]) atr = tr.rolling(window=di_len).mean() plus_di = 100 * (plus_dm.rolling(window=di_len).mean() / atr) minus_di = 100 * (minus_dm.rolling(window=di_len).mean() / atr) dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di) adx = dx.rolling(window=adx_len).mean() adx_ema = adx.ewm(span=self.adx_ema_length).mean() return adx, adx_ema def atr(self, high, low, close, atr_length): tr = np.maximum.reduce([high - low, (high - close.shift()).abs(), (low - close.shift()).abs()]) return tr.rolling(window=atr_length).mean() def generate_signals(self, data): data['sma_high'] = self.sma(data['high'], self.ssl_period) data['sma_low'] = self.sma(data['low'], self.ssl_period) data['hlv'] = np.where(data['close'] > data['sma_high'], 1, np.where(data['close'] < data['sma_low'], -1, np.nan)) data['hlv'] = data['hlv'].ffill().fillna(0) data['ssl_down'] = np.where(data['hlv'] < 0, data['sma_high'], data['sma_low']) data['ssl_up'] = np.where(data['hlv'] < 0, data['sma_low'], data['sma_high']) data['t3_fast'] = self.t3(data['close'], self.t3_fast_length) data['t3_slow'] = self.t3(data['close'], self.t3_slow_length) data['adx'], data['adx_ema'] = self.adx(data['high'], data['low'], data['close'], self.di_len, self.adx_len) data['atr'] = self.atr(data['high'], data['low'], data['close'], self.atr_length) data['long_condition'] = (data['t3_fast'] > data['t3_slow']) & (data['adx'] > data['adx_ema']) & (data['hlv'] > 0) data['short_condition'] = (data['t3_fast'] < data['t3_slow']) & (data['adx'] > data['adx_ema']) & (data['hlv'] < 0) data['exit_long_condition'] = (data['t3_fast'] < data['t3_slow']) | (data['hlv'] < 0) data['exit_short_condition'] = (data['t3_fast'] > data['t3_slow']) | (data['hlv'] > 0) return data def apply_strategy(self, data): data = self.generate_signals(data) trades = [] for i in range(1, len(data)): if data['long_condition'].iloc[i]: stop_loss = data['close'].iloc[i] - self.atr_stop_loss_multiplier * data['atr'].iloc[i] take_profit = data['close'].iloc[i] + self.atr_take_profit_multiplier * data['atr'].iloc[i] trades.append(('long', data.index[i], stop_loss, take_profit)) elif data['short_condition'].iloc[i]: stop_loss = data['close'].iloc[i] + self.atr_stop_loss_multiplier * data['atr'].iloc[i] take_profit = data['close'].iloc[i] - self.atr_take_profit_multiplier * data['atr'].iloc[i] trades.append(('short', data.index[i], stop_loss, take_profit)) elif data['exit_long_condition'].iloc[i]: trades.append(('exit_long', data.index[i])) elif data['exit_short_condition'].iloc[i]: trades.append(('exit_short', data.index[i])) return trades