1 change: 1 addition & 0 deletions pipelines/azureml/pipelines/lightgbm_training.py
@@ -220,6 +220,7 @@ def lightgbm_training_pipeline_function(benchmark_custom_properties, train_datas
training_params['header'] = variant_params.data.header
training_params['label_column'] = variant_params.data.label_column
training_params['group_column'] = variant_params.data.group_column
training_params['test_data_dist_mode'] = variant_params.data.test_data_dist_mode

# extract and construct "sweepable" params
if variant_params.sweep:
1 change: 1 addition & 0 deletions src/common/tasks.py
@@ -120,6 +120,7 @@ class lightgbm_training_data_variant_parameters:
header: bool = False
label_column: Optional[str] = "0"
group_column: Optional[str] = None
test_data_dist_mode: str = "n_train_1_test"
construct: bool = True

@dataclass
8 changes: 8 additions & 0 deletions src/scripts/training/lightgbm_python/spec.yaml
@@ -34,6 +34,13 @@ inputs:
type: String
optional: true
description: "specify group/query column, default '', see https://lightgbm.readthedocs.io/en/latest/Parameters.html#group_column"
test_data_dist_mode:
type: Enum
default: "n_train_1_test"
enum:
- n_train_n_test
- n_train_1_test
description: "either share all test files accross all nodes (n_train_1_test) or split test files per node (n_train_n_test)"

# Learning Parameters
objective:
@@ -148,6 +155,7 @@ launcher:
--header {inputs.header}
[--label_column {inputs.label_column}]
[--group_column {inputs.group_column}]
--test_data_dist_mode {inputs.test_data_dist_mode}
--device_type {inputs.device_type}
--objective {inputs.objective}
--metric {inputs.metric}
57 changes: 37 additions & 20 deletions src/scripts/training/lightgbm_python/train.py
@@ -96,6 +96,14 @@ def get_arg_parser(cls, parser=None):
group_i.add_argument("--header", required=False, default=False, type=strtobool)
group_i.add_argument("--label_column", required=False, default="0", type=str)
group_i.add_argument("--group_column", required=False, default=None, type=str)
group_i.add_argument(
"--test_data_dist_mode",
required=False,
choices=['n_train_n_test', 'n_train_1_test'],
default='n_train_1_test',
type=str,
help="either share all test files accross all nodes (n_train_1_test) or split test files per node (n_train_n_test)"
)
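As a side note, a stand-alone sketch (not the script's parser) of how the choices list enforces the same enum as spec.yaml:

# Minimal stand-alone example; only the flag name is taken from the change above.
import argparse

p = argparse.ArgumentParser()
p.add_argument("--test_data_dist_mode",
               choices=["n_train_n_test", "n_train_1_test"],
               default="n_train_1_test")

print(p.parse_args([]).test_data_dist_mode)  # n_train_1_test
print(p.parse_args(["--test_data_dist_mode", "n_train_n_test"]).test_data_dist_mode)
# any other value makes parse_args() exit with an "invalid choice" error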

group_o = parser.add_argument_group("Outputs")
group_o.add_argument("--export_model",
@@ -156,35 +164,37 @@ def load_lgbm_params_from_cli(self, args, mpi_config):
return lgbm_params


def assign_train_data(self, args, mpi_config):
""" Identifies which training file to load on this node.
def assign_distributed_data(self, args, path, mpi_config, category):
""" Identifies which train/test file to load on this node.
Checks for consistency between number of files and mpi config.
Args:
args (argparse.Namespace)
path (str): where to find the files
mpi_config (namedtuple): as returned from detect_mpi_config()
category (str): name of that data (for logging)

Returns:
str: path to the data file for this node
"""
train_file_paths = get_all_files(args.train)
file_paths = get_all_files(path)

if mpi_config.mpi_available:
# depending on mode, we'll require different number of training files
if args.tree_learner == "data" or args.tree_learner == "voting":
if len(train_file_paths) == mpi_config.world_size:
train_data = train_file_paths[mpi_config.world_rank]
if len(file_paths) == mpi_config.world_size:
dist_data = file_paths[mpi_config.world_rank]
else:
raise Exception(f"To use MPI with tree_learner={args.tree_learner} and node count {mpi_config.world_rank}, you need to partition the input data into {mpi_config.world_rank} files (currently found {len(train_file_paths)})")
raise Exception(f"To use MPI with tree_learner={args.tree_learner} and node count {mpi_config.world_rank}, you need to partition the input {category} data into {mpi_config.world_rank} files (currently found {len(train_file_paths)})")
elif args.tree_learner == "feature":
if len(train_file_paths) == 1:
train_data = train_file_paths[0]
if len(file_paths) == 1:
dist_data = file_paths[0]
else:
raise Exception(f"To use MPI with tree_learner=parallel you need to provide only 1 input file, but {len(train_file_paths)} were found")
raise Exception(f"To use MPI with tree_learner=parallel you need to provide only 1 input {category} file, but {len(file_paths)} were found")
elif args.tree_learner == "serial":
if len(train_file_paths) == 1:
train_data = train_file_paths[0]
if len(file_paths) == 1:
dist_data = file_paths[0]
else:
raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(train_file_paths)} were found")
raise Exception(f"To use single node training, you need to provide only 1 input {category} file, but {len(file_paths)} were found")
else:
raise NotImplementedError(f"tree_learner mode {args.tree_learner} does not exist or is not implemented.")

@@ -194,11 +204,11 @@ def assign_train_data(self, args, mpi_config):
logging.getLogger().warning(f"Using tree_learner={args.tree_learner} on single node does not make sense, switching back to tree_learner=serial")
args.tree_learner = "serial"

if len(train_file_paths) == 1:
train_data = train_file_paths[0]
if len(file_paths) == 1:
dist_data = file_paths[0]
else:
raise Exception(f"To use single node training, you need to provide only 1 input file, but {len(train_file_paths)} were found")
return train_data
raise Exception(f"To use single node training, you need to provide only 1 input {category} file, but {len(file_paths)} were found")
return dist_data
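Condensed into a stand-alone form, the assignment rule above can be sketched as follows (hedged: plain arguments replace the script's args/mpi_config objects, and the file names are hypothetical):

# Stand-alone sketch mirroring the per-node file assignment checks above.
def pick_file(file_paths, tree_learner, world_size, world_rank):
    if tree_learner in ("data", "voting"):
        # data/voting parallel: exactly one partition per node, indexed by rank
        if len(file_paths) != world_size:
            raise Exception(f"expected {world_size} files, found {len(file_paths)}")
        return file_paths[world_rank]
    # feature parallel and serial: a single file shared by every node
    if len(file_paths) != 1:
        raise Exception(f"expected 1 file, found {len(file_paths)}")
    return file_paths[0]

# node 1 of 3 running data-parallel training reads the second partition
print(pick_file(["part_0.bin", "part_1.bin", "part_2.bin"], "data", 3, 1))  # part_1.bin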


def run(self, args, logger, metrics_logger, unknown_args):
@@ -232,10 +242,17 @@ def run(self, args, logger, metrics_logger, unknown_args):
logger.info(f"Loading data for training")
with metrics_logger.log_time_block("time_data_loading"):
# obtain the path to the train data for this node
train_data_path = self.assign_train_data(args, self.mpi_config)
test_data_paths = get_all_files(args.test)

logger.info(f"Running with 1 train file and {len(test_data_paths)} test files.")
train_data_path = self.assign_distributed_data(args, args.train, self.mpi_config, "train")
if args.test_data_dist_mode == 'n_train_n_test':
# in this mode, test data is split across nodes (for bin test files)
test_data_paths = self.assign_distributed_data(args, args.test, self.mpi_config, "test")
logger.info(f"Running with 1 train file and 1 test files on node {self.mpi_config.world_rank}.")
elif args.test_data_dist_mode == 'n_train_1_test':
# in this mode, test data is shared across nodes
test_data_paths = get_all_files(args.test)
logger.info(f"Running with 1 train file and {len(test_data_paths)} test files on node {self.mpi_config.world_rank}.")
else:
raise NotImplementedError(f"--test_data_dist_mode {args.test_data_dist_mode} is not implemented yet")

# construct datasets
if args.construct: