From edeb407bdf50cd16ef62eee931cc35b11304344b Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 16:35:40 +0200 Subject: [PATCH 01/14] worker ID functionality --- loky/__init__.py | 3 ++- loky/process_executor.py | 32 +++++++++++++++++++++++++++++--- loky/worker_id.py | 8 ++++++++ tests/test_worker_id.py | 31 +++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 loky/worker_id.py create mode 100644 tests/test_worker_id.py diff --git a/loky/__init__.py b/loky/__init__.py index 62f8f0d9..9a5543e5 100644 --- a/loky/__init__.py +++ b/loky/__init__.py @@ -13,13 +13,14 @@ from .reusable_executor import get_reusable_executor from .cloudpickle_wrapper import wrap_non_picklable_objects from .process_executor import BrokenProcessPool, ProcessPoolExecutor +from .worker_id import get_worker_id __all__ = ["get_reusable_executor", "cpu_count", "wait", "as_completed", "Future", "Executor", "ProcessPoolExecutor", "BrokenProcessPool", "CancelledError", "TimeoutError", "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED", - "wrap_non_picklable_objects", "set_loky_pickler"] + "wrap_non_picklable_objects", "set_loky_pickler", "get_worker_id"] __version__ = '3.0.0.dev0' diff --git a/loky/process_executor.py b/loky/process_executor.py index 06483cb1..463f9865 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -521,6 +521,9 @@ def weakref_cb(_, # A list of the ctx.Process instances used as workers. self.processes = executor._processes + # A list of worker IDs for each process + self.process_worker_ids = executor._process_worker_ids + # A ctx.Queue that will be filled with _CallItems derived from # _WorkItems for processing by the process workers. self.call_queue = executor._call_queue @@ -668,6 +671,7 @@ def process_result_item(self, result_item): # itself: we should not mark the executor as broken. with self.processes_management_lock: p = self.processes.pop(result_item, None) + self.process_worker_ids.pop(result_item, None) # p can be None is the executor is concurrently shutting down. if p is not None: @@ -760,7 +764,10 @@ def kill_workers(self): # terminates descendant workers of the children in case there is some # nested parallelism. while self.processes: - _, p = self.processes.popitem() + pid = next(self.processes.keys()) + p = self.processes.pop(pid, None) + self.process_worker_ids.pop(pid, None) + mp.util.debug('terminate process {}'.format(p.name)) try: recursive_terminate(p) @@ -962,6 +969,8 @@ def __init__(self, max_workers=None, job_reducers=None, if context is None: context = get_context() self._context = context + if env is None: + env = {} self._env = env if initializer is not None and not callable(initializer): @@ -983,8 +992,10 @@ def __init__(self, max_workers=None, job_reducers=None, # Map of pids to processes self._processes = {} + # Map of pids to process worker IDs + self._process_worker_ids = {} + # Internal variables of the ProcessPoolExecutor - self._processes = {} self._queue_count = 0 self._pending_work_items = {} self._running_work_items = [] @@ -1076,16 +1087,31 @@ def _adjust_process_count(self): self._initargs, self._processes_management_lock, self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1) worker_exit_lock.acquire() + + worker_id = -1 + if _CURRENT_DEPTH == 0: + used_ids = set(self._process_worker_ids.values()) + available_ids = set(range(self._max_workers)) - used_ids + if len(available_ids): + worker_id = available_ids.pop() + try: # Try to spawn the process with some environment variable to # overwrite but it only works with the loky context for now. + env = self._env + print("env:", env) + if _CURRENT_DEPTH == 0 and self._env is not None: + env = self._env.copy() + env['LOKY_WORKER_ID'] = str(worker_id) + print("!!", env['LOKY_WORKER_ID']) p = self._context.Process(target=_process_worker, args=args, - env=self._env) + env=env) except TypeError: p = self._context.Process(target=_process_worker, args=args) p._worker_exit_lock = worker_exit_lock p.start() self._processes[p.pid] = p + self._process_worker_ids[p.pid] = worker_id mp.util.debug('Adjust process count : {}'.format(self._processes)) def _ensure_executor_running(self): diff --git a/loky/worker_id.py b/loky/worker_id.py new file mode 100644 index 00000000..9de1027e --- /dev/null +++ b/loky/worker_id.py @@ -0,0 +1,8 @@ +import os + + +def get_worker_id(): + wid = os.environ.get('LOKY_WORKER_ID', None) + if wid is None: + return -1 + return int(wid) diff --git a/tests/test_worker_id.py b/tests/test_worker_id.py new file mode 100644 index 00000000..fb81ceac --- /dev/null +++ b/tests/test_worker_id.py @@ -0,0 +1,31 @@ +import os +import time +import pytest +import numpy as np +from collections import defaultdict +from loky import get_reusable_executor, get_worker_id + + +def random_sleep(k): + rng = np.random.RandomState(seed=k) + duration = rng.uniform(0, 0.05) + t0 = time.time() + time.sleep(duration) + t1 = time.time() + wid = get_worker_id() + return (wid, t0, t1) + + +def test_worker_ids(): + """Test that worker IDs are always unique, with re-use over time""" + executor = get_reusable_executor(max_workers=4, timeout=2) + results = executor.map(random_sleep, range(100)) + + all_intervals = defaultdict(list) + for wid, t0, t1 in results: + all_intervals[wid].append((t0, t1)) + + for intervals in all_intervals.values(): + intervals = sorted(intervals) + for i in range(len(intervals) - 1): + assert intervals[i + 1][0] >= intervals[i][1] From 229a469a8f4a4990b83778c50dd0275791550bdf Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 16:43:51 +0200 Subject: [PATCH 02/14] removed debug prints --- loky/process_executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index 463f9865..ec3e367e 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -1099,11 +1099,9 @@ def _adjust_process_count(self): # Try to spawn the process with some environment variable to # overwrite but it only works with the loky context for now. env = self._env - print("env:", env) if _CURRENT_DEPTH == 0 and self._env is not None: env = self._env.copy() env['LOKY_WORKER_ID'] = str(worker_id) - print("!!", env['LOKY_WORKER_ID']) p = self._context.Process(target=_process_worker, args=args, env=env) except TypeError: From 7033961ada203cf12b463b0710a27ae9106eb253 Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 16:49:35 +0200 Subject: [PATCH 03/14] removed `next` --- loky/process_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index ec3e367e..b9c76491 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -764,7 +764,7 @@ def kill_workers(self): # terminates descendant workers of the children in case there is some # nested parallelism. while self.processes: - pid = next(self.processes.keys()) + pid = list(self.processes.keys())[0] p = self.processes.pop(pid, None) self.process_worker_ids.pop(pid, None) From 949169d6733fc418ea2a7a24a627ebfa0369b40a Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 17:50:31 +0200 Subject: [PATCH 04/14] test that worker_id return value is appropriate --- tests/test_worker_id.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_worker_id.py b/tests/test_worker_id.py index fb81ceac..988feb15 100644 --- a/tests/test_worker_id.py +++ b/tests/test_worker_id.py @@ -18,11 +18,13 @@ def random_sleep(k): def test_worker_ids(): """Test that worker IDs are always unique, with re-use over time""" - executor = get_reusable_executor(max_workers=4, timeout=2) + num_workers = 4 + executor = get_reusable_executor(max_workers=num_workers, timeout=2) results = executor.map(random_sleep, range(100)) all_intervals = defaultdict(list) for wid, t0, t1 in results: + assert wid in set(range(num_workers)) all_intervals[wid].append((t0, t1)) for intervals in all_intervals.values(): From 979fef29997784895615f046edaa1e5ad6b89c87 Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Fri, 3 Jul 2020 17:52:27 +0200 Subject: [PATCH 05/14] Update loky/process_executor.py Co-authored-by: Thomas Moreau --- loky/process_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index b9c76491..329c2504 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -521,7 +521,7 @@ def weakref_cb(_, # A list of the ctx.Process instances used as workers. self.processes = executor._processes - # A list of worker IDs for each process + # A dict mapping worker pids to worker IDs self.process_worker_ids = executor._process_worker_ids # A ctx.Queue that will be filled with _CallItems derived from From 4c5f5e5a265e55244d48ef949b29481725b58d29 Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Fri, 3 Jul 2020 17:53:49 +0200 Subject: [PATCH 06/14] Update loky/process_executor.py Co-authored-by: Thomas Moreau --- loky/process_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index 329c2504..290f44c5 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -765,7 +765,7 @@ def kill_workers(self): # nested parallelism. while self.processes: pid = list(self.processes.keys())[0] - p = self.processes.pop(pid, None) + pid, p = self.processes.popitem() self.process_worker_ids.pop(pid, None) mp.util.debug('terminate process {}'.format(p.name)) From 44bb17dad6dabb15a4192d0fb50ef75a24a14b2f Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 18:20:10 +0200 Subject: [PATCH 07/14] pass worker_id as a function argument --- loky/process_executor.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index 290f44c5..bf02d4fa 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -362,7 +362,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): def _process_worker(call_queue, result_queue, initializer, initargs, processes_management_lock, timeout, worker_exit_lock, - current_depth): + current_depth, worker_id): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -398,6 +398,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs, _last_memory_leak_check = None pid = os.getpid() + # set the worker_id environment variable + os.environ["LOKY_WORKER_ID"] = str(worker_id) + mp.util.debug('Worker started with timeout=%s' % timeout) while True: try: @@ -969,8 +972,6 @@ def __init__(self, max_workers=None, job_reducers=None, if context is None: context = get_context() self._context = context - if env is None: - env = {} self._env = env if initializer is not None and not callable(initializer): @@ -1080,30 +1081,32 @@ def weakref_cb( process_pool_executor_at_exit = threading._register_atexit( _python_exit) + def _get_available_worker_id(self): + if _CURRENT_DEPTH > 0: + return -1 + + used_ids = set(self._process_worker_ids.values()) + available_ids = set(range(self._max_workers)) - used_ids + if len(available_ids): + return available_ids.pop() + else: + return -1 + def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): worker_exit_lock = self._context.BoundedSemaphore(1) + worker_id = self._get_available_worker_id() args = (self._call_queue, self._result_queue, self._initializer, self._initargs, self._processes_management_lock, - self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1) + self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1, + worker_id) worker_exit_lock.acquire() - worker_id = -1 - if _CURRENT_DEPTH == 0: - used_ids = set(self._process_worker_ids.values()) - available_ids = set(range(self._max_workers)) - used_ids - if len(available_ids): - worker_id = available_ids.pop() - try: # Try to spawn the process with some environment variable to # overwrite but it only works with the loky context for now. - env = self._env - if _CURRENT_DEPTH == 0 and self._env is not None: - env = self._env.copy() - env['LOKY_WORKER_ID'] = str(worker_id) p = self._context.Process(target=_process_worker, args=args, - env=env) + env=self._env) except TypeError: p = self._context.Process(target=_process_worker, args=args) p._worker_exit_lock = worker_exit_lock From 93927db89219fea68cb0caf198a17859b5ade4ff Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 18:43:34 +0200 Subject: [PATCH 08/14] docstring --- loky/worker_id.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/loky/worker_id.py b/loky/worker_id.py index 9de1027e..9ba6499d 100644 --- a/loky/worker_id.py +++ b/loky/worker_id.py @@ -2,6 +2,12 @@ def get_worker_id(): + """Get the worker ID of the current process. For a `ReusableExectutor` + with `max_workers=n`, the worker ID is in the range [0..n). This is suited + for reuse of persistent objects such as GPU IDs. This function only works + at the first level of parallelization (i.e. not for nested + parallelization). Returns -1 on failure. + """ wid = os.environ.get('LOKY_WORKER_ID', None) if wid is None: return -1 From d0f4320450ec45a1d55a9ea8d9c22a0056725924 Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 19:19:33 +0200 Subject: [PATCH 09/14] document problems with resizing --- loky/worker_id.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/loky/worker_id.py b/loky/worker_id.py index 9ba6499d..7c22309b 100644 --- a/loky/worker_id.py +++ b/loky/worker_id.py @@ -6,7 +6,8 @@ def get_worker_id(): with `max_workers=n`, the worker ID is in the range [0..n). This is suited for reuse of persistent objects such as GPU IDs. This function only works at the first level of parallelization (i.e. not for nested - parallelization). Returns -1 on failure. + parallelization). Resizing the `ReusableExectutor` will result in + unpredictable return values. Returns -1 on failure. """ wid = os.environ.get('LOKY_WORKER_ID', None) if wid is None: From 4be8b0485a560cf14efb821be0d831f1f54d0d61 Mon Sep 17 00:00:00 2001 From: pmla Date: Fri, 3 Jul 2020 19:27:46 +0200 Subject: [PATCH 10/14] added timeout test --- tests/test_worker_id.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_worker_id.py b/tests/test_worker_id.py index 988feb15..4d8382e6 100644 --- a/tests/test_worker_id.py +++ b/tests/test_worker_id.py @@ -1,4 +1,3 @@ -import os import time import pytest import numpy as np @@ -6,9 +5,10 @@ from loky import get_reusable_executor, get_worker_id -def random_sleep(k): +def random_sleep(args): + k, max_duration = args rng = np.random.RandomState(seed=k) - duration = rng.uniform(0, 0.05) + duration = rng.uniform(0, max_duration) t0 = time.time() time.sleep(duration) t1 = time.time() @@ -16,11 +16,14 @@ def random_sleep(k): return (wid, t0, t1) -def test_worker_ids(): +@pytest.mark.parametrize("max_duration,timeout,kmax", [(0.05, 2, 100), + (1, 0.01, 4)]) +def test_worker_ids(max_duration, timeout, kmax): """Test that worker IDs are always unique, with re-use over time""" num_workers = 4 executor = get_reusable_executor(max_workers=num_workers, timeout=2) - results = executor.map(random_sleep, range(100)) + results = executor.map(random_sleep, [(k, max_duration) + for k in range(kmax)]) all_intervals = defaultdict(list) for wid, t0, t1 in results: From 7e05644cff4e2f8b55dadf6060e585da364b5b87 Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Wed, 8 Jul 2020 20:01:26 +0200 Subject: [PATCH 11/14] Update loky/process_executor.py Co-authored-by: Thomas Moreau --- loky/process_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/loky/process_executor.py b/loky/process_executor.py index bf02d4fa..aa17f0f5 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -767,7 +767,6 @@ def kill_workers(self): # terminates descendant workers of the children in case there is some # nested parallelism. while self.processes: - pid = list(self.processes.keys())[0] pid, p = self.processes.popitem() self.process_worker_ids.pop(pid, None) From 4d8b594ebaaa19246124aa78f0d1baddcd9a9457 Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Wed, 8 Jul 2020 20:03:12 +0200 Subject: [PATCH 12/14] Update loky/worker_id.py Co-authored-by: Thomas Moreau --- loky/worker_id.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/loky/worker_id.py b/loky/worker_id.py index 7c22309b..a21f5798 100644 --- a/loky/worker_id.py +++ b/loky/worker_id.py @@ -2,12 +2,15 @@ def get_worker_id(): - """Get the worker ID of the current process. For a `ReusableExectutor` - with `max_workers=n`, the worker ID is in the range [0..n). This is suited - for reuse of persistent objects such as GPU IDs. This function only works - at the first level of parallelization (i.e. not for nested - parallelization). Resizing the `ReusableExectutor` will result in - unpredictable return values. Returns -1 on failure. + """Get the worker ID of the current process. + + For a `ReusableExectutor` with `max_workers=n`, the worker ID is in the + range [0..n). This is suited for reuse of persistent objects such as GPU + IDs. This function only works at the first level of parallelization (i.e. + not for nested parallelization). Resizing the `ReusableExectutor` will + result in unpredictable return values. + + Returns -1 when the process is not a worker. """ wid = os.environ.get('LOKY_WORKER_ID', None) if wid is None: From 6abad2cec64599d01864ba688be001d24595b25d Mon Sep 17 00:00:00 2001 From: Peter Larsen Date: Wed, 8 Jul 2020 20:03:39 +0200 Subject: [PATCH 13/14] Update tests/test_worker_id.py Co-authored-by: Thomas Moreau --- tests/test_worker_id.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_worker_id.py b/tests/test_worker_id.py index 4d8382e6..4d2f95a3 100644 --- a/tests/test_worker_id.py +++ b/tests/test_worker_id.py @@ -21,7 +21,7 @@ def random_sleep(args): def test_worker_ids(max_duration, timeout, kmax): """Test that worker IDs are always unique, with re-use over time""" num_workers = 4 - executor = get_reusable_executor(max_workers=num_workers, timeout=2) + executor = get_reusable_executor(max_workers=num_workers, timeout=timeout) results = executor.map(random_sleep, [(k, max_duration) for k in range(kmax)]) From 5b18fa0b11091b49ab8c8c0b2b9fb5de6115a5b2 Mon Sep 17 00:00:00 2001 From: pmla Date: Wed, 8 Jul 2020 20:50:20 +0200 Subject: [PATCH 14/14] removed redundant if-statement --- loky/worker_id.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/loky/worker_id.py b/loky/worker_id.py index a21f5798..8d5e0f4a 100644 --- a/loky/worker_id.py +++ b/loky/worker_id.py @@ -12,7 +12,4 @@ def get_worker_id(): Returns -1 when the process is not a worker. """ - wid = os.environ.get('LOKY_WORKER_ID', None) - if wid is None: - return -1 - return int(wid) + return int(os.environ.get('LOKY_WORKER_ID', -1))