"""Train and evaluate a hybrid TCN / attention / BiLSTM seizure classifier.

The script reads three CSV splits (train / validation / test), trains
``AdvancedEpilepsyDetector`` with focal loss and early stopping on validation
F1, reloads the best checkpoint, and writes a confusion matrix and training
curves to PNG files.
"""

import ast
import math
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

warnings.filterwarnings('ignore')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Display names for class indices 0 and 1, used in reports and plots.
CLASS_NAMES = ['Non-Seizure', 'Seizure']


class EpilepsyDataset(Dataset):
    """Dataset backed by a CSV file with a ``data`` and a ``label`` column.

    Each ``data`` cell is a string holding a Python list literal of numbers
    (parsed with ``ast.literal_eval``); each ``label`` cell is an integer
    class index.
    """

    def __init__(self, csv_path):
        """Load ``csv_path`` and parse every row once.

        Parsing happens here rather than in ``__getitem__`` so the literal
        parser does not run again for every sample in every epoch. The raw
        frame stays available as ``self.data``.
        """
        self.data = pd.read_csv(csv_path)
        self._features = [
            torch.tensor(ast.literal_eval(cell), dtype=torch.float32)
            for cell in self.data['data']
        ]
        self._labels = torch.tensor(self.data['label'].to_numpy(), dtype=torch.long)

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(features, label)``: a float tensor and a 0-dim long tensor."""
        return self._features[idx], self._labels[idx]


class MultiHeadAttention(nn.Module):
    """Scaled dot-product self-attention with ``num_heads`` parallel heads."""

    def __init__(self, d_model, num_heads, dropout=0.1):
        """Create the query/key/value/output projections.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``num_heads``.
        """
        super().__init__()
        # Raise instead of assert: asserts are stripped under ``python -O``.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, projected, batch_size):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, heads, seq, d_k)``."""
        return projected.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape ``(batch, seq, d_model)``.

        Returns:
            ``(output, attn_weights)`` where ``output`` has the shape of ``x``
            and ``attn_weights`` is ``(batch, heads, seq, seq)`` *after*
            dropout has been applied.
        """
        batch_size = x.size(0)

        Q = self._split_heads(self.W_q(x), batch_size)
        K = self._split_heads(self.W_k(x), batch_size)
        V = self._split_heads(self.W_v(x), batch_size)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn_weights = self.dropout(F.softmax(scores, dim=-1))

        context = torch.matmul(attn_weights, V)
        # Merge the heads back into a single d_model-wide feature axis.
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.W_o(context), attn_weights


class TemporalConvBlock(nn.Module):
    """Two dilated Conv1d + BatchNorm layers with a residual connection."""

    def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
        """Build the block.

        Raises:
            ValueError: if ``(kernel_size - 1) * dilation`` is odd. Symmetric
                padding then cannot preserve the sequence length and the
                residual addition in ``forward`` would fail on a shape
                mismatch.
        """
        super().__init__()

        receptive_span = (kernel_size - 1) * dilation
        if receptive_span % 2 != 0:
            raise ValueError(
                "(kernel_size - 1) * dilation must be even so that the "
                f"sequence length is preserved; got kernel_size={kernel_size}, "
                f"dilation={dilation}"
            )
        padding = receptive_span // 2

        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               padding=padding, dilation=dilation)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
                               padding=padding, dilation=dilation)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        # 1x1 convolution so the skip path matches the new channel count.
        self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

    def forward(self, x):
        """Map ``(batch, in_channels, length)`` to ``(batch, out_channels, length)``."""
        residual = x if self.downsample is None else self.downsample(x)

        out = self.dropout1(self.relu1(self.bn1(self.conv1(x))))
        out = self.bn2(self.conv2(out))

        # The residual is added before the final activation.
        out = self.relu2(out + residual)
        return self.dropout2(out)


class ChannelAttention(nn.Module):
    """Per-channel gating computed from average- and max-pooled descriptors."""

    def __init__(self, channels, reduction=8):
        """Build the shared bottleneck MLP (``channels -> channels // reduction -> channels``)."""
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.max_pool = nn.AdaptiveMaxPool1d(1)

        self.fc = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Rescale each channel of ``x`` (``(batch, channels, length)``)."""
        b, c, _ = x.size()
        avg_out = self.fc(self.avg_pool(x).view(b, c))
        max_out = self.fc(self.max_pool(x).view(b, c))
        # NOTE(review): the sigmoid lives inside ``self.fc``, so the gate is
        # the sum of two sigmoids and ranges over (0, 2) rather than (0, 1).
        # Left unchanged to keep existing checkpoints numerically identical;
        # confirm whether sigmoid-after-sum was intended.
        out = avg_out + max_out
        return x * out.view(b, c, 1)


class AdvancedEpilepsyDetector(nn.Module):
    """Two-branch classifier: a dilated TCN branch and an attention + BiLSTM branch.

    Both branches start from the same 256-wide projection of the input and
    their 256-wide outputs are concatenated before the classifier head.
    """

    def __init__(self, input_dim=178, num_classes=2, dropout=0.3):
        """Build all sub-modules; ``input_dim`` is the per-sample feature count."""
        super().__init__()

        self.input_proj = nn.Linear(input_dim, 256)
        self.input_bn = nn.BatchNorm1d(256)

        self.tcn_blocks = nn.ModuleList([
            TemporalConvBlock(1, 64, kernel_size=7, dilation=1, dropout=dropout),
            TemporalConvBlock(64, 128, kernel_size=5, dilation=2, dropout=dropout),
            TemporalConvBlock(128, 256, kernel_size=3, dilation=4, dropout=dropout),
            TemporalConvBlock(256, 256, kernel_size=3, dilation=8, dropout=dropout),
        ])

        self.channel_attn = ChannelAttention(256)

        self.mha1 = MultiHeadAttention(256, num_heads=8, dropout=dropout)
        self.mha2 = MultiHeadAttention(256, num_heads=8, dropout=dropout)

        self.layer_norm1 = nn.LayerNorm(256)
        self.layer_norm2 = nn.LayerNorm(256)

        self.ffn = nn.Sequential(
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.Dropout(dropout)
        )

        self.bilstm = nn.LSTM(256, 128, num_layers=2, batch_first=True,
                              bidirectional=True, dropout=dropout)

        self.classifier = nn.Sequential(
            nn.Linear(256 + 256, 512),  # TCN output + LSTM output
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        ``x`` is ``(batch, input_dim)``. In training mode the BatchNorm layers
        require more than one sample per batch.
        """
        x_proj = F.relu(self.input_bn(self.input_proj(x)))

        # TCN branch: treat the 256 projected features as a 1-channel sequence.
        x_tcn = x_proj.unsqueeze(1)
        for tcn_block in self.tcn_blocks:
            x_tcn = tcn_block(x_tcn)
        x_tcn = self.channel_attn(x_tcn)
        # The last TCN block always emits 256 channels, so global average
        # pooling over the length axis is the only reachable reduction.
        x_tcn = x_tcn.mean(dim=-1)

        # Attention / LSTM branch.
        # NOTE(review): ``unsqueeze(1)`` yields a sequence of length 1, so the
        # softmax in each attention layer is over a single position and the
        # BiLSTM sees a single time step. Confirm whether a longer sequence
        # (e.g. the TCN feature map) was intended here.
        x_trans = x_proj.unsqueeze(1)

        attn_out1, _ = self.mha1(x_trans)
        x_trans = self.layer_norm1(x_trans + attn_out1)

        attn_out2, _ = self.mha2(x_trans)
        x_trans = self.layer_norm2(x_trans + attn_out2)

        x_trans = x_trans + self.ffn(x_trans)

        lstm_out, _ = self.bilstm(x_trans)
        lstm_out = lstm_out[:, -1, :]

        combined = torch.cat([x_tcn, lstm_out], dim=1)
        return self.classifier(combined)


class FocalLoss(nn.Module):
    """Focal loss: cross-entropy down-weighted by ``(1 - p_t) ** gamma``."""

    def __init__(self, alpha=0.25, gamma=2):
        """Store the scale factor ``alpha`` and the focusing exponent ``gamma``."""
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        """Return the mean focal loss for logits ``inputs`` and class indices ``targets``."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-CE) recovers the probability assigned to the true class.
        pt = torch.exp(-ce_loss)
        # NOTE(review): ``alpha`` multiplies every sample equally, so it only
        # rescales the loss and does not re-balance the classes.
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss.mean()


def _mean_loss(loss_sum, sample_count):
    """Return the per-sample mean loss, rejecting an empty epoch."""
    if sample_count == 0:
        raise ValueError("dataloader yielded no samples")
    return loss_sum / sample_count


def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Gradients are clipped to a global norm of 1.0 before every step.

    Returns:
        ``(epoch_loss, epoch_f1)``: the sample-weighted mean loss and the
        weighted F1 score of the predictions made during the pass.

    Raises:
        ValueError: if ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0
    sample_count = 0
    all_preds = []
    all_labels = []

    pbar = tqdm(dataloader, desc='Training')
    for data, labels in pbar:
        data, labels = data.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, labels)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        batch_loss = loss.item()
        batch_len = labels.size(0)
        # Weight by batch size so a short final batch is not over-counted.
        loss_sum += batch_loss * batch_len
        sample_count += batch_len

        all_preds.extend(outputs.argmax(dim=1).cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

        pbar.set_postfix({'loss': batch_loss})

    epoch_loss = _mean_loss(loss_sum, sample_count)
    epoch_f1 = f1_score(all_labels, all_preds, average='weighted')

    return epoch_loss, epoch_f1


def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Returns:
        ``(epoch_loss, epoch_f1, epoch_auc, all_preds, all_labels)``.
        ``epoch_auc`` is computed from the softmax probability of class 1 and
        is NaN when the labels contain fewer than two distinct classes, since
        ROC AUC is undefined in that case.

    Raises:
        ValueError: if ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    sample_count = 0
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for data, labels in tqdm(dataloader, desc='Validation'):
            data, labels = data.to(device), labels.to(device)

            outputs = model(data)
            loss = criterion(outputs, labels)

            batch_len = labels.size(0)
            loss_sum += loss.item() * batch_len
            sample_count += batch_len

            probs = F.softmax(outputs, dim=1)

            all_preds.extend(outputs.argmax(dim=1).cpu().tolist())
            all_labels.extend(labels.cpu().tolist())
            all_probs.extend(probs[:, 1].cpu().tolist())

    epoch_loss = _mean_loss(loss_sum, sample_count)
    epoch_f1 = f1_score(all_labels, all_preds, average='weighted')

    if len(set(all_labels)) < 2:
        epoch_auc = float('nan')
    else:
        epoch_auc = roc_auc_score(all_labels, all_probs)

    return epoch_loss, epoch_f1, epoch_auc, all_preds, all_labels


def _save_confusion_matrix(cm, path):
    """Render the confusion matrix ``cm`` as a heatmap and save it to ``path``."""
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=CLASS_NAMES,
                yticklabels=CLASS_NAMES)
    plt.title('Confusion Matrix - Epilepsy Detection')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig(path, dpi=300, bbox_inches='tight')
    plt.close()


def _save_training_curves(train_losses, val_losses, train_f1s, val_f1s, path):
    """Plot per-epoch loss and F1 for both splits side by side and save to ``path``."""
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(train_f1s, label='Train F1')
    plt.plot(val_f1s, label='Val F1')
    plt.xlabel('Epoch')
    plt.ylabel('F1 Score')
    plt.title('Training and Validation F1 Score')
    plt.legend()
    plt.grid(True)

    plt.savefig(path, dpi=300, bbox_inches='tight')
    plt.close()


def main(train_csv=r'train/path', val_csv=r'val/path', test_csv=r'test/path',
         checkpoint_path=r'best_epilepsy_model.pth'):
    """Train, early-stop on validation F1, then evaluate the best checkpoint.

    Args:
        train_csv, val_csv, test_csv: CSV files readable by ``EpilepsyDataset``.
        checkpoint_path: where the best model (by validation F1) is written
            and later reloaded from.

    Side effects: writes ``checkpoint_path``, ``confusion_matrix.png`` and
    ``training_curves.png``, and prints progress to stdout.
    """
    BATCH_SIZE = 64
    LEARNING_RATE = 0.001
    NUM_EPOCHS = 100
    PATIENCE = 15

    print("Loading datasets...")
    train_dataset = EpilepsyDataset(train_csv)
    val_dataset = EpilepsyDataset(val_csv)
    test_dataset = EpilepsyDataset(test_csv)

    # BatchNorm cannot run in training mode on a single sample, so drop the
    # trailing batch only when it would contain exactly one row.
    drop_single_tail = len(train_dataset) % BATCH_SIZE == 1
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=0, drop_last=drop_single_tail)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

    print(f"Train samples: {len(train_dataset)}")
    print(f"Val samples: {len(val_dataset)}")
    print(f"Test samples: {len(test_dataset)}")

    model = AdvancedEpilepsyDetector(input_dim=178, num_classes=2, dropout=0.3).to(device)

    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"\nTotal parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")

    criterion = FocalLoss(alpha=0.25, gamma=2)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
    # ``verbose`` is deprecated on LR schedulers, so it is not passed here;
    # learning-rate reductions are reported manually in the loop below.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
                                                           patience=5)

    # Start below any attainable F1 so the first epoch always writes a
    # checkpoint and the reload after training cannot hit a missing/stale file.
    best_val_f1 = float('-inf')
    patience_counter = 0
    train_losses, val_losses = [], []
    train_f1s, val_f1s = [], []

    print("\nStarting training...\n")
    for epoch in range(NUM_EPOCHS):
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
        print("-" * 50)

        train_loss, train_f1 = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_f1, val_auc, _, _ = validate(model, val_loader, criterion, device)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after < lr_before:
            print(f"Learning rate reduced from {lr_before:.2e} to {lr_after:.2e}")

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_f1s.append(train_f1)
        val_f1s.append(val_f1)

        print(f"Train Loss: {train_loss:.4f} | Train F1: {train_f1:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f} | Val AUC: {val_auc:.4f}\n")

        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            patience_counter = 0
            # Metrics are stored as plain floats: NumPy scalars are rejected
            # by ``torch.load`` when it runs in weights-only mode.
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_f1': float(val_f1),
                'val_auc': float(val_auc)
            }, checkpoint_path)
            print(f"[SAVED] Model saved with Val F1: {val_f1:.4f}\n")
        else:
            patience_counter += 1

        if patience_counter >= PATIENCE:
            print(f"\nEarly stopping triggered after {epoch+1} epochs")
            break

    print("\nLoading best model for testing...")
    # map_location lets a checkpoint written on GPU load on a CPU-only host.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])

    print("\nEvaluating on test set...")
    test_loss, test_f1, test_auc, test_preds, test_labels = validate(model, test_loader, criterion, device)

    print("\nTest Results:")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test F1: {test_f1:.4f}")
    print(f"Test AUC: {test_auc:.4f}")

    # Pin the label set so the report and the 2x2 matrix keep their shape
    # even if one class is absent from the test labels or predictions.
    class_ids = list(range(len(CLASS_NAMES)))

    print("\nClassification Report:")
    print(classification_report(test_labels, test_preds, labels=class_ids,
                                target_names=CLASS_NAMES))

    cm = confusion_matrix(test_labels, test_preds, labels=class_ids)
    _save_confusion_matrix(cm, r'confusion_matrix.png')

    _save_training_curves(train_losses, val_losses, train_f1s, val_f1s,
                          r'training_curves.png')

    print("\nTraining completed! Results saved.")


if __name__ == "__main__":
    main()