From 3fbf7cc69aef1af2a3e7595ad234453804bcdaf1 Mon Sep 17 00:00:00 2001
From: ritaw
Date: Thu, 9 Sep 2021 21:44:47 +0800
Subject: [PATCH 1/5] update default parameter for async examples

---
 benchmark/synthetic_benchmark.py | 11 +++++++++--
 squad/main.py                    | 11 +++++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/benchmark/synthetic_benchmark.py b/benchmark/synthetic_benchmark.py
index 3358b15..a035e1f 100644
--- a/benchmark/synthetic_benchmark.py
+++ b/benchmark/synthetic_benchmark.py
@@ -64,10 +64,16 @@
 )
 parser.add_argument(
     "--async-sync-interval",
-    default=50,
+    default=500,
     type=int,
     help="Model synchronization interval(ms) for async algorithm",
 )
+parser.add_argument(
+    "--async-warmup-steps",
+    default=0,
+    type=int,
+    help="Warmup(allreduce) steps for async algorithm",
+)
 parser.add_argument(
     "--amp",
     action="store_true",
@@ -137,7 +143,8 @@
     from bagua.torch_api.algorithms import async_model_average
 
     algorithm = async_model_average.AsyncModelAverageAlgorithm(
-        sync_interval_ms=args.async_sync_interval
+        sync_interval_ms=args.async_sync_interval,
+        warmup_steps=args.async_warmup_steps,
     )
 else:
     raise NotImplementedError
diff --git a/squad/main.py b/squad/main.py
index 3cd0bca..76a30c8 100644
--- a/squad/main.py
+++ b/squad/main.py
@@ -164,7 +164,8 @@ def train(args, train_dataset, model, tokenizer):
         from bagua.torch_api.algorithms import async_model_average
 
         algorithm = async_model_average.AsyncModelAverageAlgorithm(
-            sync_interval_ms=args.async_sync_interval
+            sync_interval_ms=args.async_sync_interval,
+            warmup_steps=args.async_warmup_steps
         )
     else:
         raise NotImplementedError
@@ -309,7 +310,7 @@ def train(args, train_dataset, model, tokenizer):
             outputs = model(**inputs)
             # model outputs are always tuple in transformers (see doc)
             loss = outputs[0]
-
+
             if args.n_gpu > 1:
                 loss = (
                     loss.mean()
@@ -919,6 +920,12 @@ def main():
         type=int,
         help="Model synchronization interval(ms) for async algorithm",
     )
+    parser.add_argument(
+        "--async-warmup-steps",
+        default=100,
+        type=int,
+        help="Warmup(allreduce) steps for async algorithm",
+    )
 
     args = parser.parse_args()
     if args.doc_stride >= args.max_seq_length - args.max_query_length:

From ba31b1952e2b7d3c739be2feab7092624db27e98 Mon Sep 17 00:00:00 2001
From: ritaw
Date: Thu, 9 Sep 2021 22:01:46 +0800
Subject: [PATCH 2/5] update imagenet example

---
 imagenet/main.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/imagenet/main.py b/imagenet/main.py
index 5cf6adc..67535a0 100644
--- a/imagenet/main.py
+++ b/imagenet/main.py
@@ -149,11 +149,18 @@
 
 parser.add_argument(
     "--async-sync-interval",
-    default=100,
+    default=500,
     type=int,
     help="Model synchronization interval(ms) for async algorithm",
 )
 
+parser.add_argument(
+    "--async-warmup-steps",
+    default=100,
+    type=int,
+    help="Warmup(allreduce) steps for async algorithm",
+)
+
 best_acc1 = 0
 
 
@@ -238,7 +245,8 @@ def main_worker(args):
         from bagua.torch_api.algorithms import async_model_average
 
         algorithm = async_model_average.AsyncModelAverageAlgorithm(
-            sync_interval_ms=args.async_sync_interval
+            sync_interval_ms=args.async_sync_interval,
+            warmup_steps=args.async_warmup_steps,
         )
     else:
         raise NotImplementedError
@@ -439,6 +447,9 @@ def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
 
         if args.prof >= 0 and i == args.prof + 10:
             print("Profiling ended at iteration {}".format(i))
            torch.cuda.cudart().cudaProfilerStop()
+
+            if args.algorithm == "async":
+                model.bagua_algorithm.abort(model, grace_period_seconds=0)
             quit()

From 1fc334ef12dfcae1141400cf52a5168cbf345c31 Mon Sep 17 00:00:00 2001
From: ritaw
Date: Tue, 14 Sep 2021 10:06:30 +0000
Subject: [PATCH 3/5] use new api

---
 benchmark/synthetic_benchmark.py |  6 ++++--
 imagenet/main.py                 | 23 +++++++++++++++++++----
 mnist/main.py                    | 13 ++++++++++---
 squad/main.py                    |  7 +++----
 4 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/benchmark/synthetic_benchmark.py b/benchmark/synthetic_benchmark.py
index a035e1f..c145935 100644
--- a/benchmark/synthetic_benchmark.py
+++ b/benchmark/synthetic_benchmark.py
@@ -137,7 +137,9 @@
 elif args.algorithm == "qadam":
     from bagua.torch_api.algorithms import q_adam
 
-    optimizer = q_adam.QAdamOptimizer(model.parameters(), lr=0.01 * bagua.get_world_size(), warmup_steps=100)
+    optimizer = q_adam.QAdamOptimizer(
+        model.parameters(), lr=0.01 * bagua.get_world_size(), warmup_steps=100
+    )
     algorithm = q_adam.QAdamAlgorithm(optimizer)
 elif args.algorithm == "async":
     from bagua.torch_api.algorithms import async_model_average
@@ -221,4 +223,4 @@ def benchmark_step():
 )
 
 if args.algorithm == "async":
-    algorithm.abort(model)
+    algorithm.destroy()
diff --git a/imagenet/main.py b/imagenet/main.py
index 67535a0..fa574f6 100644
--- a/imagenet/main.py
+++ b/imagenet/main.py
@@ -239,7 +239,9 @@ def main_worker(args):
     elif args.algorithm == "qadam":
         from bagua.torch_api.algorithms import q_adam
 
-        optimizer = q_adam.QAdamOptimizer(model.parameters(), lr=args.lr, warmup_steps=100)
+        optimizer = q_adam.QAdamOptimizer(
+            model.parameters(), lr=args.lr, warmup_steps=100
+        )
         algorithm = q_adam.QAdamAlgorithm(optimizer)
     elif args.algorithm == "async":
         from bagua.torch_api.algorithms import async_model_average
@@ -343,9 +345,15 @@ def main_worker(args):
         if args.distributed:
             train_sampler.set_epoch(epoch)
 
+        if args.algorithm == "async":
+            algorithm.resume()
+
         # train for one epoch
         train(train_loader, model, criterion, optimizer, scaler, epoch, args)
 
+        if args.algorithm == "async":
+            algorithm.abort()
+
         # evaluate on validation set
         acc1 = validate(val_loader, model, criterion, epoch, args)
 
@@ -366,7 +374,7 @@ def main_worker(args):
             )
 
     if args.algorithm == "async":
-        algorithm.abort(model)
+        algorithm.destroy()
 
 
 def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
@@ -423,10 +431,17 @@ def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
         top5.update(acc5[0], images.size(0))
 
         if args.prof >= 0:
-            torch.cuda.nvtx.range_push("optimizer.step()")
+            torch.cuda.nvtx.range_push("backward")
 
         # compute gradient and do SGD step
         scaler.scale(loss).backward()
+
+        if args.prof >= 0:
+            torch.cuda.nvtx.range_pop()
+
+        if args.prof >= 0:
+            torch.cuda.nvtx.range_push("optimizer.step()")
+
         scaler.step(optimizer)
         scaler.update()
 
@@ -449,7 +464,7 @@ def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
            torch.cuda.cudart().cudaProfilerStop()
 
             if args.algorithm == "async":
-                model.bagua_algorithm.abort(model, grace_period_seconds=0)
+                model.bagua_algorithm.destroy()
             quit()
diff --git a/mnist/main.py b/mnist/main.py
index 8d3654c..5606d5d 100644
--- a/mnist/main.py
+++ b/mnist/main.py
@@ -232,7 +232,9 @@ def main():
     elif args.algorithm == "qadam":
         from bagua.torch_api.algorithms import q_adam
 
-        optimizer = q_adam.QAdamOptimizer(model.parameters(), lr=args.lr, warmup_steps=100)
+        optimizer = q_adam.QAdamOptimizer(
+            model.parameters(), lr=args.lr, warmup_steps=100
+        )
         algorithm = q_adam.QAdamAlgorithm(optimizer)
     elif args.algorithm == "async":
         from bagua.torch_api.algorithms import async_model_average
@@ -250,14 +252,19 @@ def main():
     scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
     for epoch in range(1, args.epochs + 1):
+        if args.algorithm == "async":
+            algorithm.resume()
+
         train(args, model, train_loader, optimizer, epoch)
+
+        if args.algorithm == "async":
+            algorithm.abort()
+
         test(model, test_loader)
         scheduler.step()
 
     if args.algorithm == "async":
-        algorithm.abort(model)
-
+        algorithm.destroy()
 
     if args.save_model:
         torch.save(model.state_dict(), "mnist_cnn.pt")
diff --git a/squad/main.py b/squad/main.py
index 76a30c8..2594177 100644
--- a/squad/main.py
+++ b/squad/main.py
@@ -165,7 +165,7 @@ def train(args, train_dataset, model, tokenizer):
 
         algorithm = async_model_average.AsyncModelAverageAlgorithm(
             sync_interval_ms=args.async_sync_interval,
-            warmup_steps=args.async_warmup_steps
+            warmup_steps=args.async_warmup_steps,
         )
     else:
         raise NotImplementedError
@@ -310,7 +310,7 @@ def train(args, train_dataset, model, tokenizer):
             outputs = model(**inputs)
             # model outputs are always tuple in transformers (see doc)
             loss = outputs[0]
-
+
             if args.n_gpu > 1:
                 loss = (
                     loss.mean()
@@ -399,8 +399,7 @@ def train(args, train_dataset, model, tokenizer):
         tb_writer.close()
 
     if args.algorithm == "async":
-        algorithm.abort(model)
-        torch.cuda.synchronize()
+        algorithm.destroy()
 
     return global_step, tr_loss / global_step
 

From c5ce36863e7bb07d9a8781e1bb02f135d02a625f Mon Sep 17 00:00:00 2001
From: ritaw
Date: Wed, 15 Sep 2021 14:40:02 +0800
Subject: [PATCH 4/5] update api

---
 benchmark/synthetic_benchmark.py | 5 ++++-
 imagenet/main.py                 | 9 +++------
 mnist/main.py                    | 8 +++-----
 squad/main.py                    | 2 +-
 4 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/benchmark/synthetic_benchmark.py b/benchmark/synthetic_benchmark.py
index c145935..93328ca 100644
--- a/benchmark/synthetic_benchmark.py
+++ b/benchmark/synthetic_benchmark.py
@@ -195,6 +195,9 @@ def benchmark_step():
 
 # Warm-up
 logging.info("Running warmup...")
+if args.algorithm == "async":
+    algorithm.resume(model)
+
 timeit.timeit(benchmark_step, number=args.num_warmup_batches)
 
 # Benchmark
@@ -223,4 +226,4 @@ def benchmark_step():
 )
 
 if args.algorithm == "async":
-    algorithm.destroy()
+    algorithm.abort(model)
diff --git a/imagenet/main.py b/imagenet/main.py
index fa574f6..705f399 100644
--- a/imagenet/main.py
+++ b/imagenet/main.py
@@ -346,13 +346,13 @@ def main_worker(args):
             train_sampler.set_epoch(epoch)
 
         if args.algorithm == "async":
-            algorithm.resume()
+            algorithm.resume(model)
 
         # train for one epoch
         train(train_loader, model, criterion, optimizer, scaler, epoch, args)
 
         if args.algorithm == "async":
-            algorithm.abort()
+            algorithm.abort(model)
 
         # evaluate on validation set
         acc1 = validate(val_loader, model, criterion, epoch, args)
@@ -373,9 +373,6 @@ def main_worker(args):
                 is_best,
             )
 
-    if args.algorithm == "async":
-        algorithm.destroy()
-
 
 def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
     batch_time = AverageMeter("Time", ":6.3f")
@@ -464,7 +461,7 @@ def train(train_loader, model, criterion, optimizer, scaler, epoch, args):
            torch.cuda.cudart().cudaProfilerStop()
 
             if args.algorithm == "async":
-                model.bagua_algorithm.destroy()
+                model.bagua_algorithm.abort(model)
             quit()
diff --git a/mnist/main.py b/mnist/main.py
index 5606d5d..1e75b48 100644
--- a/mnist/main.py
+++ b/mnist/main.py
@@ -240,7 +240,7 @@ def main():
         from bagua.torch_api.algorithms import async_model_average
 
         algorithm = async_model_average.AsyncModelAverageAlgorithm(
-            sync_interval_ms=args.async_sync_interval
+            sync_interval_ms=args.async_sync_interval,
        )
     else:
         raise NotImplementedError
@@ -253,18 +253,16 @@ def main():
     scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
     for epoch in range(1, args.epochs + 1):
         if args.algorithm == "async":
-            algorithm.resume()
+            algorithm.resume(model)
 
         train(args, model, train_loader, optimizer, epoch)
 
         if args.algorithm == "async":
-            algorithm.abort()
+            algorithm.abort(model)
 
         test(model, test_loader)
         scheduler.step()
 
-    if args.algorithm == "async":
-        algorithm.destroy()
     if args.save_model:
         torch.save(model.state_dict(), "mnist_cnn.pt")
diff --git a/squad/main.py b/squad/main.py
index 2594177..9aff996 100644
--- a/squad/main.py
+++ b/squad/main.py
@@ -399,7 +399,7 @@ def train(args, train_dataset, model, tokenizer):
         tb_writer.close()
 
     if args.algorithm == "async":
-        algorithm.destroy()
+        algorithm.abort(model)
 
     return global_step, tr_loss / global_step
 

From 73df7c9e3ce2c20417aff054260c850c0cfa2bef Mon Sep 17 00:00:00 2001
From: ritaw
Date: Wed, 15 Sep 2021 14:42:25 +0800
Subject: [PATCH 5/5] .

---
 benchmark/synthetic_benchmark.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/benchmark/synthetic_benchmark.py b/benchmark/synthetic_benchmark.py
index 93328ca..1f1fc21 100644
--- a/benchmark/synthetic_benchmark.py
+++ b/benchmark/synthetic_benchmark.py
@@ -195,8 +195,6 @@ def benchmark_step():
 
 # Warm-up
 logging.info("Running warmup...")
-if args.algorithm == "async":
-    algorithm.resume(model)
 
 timeit.timeit(benchmark_step, number=args.num_warmup_batches)
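---

Addendum, not part of the patches above: after PATCH 4/5 and 5/5, the async examples settle on a resume/abort pair around each training epoch, with `resume(model)` restarting asynchronous communication before training and `abort(model)` pausing it before evaluation. The sketch below restates that final pattern from the mnist/main.py hunks outside the diff context, purely as an illustration; it assumes `args`, `model`, `algorithm`, `train_loader`, `optimizer`, `scheduler`, `train`, and `test` are set up exactly as in the unchanged parts of that example, which these hunks do not show.

    # Per-epoch pattern for the async algorithm as it stands after PATCH 4/5.
    # `algorithm` is the AsyncModelAverageAlgorithm built earlier in main();
    # all other names come from the example's existing (assumed) setup.
    for epoch in range(1, args.epochs + 1):
        if args.algorithm == "async":
            algorithm.resume(model)  # restart async communication before training

        train(args, model, train_loader, optimizer, epoch)

        if args.algorithm == "async":
            algorithm.abort(model)   # pause async communication before evaluation

        test(model, test_loader)
        scheduler.step()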