From d3c0457cd520cb9b2007edab8f5f27c1cdd8db25 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Tue, 23 Nov 2021 16:04:01 -0800
Subject: [PATCH 1/5] implement test distributed strategies

---
 .../training/lightgbm_python/spec.yaml        |  8 +++
 src/scripts/training/lightgbm_python/train.py | 56 ++++++++++++-------
 2 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/src/scripts/training/lightgbm_python/spec.yaml b/src/scripts/training/lightgbm_python/spec.yaml
index 61b94f65..a3ee06ac 100644
--- a/src/scripts/training/lightgbm_python/spec.yaml
+++ b/src/scripts/training/lightgbm_python/spec.yaml
@@ -34,6 +34,13 @@ inputs:
     type: String
     optional: true
     description: "specify group/query column, default '', see https://lightgbm.readthedocs.io/en/latest/Parameters.html#group_column"
+  test_data_dist_mode:
+    type: Enum
+    default: "n_train_1_test"
+    enum:
+      - n_train_n_test
+      - n_train_1_test
+    description: "either share all test files across all nodes (n_train_1_test) or split test files per node (n_train_n_test)"

   # Learning Parameters
   objective:
@@ -148,6 +155,7 @@ launcher:
       --header {inputs.header}
       [--label_column {inputs.label_column}]
       [--group_column {inputs.group_column}]
+      --test_data_dist_mode {inputs.test_data_dist_mode}
       --device_type {inputs.device_type}
       --objective {inputs.objective}
       --metric {inputs.metric}
diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py
index 55c750d8..3be0353c 100644
--- a/src/scripts/training/lightgbm_python/train.py
+++ b/src/scripts/training/lightgbm_python/train.py
@@ -96,6 +96,14 @@ def get_arg_parser(cls, parser=None):
         group_i.add_argument("--header", required=False, default=False, type=strtobool)
         group_i.add_argument("--label_column", required=False, default="0", type=str)
         group_i.add_argument("--group_column", required=False, default=None, type=str)
+        group_i.add_argument(
+            "--test_data_dist_mode",
+            required=False,
+            choices=['n_train_n_test', 'n_train_1_test'],
+            default='n_train_1_test',
+            type=str,
+            help="either share all test files across all nodes (n_train_1_test) or split test files per node (n_train_n_test)"
+        )

         group_o = parser.add_argument_group("Outputs")
         group_o.add_argument("--export_model",
@@ -114,7 +122,7 @@ def get_arg_parser(cls, parser=None):
         group_lgbm.add_argument("--learning_rate", required=True, type=float)
         group_lgbm.add_argument("--max_bin", required=True, type=int)
         group_lgbm.add_argument("--feature_fraction", required=True, type=float)
-        group_lgbm.add_argument("--device_type", required=True, type=str)
+        group_lgbm.add_argument("--device_type", required=False, type=str)
         group_lgbm.add_argument("--custom_params", required=False, type=str, default=None)

         return parser
@@ -156,35 +164,36 @@ def load_lgbm_params_from_cli(self, args, mpi_config):

         return lgbm_params

-    def assign_train_data(self, args, mpi_config):
-        """ Identifies which training file to load on this node.
+    def assign_distributed_data(self, args, path, mpi_config, category):
+        """ Identifies which train/test file to load on this node.
         Checks for consistency between number of files and mpi config.
         Args:
-            args (argparse.Namespace)
+            path (str): where to find the files
             mpi_config (namedtuple): as returned from detect_mpi_config()
+            category (str): name of that data (for logging)

         Returns:
             str: path to the data file for this node
         """
-        train_file_paths = get_all_files(args.train)
+        file_paths = get_all_files(path)

         if mpi_config.mpi_available:
             # depending on mode, we'll require different number of training files
             if args.tree_learner == "data" or args.tree_learner == "voting":
-                if len(train_file_paths) == mpi_config.world_size:
-                    train_data = train_file_paths[mpi_config.world_rank]
+                if len(file_paths) == mpi_config.world_size:
+                    dist_data = file_paths[mpi_config.world_rank]
                 else:
                     raise Exception(f"To use MPI with tree_learner={args.tree_learner} and node count {mpi_config.world_rank}, you need to partition the input data into {mpi_config.world_rank} files (currently found {len(train_file_paths)})")
             elif args.tree_learner == "feature":
-                if len(train_file_paths) == 1:
-                    train_data = train_file_paths[0]
+                if len(file_paths) == 1:
+                    dist_data = file_paths[0]
                 else:
                     raise Exception(f"To use MPI with tree_learner=parallel you need to provide only 1 input file, but {len(train_file_paths)} were found")
             elif args.tree_learner == "serial":
-                if len(train_file_paths) == 1:
-                    train_data = train_file_paths[0]
+                if len(file_paths) == 1:
+                    dist_data = file_paths[0]
                 else:
-                    raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(train_file_paths)} were found")
+                    raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(file_paths)} were found")
             else:
                 NotImplementedError(f"tree_learner mode {args.tree_learner} does not exist or is not implemented.")
@@ -194,11 +203,11 @@ def assign_train_data(self, args, mpi_config):
             logging.getLogger().warning(f"Using tree_learner={args.tree_learner} on single node does not make sense, switching back to tree_learner=serial")
             args.tree_learner = "serial"

-            if len(train_file_paths) == 1:
-                train_data = train_file_paths[0]
+            if len(file_paths) == 1:
+                dist_data = file_paths[0]
             else:
-                raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(train_file_paths)} were found")
-        return train_data
+                raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(file_paths)} were found")
+        return dist_data

     def run(self, args, logger, metrics_logger, unknown_args):
@@ -232,10 +241,17 @@ def run(self, args, logger, metrics_logger, unknown_args):
         logger.info(f"Loading data for training")
         with metrics_logger.log_time_block("time_data_loading"):
             # obtain the path to the train data for this node
-            train_data_path = self.assign_train_data(args, self.mpi_config)
-            test_data_paths = get_all_files(args.test)
-
-            logger.info(f"Running with 1 train file and {len(test_data_paths)} test files.")
+            train_data_path = self.assign_distributed_data(args, args.train, self.mpi_config, "train")
+            if args.test_data_dist_mode == 'n_train_n_test':
+                # in this mode, test data is split across nodes (for bin test files)
+                test_data_paths = [self.assign_distributed_data(args, args.test, self.mpi_config, "test")]
+                logger.info(f"Running with 1 train file and 1 test file on node {self.mpi_config.world_rank}.")
+            elif args.test_data_dist_mode == 'n_train_1_test':
+                # in this mode, test data is shared across nodes
+                test_data_paths = get_all_files(args.test)
+                logger.info(f"Running with 1 train file and {len(test_data_paths)} test files on node {self.mpi_config.world_rank}.")
+            else:
+                raise NotImplementedError(f"--test_data_dist_mode {args.test_data_dist_mode} is not implemented yet")

             # construct datasets
             if args.construct:
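For readers skimming PATCH 1/5: the new `test_data_dist_mode` enum toggles between sharing the full test set with every node (`n_train_1_test`) and assigning one test file per MPI rank (`n_train_n_test`). The selection rule boils down to the minimal standalone sketch below — not code from the repo; `select_test_files`, `world_size` and `world_rank` are hypothetical stand-ins for the `mpi_config` fields the patch routes through `assign_distributed_data()`.

```python
from typing import List

def select_test_files(test_files: List[str], mode: str,
                      world_size: int, world_rank: int) -> List[str]:
    """Sketch of the two test-data distribution modes added above."""
    if mode == "n_train_1_test":
        # shared mode: every node evaluates against all test files
        return test_files
    if mode == "n_train_n_test":
        # split mode: one test file per node, indexed by MPI rank
        if len(test_files) != world_size:
            raise Exception(f"n_train_n_test requires {world_size} test files, found {len(test_files)}")
        return [test_files[world_rank]]
    raise NotImplementedError(f"--test_data_dist_mode {mode} is not implemented yet")

# e.g. rank 2 of a 4-node job only loads the third test partition:
assert select_test_files(["t0", "t1", "t2", "t3"], "n_train_n_test", 4, 2) == ["t2"]
```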
From 4d152115c2b8997f0b60111984c1b80858b04b45 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Tue, 23 Nov 2021 16:06:38 -0800
Subject: [PATCH 2/5] fix logging

---
 src/scripts/training/lightgbm_python/train.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py
index 3be0353c..a0899e31 100644
--- a/src/scripts/training/lightgbm_python/train.py
+++ b/src/scripts/training/lightgbm_python/train.py
@@ -183,17 +183,17 @@
                 if len(file_paths) == mpi_config.world_size:
                     dist_data = file_paths[mpi_config.world_rank]
                 else:
-                    raise Exception(f"To use MPI with tree_learner={args.tree_learner} and node count {mpi_config.world_rank}, you need to partition the input data into {mpi_config.world_rank} files (currently found {len(train_file_paths)})")
+                    raise Exception(f"To use MPI with tree_learner={args.tree_learner} and node count {mpi_config.world_size}, you need to partition the input {category} data into {mpi_config.world_size} files (currently found {len(file_paths)})")
             elif args.tree_learner == "feature":
                 if len(file_paths) == 1:
                     dist_data = file_paths[0]
                 else:
-                    raise Exception(f"To use MPI with tree_learner=parallel you need to provide only 1 input file, but {len(train_file_paths)} were found")
+                    raise Exception(f"To use MPI with tree_learner=feature you need to provide only 1 input {category} file, but {len(file_paths)} were found")
             elif args.tree_learner == "serial":
                 if len(file_paths) == 1:
                     dist_data = file_paths[0]
                 else:
-                    raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(file_paths)} were found")
+                    raise Exception(f"To use single node training, you need to provide only 1 input {category} file, but {len(file_paths)} were found")
             else:
                 NotImplementedError(f"tree_learner mode {args.tree_learner} does not exist or is not implemented.")
@@ -206,7 +206,7 @@
             if len(file_paths) == 1:
                 dist_data = file_paths[0]
             else:
-                raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(file_paths)} were found")
+                raise Exception(f"To use single node training, you need to provide only 1 input {category} file, but {len(file_paths)} were found")
         return dist_data

     def run(self, args, logger, metrics_logger, unknown_args):

From 3eff4e22fc3784eeacacddfa566b78549e10f45d Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Tue, 23 Nov 2021 16:08:23 -0800
Subject: [PATCH 3/5] rollback unrelated change

---
 src/scripts/training/lightgbm_python/train.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py
index a0899e31..28231614 100644
--- a/src/scripts/training/lightgbm_python/train.py
+++ b/src/scripts/training/lightgbm_python/train.py
@@ -122,7 +122,7 @@ def get_arg_parser(cls, parser=None):
         group_lgbm.add_argument("--learning_rate", required=True, type=float)
         group_lgbm.add_argument("--max_bin", required=True, type=int)
         group_lgbm.add_argument("--feature_fraction", required=True, type=float)
-        group_lgbm.add_argument("--device_type", required=False, type=str)
+        group_lgbm.add_argument("--device_type", required=True, type=str)
         group_lgbm.add_argument("--custom_params", required=False, type=str, default=None)

         return parser
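The error messages PATCH 2/5 rewrites all enforce one consistency rule: the number of input files must match the `tree_learner` mode. Summarized as a standalone sketch (`expected_file_count` is a hypothetical helper, not in the repo — `assign_distributed_data()` performs these checks inline):

```python
def expected_file_count(tree_learner: str, world_size: int) -> int:
    """How many input files each tree_learner mode expects under MPI."""
    if tree_learner in ("data", "voting"):
        return world_size  # data-parallel: one partition per node
    if tree_learner in ("feature", "serial"):
        return 1  # a single input file, shared by every node
    raise NotImplementedError(f"tree_learner mode {tree_learner} does not exist or is not implemented.")

assert expected_file_count("voting", world_size=8) == 8
assert expected_file_count("feature", world_size=8) == 1
```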
group_lgbm.add_argument("--custom_params", required=False, type=str, default=None) return parser From 959ec2c4ef50df568b404d21043a86c119e36a41 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Tue, 23 Nov 2021 16:09:01 -0800 Subject: [PATCH 4/5] rollback unrelated change --- src/scripts/training/lightgbm_python/train.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py index 28231614..32427430 100644 --- a/src/scripts/training/lightgbm_python/train.py +++ b/src/scripts/training/lightgbm_python/train.py @@ -168,6 +168,7 @@ def assign_distributed_data(self, args, path, mpi_config, category): """ Identifies which train/test file to load on this node. Checks for consistency between number of files and mpi config. Args: + args (argparse.Namespace) path (str): where to find the files mpi_config (namedtuple): as returned from detect_mpi_config() category (str): name of that data (for logging) From c01640830d325171b18d104ee08eb144da35fe76 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Tue, 23 Nov 2021 16:28:26 -0800 Subject: [PATCH 5/5] add as parameter in pipeline --- pipelines/azureml/pipelines/lightgbm_training.py | 1 + src/common/tasks.py | 1 + 2 files changed, 2 insertions(+) diff --git a/pipelines/azureml/pipelines/lightgbm_training.py b/pipelines/azureml/pipelines/lightgbm_training.py index 92a37471..c295a384 100644 --- a/pipelines/azureml/pipelines/lightgbm_training.py +++ b/pipelines/azureml/pipelines/lightgbm_training.py @@ -220,6 +220,7 @@ def lightgbm_training_pipeline_function(benchmark_custom_properties, train_datas training_params['header'] = variant_params.data.header training_params['label_column'] = variant_params.data.label_column training_params['group_column'] = variant_params.data.group_column + training_params['test_data_dist_mode'] = variant_params.data.test_data_dist_mode # extract and construct "sweepable" params if variant_params.sweep: diff --git a/src/common/tasks.py b/src/common/tasks.py index 616f1172..bb712668 100644 --- a/src/common/tasks.py +++ b/src/common/tasks.py @@ -120,6 +120,7 @@ class lightgbm_training_data_variant_parameters: header: bool = False label_column: Optional[str] = "0" group_column: Optional[str] = None + test_data_dist_mode: str = "n_train_1_test" construct: bool = True @dataclass