# gogo2/NN/models/transformer_model.py
# 2025-03-25 12:48:58 +02:00
#
# 553 lines
# 19 KiB
# Python

import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
Input, Dense, Dropout, LayerNormalization, MultiHeadAttention,
GlobalAveragePooling1D, Concatenate, Add, Activation, Flatten
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import (
EarlyStopping, ModelCheckpoint, ReduceLROnPlateau,
TensorBoard, CSVLogger
)
import matplotlib.pyplot as plt
import logging
import time
import datetime
# Logging setup: records go to the console and to 'nn_transformer_model.log'
# (opened relative to the current working directory).
# NOTE: basicConfig() configures the root logger and does nothing if the root
# logger already has handlers attached.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler(), logging.FileHandler('nn_transformer_model.log')],
)
# Module-wide logger used by every class in this file.
logger = logging.getLogger('transformer_model')
class TransformerBlock(tf.keras.layers.Layer):
    """
    Transformer encoder block: multi-head self-attention followed by a
    position-wise feed-forward network.

    Each sub-layer is wrapped in a residual connection, and layer normalization
    is applied after the residual addition.

    The feed-forward network ends in ``Dense(embed_dim)`` and its output is added
    to the attention sub-layer's output, so the last dimension of ``inputs`` must
    equal ``embed_dim`` for the second residual addition to be shape-compatible.

    The layer implements ``get_config`` so that models containing it can be saved
    and re-created. When loading such a model, pass
    ``custom_objects={'TransformerBlock': TransformerBlock}`` to
    ``tf.keras.models.load_model``.

    Args:
        embed_dim: Output width of the feed-forward network; also used as
            ``key_dim`` of the attention layer.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied to both sub-layer outputs.
        **kwargs: Standard Keras layer arguments (``name``, ``dtype``, ...),
            forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        # Forward name/dtype/trainable so the layer can be rebuilt from its config.
        super().__init__(**kwargs)

        # Keep the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        # NOTE: in Keras, key_dim is the size of *each* head, so every head here
        # is embed_dim wide (not embed_dim // num_heads). Kept as-is so the
        # architecture and parameter count are unchanged.
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim)
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False):
        """
        Apply the block to ``inputs``.

        Args:
            inputs: Input tensor; used as both query and value of the attention layer.
            training: Whether dropout is active.

        Returns:
            Tensor produced by the second layer normalization.
        """
        # Self-attention, dropout, then residual connection + normalization
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward network, dropout
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)

        # Residual connection + normalization
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so Keras can serialize and rebuild the layer."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config
class TransformerModel:
    """
    Transformer-based model for financial time series analysis.

    This model processes both raw time series data and high-level features from
    the CNN model. The time series is run through a stack of TransformerBlock
    layers and average-pooled over the sequence axis; the result is concatenated
    with the feature vector and passed through a dense head.
    """

    def __init__(self, ts_input_shape=(20, 5), feature_input_shape=128, output_size=3, model_dir='NN/models/saved'):
        """
        Initialize the Transformer model

        Args:
            ts_input_shape: Shape of time series input data (sequence_length, features)
            feature_input_shape: Size of the high-level feature vector (from CNN);
                an int, or a shape tuple
            output_size: Number of output classes or values
            model_dir: Directory to save model files (created if missing)
        """
        self.ts_input_shape = ts_input_shape
        self.feature_input_shape = feature_input_shape
        self.output_size = output_size
        self.model_dir = model_dir
        self.model = None
        self.history = None

        # Create model directory if it doesn't exist
        os.makedirs(model_dir, exist_ok=True)

        logger.info(f"Initialized TransformerModel with time series input shape {ts_input_shape}, "
                    f"feature input shape {feature_input_shape}, and output size {output_size}")

    def _head_config(self):
        """Return (output activation, loss, metrics) selected by ``self.output_size``."""
        if self.output_size == 1:
            # Binary classification
            return 'sigmoid', 'binary_crossentropy', ['accuracy']
        if self.output_size == 3:
            # Multi-class classification for BUY/HOLD/SELL
            return 'softmax', 'categorical_crossentropy', ['accuracy']
        # Regression
        return 'linear', 'mse', ['mae']

    def build_model(self, embed_dim=64, num_heads=4, ff_dim=128, num_transformer_blocks=2,
                    dropout_rate=0.2, learning_rate=0.001):
        """
        Build the Transformer model architecture

        Args:
            embed_dim: Embedding dimension for the transformer
            num_heads: Number of attention heads
            ff_dim: Hidden layer size in the feed-forward network
            num_transformer_blocks: Number of transformer blocks to stack
            dropout_rate: Dropout rate for regularization
            learning_rate: Learning rate for the optimizer

        Returns:
            Compiled Keras model (also stored on ``self.model``)
        """
        # Time series input (price and volume data)
        ts_inputs = Input(shape=self.ts_input_shape, name='time_series_input')

        # High-level feature input (from CNN or other sources); accept int or tuple
        feature_shape = self.feature_input_shape
        if not isinstance(feature_shape, (tuple, list)):
            feature_shape = (feature_shape,)
        feature_inputs = Input(shape=tuple(feature_shape), name='feature_input')

        # Each TransformerBlock adds its Dense(embed_dim) feed-forward output to
        # its input, so the sequence must already be embed_dim wide. Project the
        # raw features when they are not; otherwise the residual add fails on a
        # shape mismatch (e.g. 5 features vs embed_dim=64).
        x = ts_inputs
        if self.ts_input_shape[-1] != embed_dim:
            x = Dense(embed_dim, name='ts_embedding')(x)

        # Process time series with transformer blocks
        for _ in range(num_transformer_blocks):
            x = TransformerBlock(embed_dim, num_heads, ff_dim, dropout_rate)(x)

        # Global pooling to get fixed-size representation
        x = GlobalAveragePooling1D()(x)

        # Combine with the high-level features
        combined = Concatenate()([x, feature_inputs])

        # Dense layers
        dense1 = Dense(128, activation='relu')(combined)
        dropout1 = Dropout(dropout_rate)(dense1)
        dense2 = Dense(64, activation='relu')(dropout1)
        dropout2 = Dropout(dropout_rate)(dense2)

        # Output layer, loss and metrics all follow from output_size
        activation, loss, metrics = self._head_config()
        outputs = Dense(self.output_size, activation=activation)(dropout2)

        # Create and compile the model
        model = Model(inputs=[ts_inputs, feature_inputs], outputs=outputs)
        model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss=loss,
            metrics=metrics
        )

        self.model = model
        logger.info(f"Model built with {model.count_params()} parameters")
        model.summary(print_fn=logger.info)
        return model

    def train(self, X_ts, X_features, y, batch_size=32, epochs=100, validation_split=0.2,
              early_stopping_patience=20, reduce_lr_patience=10, verbose=1):
        """
        Train the Transformer model

        Builds the model with default parameters first if it has not been built.
        Writes the best checkpoint, TensorBoard logs, a per-epoch CSV log, the
        final model and the training history into ``self.model_dir``, all
        prefixed with a timestamped run name.

        Args:
            X_ts: Time series input data
            X_features: High-level feature input data
            y: Target values (with output_size == 3 the model is compiled with
                categorical_crossentropy, which expects one-hot encoded targets)
            batch_size: Batch size for training
            epochs: Maximum number of epochs
            validation_split: Fraction of data to use for validation
            early_stopping_patience: Patience for early stopping
            reduce_lr_patience: Patience for learning rate reduction
            verbose: Verbosity level

        Returns:
            Training history
        """
        if self.model is None:
            logger.warning("Model not built yet, building with default parameters")
            self.build_model()

        # Create a timestamp for this training run
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        model_name = f"transformer_model_{timestamp}"

        # Without a validation split there is no 'val_loss' metric; monitoring it
        # would leave all three callbacks below inert, so fall back to 'loss'.
        monitor = 'val_loss' if validation_split and validation_split > 0 else 'loss'

        # Set up callbacks
        callbacks = [
            # Early stopping to prevent overfitting
            EarlyStopping(
                monitor=monitor,
                patience=early_stopping_patience,
                restore_best_weights=True,
                verbose=1
            ),
            # Reduce learning rate when training plateaus
            ReduceLROnPlateau(
                monitor=monitor,
                factor=0.5,
                patience=reduce_lr_patience,
                min_lr=1e-6,
                verbose=1
            ),
            # Save the best model
            ModelCheckpoint(
                filepath=os.path.join(self.model_dir, f"{model_name}_best.h5"),
                monitor=monitor,
                save_best_only=True,
                verbose=1
            ),
            # TensorBoard logging
            TensorBoard(
                log_dir=os.path.join(self.model_dir, 'logs', model_name),
                histogram_freq=1
            ),
            # CSV logging
            CSVLogger(
                filename=os.path.join(self.model_dir, f"{model_name}_training.csv"),
                separator=',',
                append=False
            )
        ]

        # Train the model
        logger.info(f"Starting training with {len(X_ts)} samples, {epochs} max epochs")
        start_time = time.time()
        history = self.model.fit(
            [X_ts, X_features], y,
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            verbose=verbose
        )

        # Calculate training time
        training_time = time.time() - start_time
        logger.info(f"Training completed in {training_time:.2f} seconds")

        # Save the final model
        final_path = os.path.join(self.model_dir, f"{model_name}_final.h5")
        self.model.save(final_path)
        logger.info(f"Model saved to {final_path}")

        # Save training history
        hist_df = pd.DataFrame(history.history)
        hist_df.to_csv(os.path.join(self.model_dir, f"{model_name}_history.csv"), index=False)

        self.history = history
        return history

    def predict(self, X_ts, X_features, threshold=0.5):
        """
        Make predictions with the model

        Args:
            X_ts: Time series input data
            X_features: High-level feature input data
            threshold: Threshold for binary classification (only used when
                output_size == 1)

        Returns:
            None if no model is available. Otherwise:
            output_size == 1: tuple (0/1 class array, flattened probabilities);
            output_size == 3: tuple (argmax class indices, probability matrix);
            any other size: the raw model output.
        """
        if self.model is None:
            logger.error("Model not built or trained yet")
            return None

        # Get raw predictions
        y_pred_proba = self.model.predict([X_ts, X_features])

        # Format predictions based on output type
        if self.output_size == 1:
            # Binary classification
            y_pred = (y_pred_proba > threshold).astype(int).flatten()
            return y_pred, y_pred_proba.flatten()
        elif self.output_size == 3:
            # Multi-class (BUY/HOLD/SELL)
            y_pred = np.argmax(y_pred_proba, axis=1)
            return y_pred, y_pred_proba
        else:
            # Regression
            return y_pred_proba

    def save_model(self, filepath=None):
        """
        Save the model to a file

        Args:
            filepath: Path to save the model to; defaults to a timestamped
                ``transformer_model_<timestamp>.h5`` inside ``self.model_dir``

        Returns:
            Path to the saved model, or None if no model is available
        """
        if self.model is None:
            logger.error("Model not built or trained yet")
            return None

        if filepath is None:
            # Create a default filepath
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = os.path.join(self.model_dir, f"transformer_model_{timestamp}.h5")

        self.model.save(filepath)
        logger.info(f"Model saved to {filepath}")
        return filepath

    def load_model(self, filepath):
        """
        Load a model from a file

        Args:
            filepath: Path to load the model from

        Returns:
            Loaded model, or None if loading failed (the error is logged and
            ``self.model`` is left unchanged)
        """
        try:
            # The saved graph contains the custom TransformerBlock layer, which
            # Keras can only re-create when told which class the name refers to.
            self.model = tf.keras.models.load_model(
                filepath,
                custom_objects={'TransformerBlock': TransformerBlock}
            )
            logger.info(f"Model loaded from {filepath}")
            return self.model
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return None
class MixtureOfExpertsModel:
    """
    Mixture of Experts (MoE) model that combines predictions from multiple models.

    This implementation focuses on combining CNN and Transformer models for
    financial analysis. Expert outputs are merged with fixed (non-learned)
    weights into a weighted sum.
    """

    def __init__(self, output_size=3, model_dir='NN/models/saved'):
        """
        Initialize the MoE model

        Args:
            output_size: Number of output classes or values
            model_dir: Directory to save model files (created if missing)
        """
        self.output_size = output_size
        self.model_dir = model_dir
        self.models = {}  # Dictionary to store expert models
        self.gating_model = None  # Model to determine which expert to use
        self.model = None  # Combined MoE model

        # Create model directory if it doesn't exist
        os.makedirs(model_dir, exist_ok=True)

        logger.info(f"Initialized MixtureOfExpertsModel with output size {output_size}")

    def add_expert(self, name, model):
        """
        Add an expert model to the MoE

        Args:
            name: Name of the expert; build_model only wires up experts named
                'cnn' or 'transformer'
            model: Expert model instance (wrapper exposing ``predict`` and a
                Keras model on its ``model`` attribute)

        Returns:
            None
        """
        self.models[name] = model
        logger.info(f"Added expert model '{name}' to MoE")

    def build_model(self, ts_input_shape=(20, 5), expert_weights=None, learning_rate=0.001):
        """
        Build the MoE model architecture

        Args:
            ts_input_shape: Shape of time series input data
            expert_weights: Dictionary of expert weights keyed by expert name
                (if None, equal weighting). Experts missing from the dictionary
                get the equal-weight default; weights are normalized to sum to 1.
            learning_rate: Learning rate for the optimizer

        Returns:
            Compiled Keras model, or None if no usable expert is available or
            the weights sum to zero
        """
        if not self.models:
            logger.error("No expert models added to MoE")
            return None

        # Time series input
        ts_inputs = Input(shape=ts_input_shape, name='time_series_input')

        # Get predictions from each expert. A name is recorded only together
        # with its output so that names, outputs and weights stay aligned.
        expert_outputs = []
        expert_names = []
        for name, expert in self.models.items():
            if not callable(getattr(expert, 'predict', None)):
                logger.warning(f"Expert '{name}' has no callable predict(), skipping")
                continue

            keras_model = getattr(expert, 'model', None)
            if keras_model is None:
                logger.warning(f"Expert '{name}' has no built model, skipping")
                continue

            if name == 'cnn':
                # For CNN, we directly use the time series input
                output = keras_model(ts_inputs)
            elif name == 'transformer':
                # For transformer, we need features from the CNN as well.
                # This is a simplification - in a real implementation, we would
                # need to extract features from the CNN model and pass them to
                # the transformer. Here a dense projection of the raw series
                # stands in for them; its width follows the expert's declared
                # feature size (128 if the expert does not declare one).
                feature_dim = getattr(expert, 'feature_input_shape', 128)
                if isinstance(feature_dim, (tuple, list)):
                    feature_dim = feature_dim[-1]
                dummy_features = Dense(feature_dim, activation='relu')(Flatten()(ts_inputs))
                output = keras_model([ts_inputs, dummy_features])
            else:
                logger.warning(f"Unknown model type: {name}, skipping")
                continue

            expert_names.append(name)
            expert_outputs.append(output)

        if not expert_outputs:
            logger.error("No valid expert models found")
            return None

        # Use expert weighting
        default_weight = 1.0 / len(expert_outputs)
        if expert_weights is None:
            # Equal weighting
            weights = [default_weight] * len(expert_outputs)
        else:
            # User-provided weights
            weights = [expert_weights.get(name, default_weight) for name in expert_names]

        # Normalize weights
        total_weight = sum(weights)
        if total_weight == 0:
            logger.error("Expert weights sum to zero, cannot normalize")
            return None
        weights = [w / total_weight for w in weights]

        # Combine expert outputs using weighted average
        if len(expert_outputs) == 1:
            # Only one expert, use its output directly
            combined_output = expert_outputs[0]
        else:
            # Multiple experts, compute weighted average
            weighted_outputs = [output * weight for output, weight in zip(expert_outputs, weights)]
            combined_output = Add()(weighted_outputs)

        # Create the MoE model
        moe_model = Model(inputs=ts_inputs, outputs=combined_output)

        # Compile the model
        if self.output_size == 1:
            # Binary classification
            loss, metrics = 'binary_crossentropy', ['accuracy']
        elif self.output_size == 3:
            # Multi-class classification for BUY/HOLD/SELL
            loss, metrics = 'categorical_crossentropy', ['accuracy']
        else:
            # Regression
            loss, metrics = 'mse', ['mae']
        moe_model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss=loss,
            metrics=metrics
        )

        self.model = moe_model
        logger.info(f"MoE model built with experts: {expert_names}, weights: {weights}")
        moe_model.summary(print_fn=logger.info)
        return moe_model

    def predict(self, X, threshold=0.5):
        """
        Make predictions with the MoE model

        Args:
            X: Input data
            threshold: Threshold for binary classification (only used when
                output_size == 1)

        Returns:
            None if the model has not been built. Otherwise:
            output_size == 1: tuple (0/1 class array, flattened probabilities);
            output_size == 3: tuple (argmax class indices, probability matrix);
            any other size: the raw model output.
        """
        if self.model is None:
            logger.error("MoE model not built yet")
            return None

        # Get raw predictions
        y_pred_proba = self.model.predict(X)

        # Format predictions based on output type
        if self.output_size == 1:
            # Binary classification
            y_pred = (y_pred_proba > threshold).astype(int).flatten()
            return y_pred, y_pred_proba.flatten()
        elif self.output_size == 3:
            # Multi-class (BUY/HOLD/SELL)
            y_pred = np.argmax(y_pred_proba, axis=1)
            return y_pred, y_pred_proba
        else:
            # Regression
            return y_pred_proba

    def save_model(self, filepath=None):
        """
        Save the MoE model to a file

        Args:
            filepath: Path to save the model to; defaults to a timestamped
                ``moe_model_<timestamp>.h5`` inside ``self.model_dir``

        Returns:
            Path to the saved model, or None if the model has not been built
        """
        if self.model is None:
            logger.error("MoE model not built yet")
            return None

        if filepath is None:
            # Create a default filepath
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = os.path.join(self.model_dir, f"moe_model_{timestamp}.h5")

        self.model.save(filepath)
        logger.info(f"MoE model saved to {filepath}")
        return filepath

    def load_model(self, filepath):
        """
        Load an MoE model from a file

        Args:
            filepath: Path to load the model from

        Returns:
            Loaded model, or None if loading failed (the error is logged and
            ``self.model`` is left unchanged)
        """
        try:
            # A saved MoE graph may embed the transformer expert, whose custom
            # TransformerBlock layer must be named explicitly for Keras to
            # re-create it.
            self.model = tf.keras.models.load_model(
                filepath,
                custom_objects={'TransformerBlock': TransformerBlock}
            )
            logger.info(f"MoE model loaded from {filepath}")
            return self.model
        except Exception as e:
            logger.error(f"Error loading MoE model: {str(e)}")
            return None
# Script entry point: running this file directly only prints a notice,
# because no runnable example ships with the module.
if __name__ == "__main__":
    # A complete system would build, train and evaluate the models here.
    print("Transformer and MoE models defined, but not implemented here.")