bai-Epilepsy-6 / v1 /epilepsy_detection_model.py
eyupipler's picture
Upload epilepsy_detection_model.py
96fa53f verified
import numpy as np
import pandas as pd
import ast
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
class EpilepsyDataset(Dataset):
def __init__(self, csv_path):
self.data = pd.read_csv(csv_path)
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
# Parse the data string to list
data_list = ast.literal_eval(self.data.iloc[idx]['data'])
data_tensor = torch.FloatTensor(data_list)
label = torch.LongTensor([self.data.iloc[idx]['label']])[0]
return data_tensor, label
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads, dropout=0.1):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
batch_size = x.size(0)
Q = self.W_q(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
K = self.W_k(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
V = self.W_v(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
attn_weights = F.softmax(scores, dim=-1)
attn_weights = self.dropout(attn_weights)
context = torch.matmul(attn_weights, V)
context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
output = self.W_o(context)
return output, attn_weights
class TemporalConvBlock(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
super().__init__()
padding = (kernel_size - 1) * dilation // 2
self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
padding=padding, dilation=dilation)
self.bn1 = nn.BatchNorm1d(out_channels)
self.relu1 = nn.ReLU()
self.dropout1 = nn.Dropout(dropout)
self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
padding=padding, dilation=dilation)
self.bn2 = nn.BatchNorm1d(out_channels)
self.relu2 = nn.ReLU()
self.dropout2 = nn.Dropout(dropout)
self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None
def forward(self, x):
residual = x if self.downsample is None else self.downsample(x)
out = self.conv1(x)
out = self.bn1(out)
out = self.relu1(out)
out = self.dropout1(out)
out = self.conv2(out)
out = self.bn2(out)
out = out + residual
out = self.relu2(out)
out = self.dropout2(out)
return out
class ChannelAttention(nn.Module):
def __init__(self, channels, reduction=8):
super().__init__()
self.avg_pool = nn.AdaptiveAvgPool1d(1)
self.max_pool = nn.AdaptiveMaxPool1d(1)
self.fc = nn.Sequential(
nn.Linear(channels, channels // reduction, bias=False),
nn.ReLU(),
nn.Linear(channels // reduction, channels, bias=False),
nn.Sigmoid()
)
def forward(self, x):
b, c, _ = x.size()
avg_out = self.fc(self.avg_pool(x).view(b, c))
max_out = self.fc(self.max_pool(x).view(b, c))
out = avg_out + max_out
return x * out.view(b, c, 1)
class AdvancedEpilepsyDetector(nn.Module):
def __init__(self, input_dim=178, num_classes=2, dropout=0.3):
super().__init__()
self.input_proj = nn.Linear(input_dim, 256)
self.input_bn = nn.BatchNorm1d(256)
self.tcn_blocks = nn.ModuleList([
TemporalConvBlock(1, 64, kernel_size=7, dilation=1, dropout=dropout),
TemporalConvBlock(64, 128, kernel_size=5, dilation=2, dropout=dropout),
TemporalConvBlock(128, 256, kernel_size=3, dilation=4, dropout=dropout),
TemporalConvBlock(256, 256, kernel_size=3, dilation=8, dropout=dropout),
])
self.channel_attn = ChannelAttention(256)
self.mha1 = MultiHeadAttention(256, num_heads=8, dropout=dropout)
self.mha2 = MultiHeadAttention(256, num_heads=8, dropout=dropout)
self.layer_norm1 = nn.LayerNorm(256)
self.layer_norm2 = nn.LayerNorm(256)
self.ffn = nn.Sequential(
nn.Linear(256, 512),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(512, 256),
nn.Dropout(dropout)
)
self.bilstm = nn.LSTM(256, 128, num_layers=2, batch_first=True,
bidirectional=True, dropout=dropout)
self.classifier = nn.Sequential(
nn.Linear(256 + 256, 512), # TCN output + LSTM output
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(256, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(128, num_classes)
)
def forward(self, x):
batch_size = x.size(0)
x_proj = self.input_proj(x)
x_proj = self.input_bn(x_proj)
x_proj = F.relu(x_proj)
x_tcn = x_proj.unsqueeze(1)
for tcn_block in self.tcn_blocks:
x_tcn = tcn_block(x_tcn)
x_tcn = self.channel_attn(x_tcn)
x_tcn = x_tcn.squeeze(1) if x_tcn.dim() == 3 and x_tcn.size(1) == 1 else x_tcn.mean(dim=-1)
x_trans = x_proj.unsqueeze(1)
attn_out1, _ = self.mha1(x_trans)
x_trans = self.layer_norm1(x_trans + attn_out1)
attn_out2, _ = self.mha2(x_trans)
x_trans = self.layer_norm2(x_trans + attn_out2)
ffn_out = self.ffn(x_trans)
x_trans = x_trans + ffn_out
lstm_out, _ = self.bilstm(x_trans)
lstm_out = lstm_out[:, -1, :]
combined = torch.cat([x_tcn, lstm_out], dim=1)
output = self.classifier(combined)
return output
class FocalLoss(nn.Module):
def __init__(self, alpha=0.25, gamma=2):
super().__init__()
self.alpha = alpha
self.gamma = gamma
def forward(self, inputs, targets):
ce_loss = F.cross_entropy(inputs, targets, reduction='none')
pt = torch.exp(-ce_loss)
focal_loss = self.alpha * (1-pt)**self.gamma * ce_loss
return focal_loss.mean()
def train_epoch(model, dataloader, criterion, optimizer, device):
model.train()
running_loss = 0.0
all_preds = []
all_labels = []
pbar = tqdm(dataloader, desc='Training')
for data, labels in pbar:
data, labels = data.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(data)
loss = criterion(outputs, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
running_loss += loss.item()
_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
pbar.set_postfix({'loss': loss.item()})
epoch_loss = running_loss / len(dataloader)
epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
return epoch_loss, epoch_f1
def validate(model, dataloader, criterion, device):
model.eval()
running_loss = 0.0
all_preds = []
all_labels = []
all_probs = []
with torch.no_grad():
for data, labels in tqdm(dataloader, desc='Validation'):
data, labels = data.to(device), labels.to(device)
outputs = model(data)
loss = criterion(outputs, labels)
running_loss += loss.item()
probs = F.softmax(outputs, dim=1)
_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
all_probs.extend(probs.cpu().numpy()[:, 1])
epoch_loss = running_loss / len(dataloader)
epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
epoch_auc = roc_auc_score(all_labels, all_probs)
return epoch_loss, epoch_f1, epoch_auc, all_preds, all_labels
def main():
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUM_EPOCHS = 100
PATIENCE = 15
print("Loading datasets...")
train_dataset = EpilepsyDataset(r'train/path')
val_dataset = EpilepsyDataset(r'val/path')
test_dataset = EpilepsyDataset(r'test/path')
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
print(f"Train samples: {len(train_dataset)}")
print(f"Val samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")
model = AdvancedEpilepsyDetector(input_dim=178, num_classes=2, dropout=0.3).to(device)
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
criterion = FocalLoss(alpha=0.25, gamma=2)
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
patience=5, verbose=True)
best_val_f1 = 0.0
patience_counter = 0
train_losses, val_losses = [], []
train_f1s, val_f1s = [], []
print("\nStarting training...\n")
for epoch in range(NUM_EPOCHS):
print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
print("-" * 50)
train_loss, train_f1 = train_epoch(model, train_loader, criterion, optimizer, device)
val_loss, val_f1, val_auc, _, _ = validate(model, val_loader, criterion, device)
scheduler.step(val_loss)
train_losses.append(train_loss)
val_losses.append(val_loss)
train_f1s.append(train_f1)
val_f1s.append(val_f1)
print(f"Train Loss: {train_loss:.4f} | Train F1: {train_f1:.4f}")
print(f"Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f} | Val AUC: {val_auc:.4f}\n")
if val_f1 > best_val_f1:
best_val_f1 = val_f1
patience_counter = 0
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'val_f1': val_f1,
'val_auc': val_auc
}, r'best_epilepsy_model.pth')
print(f"[SAVED] Model saved with Val F1: {val_f1:.4f}\n")
else:
patience_counter += 1
if patience_counter >= PATIENCE:
print(f"\nEarly stopping triggered after {epoch+1} epochs")
break
print("\nLoading best model for testing...")
checkpoint = torch.load(r'best_epilepsy_model.pth')
model.load_state_dict(checkpoint['model_state_dict'])
print("\nEvaluating on test set...")
test_loss, test_f1, test_auc, test_preds, test_labels = validate(model, test_loader, criterion, device)
print(f"\nTest Results:")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test F1: {test_f1:.4f}")
print(f"Test AUC: {test_auc:.4f}")
print("\nClassification Report:")
print(classification_report(test_labels, test_preds, target_names=['Non-Seizure', 'Seizure']))
# Confusion matrix
cm = confusion_matrix(test_labels, test_preds)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['Non-Seizure', 'Seizure'],
yticklabels=['Non-Seizure', 'Seizure'])
plt.title('Confusion Matrix - Epilepsy Detection')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.savefig(r'confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.plot(train_f1s, label='Train F1')
plt.plot(val_f1s, label='Val F1')
plt.xlabel('Epoch')
plt.ylabel('F1 Score')
plt.title('Training and Validation F1 Score')
plt.legend()
plt.grid(True)
plt.savefig(r'training_curves.png', dpi=300, bbox_inches='tight')
plt.close()
print("\nTraining completed! Results saved.")
if __name__ == "__main__":
main()