"""
|
|
Data Interface for Neural Network Trading System
|
|
|
|
This module provides functionality to fetch, process, and prepare data for the neural network models.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import pickle
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
import sys
|
|
import ta
|
|
|
|
# Add project root to sys.path
|
|
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
if project_root not in sys.path:
|
|
sys.path.append(project_root)
|
|
|
|
# Import BinanceHistoricalData from the root module
|
|
from dataprovider_realtime import BinanceHistoricalData
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DataInterface:
    """
    Enhanced Data Interface supporting:
    - Multiple trading pairs (up to 3)
    - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom)
    - Technical indicators (up to 20)
    - Cross-timeframe normalization
    - Real-time tick streaming
    """

    def __init__(self, symbol=None, timeframes=None, data_dir="NN/data"):
        """
        Initialize the data interface.

        Args:
            symbol (str): Trading pair symbol (e.g., "BTC/USDT")
            timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d'])
            data_dir (str): Directory to store/load datasets
        """
        self.symbol = symbol
        self.timeframes = timeframes or ['1h', '4h', '1d']
        self.data_dir = data_dir
        self.scalers = {}  # Store scalers for each timeframe

        # Initialize the historical data fetcher
        self.historical_data = BinanceHistoricalData()

        # Create data directory if it doesn't exist
        os.makedirs(self.data_dir, exist_ok=True)

        # Initialize empty dataframes for each timeframe
        self.dataframes = {tf: None for tf in self.timeframes}

        logger.info(f"DataInterface initialized for {symbol} with timeframes {self.timeframes}")

    def get_historical_data(self, timeframe='1h', n_candles=1000, use_cache=True):
        """
        Fetch historical price data for a given timeframe.

        Args:
            timeframe (str): Timeframe to fetch data for
            n_candles (int): Number of candles to fetch
            use_cache (bool): Whether to use cached data if available

        Returns:
            pd.DataFrame: DataFrame with OHLCV data, or None if no data is available
        """
        # Map timeframe string to seconds for BinanceHistoricalData
        timeframe_to_seconds = {
            '1s': 1,
            '1m': 60,
            '5m': 300,
            '15m': 900,
            '30m': 1800,
            '1h': 3600,
            '4h': 14400,
            '1d': 86400
        }

        interval_seconds = timeframe_to_seconds.get(timeframe, 3600)  # Default to 1h if not found

        try:
            # Fetch data using BinanceHistoricalData
            # Note: use_cache is accepted for API compatibility but is currently
            # not used in this method.
            df = self.historical_data.get_historical_candles(
                symbol=self.symbol,
                interval_seconds=interval_seconds,
                limit=n_candles
            )

            if not df.empty:
                logger.info(f"Using data for {self.symbol} {timeframe} ({len(df)} candles)")
                self.dataframes[timeframe] = df
                return df
            else:
                logger.error(f"No data available for {self.symbol} {timeframe}")
                return None

        except Exception as e:
            logger.error(f"Error fetching data for {self.symbol} {timeframe}: {str(e)}")
            return None

    def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20):
        """
        Prepare input data for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            tuple: (X, y, timestamps) where:
                X is the input features array with shape (n_samples, window_size, n_features)
                y is the target array with shape (n_samples,)
                timestamps is an array of timestamps for each sample
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Get data for all requested timeframes
        dfs = {}
        min_length = float('inf')
        for tf in timeframes:
            # For the 1s timeframe, we need more data points
            tf_candles = n_candles * 60 if tf == '1s' else n_candles
            df = self.get_historical_data(timeframe=tf, n_candles=tf_candles)
            if df is not None and not df.empty:
                dfs[tf] = df
                # Keep track of minimum length across all timeframes
                min_length = min(min_length, len(df))

        if not dfs:
            logger.error("No data available for feature creation")
            return None, None, None

        # Align all dataframes to the same length
        for tf in dfs:
            dfs[tf] = dfs[tf].tail(min_length)

        # Create features for each timeframe
        features = []
        targets = None
        timestamps = None

        for tf in timeframes:
            if tf in dfs:
                X, y, ts = self._create_features(dfs[tf], window_size)
                if X is not None and y is not None:
                    features.append(X)
                    if targets is None:  # Only need targets from one timeframe
                        targets = y
                        timestamps = ts

        if not features or targets is None:
            logger.error("Failed to create features for any timeframe")
            return None, None, None

        # Ensure all feature arrays have the same length
        min_samples = min(f.shape[0] for f in features)
        features = [f[-min_samples:] for f in features]
        targets = targets[-min_samples:]
        timestamps = timestamps[-min_samples:]

        # Stack features from all timeframes along the feature axis
        X = np.concatenate([f.reshape(min_samples, window_size, -1) for f in features], axis=2)

        # Validate data
        if np.any(np.isnan(X)) or np.any(np.isinf(X)):
            logger.error("Generated features contain NaN or infinite values")
            return None, None, None

        logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {targets.shape}")
        return X, targets, timestamps

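    # Shape note (illustrative, not executed): with the default timeframes
    # ['1h', '4h', '1d'] and window_size=20, each timeframe contributes the
    # five OHLCV features, so prepare_nn_input() returns X with shape
    # (n_samples, 20, 15) after the per-timeframe windows are concatenated
    # along the feature axis.
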
    def _create_features(self, df, window_size):
        """
        Create features from OHLCV data using a sliding window.

        Args:
            df (pd.DataFrame): DataFrame with OHLCV data
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, y, timestamps) where:
                X is the feature array
                y is the target array
                timestamps is the array of timestamps
        """
        if len(df) < window_size + 1:
            logger.error(f"Not enough data for feature creation (need {window_size + 1}, got {len(df)})")
            return None, None, None

        # Extract OHLCV data
        data = df[['open', 'high', 'low', 'close', 'volume']].values
        timestamps = df['timestamp'].values

        # Create sliding windows
        X = np.array([data[i:i+window_size] for i in range(len(data)-window_size)])

        # Create targets (next candle's movement: 0=down, 1=neutral, 2=up)
        next_close = data[window_size:, 3]      # Close price of the candle after each window
        curr_close = data[window_size-1:-1, 3]  # Close price of the last candle in each window
        price_changes = (next_close - curr_close) / curr_close

        # Classify price movements; the threshold is deliberately small to
        # encourage more BUY/SELL signals
        threshold = 0.0005  # 0.05%
        y = np.ones(len(price_changes), dtype=int)  # Default: 1 (neutral/HOLD)
        y[price_changes > threshold] = 2    # Up (BUY)
        y[price_changes < -threshold] = 0   # Down (SELL)

        # Log the target distribution to understand our data better
        sell_count = np.sum(y == 0)
        hold_count = np.sum(y == 1)
        buy_count = np.sum(y == 2)
        total_count = len(y)
        logger.info(f"Target distribution for {self.symbol}: SELL: {sell_count} ({sell_count/total_count:.2%}), "
                    f"HOLD: {hold_count} ({hold_count/total_count:.2%}), BUY: {buy_count} ({buy_count/total_count:.2%})")

        logger.info(f"Created features - X shape: {X.shape}, y shape: {y.shape}")
        return X, y, timestamps[window_size:]

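    # Worked example of the labeling above (illustrative, not executed): with
    # threshold = 0.0005, a close-to-close move of +0.10% is labeled 2 (BUY),
    # -0.10% is labeled 0 (SELL), and +0.02% falls inside the +/-0.05% band,
    # so it is labeled 1 (HOLD).
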
    def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20):
        """
        Generate and save a training dataset for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            dict: Dictionary of dataset file paths
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Prepare inputs
        X, y, timestamps = self.prepare_nn_input(timeframes, n_candles, window_size)

        if X is None or y is None:
            logger.error("Failed to prepare input data for dataset")
            return None

        # Prepare output paths
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        dataset_name = f"{self.symbol.replace('/', '_')}_{'_'.join(timeframes)}_{timestamp_str}"

        X_path = os.path.join(self.data_dir, f"{dataset_name}_X.npy")
        y_path = os.path.join(self.data_dir, f"{dataset_name}_y.npy")
        timestamps_path = os.path.join(self.data_dir, f"{dataset_name}_timestamps.npy")
        metadata_path = os.path.join(self.data_dir, f"{dataset_name}_metadata.json")

        # Save arrays
        np.save(X_path, X)
        np.save(y_path, y)
        np.save(timestamps_path, timestamps)

        # Save metadata
        metadata = {
            'symbol': self.symbol,
            'timeframes': timeframes,
            'window_size': window_size,
            'n_samples': len(X),
            'feature_shape': X.shape[1:],
            'created_at': datetime.now().isoformat(),
            'dataset_name': dataset_name
        }

        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Save scalers
        scaler_path = os.path.join(self.data_dir, f"{dataset_name}_scalers.pkl")
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scalers, f)

        # Return dataset info
        dataset_info = {
            'X_path': X_path,
            'y_path': y_path,
            'timestamps_path': timestamps_path,
            'metadata_path': metadata_path,
            'scaler_path': scaler_path
        }

        logger.info(f"Dataset generated and saved: {dataset_name}")
        return dataset_info

    def get_feature_count(self):
        """
        Calculate total number of features across all timeframes.

        Returns:
            int: Total number of features (5 features per timeframe)
        """
        return len(self.timeframes) * 5  # OHLCV features for each timeframe

    def get_features(self, timeframe, n_candles=1000):
        """
        Get feature data with technical indicators for a specific timeframe.

        Args:
            timeframe (str): Timeframe to get features for ('1m', '5m', etc.)
            n_candles (int): Number of candles to get

        Returns:
            np.ndarray: Array of feature data (all columns except timestamp,
                i.e., OHLCV plus the technical indicators)
        """
        # Get historical data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles)

        if df is None or df.empty:
            logger.error(f"No data available for {self.symbol} {timeframe}")
            return None

        # Add technical indicators
        df = self.add_technical_indicators(df)

        # Drop NaN values that might have been introduced by indicators
        df = df.dropna()

        # Extract features (all columns except timestamp)
        features = df.drop('timestamp', axis=1).values

        logger.info(f"Prepared {len(features)} {timeframe} feature rows with {features.shape[1]} features")
        return features

    def add_technical_indicators(self, df):
        """
        Add technical indicators to the dataframe.

        Args:
            df (pd.DataFrame): DataFrame with OHLCV data

        Returns:
            pd.DataFrame: DataFrame with added technical indicators
        """
        # Make a copy to avoid modifying the original
        df_copy = df.copy()

        # Basic price indicators
        df_copy['returns'] = df_copy['close'].pct_change()
        df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1))

        # Moving Averages
        df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7)
        df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25)
        df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99)

        # MACD
        macd = ta.trend.MACD(df_copy['close'])
        df_copy['macd'] = macd.macd()
        df_copy['macd_signal'] = macd.macd_signal()
        df_copy['macd_diff'] = macd.macd_diff()

        # RSI
        df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14)

        # Bollinger Bands
        bollinger = ta.volatility.BollingerBands(df_copy['close'])
        df_copy['bb_high'] = bollinger.bollinger_hband()
        df_copy['bb_low'] = bollinger.bollinger_lband()
        df_copy['bb_pct'] = bollinger.bollinger_pband()

        return df_copy

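    # Column count note (illustrative, assuming the data provider returns only
    # timestamp + OHLCV columns): add_technical_indicators() appends 12 columns
    # (returns, log_returns, sma_7/25/99, macd/macd_signal/macd_diff, rsi,
    # bb_high/bb_low/bb_pct), so after dropping the timestamp column
    # get_features() yields 5 OHLCV + 12 indicator = 17 features per row.
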
    def calculate_pnl(self, predictions, actual_prices, position_size=1.0, fee_rate=0.0002):
        """
        Robust PnL calculator that handles:
        - Action predictions (0=SELL, 1=HOLD, 2=BUY)
        - Probability predictions (array of [sell_prob, hold_prob, buy_prob])
        - Single price array or OHLC data

        Args:
            predictions: Array of predicted actions or probabilities
            actual_prices: Array of actual prices (can be 1D or 2D OHLC format)
            position_size: Position size multiplier
            fee_rate: Trading fee rate (default: 0.0002 for 0.02% per trade)

        Returns:
            tuple: (total_pnl, win_rate, trades)
        """
        # Convert inputs to numpy arrays if they aren't already
        try:
            predictions = np.array(predictions)
            actual_prices = np.array(actual_prices)
        except Exception as e:
            logger.error(f"Error converting inputs: {str(e)}")
            return 0.0, 0.0, []

        # Validate input shapes
        if len(predictions.shape) > 2 or len(actual_prices.shape) > 2:
            logger.error("Invalid input dimensions")
            return 0.0, 0.0, []

        # Convert OHLC data to close prices if needed
        if len(actual_prices.shape) == 2 and actual_prices.shape[1] >= 4:
            prices = actual_prices[:, 3]  # Use close prices
        else:
            prices = actual_prices

        # Handle case where prices is 2D with a single column
        if len(prices.shape) == 2 and prices.shape[1] == 1:
            prices = prices.flatten()

        # Convert probabilities to actions if needed
        if len(predictions.shape) == 2 and predictions.shape[1] > 1:
            actions = np.argmax(predictions, axis=1)
        else:
            actions = predictions

        # Ensure we have enough prices
        if len(prices) < 2:
            logger.error("Not enough price data")
            return 0.0, 0.0, []

        # Trim to matching length (each action needs the next candle's price)
        min_length = min(len(actions), len(prices) - 1)
        actions = actions[:min_length]
        prices = prices[:min_length + 1]

        pnl = 0.0
        wins = 0
        trades = []

        for i in range(min_length):
            current_price = prices[i]
            next_price = prices[i + 1]
            action = actions[i]

            # Skip HOLD actions
            if action == 1:
                continue

            price_change = (next_price - current_price) / current_price

            if action == 2:  # BUY
                raw_pnl = price_change * position_size

                # Fees are charged on notional at entry and at exit
                entry_fee = position_size * fee_rate
                exit_fee = position_size * (1 + price_change) * fee_rate
                total_fees = entry_fee + exit_fee

                # Net PnL after fees
                trade_pnl = raw_pnl - total_fees
                trade_type = 'BUY'
                is_win = trade_pnl > 0
            elif action == 0:  # SELL
                raw_pnl = -price_change * position_size

                # Fees are charged on notional at entry and at exit
                entry_fee = position_size * fee_rate
                exit_fee = position_size * (1 - price_change) * fee_rate
                total_fees = entry_fee + exit_fee

                # Net PnL after fees
                trade_pnl = raw_pnl - total_fees
                trade_type = 'SELL'
                is_win = trade_pnl > 0
            else:
                continue  # Invalid action

            pnl += trade_pnl
            wins += int(is_win)

            # Track trade details
            trades.append({
                'type': trade_type,
                'entry': current_price,
                'exit': next_price,
                'pnl': trade_pnl,
                'raw_pnl': raw_pnl,
                'fees': total_fees,
                'win': is_win,
                'duration': 1  # In number of candles
            })

        win_rate = wins / len(trades) if trades else 0.0

        # Add timestamps to trades if available. Note: trades are matched to
        # dataframe rows in order, so with skipped HOLD actions this mapping
        # is only approximate.
        if hasattr(self, 'dataframes') and self.timeframes and self.timeframes[0] in self.dataframes:
            df = self.dataframes[self.timeframes[0]]
            if df is not None and 'timestamp' in df.columns:
                for i, trade in enumerate(trades[:len(df)]):
                    trade['timestamp'] = df['timestamp'].iloc[i]

        return pnl, win_rate, trades

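    # Worked fee example for calculate_pnl (illustrative, not executed): a BUY
    # entered at 100 and exited at 101 with position_size=1.0 and
    # fee_rate=0.0002 gives raw_pnl = 0.01, entry_fee = 0.0002,
    # exit_fee = 0.0002 * 1.01 = 0.000202, so trade_pnl = 0.01 - 0.000402
    # = 0.009598.
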
    def get_future_prices(self, prices, n_candles=3):
        """
        Extract future prices for retrospective training.

        Args:
            prices (np.ndarray): Array of prices
            n_candles (int): Number of future candles to look at

        Returns:
            np.ndarray: Future prices array (1D array)
        """
        if prices is None or len(prices) < n_candles + 1:
            return None

        # Convert to a flat numpy array
        prices_np = np.asarray(prices).flatten()

        # For each price point, use the maximum price over the next n_candles
        future_prices = np.zeros(len(prices_np))

        for i in range(len(prices_np) - n_candles):
            # Get the next n candles
            next_candles = prices_np[i+1:i+n_candles+1]
            # Use the maximum price as the future price
            future_prices[i] = np.max(next_candles)

        # For the last n_candles points, use the last available price
        future_prices[-n_candles:] = prices_np[-1]

        return future_prices  # Already a 1D array

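    # Illustrative example (not executed): get_future_prices([1, 2, 3, 4, 5],
    # n_candles=2) looks two candles ahead, producing [3, 4, 5, 5, 5]; the
    # last two entries fall back to the final available price.
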
    def prepare_training_data(self, refresh=False, refresh_interval=300):
        """
        Prepare data for training, including splitting into train/validation sets.

        Args:
            refresh (bool): Whether to refresh the data cache
            refresh_interval (int): Interval in seconds to refresh data

        Returns:
            tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
        """
        current_time = datetime.now()

        # Check if we should refresh the data
        if refresh or not hasattr(self, 'last_refresh_time') or \
                (current_time - self.last_refresh_time).total_seconds() > refresh_interval:
            logger.info("Refreshing training data...")
            self.last_refresh_time = current_time
        else:
            # Use cached data
            if hasattr(self, 'cached_train_data'):
                return self.cached_train_data

        # Prepare input data
        X, y, _ = self.prepare_nn_input()
        if X is None:
            return None, None, None, None, None, None

        # Get price data for PnL calculation
        raw_prices = []
        for tf in self.timeframes:
            if tf in self.dataframes and self.dataframes[tf] is not None:
                # Get the close prices for the same period as X
                prices = self.dataframes[tf]['close'].values[-len(X):]
                if len(prices) == len(X):
                    raw_prices = prices
                    break

        if len(raw_prices) != len(X):
            raw_prices = np.zeros(len(X))  # Fallback if no prices available

        # Split data into training and validation sets (80/20)
        split_idx = int(len(X) * 0.8)
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]
        train_prices, val_prices = raw_prices[:split_idx], raw_prices[split_idx:]

        # Cache the data
        self.cached_train_data = (X_train, y_train, X_val, y_val, train_prices, val_prices)

        return X_train, y_train, X_val, y_val, train_prices, val_prices

    def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20):
        """
        Prepare a single input sample from the most recent data for real-time inference.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of recent candles to fetch
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, timestamp) where:
                X is the input features array with shape (1, window_size, n_features)
                timestamp is the timestamp of the most recent candle
        """
        # Get recent data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=False)

        if df is None or len(df) < window_size:
            logger.error(f"Not enough data for inference (need at least {window_size} candles)")
            return None, None

        # Extract features from the most recent window
        ohlcv = df[['open', 'high', 'low', 'close', 'volume']].tail(window_size).values

        # Scale the data
        if timeframe in self.scalers:
            # Use existing scaler
            scaler = self.scalers[timeframe]
        else:
            # Create a new scaler and fit it on all available data
            scaler = MinMaxScaler()
            all_data = df[['open', 'high', 'low', 'close', 'volume']].values
            scaler.fit(all_data)
            self.scalers[timeframe] = scaler

        ohlcv_scaled = scaler.transform(ohlcv)

        # Reshape to (1, window_size, n_features)
        X = np.array([ohlcv_scaled])

        # Get timestamp of the most recent candle
        timestamp = df['timestamp'].iloc[-1]

        return X, timestamp

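    # Shape note (illustrative): with window_size=20 the returned X has shape
    # (1, 20, 5) -- one sample, twenty candles, five OHLCV features -- scaled
    # to the [0, 1] range by the per-timeframe MinMaxScaler.
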
    def get_training_data(self, timeframe='1m', n_candles=5000):
        """
        Get a consolidated dataframe for RL training with OHLCV and technical indicators.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of candles to fetch

        Returns:
            DataFrame: Combined dataframe with price data and technical indicators
        """
        # Get historical data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=True)

        if df is None or len(df) < 100:  # Minimum required for indicators
            logger.error("Not enough data for RL training (need at least 100 candles)")
            return None

        # Work on a copy so the dataframe cached in self.dataframes is not mutated
        df = df.copy()

        # Calculate technical indicators
        try:
            # Add RSI (14)
            df['rsi'] = ta.momentum.rsi(df['close'], window=14)

            # Add MACD
            macd = ta.trend.MACD(df['close'])
            df['macd'] = macd.macd()
            df['macd_signal'] = macd.macd_signal()
            df['macd_hist'] = macd.macd_diff()

            # Add Bollinger Bands
            bbands = ta.volatility.BollingerBands(df['close'])
            df['bb_upper'] = bbands.bollinger_hband()
            df['bb_middle'] = bbands.bollinger_mavg()
            df['bb_lower'] = bbands.bollinger_lband()

            # Add ATR (Average True Range)
            df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'], window=14)

            # Add moving averages
            df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
            df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
            df['ema_20'] = ta.trend.ema_indicator(df['close'], window=20)

            # Add OBV (On-Balance Volume)
            df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume'])

            # Add momentum (10-period rate of change)
            df['mom'] = ta.momentum.roc(df['close'], window=10)

            # Normalize price to previous close
            df['close_norm'] = df['close'] / df['close'].shift(1) - 1
            df['high_norm'] = df['high'] / df['close'].shift(1) - 1
            df['low_norm'] = df['low'] / df['close'].shift(1) - 1

            # Volatility features
            df['volatility'] = df['high'] / df['low'] - 1

            # Volume features
            df['volume_norm'] = df['volume'] / df['volume'].rolling(20).mean()

            # Calculate returns
            df['returns_1'] = df['close'].pct_change(1)
            df['returns_5'] = df['close'].pct_change(5)
            df['returns_10'] = df['close'].pct_change(10)

        except Exception as e:
            logger.error(f"Error calculating technical indicators: {str(e)}")
            return None

        # Drop NaN values introduced by the indicator warm-up periods
        df = df.dropna()

        return df
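

# Minimal usage sketch (illustrative; it assumes the dataprovider_realtime
# module is importable, network access to Binance is available, and that
# "BTC/USDT" is a symbol accepted by BinanceHistoricalData):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    di = DataInterface(symbol="BTC/USDT", timeframes=['1m', '5m', '1h'])

    # Generate and persist a training dataset
    dataset_info = di.generate_training_dataset(n_candles=1000, window_size=20)
    if dataset_info:
        print(f"Dataset files: {dataset_info}")

    # Prepare a single sample for real-time inference
    X, ts = di.prepare_realtime_input(timeframe='1m', n_candles=30, window_size=20)
    if X is not None:
        print(f"Realtime input shape: {X.shape} at {ts}")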