# %% import os import joblib import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, confusion_matrix, classification_report # Paso 1: Cargar la base de datos desde la carpeta "Data select" moods = os.listdir("Data select") list_of_dfs = [] for mood in moods: if "Normal" in mood: continue # saltar normales files = os.listdir(f"Data select/{mood}") for fn in files: path = os.path.join("Data select", mood, fn) df = pd.read_excel(path, sheet_name="Complete Data Set") # Validar que la primera fila no sea una fila de nombres duplicada if not df.columns[0].startswith("Time"): df.columns = df.iloc[0] df = df.drop(index=0) df["Label"] = mood df['TEI-TEO'] = df['TEI'].astype(float) - df['TEO'].astype(float) df['TCO-TCI'] = df['TCO'].astype(float) - df['TCI'].astype(float) list_of_dfs.append(df) # Paso 2: Concatenar todos los datos full_df = pd.concat(list_of_dfs, ignore_index=True) # Rellenar valores nulos usando 'forward fill' full_df.fillna(method='ffill', inplace=True) # Rellenar cualquier valor nulo restante al principio usando 'backward fill' full_df.fillna(method='bfill', inplace=True) # Paso 3: Definición de variables de entrada (features) y salida (labels) features = [ 'FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump', 'TO_feed', 'PO_net', 'PO_feed', 'TRC_sub', 'TEI-TEO', 'TCO-TCI' ] fallas_dict = { "Condenser fouling": 0, "Defective Pilot Valve": 1, "Excess oil": 2, "Non-condensables in refrigerant": 3, "Reduced condenser water flow": 4, "Reduced evaporator water flow": 5, "Refrigerant leak": 6, "Refrigerant overcharge": 7 } num_classes = len(fallas_dict) # Asignar el número entero a cada etiqueta de falla full_df["Clase"] = full_df["Label"].map(fallas_dict) # Entradas (X) y etiquetas numéricas (y) X = full_df[features].values.astype(float) y = full_df["Clase"].values.astype(int) # One-Hot Encoding personalizado: la clase correcta es 1, las demás son -1 def custom_ohe(y_labels, n_classes): ohe = -1 * np.ones((len(y_labels), n_classes), dtype=int) ohe[np.arange(len(y_labels)), y_labels] = 1 return ohe y_ohe = custom_ohe(y, num_classes) # Paso 4: División de los datos (60% train, 20% validation, 20% test) X_trainval, X_test, y_trainval, y_test_ohe = train_test_split(X, y_ohe, test_size=0.2, random_state=42, stratify=y) X_train, X_val, y_train_ohe, y_val_ohe = train_test_split(X_trainval, y_trainval, test_size=0.25, random_state=42, stratify=np.argmax(y_trainval, axis=1)) # Paso 5: Estandarización de los datos mu = X_train.mean(axis=0) std = X_train.std(axis=0) # Evitar división por cero si una característica es constante std[std == 0] = 1 X_train_p = (X_train - mu) / std X_val_p = (X_val - mu) / std X_test_p = (X_test - mu) / std class NNSVM_Multiclass: """ Implementación de una Red Neuronal seguida de una SVM multiclase. Esta versión está refactorizada para usar matrices 2D (batch_size, features), lo que simplifica enormemente el código y sigue las convenciones estándar. """ def __init__(self, input_dim, n, d, n_classes, AF="tanh", seed=0): self.AF = AF np.random.seed(seed) # Inicialización de pesos (Xavier/Glorot) # Capa 1 (Entrada -> Oculta) limit_W0 = np.sqrt(6 / (input_dim + n)) self.W0 = np.random.uniform(-limit_W0, limit_W0, size=(input_dim, n)) self.b0 = np.zeros((1, n)) # (1, n) para broadcasting # Capa 2 (Oculta -> Salida de la Red) limit_W1 = np.sqrt(6 / (n + d)) self.W1 = np.random.uniform(-limit_W1, limit_W1, size=(n, d)) self.b1 = np.zeros((1, d)) # (1, d) para broadcasting # Capa 3 (Parámetros de la SVM) self.theta = np.zeros((d, n_classes)) self.theta_0 = np.zeros((1, n_classes)) # (1, n_classes) para broadcasting def activation(self, z): if self.AF == "tanh": return np.tanh(z) # Se pueden añadir otras funciones de activación si es necesario def activation_derivative(self, a): # Derivada en función de la salida de la activación 'a' if self.AF == "tanh": return 1 - a**2 def forward(self, x): # Flujo de datos a través de la red (propagación hacia adelante) self.x = x self.z0 = np.matmul(self.x, self.W0) + self.b0 self.a0 = self.activation(self.z0) self.z1 = np.matmul(self.a0, self.W1) + self.b1 self.phi = self.activation(self.z1) # Esta es la salida de la NN (features para la SVM) # Salida final del modelo (puntuaciones de la SVM) self.z_prime = np.matmul(self.phi, self.theta) + self.theta_0 return self.z_prime def predict(self, x): scores = self.forward(x) # La predicción es la clase con la puntuación más alta return np.argmax(scores, axis=1) def fit(self, X, y_ohe, X_val, y_val_ohe, epochs=100, lr=1e-3, Lambda=1e-3, beta1=0.9, beta2=0.999, eps=1e-8, batch_size=128): n_samples = X.shape[0] # Inicialización del optimizador Adam para cada parámetro m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0) m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0) m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1) m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1) k = 0 # Contador de iteraciones de Adam print(f"Iniciando entrenamiento por {epochs} épocas...") for epoch in range(epochs): # Mezclar los datos en cada época permutation = np.random.permutation(n_samples) X_shuffled = X[permutation] y_shuffled = y_ohe[permutation] for i in range(0, n_samples, batch_size): # Tomar un lote (batch) de datos x_batch = X_shuffled[i:i+batch_size] y_batch = y_shuffled[i:i+batch_size] batch_n = x_batch.shape[0] # 1. Forward pass (Cálculo de la salida) z_prime = self.forward(x_batch) # 2. Cálculo de la pérdida y su gradiente (Hinge Loss Multiclase) true_class_scores = np.sum(z_prime * y_batch, axis=1, keepdims=True) margins = np.maximum(0, 1 + z_prime - true_class_scores) margins[y_batch == 1] = 0 # Gradiente de la pérdida respecto a z_prime loss_grad = (margins > 0).astype(float) correct_class_indices = np.argmax(y_batch, axis=1) loss_grad[np.arange(batch_n), correct_class_indices] = -np.sum(loss_grad, axis=1) # 3. Backward pass (Cálculo de gradientes para cada capa) # Gradientes de la capa SVM dtheta = np.matmul(self.phi.T, loss_grad) dtheta_0 = np.sum(loss_grad, axis=0, keepdims=True) # Propagar gradiente hacia atrás dphi = np.matmul(loss_grad, self.theta.T) dz1 = dphi * self.activation_derivative(self.phi) dW1 = np.matmul(self.a0.T, dz1) db1 = np.sum(dz1, axis=0, keepdims=True) da0 = np.matmul(dz1, self.W1.T) dz0 = da0 * self.activation_derivative(self.a0) dW0 = np.matmul(self.x.T, dz0) db0 = np.sum(dz0, axis=0, keepdims=True) # 4. Actualización de parámetros # **CORRECCIÓN CLAVE**: Dividimos los gradientes por el tamaño del lote (batch_n) # para obtener el promedio, en lugar de la suma. # El error original estaba aquí, usando la suma implícita. # Actualización de la capa SVM (con regularización L2) self.theta -= lr * (dtheta / batch_n + Lambda * self.theta) self.theta_0 -= lr * (dtheta_0 / batch_n) # Actualización de la Red Neuronal (con optimizador Adam) k += 1 # --- Actualización de W1, b1 --- m_W1 = beta1 * m_W1 + (1 - beta1) * (dW1 / batch_n) v_W1 = beta2 * v_W1 + (1 - beta2) * ((dW1 / batch_n)**2) m_W1_hat = m_W1 / (1 - beta1**k) v_W1_hat = v_W1 / (1 - beta2**k) self.W1 -= lr * m_W1_hat / (np.sqrt(v_W1_hat) + eps) m_b1 = beta1 * m_b1 + (1 - beta1) * (db1 / batch_n) v_b1 = beta2 * v_b1 + (1 - beta2) * ((db1 / batch_n)**2) m_b1_hat = m_b1 / (1 - beta1**k) v_b1_hat = v_b1 / (1 - beta2**k) self.b1 -= lr * m_b1_hat / (np.sqrt(v_b1_hat) + eps) # --- Actualización de W0, b0 --- m_W0 = beta1 * m_W0 + (1 - beta1) * (dW0 / batch_n) v_W0 = beta2 * v_W0 + (1 - beta2) * ((dW0 / batch_n)**2) m_W0_hat = m_W0 / (1 - beta1**k) v_W0_hat = v_W0 / (1 - beta2**k) self.W0 -= lr * m_W0_hat / (np.sqrt(v_W0_hat) + eps) m_b0 = beta1 * m_b0 + (1 - beta1) * (db0 / batch_n) v_b0 = beta2 * v_b0 + (1 - beta2) * ((db0 / batch_n)**2) m_b0_hat = m_b0 / (1 - beta1**k) v_b0_hat = v_b0 / (1 - beta2**k) self.b0 -= lr * m_b0_hat / (np.sqrt(v_b0_hat) + eps) # --- Evaluación al final de cada época --- # Precisión en el conjunto de validación para monitorear el sobreajuste val_preds = self.predict(X_val_p) val_labels = np.argmax(y_val_ohe, axis=1) val_acc = accuracy_score(val_labels, val_preds) print(f"Época: {epoch + 1}/{epochs} - Accuracy de Validación: {val_acc:.4f}") # Paso 9: Crear el modelo con la arquitectura de la tesis input_dim = X_train_p.shape[1] # 13 entradas hidden_neurons = 500 # una sola capa oculta de 500 neuronas output_dim = 500 # una sola salida (SVM) model = NNSVM_Multiclass(input_dim, hidden_neurons, output_dim, 8, AF="tanh", seed=42) # %% print(X_train_p.shape) # %% # Paso 10: Entrenar el modelo con los parámetros de la tesis model.fit(X_train_p, y_train_ohe, X_val_p, y_val_ohe, epochs=15, # Aumentar épocas puede ayudar lr=1e-4, # Una tasa de aprendizaje más baja suele ser más estable Lambda=1e-3, batch_size=64) # Un batch size más pequeño a veces ayuda a generalizar mejor # %% y_pred = model.predict(X_test_p) y_test_labels = np.argmax(y_test_ohe, axis=1) # Métricas de rendimiento acc = accuracy_score(y_test_labels, y_pred) print(f"\n✅ Accuracy final en test: {acc:.4f}") print("\n📊 Reporte de clasificación:") print(classification_report(y_test_labels, y_pred, target_names=fallas_dict.keys())) # %% # Paso 11: Matriz de confusión cm = confusion_matrix(y_test_labels, y_pred) labels = list(fallas_dict.keys()) plt.figure(figsize=(12, 10)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels) plt.title("Matriz de Confusión - Resultados en Test", fontsize=16) plt.xlabel("Etiqueta Predicha", fontsize=12) plt.ylabel("Etiqueta Real", fontsize=12) plt.xticks(rotation=45, ha="right") plt.yticks(rotation=0) plt.tight_layout() plt.show() # %% # Paso 12 (Opcional): Guardar el modelo multiclase entrenado y el diccionario de fallas print("Guardando el modelo multiclase y el diccionario de fallas...") # Guardar el modelo multiclase joblib.dump(model, 'modelo_clasificacion_multiclase.pkl') print("Modelo de clasificación multiclase guardado como 'modelo_clasificacion_multiclase.pkl'") # Guardar el diccionario de mapeo de fallas # Esto es útil para convertir las predicciones numéricas del modelo de nuevo a nombres de fallas joblib.dump(fallas_dict, 'diccionario_fallas.pkl') print("Diccionario de fallas guardado como 'diccionario_fallas.pkl'") # %%