"""
Convolutional Neural Network for timeseries analysis

This module implements a deep CNN model for cryptocurrency price analysis.
The model uses multiple parallel convolutional pathways and LSTM layers
to detect patterns at different time scales.
"""

import datetime
import json
import logging
import os

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input, Conv1D, MaxPooling1D, Dense, Dropout, BatchNormalization,
    LSTM, Bidirectional, Flatten, Concatenate, GlobalAveragePooling1D,
    LeakyReLU, Attention
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.metrics import AUC
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc

logger = logging.getLogger(__name__)


class CNNModel:
    """
    Convolutional Neural Network for time series analysis.

    This model uses a multi-pathway architecture with different filter sizes
    to detect patterns at different time scales, combined with LSTM layers
    for temporal dependencies.

    The value of ``output_size`` selects the task everywhere in this class:
    ``1`` means binary classification (sigmoid), ``3`` means three-class
    classification (softmax), any other value means regression (linear).
    """

    def __init__(self, input_shape=(20, 5), output_size=1, model_dir="NN/models/saved"):
        """
        Initialize the CNN model.

        Creates ``model_dir`` on disk if it does not exist yet. No Keras model
        is built here; see ``build_model``.

        Args:
            input_shape (tuple): Shape of input data (sequence_length, features)
            output_size (int): Number of output classes (1 for binary, 3 for buy/hold/sell)
            model_dir (str): Directory to save trained models
        """
        self.input_shape = input_shape
        self.output_size = output_size
        self.model_dir = model_dir
        self.model = None
        self.history = None

        # Create model directory if it doesn't exist
        os.makedirs(self.model_dir, exist_ok=True)

        logger.info(f"Initialized CNN model with input shape {input_shape} and output size {output_size}")

    @staticmethod
    def _timestamp():
        """Return the current local time as ``YYYYmmdd_HHMMSS`` for use in file names."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def build_model(self, filters=(32, 64, 128), kernel_sizes=(3, 5, 7),
                    dropout_rate=0.3, learning_rate=0.001):
        """
        Build the CNN model architecture.

        One convolutional pathway is created per ``(filters[i], kernel_sizes[i])``
        pair. The pathways are concatenated, passed through one more
        convolution, two bidirectional LSTM layers, global average pooling
        and a dense head whose shape depends on ``self.output_size``.

        Args:
            filters (tuple): Number of filters for each convolutional pathway
            kernel_sizes (tuple): Kernel sizes for each convolutional pathway
            dropout_rate (float): Dropout rate for regularization
            learning_rate (float): Learning rate for Adam optimizer

        Returns:
            The compiled model

        Raises:
            ValueError: If ``filters`` or ``kernel_sizes`` is empty.
        """
        filters = tuple(filters)
        kernel_sizes = tuple(kernel_sizes)
        if not filters or not kernel_sizes:
            raise ValueError("filters and kernel_sizes must each contain at least one entry")
        if len(filters) != len(kernel_sizes):
            # zip() below silently stops at the shorter sequence; make that visible.
            logger.warning(
                "filters has %d entries but kernel_sizes has %d; unpaired entries are ignored",
                len(filters), len(kernel_sizes)
            )

        # Input layer
        inputs = Input(shape=self.input_shape)

        # Multiple parallel convolutional pathways with different kernel sizes
        # to capture patterns at different time scales
        conv_layers = []
        for i, (filter_size, kernel_size) in enumerate(zip(filters, kernel_sizes)):
            conv_path = Conv1D(
                filters=filter_size,
                kernel_size=kernel_size,
                padding='same',
                name=f'conv1d_{i+1}'
            )(inputs)
            conv_path = BatchNormalization()(conv_path)
            conv_path = LeakyReLU(alpha=0.1)(conv_path)
            conv_path = MaxPooling1D(pool_size=2, padding='same')(conv_path)
            conv_path = Dropout(dropout_rate)(conv_path)
            conv_layers.append(conv_path)

        # Merge convolutional pathways (Concatenate needs at least two inputs)
        if len(conv_layers) > 1:
            merged = Concatenate()(conv_layers)
        else:
            merged = conv_layers[0]

        # Add another Conv1D layer after merging. It is named explicitly so it
        # can never clash with the `conv1d_<i>` names given to the pathways.
        x = Conv1D(filters=filters[-1], kernel_size=3, padding='same',
                   name='conv1d_merged')(merged)
        x = BatchNormalization()(x)
        x = LeakyReLU(alpha=0.1)(x)
        x = MaxPooling1D(pool_size=2, padding='same')(x)
        x = Dropout(dropout_rate)(x)

        # Bidirectional LSTM for temporal dependencies
        x = Bidirectional(LSTM(128, return_sequences=True))(x)
        x = Dropout(dropout_rate)(x)

        # Second bidirectional LSTM; it keeps the time axis so that the
        # pooling layer below can average over all time steps.
        x = Bidirectional(LSTM(64, return_sequences=True))(x)

        # Global average pooling to reduce parameters
        x = GlobalAveragePooling1D()(x)
        x = Dropout(dropout_rate)(x)

        # Dense layers for final classification/regression
        x = Dense(64, activation='relu')(x)
        x = BatchNormalization()(x)
        x = Dropout(dropout_rate)(x)

        # Output layer
        if self.output_size == 1:
            # Binary classification (up/down)
            outputs = Dense(1, activation='sigmoid', name='output')(x)
            loss = 'binary_crossentropy'
            metrics = ['accuracy', AUC()]
        elif self.output_size == 3:
            # Multi-class classification (buy/hold/sell)
            outputs = Dense(3, activation='softmax', name='output')(x)
            loss = 'categorical_crossentropy'
            metrics = ['accuracy']
        else:
            # Regression
            outputs = Dense(self.output_size, activation='linear', name='output')(x)
            loss = 'mse'
            metrics = ['mae']

        # Create and compile model
        self.model = Model(inputs=inputs, outputs=outputs)

        # Compile with Adam optimizer
        self.model.compile(
            optimizer=Adam(learning_rate=learning_rate),
            loss=loss,
            metrics=metrics
        )

        # Log model summary
        self.model.summary(print_fn=lambda x: logger.info(x))

        return self.model

    def train(self, X_train, y_train, batch_size=32, epochs=100, validation_split=0.2,
              callbacks=None, class_weights=None):
        """
        Train the CNN model on the provided data.

        Builds the model with default hyper-parameters if none exists yet.
        After fitting, the model is saved to ``cnn_model_final_<timestamp>.h5``
        and the training history to ``cnn_model_history_<timestamp>.json``,
        both inside ``self.model_dir``.

        Args:
            X_train (numpy.ndarray): Training features
            y_train (numpy.ndarray): Training targets
            batch_size (int): Batch size
            epochs (int): Number of epochs
            validation_split (float): Fraction of data to use for validation
            callbacks (list): List of Keras callbacks. When ``None``, early
                stopping, learning-rate reduction and best-model checkpointing
                are used.
            class_weights (dict): Class weights for imbalanced datasets

        Returns:
            History object containing training metrics
        """
        if self.model is None:
            self.build_model()

        # Default callbacks if none provided
        if callbacks is None:
            # Without a validation split Keras never reports `val_loss`, so the
            # callbacks would have nothing to watch; fall back to training loss.
            monitor = 'val_loss' if validation_split else 'loss'
            checkpoint_path = os.path.join(
                self.model_dir, f"cnn_model_{self._timestamp()}.h5"
            )

            callbacks = [
                EarlyStopping(
                    monitor=monitor,
                    patience=10,
                    restore_best_weights=True
                ),
                ReduceLROnPlateau(
                    monitor=monitor,
                    factor=0.5,
                    patience=5,
                    min_lr=1e-6
                ),
                ModelCheckpoint(
                    filepath=checkpoint_path,
                    monitor=monitor,
                    save_best_only=True
                )
            ]

        # Integer class labels must be one-hot encoded for the softmax head.
        # np.ndim also works when the labels arrive as a plain list.
        if self.output_size == 3 and np.ndim(y_train) == 1:
            y_train = tf.keras.utils.to_categorical(y_train, num_classes=3)

        # Train the model
        logger.info(f"Training CNN model with {len(X_train)} samples, batch size {batch_size}, epochs {epochs}")
        self.history = self.model.fit(
            X_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            class_weight=class_weights,
            verbose=2
        )

        # Save the trained model
        timestamp = self._timestamp()
        model_path = os.path.join(self.model_dir, f"cnn_model_final_{timestamp}.h5")
        self.model.save(model_path)
        logger.info(f"Model saved to {model_path}")

        # Save training history
        history_path = os.path.join(self.model_dir, f"cnn_model_history_{timestamp}.json")
        # Convert numpy values to Python native types for JSON serialization
        history_dict = {
            key: [float(value) for value in values]
            for key, values in self.history.history.items()
        }
        with open(history_path, 'w', encoding='utf-8') as f:
            json.dump(history_dict, f, indent=2)

        return self.history

    def evaluate(self, X_test, y_test, plot_results=False):
        """
        Evaluate the model on test data.

        For the two classification modes a classification report and a
        confusion matrix are logged in addition to the Keras metrics. For
        binary classification the scikit-learn ROC AUC is stored under the
        ``'auc'`` key of the result.

        Args:
            X_test (numpy.ndarray): Test features
            y_test (numpy.ndarray): Test targets. In three-class mode either
                integer labels or one-hot rows are accepted.
            plot_results (bool): Whether to plot evaluation results

        Returns:
            dict: Evaluation metrics

        Raises:
            ValueError: If no model has been built or loaded.
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        y_true = np.asarray(y_test)
        y_eval = y_true      # what Keras is evaluated against
        y_labels = y_true    # integer labels for the scikit-learn reports

        if self.output_size == 3:
            if y_true.ndim == 2 and y_true.shape[1] == 1:
                # Column vector of integer labels
                y_true = y_true.ravel()
            if y_true.ndim == 1:
                y_labels = y_true
                y_eval = tf.keras.utils.to_categorical(y_true, num_classes=3)
            else:
                # Already one-hot: keep it for Keras, decode it for scikit-learn,
                # which rejects a mix of one-hot targets and integer predictions.
                y_labels = np.argmax(y_true, axis=1)
                y_eval = y_true
        elif self.output_size == 1:
            y_labels = y_true.ravel()

        # Evaluate model
        logger.info(f"Evaluating CNN model on {len(X_test)} samples")
        # return_dict=True lets Keras name each value itself instead of relying
        # on `metrics_names` lining up with the returned list.
        eval_results = self.model.evaluate(X_test, y_eval, verbose=0, return_dict=True)

        metrics = {}
        for metric, value in eval_results.items():
            metrics[metric] = float(value)
            logger.info("%s: %.4f", metric, metrics[metric])

        # Get predictions
        y_pred_prob = self.model.predict(X_test)

        # Different processing based on output type
        if self.output_size == 1:  # Binary classification
            # The sigmoid output has shape (n, 1); flatten it so that labels,
            # predictions and probabilities all share shape (n,).
            y_prob = np.asarray(y_pred_prob).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            # Classification report
            report = classification_report(y_labels, y_pred)
            logger.info(f"Classification Report:\n{report}")

            # Confusion matrix
            cm = confusion_matrix(y_labels, y_pred)
            logger.info(f"Confusion Matrix:\n{cm}")

            # ROC curve and AUC
            fpr, tpr, _ = roc_curve(y_labels, y_prob)
            roc_auc = float(auc(fpr, tpr))
            metrics['auc'] = roc_auc

            if plot_results:
                self._plot_binary_results(y_labels, y_pred, y_prob, fpr, tpr, roc_auc)

        elif self.output_size == 3:  # Multi-class classification
            y_pred = np.argmax(y_pred_prob, axis=1)

            # Classification report
            report = classification_report(y_labels, y_pred)
            logger.info(f"Classification Report:\n{report}")

            # Confusion matrix
            cm = confusion_matrix(y_labels, y_pred)
            logger.info(f"Confusion Matrix:\n{cm}")

            if plot_results:
                self._plot_multiclass_results(y_labels, y_pred, y_pred_prob)

        return metrics

    def predict(self, X):
        """
        Make predictions on new data.

        A 2-D input is treated as a single sample and given a batch dimension.

        Args:
            X (numpy.ndarray): Input features

        Returns:
            tuple: (y_pred, y_proba) where:
                y_pred is the predicted class (0/1 for binary, 0/1/2 for multi-class)
                y_proba is the class probability
                In regression mode both elements are the raw model output.

        Raises:
            ValueError: If no model has been built or loaded.
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        # Ensure X has the right shape
        X = np.asarray(X)
        if X.ndim == 2:
            # Single sample, add batch dimension
            X = np.expand_dims(X, axis=0)

        # Get predictions
        y_proba = self.model.predict(X)

        # Process based on output type
        if self.output_size == 1:  # Binary classification
            y_pred = (y_proba > 0.5).astype(int).flatten()
            return y_pred, y_proba.flatten()

        if self.output_size == 3:  # Multi-class classification
            y_pred = np.argmax(y_proba, axis=1)
            return y_pred, y_proba

        # Regression
        return y_proba, y_proba

    def save(self, filepath=None):
        """
        Save the model to disk.

        Args:
            filepath (str): Path to save the model. Defaults to
                ``cnn_model_<timestamp>.h5`` inside ``self.model_dir``.

        Returns:
            str: Path where the model was saved

        Raises:
            ValueError: If no model has been built or loaded.
        """
        if self.model is None:
            raise ValueError("Model has not been built yet")

        if filepath is None:
            # Create a default filepath with timestamp
            filepath = os.path.join(self.model_dir, f"cnn_model_{self._timestamp()}.h5")

        self.model.save(filepath)
        logger.info(f"Model saved to {filepath}")

        return filepath

    def load(self, filepath):
        """
        Load a saved model from disk.

        ``self.output_size`` and ``self.input_shape`` are refreshed from the
        loaded model when it has a single input and a single output, because
        ``predict`` and ``evaluate`` choose their behavior from
        ``self.output_size``.

        Args:
            filepath (str): Path to the saved model

        Returns:
            The loaded model
        """
        self.model = load_model(filepath)
        logger.info(f"Model loaded from {filepath}")

        # Multi-input/-output models report lists here and are left alone.
        out_shape = getattr(self.model, "output_shape", None)
        if isinstance(out_shape, tuple) and out_shape and out_shape[-1] is not None:
            loaded_size = int(out_shape[-1])
            if loaded_size != self.output_size:
                logger.info(
                    "Updating output size from %s to %s to match the loaded model",
                    self.output_size, loaded_size
                )
                self.output_size = loaded_size

        in_shape = getattr(self.model, "input_shape", None)
        if isinstance(in_shape, tuple) and len(in_shape) > 1:
            # Drop the leading batch dimension
            self.input_shape = tuple(in_shape[1:])

        return self.model

    def extract_hidden_features(self, X):
        """
        Extract features from the last hidden layer of the CNN for transfer learning.

        The features are the output of the second-to-last entry in
        ``self.model.layers``, i.e. the layer feeding the output layer.

        Args:
            X (numpy.ndarray): Input data

        Returns:
            numpy.ndarray: Extracted features

        Raises:
            ValueError: If no model has been built or loaded.
        """
        if self.model is None:
            raise ValueError("Model has not been built or trained yet")

        # Create a new model that outputs the features from the layer before the output
        feature_extractor = Model(
            inputs=self.model.input,
            outputs=self.model.layers[-2].output
        )

        # Extract features
        return feature_extractor.predict(X)

    @staticmethod
    def _draw_confusion_matrix(fig, ax, cm, class_names):
        """Render confusion matrix ``cm`` on ``ax`` with a colorbar and per-cell counts."""
        image = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        ax.set_title('Confusion Matrix')
        fig.colorbar(image, ax=ax)

        tick_marks = np.arange(len(class_names))
        ax.set_xticks(tick_marks)
        ax.set_xticklabels(class_names)
        ax.set_yticks(tick_marks)
        ax.set_yticklabels(class_names)
        ax.set_xlabel('Predicted Label')
        ax.set_ylabel('True Label')

        # Add text annotations; switch to white text on dark cells
        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], 'd'),
                        horizontalalignment="center",
                        color="white" if cm[i, j] > thresh else "black")

    def _plot_binary_results(self, y_true, y_pred, y_proba, fpr, tpr, roc_auc):
        """
        Plot evaluation results for binary classification.

        Draws a confusion matrix, per-class histograms of the predicted
        probability and the ROC curve, and saves the figure to
        ``cnn_evaluation_<timestamp>.png`` inside ``self.model_dir``.

        Args:
            y_true (numpy.ndarray): True labels
            y_pred (numpy.ndarray): Predicted labels
            y_proba (numpy.ndarray): Prediction probabilities
            fpr (numpy.ndarray): False positive rates for ROC curve
            tpr (numpy.ndarray): True positive rates for ROC curve
            roc_auc (float): Area under ROC curve
        """
        # Flatten so the boolean masks below line up element by element
        y_true = np.asarray(y_true).ravel()
        y_pred = np.asarray(y_pred).ravel()
        y_proba = np.asarray(y_proba).ravel()

        # Work on an explicit figure so a caller's current figure is untouched
        fig, (cm_ax, hist_ax, roc_ax) = plt.subplots(1, 3, figsize=(15, 5))

        # Confusion Matrix
        cm = confusion_matrix(y_true, y_pred)
        self._draw_confusion_matrix(fig, cm_ax, cm, ['0', '1'])

        # Histogram of prediction probabilities
        hist_ax.hist(y_proba[y_true == 0], alpha=0.5, label='Class 0')
        hist_ax.hist(y_proba[y_true == 1], alpha=0.5, label='Class 1')
        hist_ax.set_title('Prediction Probabilities')
        hist_ax.set_xlabel('Probability of Class 1')
        hist_ax.set_ylabel('Count')
        hist_ax.legend()

        # ROC Curve
        roc_ax.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc:.3f})')
        roc_ax.plot([0, 1], [0, 1], 'k--')
        roc_ax.set_xlim([0.0, 1.0])
        roc_ax.set_ylim([0.0, 1.05])
        roc_ax.set_xlabel('False Positive Rate')
        roc_ax.set_ylabel('True Positive Rate')
        roc_ax.set_title('Receiver Operating Characteristic')
        roc_ax.legend(loc="lower right")

        fig.tight_layout()

        # Save figure
        fig_path = os.path.join(self.model_dir, f"cnn_evaluation_{self._timestamp()}.png")
        fig.savefig(fig_path)
        plt.close(fig)

        logger.info(f"Evaluation plots saved to {fig_path}")

    def _plot_multiclass_results(self, y_true, y_pred, y_proba):
        """
        Plot evaluation results for multi-class classification.

        Draws a confusion matrix and, for each class, a histogram of the
        probability the model assigned to that class on samples truly
        belonging to it. The figure is saved to
        ``cnn_multiclass_evaluation_<timestamp>.png`` inside ``self.model_dir``.

        Args:
            y_true (numpy.ndarray): True labels (integer labels or one-hot rows)
            y_pred (numpy.ndarray): Predicted labels
            y_proba (numpy.ndarray): Prediction probabilities
        """
        y_true = np.asarray(y_true)
        if y_true.ndim == 2:
            # One-hot rows (or a single column of labels) -> integer labels
            y_true = y_true.ravel() if y_true.shape[1] == 1 else np.argmax(y_true, axis=1)
        y_pred = np.asarray(y_pred).ravel()
        y_proba = np.asarray(y_proba)

        # Work on an explicit figure so a caller's current figure is untouched
        fig, (cm_ax, hist_ax) = plt.subplots(1, 2, figsize=(12, 5))

        classes = ['BUY', 'HOLD', 'SELL']  # Assumes classes are 0, 1, 2

        # Confusion Matrix
        cm = confusion_matrix(y_true, y_pred)
        self._draw_confusion_matrix(fig, cm_ax, cm, classes)

        # Class probability distributions
        for i, cls in enumerate(classes):
            hist_ax.hist(y_proba[y_true == i, i], alpha=0.5, label=f'Class {cls}')
        hist_ax.set_title('Class Probability Distributions')
        hist_ax.set_xlabel('Probability')
        hist_ax.set_ylabel('Count')
        hist_ax.legend()

        fig.tight_layout()

        # Save figure
        fig_path = os.path.join(
            self.model_dir, f"cnn_multiclass_evaluation_{self._timestamp()}.png"
        )
        fig.savefig(fig_path)
        plt.close(fig)

        logger.info(f"Multiclass evaluation plots saved to {fig_path}")

    def plot_training_history(self):
        """
        Plot training history (loss and metrics).

        The left panel shows training (and, if recorded, validation) loss. The
        right panel shows accuracy when it was recorded, otherwise MAE.

        Returns:
            str: Path to the saved plot

        Raises:
            ValueError: If ``train`` has not been run on this instance.
        """
        if self.history is None:
            raise ValueError("Model has not been trained yet")

        history = self.history.history

        # Work on an explicit figure so a caller's current figure is untouched
        fig, (loss_ax, metric_ax) = plt.subplots(1, 2, figsize=(12, 5))

        # Plot loss
        loss_ax.plot(history['loss'], label='Training Loss')
        if 'val_loss' in history:
            loss_ax.plot(history['val_loss'], label='Validation Loss')
        loss_ax.set_title('Model Loss')
        loss_ax.set_xlabel('Epoch')
        loss_ax.set_ylabel('Loss')
        loss_ax.legend()

        # Plot accuracy (classification) or MAE (regression)
        if 'accuracy' in history:
            metric_ax.plot(history['accuracy'], label='Training Accuracy')
            if 'val_accuracy' in history:
                metric_ax.plot(history['val_accuracy'], label='Validation Accuracy')
            metric_ax.set_title('Model Accuracy')
            metric_ax.set_ylabel('Accuracy')
        elif 'mae' in history:
            metric_ax.plot(history['mae'], label='Training MAE')
            if 'val_mae' in history:
                metric_ax.plot(history['val_mae'], label='Validation MAE')
            metric_ax.set_title('Model MAE')
            metric_ax.set_ylabel('MAE')

        metric_ax.set_xlabel('Epoch')
        # Only draw a legend when a curve was actually plotted
        if metric_ax.get_legend_handles_labels()[0]:
            metric_ax.legend()

        fig.tight_layout()

        # Save figure
        fig_path = os.path.join(
            self.model_dir, f"cnn_training_history_{self._timestamp()}.png"
        )
        fig.savefig(fig_path)
        plt.close(fig)

        logger.info(f"Training history plot saved to {fig_path}")

        return fig_path