Spaces:
Build error
Build error
| # %% | |
| import os | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import accuracy_score, confusion_matrix, classification_report | |
| # Paso 1: Cargar la base de datos desde la carpeta "Data select" | |
| moods = os.listdir("Data select") | |
| list_of_dfs = [] | |
| for mood in moods: | |
| if "Normal" in mood: | |
| continue # saltar normales | |
| files = os.listdir(f"Data select/{mood}") | |
| for fn in files: | |
| path = os.path.join("Data select", mood, fn) | |
| df = pd.read_excel(path, sheet_name="Complete Data Set") | |
| # Validar que la primera fila no sea una fila de nombres duplicada | |
| if not df.columns[0].startswith("Time"): | |
| df.columns = df.iloc[0] | |
| df = df.drop(index=0) | |
| df["Label"] = mood | |
| df['TEI-TEO'] = df['TEI'].astype(float) - df['TEO'].astype(float) | |
| df['TCO-TCI'] = df['TCO'].astype(float) - df['TCI'].astype(float) | |
| list_of_dfs.append(df) | |
| # Paso 2: Concatenar todos los datos | |
| full_df = pd.concat(list_of_dfs, ignore_index=True) | |
| # Rellenar valores nulos usando 'forward fill' | |
| full_df.fillna(method='ffill', inplace=True) | |
| # Rellenar cualquier valor nulo restante al principio usando 'backward fill' | |
| full_df.fillna(method='bfill', inplace=True) | |
| # Paso 3: Definición de variables de entrada (features) y salida (labels) | |
| features = [ | |
| 'FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump', 'TO_feed', | |
| 'PO_net', 'PO_feed', 'TRC_sub', 'TEI-TEO', 'TCO-TCI' | |
| ] | |
| fallas_dict = { | |
| "Condenser fouling": 0, | |
| "Defective Pilot Valve": 1, | |
| "Excess oil": 2, | |
| "Non-condensables in refrigerant": 3, | |
| "Reduced condenser water flow": 4, | |
| "Reduced evaporator water flow": 5, | |
| "Refrigerant leak": 6, | |
| "Refrigerant overcharge": 7 | |
| } | |
| num_classes = len(fallas_dict) | |
| # Asignar el número entero a cada etiqueta de falla | |
| full_df["Clase"] = full_df["Label"].map(fallas_dict) | |
| # Entradas (X) y etiquetas numéricas (y) | |
| X = full_df[features].values.astype(float) | |
| y = full_df["Clase"].values.astype(int) | |
| # One-Hot Encoding personalizado: la clase correcta es 1, las demás son -1 | |
| def custom_ohe(y_labels, n_classes): | |
| ohe = -1 * np.ones((len(y_labels), n_classes), dtype=int) | |
| ohe[np.arange(len(y_labels)), y_labels] = 1 | |
| return ohe | |
| y_ohe = custom_ohe(y, num_classes) | |
| # Paso 4: División de los datos (60% train, 20% validation, 20% test) | |
| X_trainval, X_test, y_trainval, y_test_ohe = train_test_split(X, y_ohe, test_size=0.2, random_state=42, stratify=y) | |
| X_train, X_val, y_train_ohe, y_val_ohe = train_test_split(X_trainval, y_trainval, test_size=0.25, random_state=42, stratify=np.argmax(y_trainval, axis=1)) | |
| # Paso 5: Estandarización de los datos | |
| mu = X_train.mean(axis=0) | |
| std = X_train.std(axis=0) | |
| # Evitar división por cero si una característica es constante | |
| std[std == 0] = 1 | |
| X_train_p = (X_train - mu) / std | |
| X_val_p = (X_val - mu) / std | |
| X_test_p = (X_test - mu) / std | |
| class NNSVM_Multiclass: | |
| """ | |
| Implementación de una Red Neuronal seguida de una SVM multiclase. | |
| Esta versión está refactorizada para usar matrices 2D (batch_size, features), | |
| lo que simplifica enormemente el código y sigue las convenciones estándar. | |
| """ | |
| def __init__(self, input_dim, n, d, n_classes, AF="tanh", seed=0): | |
| self.AF = AF | |
| np.random.seed(seed) | |
| # Inicialización de pesos (Xavier/Glorot) | |
| # Capa 1 (Entrada -> Oculta) | |
| limit_W0 = np.sqrt(6 / (input_dim + n)) | |
| self.W0 = np.random.uniform(-limit_W0, limit_W0, size=(input_dim, n)) | |
| self.b0 = np.zeros((1, n)) # (1, n) para broadcasting | |
| # Capa 2 (Oculta -> Salida de la Red) | |
| limit_W1 = np.sqrt(6 / (n + d)) | |
| self.W1 = np.random.uniform(-limit_W1, limit_W1, size=(n, d)) | |
| self.b1 = np.zeros((1, d)) # (1, d) para broadcasting | |
| # Capa 3 (Parámetros de la SVM) | |
| self.theta = np.zeros((d, n_classes)) | |
| self.theta_0 = np.zeros((1, n_classes)) # (1, n_classes) para broadcasting | |
| def activation(self, z): | |
| if self.AF == "tanh": | |
| return np.tanh(z) | |
| # Se pueden añadir otras funciones de activación si es necesario | |
| def activation_derivative(self, a): | |
| # Derivada en función de la salida de la activación 'a' | |
| if self.AF == "tanh": | |
| return 1 - a**2 | |
| def forward(self, x): | |
| # Flujo de datos a través de la red (propagación hacia adelante) | |
| self.x = x | |
| self.z0 = np.matmul(self.x, self.W0) + self.b0 | |
| self.a0 = self.activation(self.z0) | |
| self.z1 = np.matmul(self.a0, self.W1) + self.b1 | |
| self.phi = self.activation(self.z1) # Esta es la salida de la NN (features para la SVM) | |
| # Salida final del modelo (puntuaciones de la SVM) | |
| self.z_prime = np.matmul(self.phi, self.theta) + self.theta_0 | |
| return self.z_prime | |
| def predict(self, x): | |
| scores = self.forward(x) | |
| # La predicción es la clase con la puntuación más alta | |
| return np.argmax(scores, axis=1) | |
| def fit(self, X, y_ohe, X_val, y_val_ohe, epochs=100, lr=1e-3, Lambda=1e-3, | |
| beta1=0.9, beta2=0.999, eps=1e-8, batch_size=128): | |
| n_samples = X.shape[0] | |
| # Inicialización del optimizador Adam para cada parámetro | |
| m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0) | |
| m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0) | |
| m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1) | |
| m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1) | |
| k = 0 # Contador de iteraciones de Adam | |
| print(f"Iniciando entrenamiento por {epochs} épocas...") | |
| for epoch in range(epochs): | |
| # Mezclar los datos en cada época | |
| permutation = np.random.permutation(n_samples) | |
| X_shuffled = X[permutation] | |
| y_shuffled = y_ohe[permutation] | |
| for i in range(0, n_samples, batch_size): | |
| # Tomar un lote (batch) de datos | |
| x_batch = X_shuffled[i:i+batch_size] | |
| y_batch = y_shuffled[i:i+batch_size] | |
| batch_n = x_batch.shape[0] | |
| # 1. Forward pass (Cálculo de la salida) | |
| z_prime = self.forward(x_batch) | |
| # 2. Cálculo de la pérdida y su gradiente (Hinge Loss Multiclase) | |
| true_class_scores = np.sum(z_prime * y_batch, axis=1, keepdims=True) | |
| margins = np.maximum(0, 1 + z_prime - true_class_scores) | |
| margins[y_batch == 1] = 0 | |
| # Gradiente de la pérdida respecto a z_prime | |
| loss_grad = (margins > 0).astype(float) | |
| correct_class_indices = np.argmax(y_batch, axis=1) | |
| loss_grad[np.arange(batch_n), correct_class_indices] = -np.sum(loss_grad, axis=1) | |
| # 3. Backward pass (Cálculo de gradientes para cada capa) | |
| # Gradientes de la capa SVM | |
| dtheta = np.matmul(self.phi.T, loss_grad) | |
| dtheta_0 = np.sum(loss_grad, axis=0, keepdims=True) | |
| # Propagar gradiente hacia atrás | |
| dphi = np.matmul(loss_grad, self.theta.T) | |
| dz1 = dphi * self.activation_derivative(self.phi) | |
| dW1 = np.matmul(self.a0.T, dz1) | |
| db1 = np.sum(dz1, axis=0, keepdims=True) | |
| da0 = np.matmul(dz1, self.W1.T) | |
| dz0 = da0 * self.activation_derivative(self.a0) | |
| dW0 = np.matmul(self.x.T, dz0) | |
| db0 = np.sum(dz0, axis=0, keepdims=True) | |
| # 4. Actualización de parámetros | |
| # **CORRECCIÓN CLAVE**: Dividimos los gradientes por el tamaño del lote (batch_n) | |
| # para obtener el promedio, en lugar de la suma. | |
| # El error original estaba aquí, usando la suma implícita. | |
| # Actualización de la capa SVM (con regularización L2) | |
| self.theta -= lr * (dtheta / batch_n + Lambda * self.theta) | |
| self.theta_0 -= lr * (dtheta_0 / batch_n) | |
| # Actualización de la Red Neuronal (con optimizador Adam) | |
| k += 1 | |
| # --- Actualización de W1, b1 --- | |
| m_W1 = beta1 * m_W1 + (1 - beta1) * (dW1 / batch_n) | |
| v_W1 = beta2 * v_W1 + (1 - beta2) * ((dW1 / batch_n)**2) | |
| m_W1_hat = m_W1 / (1 - beta1**k) | |
| v_W1_hat = v_W1 / (1 - beta2**k) | |
| self.W1 -= lr * m_W1_hat / (np.sqrt(v_W1_hat) + eps) | |
| m_b1 = beta1 * m_b1 + (1 - beta1) * (db1 / batch_n) | |
| v_b1 = beta2 * v_b1 + (1 - beta2) * ((db1 / batch_n)**2) | |
| m_b1_hat = m_b1 / (1 - beta1**k) | |
| v_b1_hat = v_b1 / (1 - beta2**k) | |
| self.b1 -= lr * m_b1_hat / (np.sqrt(v_b1_hat) + eps) | |
| # --- Actualización de W0, b0 --- | |
| m_W0 = beta1 * m_W0 + (1 - beta1) * (dW0 / batch_n) | |
| v_W0 = beta2 * v_W0 + (1 - beta2) * ((dW0 / batch_n)**2) | |
| m_W0_hat = m_W0 / (1 - beta1**k) | |
| v_W0_hat = v_W0 / (1 - beta2**k) | |
| self.W0 -= lr * m_W0_hat / (np.sqrt(v_W0_hat) + eps) | |
| m_b0 = beta1 * m_b0 + (1 - beta1) * (db0 / batch_n) | |
| v_b0 = beta2 * v_b0 + (1 - beta2) * ((db0 / batch_n)**2) | |
| m_b0_hat = m_b0 / (1 - beta1**k) | |
| v_b0_hat = v_b0 / (1 - beta2**k) | |
| self.b0 -= lr * m_b0_hat / (np.sqrt(v_b0_hat) + eps) | |
| # --- Evaluación al final de cada época --- | |
| # Precisión en el conjunto de validación para monitorear el sobreajuste | |
| val_preds = self.predict(X_val_p) | |
| val_labels = np.argmax(y_val_ohe, axis=1) | |
| val_acc = accuracy_score(val_labels, val_preds) | |
| print(f"Época: {epoch + 1}/{epochs} - Accuracy de Validación: {val_acc:.4f}") | |
| # Paso 9: Crear el modelo con la arquitectura de la tesis | |
| input_dim = X_train_p.shape[1] # 13 entradas | |
| hidden_neurons = 500 # una sola capa oculta de 500 neuronas | |
| output_dim = 500 # una sola salida (SVM) | |
| model = NNSVM_Multiclass(input_dim, hidden_neurons, output_dim, 8, AF="tanh", seed=42) | |
| # %% | |
| print(X_train_p.shape) | |
| # %% | |
| # Paso 10: Entrenar el modelo con los parámetros de la tesis | |
| model.fit(X_train_p, y_train_ohe, X_val_p, y_val_ohe, | |
| epochs=15, # Aumentar épocas puede ayudar | |
| lr=1e-4, # Una tasa de aprendizaje más baja suele ser más estable | |
| Lambda=1e-3, | |
| batch_size=64) # Un batch size más pequeño a veces ayuda a generalizar mejor | |
| # %% | |
| y_pred = model.predict(X_test_p) | |
| y_test_labels = np.argmax(y_test_ohe, axis=1) | |
| # Métricas de rendimiento | |
| acc = accuracy_score(y_test_labels, y_pred) | |
| print(f"\n✅ Accuracy final en test: {acc:.4f}") | |
| print("\n📊 Reporte de clasificación:") | |
| print(classification_report(y_test_labels, y_pred, target_names=fallas_dict.keys())) | |
| # %% | |
| # Paso 11: Matriz de confusión | |
| cm = confusion_matrix(y_test_labels, y_pred) | |
| labels = list(fallas_dict.keys()) | |
| plt.figure(figsize=(12, 10)) | |
| sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', | |
| xticklabels=labels, yticklabels=labels) | |
| plt.title("Matriz de Confusión - Resultados en Test", fontsize=16) | |
| plt.xlabel("Etiqueta Predicha", fontsize=12) | |
| plt.ylabel("Etiqueta Real", fontsize=12) | |
| plt.xticks(rotation=45, ha="right") | |
| plt.yticks(rotation=0) | |
| plt.tight_layout() | |
| plt.show() | |
| # %% | |
| # Paso 12 (Opcional): Guardar el modelo multiclase entrenado y el diccionario de fallas | |
| print("Guardando el modelo multiclase y el diccionario de fallas...") | |
| # Guardar el modelo multiclase | |
| joblib.dump(model, 'modelo_clasificacion_multiclase.pkl') | |
| print("Modelo de clasificación multiclase guardado como 'modelo_clasificacion_multiclase.pkl'") | |
| # Guardar el diccionario de mapeo de fallas | |
| # Esto es útil para convertir las predicciones numéricas del modelo de nuevo a nombres de fallas | |
| joblib.dump(fallas_dict, 'diccionario_fallas.pkl') | |
| print("Diccionario de fallas guardado como 'diccionario_fallas.pkl'") | |
| # %% | |