From 4ef0473e4a3d77c4be43d6b8da3042ae2a54187f Mon Sep 17 00:00:00 2001 From: Paolo Alba Date: Tue, 31 May 2022 21:04:22 +0200 Subject: [PATCH] add text are with exception from worker; raise exception for error in worker --- pandarallel/core.py | 22 ++++++++++++++-------- pandarallel/progress_bars.py | 17 +++++++++++++---- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pandarallel/core.py b/pandarallel/core.py index f96359c..8641a38 100644 --- a/pandarallel/core.py +++ b/pandarallel/core.py @@ -1,15 +1,17 @@ -import multiprocessing import os +import dill import pickle +import psutil +import multiprocessing +import traceback + +import pandas as pd + from itertools import count from multiprocessing.managers import SyncManager from pathlib import Path from tempfile import NamedTemporaryFile from typing import Any, Callable, Dict, Iterator, Optional, Tuple, Type, cast - -import dill -import pandas as pd -import psutil from pandas.core.groupby import DataFrameGroupBy as PandaDataFrameGroupBy from pandas.core.window.expanding import ExpandingGroupby as PandasExpandingGroupby from pandas.core.window.rolling import RollingGroupby as PandasRollingGroupby @@ -105,8 +107,9 @@ def __call__( master_workers_queue.put((worker_index, WorkerStatus.Success, None)) - except: - master_workers_queue.put((worker_index, WorkerStatus.Error, None)) + except Exception as excp: + err_msg = f"{excp}" + "\n" + traceback.format_exc() + master_workers_queue.put((worker_index, WorkerStatus.Error, err_msg)) raise @@ -318,9 +321,12 @@ def closure( if next(generation) % nb_workers == 0: progress_bars.update(progresses) elif worker_status == WorkerStatus.Error: - progress_bars.set_error(worker_index) + progress_bars.set_error(worker_index, payload) progress_bars.update(progresses) + if any(w_s == WorkerStatus.Error for w_s in workers_status): + raise Exception("Exception during parallelization.") + return wrapped_reduce_function( (Path(output_file.name) for output_file in output_files), reduce_extra, diff --git a/pandarallel/progress_bars.py b/pandarallel/progress_bars.py index dfe9190..a454b0f 100644 --- a/pandarallel/progress_bars.py +++ b/pandarallel/progress_bars.py @@ -129,13 +129,17 @@ def __init__(self, maxs: List[int], show: bool) -> None: return from IPython.display import display - from ipywidgets import HBox, IntProgress, Label, VBox + from ipywidgets import HBox, IntProgress, Label, VBox, Textarea, Layout self.__bars = [ HBox( [ IntProgress(0, 0, max, description="{:.2f}%".format(0)), Label("{} / {}".format(0, max)), + Textarea( + disabled=True, + layout=Layout(width="800px", height="200px", display="None"), + ), ] ) for max in maxs @@ -152,7 +156,10 @@ def update(self, values: List[int]) -> None: return for index, value in enumerate(values): - bar, label = self.__bars[index].children + bar, label, _ = self.__bars[index].children + + if bar.bar_style == "danger": + continue bar.value = value bar.description = "{:.2f}%".format(value / bar.max * 100) @@ -162,13 +169,15 @@ def update(self, values: List[int]) -> None: label.value = "{} / {}".format(value, bar.max) - def set_error(self, index: int) -> None: + def set_error(self, index: int, err_msg: str) -> None: """Set a bar on error""" if not self.__show: return - bar, _ = self.__bars[index].children + bar, _, txt_area = self.__bars[index].children bar.bar_style = "danger" + txt_area.value = err_msg + txt_area.layout.display = "" def get_progress_bars(