gogo2/NN/utils/data_interface.py
2025-03-29 01:56:45 +02:00

436 lines
17 KiB
Python

"""
Data Interface for Neural Network Trading System
This module provides functionality to fetch, process, and prepare data for the neural network models.
"""
import os
import logging
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json
import pickle
from sklearn.preprocessing import MinMaxScaler
logger = logging.getLogger(__name__)


class DataInterface:
    """
    Data interface that turns OHLCV candles into neural-network inputs.

    What this class does:
    - Loads candles for one symbol and several timeframes, preferring a CSV
      cache in ``data_dir``. Fetching from an exchange is currently a
      placeholder that generates synthetic candles.
    - Min-max scales the OHLCV columns per timeframe and keeps the fitted
      scalers in ``self.scalers`` so inference can reuse them.
    - Builds sliding-window samples with a binary "next close is higher"
      target, for training datasets and for single real-time samples.
    """

    # Feature columns, in the order they appear in every sample.
    OHLCV_COLUMNS = ['open', 'high', 'low', 'close', 'volume']

    # Position of 'close' inside OHLCV_COLUMNS (used for the target).
    _CLOSE_IDX = 3

    # Candle length in seconds for the timeframes the dummy generator knows.
    _TF_SECONDS = {
        '1m': 60,
        '5m': 300,
        '15m': 900,
        '1h': 3600,
        '4h': 14400,
        '1d': 86400,
    }

    def __init__(self, symbol=None, timeframes=None, data_dir="NN/data"):
        """
        Initialize the data interface.

        Args:
            symbol (str): Trading pair symbol (e.g., "BTC/USDT")
            timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']).
                Defaults to ['1h', '4h', '1d'] when omitted or empty.
            data_dir (str): Directory to store/load datasets (created if missing)
        """
        self.symbol = symbol
        # Copy so later changes to the caller's list do not leak in here.
        self.timeframes = list(timeframes) if timeframes else ['1h', '4h', '1d']
        self.data_dir = data_dir
        self.scalers = {}  # Fitted scaler per timeframe

        # Create data directory if it doesn't exist
        os.makedirs(self.data_dir, exist_ok=True)

        # Most recently loaded dataframe per timeframe (None until loaded)
        self.dataframes = {tf: None for tf in self.timeframes}

        # Log the effective timeframes, not the raw argument (which may be None).
        logger.info(f"DataInterface initialized for {symbol} with timeframes {self.timeframes}")

    def _symbol_slug(self):
        """Return the symbol in a filename-safe form ("BTC/USDT" -> "BTC_USDT").

        Falls back to "UNKNOWN" when no symbol was configured, so that file
        names can still be built for a default-constructed instance.
        """
        return str(self.symbol or "UNKNOWN").replace('/', '_')

    def get_historical_data(self, timeframe='1h', n_candles=1000, use_cache=True):
        """
        Fetch historical price data for a given timeframe.

        The CSV cache is used when it exists and holds at least ``n_candles``
        rows; otherwise data is fetched (currently: generated) and the cache
        file is rewritten with the fetched rows.

        Args:
            timeframe (str): Timeframe to fetch data for
            n_candles (int): Number of candles to fetch
            use_cache (bool): Whether to read cached data if available

        Returns:
            pd.DataFrame: DataFrame with timestamp + OHLCV columns, or None
            if fetching failed.
        """
        cache_file = os.path.join(self.data_dir, f"{self._symbol_slug()}_{timeframe}.csv")

        if use_cache and os.path.exists(cache_file):
            try:
                df = pd.read_csv(cache_file, parse_dates=['timestamp'])
            except Exception as e:
                # An unreadable cache is not fatal: fall through and refetch.
                logger.error(f"Error reading cached data: {str(e)}")
            else:
                # Only the row count is checked; cache age is not considered.
                if len(df) >= n_candles:
                    logger.info(f"Using cached data for {self.symbol} {timeframe} ({len(df)} candles)")
                    self.dataframes[timeframe] = df
                    return df.tail(n_candles)

        try:
            logger.info(f"Fetching historical data for {self.symbol} {timeframe}")
            self._fetch_data_from_exchange(timeframe, n_candles)

            # .get(): the timeframe may not be one of the configured ones.
            df = self.dataframes.get(timeframe)
            if df is not None:
                # NOTE(review): this overwrites the cache even when the fetched
                # frame is smaller than what was cached before (e.g. the
                # 30-candle real-time fetch) - confirm that is intended.
                df.to_csv(cache_file, index=False)
                return df

            # Create dummy data as fallback
            logger.warning(f"Could not fetch data for {self.symbol} {timeframe}, using dummy data")
            df = self._create_dummy_data(timeframe, n_candles)
            self.dataframes[timeframe] = df
            return df
        except Exception as e:
            logger.error(f"Error fetching data: {str(e)}")
            return None

    def _fetch_data_from_exchange(self, timeframe, n_candles):
        """
        Placeholder for fetching data from an exchange.

        No API call is made yet: synthetic candles are generated and stored in
        ``self.dataframes[timeframe]``.

        Args:
            timeframe (str): Timeframe to fetch data for
            n_candles (int): Number of candles to fetch
        """
        self.dataframes[timeframe] = self._create_dummy_data(timeframe, n_candles)

    def _create_dummy_data(self, timeframe, n_candles):
        """
        Create dummy OHLCV data for testing purposes.

        The output is deterministic in everything except the timestamps, which
        count back from the current time.

        Args:
            timeframe (str): Timeframe to create data for (unknown values are
                treated as 1h)
            n_candles (int): Number of candles to create

        Returns:
            pd.DataFrame: DataFrame with dummy OHLCV data, oldest candle first
        """
        seconds = self._TF_SECONDS.get(timeframe, 3600)  # Default to 1h

        # Timestamps, oldest first, ending at "now".
        end_time = datetime.now()
        timestamps = [end_time - timedelta(seconds=seconds * i)
                      for i in range(n_candles - 1, -1, -1)]

        # Private generator: same stream as np.random.seed(42) followed by the
        # module-level functions, but without resetting the global RNG that
        # the rest of the process (e.g. model training) may rely on.
        rng = np.random.RandomState(42)

        # Loop invariants.
        # NOTE(review): for timeframes of 1h and above the drift alone exceeds
        # the +/-0.1 clip below, so the series climbs to the 100000 ceiling
        # quickly - confirm whether a smaller drift was intended.
        drift = 0.0001 * seconds
        volatility = 0.01 * np.sqrt(seconds / 3600)  # Scale volatility by sqrt of time
        if timeframe in ('1d', '4h'):
            cycle_period, cycle_amplitude = 7, 0.02
        else:
            cycle_period, cycle_amplitude = 24, 0.01

        price = 50000  # Start price (BTC/USDT scale)
        rows = []
        for i, ts in enumerate(timestamps):
            cycle = np.sin(i / cycle_period * np.pi) * cycle_amplitude

            # Random walk + cycle, with the relative step clamped to +/-10 %.
            price_change = price * np.clip(drift + volatility * rng.randn() + cycle, -0.1, 0.1)
            price = np.clip(price + price_change, 1000, 100000)  # Keep price in a sane range

            # The order of the random draws below determines the output; keep it.
            open_price = price
            high_price = price * (1 + abs(0.005 * rng.randn()))
            low_price = price * (1 - abs(0.005 * rng.randn()))
            close_price = price * (1 + 0.002 * rng.randn())

            # Ensure high/low actually bound open and close.
            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            # Higher volume for larger price movements.
            volume = 10000 + 5000 * rng.rand() + abs(price_change) / price * 10000

            rows.append((ts, open_price, high_price, low_price, close_price, volume))

            # The next candle starts from this close.
            price = close_price

        return pd.DataFrame(
            rows,
            columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
        )

    def prepare_nn_input(self, timeframes=None, n_candles=500, window_size=20):
        """
        Prepare input data for neural network models.

        Timeframes for which no data or no usable features are available are
        skipped. Targets and timestamps come from the first usable timeframe.

        Args:
            timeframes (list): List of timeframes to use (defaults to self.timeframes)
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            tuple: (X, y, timestamps) where:
                X is the input features array with shape
                    (n_samples, window_size, 5 * number of usable timeframes)
                y is the target array with shape (n_samples,)
                timestamps is an array of timestamps for each sample
            (None, None, None) is returned on failure.
        """
        if timeframes is None:
            timeframes = self.timeframes

        features = []
        used_timeframes = []
        targets = None
        timestamps = None
        for tf in timeframes:
            df = self.get_historical_data(timeframe=tf, n_candles=n_candles)
            if df is None or df.empty:
                continue

            X_tf, y_tf, ts_tf = self._create_features(df, window_size, timeframe=tf)
            if X_tf is None or len(X_tf) == 0:
                # Feature creation already logged the reason.
                logger.warning(f"No usable features for timeframe {tf}, skipping")
                continue

            features.append(X_tf)
            used_timeframes.append(tf)
            if targets is None:  # Only need targets from one timeframe
                targets, timestamps = y_tf, ts_tf

        if not features:
            logger.error("No data available for feature creation")
            return None, None, None

        # Every timeframe must contribute the same number of samples,
        # otherwise the per-sample concatenation below is impossible.
        sample_counts = {tf: f.shape[0] for tf, f in zip(used_timeframes, features)}
        if len(set(sample_counts.values())) > 1:
            logger.error(f"Sample counts differ across timeframes: {sample_counts}")
            return None, None, None

        # (samples, window, 5) per timeframe -> (samples, window, 5 * timeframes),
        # with all features of the first timeframe first, then the next, ...
        X = np.concatenate(features, axis=2)

        # Validate data
        if not np.all(np.isfinite(X)):
            logger.error("Generated features contain NaN or infinite values")
            return None, None, None

        X = np.clip(X, -1e6, 1e6)  # Clip extreme values
        return X, targets, timestamps

    def _create_features(self, df, window_size, timeframe=None):
        """
        Create features from OHLCV data using a sliding window approach.

        As a side effect the fitted scaler is stored in ``self.scalers``.

        Args:
            df (pd.DataFrame): DataFrame with timestamp + OHLCV columns
            window_size (int): Size of the sliding window (>= 1)
            timeframe (str): Key under which the fitted scaler is stored. When
                omitted, the timeframe is looked up by comparing ``df`` with
                the stored dataframes and falls back to 'unknown'.

        Returns:
            tuple: (X, y, timestamps) where:
                X is the scaled input array, shape (n_samples, window_size, 5)
                y is 1 where the close following the window is higher than
                    the window's last close, else 0
                timestamps is an array with the timestamp of each target candle
            (None, None, None) is returned when the input is invalid, too
            short for one window, or scaling fails.
        """
        ohlcv = df[self.OHLCV_COLUMNS].to_numpy(dtype=float, copy=True)

        n_samples = len(ohlcv) - window_size
        if window_size < 1 or n_samples <= 0:
            logger.error(f"Not enough data to build windows (rows={len(ohlcv)}, window_size={window_size})")
            return None, None, None

        # Validate data before scaling
        if not np.all(np.isfinite(ohlcv)):
            logger.error("Input data contains NaN or infinite values")
            return None, None, None

        # Constant columns: close is set to 1, every other column to 0.
        # NOTE(review): the scaler below is fitted on these substituted
        # values, so for a constant column it does not reflect the real price
        # level - confirm this is acceptable for later reuse of the scaler.
        for col in np.flatnonzero(np.ptp(ohlcv, axis=0) == 0):
            ohlcv[:, col] = 1 if col == self._CLOSE_IDX else 0

        # Scale the data with safety checks
        try:
            scaler = MinMaxScaler()
            ohlcv_scaled = scaler.fit_transform(ohlcv)
        except Exception as e:
            logger.error(f"Scaling failed: {str(e)}")
            return None, None, None
        if not np.all(np.isfinite(ohlcv_scaled)):
            logger.error("Scaling produced invalid values")
            return None, None, None

        # Store the scaler for later use. An explicit timeframe is preferred:
        # the equality lookup misses whenever the caller holds a slice of the
        # stored frame (e.g. the tail returned from the cache).
        if timeframe is None:
            timeframe = next(
                (tf for tf in self.timeframes
                 if self.dataframes.get(tf) is not None and self.dataframes[tf].equals(df)),
                'unknown'
            )
        self.scalers[timeframe] = scaler

        # Input: window_size consecutive candles of scaled OHLCV data.
        X = np.stack([ohlcv_scaled[i:i + window_size] for i in range(n_samples)])

        # Target: 1 if the close right after the window is above the last
        # close inside the window, 0 otherwise.
        close_diff = np.diff(ohlcv[:, self._CLOSE_IDX])
        y = (close_diff[window_size - 1:] > 0).astype(int)

        # Timestamp of each target candle.
        timestamps = np.array(list(df['timestamp'].iloc[window_size:]))

        return X, y, timestamps

    def generate_training_dataset(self, timeframes=None, n_candles=1000, window_size=20):
        """
        Generate and save a training dataset for neural network models.

        Writes X, y and timestamps as .npy files, a JSON metadata file and a
        pickle with the fitted scalers into ``data_dir``.

        Args:
            timeframes (list): List of timeframes to use (defaults to self.timeframes)
            n_candles (int): Number of candles to fetch for each timeframe
            window_size (int): Size of the sliding window for feature creation

        Returns:
            dict: Dictionary of dataset file paths, or None if the input data
            could not be prepared.
        """
        if timeframes is None:
            timeframes = self.timeframes

        # Prepare inputs
        X, y, timestamps = self.prepare_nn_input(timeframes, n_candles, window_size)
        if X is None or y is None:
            logger.error("Failed to prepare input data for dataset")
            return None

        # One clock reading for both the file name and the metadata.
        created_at = datetime.now()
        timestamp_str = created_at.strftime("%Y%m%d_%H%M%S")
        dataset_name = f"{self._symbol_slug()}_{'_'.join(timeframes)}_{timestamp_str}"

        def dataset_file(suffix):
            return os.path.join(self.data_dir, f"{dataset_name}_{suffix}")

        dataset_info = {
            'X_path': dataset_file("X.npy"),
            'y_path': dataset_file("y.npy"),
            'timestamps_path': dataset_file("timestamps.npy"),
            'metadata_path': dataset_file("metadata.json"),
            'scaler_path': dataset_file("scalers.pkl"),
        }

        # Save arrays
        np.save(dataset_info['X_path'], X)
        np.save(dataset_info['y_path'], y)
        np.save(dataset_info['timestamps_path'], timestamps)

        # Save metadata
        metadata = {
            'symbol': self.symbol,
            'timeframes': list(timeframes),
            'window_size': window_size,
            'n_samples': len(X),
            'feature_shape': list(X.shape[1:]),
            'created_at': created_at.isoformat(),
            'dataset_name': dataset_name
        }
        with open(dataset_info['metadata_path'], 'w') as f:
            json.dump(metadata, f, indent=2)

        # Save scalers (all scalers currently held, keyed by timeframe).
        # Pickle files must only ever be loaded from trusted locations.
        with open(dataset_info['scaler_path'], 'wb') as f:
            pickle.dump(self.scalers, f)

        logger.info(f"Dataset generated and saved: {dataset_name}")
        return dataset_info

    def get_feature_count(self):
        """Get the number of features per input sample (OHLCV per configured timeframe)."""
        return len(self.OHLCV_COLUMNS) * len(self.timeframes)

    def prepare_realtime_input(self, timeframe='1h', n_candles=30, window_size=20):
        """
        Prepare a single input sample from the most recent data for real-time inference.

        Reuses the scaler stored for ``timeframe`` if there is one; otherwise
        a new scaler is fitted on the fetched candles and stored.

        Args:
            timeframe (str): Timeframe to use
            n_candles (int): Number of recent candles to fetch
            window_size (int): Size of the sliding window

        Returns:
            tuple: (X, timestamp) where:
                X is the input features array with shape (1, window_size, n_features)
                timestamp is the timestamp of the most recent candle
            (None, None) is returned when not enough data is available.
        """
        # Always bypass the cache so the sample reflects the latest fetch.
        df = self.get_historical_data(timeframe=timeframe, n_candles=n_candles, use_cache=False)
        if df is None or len(df) < window_size:
            logger.error(f"Not enough data for inference (need at least {window_size} candles)")
            return None, None

        candles = df[self.OHLCV_COLUMNS]

        scaler = self.scalers.get(timeframe)
        if scaler is None:
            # No scaler from training: fit one on all fetched candles.
            scaler = MinMaxScaler()
            scaler.fit(candles.values)
            self.scalers[timeframe] = scaler

        # Scale the most recent window and add the batch dimension.
        ohlcv_scaled = scaler.transform(candles.tail(window_size).values)
        X = np.array([ohlcv_scaled])

        # Timestamp of the most recent candle
        timestamp = df['timestamp'].iloc[-1]
        return X, timestamp