"""
|
|
Transformer Neural Network for timeseries analysis
|
|
|
|
This module implements a Transformer model with attention mechanisms for cryptocurrency price analysis.
|
|
It also includes a Mixture of Experts model that combines predictions from multiple models.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import tensorflow as tf
|
|
from tensorflow.keras.models import Model, load_model
|
|
from tensorflow.keras.layers import (
|
|
Input, Dense, Dropout, BatchNormalization,
|
|
Concatenate, Layer, LayerNormalization, MultiHeadAttention,
|
|
Add, GlobalAveragePooling1D, Conv1D, Reshape
|
|
)
|
|
from tensorflow.keras.optimizers import Adam
|
|
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
|
|
import datetime
|
|
import json
|
|
|
|
logger = logging.getLogger(__name__)


class TransformerBlock(Layer):
    """
    Transformer block with multi-head self-attention and a position-wise feed-forward network.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        # Serialize the constructor arguments (not the layer instances) so the
        # block can be rebuilt by load_model with custom_objects.
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate
        })
        return config
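
# A TransformerBlock maps a (batch, seq_len, embed_dim) tensor to a tensor of the
# same shape, so blocks can be stacked freely. A quick shape check (values below
# are illustrative only):
#
#     block = TransformerBlock(embed_dim=32, num_heads=4, ff_dim=64)
#     out = block(tf.random.normal((8, 20, 32)))   # -> shape (8, 20, 32)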


class PositionalEncoding(Layer):
    """
    Positional encoding layer that adds sinusoidal position information to input embeddings.
    """
    def __init__(self, position, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.position = position
        self.d_model = d_model
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model
        )

        # Apply sin to even indices in the array
        sines = tf.math.sin(angle_rads[:, 0::2])

        # Apply cos to odd indices in the array
        cosines = tf.math.cos(angle_rads[:, 1::2])

        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]

        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        # Only the constructor arguments are serialized; pos_encoding is
        # recomputed in __init__ when the layer is restored.
        config = super().get_config()
        config.update({
            'position': self.position,
            'd_model': self.d_model
        })
        return config
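
# PositionalEncoding follows the sinusoidal scheme of "Attention Is All You Need"
# (angles of pos / 10000^(2i / d_model)), with one difference: the sine and cosine
# halves are concatenated along the feature axis rather than interleaved. Either
# layout gives every position a distinct code. Illustrative shape check:
#
#     pe = PositionalEncoding(position=20, d_model=32)
#     out = pe(tf.zeros((1, 20, 32)))   # adds a (1, 20, 32) encoding to the input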


class TransformerModel:
    """
    Transformer Neural Network for time series analysis.

    This model uses self-attention mechanisms to capture relationships between
    different time points in the input data.
    """

    def __init__(self, ts_input_shape=(20, 5), feature_input_shape=64, output_size=1, model_dir="NN/models/saved"):
        """
        Initialize the Transformer model.

        Args:
            ts_input_shape (tuple): Shape of time series input data (sequence_length, features)
            feature_input_shape (int): Size of the additional feature input (e.g., from a CNN)
            output_size (int): Number of output classes (1 for binary, 3 for buy/hold/sell)
            model_dir (str): Directory to save trained models
        """
        self.ts_input_shape = ts_input_shape
        self.feature_input_shape = feature_input_shape
        self.output_size = output_size
        self.model_dir = model_dir
        self.model = None
        self.history = None

        # Create model directory if it doesn't exist
        os.makedirs(self.model_dir, exist_ok=True)

        logger.info(f"Initialized Transformer model with TS input shape {ts_input_shape}, "
                    f"feature input shape {feature_input_shape}, and output size {output_size}")

    def build_model(self, embed_dim=32, num_heads=4, ff_dim=64, num_transformer_blocks=2, dropout_rate=0.1, learning_rate=0.001):
        """
        Build the Transformer model architecture.

        Args:
            embed_dim (int): Embedding dimension for the transformer
            num_heads (int): Number of attention heads
            ff_dim (int): Hidden dimension of the feed-forward network
            num_transformer_blocks (int): Number of transformer blocks
            dropout_rate (float): Dropout rate for regularization
            learning_rate (float): Learning rate for the Adam optimizer

        Returns:
            The compiled model
        """
        # Time series input
        ts_inputs = Input(shape=self.ts_input_shape, name="ts_input")

        # Additional feature input (e.g., from CNN)
        feature_inputs = Input(shape=(self.feature_input_shape,), name="feature_input")

        # Process time series with transformer:
        # first, project the input to the embedding dimension
        x = Conv1D(embed_dim, 1, activation="relu")(ts_inputs)

        # Add positional encoding
        x = PositionalEncoding(self.ts_input_shape[0], embed_dim)(x)

        # Add transformer blocks
        for _ in range(num_transformer_blocks):
            x = TransformerBlock(embed_dim, num_heads, ff_dim, dropout_rate)(x)

        # Global pooling to get a single vector representation
        x = GlobalAveragePooling1D()(x)
        x = Dropout(dropout_rate)(x)

        # Combine with additional features
        combined = Concatenate()([x, feature_inputs])

        # Dense layers for final classification/regression
        x = Dense(64, activation="relu")(combined)
        x = BatchNormalization()(x)
        x = Dropout(dropout_rate)(x)

        # Output layer
        if self.output_size == 1:
            # Binary classification (up/down)
            outputs = Dense(1, activation='sigmoid', name='output')(x)
            loss = 'binary_crossentropy'
            metrics = ['accuracy']
        elif self.output_size == 3:
            # Multi-class classification (buy/hold/sell)
            outputs = Dense(3, activation='softmax', name='output')(x)
            loss = 'categorical_crossentropy'
            metrics = ['accuracy']
        else:
            # Regression
            outputs = Dense(self.output_size, activation='linear', name='output')(x)
            loss = 'mse'
            metrics = ['mae']

        # Create and compile model
        self.model = Model(inputs=[ts_inputs, feature_inputs], outputs=outputs)

        # Compile with Adam optimizer
        self.model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss=loss,
            metrics=metrics
        )

        # Log model summary
        self.model.summary(print_fn=lambda x: logger.info(x))

        return self.model
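
    # With the default constructor and build_model arguments, the compiled model has
    # two inputs, ts_input of shape (None, 20, 5) and feature_input of shape (None, 64),
    # and a single sigmoid output. Illustrative call:
    #
    #     model = TransformerModel().build_model()
    #     model([tf.zeros((1, 20, 5)), tf.zeros((1, 64))])   # -> shape (1, 1)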

    def train(self, X_ts, X_features, y, batch_size=32, epochs=100, validation_split=0.2,
              callbacks=None, class_weights=None):
        """
        Train the Transformer model on the provided data.

        Args:
            X_ts (numpy.ndarray): Time series input features
            X_features (numpy.ndarray): Additional input features
            y (numpy.ndarray): Target labels
            batch_size (int): Batch size
            epochs (int): Number of epochs
            validation_split (float): Fraction of data to use for validation
            callbacks (list): List of Keras callbacks
            class_weights (dict): Class weights for imbalanced datasets

        Returns:
            History object containing training metrics
        """
        if self.model is None:
            self.build_model()

        # Default callbacks if none provided
        if callbacks is None:
            # Create a timestamp for model checkpoints
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

            callbacks = [
                EarlyStopping(
                    monitor='val_loss',
                    patience=10,
                    restore_best_weights=True
                ),
                ReduceLROnPlateau(
                    monitor='val_loss',
                    factor=0.5,
                    patience=5,
                    min_lr=1e-6
                ),
                ModelCheckpoint(
                    filepath=os.path.join(self.model_dir, f"transformer_model_{timestamp}.h5"),
                    monitor='val_loss',
                    save_best_only=True
                )
            ]

        # Check if y needs to be one-hot encoded for multi-class
        if self.output_size == 3 and len(y.shape) == 1:
            y = tf.keras.utils.to_categorical(y, num_classes=3)

        # Train the model
        logger.info(f"Training Transformer model with {len(X_ts)} samples, batch size {batch_size}, epochs {epochs}")
        self.history = self.model.fit(
            [X_ts, X_features], y,
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=2
        )

        # Save the trained model
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        model_path = os.path.join(self.model_dir, f"transformer_model_final_{timestamp}.h5")
        self.model.save(model_path)
        logger.info(f"Model saved to {model_path}")

        # Save training history
        history_path = os.path.join(self.model_dir, f"transformer_model_history_{timestamp}.json")
        with open(history_path, 'w') as f:
            # Convert numpy values to Python native types for JSON serialization
            history_dict = {key: [float(value) for value in values] for key, values in self.history.history.items()}
            json.dump(history_dict, f, indent=2)

        return self.history

    def evaluate(self, X_ts, X_features, y):
        """
        Evaluate the model on test data.

        Args:
            X_ts (numpy.ndarray): Time series input features
            X_features (numpy.ndarray): Additional input features
            y (numpy.ndarray): Target labels

        Returns:
            dict: Evaluation metrics
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        # Convert y to one-hot encoding for multi-class
        if self.output_size == 3 and len(y.shape) == 1:
            y = tf.keras.utils.to_categorical(y, num_classes=3)

        # Evaluate model
        logger.info(f"Evaluating Transformer model on {len(X_ts)} samples")
        eval_results = self.model.evaluate([X_ts, X_features], y, verbose=0)

        metrics = {}
        for metric, value in zip(self.model.metrics_names, eval_results):
            metrics[metric] = value
            logger.info(f"{metric}: {value:.4f}")

        return metrics

    def predict(self, X_ts, X_features=None):
        """
        Make predictions on new data.

        Args:
            X_ts (numpy.ndarray): Time series input features
            X_features (numpy.ndarray): Additional input features

        Returns:
            tuple: (y_pred, y_proba) where:
                y_pred is the predicted class (0/1 for binary, 0/1/2 for multi-class)
                y_proba is the class probability
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        # Ensure X_ts has the right shape
        if len(X_ts.shape) == 2:
            # Single sample, add batch dimension
            X_ts = np.expand_dims(X_ts, axis=0)

        # Ensure X_features has the right shape
        if X_features is None:
            # Create dummy features with zeros
            X_features = np.zeros((X_ts.shape[0], self.feature_input_shape))
        elif len(X_features.shape) == 1:
            # Single sample, add batch dimension
            X_features = np.expand_dims(X_features, axis=0)

        # Get predictions
        y_proba = self.model.predict([X_ts, X_features])

        # Process based on output type
        if self.output_size == 1:
            # Binary classification
            y_pred = (y_proba > 0.5).astype(int).flatten()
            return y_pred, y_proba.flatten()
        elif self.output_size == 3:
            # Multi-class classification
            y_pred = np.argmax(y_proba, axis=1)
            return y_pred, y_proba
        else:
            # Regression
            return y_proba, y_proba

    def save(self, filepath=None):
        """
        Save the model to disk.

        Args:
            filepath (str): Path to save the model

        Returns:
            str: Path where the model was saved
        """
        if self.model is None:
            raise ValueError("Model has not been built yet")

        if filepath is None:
            # Create a default filepath with timestamp
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = os.path.join(self.model_dir, f"transformer_model_{timestamp}.h5")

        self.model.save(filepath)
        logger.info(f"Model saved to {filepath}")
        return filepath

    def load(self, filepath):
        """
        Load a saved model from disk.

        Args:
            filepath (str): Path to the saved model

        Returns:
            The loaded model
        """
        # Register custom layers
        custom_objects = {
            'TransformerBlock': TransformerBlock,
            'PositionalEncoding': PositionalEncoding
        }

        self.model = load_model(filepath, custom_objects=custom_objects)
        logger.info(f"Model loaded from {filepath}")
        return self.model

    def plot_training_history(self):
        """
        Plot training history (loss and metrics).

        Returns:
            str: Path to the saved plot
        """
        if self.history is None:
            raise ValueError("Model has not been trained yet")

        plt.figure(figsize=(12, 5))

        # Plot loss
        plt.subplot(1, 2, 1)
        plt.plot(self.history.history['loss'], label='Training Loss')
        if 'val_loss' in self.history.history:
            plt.plot(self.history.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()

        # Plot accuracy
        plt.subplot(1, 2, 2)

        if 'accuracy' in self.history.history:
            plt.plot(self.history.history['accuracy'], label='Training Accuracy')
            if 'val_accuracy' in self.history.history:
                plt.plot(self.history.history['val_accuracy'], label='Validation Accuracy')
            plt.title('Model Accuracy')
            plt.ylabel('Accuracy')
        elif 'mae' in self.history.history:
            plt.plot(self.history.history['mae'], label='Training MAE')
            if 'val_mae' in self.history.history:
                plt.plot(self.history.history['val_mae'], label='Validation MAE')
            plt.title('Model MAE')
            plt.ylabel('MAE')

        plt.xlabel('Epoch')
        plt.legend()

        plt.tight_layout()

        # Save figure
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        fig_path = os.path.join(self.model_dir, f"transformer_training_history_{timestamp}.png")
        plt.savefig(fig_path)
        plt.close()

        logger.info(f"Training history plot saved to {fig_path}")
        return fig_path


class MixtureOfExpertsModel:
    """
    Mixture of Experts (MoE) model.

    This model combines predictions from multiple expert models (such as CNN and Transformer)
    using a weighted ensemble approach.
    """

    def __init__(self, output_size=1, model_dir="NN/models/saved"):
        """
        Initialize the MoE model.

        Args:
            output_size (int): Number of output classes (1 for binary, 3 for buy/hold/sell)
            model_dir (str): Directory to save trained models
        """
        self.output_size = output_size
        self.model_dir = model_dir
        self.model = None
        self.history = None
        self.experts = {}

        # Create model directory if it doesn't exist
        os.makedirs(self.model_dir, exist_ok=True)

        logger.info(f"Initialized Mixture of Experts model with output size {output_size}")

    def add_expert(self, name, model):
        """
        Add an expert model to the MoE.

        Args:
            name (str): Name of the expert model
            model: The expert model instance (a callable Keras model)

        Returns:
            None
        """
        self.experts[name] = model
        logger.info(f"Added expert model '{name}' to MoE")

    def build_model(self, ts_input_shape=(20, 5), expert_weights=None, learning_rate=0.001):
        """
        Build the MoE model by combining expert models.

        Args:
            ts_input_shape (tuple): Shape of time series input data
            expert_weights (dict): Weights for each expert model
            learning_rate (float): Learning rate for Adam optimizer

        Returns:
            The compiled model
        """
        # Time series input
        ts_inputs = Input(shape=ts_input_shape, name="ts_input")

        # Additional feature input (from CNN)
        feature_inputs = Input(shape=(64,), name="feature_input")  # Default size for features

        # Process with each expert model
        expert_outputs = []
        expert_names = []

        for name, expert in self.experts.items():
            # Skip if expert model is not valid or doesn't have a call/predict method
            if expert is None:
                logger.warning(f"Expert model '{name}' is None, skipping")
                continue

            try:
                # Different handling based on model type
                if name == 'cnn':
                    # CNN model takes only time series input
                    expert_output = expert(ts_inputs)
                    expert_outputs.append(expert_output)
                    expert_names.append(name)
                elif name == 'transformer':
                    # Transformer model takes both time series and feature inputs
                    expert_output = expert([ts_inputs, feature_inputs])
                    expert_outputs.append(expert_output)
                    expert_names.append(name)
                else:
                    logger.warning(f"Unknown expert model type: {name}")
            except Exception as e:
                logger.error(f"Error adding expert '{name}': {str(e)}")

        if not expert_outputs:
            logger.error("No valid expert models found")
            return None

        # Use expert weighting
        if expert_weights is None:
            # Equal weighting
            weights = [1.0 / len(expert_outputs)] * len(expert_outputs)
        else:
            # User-provided weights
            weights = [expert_weights.get(name, 1.0 / len(expert_outputs)) for name in expert_names]
            # Normalize weights
            weights = [w / sum(weights) for w in weights]

        # Combine expert outputs using weighted average
        if len(expert_outputs) == 1:
            # Only one expert, use its output directly
            combined_output = expert_outputs[0]
        else:
            # Multiple experts, compute weighted average
            weighted_outputs = [output * weight for output, weight in zip(expert_outputs, weights)]
            combined_output = Add()(weighted_outputs)

        # Create the MoE model
        moe_model = Model(inputs=[ts_inputs, feature_inputs], outputs=combined_output)

        # Compile the model
        if self.output_size == 1:
            # Binary classification
            moe_model.compile(
                optimizer=Adam(learning_rate=learning_rate),
                loss='binary_crossentropy',
                metrics=['accuracy']
            )
        elif self.output_size == 3:
            # Multi-class classification for BUY/HOLD/SELL
            moe_model.compile(
                optimizer=Adam(learning_rate=learning_rate),
                loss='categorical_crossentropy',
                metrics=['accuracy']
            )
        else:
            # Regression
            moe_model.compile(
                optimizer=Adam(learning_rate=learning_rate),
                loss='mse',
                metrics=['mae']
            )

        self.model = moe_model

        # Log model summary
        self.model.summary(print_fn=lambda x: logger.info(x))

        logger.info(f"Built MoE model with weights: {weights}")
        return self.model
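
    # The ensemble is a fixed weighted average of the expert outputs: with, say,
    # expert_weights={'cnn': 0.3, 'transformer': 0.7}, the combined prediction is
    # 0.3 * cnn_output + 0.7 * transformer_output (weights are normalized to sum
    # to 1 before the Add layer). The gating is static, not learned per sample.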

    def train(self, X_ts, X_features, y, batch_size=32, epochs=100, validation_split=0.2,
              callbacks=None, class_weights=None):
        """
        Train the MoE model on the provided data.

        Args:
            X_ts (numpy.ndarray): Time series input features
            X_features (numpy.ndarray): Additional input features
            y (numpy.ndarray): Target labels
            batch_size (int): Batch size
            epochs (int): Number of epochs
            validation_split (float): Fraction of data to use for validation
            callbacks (list): List of Keras callbacks
            class_weights (dict): Class weights for imbalanced datasets

        Returns:
            History object containing training metrics
        """
        if self.model is None:
            logger.error("MoE model has not been built yet")
            return None

        # Default callbacks if none provided
        if callbacks is None:
            # Create a timestamp for model checkpoints
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

            callbacks = [
                EarlyStopping(
                    monitor='val_loss',
                    patience=10,
                    restore_best_weights=True
                ),
                ReduceLROnPlateau(
                    monitor='val_loss',
                    factor=0.5,
                    patience=5,
                    min_lr=1e-6
                ),
                ModelCheckpoint(
                    filepath=os.path.join(self.model_dir, f"moe_model_{timestamp}.h5"),
                    monitor='val_loss',
                    save_best_only=True
                )
            ]

        # Check if y needs to be one-hot encoded for multi-class
        if self.output_size == 3 and len(y.shape) == 1:
            y = tf.keras.utils.to_categorical(y, num_classes=3)

        # Train the model
        logger.info(f"Training MoE model with {len(X_ts)} samples, batch size {batch_size}, epochs {epochs}")
        self.history = self.model.fit(
            [X_ts, X_features], y,
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=2
        )

        # Save the trained model
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        model_path = os.path.join(self.model_dir, f"moe_model_final_{timestamp}.h5")
        self.model.save(model_path)
        logger.info(f"Model saved to {model_path}")

        # Save training history
        history_path = os.path.join(self.model_dir, f"moe_model_history_{timestamp}.json")
        with open(history_path, 'w') as f:
            # Convert numpy values to Python native types for JSON serialization
            history_dict = {key: [float(value) for value in values] for key, values in self.history.history.items()}
            json.dump(history_dict, f, indent=2)

        return self.history

    def predict(self, X_ts, X_features=None):
        """
        Make predictions on new data.

        Args:
            X_ts (numpy.ndarray): Time series input features
            X_features (numpy.ndarray): Additional input features

        Returns:
            tuple: (y_pred, y_proba) where:
                y_pred is the predicted class (0/1 for binary, 0/1/2 for multi-class)
                y_proba is the class probability
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        # Ensure X_ts has the right shape
        if len(X_ts.shape) == 2:
            # Single sample, add batch dimension
            X_ts = np.expand_dims(X_ts, axis=0)

        # Ensure X_features has the right shape
        if X_features is None:
            # Create dummy features with zeros
            X_features = np.zeros((X_ts.shape[0], 64))  # Default size
        elif len(X_features.shape) == 1:
            # Single sample, add batch dimension
            X_features = np.expand_dims(X_features, axis=0)

        # Get predictions
        y_proba = self.model.predict([X_ts, X_features])

        # Process based on output type
        if self.output_size == 1:
            # Binary classification
            y_pred = (y_proba > 0.5).astype(int).flatten()
            return y_pred, y_proba.flatten()
        elif self.output_size == 3:
            # Multi-class classification
            y_pred = np.argmax(y_proba, axis=1)
            return y_pred, y_proba
        else:
            # Regression
            return y_proba, y_proba

    def save(self, filepath=None):
        """
        Save the model to disk.

        Args:
            filepath (str): Path to save the model

        Returns:
            str: Path where the model was saved
        """
        if self.model is None:
            raise ValueError("Model has not been built yet")

        if filepath is None:
            # Create a default filepath with timestamp
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filepath = os.path.join(self.model_dir, f"moe_model_{timestamp}.h5")

        self.model.save(filepath)
        logger.info(f"Model saved to {filepath}")
        return filepath

    def load(self, filepath):
        """
        Load a saved model from disk.

        Args:
            filepath (str): Path to the saved model

        Returns:
            The loaded model
        """
        # Register custom layers
        custom_objects = {
            'TransformerBlock': TransformerBlock,
            'PositionalEncoding': PositionalEncoding
        }

        self.model = load_model(filepath, custom_objects=custom_objects)
        logger.info(f"Model loaded from {filepath}")
        return self.model


# Example usage:
if __name__ == "__main__":
    # In a full system these models are built and trained by the surrounding
    # pipeline; the code below is only a minimal demonstration on synthetic data.
    print("Transformer and MoE models defined; running a minimal synthetic-data demo.")