parallel_compute_score of YRRewardManager occasionally hangs, and a potential fix #45

@shihux

Hi, the parallel_compute_score function at https://github.com/SkyworkAI/Skywork-OR1/blob/main/verl/workers/reward_manager/yr_code.py#L27 occasionally hangs after the progress bar reaches nearly 100%. I tried the following version, which explicitly terminates all worker processes in a finally block, and I have not observed the hang since.

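from concurrent.futures import ProcessPoolExecutor, as_completed

import psutil
from tqdm import tqdm

def parallel_compute_score(evaluation_func, response_str, ground_truth, data_sources, timeout=60, max_workers=64):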
    with tqdm(total=len(response_str)) as pbar:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(evaluation_func, response_str[index], ground_truth[index], data_sources[index]): index
                for index in range(len(response_str))
            }
            results = {}
            metadata = {}
            try:
                for future in as_completed(futures, timeout=timeout):
                    index = futures[future]
                    results[index], metadata[index] = future.result()
                    pbar.update(1)
            except Exception as e:
                print(f"Exception occurred: {e}. Initiating shutdown.")
            finally:
                # Forcefully terminate processes using psutil
                terminated_count = 0
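                # Snapshot the PIDs first: _processes is a private dict that the
                # executor's manager thread may modify while we iterate.
                for pid in list(executor._processes):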
                    try:
                        p = psutil.Process(pid)
                        p.terminate()
                        try:
                            p.wait(timeout=5)  # Give processes 5 seconds to terminate gracefully
                        except psutil.TimeoutExpired:
                            p.kill()  # Force kill if still running after 5 seconds
                        terminated_count += 1
                    except Exception as e:
                        print(f"Failed to terminate process {pid}: {e}")

                print(f"parallel_compute_score shutdown completed. {terminated_count} subprocess(es) terminated.")

                # Set default values for unfinished results
                for index in range(len(response_str)):
                    if index not in results:
                        results[index] = 0.0  # Default score for timeout
                        metadata[index] = {"error": "timeout", "status": "terminated"}

    return [results[i] for i in range(len(response_str))]

This is related to the issue verl-project/verl#1466.
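
For comparison, the same idea can probably be expressed without psutil, since the worker objects held by the executor are ordinary multiprocessing.Process instances. The sketch below is only an illustration under a few assumptions: run_with_deadline and its parameters are hypothetical names that do not exist in Skywork-OR1 or verl, it relies on the private executor._processes attribute just like the code above, and cancel_futures requires Python 3.9 or newer. It avoids the with block so that no shutdown(wait=True) call can block on a stuck worker.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError


def run_with_deadline(func, args_list, timeout=60, max_workers=4,
                      default=0.0, mp_context=None):
    """Run func(arg) for each arg in worker processes, giving up after `timeout` seconds.

    Hypothetical helper for illustration only. Unfinished tasks get `default`.
    """
    results = {}
    executor = ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context)
    futures = {executor.submit(func, arg): i for i, arg in enumerate(args_list)}
    try:
        # as_completed raises a timeout error if not all futures finish in time.
        for future in as_completed(futures, timeout=timeout):
            results[futures[future]] = future.result()
    except FuturesTimeoutError:
        print(f"Timed out with {len(futures) - len(results)} task(s) unfinished.")
    finally:
        # Assumption: _processes is a private attribute (pid -> Process).
        # shutdown() clears it, so take a snapshot of the workers first.
        workers = list((executor._processes or {}).values())
        # Do not wait for stuck workers; drop tasks that have not started yet.
        executor.shutdown(wait=False, cancel_futures=True)
        for proc in workers:
            proc.kill()  # harmless if the worker has already exited
        for proc in workers:
            proc.join(timeout=5)  # reap the killed workers
    return [results.get(i, default) for i in range(len(args_list))]
```

With this sketch, a call such as run_with_deadline(time.sleep, [0.01, 30, 0.01], timeout=2, max_workers=3) should return after roughly two seconds, with the default value in the middle slot, instead of waiting for the 30-second task. Exceptions raised by func itself are not swallowed here; they propagate after the workers have been killed, which differs from the version above that maps every failure to a default score.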
