From f53af9517d3c8bcb7f6a83d029702f7275abae62 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 11:21:49 -0800 Subject: [PATCH 1/8] working distributed lightgbm callback handler (tested on single node) --- src/common/lightgbm_utils.py | 102 ++++++++++++++++++ src/common/metrics.py | 2 +- src/scripts/training/lightgbm_python/train.py | 10 +- 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 88f2a9d0..5538bf00 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -6,6 +6,7 @@ """ import lightgbm import logging +from typing import List class LightGBMCallbackHandler(): """ This class handles LightGBM callbacks for recording metrics. """ @@ -34,3 +35,104 @@ def callback(self, env: lightgbm.callback.CallbackEnv) -> None: value=result, step=env.iteration # provide iteration as step in mlflow ) + +class LightGBMDistributedCallbackHandler(): + COMM_TAG_METRIC = 209834 # "random id" + + """ This class handles LightGBM callbacks for recording metrics. """ + def __init__(self, metrics_logger, mpi_comm, world_size=1, world_rank=0): + """ + Args: + metrics_logger (common.metrics.MetricsLogger): class to log metrics using MLFlow + """ + self.metrics_logger = metrics_logger + self.node_metrics = {} + self.distributed_metrics = {} + self.world_size = world_size + self.world_rank = world_rank + self.mpi_comm = mpi_comm + self.logger = logging.getLogger(__name__) + + def report_distributed_metric(self, env: lightgbm.callback.CallbackEnv): + """Sends metrics to node 0""" + self.mpi_comm.send(env, 0, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking + + def collect_distributed_metrics(self, iteration: int): + """Collect metrics from all nodes other than 0""" + for i in range(1, self.world_size): + remote_node_metrics = self.mpi_comm.recv(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking + if remote_node_metrics.iteration != iteration: + self.logger.warning(f"Remove node {i} sent metric for iteration {remote_node_metrics.iteration} while node 0 is at iteration {iteration}") + self.store_metric(i, remote_node_metrics) + + def store_distributed_metric(self, node: int, env: lightgbm.callback.CallbackEnv): + """Stores a metric in the internal storage + for processing during aggregate_and_report_loop()""" + iteration = env.iteration + if iteration not in self.distributed_metrics: + self.distributed_metrics[iteration] = {} + self.distributed_metrics[iteration][node] = env + + def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, results: List[float]): + # TODO: devise aggregation method per eval_name + self.metrics_logger.log_metric( + key=key, + value=sum(results), + step=iteration # provide iteration as step in mlflow + ) + + def aggregate_and_report_loop(self): + aggregation_tasks = {} + + for iteration in list(self.distributed_metrics.keys()): + if len(self.distributed_metrics[iteration]) < self.world_size: + continue + + # loop on all the evaluation results tuples + for node_id, node_metrics in self.distributed_metrics[iteration].items(): + for data_name, eval_name, result, _ in node_metrics.evaluation_result_list: + key = f"{data_name}.{eval_name}" + if key not in aggregation_tasks: + # record name of metric for aggregation method + aggregation_tasks[key] = (iteration, eval_name, []) + + # add value in the list + aggregation_tasks[key][2].append(result) + + # once done, remove the data from the "queue" + del self.distributed_metrics[iteration] + + for key, (iteration, eval_name, results) in aggregation_tasks.items(): + self.aggregate_and_report_task(key, iteration, eval_name, results) + + + def callback(self, env: lightgbm.callback.CallbackEnv) -> None: + """Callback method to collect metrics produced by LightGBM. + + See https://lightgbm.readthedocs.io/en/latest/_modules/lightgbm/callback.html + """ + # let's record in the object for future use + self.node_metrics[env.iteration] = env.evaluation_result_list + + # node 0 gets to report its metrics + if self.world_rank == 0: + # loop on all the evaluation results tuples + for data_name, eval_name, result, _ in env.evaluation_result_list: + # log each as a distinct metric + self.metrics_logger.log_metric( + key=f"node_0/{data_name}.{eval_name}", + value=result, + step=env.iteration # provide iteration as step in mlflow + ) + + # store own's metrics in the record + self.store_distributed_metric(self.world_rank, env) + + # plus collects everybody else's + self.collect_distributed_metrics(env.iteration) + + # and report aggregates + self.aggregate_and_report_loop() + else: + # all the other just report back to node 0 + self.report_distributed_metric(env) diff --git a/src/common/metrics.py b/src/common/metrics.py index f0993945..e0b6ed84 100644 --- a/src/common/metrics.py +++ b/src/common/metrics.py @@ -76,7 +76,7 @@ def log_metric(self, key, value, step=None): key = self._remove_non_allowed_chars(key) - self._logger.debug(f"mlflow[session={self._session_name}].log_metric({key},{value})") + self._logger.debug(f"mlflow[session={self._session_name}].log_metric({key},{value},step={step})") # NOTE: there's a limit to the name of a metric if len(key) > 50: key = key[:50] diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py index 55c750d8..446ebfa1 100644 --- a/src/scripts/training/lightgbm_python/train.py +++ b/src/scripts/training/lightgbm_python/train.py @@ -26,7 +26,7 @@ # useful imports from common from common.components import RunnableScript from common.io import get_all_files -from common.lightgbm_utils import LightGBMCallbackHandler +from common.lightgbm_utils import LightGBMDistributedCallbackHandler def detect_mpi_config(): """ Detects if we're running in MPI. @@ -66,7 +66,6 @@ def __init__(self): task = "train", framework = "lightgbm", framework_version = lightgbm.__version__, - metrics_prefix=f"node_{self.mpi_config.world_rank}/", do_not_log_properties=not(self.mpi_config.main_node) ) @@ -213,7 +212,12 @@ def run(self, args, logger, metrics_logger, unknown_args): # figure out the lgbm params from cli args + mpi config lgbm_params = self.load_lgbm_params_from_cli(args, self.mpi_config) - callbacks_handler = LightGBMCallbackHandler(metrics_logger = metrics_logger) + callbacks_handler = LightGBMDistributedCallbackHandler( + metrics_logger=metrics_logger, + mpi_comm = MPI.COMM_WORLD, + world_rank=self.mpi_config.world_rank, + world_size=self.mpi_config.world_size + ) # make sure the output argument exists if args.export_model and self.mpi_config.main_node: From c45eacfc284aff286c46ee9349455466ca07810f Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 13:43:15 -0800 Subject: [PATCH 2/8] failing in multi node, needs threads? --- src/common/lightgbm_utils.py | 23 +++++++++++++++---- src/scripts/training/lightgbm_python/train.py | 12 +++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 5538bf00..90cc1907 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -5,6 +5,7 @@ This classes provide help to integrate lightgbm """ import lightgbm +import numpy as np import logging from typing import List @@ -36,6 +37,7 @@ def callback(self, env: lightgbm.callback.CallbackEnv) -> None: step=env.iteration # provide iteration as step in mlflow ) + class LightGBMDistributedCallbackHandler(): COMM_TAG_METRIC = 209834 # "random id" @@ -55,15 +57,24 @@ def __init__(self, metrics_logger, mpi_comm, world_size=1, world_rank=0): def report_distributed_metric(self, env: lightgbm.callback.CallbackEnv): """Sends metrics to node 0""" - self.mpi_comm.send(env, 0, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking + self.logger.info(f"Reporting metric back to node 0: {env}") + self.mpi_comm.isend(env, 0, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # non-blocking def collect_distributed_metrics(self, iteration: int): """Collect metrics from all nodes other than 0""" for i in range(1, self.world_size): - remote_node_metrics = self.mpi_comm.recv(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking + self.logger.info(f"Probing metric from node {i}") + + if self.mpi_comm.probe(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC): + self.logger.info(f"Collecting metric from node {i}") + remote_node_metrics = self.mpi_comm.recv(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking + else: + self.logger.info(f"NO metric from node {i}") + continue + if remote_node_metrics.iteration != iteration: - self.logger.warning(f"Remove node {i} sent metric for iteration {remote_node_metrics.iteration} while node 0 is at iteration {iteration}") - self.store_metric(i, remote_node_metrics) + self.logger.warning(f"Remote node {i} sent metric for iteration {remote_node_metrics.iteration} while node 0 is at iteration {iteration}") + self.store_distributed_metric(i, remote_node_metrics) def store_distributed_metric(self, node: int, env: lightgbm.callback.CallbackEnv): """Stores a metric in the internal storage @@ -77,7 +88,7 @@ def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, re # TODO: devise aggregation method per eval_name self.metrics_logger.log_metric( key=key, - value=sum(results), + value=np.mean(results), step=iteration # provide iteration as step in mlflow ) @@ -136,3 +147,5 @@ def callback(self, env: lightgbm.callback.CallbackEnv) -> None: else: # all the other just report back to node 0 self.report_distributed_metric(env) + + self.logger.info("End of callback") diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py index 446ebfa1..081f9bec 100644 --- a/src/scripts/training/lightgbm_python/train.py +++ b/src/scripts/training/lightgbm_python/train.py @@ -11,9 +11,19 @@ import traceback import json from distutils.util import strtobool +from collections import namedtuple + import lightgbm + +# NOTE: we're doing our own MPI initialization +# to allow for multiple threads (see LightGBMDistributedCallbackHandler) +import mpi4py +mpi4py.rc.initialize = False +mpi4py.rc.finalize = False from mpi4py import MPI -from collections import namedtuple +#MPI.Init() +MPI.Init_thread(required=MPI.THREAD_MULTIPLE) + # Add the right path to PYTHONPATH # so that you can import from common.* From d2db0dd1b053620bad3e3b7257f68c6470337d99 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 15:16:50 -0800 Subject: [PATCH 3/8] finalize design --- src/common/lightgbm_utils.py | 228 ++++++++++++------ src/scripts/training/lightgbm_python/train.py | 4 + 2 files changed, 156 insertions(+), 76 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 90cc1907..1ed88acd 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -8,6 +8,8 @@ import numpy as np import logging from typing import List +import threading +import time class LightGBMCallbackHandler(): """ This class handles LightGBM callbacks for recording metrics. """ @@ -39,94 +41,131 @@ def callback(self, env: lightgbm.callback.CallbackEnv) -> None: class LightGBMDistributedCallbackHandler(): - COMM_TAG_METRIC = 209834 # "random id" - """ This class handles LightGBM callbacks for recording metrics. """ def __init__(self, metrics_logger, mpi_comm, world_size=1, world_rank=0): + """Constructor + + Args: + metrics_logger (common.metrics.MetricsLogger): class to log metrics using MLFlow + mpi_comm (MPI.COMM_WORLD): communicator + world_size (int): mpi world size + world_rank (int): mpi world rank of this node """ + self.recording_thread = DistributedMetricCollectionThread(metrics_logger, mpi_comm, world_size=world_size, world_rank=world_rank) + self.recording_thread.start() + self.logger = logging.getLogger(__name__) + + def finalize(self): + """Asks internal thread to finalize""" + # do one last report + self.recording_thread.aggregate_and_report_loop() + + # set status to kill and join + self.recording_thread.killed = True + self.recording_thread.join() + + def callback(self, env: lightgbm.callback.CallbackEnv) -> None: + """Callback method to collect metrics produced by LightGBM. + + See https://lightgbm.readthedocs.io/en/latest/_modules/lightgbm/callback.html + """ + # let's record in the object for future use + self.recording_thread.send_distributed_metric(env) + self.logger.info("End of callback") + + +class DistributedMetricCollectionThread(threading.Thread): + """ This class handles MPI communication of LightGBM callback metrics. + NOTE: We needed to put this in a thread because having callback() + do the recv/send directly was interacting with LightGBM's own MPI communication somehow. + """ + COMM_TAG_METRIC = 209834 # "random tag" + + def __init__(self, metrics_logger, mpi_comm, world_size=1, world_rank=0): + """Constructor + Args: metrics_logger (common.metrics.MetricsLogger): class to log metrics using MLFlow + mpi_comm (MPI.COMM_WORLD): communicator + world_size (int): mpi world size + world_rank (int): mpi world rank of this node """ + threading.Thread.__init__(self) + self.killed = False # flag, set to True to kill from the inside + + self.logger = logging.getLogger(__name__) self.metrics_logger = metrics_logger - self.node_metrics = {} + + # internal sync storage self.distributed_metrics = {} + self.record_lock = threading.Lock() + self.send_queue = [] + self.send_lock = threading.Lock() + + # MPI communication + self.mpi_comm = mpi_comm self.world_size = world_size self.world_rank = world_rank - self.mpi_comm = mpi_comm - self.logger = logging.getLogger(__name__) - def report_distributed_metric(self, env: lightgbm.callback.CallbackEnv): - """Sends metrics to node 0""" - self.logger.info(f"Reporting metric back to node 0: {env}") - self.mpi_comm.isend(env, 0, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # non-blocking - - def collect_distributed_metrics(self, iteration: int): - """Collect metrics from all nodes other than 0""" - for i in range(1, self.world_size): - self.logger.info(f"Probing metric from node {i}") - - if self.mpi_comm.probe(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC): - self.logger.info(f"Collecting metric from node {i}") - remote_node_metrics = self.mpi_comm.recv(source=i, tag=LightGBMDistributedCallbackHandler.COMM_TAG_METRIC) # blocking - else: - self.logger.info(f"NO metric from node {i}") - continue - - if remote_node_metrics.iteration != iteration: - self.logger.warning(f"Remote node {i} sent metric for iteration {remote_node_metrics.iteration} while node 0 is at iteration {iteration}") - self.store_distributed_metric(i, remote_node_metrics) - - def store_distributed_metric(self, node: int, env: lightgbm.callback.CallbackEnv): - """Stores a metric in the internal storage - for processing during aggregate_and_report_loop()""" - iteration = env.iteration - if iteration not in self.distributed_metrics: - self.distributed_metrics[iteration] = {} - self.distributed_metrics[iteration][node] = env - def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, results: List[float]): - # TODO: devise aggregation method per eval_name - self.metrics_logger.log_metric( - key=key, - value=np.mean(results), - step=iteration # provide iteration as step in mlflow - ) + ##################### + ### RUN FUNCTIONS ### + ##################### - def aggregate_and_report_loop(self): - aggregation_tasks = {} + def run_head(self): + """Run function for node 0 only""" + while not(self.killed): + time.sleep(1) - for iteration in list(self.distributed_metrics.keys()): - if len(self.distributed_metrics[iteration]) < self.world_size: - continue + # collect everything from other nodes into internal record + for i in range(1, self.world_size): + self.logger.info(f"Probing metric from node {i}") - # loop on all the evaluation results tuples - for node_id, node_metrics in self.distributed_metrics[iteration].items(): - for data_name, eval_name, result, _ in node_metrics.evaluation_result_list: - key = f"{data_name}.{eval_name}" - if key not in aggregation_tasks: - # record name of metric for aggregation method - aggregation_tasks[key] = (iteration, eval_name, []) + if self.mpi_comm.probe(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC): + self.logger.info(f"Collecting metric from node {i}") + remote_node_metrics = self.mpi_comm.recv(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC) # blocking + else: + self.logger.info(f"NO metric from node {i}") + continue - # add value in the list - aggregation_tasks[key][2].append(result) + self.record_distributed_metric(i, remote_node_metrics) - # once done, remove the data from the "queue" - del self.distributed_metrics[iteration] + # record node_0's own metrics in internal storage + with self.send_lock: + while self.send_queue: + entry = self.send_queue.pop() + self.record_distributed_metric(0, entry) - for key, (iteration, eval_name, results) in aggregation_tasks.items(): - self.aggregate_and_report_task(key, iteration, eval_name, results) + # then aggregate whatever is in the internal record + self.aggregate_and_report_loop() + def run_worker(self): + """Run function for all other nodes""" + while not(self.killed): + time.sleep(1) + # all other nodes send to node_0 + with self.send_lock: + while self.send_queue: + entry = self.send_queue.pop() + self.logger.info(f"Reporting metric back to node 0: {entry}") + self.mpi_comm.isend(entry, 0, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC) # non-blocking + + def run(self): + """Main function of the thread""" + if self.world_rank == 0: + self.run_head() + else: + self.run_worker() - def callback(self, env: lightgbm.callback.CallbackEnv) -> None: - """Callback method to collect metrics produced by LightGBM. + ################### + ### SEND / RECV ### + ################### - See https://lightgbm.readthedocs.io/en/latest/_modules/lightgbm/callback.html - """ - # let's record in the object for future use - self.node_metrics[env.iteration] = env.evaluation_result_list + def send_distributed_metric(self, env: lightgbm.callback.CallbackEnv): + """Stores a metric report in the internal queue + to be sent by thread using MPI""" - # node 0 gets to report its metrics - if self.world_rank == 0: + if self.world_rank == 0: # node_0 also record as mlflow # loop on all the evaluation results tuples for data_name, eval_name, result, _ in env.evaluation_result_list: # log each as a distinct metric @@ -136,16 +175,53 @@ def callback(self, env: lightgbm.callback.CallbackEnv) -> None: step=env.iteration # provide iteration as step in mlflow ) - # store own's metrics in the record - self.store_distributed_metric(self.world_rank, env) + self.logger.info(f"Queueing metric to send to node_0: iteration={env.iteration}") + with self.send_lock: + self.send_queue.append(env) - # plus collects everybody else's - self.collect_distributed_metrics(env.iteration) + def record_distributed_metric(self, node, env: lightgbm.callback.CallbackEnv): + """Records a metric report internally to node 0""" + self.logger.info(f"Recorded metric from node {node}: {env}") + with self.record_lock: + iteration = env.iteration + if iteration not in self.distributed_metrics: + self.distributed_metrics[iteration] = {} + self.distributed_metrics[iteration][node] = env - # and report aggregates - self.aggregate_and_report_loop() - else: - # all the other just report back to node 0 - self.report_distributed_metric(env) - self.logger.info("End of callback") + ################## + ### PROCESSING ### + ################## + + def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, results: List[float]): + # TODO: devise aggregation method per eval_name + self.metrics_logger.log_metric( + key=key, + value=np.mean(results), + step=iteration # provide iteration as step in mlflow + ) + + def aggregate_and_report_loop(self): + aggregation_tasks = {} + + with self.record_lock: + for iteration in list(self.distributed_metrics.keys()): + if len(self.distributed_metrics[iteration]) < self.world_size: + continue + + # loop on all the evaluation results tuples + for node_id, node_metrics in self.distributed_metrics[iteration].items(): + for data_name, eval_name, result, _ in node_metrics.evaluation_result_list: + key = f"{data_name}.{eval_name}" + if key not in aggregation_tasks: + # record name of metric for aggregation method + aggregation_tasks[key] = (iteration, eval_name, []) + + # add value in the list + aggregation_tasks[key][2].append(result) + + # once done, remove the data from the "queue" + del self.distributed_metrics[iteration] + + for key, (iteration, eval_name, results) in aggregation_tasks.items(): + self.aggregate_and_report_task(key, iteration, eval_name, results) diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py index 081f9bec..7f822141 100644 --- a/src/scripts/training/lightgbm_python/train.py +++ b/src/scripts/training/lightgbm_python/train.py @@ -222,6 +222,7 @@ def run(self, args, logger, metrics_logger, unknown_args): # figure out the lgbm params from cli args + mpi config lgbm_params = self.load_lgbm_params_from_cli(args, self.mpi_config) + # create a handler callbacks_handler = LightGBMDistributedCallbackHandler( metrics_logger=metrics_logger, mpi_comm = MPI.COMM_WORLD, @@ -283,6 +284,9 @@ def run(self, args, logger, metrics_logger, unknown_args): logger.info(f"Writing model in {args.export_model}") booster.save_model(args.export_model) + # finalize all remaining metrics + callbacks_handler.finalize() + # clean exit from mpi if MPI.Is_initialized(): logger.info("MPI was initialized, calling MPI.finalize()") From 8b5aa5d3d40013563b39fdf7f8ecdd053336aa10 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 15:23:17 -0800 Subject: [PATCH 4/8] probe in non-blocking mode --- src/common/lightgbm_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 1ed88acd..bc9d1642 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -121,7 +121,7 @@ def run_head(self): for i in range(1, self.world_size): self.logger.info(f"Probing metric from node {i}") - if self.mpi_comm.probe(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC): + if self.mpi_comm.iprobe(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC): # non-blocking self.logger.info(f"Collecting metric from node {i}") remote_node_metrics = self.mpi_comm.recv(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC) # blocking else: From 73de6ba727b49f7372e44653d110100892f64445 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 15:37:30 -0800 Subject: [PATCH 5/8] add try/except around probe --- src/common/lightgbm_utils.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index bc9d1642..560ee0a2 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -10,6 +10,7 @@ from typing import List import threading import time +import traceback class LightGBMCallbackHandler(): """ This class handles LightGBM callbacks for recording metrics. """ @@ -121,12 +122,15 @@ def run_head(self): for i in range(1, self.world_size): self.logger.info(f"Probing metric from node {i}") - if self.mpi_comm.iprobe(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC): # non-blocking - self.logger.info(f"Collecting metric from node {i}") - remote_node_metrics = self.mpi_comm.recv(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC) # blocking - else: - self.logger.info(f"NO metric from node {i}") - continue + try: + if self.mpi_comm.iprobe(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC): # non-blocking + self.logger.info(f"Collecting metric from node {i}") + remote_node_metrics = self.mpi_comm.recv(source=i, tag=DistributedMetricCollectionThread.COMM_TAG_METRIC) # blocking + else: + self.logger.info(f"NO metric from node {i}") + continue + except BaseException: + self.logger.warning(f"Exception while listening to other nodes:\n{traceback.format_exc()}") self.record_distributed_metric(i, remote_node_metrics) From d4eaeb6d9a7caf7ed18b5f8170782f791bb6436a Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 15:49:08 -0800 Subject: [PATCH 6/8] report min and max too --- src/common/lightgbm_utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 560ee0a2..8db60df2 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -199,10 +199,13 @@ def record_distributed_metric(self, node, env: lightgbm.callback.CallbackEnv): def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, results: List[float]): # TODO: devise aggregation method per eval_name - self.metrics_logger.log_metric( - key=key, - value=np.mean(results), - step=iteration # provide iteration as step in mlflow + self.metrics_logger.log_metrics( + { + f"{key}": np.mean(results), + f"{key}_min": np.min(results), + f"{key}_max": np.max(results) + }, + step=iteration ) def aggregate_and_report_loop(self): From 50cc944fad0acdd32bada103427d9d942bc77987 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 16:03:12 -0800 Subject: [PATCH 7/8] do not use log_metrics, not implemented yet --- src/common/lightgbm_utils.py | 40 +++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/common/lightgbm_utils.py b/src/common/lightgbm_utils.py index 8db60df2..faa3191f 100644 --- a/src/common/lightgbm_utils.py +++ b/src/common/lightgbm_utils.py @@ -181,16 +181,21 @@ def send_distributed_metric(self, env: lightgbm.callback.CallbackEnv): self.logger.info(f"Queueing metric to send to node_0: iteration={env.iteration}") with self.send_lock: - self.send_queue.append(env) - - def record_distributed_metric(self, node, env: lightgbm.callback.CallbackEnv): + # filtering out what we don't need to send + # in particular, we don't want to send the model! + self.send_queue.append({ + "iteration":env.iteration, + "evaluation_result_list":env.evaluation_result_list + }) + + def record_distributed_metric(self, node, report): """Records a metric report internally to node 0""" - self.logger.info(f"Recorded metric from node {node}: {env}") + self.logger.info(f"Recorded metric from node {node}: {report}") with self.record_lock: - iteration = env.iteration + iteration = report['iteration'] if iteration not in self.distributed_metrics: self.distributed_metrics[iteration] = {} - self.distributed_metrics[iteration][node] = env + self.distributed_metrics[iteration][node] = report ################## @@ -199,13 +204,20 @@ def record_distributed_metric(self, node, env: lightgbm.callback.CallbackEnv): def aggregate_and_report_task(self, key: str, iteration: int, eval_name: str, results: List[float]): # TODO: devise aggregation method per eval_name - self.metrics_logger.log_metrics( - { - f"{key}": np.mean(results), - f"{key}_min": np.min(results), - f"{key}_max": np.max(results) - }, - step=iteration + self.metrics_logger.log_metric( + key=key, + value=np.mean(results), + step=iteration # provide iteration as step in mlflow + ) + self.metrics_logger.log_metric( + key=key+"_min", + value=np.min(results), + step=iteration # provide iteration as step in mlflow + ) + self.metrics_logger.log_metric( + key=key+"_max", + value=np.max(results), + step=iteration # provide iteration as step in mlflow ) def aggregate_and_report_loop(self): @@ -218,7 +230,7 @@ def aggregate_and_report_loop(self): # loop on all the evaluation results tuples for node_id, node_metrics in self.distributed_metrics[iteration].items(): - for data_name, eval_name, result, _ in node_metrics.evaluation_result_list: + for data_name, eval_name, result, _ in node_metrics['evaluation_result_list']: key = f"{data_name}.{eval_name}" if key not in aggregation_tasks: # record name of metric for aggregation method From ce90af51346aad05f2a13453ad93b1f02a6b0164 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 1 Dec 2021 16:28:42 -0800 Subject: [PATCH 8/8] resolve py import error in unit tests --- src/scripts/training/lightgbm_python/train.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/scripts/training/lightgbm_python/train.py b/src/scripts/training/lightgbm_python/train.py index 4e96eb78..aa4a2ced 100644 --- a/src/scripts/training/lightgbm_python/train.py +++ b/src/scripts/training/lightgbm_python/train.py @@ -14,16 +14,7 @@ from collections import namedtuple import lightgbm - -# NOTE: we're doing our own MPI initialization -# to allow for multiple threads (see LightGBMDistributedCallbackHandler) import mpi4py -mpi4py.rc.initialize = False -mpi4py.rc.finalize = False -from mpi4py import MPI -#MPI.Init() -MPI.Init_thread(required=MPI.THREAD_MULTIPLE) - # Add the right path to PYTHONPATH # so that you can import from common.* @@ -46,8 +37,16 @@ def detect_mpi_config(): Returns: mpi_config (namedtuple) """ + # NOTE: we're doing our own MPI initialization + # to allow for multiple threads (see LightGBMDistributedCallbackHandler) + mpi4py.rc.initialize = False + mpi4py.rc.finalize = False + from mpi4py import MPI + #MPI.Init() + MPI.Init_thread(required=MPI.THREAD_MULTIPLE) + # check if we're running multi or single node - mpi_config_tuple = namedtuple("mpi_config", ['world_size', 'world_rank', 'mpi_available', 'main_node']) + mpi_config_tuple = namedtuple("mpi_config", ['world_size', 'world_rank', 'mpi_available', 'main_node', 'mpi_comm']) try: comm = MPI.COMM_WORLD @@ -56,6 +55,7 @@ def detect_mpi_config(): comm.Get_rank(), # world_rank (comm.Get_size() > 1), # mpi_available (comm.Get_rank() == 0), # main_node + MPI.COMM_WORLD ) logging.getLogger().info(f"MPI detection results: {mpi_config}") except: @@ -225,7 +225,7 @@ def run(self, args, logger, metrics_logger, unknown_args): # create a handler callbacks_handler = LightGBMDistributedCallbackHandler( metrics_logger=metrics_logger, - mpi_comm = MPI.COMM_WORLD, + mpi_comm = self.mpi_config.mpi_comm, world_rank=self.mpi_config.world_rank, world_size=self.mpi_config.world_size ) @@ -288,9 +288,9 @@ def run(self, args, logger, metrics_logger, unknown_args): callbacks_handler.finalize() # clean exit from mpi - if MPI.Is_initialized(): + if mpi4py.MPI.Is_initialized(): logger.info("MPI was initialized, calling MPI.finalize()") - MPI.Finalize() + mpi4py.MPI.Finalize() def get_arg_parser(parser=None):