-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrain.py
More file actions
112 lines (96 loc) · 3.52 KB
/
train.py
File metadata and controls
112 lines (96 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
import warnings
import argparse
import os
warnings.filterwarnings("ignore")
from utils import set_seed, get_device, save_checkpoint, get_model_parameters, evaluate_model, get_model, get_datasets, get_checkpoint_path
from engine.trainer import Trainer
def parse_args():
    """Parse command-line options for training and prepare output directories.

    Returns the populated ``argparse.Namespace``. As a side effect, creates
    the checkpoint and data directories if they do not already exist.
    """
    parser = argparse.ArgumentParser()

    # Task / model selection.
    parser.add_argument("--task", type=str, required=True, choices=["kws", "ecg", "geometric"])
    parser.add_argument("--model", type=str, default="m5", choices=["m5", "m3", "f2", "f4"])
    parser.add_argument("--n_channel", type=int, default=32)

    # Data and optimization hyperparameters.
    parser.add_argument("--data_dir", type=str, default="./data", help="Root directory for data")
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--epochs", type=int, default=50)
    parser.add_argument("--lr", type=float, default=1e-4)
    parser.add_argument("--weight_decay", type=float, default=1e-5)
    parser.add_argument("--num_workers", type=int, default=os.cpu_count())
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--checkpoint_dir", type=str, default="./checkpoints/")

    parsed = parser.parse_args()

    # Ensure output locations exist before any training starts.
    os.makedirs(parsed.checkpoint_dir, exist_ok=True)
    os.makedirs(parsed.data_dir, exist_ok=True)
    return parsed
def _build_loader(dataset, args, shuffle):
    """Build a DataLoader with the settings shared by every split.

    Only ``shuffle`` differs between splits; batch size and worker count come
    from the parsed CLI args. Extracted to remove the triplicated construction.
    """
    return DataLoader(
        dataset,
        batch_size=args.batch_size,
        shuffle=shuffle,
        num_workers=args.num_workers,
        pin_memory=True,
        drop_last=False,
    )


def main():
    """Train a model for the selected task, evaluate on the test set, and
    save a checkpoint.

    If a checkpoint for this exact configuration already exists (as derived
    by ``get_checkpoint_path``), training is skipped entirely.
    """
    args = parse_args()
    set_seed(args.seed)

    # Resume guard: this configuration was already trained — nothing to do.
    checkpoint_path = get_checkpoint_path(args)
    if os.path.exists(checkpoint_path):
        print(f"Checkpoint found at {checkpoint_path}. Done.")
        return

    device = get_device()
    print(f"Using {device=}")

    train_ds, val_ds, test_ds, label_mapping, num_classes = get_datasets(args)

    # Dataloaders: only the training split is shuffled.
    train_loader = _build_loader(train_ds, args, shuffle=True)
    val_loader = _build_loader(val_ds, args, shuffle=False)
    test_loader = _build_loader(test_ds, args, shuffle=False)
    print(f'Dataloaders: {len(train_ds)=} {len(val_ds)=} {len(test_ds)=}')

    # Model
    model = get_model(args, num_classes)
    print(model)
    print(f"Model parameters: {get_model_parameters(model)}")
    model.to(device)

    # Loss and optimizer. CrossEntropyLoss expects raw (unnormalized) logits
    # from the model and integer class targets.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)

    # max_grad_norm=5 enables gradient clipping in the Trainer — presumably to
    # stabilize training with saturating activations (sigmoid/tanh); AMP is on.
    trainer = Trainer(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        scheduler=None,
        use_amp=True,
        max_grad_norm=5,
    )

    print(f"Starting {args.task} training...")
    trainer.fit(train_loader, val_loader, epochs=args.epochs, log_interval=1)

    # Final held-out evaluation (same flow for every task).
    print("Evaluating on test set...")
    test_acc = evaluate_model(model, test_loader, device)
    print(f"Test accuracy: {test_acc:.4f}")

    # Persist weights plus enough metadata (CLI args, label mapping) for a
    # later reload/inference run to reconstruct the model and its labels.
    checkpoint_data = {
        "model_state": model.state_dict(),
        "args": vars(args),
        "label_to_index": label_mapping,
    }
    save_checkpoint(checkpoint_path, checkpoint_data)
    print(f"Saved checkpoint to {checkpoint_path}\n")


if __name__ == "__main__":
    main()