# File-viewer metadata captured together with the source (not program code):
#   2024-06-12 15:46:47 +03:00
#   90 lines
#   4.4 KiB
#   Python

import numpy as np
import pandas as pd
class NNFXStrategy:
    """Indicator-based signal generator using SSL, T3, ADX and ATR columns.

    The strategy reads ``high``, ``low`` and ``close`` columns from a
    DataFrame, derives indicator and boolean condition columns from them
    (see :meth:`generate_signals`) and turns those conditions into a flat
    list of per-bar signal tuples (see :meth:`apply_strategy`).
    """

    def __init__(self, ssl_period=140, t3_fast_length=40, t3_slow_length=90,
                 adx_len=100, di_len=110, adx_ema_length=80,
                 atr_length=120, atr_stop_loss_multiplier=10,
                 atr_take_profit_multiplier=20):
        """Store the indicator look-back lengths and ATR multipliers.

        Args:
            ssl_period: Window of the simple moving averages of high/low
                used for the ``hlv`` / ``ssl_*`` columns.
            t3_fast_length: EMA span of the fast T3 average.
            t3_slow_length: EMA span of the slow T3 average.
            adx_len: Rolling window applied to DX to obtain ADX.
            di_len: Rolling window used for the directional indicators.
            adx_ema_length: EMA span applied to ADX (the ADX threshold line).
            atr_length: Rolling window of the ATR used for stops/targets.
            atr_stop_loss_multiplier: Stop distance, in ATR multiples.
            atr_take_profit_multiplier: Target distance, in ATR multiples.
        """
        self.ssl_period = ssl_period
        self.t3_fast_length = t3_fast_length
        self.t3_slow_length = t3_slow_length
        self.adx_len = adx_len
        self.di_len = di_len
        self.adx_ema_length = adx_ema_length
        self.atr_length = atr_length
        self.atr_stop_loss_multiplier = atr_stop_loss_multiplier
        self.atr_take_profit_multiplier = atr_take_profit_multiplier

    @staticmethod
    def _true_range(high, low, close):
        """Return the per-bar true range as a Series aligned with the inputs.

        True range is the largest of ``high - low``, ``|high - prev_close|``
        and ``|low - prev_close|``.
        """
        prev_close = close.shift()
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        # The result must stay a pandas Series so callers can chain
        # ``.rolling(...)``; reducing a list of Series with
        # ``np.maximum.reduce`` yields a bare ndarray that has no such method.
        # skipna=False keeps the first bar (no previous close) as NaN, the
        # same NaN propagation ``np.maximum`` would give.
        return candidates.max(axis=1, skipna=False)

    def sma(self, series, period):
        """Return the simple moving average of ``series`` over ``period`` bars."""
        return series.rolling(window=period).mean()

    def t3(self, series, length, b=0.7):
        """Return the T3 moving average of ``series``.

        Six EMAs with span ``length`` are applied in cascade and the last
        four are blended with cubic coefficients of the factor ``b``. The
        coefficients sum to 1, so a constant input is returned unchanged.

        Args:
            series: Input values.
            length: EMA span used for every smoothing stage.
            b: Blend factor used to build the coefficients.
        """
        stages = []
        smoothed = series
        for _ in range(6):
            smoothed = smoothed.ewm(span=length).mean()
            stages.append(smoothed)
        e3, e4, e5, e6 = stages[2:]

        b2 = b * b
        b3 = b2 * b
        c1 = -b3
        c2 = 3 * b2 + 3 * b3
        c3 = -6 * b2 - 3 * b - 3 * b3
        c4 = 1 + 3 * b + b3 + 3 * b2
        return c1 * e6 + c2 * e5 + c3 * e4 + c4 * e3

    def adx(self, high, low, close, di_len, adx_len):
        """Return ``(adx, adx_ema)`` computed with rolling means.

        Args:
            high, low, close: Price Series sharing one index.
            di_len: Rolling window for the directional indicators and the
                true-range average they are normalised by.
            adx_len: Rolling window applied to DX.

        Returns:
            Tuple of the ADX Series and its EMA. The EMA span is taken from
            ``self.adx_ema_length`` rather than from an argument.
        """
        # NOTE(review): +DM / -DM are taken independently (positive high
        # changes, negative low changes). The textbook Wilder definition also
        # zeroes the smaller of the two on each bar - confirm this
        # simplification is intended before changing it.
        plus_dm = high.diff().clip(lower=0)
        minus_dm = low.diff().clip(upper=0).abs()

        avg_tr = self._true_range(high, low, close).rolling(window=di_len).mean()
        plus_di = 100 * (plus_dm.rolling(window=di_len).mean() / avg_tr)
        minus_di = 100 * (minus_dm.rolling(window=di_len).mean() / avg_tr)

        # When both DIs are zero this is 0/0 and the bar evaluates to NaN.
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
        adx = dx.rolling(window=adx_len).mean()
        adx_ema = adx.ewm(span=self.adx_ema_length).mean()
        return adx, adx_ema

    def atr(self, high, low, close, atr_length):
        """Return the rolling-mean average true range over ``atr_length`` bars."""
        return self._true_range(high, low, close).rolling(window=atr_length).mean()

    def generate_signals(self, data):
        """Add indicator and condition columns to ``data`` and return it.

        ``data`` must provide ``high``, ``low`` and ``close`` columns. The
        frame is modified in place (the same object is returned) and gains
        the columns ``sma_high``, ``sma_low``, ``hlv``, ``ssl_down``,
        ``ssl_up``, ``t3_fast``, ``t3_slow``, ``adx``, ``adx_ema``, ``atr``,
        ``long_condition``, ``short_condition``, ``exit_long_condition`` and
        ``exit_short_condition``.
        """
        high, low, close = data['high'], data['low'], data['close']

        data['sma_high'] = self.sma(high, self.ssl_period)
        data['sma_low'] = self.sma(low, self.ssl_period)

        # hlv: +1 once close is above the high SMA, -1 once below the low SMA;
        # in between the previous state is carried forward, and bars before
        # the first decision are 0.
        data['hlv'] = np.where(
            close > data['sma_high'], 1,
            np.where(close < data['sma_low'], -1, np.nan),
        )
        data['hlv'] = data['hlv'].ffill().fillna(0)

        bearish = data['hlv'] < 0
        data['ssl_down'] = np.where(bearish, data['sma_high'], data['sma_low'])
        data['ssl_up'] = np.where(bearish, data['sma_low'], data['sma_high'])

        data['t3_fast'] = self.t3(close, self.t3_fast_length)
        data['t3_slow'] = self.t3(close, self.t3_slow_length)
        data['adx'], data['adx_ema'] = self.adx(
            high, low, close, self.di_len, self.adx_len)
        data['atr'] = self.atr(high, low, close, self.atr_length)

        fast_above = data['t3_fast'] > data['t3_slow']
        fast_below = data['t3_fast'] < data['t3_slow']
        # Comparisons against NaN are False, so no entry fires while ADX or
        # its EMA is still warming up.
        adx_confirms = data['adx'] > data['adx_ema']

        data['long_condition'] = fast_above & adx_confirms & (data['hlv'] > 0)
        data['short_condition'] = fast_below & adx_confirms & (data['hlv'] < 0)
        data['exit_long_condition'] = fast_below | (data['hlv'] < 0)
        data['exit_short_condition'] = fast_above | (data['hlv'] > 0)
        return data

    def apply_strategy(self, data):
        """Generate signals for ``data`` and list one event per matching bar.

        Bars are scanned from the second row onward (row 0 is skipped). For
        each bar the first matching condition wins, in the order long, short,
        exit-long, exit-short. No open-position state is tracked, so an event
        is emitted on every bar whose condition holds.

        Returns:
            A list of tuples: ``('long' | 'short', index_label, stop_loss,
            take_profit)`` for entries and ``('exit_long' | 'exit_short',
            index_label)`` for exits. Stops and targets are the bar's close
            offset by the configured ATR multiples.
        """
        data = self.generate_signals(data)

        # Pull the columns out once; per-row ``.iloc`` lookups on a Series
        # are far slower than indexing plain arrays.
        labels = data.index
        close = data['close'].to_numpy()
        atr = data['atr'].to_numpy()
        enter_long = data['long_condition'].to_numpy()
        enter_short = data['short_condition'].to_numpy()
        exit_long = data['exit_long_condition'].to_numpy()
        exit_short = data['exit_short_condition'].to_numpy()

        trades = []
        for i in range(1, len(data)):
            if enter_long[i]:
                stop_loss = close[i] - self.atr_stop_loss_multiplier * atr[i]
                take_profit = close[i] + self.atr_take_profit_multiplier * atr[i]
                trades.append(('long', labels[i], stop_loss, take_profit))
            elif enter_short[i]:
                stop_loss = close[i] + self.atr_stop_loss_multiplier * atr[i]
                take_profit = close[i] - self.atr_take_profit_multiplier * atr[i]
                trades.append(('short', labels[i], stop_loss, take_profit))
            elif exit_long[i]:
                trades.append(('exit_long', labels[i]))
            elif exit_short[i]:
                trades.append(('exit_short', labels[i]))
        return trades