A worker partially runs the next task after being interrupted (SIGINT) on a previous task #477

@asrelo

I am very confused about what's going on. This particular problem doesn't seem to be mentioned in the docs or in the issues here.

Consider a worker function that sleeps several times and then returns:

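import math
import os
import time

# Seconds per task. Assumed value: the logs show 4 steps, so 1.5 < TASK_DURATION <= 2.
TASK_DURATION = 2.0

def worker_function(task_id):
    pid = os.getpid()
    print(f'[Worker PID {pid}] Task {task_id}: Starting')
    # Split the task into sleeps of at most half a second each.
    steps_num = math.ceil(TASK_DURATION * 2)
    step_sleep = TASK_DURATION / steps_num
    for step in range(steps_num):
        time.sleep(step_sleep)
        print(f'[Worker PID {pid}] Task {task_id}: Step {step + 1}/{steps_num}')
    result = task_id ** 2
    print(f'[Worker PID {pid}] Task {task_id}: Completed! Result = {result}')
    return result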

Running it as:

import joblib

N_TASKS = 6
N_JOBS = 2

results = joblib.Parallel(n_jobs=N_JOBS, backend='loky')(
    joblib.delayed(worker_function)(i) for i in range(N_TASKS)
)

The results of normal execution are as expected:

[Worker PID 6660] Task 0: Starting
[Worker PID 21616] Task 1: Starting
[Worker PID 6660] Task 0: Step 1/4
[Worker PID 21616] Task 1: Step 1/4
[Worker PID 6660] Task 0: Step 2/4
[Worker PID 21616] Task 1: Step 2/4
[Worker PID 6660] Task 0: Step 3/4
[Worker PID 21616] Task 1: Step 3/4
[Worker PID 6660] Task 0: Step 4/4
[Worker PID 6660] Task 0: Completed! Result = 0
[Worker PID 21616] Task 1: Step 4/4
[Worker PID 21616] Task 1: Completed! Result = 1
[Worker PID 6660] Task 2: Starting
[Worker PID 21616] Task 3: Starting
[Worker PID 6660] Task 2: Step 1/4
[Worker PID 21616] Task 3: Step 1/4
[Worker PID 6660] Task 2: Step 2/4
[Worker PID 21616] Task 3: Step 2/4
[Worker PID 6660] Task 2: Step 3/4
[Worker PID 21616] Task 3: Step 3/4
[Worker PID 6660] Task 2: Step 4/4
[Worker PID 6660] Task 2: Completed! Result = 4
[Worker PID 21616] Task 3: Step 4/4
[Worker PID 21616] Task 3: Completed! Result = 9
[Worker PID 6660] Task 4: Starting
[Worker PID 21616] Task 5: Starting
[Worker PID 6660] Task 4: Step 1/4
[Worker PID 21616] Task 5: Step 1/4
[Worker PID 6660] Task 4: Step 2/4
[Worker PID 21616] Task 5: Step 2/4
[Worker PID 6660] Task 4: Step 3/4
[Worker PID 21616] Task 5: Step 3/4
[Worker PID 6660] Task 4: Step 4/4
[Worker PID 6660] Task 4: Completed! Result = 16
[Worker PID 21616] Task 5: Step 4/4
[Worker PID 21616] Task 5: Completed! Result = 25

But if parallel execution is interrupted from the console with Ctrl+C, this happens (full log):

[Worker PID 29536] Task 0: Starting
[Worker PID 36804] Task 1: Starting
[Worker PID 29536] Task 0: Step 1/4
[Worker PID 36804] Task 1: Step 1/4
[Worker PID 29536] Task 0: Step 2/4
[Worker PID 36804] Task 1: Step 2/4
[Worker PID 29536] Task 0: Step 3/4
[Worker PID 36804] Task 1: Step 3/4
# <-- Ctrl+C sent here -->
[Worker PID 29536] Task 2: Starting
[Worker PID 36804] Task 3: Starting

I tried catching and re-raising KeyboardInterrupt inside worker_function, and traced the execution of worker_function.

Apparently, the worker's control leaves worker_function prematurely with a KeyboardInterrupt, as expected. But then the worker starts the next task: control enters worker_function again and runs until it hits the time.sleep() call, after which execution of the worker function silently stops without any exception appearing in the user code.
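For reference, the tracing described above can be done roughly like this. It is only a minimal sketch: `trace_interrupts` is a hypothetical helper name, not part of joblib or of the original script, and it only logs when control enters and leaves the wrapped function and whether a KeyboardInterrupt passed through it.

```python
import functools
import os


def trace_interrupts(func):
    """Hypothetical helper: log entry, exit and KeyboardInterrupt for func."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        pid = os.getpid()
        # flush=True so the line is written out even if the worker stops abruptly.
        print(f'[Worker PID {pid}] enter {func.__name__}{args}', flush=True)
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            print(f'[Worker PID {pid}] KeyboardInterrupt in {func.__name__}{args}',
                  flush=True)
            raise  # re-raise so the caller still sees the interrupt
        finally:
            print(f'[Worker PID {pid}] leave {func.__name__}{args}', flush=True)
    return wrapper
```

It would be applied by passing `trace_interrupts(worker_function)` to `joblib.delayed` instead of `worker_function`. With the behaviour described above, the expectation is an "enter" line for the next task after the interrupt, with no matching "KeyboardInterrupt" line from the user code.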

This does not happen on the multiprocessing backend.

I'm on Python 3.12.10, Windows 10.

The full testing script (N_TASKS, N_JOBS are larger): gist
