# gogo2/NN/utils/data_interface.py
"""
Data Interface for Neural Network Trading System
This module provides functionality to fetch, process, and prepare data for the neural network models.
"""
import os
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json
import pickle
from sklearn.preprocessing import MinMaxScaler
import sys
# Add project root to sys.path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if project_root not in sys.path:
sys.path.append(project_root)
# Import BinanceHistoricalData from the root module
from realtime import BinanceHistoricalData
logger = logging.getLogger(__name__)
class DataInterface:
"""
Enhanced Data Interface supporting:
- Multiple trading pairs (up to 3)
- Multiple timeframes per pair (1s, 1m, 1h, 1d + custom)
- Technical indicators (up to 20)
- Cross-timeframe normalization
- Real-time tick streaming
"""
def __init__(self, symbol=None, timeframes=None, data_dir="NN/data"):
"""
Initialize the data interface.
Args:
symbol (str): Trading pair symbol (e.g., "BTC/USDT")
timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d'])
data_dir (str): Directory to store/load datasets
"""
self.symbol = symbol
self.timeframes = timeframes or ['1h', '4h', '1d']
self.data_dir = data_dir
self.scalers = {} # Store scalers for each timeframe
# Initialize the historical data fetcher
self.historical_data = BinanceHistoricalData()
# Create data directory if it doesn't exist
os.makedirs(self.data_dir, exist_ok=True)
# Initialize empty dataframes for each timeframe
self.dataframes = {tf: None for tf in self.timeframes}
logger.info(f"DataInterface initialized for {symbol} with timeframes {timeframes}")
def get_historical_data(self, timeframe='1h', n_candles=1000, use_cache=True):
"""
Fetch historical price data for a given timeframe.
Args:
timeframe (str): Timeframe to fetch data for
n_candles (int): Number of candles to fetch
use_cache (bool): Whether to use cached data if available (accepted for API compatibility but currently unused)
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
# Map timeframe string to seconds for BinanceHistoricalData
timeframe_to_seconds = {
'1s': 1,
'1m': 60,
'5m': 300,
'15m': 900,
'30m': 1800,
'1h': 3600,
'4h': 14400,
'1d': 86400
}
interval_seconds = timeframe_to_seconds.get(timeframe, 3600) # Default to 1h if not found
try:
# Fetch data using BinanceHistoricalData
df = self.historical_data.get_historical_candles(
symbol=self.symbol,
interval_seconds=interval_seconds,
limit=n_candles
)
if not df.empty:
logger.info(f"Using data for {self.symbol} {timeframe} ({len(df)} candles)")
self.dataframes[timeframe] = df
return df
else:
logger.error(f"No data available for {self.symbol} {timeframe}")
return None
except Exception as e:
logger.error(f"Error fetching data for {self.symbol} {timeframe}: {str(e)}")
return None
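# The returned DataFrame is expected to provide at least the columns
# 'timestamp', 'open', 'high', 'low', 'close' and 'volume'; the feature and
# PnL helpers below index those columns directly.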
def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20):
"""
Prepare input data for neural network models.
Args:
timeframes (list): List of timeframes to use
n_candles (int): Number of candles to fetch for each timeframe
window_size (int): Size of the sliding window for feature creation
Returns:
tuple: (X, y, timestamps) where:
X is the input features array with shape (n_samples, window_size, n_features)
y is the target array with shape (n_samples,)
timestamps is an array of timestamps for each sample
"""
if timeframes is None:
timeframes = self.timeframes
# Get data for all requested timeframes
dfs = {}
min_length = float('inf')
for tf in timeframes:
# For 1s timeframe, we need more data points
tf_candles = n_candles * 60 if tf == '1s' else n_candles
df = self.get_historical_data(timeframe=tf, n_candles=tf_candles)
if df is not None and not df.empty:
dfs[tf] = df
# Keep track of minimum length across all timeframes
min_length = min(min_length, len(df))
if not dfs:
logger.error("No data available for feature creation")
return None, None, None
# Align all dataframes to the same length
for tf in dfs:
dfs[tf] = dfs[tf].tail(min_length)
# Create features for each timeframe
features = []
targets = None
timestamps = None
for tf in timeframes:
if tf in dfs:
X, y, ts = self._create_features(dfs[tf], window_size)
if X is not None and y is not None:
features.append(X)
if targets is None: # Only need targets from one timeframe
targets = y
timestamps = ts
if not features or targets is None:
logger.error("Failed to create features for any timeframe")
return None, None, None
# Ensure all feature arrays have the same length
min_samples = min(f.shape[0] for f in features)
features = [f[-min_samples:] for f in features]
targets = targets[-min_samples:]
timestamps = timestamps[-min_samples:]
# Stack features from all timeframes
X = np.concatenate([f.reshape(min_samples, window_size, -1) for f in features], axis=2)
# Validate data
if np.any(np.isnan(X)) or np.any(np.isinf(X)):
logger.error("Generated features contain NaN or infinite values")
return None, None, None
logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {targets.shape}")
return X, targets, timestamps
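# NOTE: prepare_nn_input() returns raw (unscaled) OHLCV windows, whereas
# prepare_realtime_input() below applies a per-timeframe MinMaxScaler.
# self.scalers is therefore only populated after prepare_realtime_input()
# has been called, which is worth keeping in mind when the scalers are
# pickled in generate_training_dataset().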
def _create_features(self, df, window_size):
"""
Create features from OHLCV data using a sliding window.
Args:
df (pd.DataFrame): DataFrame with OHLCV data
window_size (int): Size of the sliding window
Returns:
tuple: (X, y, timestamps) where:
X is the feature array
y is the target array
timestamps is the array of timestamps
"""
if len(df) < window_size + 1:
logger.error(f"Not enough data for feature creation (need {window_size + 1}, got {len(df)})")
return None, None, None
# Extract OHLCV data
data = df[['open', 'high', 'low', 'close', 'volume']].values
timestamps = df['timestamp'].values
# Create sliding windows
X = np.array([data[i:i+window_size] for i in range(len(data)-window_size)])
# Create targets (next candle's movement: 0=down, 1=neutral, 2=up)
next_close = data[window_size:, 3] # Close prices
curr_close = data[window_size-1:-1, 3]
price_changes = (next_close - curr_close) / curr_close
# Define thresholds for price movement classification
threshold = 0.001 # 0.1% threshold
y = np.zeros(len(price_changes), dtype=int)
y[price_changes > threshold] = 2 # Up
y[(price_changes >= -threshold) & (price_changes <= threshold)] = 1 # Neutral
logger.info(f"Created features - X shape: {X.shape}, y shape: {y.shape}")
return X, y, timestamps[window_size:]
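# Example: with threshold = 0.001 (0.1%), a close-to-close change of +0.25%
# is labelled 2 (up), +0.05% lies within [-0.1%, +0.1%] and is labelled
# 1 (neutral), and -0.30% keeps the default label 0 (down).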
def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20):
"""
Generate and save a training dataset for neural network models.
Args:
timeframes (list): List of timeframes to use
n_candles (int): Number of candles to fetch for each timeframe
window_size (int): Size of the sliding window for feature creation
Returns:
dict: Dictionary of dataset file paths
"""
if timeframes is None:
timeframes = self.timeframes
# Prepare inputs
X, y, timestamps = self.prepare_nn_input(timeframes, n_candles, window_size)
if X is None or y is None:
logger.error("Failed to prepare input data for dataset")
return None
# Prepare output paths
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
dataset_name = f"{self.symbol.replace('/', '_')}_{'_'.join(timeframes)}_{timestamp_str}"
X_path = os.path.join(self.data_dir, f"{dataset_name}_X.npy")
y_path = os.path.join(self.data_dir, f"{dataset_name}_y.npy")
timestamps_path = os.path.join(self.data_dir, f"{dataset_name}_timestamps.npy")
metadata_path = os.path.join(self.data_dir, f"{dataset_name}_metadata.json")
# Save arrays
np.save(X_path, X)
np.save(y_path, y)
np.save(timestamps_path, timestamps)
# Save metadata
metadata = {
'symbol': self.symbol,
'timeframes': timeframes,
'window_size': window_size,
'n_samples': len(X),
'feature_shape': X.shape[1:],
'created_at': datetime.now().isoformat(),
'dataset_name': dataset_name
}
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
# Save scalers
scaler_path = os.path.join(self.data_dir, f"{dataset_name}_scalers.pkl")
with open(scaler_path, 'wb') as f:
pickle.dump(self.scalers, f)
# Return dataset info
dataset_info = {
'X_path': X_path,
'y_path': y_path,
'timestamps_path': timestamps_path,
'metadata_path': metadata_path,
'scaler_path': scaler_path
}
logger.info(f"Dataset generated and saved: {dataset_name}")
return dataset_info
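# Loading a saved dataset back (sketch, given a DataInterface instance `di`):
#   info = di.generate_training_dataset()
#   X = np.load(info['X_path'])
#   y = np.load(info['y_path'])
#   with open(info['metadata_path']) as f:
#       metadata = json.load(f)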
def get_feature_count(self):
"""
Calculate total number of features across all timeframes.
Returns:
int: Total number of features (5 features per timeframe)
"""
return len(self.timeframes) * 5 # OHLCV features for each timeframe
def calculate_pnl(self, predictions, actual_prices, position_size=1.0):
"""
Calculate PnL and win rates based on predictions and actual price movements.
Args:
predictions: Array of predicted actions (0=SELL, 1=HOLD, 2=BUY); probability outputs should be converted to class labels (e.g., via argmax) before calling
actual_prices: Array of actual close prices
position_size: Position size for each trade
Returns:
tuple: (pnl, win_rate, trades) where:
pnl is the total profit and loss
win_rate is the ratio of winning trades
trades is a list of trade dictionaries
"""
# Ensure we have enough prices for the predictions
if len(actual_prices) <= 1:
logger.error("Not enough price data for PnL calculation")
return 0.0, 0.0, []
# Adjust predictions length to match available price data
n_prices = len(actual_prices) - 1 # We need current and next price for each prediction
if len(predictions) > n_prices:
predictions = predictions[:n_prices]
elif len(predictions) < n_prices:
n_prices = len(predictions)
actual_prices = actual_prices[:n_prices + 1] # +1 to include the next price
pnl = 0.0
trades = 0
wins = 0
trade_history = []
for i in range(len(predictions)):
pred = predictions[i]
current_price = actual_prices[i]
next_price = actual_prices[i + 1]
# Calculate price change percentage
price_change = (next_price - current_price) / current_price
# Calculate PnL based on prediction
if pred == 2: # Buy
trade_pnl = price_change * position_size
trades += 1
if trade_pnl > 0:
wins += 1
trade_history.append({
'type': 'buy',
'price': current_price,
'pnl': trade_pnl,
'timestamp': self.dataframes[self.timeframes[0]]['timestamp'].iloc[i] if self.dataframes[self.timeframes[0]] is not None else None
})
elif pred == 0: # Sell
trade_pnl = -price_change * position_size
trades += 1
if trade_pnl > 0:
wins += 1
trade_history.append({
'type': 'sell',
'price': current_price,
'pnl': trade_pnl,
'timestamp': self.dataframes[self.timeframes[0]]['timestamp'].iloc[i] if self.dataframes[self.timeframes[0]] is not None else None
})
pnl += trade_pnl if pred in [0, 2] else 0
win_rate = wins / trades if trades > 0 else 0.0
return pnl, win_rate, trade_history
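# Worked example: with position_size = 1.0, a current price of 100.0 and a
# next price of 101.0 give price_change = +0.01. A BUY prediction (2) books
# trade_pnl = +0.01, a SELL prediction (0) books trade_pnl = -0.01, and a
# HOLD prediction (1) books no trade.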
def get_future_prices(self, prices, n_candles=3):
"""
Extract future prices for retrospective training.
Args:
prices (np.ndarray): Array of prices
n_candles (int): Number of future candles to look at
Returns:
np.ndarray: Future prices array (1D array)
"""
if prices is None or len(prices) < n_candles + 1:
return None
# Convert to numpy array if it's not already
prices_np = np.array(prices).flatten() if not isinstance(prices, np.ndarray) else prices.flatten()
# For each price point, get the maximum price in the next n_candles
future_prices = np.zeros(len(prices_np))
for i in range(len(prices_np) - n_candles):
# Get the next n candles
next_candles = prices_np[i+1:i+n_candles+1]
# Use the maximum price as the future price
future_prices[i] = np.max(next_candles)
# For the last n_candles points, use the last available price
future_prices[-n_candles:] = prices_np[-1]
return future_prices.flatten() # Ensure it's a 1D array
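# Worked example: prices = [1, 2, 3, 4, 5] with n_candles = 3 yields
# future_prices = [4, 5, 5, 5, 5]; the first entry is max(2, 3, 4) and the
# last three entries fall back to the final available price.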
def prepare_training_data(self, refresh=False, refresh_interval=300):
"""
Prepare data for training, including splitting into train/validation sets.
Args:
refresh (bool): Whether to refresh the data cache
refresh_interval (int): Interval in seconds to refresh data
Returns:
tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
"""
current_time = datetime.now()
# Check if we should refresh the data
if refresh or not hasattr(self, 'last_refresh_time') or \
(current_time - self.last_refresh_time).total_seconds() > refresh_interval:
logger.info("Refreshing training data...")
self.last_refresh_time = current_time
else:
# Use cached data
if hasattr(self, 'cached_train_data'):
return self.cached_train_data
# Prepare input data
X, y, _ = self.prepare_nn_input()
if X is None:
return None, None, None, None, None, None
# Get price data for PnL calculation
raw_prices = []
for tf in self.timeframes:
if tf in self.dataframes and self.dataframes[tf] is not None:
# Get the close prices for the same period as X
prices = self.dataframes[tf]['close'].values[-len(X):]
if len(prices) == len(X):
raw_prices = prices
break
if len(raw_prices) != len(X):
raw_prices = np.zeros(len(X)) # Fallback if no prices available
# Split data into training and validation sets (80/20)
split_idx = int(len(X) * 0.8)
X_train, X_val = X[:split_idx], X[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]
train_prices, val_prices = raw_prices[:split_idx], raw_prices[split_idx:]
# Cache the data
self.cached_train_data = (X_train, y_train, X_val, y_val, train_prices, val_prices)
return X_train, y_train, X_val, y_val, train_prices, val_prices
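# Note: the 80/20 split above is chronological (no shuffling), so the
# validation set always consists of the most recent 20% of samples.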
def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20):
"""
Prepare a single input sample from the most recent data for real-time inference.
Args:
timeframe (str): Timeframe to use
n_candles (int): Number of recent candles to fetch
window_size (int): Size of the sliding window
Returns:
tuple: (X, timestamp) where:
X is the input features array with shape (1, window_size, n_features)
timestamp is the timestamp of the most recent candle
"""
# Get recent data
df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=False)
if df is None or len(df) < window_size:
logger.error(f"Not enough data for inference (need at least {window_size} candles)")
return None, None
# Extract features from the most recent window
ohlcv = df[['open', 'high', 'low', 'close', 'volume']].tail(window_size).values
# Scale the data
if timeframe in self.scalers:
# Use existing scaler
scaler = self.scalers[timeframe]
else:
# Create new scaler
scaler = MinMaxScaler()
# Fit on all available data
all_data = df[['open', 'high', 'low', 'close', 'volume']].values
scaler.fit(all_data)
self.scalers[timeframe] = scaler
ohlcv_scaled = scaler.transform(ohlcv)
# Reshape to (1, window_size, n_features)
X = np.array([ohlcv_scaled])
# Get timestamp of the most recent candle
timestamp = df['timestamp'].iloc[-1]
return X, timestamp
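if __name__ == "__main__":
    # Minimal usage sketch: assumes realtime.BinanceHistoricalData is
    # importable and has live network access; "BTC/USDT" is only an example
    # symbol.
    logging.basicConfig(level=logging.INFO)

    di = DataInterface(symbol="BTC/USDT", timeframes=['1h', '4h', '1d'])

    # Fetch recent hourly candles
    df = di.get_historical_data(timeframe='1h', n_candles=500)
    if df is not None:
        print(df.tail())

    # Build and persist a training dataset
    dataset_info = di.generate_training_dataset(n_candles=1000, window_size=20)
    if dataset_info:
        print(dataset_info)

    # Prepare a single sample for real-time inference
    X, ts = di.prepare_realtime_input(timeframe='1h', n_candles=30, window_size=20)
    if X is not None:
        print(f"Realtime input shape: {X.shape}, timestamp: {ts}")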