From be22904b15af2c5653533a24dfbc5724128ac96b Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 5 Feb 2021 19:57:54 +0100 Subject: [PATCH 1/9] ENH add get_worker_rank with unique rank --- loky/__init__.py | 7 ++-- loky/process_executor.py | 85 +++++++++++++++++++++++++++++++++++---- loky/reusable_executor.py | 4 ++ 3 files changed, 85 insertions(+), 11 deletions(-) diff --git a/loky/__init__.py b/loky/__init__.py index 62f8f0d9f..a83f93b94 100644 --- a/loky/__init__.py +++ b/loky/__init__.py @@ -1,5 +1,5 @@ -r"""The :mod:`loky` module manages a pool of worker that can be re-used across time. -It provides a robust and dynamic implementation os the +r"""The :mod:`loky` module manages a pool of worker that can be re-used across +time. It provides a robust and dynamic implementation os the :class:`ProcessPoolExecutor` and a function :func:`get_reusable_executor` which hide the pool management under the hood. """ @@ -13,10 +13,11 @@ from .reusable_executor import get_reusable_executor from .cloudpickle_wrapper import wrap_non_picklable_objects from .process_executor import BrokenProcessPool, ProcessPoolExecutor +from .process_executor import get_worker_rank __all__ = ["get_reusable_executor", "cpu_count", "wait", "as_completed", - "Future", "Executor", "ProcessPoolExecutor", + "Future", "Executor", "ProcessPoolExecutor", "get_worker_rank", "BrokenProcessPool", "CancelledError", "TimeoutError", "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED", "wrap_non_picklable_objects", "set_loky_pickler"] diff --git a/loky/process_executor.py b/loky/process_executor.py index f2fc167fc..ca7cf7528 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -122,6 +122,34 @@ def _get_memory_usage(pid, force_gc=False): except ImportError: _USE_PSUTIL = False +# Mechanism to obtain the rank of a worker and the total number of workers in +# the executor. +_WORKER_RANK = None +_WORKER_WORLD = None + + +def get_worker_rank(): + """Returns the rank of the worker and the number of workers in the executor + + This helper function should only be called in a worker, else it will throw + a RuntimeError. + """ + if _WORKER_RANK is None: + raise RuntimeError( + "get_worker_id, should only be called in a worker, not in the " + "main process." + ) + return _WORKER_RANK, _WORKER_WORLD + + +def set_worker_rank(pid, rank_mapper): + """Set worker's rank and world size from the process pid and an rank_mapper. + """ + global _WORKER_RANK, _WORKER_WORLD + if pid in rank_mapper: + _WORKER_RANK = rank_mapper[pid] + _WORKER_WORLD = rank_mapper['world'] + class _ThreadWakeup: def __init__(self): @@ -273,11 +301,12 @@ def __init__(self, work_id, exception=None, result=None): class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): + def __init__(self, work_id, fn, args, kwargs, rank_mapper): self.work_id = work_id self.fn = fn self.args = args self.kwargs = kwargs + self.rank_mapper = rank_mapper # Store the current loky_pickler so it is correctly set in the worker self.loky_pickler = get_loky_pickler_name() @@ -364,7 +393,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): def _process_worker(call_queue, result_queue, initializer, initargs, processes_management_lock, timeout, worker_exit_lock, - current_depth): + current_depth, rank_mapper): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -383,6 +412,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs, worker_exit_lock: Lock to avoid flagging the executor as broken on workers timeout. current_depth: Nested parallelism level, to avoid infinite spawning. + rank_mapper: Initial value for rank and world as a dict with keys None + and world. """ if initializer is not None: try: @@ -400,6 +431,13 @@ def _process_worker(call_queue, result_queue, initializer, initargs, _last_memory_leak_check = None pid = os.getpid() + # Passing an initial value is necessary as some jobs can be sent and + # serialized before this worker is created. In this case, no rank is + # available in the call_item.rank_mapper and this rank is the correct one. + # When initialized, main process does not know the pid and pass the worker + # rank as None. + set_worker_rank(None, rank_mapper) + mp.util.debug('Worker started with timeout=%s' % timeout) while True: try: @@ -431,6 +469,11 @@ def _process_worker(call_queue, result_queue, initializer, initargs, with worker_exit_lock: mp.util.debug('Exited cleanly') return + + # If the executor has been resized, this new rank mapper might contain + # new rank/world info. Correct the value before runnning the task. + set_worker_rank(pid, call_item.rank_mapper) + try: r = call_item() except BaseException as e: @@ -547,6 +590,10 @@ def weakref_cb(_, # of new processes or shut down self.processes_management_lock = executor._processes_management_lock + # A dict mapping the workers' pid to their rank. Also contains the + # current size of the executor associated to 'world' key. + self.rank_mapper = executor._rank_mapper + super(_ExecutorManagerThread, self).__init__() if sys.version_info < (3, 9): self.daemon = True @@ -592,11 +639,11 @@ def add_call_item_to_queue(self): if work_item.future.set_running_or_notify_cancel(): self.running_work_items += [work_id] - self.call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) + self.call_queue.put( + _CallItem(work_id, work_item.fn, work_item.args, + work_item.kwargs, self.rank_mapper), + block=True + ) else: del self.pending_work_items[work_id] continue @@ -679,6 +726,7 @@ def process_result_item(self, result_item): # itself: we should not mark the executor as broken. with self.processes_management_lock: p = self.processes.pop(result_item, None) + del self.rank_mapper[result_item] # p can be None is the executor is concurrently shutting down. if p is not None: @@ -1022,6 +1070,10 @@ def __init__(self, max_workers=None, job_reducers=None, # Finally setup the queues for interprocess communication self._setup_queues(job_reducers, result_reducers) + # A dict mapping the workers' pid to their rank. The current size of + # the executor is associated with the 'world' key. + self._rank_mapper = {'world': max_workers} + mp.util.debug('ProcessPoolExecutor is setup') def _setup_queues(self, job_reducers, result_reducers, queue_size=None): @@ -1082,11 +1134,20 @@ def weakref_cb( _python_exit) def _adjust_process_count(self): + # Compute available worker ranks for newly spawned workers + given_ranks = set( + v for k, v in self._rank_mapper.items() if k != 'world' + ) + all_ranks = set(range(self._max_workers)) + available_ranks = all_ranks - given_ranks + for _ in range(len(self._processes), self._max_workers): worker_exit_lock = self._context.BoundedSemaphore(1) + rank = available_ranks.pop() args = (self._call_queue, self._result_queue, self._initializer, self._initargs, self._processes_management_lock, - self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1) + self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1, + {None: rank, 'world': self._max_workers}) worker_exit_lock.acquire() try: # Try to spawn the process with some environment variable to @@ -1098,6 +1159,14 @@ def _adjust_process_count(self): p._worker_exit_lock = worker_exit_lock p.start() self._processes[p.pid] = p + self._rank_mapper[p.pid] = rank + + # Reassign rank that are too high to rank that are still available. + # They will be passed to the workers when sending the tasks with + # the CallItem. + for pid, rank in list(self._rank_mapper.items()): + if pid != 'world' and rank >= self._max_workers: + self._rank_mapper[pid] = available_ranks.pop() mp.util.debug('Adjust process count : {}'.format(self._processes)) def _ensure_executor_running(self): diff --git a/loky/reusable_executor.py b/loky/reusable_executor.py index 9a8e73f37..1a19d6ade 100644 --- a/loky/reusable_executor.py +++ b/loky/reusable_executor.py @@ -189,10 +189,14 @@ def _resize(self, max_workers): # then no processes have been spawned and we can just # update _max_workers and return self._max_workers = max_workers + self._rank_mapper['world'] = max_workers return self._wait_job_completion() + # Set the new size to be broadcasted to the workers + self._rank_mapper['world'] = max_workers + # Some process might have returned due to timeout so check how many # children are still alive. Use the _process_management_lock to # ensure that no process are spawned or timeout during the resize. From a44f7d7b5217a47d54a9ca63f4d4a17d8ca2d9d2 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 5 Feb 2021 20:00:41 +0100 Subject: [PATCH 2/9] TST test get_worker_rank --- tests/_test_process_executor.py | 32 +++++++++++++++++++++++++++---- tests/test_loky_module.py | 15 +++++++++------ tests/test_reusable_executor.py | 34 ++++++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index 65d9a2c9e..f3344fa42 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -33,6 +33,7 @@ from threading import Thread from collections import defaultdict +from loky import get_worker_rank from loky.process_executor import LokyRecursionError from loky.process_executor import ShutdownExecutorError, TerminatedWorkerError from loky._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, @@ -906,8 +907,10 @@ def test_max_depth(self, kill_workers): @pytest.mark.skipif(sys.maxsize < 2 ** 32, reason="Test requires a 64 bit version of Python") @pytest.mark.skipif( - sys.version_info[:2] < (3, 8), - reason="Python version does not support pickling objects of size > 2 ** 31GB") + sys.version_info[:2] < (3, 8), + reason="Python version does not support pickling objects " + "of size > 2 ** 31GB" + ) def test_no_failure_on_large_data_send(self): data = b'\x00' * int(2.2e9) self.executor.submit(id, data).result() @@ -916,8 +919,9 @@ def test_no_failure_on_large_data_send(self): @pytest.mark.skipif(sys.maxsize < 2 ** 32, reason="Test requires a 64 bit version of Python") @pytest.mark.skipif( - sys.version_info[:2] >= (3, 8), - reason="Python version supports pickling objects of size > 2 ** 31GB") + sys.version_info[:2] >= (3, 8), + reason="Python version supports pickling objects of size > 2 ** 31GB" + ) def test_expected_failure_on_large_data_send(self): data = b'\x00' * int(2.2e9) with pytest.raises(RuntimeError): @@ -1034,3 +1038,23 @@ def test_child_env_executor(self): assert var_child == var_value executor.shutdown(wait=True) + + @staticmethod + def _worker_rank(x): + time.sleep(.1) + rank, world = get_worker_rank() + return dict(pid=os.getpid(), rank=rank, world=world) + + @pytest.mark.parametrize('max_workers', [1, 5, 13]) + @pytest.mark.parametrize('timeout', [None, 0.01, 0]) + def test_workers_rank(self, max_workers, timeout): + executor = self.executor_type(max_workers, timeout=timeout) + results = executor.map(self._worker_rank, range(max_workers * 5)) + workers_rank = {} + for f in results: + assert f['world'] == max_workers + rank = workers_rank.get(f['pid'], None) + assert rank is None or rank == f['rank'] + workers_rank[f['pid']] = f['rank'] + assert set(workers_rank.values()) == set(range(max_workers)) + executor.shutdown(wait=True, kill_workers=True) diff --git a/tests/test_loky_module.py b/tests/test_loky_module.py index 076e6e409..a3432e560 100644 --- a/tests/test_loky_module.py +++ b/tests/test_loky_module.py @@ -1,15 +1,15 @@ -import multiprocessing as mp import os import sys import shutil import tempfile +import multiprocessing as mp from subprocess import check_output -import subprocess import pytest import loky from loky import cpu_count +from loky import get_worker_rank from loky.backend.context import _cpu_count_user @@ -54,7 +54,7 @@ def test_cpu_count_affinity(): res = check_output([taskset_bin, '-c', '0', python_bin, '-c', cpu_count_cmd.format(args='')]) - + res_physical = check_output([ taskset_bin, '-c', '0', python_bin, '-c', cpu_count_cmd.format(args='only_physical_cores=True')]) @@ -92,7 +92,7 @@ def test_only_physical_cores_error(): # unable to retrieve the number of physical cores. if sys.platform != "linux": pytest.skip() - + # if number of available cpus is already restricted, cpu_count will return # that value and no warning is issued even if only_physical_cores == True. # (tested in another test: test_only_physical_cores_with_user_limitation @@ -100,8 +100,6 @@ def test_only_physical_cores_error(): if _cpu_count_user(cpu_count_mp) < cpu_count_mp: pytest.skip() - start_dir = os.path.abspath('.') - with tempfile.TemporaryDirectory() as tmp_dir: # Write bad lscpu program lscpu_path = tmp_dir + '/lscpu' @@ -140,3 +138,8 @@ def test_only_physical_cores_with_user_limitation(): if cpu_count_user < cpu_count_mp: assert cpu_count() == cpu_count_user assert cpu_count(only_physical_cores=True) == cpu_count_user + + +def test_worker_rank_in_worker_only(): + with pytest.raises(RuntimeError): + get_worker_rank() diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index d24d2defb..eceb4c429 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -1,6 +1,7 @@ import os -import sys import gc +import sys +import time import ctypes import psutil import pytest @@ -13,6 +14,7 @@ import loky from loky import cpu_count +from loky import get_worker_rank from loky import get_reusable_executor from loky.process_executor import _RemoteTraceback, TerminatedWorkerError from loky.process_executor import BrokenProcessPool, ShutdownExecutorError @@ -577,6 +579,36 @@ def test_resize_after_timeout(self): expected_msg = 'A worker stopped' assert expected_msg in recorded_warnings[0].message.args[0] + @staticmethod + def _worker_rank(x): + time.sleep(.1) + rank, world = get_worker_rank() + return dict(pid=os.getpid(), rank=rank, world=world) + + @pytest.mark.parametrize('timeout', [10, 0]) + def test_workers_rank_resize(self, timeout): + + executor = get_reusable_executor(max_workers=2, timeout=timeout) + + with warnings.catch_warnings(record=True): + # Cause all warnings to always be triggered. + warnings.simplefilter("always") + for size in [12, 2, 1, 12, 6, 1, 8, 5]: + executor = get_reusable_executor(max_workers=size, reuse=True) + results = executor.map(self._worker_rank, range(size * 5)) + executor.map(sleep, [0.01] * 6) + workers_rank = {} + for f in results: + assert f['world'] == size + rank = workers_rank.get(f['pid'], None) + assert rank is None or rank == f['rank'] + workers_rank[f['pid']] = f['rank'] + assert set(workers_rank.values()) == set(range(size)), ( + ', '.join(f'{k}: {v}' + for k, v in executor._rank_mapper.items()) + ) + executor.shutdown(wait=True) + class TestGetReusableExecutor(ReusableExecutorMixin): From 886122cbdc0789fe0bf0f18fa948f0e0b3267db8 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 5 Feb 2021 20:11:06 +0100 Subject: [PATCH 3/9] FIX compat with python3.5-: no f-string --- tests/_test_process_executor.py | 5 ++++- tests/test_reusable_executor.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index f3344fa42..ea0e9d7d8 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -1056,5 +1056,8 @@ def test_workers_rank(self, max_workers, timeout): rank = workers_rank.get(f['pid'], None) assert rank is None or rank == f['rank'] workers_rank[f['pid']] = f['rank'] - assert set(workers_rank.values()) == set(range(max_workers)) + assert set(workers_rank.values()) == set(range(max_workers))( + ', '.join('{}: {}'.format(k, v) + for k, v in executor._rank_mapper.items()) + ) executor.shutdown(wait=True, kill_workers=True) diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index eceb4c429..d54f35631 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -604,7 +604,7 @@ def test_workers_rank_resize(self, timeout): assert rank is None or rank == f['rank'] workers_rank[f['pid']] = f['rank'] assert set(workers_rank.values()) == set(range(size)), ( - ', '.join(f'{k}: {v}' + ', '.join('{}: {}'.format(k, v) for k, v in executor._rank_mapper.items()) ) executor.shutdown(wait=True) From a963625c78b6ff536195c0c7d3e465766ca8b2a3 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Fri, 5 Feb 2021 21:51:14 +0100 Subject: [PATCH 4/9] FIX tests --- .readthedocs.yml | 2 +- tests/_test_process_executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.readthedocs.yml b/.readthedocs.yml index 4b222dba1..768c9f956 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -2,4 +2,4 @@ requirements_file: docs/requirements.txt python: setup_py_install: true - version: 3.5 + version: 3.8 diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index ea0e9d7d8..7e8616d5a 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -1056,7 +1056,7 @@ def test_workers_rank(self, max_workers, timeout): rank = workers_rank.get(f['pid'], None) assert rank is None or rank == f['rank'] workers_rank[f['pid']] = f['rank'] - assert set(workers_rank.values()) == set(range(max_workers))( + assert set(workers_rank.values()) == set(range(max_workers)), ( ', '.join('{}: {}'.format(k, v) for k, v in executor._rank_mapper.items()) ) From 647d60dccbdff1b8d78e57d63719fec3284d92d1 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 8 Feb 2021 09:50:26 +0100 Subject: [PATCH 5/9] FIX avoid timeout 0 too hard on CI machine --- tests/_test_process_executor.py | 2 +- tests/test_reusable_executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index 7e8616d5a..85c43f0c6 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -1046,7 +1046,7 @@ def _worker_rank(x): return dict(pid=os.getpid(), rank=rank, world=world) @pytest.mark.parametrize('max_workers', [1, 5, 13]) - @pytest.mark.parametrize('timeout', [None, 0.01, 0]) + @pytest.mark.parametrize('timeout', [None, 0.01]) def test_workers_rank(self, max_workers, timeout): executor = self.executor_type(max_workers, timeout=timeout) results = executor.map(self._worker_rank, range(max_workers * 5)) diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index d54f35631..a61c59359 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -585,7 +585,7 @@ def _worker_rank(x): rank, world = get_worker_rank() return dict(pid=os.getpid(), rank=rank, world=world) - @pytest.mark.parametrize('timeout', [10, 0]) + @pytest.mark.parametrize('timeout', [10, 0.01]) def test_workers_rank_resize(self, timeout): executor = get_reusable_executor(max_workers=2, timeout=timeout) From a693b38d64896dfcd3bb948bc9d8cbd95fb96294 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 8 Feb 2021 19:29:09 +0100 Subject: [PATCH 6/9] FIX more debug for tests --- tests/_test_process_executor.py | 6 ++++-- tests/test_reusable_executor.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index 85c43f0c6..fb68c873c 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -31,6 +31,7 @@ import faulthandler from math import sqrt from threading import Thread +import multiprocessing as mp from collections import defaultdict from loky import get_worker_rank @@ -1041,9 +1042,10 @@ def test_child_env_executor(self): @staticmethod def _worker_rank(x): - time.sleep(.1) + time.sleep(.2) rank, world = get_worker_rank() - return dict(pid=os.getpid(), rank=rank, world=world) + return dict(pid=os.getpid(), name=mp.current_process().name, + rank=rank, world=world) @pytest.mark.parametrize('max_workers', [1, 5, 13]) @pytest.mark.parametrize('timeout', [None, 0.01]) diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index a61c59359..b8ee2298f 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -8,6 +8,7 @@ import warnings import threading from time import sleep +import multiprocessing as mp from multiprocessing import util, current_process from pickle import PicklingError, UnpicklingError from distutils.version import LooseVersion @@ -581,9 +582,10 @@ def test_resize_after_timeout(self): @staticmethod def _worker_rank(x): - time.sleep(.1) + time.sleep(.2) rank, world = get_worker_rank() - return dict(pid=os.getpid(), rank=rank, world=world) + return dict(pid=os.getpid(), name=mp.current_process().name, + rank=rank, world=world) @pytest.mark.parametrize('timeout', [10, 0.01]) def test_workers_rank_resize(self, timeout): From bba5d584278b8d562d58aa11a24edd2334868fa6 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 8 Feb 2021 22:30:46 +0100 Subject: [PATCH 7/9] ENH rm low timeout for reusable executor as it is too slow on CI --- tests/_executor_mixin.py | 3 ++- tests/_test_process_executor.py | 3 ++- tests/test_reusable_executor.py | 6 ++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/_executor_mixin.py b/tests/_executor_mixin.py index 7f74bf8a3..4d8f399f2 100644 --- a/tests/_executor_mixin.py +++ b/tests/_executor_mixin.py @@ -143,7 +143,8 @@ def setup_method(self): try: self.executor = self.executor_type( max_workers=self.worker_count, context=self.context, - initializer=initializer_event, initargs=(_test_event,)) + initializer=initializer_event, initargs=(_test_event,) + ) except NotImplementedError as e: self.skipTest(str(e)) _check_executor_started(self.executor) diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index fb68c873c..9f00e0af0 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -416,7 +416,8 @@ def _test_recursive_kill(cls, depth): executor = cls.executor_type( max_workers=2, context=cls.context, initializer=_executor_mixin.initializer_event, - initargs=(_executor_mixin._test_event,)) + initargs=(_executor_mixin._test_event,) + ) assert executor.submit(sleep_and_return, 0, 42).result() == 42 if depth >= 2: diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index b8ee2298f..910150fb8 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -587,10 +587,9 @@ def _worker_rank(x): return dict(pid=os.getpid(), name=mp.current_process().name, rank=rank, world=world) - @pytest.mark.parametrize('timeout', [10, 0.01]) - def test_workers_rank_resize(self, timeout): + def test_workers_rank_resize(self): - executor = get_reusable_executor(max_workers=2, timeout=timeout) + executor = get_reusable_executor(max_workers=2) with warnings.catch_warnings(record=True): # Cause all warnings to always be triggered. @@ -609,7 +608,6 @@ def test_workers_rank_resize(self, timeout): ', '.join('{}: {}'.format(k, v) for k, v in executor._rank_mapper.items()) ) - executor.shutdown(wait=True) class TestGetReusableExecutor(ReusableExecutorMixin): From aac92348d1a4a74dbcaf3c992287167661a86421 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Thu, 9 Sep 2021 10:32:47 +0200 Subject: [PATCH 8/9] FIX typo --- loky/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loky/__init__.py b/loky/__init__.py index a83f93b94..ca1e08530 100644 --- a/loky/__init__.py +++ b/loky/__init__.py @@ -1,5 +1,5 @@ r"""The :mod:`loky` module manages a pool of worker that can be re-used across -time. It provides a robust and dynamic implementation os the +time. It provides a robust and dynamic implementation of the :class:`ProcessPoolExecutor` and a function :func:`get_reusable_executor` which hide the pool management under the hood. """ From 3b686468c232700f86e9b77f87cd690613e492d3 Mon Sep 17 00:00:00 2001 From: tommoral Date: Tue, 9 Apr 2024 08:47:02 +0200 Subject: [PATCH 9/9] FIX linter --- loky/__init__.py | 1 + loky/process_executor.py | 16 +++++++--------- loky/reusable_executor.py | 4 ++-- tests/_test_process_executor.py | 29 +++++++++++++++-------------- tests/test_loky_module.py | 2 -- tests/test_reusable_executor.py | 24 ++++++++++++++---------- 6 files changed, 39 insertions(+), 37 deletions(-) diff --git a/loky/__init__.py b/loky/__init__.py index 0014f2655..2bf079d3a 100644 --- a/loky/__init__.py +++ b/loky/__init__.py @@ -3,6 +3,7 @@ :class:`ProcessPoolExecutor` and a function :func:`get_reusable_executor` which hide the pool management under the hood. """ + from concurrent.futures import ( ALL_COMPLETED, FIRST_COMPLETED, diff --git a/loky/process_executor.py b/loky/process_executor.py index 7a3a6b272..77266270f 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -136,12 +136,11 @@ def get_worker_rank(): def set_worker_rank(pid, rank_mapper): - """Set worker's rank and world size from the process pid and an rank_mapper. - """ + """Set worker's rank and world size from the process pid and an rank_mapper.""" global _WORKER_RANK, _WORKER_WORLD if pid in rank_mapper: _WORKER_RANK = rank_mapper[pid] - _WORKER_WORLD = rank_mapper['world'] + _WORKER_WORLD = rank_mapper["world"] class _ThreadWakeup: @@ -413,7 +412,7 @@ def _process_worker( timeout, worker_exit_lock, current_depth, - rank_mapper + rank_mapper, ): """Evaluates calls from call_queue and places the results in result_queue. @@ -486,7 +485,7 @@ def _process_worker( if call_item is None: # Notify queue management thread about worker shutdown result_queue.put(pid) - + is_clean = worker_exit_lock.acquire(True, timeout=30) # Early notify any loky executor running in this worker process @@ -1064,7 +1063,6 @@ class TerminatedWorkerError(BrokenProcessPool): class ShutdownExecutorError(RuntimeError): - """ Raised when a ProcessPoolExecutor is shutdown while a future was in the running or pending state. @@ -1240,11 +1238,11 @@ def _start_executor_manager_thread(self): def _adjust_process_count(self): # Compute available worker ranks for newly spawned workers given_ranks = set( - v for k, v in self._rank_mapper.items() if k != 'world' + v for k, v in self._rank_mapper.items() if k != "world" ) all_ranks = set(range(self._max_workers)) available_ranks = all_ranks - given_ranks - + while len(self._processes) < self._max_workers: worker_exit_lock = self._context.BoundedSemaphore(1) rank = available_ranks.pop() @@ -1277,7 +1275,7 @@ def _adjust_process_count(self): # They will be passed to the workers when sending the tasks with # the CallItem. for pid, rank in list(self._rank_mapper.items()): - if pid != 'world' and rank >= self._max_workers: + if pid != "world" and rank >= self._max_workers: self._rank_mapper[pid] = available_ranks.pop() mp.util.debug( f"Adjusted process count to {self._max_workers}: " diff --git a/loky/reusable_executor.py b/loky/reusable_executor.py index 52bc63789..2755a14e2 100644 --- a/loky/reusable_executor.py +++ b/loky/reusable_executor.py @@ -236,13 +236,13 @@ def _resize(self, max_workers): # then no processes have been spawned and we can just # update _max_workers and return self._max_workers = max_workers - self._rank_mapper['world'] = max_workers + self._rank_mapper["world"] = max_workers return self._wait_job_completion() # Set the new size to be broadcasted to the workers - self._rank_mapper['world'] = max_workers + self._rank_mapper["world"] = max_workers # Some process might have returned due to timeout so check how many # children are still alive. Use the _process_management_lock to diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index bf5b8c0db..3c07cadad 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -1008,7 +1008,6 @@ def test_no_failure_on_large_data_send(self): @pytest.mark.skipif( sys.version_info >= (3, 8), reason="Python version supports pickling objects of size > 2 ** 31GB", - ) def test_expected_failure_on_large_data_send(self): data = b"\x00" * int(2.2e9) @@ -1130,26 +1129,28 @@ def test_child_env_executor(self): @staticmethod def _worker_rank(x): - time.sleep(.2) + time.sleep(0.2) rank, world = loky.get_worker_rank() - return dict(pid=os.getpid(), name=mp.current_process().name, - rank=rank, world=world) + return dict( + pid=os.getpid(), + name=mp.current_process().name, + rank=rank, + world=world, + ) - @pytest.mark.parametrize('max_workers', [1, 5, 13]) - @pytest.mark.parametrize('timeout', [None, 0.01]) + @pytest.mark.parametrize("max_workers", [1, 5, 13]) + @pytest.mark.parametrize("timeout", [None, 0.01]) def test_workers_rank(self, max_workers, timeout): executor = self.executor_type(max_workers, timeout=timeout) results = executor.map(self._worker_rank, range(max_workers * 5)) workers_rank = {} for f in results: - assert f['world'] == max_workers - rank = workers_rank.get(f['pid'], None) - assert rank is None or rank == f['rank'] - workers_rank[f['pid']] = f['rank'] - assert set(workers_rank.values()) == set(range(max_workers)), ( - ', '.join('{}: {}'.format(k, v) - for k, v in executor._rank_mapper.items()) - ) + assert f["world"] == max_workers + rank = workers_rank.get(f["pid"], None) + assert rank is None or rank == f["rank"] + workers_rank[f["pid"]] = f["rank"] + msg = ", ".join(f"{k}, {v}" for k, v in executor._rank_mapper.items()) + assert set(workers_rank.values()) == set(range(max_workers)), msg executor.shutdown(wait=True, kill_workers=True) def test_viztracer_profiler(self): diff --git a/tests/test_loky_module.py b/tests/test_loky_module.py index d459ad301..a7e5a2a32 100644 --- a/tests/test_loky_module.py +++ b/tests/test_loky_module.py @@ -14,7 +14,6 @@ from loky.backend.context import _cpu_count_user, _MAX_WINDOWS_WORKERS - def test_version(): assert hasattr( loky, "__version__" @@ -61,7 +60,6 @@ def test_cpu_count_os_sched_getaffinity(): except NotImplementedError: pytest.skip() - res = check_output( [ taskset_bin, diff --git a/tests/test_reusable_executor.py b/tests/test_reusable_executor.py index bc0070712..123550757 100644 --- a/tests/test_reusable_executor.py +++ b/tests/test_reusable_executor.py @@ -685,10 +685,14 @@ def test_resize_after_timeout(self): @staticmethod def _worker_rank(x): - time.sleep(.2) + time.sleep(0.2) rank, world = get_worker_rank() - return dict(pid=os.getpid(), name=mp.current_process().name, - rank=rank, world=world) + return dict( + pid=os.getpid(), + name=mp.current_process().name, + rank=rank, + world=world, + ) def test_workers_rank_resize(self): @@ -703,14 +707,14 @@ def test_workers_rank_resize(self): executor.map(sleep, [0.01] * 6) workers_rank = {} for f in results: - assert f['world'] == size - rank = workers_rank.get(f['pid'], None) - assert rank is None or rank == f['rank'] - workers_rank[f['pid']] = f['rank'] - assert set(workers_rank.values()) == set(range(size)), ( - ', '.join('{}: {}'.format(k, v) - for k, v in executor._rank_mapper.items()) + assert f["world"] == size + rank = workers_rank.get(f["pid"], None) + assert rank is None or rank == f["rank"] + workers_rank[f["pid"]] = f["rank"] + msg = ", ".join( + f"{k}: {v}" for k, v in executor._rank_mapper.items() ) + assert set(workers_rank.values()) == set(range(size)), msg class TestGetReusableExecutor(ReusableExecutorMixin):