Skip to content

Submission to reusable executor is not fully thread-safe #458

@ogrisel

Description

@ogrisel

The bug is hard to reproduce, but here is a minimal reproducer that oftens triggers it for me. The trick is to ensure that each submission will trigger the current executor to shutdown with high probability.

from loky import get_reusable_executor
from concurrent.futures import ThreadPoolExecutor

tpe = ThreadPoolExecutor(max_workers=10)


def loky_submit_one_task_with_env(env):
    return get_reusable_executor(env=env).submit(lambda: env)

# Initialize the executor with a given env in the main thread

envs = [{"a": str(i)} for i in range(10)] * 10
for i in range(10):
    print("Outer iteration:", i)
    loky_submit_one_task_with_env({"a": "1"})


    for f in tpe.map(loky_submit_one_task_with_env, envs):
        # print(f.result())
        f.result()
        print(".", end="")
    print()

And here is the traceback (it can take more than one outer iteration to get it):

---------------------------------------------------------------------------
ShutdownExecutorError                     Traceback (most recent call last)
Cell In[1], line 19
     15 print("Outer iteration:", i)
     16 loky_submit_one_task_with_env({"a": "1"})
---> 19 for f in tpe.map(loky_submit_one_task_with_env, envs):
     20     # print(f.result())
     21     f.result()
     22     print(".", end="")

File ~/miniforge3/envs/dev/lib/python3.13/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
    616 while fs:
    617     # Careful not to keep a reference to the popped future
    618     if timeout is None:
--> 619         yield _result_or_cancel(fs.pop())
    620     else:
    621         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File ~/miniforge3/envs/dev/lib/python3.13/concurrent/futures/_base.py:317, in _result_or_cancel(***failed resolving arguments***)
    315 try:
    316     try:
--> 317         return fut.result(timeout)
    318     finally:
    319         fut.cancel()

File ~/miniforge3/envs/dev/lib/python3.13/concurrent/futures/_base.py:449, in Future.result(self, timeout)
    447     raise CancelledError()
    448 elif self._state == FINISHED:
--> 449     return self.__get_result()
    451 self._condition.wait(timeout)
    453 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniforge3/envs/dev/lib/python3.13/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/miniforge3/envs/dev/lib/python3.13/concurrent/futures/thread.py:59, in _WorkItem.run(self)
     56     return
     58 try:
---> 59     result = self.fn(*self.args, **self.kwargs)
     60 except BaseException as exc:
     61     self.future.set_exception(exc)

Cell In[1], line 9, in loky_submit_one_task_with_env(env)
      8 def loky_submit_one_task_with_env(env):
----> 9     return get_reusable_executor(env=env).submit(lambda: env)

File ~/code/loky/loky/reusable_executor.py:230, in _ReusablePoolExecutor.submit(self, fn, *args, **kwargs)
    228 def submit(self, fn, *args, **kwargs):
    229     with self._submit_resize_lock:
--> 230         return super().submit(fn, *args, **kwargs)

File ~/code/loky/loky/process_executor.py:1258, in ProcessPoolExecutor.submit(self, fn, *args, **kwargs)
   1256     raise self._flags.broken
   1257 if self._flags.shutdown:
-> 1258     raise ShutdownExecutorError(
   1259         "cannot schedule new futures after shutdown"
   1260     )
   1262 # Cannot submit a new calls once the interpreter is shutting down.
   1263 # This check avoids spawning new processes at exit.
   1264 if _global_shutdown:

ShutdownExecutorError: cannot schedule new futures after shutdown

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions