"""
|
|
Data Interface for Neural Network Trading System
|
|
|
|
This module provides functionality to fetch, process, and prepare data for the neural network models.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import numpy as np
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import pickle
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|


class DataInterface:
    """
    Enhanced Data Interface supporting:
    - Multiple trading pairs (up to 3)
    - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom)
    - Technical indicators (up to 20)
    - Cross-timeframe normalization
    - Real-time tick streaming
    """

    def __init__(self, symbol=None, timeframes=None, data_dir="NN/data"):
        """
        Initialize the data interface.

        Args:
            symbol (str): Trading pair symbol (e.g., "BTC/USDT")
            timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d'])
            data_dir (str): Directory to store/load datasets
        """
        self.symbol = symbol
        self.timeframes = timeframes or ['1h', '4h', '1d']
        self.data_dir = data_dir
        self.scalers = {}  # Store scalers for each timeframe

        # Create the data directory if it doesn't exist
        os.makedirs(self.data_dir, exist_ok=True)

        # Initialize empty dataframes for each timeframe
        self.dataframes = {tf: None for tf in self.timeframes}

        logger.info(f"DataInterface initialized for {self.symbol} with timeframes {self.timeframes}")

    def get_historical_data(self, timeframe='1h', n_candles=1000, use_cache=True):
        """
        Fetch historical price data for a given timeframe.

        Args:
            timeframe (str): Timeframe to fetch data for
            n_candles (int): Number of candles to fetch
            use_cache (bool): Whether to use cached data if available

        Returns:
            pd.DataFrame: DataFrame with OHLCV data, or None if fetching fails
        """
        cache_file = os.path.join(self.data_dir, f"{self.symbol.replace('/', '_')}_{timeframe}.csv")

        # For the 1s timeframe, always fetch fresh data
        if timeframe == '1s':
            use_cache = False

        # Check whether cached data exists and is large enough
        if use_cache and os.path.exists(cache_file):
            try:
                df = pd.read_csv(cache_file, parse_dates=['timestamp'])
                # If we have enough data, use it
                if len(df) >= n_candles:
                    logger.info(f"Using cached data for {self.symbol} {timeframe} ({len(df)} candles)")
                    self.dataframes[timeframe] = df
                    return df.tail(n_candles)
            except Exception as e:
                logger.error(f"Error reading cached data: {str(e)}")

        # If we get here, we need to fetch data
        try:
            logger.info(f"Fetching historical data for {self.symbol} {timeframe}")

            # For the 1s timeframe, we need more data points
            if timeframe == '1s':
                n_candles = min(n_candles * 60, 10000)  # Up to 10k ticks

            # Placeholder for real data fetching
            self._fetch_data_from_exchange(timeframe, n_candles)

            if self.dataframes[timeframe] is not None:
                # Save to cache (except for the 1s timeframe)
                if timeframe != '1s':
                    self.dataframes[timeframe].to_csv(cache_file, index=False)
                return self.dataframes[timeframe]
            else:
                # Create dummy data as a fallback
                logger.warning(f"Could not fetch data for {self.symbol} {timeframe}, using dummy data")
                df = self._create_dummy_data(timeframe, n_candles)
                self.dataframes[timeframe] = df
                return df
        except Exception as e:
            logger.error(f"Error fetching data: {str(e)}")
            return None

    def _fetch_data_from_exchange(self, timeframe, n_candles):
        """
        Placeholder method for fetching data from an exchange.
        In a real implementation, this would connect to an exchange API.
        """
        # This is a placeholder - in a real implementation this would make API calls
        # to a cryptocurrency exchange to fetch OHLCV data

        # For now, just generate dummy data
        self.dataframes[timeframe] = self._create_dummy_data(timeframe, n_candles)

    def _create_dummy_data(self, timeframe, n_candles):
        """
        Create dummy OHLCV data for testing purposes.

        Args:
            timeframe (str): Timeframe to create data for
            n_candles (int): Number of candles to create

        Returns:
            pd.DataFrame: DataFrame with dummy OHLCV data
        """
        # Map timeframe to seconds
        tf_seconds = {
            '1s': 1,  # Added 1s timeframe
            '1m': 60,
            '5m': 300,
            '15m': 900,
            '1h': 3600,
            '4h': 14400,
            '1d': 86400
        }
        seconds = tf_seconds.get(timeframe, 3600)  # Default to 1h

        # Create timestamps
        end_time = datetime.now()
        timestamps = [end_time - timedelta(seconds=seconds * i) for i in range(n_candles)]
        timestamps.reverse()  # Oldest first

        # Generate random price data with realistic patterns
        np.random.seed(42)  # For reproducibility

        # Start price
        price = 50000  # For BTC/USDT
        prices = []
        volumes = []

        for i in range(n_candles):
            # Random walk with drift and volatility based on timeframe
            drift = 0.0001 * seconds  # Larger drift for larger timeframes
            volatility = 0.01 * np.sqrt(seconds / 3600)  # Scale volatility by sqrt of time

            # Daily/weekly patterns
            if timeframe in ['1d', '4h']:
                # Add some cyclical patterns
                cycle = np.sin(i / 7 * np.pi) * 0.02  # Weekly cycle
            else:
                cycle = np.sin(i / 24 * np.pi) * 0.01  # Daily cycle

            # Calculate price change with random walk + cycles (clamped to prevent overflow)
            price_change = price * np.clip(drift + volatility * np.random.randn() + cycle, -0.1, 0.1)
            price = np.clip(price + price_change, 1000, 100000)  # Keep price in a reasonable range

            # Generate OHLC from the price
            open_price = price
            high_price = price * (1 + abs(0.005 * np.random.randn()))
            low_price = price * (1 - abs(0.005 * np.random.randn()))
            close_price = price * (1 + 0.002 * np.random.randn())

            # Ensure high >= open, close and low <= open, close
            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            # Generate volume (higher for larger price movements) with a safe calculation
            volume = 10000 + 5000 * np.random.rand() + abs(price_change) / price * 10000

            prices.append((open_price, high_price, low_price, close_price))
            volumes.append(volume)

            # Update price for the next iteration
            price = close_price

        # Create DataFrame
        df = pd.DataFrame(
            [(t, o, h, l, c, v) for t, (o, h, l, c), v in zip(timestamps, prices, volumes)],
            columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
        )

        return df

    def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20):
        """
        Prepare input data for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            tuple: (X, y, timestamps) where:
                X is the input features array with shape (n_samples, window_size, n_features)
                y is the target array with shape (n_samples,)
                timestamps is an array of timestamps for each sample
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Get data for all requested timeframes
        dfs = {}
        min_length = float('inf')
        for tf in timeframes:
            # For the 1s timeframe, we need more data points
            tf_candles = n_candles * 60 if tf == '1s' else n_candles
            df = self.get_historical_data(timeframe=tf, n_candles=tf_candles)
            if df is not None and not df.empty:
                dfs[tf] = df
                # Keep track of the minimum length across all timeframes
                min_length = min(min_length, len(df))

        if not dfs:
            logger.error("No data available for feature creation")
            return None, None, None

        # Align all dataframes to the same length
        for tf in dfs:
            dfs[tf] = dfs[tf].tail(min_length)

        # Create features for each timeframe
        features = []
        targets = None
        timestamps = None

        for tf in timeframes:
            if tf in dfs:
                X, y, ts = self._create_features(dfs[tf], window_size)
                if X is not None and y is not None:
                    features.append(X)
                    if targets is None:  # Only need targets from one timeframe
                        targets = y
                        timestamps = ts

        if not features or targets is None:
            logger.error("Failed to create features for any timeframe")
            return None, None, None

        # Ensure all feature arrays have the same length
        min_samples = min(f.shape[0] for f in features)
        features = [f[-min_samples:] for f in features]
        targets = targets[-min_samples:]
        timestamps = timestamps[-min_samples:]

        # Stack features from all timeframes
        X = np.concatenate([f.reshape(min_samples, window_size, -1) for f in features], axis=2)
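        # Resulting shape: (min_samples, window_size, 5 * number of timeframes),
        # since each timeframe contributes its five OHLCV columns at every window step.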

        # Validate data
        if np.any(np.isnan(X)) or np.any(np.isinf(X)):
            logger.error("Generated features contain NaN or infinite values")
            return None, None, None

        logger.info(f"Prepared input data - X shape: {X.shape}, y shape: {targets.shape}")
        return X, targets, timestamps

    def _create_features(self, df, window_size):
        """
        Create features from OHLCV data using a sliding window.

        Args:
            df (pd.DataFrame): DataFrame with OHLCV data
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, y, timestamps) where:
                X is the feature array
                y is the target array
                timestamps is the array of timestamps
        """
        if len(df) < window_size + 1:
            logger.error(f"Not enough data for feature creation (need {window_size + 1}, got {len(df)})")
            return None, None, None

        # Extract OHLCV data
        data = df[['open', 'high', 'low', 'close', 'volume']].values
        timestamps = df['timestamp'].values

        # Create sliding windows
        X = np.array([data[i:i+window_size] for i in range(len(data)-window_size)])

        # Create targets (next candle's movement: 0=down, 1=neutral, 2=up)
        next_close = data[window_size:, 3]  # Close prices
        curr_close = data[window_size-1:-1, 3]
        price_changes = (next_close - curr_close) / curr_close

        # Define thresholds for price movement classification
        threshold = 0.001  # 0.1% threshold
        y = np.zeros(len(price_changes), dtype=int)  # Default: 0 = down
        y[price_changes > threshold] = 2  # Up
        y[(price_changes >= -threshold) & (price_changes <= threshold)] = 1  # Neutral
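        # Illustrative example (hypothetical numbers): with threshold = 0.001,
        # price_changes of [+0.002, -0.0005, -0.003] map to y = [2, 1, 0],
        # i.e. up, neutral, and down respectively.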

        logger.info(f"Created features - X shape: {X.shape}, y shape: {y.shape}")
        return X, y, timestamps[window_size:]

    def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20):
        """
        Generate and save a training dataset for neural network models.

        Args:
            timeframes (list): List of timeframes to use
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            dict: Dictionary of dataset file paths
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Prepare inputs
        X, y, timestamps = self.prepare_nn_input(timeframes, n_candles, window_size)

        if X is None or y is None:
            logger.error("Failed to prepare input data for dataset")
            return None

        # Prepare output paths
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        dataset_name = f"{self.symbol.replace('/', '_')}_{'_'.join(timeframes)}_{timestamp_str}"

        X_path = os.path.join(self.data_dir, f"{dataset_name}_X.npy")
        y_path = os.path.join(self.data_dir, f"{dataset_name}_y.npy")
        timestamps_path = os.path.join(self.data_dir, f"{dataset_name}_timestamps.npy")
        metadata_path = os.path.join(self.data_dir, f"{dataset_name}_metadata.json")

        # Save arrays
        np.save(X_path, X)
        np.save(y_path, y)
        np.save(timestamps_path, timestamps)

        # Save metadata
        metadata = {
            'symbol': self.symbol,
            'timeframes': timeframes,
            'window_size': window_size,
            'n_samples': len(X),
            'feature_shape': X.shape[1:],
            'created_at': datetime.now().isoformat(),
            'dataset_name': dataset_name
        }

        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Save scalers
        scaler_path = os.path.join(self.data_dir, f"{dataset_name}_scalers.pkl")
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scalers, f)
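        # Note: self.scalers is only populated by prepare_realtime_input(), so the
        # pickled dict may be empty if no scalers have been fitted yet.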

        # Return dataset info
        dataset_info = {
            'X_path': X_path,
            'y_path': y_path,
            'timestamps_path': timestamps_path,
            'metadata_path': metadata_path,
            'scaler_path': scaler_path
        }

        logger.info(f"Dataset generated and saved: {dataset_name}")
        return dataset_info

    def get_feature_count(self):
        """
        Calculate the total number of features across all timeframes.

        Returns:
            int: Total number of features (5 OHLCV features per timeframe)
        """
        return len(self.timeframes) * 5  # OHLCV features for each timeframe

    def calculate_pnl(self, predictions, actual_prices, position_size=1.0):
        """
        Calculate PnL and win rates based on predictions and actual price movements.

        Args:
            predictions: Array of predicted actions (0=SELL, 1=HOLD, 2=BUY); probability
                outputs should be converted to class indices (e.g. via argmax) first
            actual_prices: Array of actual close prices
            position_size: Position size for each trade

        Returns:
            tuple: (pnl, win_rate, trades) where:
                pnl is the total profit and loss
                win_rate is the ratio of winning trades
                trades is a list of trade dictionaries
        """
        # Ensure we have enough prices for the predictions
        if len(actual_prices) <= 1:
            logger.error("Not enough price data for PnL calculation")
            return 0.0, 0.0, []

        # Adjust predictions length to match the available price data
        n_prices = len(actual_prices) - 1  # We need current and next price for each prediction
        if len(predictions) > n_prices:
            predictions = predictions[:n_prices]
        elif len(predictions) < n_prices:
            n_prices = len(predictions)
            actual_prices = actual_prices[:n_prices + 1]  # +1 to include the next price

        pnl = 0.0
        trades = 0
        wins = 0
        trade_history = []

        for i in range(len(predictions)):
            pred = predictions[i]
            current_price = actual_prices[i]
            next_price = actual_prices[i + 1]

            # Calculate the price change percentage
            price_change = (next_price - current_price) / current_price
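            # Illustrative example (hypothetical numbers): a BUY (pred == 2) at 50,000
            # followed by a close of 50,500 gives price_change = 0.01, so
            # trade_pnl = 0.01 * position_size; a SELL earns the negated change.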

            # Calculate PnL based on the prediction
            if pred == 2:  # Buy
                trade_pnl = price_change * position_size
                trades += 1
                if trade_pnl > 0:
                    wins += 1
                trade_history.append({
                    'type': 'buy',
                    'price': current_price,
                    'pnl': trade_pnl,
                    'timestamp': self.dataframes[self.timeframes[0]]['timestamp'].iloc[i] if self.dataframes[self.timeframes[0]] is not None else None
                })
            elif pred == 0:  # Sell
                trade_pnl = -price_change * position_size
                trades += 1
                if trade_pnl > 0:
                    wins += 1
                trade_history.append({
                    'type': 'sell',
                    'price': current_price,
                    'pnl': trade_pnl,
                    'timestamp': self.dataframes[self.timeframes[0]]['timestamp'].iloc[i] if self.dataframes[self.timeframes[0]] is not None else None
                })

            pnl += trade_pnl if pred in [0, 2] else 0

        win_rate = wins / trades if trades > 0 else 0.0
        return pnl, win_rate, trade_history

    def get_future_prices(self, prices, n_candles=3):
        """
        Extract future prices for retrospective training.

        Args:
            prices (np.ndarray): Array of prices
            n_candles (int): Number of future candles to look at

        Returns:
            np.ndarray: Future prices array, or None if there is not enough data
        """
        if len(prices) < n_candles + 1:
            return None

        # For each price point, get the maximum price in the next n_candles
        future_prices = np.zeros(len(prices))
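        # Illustrative example (hypothetical numbers): with prices [1, 2, 3, 4, 5] and
        # n_candles=2, the loop below yields [3, 4, 5, 5, 5]; the last two entries
        # fall back to the final available price.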

        for i in range(len(prices) - n_candles):
            # Get the next n candles
            next_candles = prices[i+1:i+n_candles+1]
            # Use the maximum price as the future price
            future_prices[i] = np.max(next_candles)

        # For the last n_candles points, use the last available price
        future_prices[-n_candles:] = prices[-1]

        return future_prices

    def prepare_training_data(self, refresh=False, refresh_interval=300):
        """
        Prepare data for training, including splitting into train/validation sets.

        Args:
            refresh (bool): Whether to refresh the data cache
            refresh_interval (int): Interval in seconds to refresh data

        Returns:
            tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
        """
        current_time = datetime.now()

        # Check if we should refresh the data
        if refresh or not hasattr(self, 'last_refresh_time') or \
                (current_time - self.last_refresh_time).total_seconds() > refresh_interval:
            logger.info("Refreshing training data...")
            self.last_refresh_time = current_time
        else:
            # Use cached data if available
            if hasattr(self, 'cached_train_data'):
                return self.cached_train_data

        # Prepare input data
        X, y, _ = self.prepare_nn_input()
        if X is None:
            return None, None, None, None, None, None

        # Get price data for PnL calculation
        raw_prices = []
        for tf in self.timeframes:
            if tf in self.dataframes and self.dataframes[tf] is not None:
                # Get the close prices for the same period as X
                prices = self.dataframes[tf]['close'].values[-len(X):]
                if len(prices) == len(X):
                    raw_prices = prices
                    break

        if len(raw_prices) != len(X):
            raw_prices = np.zeros(len(X))  # Fallback if no prices are available

        # Split data into training and validation sets (80/20)
        split_idx = int(len(X) * 0.8)
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]
        train_prices, val_prices = raw_prices[:split_idx], raw_prices[split_idx:]

        # Cache the data
        self.cached_train_data = (X_train, y_train, X_val, y_val, train_prices, val_prices)

        return X_train, y_train, X_val, y_val, train_prices, val_prices

    def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20):
        """
        Prepare a single input sample from the most recent data for real-time inference.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of recent candles to fetch
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, timestamp) where:
                X is the input features array with shape (1, window_size, n_features)
                timestamp is the timestamp of the most recent candle
        """
        # Get recent data
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=False)

        if df is None or len(df) < window_size:
            logger.error(f"Not enough data for inference (need at least {window_size} candles)")
            return None, None

        # Extract features from the most recent window
        ohlcv = df[['open', 'high', 'low', 'close', 'volume']].tail(window_size).values

        # Scale the data
        if timeframe in self.scalers:
            # Use the existing scaler
            scaler = self.scalers[timeframe]
        else:
            # Create a new scaler
            scaler = MinMaxScaler()
            # Fit on all available data
            all_data = df[['open', 'high', 'low', 'close', 'volume']].values
            scaler.fit(all_data)
            self.scalers[timeframe] = scaler

        ohlcv_scaled = scaler.transform(ohlcv)
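        # MinMaxScaler maps each OHLCV column into the [0, 1] range of the data it was
        # fitted on; with a previously fitted scaler, newer values outside that range
        # will land outside [0, 1].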

        # Reshape to (1, window_size, n_features)
        X = np.array([ohlcv_scaled])

        # Get the timestamp of the most recent candle
        timestamp = df['timestamp'].iloc[-1]

        return X, timestamp
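

# Minimal usage sketch (assumptions: the symbol and timeframes below are illustrative
# only, and no exchange connectivity is required because the placeholder fetcher
# falls back to generated dummy data).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    di = DataInterface(symbol="BTC/USDT", timeframes=['1h', '4h', '1d'])

    # Fetch (dummy) historical candles for one timeframe
    df = di.get_historical_data(timeframe='1h', n_candles=200)
    print(df.tail() if df is not None else "no data")

    # Build windowed training inputs across all configured timeframes
    X, y, ts = di.prepare_nn_input(n_candles=200, window_size=20)
    if X is not None:
        print(f"X shape: {X.shape}, y shape: {y.shape}")

    # Prepare a single sample for real-time inference
    X_rt, last_ts = di.prepare_realtime_input(timeframe='1h', n_candles=30, window_size=20)
    if X_rt is not None:
        print(f"Real-time input shape: {X_rt.shape}, last candle at {last_ts}")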