| import os |
| import logging |
| import numpy as np |
| import tensorflow as tf |
| import pyarrow.parquet as pq |
| import pandas as pd |
| from sklearn.preprocessing import MinMaxScaler |
| from tensorflow import keras |
| from typing import Tuple |
| from datetime import datetime, timezone, timedelta |
| import sys |
|
|
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))) |
| from components.model.data_utils import create_data_loader |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s %(levelname)s: %(message)s', |
| datefmt='%Y-%m-%d %H:%M:%S %Z' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| def create_sequences(data: np.ndarray, seq_length: int) -> Tuple[np.ndarray, np.ndarray]: |
| """Create sequences of data for LSTM model training and prediction. |
| |
| Args: |
| data (np.ndarray): Input time series data (scaled), shape (n_samples, n_features). |
| seq_length (int): Length of each sequence. |
| |
| Returns: |
| Tuple[np.ndarray, np.ndarray]: (X, y) where X is input sequences (n_samples, seq_length, n_features) |
| and y is target values (n_samples, n_features). |
| |
| Raises: |
| ValueError: If data is empty, seq_length is invalid, or data has insufficient length. |
| """ |
| if not isinstance(data, np.ndarray): |
| logger.error("Input data must be a numpy array") |
| raise ValueError("Input data must be a numpy array") |
| if data.size == 0: |
| logger.error("Input data is empty") |
| raise ValueError("Input data is empty") |
| if not isinstance(seq_length, int) or seq_length <= 0: |
| logger.error(f"Invalid seq_length: {seq_length}") |
| raise ValueError("seq_length must be a positive integer") |
| if len(data) <= seq_length: |
| logger.error(f"Data length {len(data)} is insufficient for seq_length {seq_length}") |
| raise ValueError(f"Data length {len(data)} is insufficient for seq_length {seq_length}") |
|
|
| X, y = [], [] |
| for i in range(len(data) - seq_length): |
| sequence = data[i:i + seq_length] |
| target = data[i + seq_length] |
| X.append(sequence) |
| y.append(target) |
|
|
| X = np.array(X) |
| y = np.array(y) |
|
|
| if len(X.shape) == 2: |
| X = X.reshape(X.shape[0], X.shape[1], 1) |
|
|
| logger.info(f"Created {X.shape[0]} sequences: X shape {X.shape}, y shape {y.shape}") |
| return X, y |
|
|
| def build_model_from_config(seq_length: int, cfg: dict) -> keras.Model: |
| """Build an LSTM-based model based on configuration. |
| |
| Args: |
| seq_length (int): Length of input sequences. |
| cfg (dict): Model configuration dictionary with 'model' key containing architecture, units, etc. |
| |
| Returns: |
| keras.Model: Compiled Keras model. |
| |
| Raises: |
| ValueError: If configuration is invalid or architecture is unsupported. |
| """ |
| if not isinstance(cfg, dict) or 'model' not in cfg: |
| logger.error("Invalid configuration: 'model' key missing") |
| raise ValueError("Configuration must be a dictionary with a 'model' key") |
|
|
| model_cfg = cfg['model'] |
| arch = model_cfg.get('architecture') |
| units = model_cfg.get('units') |
| layers = model_cfg.get('layers', 1) |
| dropout = model_cfg.get('dropout', 0.2) |
| activation = model_cfg.get('activation', 'tanh') |
| learning_rate = model_cfg.get('learning_rate', 0.001) |
|
|
| if not isinstance(units, int) or units <= 0: |
| logger.error(f"Invalid units: {units}") |
| raise ValueError("units must be a positive integer") |
| if not isinstance(layers, int) or layers <= 0: |
| logger.error(f"Invalid layers: {layers}") |
| raise ValueError("layers must be a positive integer") |
| if not isinstance(dropout, float) or not 0 <= dropout < 1: |
| logger.error(f"Invalid dropout: {dropout}") |
| raise ValueError("dropout must be a float between 0 and 1") |
| if arch not in ['lstm', 'bilstm', 'gru', 'custom']: |
| logger.error(f"Unsupported architecture: {arch}") |
| raise ValueError(f"Unsupported architecture: {arch}") |
| if not isinstance(seq_length, int) or seq_length <= 0: |
| logger.error(f"Invalid seq_length: {seq_length}") |
| raise ValueError("seq_length must be a positive integer") |
| if not isinstance(learning_rate, (int, float)) or learning_rate <= 0: |
| logger.error(f"Invalid learning_rate: {learning_rate}") |
| raise ValueError("learning_rate must be a positive number") |
|
|
| inputs = keras.layers.Input(shape=(seq_length, 1)) |
| x = inputs |
|
|
| if arch == 'lstm': |
| |
| x = keras.layers.LSTM( |
| units, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| attention = keras.layers.Attention()([x, x]) |
| x = keras.layers.Add()([x, attention]) |
| |
| |
| x = keras.layers.LSTM( |
| units // 2, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| x = keras.layers.LSTM( |
| units // 4, return_sequences=False, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1 |
| )(x) |
| |
| |
| x = keras.layers.Dense(50, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x) |
| x = keras.layers.Dropout(dropout)(x) |
| x = keras.layers.Dense(25, activation='relu')(x) |
| x = keras.layers.Dropout(dropout)(x) |
| |
| |
| x = keras.layers.Dense(1)(x) |
| elif arch == 'gru': |
| |
| x = keras.layers.GRU( |
| units, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| attention = keras.layers.Attention()([x, x]) |
| x = keras.layers.Add()([x, attention]) |
| |
| |
| x = keras.layers.GRU( |
| units // 2, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| x = keras.layers.GRU( |
| units // 4, return_sequences=False, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1 |
| )(x) |
| |
| |
| x = keras.layers.Dense(50, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x) |
| x = keras.layers.Dropout(dropout)(x) |
| x = keras.layers.Dense(25, activation='relu')(x) |
| x = keras.layers.Dropout(dropout)(x) |
| |
| |
| x = keras.layers.Dense(1)(x) |
| elif arch == 'bilstm': |
| |
| x = keras.layers.Bidirectional( |
| keras.layers.LSTM( |
| units, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| ) |
| )(x) |
| |
| |
| attention = keras.layers.Attention()([x, x]) |
| x = keras.layers.Add()([x, attention]) |
| |
| |
| x = keras.layers.Bidirectional( |
| keras.layers.LSTM( |
| units // 2, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| ) |
| )(x) |
| |
| |
| x = keras.layers.Bidirectional( |
| keras.layers.LSTM( |
| units // 4, return_sequences=False, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1 |
| ) |
| )(x) |
| |
| |
| x = keras.layers.Dense(50, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x) |
| x = keras.layers.Dropout(dropout)(x) |
| x = keras.layers.Dense(25, activation='relu')(x) |
| x = keras.layers.Dropout(dropout)(x) |
| |
| |
| x = keras.layers.Dense(1)(x) |
| elif arch == 'custom': |
| |
| x = keras.layers.LSTM( |
| units, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| attention = keras.layers.Attention()([x, x]) |
| x = keras.layers.Add()([x, attention]) |
| |
| |
| x = keras.layers.LSTM( |
| units // 2, return_sequences=True, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1, kernel_regularizer=tf.keras.regularizers.l2(0.01) |
| )(x) |
| |
| |
| x = keras.layers.LSTM( |
| units // 4, return_sequences=False, activation=activation, |
| dropout=dropout, recurrent_dropout=0.1 |
| )(x) |
| |
| |
| x = keras.layers.Dense(50, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x) |
| x = keras.layers.Dropout(dropout)(x) |
| x = keras.layers.Dense(25, activation='relu')(x) |
| x = keras.layers.Dropout(dropout)(x) |
| |
| |
| x = keras.layers.Dense(1)(x) |
|
|
| model = keras.Model(inputs, x) |
|
|
| optimizer_name = model_cfg.get('optimizer', 'adam').lower() |
| if optimizer_name == 'adam': |
| optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate) |
| else: |
| logger.warning(f"Optimizer {optimizer_name} not explicitly handled, using default parameters") |
| optimizer = optimizer_name |
|
|
| model.compile( |
| optimizer=optimizer, |
| loss=model_cfg.get('loss', 'mse'), |
| metrics=['mae'] |
| ) |
| |
| logger.info(f"Built model: architecture={arch}, units={units}, layers={layers}, learning_rate={learning_rate}") |
| return model |
|
|
| if __name__ == "__main__": |
| import pandas as pd |
| from components.utils.file_utils import load_config |
|
|
| logger.info("Running standalone tests for model_utils.py") |
| |
| data = np.array([[10000], [10050], [10100], [10150], [10200]]) |
| seq_length = 3 |
| X, y = create_sequences(data, seq_length) |
| print(f"create_sequences: X shape {X.shape}, y shape {y.shape}") |
| print(f"Sample sequence: {X[0]}, target: {y[0]}") |
|
|
| |
| scaler = MinMaxScaler() |
| scaler.fit(data) |
| parquet_paths = ['temp/extracted_from_minio/btcusdt_1h.parquet'] |
| if not os.path.exists(parquet_paths[0]): |
| os.makedirs(os.path.dirname(parquet_paths[0]), exist_ok=True) |
| pd.DataFrame({'Close': [10000, 10050, 10100, 10150, 10200]}).to_parquet(parquet_paths[0]) |
| |
| dataset = create_data_loader(parquet_paths, scaler, seq_length=3, batch_size=2) |
| for x, y in dataset.take(1): |
| print(f"create_data_loader: x shape {x.shape}, y shape {y.shape}") |
|
|
| |
| config = load_config('configs/model_config.yml') |
| for arch in ['lstm', 'gru', 'bilstm', 'custom']: |
| config['model']['architecture'] = arch |
| model = build_model_from_config(seq_length=3, cfg=config) |
| print(f"\nModel summary for {arch}:") |
| model.summary() |
| logger.info("Standalone tests completed successfully.") |