faultdetectionchillers / multiclass_classification.py
DevNumb's picture
Upload 4 files
d3cb5c7 verified
# %%
import os
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
# Paso 1: Cargar la base de datos desde la carpeta "Data select"
moods = os.listdir("Data select")
list_of_dfs = []
for mood in moods:
if "Normal" in mood:
continue # saltar normales
files = os.listdir(f"Data select/{mood}")
for fn in files:
path = os.path.join("Data select", mood, fn)
df = pd.read_excel(path, sheet_name="Complete Data Set")
# Validar que la primera fila no sea una fila de nombres duplicada
if not df.columns[0].startswith("Time"):
df.columns = df.iloc[0]
df = df.drop(index=0)
df["Label"] = mood
df['TEI-TEO'] = df['TEI'].astype(float) - df['TEO'].astype(float)
df['TCO-TCI'] = df['TCO'].astype(float) - df['TCI'].astype(float)
list_of_dfs.append(df)
# Paso 2: Concatenar todos los datos
full_df = pd.concat(list_of_dfs, ignore_index=True)
# Rellenar valores nulos usando 'forward fill'
full_df.fillna(method='ffill', inplace=True)
# Rellenar cualquier valor nulo restante al principio usando 'backward fill'
full_df.fillna(method='bfill', inplace=True)
# Paso 3: Definición de variables de entrada (features) y salida (labels)
features = [
'FWC', 'FWE', 'VC', 'VE', 'TCA', 'TSI', 'TO_sump', 'TO_feed',
'PO_net', 'PO_feed', 'TRC_sub', 'TEI-TEO', 'TCO-TCI'
]
fallas_dict = {
"Condenser fouling": 0,
"Defective Pilot Valve": 1,
"Excess oil": 2,
"Non-condensables in refrigerant": 3,
"Reduced condenser water flow": 4,
"Reduced evaporator water flow": 5,
"Refrigerant leak": 6,
"Refrigerant overcharge": 7
}
num_classes = len(fallas_dict)
# Asignar el número entero a cada etiqueta de falla
full_df["Clase"] = full_df["Label"].map(fallas_dict)
# Entradas (X) y etiquetas numéricas (y)
X = full_df[features].values.astype(float)
y = full_df["Clase"].values.astype(int)
# One-Hot Encoding personalizado: la clase correcta es 1, las demás son -1
def custom_ohe(y_labels, n_classes):
ohe = -1 * np.ones((len(y_labels), n_classes), dtype=int)
ohe[np.arange(len(y_labels)), y_labels] = 1
return ohe
y_ohe = custom_ohe(y, num_classes)
# Paso 4: División de los datos (60% train, 20% validation, 20% test)
X_trainval, X_test, y_trainval, y_test_ohe = train_test_split(X, y_ohe, test_size=0.2, random_state=42, stratify=y)
X_train, X_val, y_train_ohe, y_val_ohe = train_test_split(X_trainval, y_trainval, test_size=0.25, random_state=42, stratify=np.argmax(y_trainval, axis=1))
# Paso 5: Estandarización de los datos
mu = X_train.mean(axis=0)
std = X_train.std(axis=0)
# Evitar división por cero si una característica es constante
std[std == 0] = 1
X_train_p = (X_train - mu) / std
X_val_p = (X_val - mu) / std
X_test_p = (X_test - mu) / std
class NNSVM_Multiclass:
"""
Implementación de una Red Neuronal seguida de una SVM multiclase.
Esta versión está refactorizada para usar matrices 2D (batch_size, features),
lo que simplifica enormemente el código y sigue las convenciones estándar.
"""
def __init__(self, input_dim, n, d, n_classes, AF="tanh", seed=0):
self.AF = AF
np.random.seed(seed)
# Inicialización de pesos (Xavier/Glorot)
# Capa 1 (Entrada -> Oculta)
limit_W0 = np.sqrt(6 / (input_dim + n))
self.W0 = np.random.uniform(-limit_W0, limit_W0, size=(input_dim, n))
self.b0 = np.zeros((1, n)) # (1, n) para broadcasting
# Capa 2 (Oculta -> Salida de la Red)
limit_W1 = np.sqrt(6 / (n + d))
self.W1 = np.random.uniform(-limit_W1, limit_W1, size=(n, d))
self.b1 = np.zeros((1, d)) # (1, d) para broadcasting
# Capa 3 (Parámetros de la SVM)
self.theta = np.zeros((d, n_classes))
self.theta_0 = np.zeros((1, n_classes)) # (1, n_classes) para broadcasting
def activation(self, z):
if self.AF == "tanh":
return np.tanh(z)
# Se pueden añadir otras funciones de activación si es necesario
def activation_derivative(self, a):
# Derivada en función de la salida de la activación 'a'
if self.AF == "tanh":
return 1 - a**2
def forward(self, x):
# Flujo de datos a través de la red (propagación hacia adelante)
self.x = x
self.z0 = np.matmul(self.x, self.W0) + self.b0
self.a0 = self.activation(self.z0)
self.z1 = np.matmul(self.a0, self.W1) + self.b1
self.phi = self.activation(self.z1) # Esta es la salida de la NN (features para la SVM)
# Salida final del modelo (puntuaciones de la SVM)
self.z_prime = np.matmul(self.phi, self.theta) + self.theta_0
return self.z_prime
def predict(self, x):
scores = self.forward(x)
# La predicción es la clase con la puntuación más alta
return np.argmax(scores, axis=1)
def fit(self, X, y_ohe, X_val, y_val_ohe, epochs=100, lr=1e-3, Lambda=1e-3,
beta1=0.9, beta2=0.999, eps=1e-8, batch_size=128):
n_samples = X.shape[0]
# Inicialización del optimizador Adam para cada parámetro
m_W0, v_W0 = np.zeros_like(self.W0), np.zeros_like(self.W0)
m_b0, v_b0 = np.zeros_like(self.b0), np.zeros_like(self.b0)
m_W1, v_W1 = np.zeros_like(self.W1), np.zeros_like(self.W1)
m_b1, v_b1 = np.zeros_like(self.b1), np.zeros_like(self.b1)
k = 0 # Contador de iteraciones de Adam
print(f"Iniciando entrenamiento por {epochs} épocas...")
for epoch in range(epochs):
# Mezclar los datos en cada época
permutation = np.random.permutation(n_samples)
X_shuffled = X[permutation]
y_shuffled = y_ohe[permutation]
for i in range(0, n_samples, batch_size):
# Tomar un lote (batch) de datos
x_batch = X_shuffled[i:i+batch_size]
y_batch = y_shuffled[i:i+batch_size]
batch_n = x_batch.shape[0]
# 1. Forward pass (Cálculo de la salida)
z_prime = self.forward(x_batch)
# 2. Cálculo de la pérdida y su gradiente (Hinge Loss Multiclase)
true_class_scores = np.sum(z_prime * y_batch, axis=1, keepdims=True)
margins = np.maximum(0, 1 + z_prime - true_class_scores)
margins[y_batch == 1] = 0
# Gradiente de la pérdida respecto a z_prime
loss_grad = (margins > 0).astype(float)
correct_class_indices = np.argmax(y_batch, axis=1)
loss_grad[np.arange(batch_n), correct_class_indices] = -np.sum(loss_grad, axis=1)
# 3. Backward pass (Cálculo de gradientes para cada capa)
# Gradientes de la capa SVM
dtheta = np.matmul(self.phi.T, loss_grad)
dtheta_0 = np.sum(loss_grad, axis=0, keepdims=True)
# Propagar gradiente hacia atrás
dphi = np.matmul(loss_grad, self.theta.T)
dz1 = dphi * self.activation_derivative(self.phi)
dW1 = np.matmul(self.a0.T, dz1)
db1 = np.sum(dz1, axis=0, keepdims=True)
da0 = np.matmul(dz1, self.W1.T)
dz0 = da0 * self.activation_derivative(self.a0)
dW0 = np.matmul(self.x.T, dz0)
db0 = np.sum(dz0, axis=0, keepdims=True)
# 4. Actualización de parámetros
# **CORRECCIÓN CLAVE**: Dividimos los gradientes por el tamaño del lote (batch_n)
# para obtener el promedio, en lugar de la suma.
# El error original estaba aquí, usando la suma implícita.
# Actualización de la capa SVM (con regularización L2)
self.theta -= lr * (dtheta / batch_n + Lambda * self.theta)
self.theta_0 -= lr * (dtheta_0 / batch_n)
# Actualización de la Red Neuronal (con optimizador Adam)
k += 1
# --- Actualización de W1, b1 ---
m_W1 = beta1 * m_W1 + (1 - beta1) * (dW1 / batch_n)
v_W1 = beta2 * v_W1 + (1 - beta2) * ((dW1 / batch_n)**2)
m_W1_hat = m_W1 / (1 - beta1**k)
v_W1_hat = v_W1 / (1 - beta2**k)
self.W1 -= lr * m_W1_hat / (np.sqrt(v_W1_hat) + eps)
m_b1 = beta1 * m_b1 + (1 - beta1) * (db1 / batch_n)
v_b1 = beta2 * v_b1 + (1 - beta2) * ((db1 / batch_n)**2)
m_b1_hat = m_b1 / (1 - beta1**k)
v_b1_hat = v_b1 / (1 - beta2**k)
self.b1 -= lr * m_b1_hat / (np.sqrt(v_b1_hat) + eps)
# --- Actualización de W0, b0 ---
m_W0 = beta1 * m_W0 + (1 - beta1) * (dW0 / batch_n)
v_W0 = beta2 * v_W0 + (1 - beta2) * ((dW0 / batch_n)**2)
m_W0_hat = m_W0 / (1 - beta1**k)
v_W0_hat = v_W0 / (1 - beta2**k)
self.W0 -= lr * m_W0_hat / (np.sqrt(v_W0_hat) + eps)
m_b0 = beta1 * m_b0 + (1 - beta1) * (db0 / batch_n)
v_b0 = beta2 * v_b0 + (1 - beta2) * ((db0 / batch_n)**2)
m_b0_hat = m_b0 / (1 - beta1**k)
v_b0_hat = v_b0 / (1 - beta2**k)
self.b0 -= lr * m_b0_hat / (np.sqrt(v_b0_hat) + eps)
# --- Evaluación al final de cada época ---
# Precisión en el conjunto de validación para monitorear el sobreajuste
val_preds = self.predict(X_val_p)
val_labels = np.argmax(y_val_ohe, axis=1)
val_acc = accuracy_score(val_labels, val_preds)
print(f"Época: {epoch + 1}/{epochs} - Accuracy de Validación: {val_acc:.4f}")
# Paso 9: Crear el modelo con la arquitectura de la tesis
input_dim = X_train_p.shape[1] # 13 entradas
hidden_neurons = 500 # una sola capa oculta de 500 neuronas
output_dim = 500 # una sola salida (SVM)
model = NNSVM_Multiclass(input_dim, hidden_neurons, output_dim, 8, AF="tanh", seed=42)
# %%
print(X_train_p.shape)
# %%
# Paso 10: Entrenar el modelo con los parámetros de la tesis
model.fit(X_train_p, y_train_ohe, X_val_p, y_val_ohe,
epochs=15, # Aumentar épocas puede ayudar
lr=1e-4, # Una tasa de aprendizaje más baja suele ser más estable
Lambda=1e-3,
batch_size=64) # Un batch size más pequeño a veces ayuda a generalizar mejor
# %%
y_pred = model.predict(X_test_p)
y_test_labels = np.argmax(y_test_ohe, axis=1)
# Métricas de rendimiento
acc = accuracy_score(y_test_labels, y_pred)
print(f"\n✅ Accuracy final en test: {acc:.4f}")
print("\n📊 Reporte de clasificación:")
print(classification_report(y_test_labels, y_pred, target_names=fallas_dict.keys()))
# %%
# Paso 11: Matriz de confusión
cm = confusion_matrix(y_test_labels, y_pred)
labels = list(fallas_dict.keys())
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=labels, yticklabels=labels)
plt.title("Matriz de Confusión - Resultados en Test", fontsize=16)
plt.xlabel("Etiqueta Predicha", fontsize=12)
plt.ylabel("Etiqueta Real", fontsize=12)
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()
# %%
# Paso 12 (Opcional): Guardar el modelo multiclase entrenado y el diccionario de fallas
print("Guardando el modelo multiclase y el diccionario de fallas...")
# Guardar el modelo multiclase
joblib.dump(model, 'modelo_clasificacion_multiclase.pkl')
print("Modelo de clasificación multiclase guardado como 'modelo_clasificacion_multiclase.pkl'")
# Guardar el diccionario de mapeo de fallas
# Esto es útil para convertir las predicciones numéricas del modelo de nuevo a nombres de fallas
joblib.dump(fallas_dict, 'diccionario_fallas.pkl')
print("Diccionario de fallas guardado como 'diccionario_fallas.pkl'")
# %%