|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
import ast
|
|
|
import torch
|
|
|
import torch.nn as nn
|
|
|
import torch.nn.functional as F
|
|
|
from torch.utils.data import Dataset, DataLoader
|
|
|
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score
|
|
|
import matplotlib.pyplot as plt
|
|
|
import seaborn as sns
|
|
|
from tqdm import tqdm
|
|
|
import warnings
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
|
|
print(f"Using device: {device}")
|
|
|
|
|
|
class EpilepsyDataset(Dataset):
|
|
|
def __init__(self, csv_path):
|
|
|
self.data = pd.read_csv(csv_path)
|
|
|
|
|
|
def __len__(self):
|
|
|
return len(self.data)
|
|
|
|
|
|
def __getitem__(self, idx):
|
|
|
|
|
|
data_list = ast.literal_eval(self.data.iloc[idx]['data'])
|
|
|
data_tensor = torch.FloatTensor(data_list)
|
|
|
label = torch.LongTensor([self.data.iloc[idx]['label']])[0]
|
|
|
return data_tensor, label
|
|
|
|
|
|
class MultiHeadAttention(nn.Module):
|
|
|
def __init__(self, d_model, num_heads, dropout=0.1):
|
|
|
super().__init__()
|
|
|
assert d_model % num_heads == 0
|
|
|
|
|
|
self.d_model = d_model
|
|
|
self.num_heads = num_heads
|
|
|
self.d_k = d_model // num_heads
|
|
|
|
|
|
self.W_q = nn.Linear(d_model, d_model)
|
|
|
self.W_k = nn.Linear(d_model, d_model)
|
|
|
self.W_v = nn.Linear(d_model, d_model)
|
|
|
self.W_o = nn.Linear(d_model, d_model)
|
|
|
|
|
|
self.dropout = nn.Dropout(dropout)
|
|
|
|
|
|
def forward(self, x):
|
|
|
batch_size = x.size(0)
|
|
|
|
|
|
Q = self.W_q(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
|
|
|
K = self.W_k(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
|
|
|
V = self.W_v(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
|
|
|
|
|
|
scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
|
|
|
attn_weights = F.softmax(scores, dim=-1)
|
|
|
attn_weights = self.dropout(attn_weights)
|
|
|
|
|
|
context = torch.matmul(attn_weights, V)
|
|
|
context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
|
|
|
|
|
|
output = self.W_o(context)
|
|
|
return output, attn_weights
|
|
|
|
|
|
class TemporalConvBlock(nn.Module):
|
|
|
def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
|
|
|
super().__init__()
|
|
|
padding = (kernel_size - 1) * dilation // 2
|
|
|
|
|
|
self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
|
|
|
padding=padding, dilation=dilation)
|
|
|
self.bn1 = nn.BatchNorm1d(out_channels)
|
|
|
self.relu1 = nn.ReLU()
|
|
|
self.dropout1 = nn.Dropout(dropout)
|
|
|
|
|
|
self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
|
|
|
padding=padding, dilation=dilation)
|
|
|
self.bn2 = nn.BatchNorm1d(out_channels)
|
|
|
self.relu2 = nn.ReLU()
|
|
|
self.dropout2 = nn.Dropout(dropout)
|
|
|
|
|
|
self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None
|
|
|
|
|
|
def forward(self, x):
|
|
|
residual = x if self.downsample is None else self.downsample(x)
|
|
|
|
|
|
out = self.conv1(x)
|
|
|
out = self.bn1(out)
|
|
|
out = self.relu1(out)
|
|
|
out = self.dropout1(out)
|
|
|
|
|
|
out = self.conv2(out)
|
|
|
out = self.bn2(out)
|
|
|
|
|
|
out = out + residual
|
|
|
out = self.relu2(out)
|
|
|
out = self.dropout2(out)
|
|
|
|
|
|
return out
|
|
|
|
|
|
class ChannelAttention(nn.Module):
|
|
|
def __init__(self, channels, reduction=8):
|
|
|
super().__init__()
|
|
|
self.avg_pool = nn.AdaptiveAvgPool1d(1)
|
|
|
self.max_pool = nn.AdaptiveMaxPool1d(1)
|
|
|
|
|
|
self.fc = nn.Sequential(
|
|
|
nn.Linear(channels, channels // reduction, bias=False),
|
|
|
nn.ReLU(),
|
|
|
nn.Linear(channels // reduction, channels, bias=False),
|
|
|
nn.Sigmoid()
|
|
|
)
|
|
|
|
|
|
def forward(self, x):
|
|
|
b, c, _ = x.size()
|
|
|
|
|
|
avg_out = self.fc(self.avg_pool(x).view(b, c))
|
|
|
max_out = self.fc(self.max_pool(x).view(b, c))
|
|
|
|
|
|
out = avg_out + max_out
|
|
|
return x * out.view(b, c, 1)
|
|
|
|
|
|
class AdvancedEpilepsyDetector(nn.Module):
|
|
|
def __init__(self, input_dim=178, num_classes=2, dropout=0.3):
|
|
|
super().__init__()
|
|
|
|
|
|
self.input_proj = nn.Linear(input_dim, 256)
|
|
|
self.input_bn = nn.BatchNorm1d(256)
|
|
|
|
|
|
self.tcn_blocks = nn.ModuleList([
|
|
|
TemporalConvBlock(1, 64, kernel_size=7, dilation=1, dropout=dropout),
|
|
|
TemporalConvBlock(64, 128, kernel_size=5, dilation=2, dropout=dropout),
|
|
|
TemporalConvBlock(128, 256, kernel_size=3, dilation=4, dropout=dropout),
|
|
|
TemporalConvBlock(256, 256, kernel_size=3, dilation=8, dropout=dropout),
|
|
|
])
|
|
|
|
|
|
self.channel_attn = ChannelAttention(256)
|
|
|
|
|
|
self.mha1 = MultiHeadAttention(256, num_heads=8, dropout=dropout)
|
|
|
self.mha2 = MultiHeadAttention(256, num_heads=8, dropout=dropout)
|
|
|
|
|
|
self.layer_norm1 = nn.LayerNorm(256)
|
|
|
self.layer_norm2 = nn.LayerNorm(256)
|
|
|
|
|
|
self.ffn = nn.Sequential(
|
|
|
nn.Linear(256, 512),
|
|
|
nn.ReLU(),
|
|
|
nn.Dropout(dropout),
|
|
|
nn.Linear(512, 256),
|
|
|
nn.Dropout(dropout)
|
|
|
)
|
|
|
|
|
|
self.bilstm = nn.LSTM(256, 128, num_layers=2, batch_first=True,
|
|
|
bidirectional=True, dropout=dropout)
|
|
|
|
|
|
self.classifier = nn.Sequential(
|
|
|
nn.Linear(256 + 256, 512),
|
|
|
nn.BatchNorm1d(512),
|
|
|
nn.ReLU(),
|
|
|
nn.Dropout(dropout),
|
|
|
|
|
|
nn.Linear(512, 256),
|
|
|
nn.BatchNorm1d(256),
|
|
|
nn.ReLU(),
|
|
|
nn.Dropout(dropout),
|
|
|
|
|
|
nn.Linear(256, 128),
|
|
|
nn.BatchNorm1d(128),
|
|
|
nn.ReLU(),
|
|
|
nn.Dropout(dropout),
|
|
|
|
|
|
nn.Linear(128, num_classes)
|
|
|
)
|
|
|
|
|
|
def forward(self, x):
|
|
|
batch_size = x.size(0)
|
|
|
|
|
|
x_proj = self.input_proj(x)
|
|
|
x_proj = self.input_bn(x_proj)
|
|
|
x_proj = F.relu(x_proj)
|
|
|
|
|
|
x_tcn = x_proj.unsqueeze(1)
|
|
|
for tcn_block in self.tcn_blocks:
|
|
|
x_tcn = tcn_block(x_tcn)
|
|
|
|
|
|
x_tcn = self.channel_attn(x_tcn)
|
|
|
x_tcn = x_tcn.squeeze(1) if x_tcn.dim() == 3 and x_tcn.size(1) == 1 else x_tcn.mean(dim=-1)
|
|
|
|
|
|
x_trans = x_proj.unsqueeze(1)
|
|
|
|
|
|
attn_out1, _ = self.mha1(x_trans)
|
|
|
x_trans = self.layer_norm1(x_trans + attn_out1)
|
|
|
|
|
|
attn_out2, _ = self.mha2(x_trans)
|
|
|
x_trans = self.layer_norm2(x_trans + attn_out2)
|
|
|
|
|
|
ffn_out = self.ffn(x_trans)
|
|
|
x_trans = x_trans + ffn_out
|
|
|
|
|
|
lstm_out, _ = self.bilstm(x_trans)
|
|
|
lstm_out = lstm_out[:, -1, :]
|
|
|
|
|
|
combined = torch.cat([x_tcn, lstm_out], dim=1)
|
|
|
|
|
|
output = self.classifier(combined)
|
|
|
|
|
|
return output
|
|
|
|
|
|
class FocalLoss(nn.Module):
|
|
|
def __init__(self, alpha=0.25, gamma=2):
|
|
|
super().__init__()
|
|
|
self.alpha = alpha
|
|
|
self.gamma = gamma
|
|
|
|
|
|
def forward(self, inputs, targets):
|
|
|
ce_loss = F.cross_entropy(inputs, targets, reduction='none')
|
|
|
pt = torch.exp(-ce_loss)
|
|
|
focal_loss = self.alpha * (1-pt)**self.gamma * ce_loss
|
|
|
return focal_loss.mean()
|
|
|
|
|
|
def train_epoch(model, dataloader, criterion, optimizer, device):
|
|
|
model.train()
|
|
|
running_loss = 0.0
|
|
|
all_preds = []
|
|
|
all_labels = []
|
|
|
|
|
|
pbar = tqdm(dataloader, desc='Training')
|
|
|
for data, labels in pbar:
|
|
|
data, labels = data.to(device), labels.to(device)
|
|
|
|
|
|
optimizer.zero_grad()
|
|
|
outputs = model(data)
|
|
|
loss = criterion(outputs, labels)
|
|
|
loss.backward()
|
|
|
|
|
|
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
|
|
|
|
|
|
optimizer.step()
|
|
|
|
|
|
running_loss += loss.item()
|
|
|
_, preds = torch.max(outputs, 1)
|
|
|
all_preds.extend(preds.cpu().numpy())
|
|
|
all_labels.extend(labels.cpu().numpy())
|
|
|
|
|
|
pbar.set_postfix({'loss': loss.item()})
|
|
|
|
|
|
epoch_loss = running_loss / len(dataloader)
|
|
|
epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
|
|
|
|
|
|
return epoch_loss, epoch_f1
|
|
|
|
|
|
def validate(model, dataloader, criterion, device):
|
|
|
model.eval()
|
|
|
running_loss = 0.0
|
|
|
all_preds = []
|
|
|
all_labels = []
|
|
|
all_probs = []
|
|
|
|
|
|
with torch.no_grad():
|
|
|
for data, labels in tqdm(dataloader, desc='Validation'):
|
|
|
data, labels = data.to(device), labels.to(device)
|
|
|
|
|
|
outputs = model(data)
|
|
|
loss = criterion(outputs, labels)
|
|
|
|
|
|
running_loss += loss.item()
|
|
|
probs = F.softmax(outputs, dim=1)
|
|
|
_, preds = torch.max(outputs, 1)
|
|
|
|
|
|
all_preds.extend(preds.cpu().numpy())
|
|
|
all_labels.extend(labels.cpu().numpy())
|
|
|
all_probs.extend(probs.cpu().numpy()[:, 1])
|
|
|
|
|
|
epoch_loss = running_loss / len(dataloader)
|
|
|
epoch_f1 = f1_score(all_labels, all_preds, average='weighted')
|
|
|
epoch_auc = roc_auc_score(all_labels, all_probs)
|
|
|
|
|
|
return epoch_loss, epoch_f1, epoch_auc, all_preds, all_labels
|
|
|
|
|
|
def main():
|
|
|
BATCH_SIZE = 64
|
|
|
LEARNING_RATE = 0.001
|
|
|
NUM_EPOCHS = 100
|
|
|
PATIENCE = 15
|
|
|
|
|
|
print("Loading datasets...")
|
|
|
train_dataset = EpilepsyDataset(r'train/path')
|
|
|
val_dataset = EpilepsyDataset(r'val/path')
|
|
|
test_dataset = EpilepsyDataset(r'test/path')
|
|
|
|
|
|
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
|
|
|
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
|
|
|
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
|
|
|
|
|
|
print(f"Train samples: {len(train_dataset)}")
|
|
|
print(f"Val samples: {len(val_dataset)}")
|
|
|
print(f"Test samples: {len(test_dataset)}")
|
|
|
|
|
|
model = AdvancedEpilepsyDetector(input_dim=178, num_classes=2, dropout=0.3).to(device)
|
|
|
|
|
|
total_params = sum(p.numel() for p in model.parameters())
|
|
|
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
|
|
|
print(f"\nTotal parameters: {total_params:,}")
|
|
|
print(f"Trainable parameters: {trainable_params:,}")
|
|
|
|
|
|
criterion = FocalLoss(alpha=0.25, gamma=2)
|
|
|
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
|
|
|
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
|
|
|
patience=5, verbose=True)
|
|
|
|
|
|
best_val_f1 = 0.0
|
|
|
patience_counter = 0
|
|
|
train_losses, val_losses = [], []
|
|
|
train_f1s, val_f1s = [], []
|
|
|
|
|
|
print("\nStarting training...\n")
|
|
|
|
|
|
for epoch in range(NUM_EPOCHS):
|
|
|
print(f"Epoch {epoch+1}/{NUM_EPOCHS}")
|
|
|
print("-" * 50)
|
|
|
|
|
|
train_loss, train_f1 = train_epoch(model, train_loader, criterion, optimizer, device)
|
|
|
|
|
|
val_loss, val_f1, val_auc, _, _ = validate(model, val_loader, criterion, device)
|
|
|
|
|
|
scheduler.step(val_loss)
|
|
|
|
|
|
train_losses.append(train_loss)
|
|
|
val_losses.append(val_loss)
|
|
|
train_f1s.append(train_f1)
|
|
|
val_f1s.append(val_f1)
|
|
|
|
|
|
print(f"Train Loss: {train_loss:.4f} | Train F1: {train_f1:.4f}")
|
|
|
print(f"Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f} | Val AUC: {val_auc:.4f}\n")
|
|
|
|
|
|
if val_f1 > best_val_f1:
|
|
|
best_val_f1 = val_f1
|
|
|
patience_counter = 0
|
|
|
torch.save({
|
|
|
'epoch': epoch,
|
|
|
'model_state_dict': model.state_dict(),
|
|
|
'optimizer_state_dict': optimizer.state_dict(),
|
|
|
'val_f1': val_f1,
|
|
|
'val_auc': val_auc
|
|
|
}, r'best_epilepsy_model.pth')
|
|
|
print(f"[SAVED] Model saved with Val F1: {val_f1:.4f}\n")
|
|
|
else:
|
|
|
patience_counter += 1
|
|
|
if patience_counter >= PATIENCE:
|
|
|
print(f"\nEarly stopping triggered after {epoch+1} epochs")
|
|
|
break
|
|
|
|
|
|
print("\nLoading best model for testing...")
|
|
|
checkpoint = torch.load(r'best_epilepsy_model.pth')
|
|
|
model.load_state_dict(checkpoint['model_state_dict'])
|
|
|
|
|
|
print("\nEvaluating on test set...")
|
|
|
test_loss, test_f1, test_auc, test_preds, test_labels = validate(model, test_loader, criterion, device)
|
|
|
|
|
|
print(f"\nTest Results:")
|
|
|
print(f"Test Loss: {test_loss:.4f}")
|
|
|
print(f"Test F1: {test_f1:.4f}")
|
|
|
print(f"Test AUC: {test_auc:.4f}")
|
|
|
|
|
|
print("\nClassification Report:")
|
|
|
print(classification_report(test_labels, test_preds, target_names=['Non-Seizure', 'Seizure']))
|
|
|
|
|
|
|
|
|
cm = confusion_matrix(test_labels, test_preds)
|
|
|
plt.figure(figsize=(10, 8))
|
|
|
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
|
|
|
xticklabels=['Non-Seizure', 'Seizure'],
|
|
|
yticklabels=['Non-Seizure', 'Seizure'])
|
|
|
plt.title('Confusion Matrix - Epilepsy Detection')
|
|
|
plt.ylabel('True Label')
|
|
|
plt.xlabel('Predicted Label')
|
|
|
plt.savefig(r'confusion_matrix.png', dpi=300, bbox_inches='tight')
|
|
|
plt.close()
|
|
|
|
|
|
plt.figure(figsize=(15, 5))
|
|
|
|
|
|
plt.subplot(1, 2, 1)
|
|
|
plt.plot(train_losses, label='Train Loss')
|
|
|
plt.plot(val_losses, label='Val Loss')
|
|
|
plt.xlabel('Epoch')
|
|
|
plt.ylabel('Loss')
|
|
|
plt.title('Training and Validation Loss')
|
|
|
plt.legend()
|
|
|
plt.grid(True)
|
|
|
|
|
|
plt.subplot(1, 2, 2)
|
|
|
plt.plot(train_f1s, label='Train F1')
|
|
|
plt.plot(val_f1s, label='Val F1')
|
|
|
plt.xlabel('Epoch')
|
|
|
plt.ylabel('F1 Score')
|
|
|
plt.title('Training and Validation F1 Score')
|
|
|
plt.legend()
|
|
|
plt.grid(True)
|
|
|
|
|
|
plt.savefig(r'training_curves.png', dpi=300, bbox_inches='tight')
|
|
|
plt.close()
|
|
|
|
|
|
print("\nTraining completed! Results saved.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
|
|