diff --git a/pandarallel/core.py b/pandarallel/core.py index c54ec14..81f6d14 100644 --- a/pandarallel/core.py +++ b/pandarallel/core.py @@ -205,6 +205,7 @@ def parallelize_with_memory_file_system( nb_requested_workers: int, data_type: Type[DataType], progress_bars_type: ProgressBarsType, + max_progress_bars=-1, ): def closure( data: Any, @@ -239,7 +240,7 @@ def closure( show_progress_bars = progress_bars_type != ProgressBarsType.No - progress_bars = get_progress_bars(progresses_length, show_progress_bars) + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) progresses = [0] * nb_workers workers_status = [WorkerStatus.Running] * nb_workers @@ -355,6 +356,7 @@ def parallelize_with_pipe( nb_requested_workers: int, data_type: Type[DataType], progress_bars_type: ProgressBarsType, + max_progress_bars=-1, ): def closure( data: Any, @@ -391,7 +393,7 @@ def closure( show_progress_bars = progress_bars_type != ProgressBarsType.No - progress_bars = get_progress_bars(progresses_length, show_progress_bars) + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) progresses = [0] * nb_workers workers_status = [WorkerStatus.Running] * nb_workers @@ -458,7 +460,8 @@ def initialize( verbose=2, use_memory_fs: Optional[bool] = None, ) -> None: - show_progress_bars = progress_bar + show_progress_bars = bool(progress_bar) + max_progress_bars = int(progress_bar) if not isinstance(progress_bar, bool) else nb_workers is_memory_fs_available = Path(MEMORY_FS_ROOT).exists() use_memory_fs = ( @@ -521,36 +524,37 @@ def initialize( # DataFrame pd.DataFrame.parallel_apply = parallelize( - nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function + nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function, max_progress_bars ) pd.DataFrame.parallel_applymap = parallelize( nb_workers, DataFrame.ApplyMap, progress_bars_in_user_defined_function_multiply_by_number_of_columns, + max_progress_bars, ) # DataFrame GroupBy PandaDataFrameGroupBy.parallel_apply = parallelize( - nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function + nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function, max_progress_bars ) # Expanding GroupBy PandasExpandingGroupby.parallel_apply = parallelize( - nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function + nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function, max_progress_bars ) # Rolling GroupBy PandasRollingGroupby.parallel_apply = parallelize( - nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function + nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function, max_progress_bars ) # Series pd.Series.parallel_apply = parallelize( - nb_workers, Series.Apply, progress_bars_in_user_defined_function + nb_workers, Series.Apply, progress_bars_in_user_defined_function, max_progress_bars ) - pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars) + pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars, max_progress_bars) # Series Rolling pd.core.window.Rolling.parallel_apply = parallelize( - nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function + nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function, max_progress_bars ) diff --git a/pandarallel/progress_bars.py b/pandarallel/progress_bars.py index ffaa738..b8b53fd 100644 --- a/pandarallel/progress_bars.py +++ b/pandarallel/progress_bars.py @@ -55,9 +55,18 @@ def is_notebook_lab() -> bool: class ProgressBarsConsole(ProgressBars): - def __init__(self, maxs: List[int], show: bool) -> None: + def __init__(self, maxs: List[int], show: bool, count=-1) -> None: self.__show = show - self.__bars = [[0, max] for max in maxs] + num_workers = len(maxs) + self.bars_to_show = min(count, num_workers) if count > 0 else num_workers + self.map_worker_to_bar = { + index: (index % self.bars_to_show) for index in range(num_workers) + } + bar_maxs = ( + sum(maxs[k] for k, v in self.map_worker_to_bar.items() if v == bar_index) + for bar_index in range(self.bars_to_show) + ) + self.__bars = [[0, max] for max in bar_maxs] self.__width = self.__get_width() self.__lines = self.__update_lines() @@ -108,7 +117,11 @@ def update(self, values: List[int]) -> None: if not self.__show: return - for index, value in enumerate(values): + bar_values = [0] * self.bars_to_show + for worker_index, value in enumerate(values): + bar_values[self.map_worker_to_bar[worker_index]] += value + + for index, value in enumerate(bar_values): self.__bars[index][0] = value self.__remove_displayed_lines() @@ -119,7 +132,7 @@ def update(self, values: List[int]) -> None: class ProgressBarsNotebookLab(ProgressBars): - def __init__(self, maxs: List[int], show: bool) -> None: + def __init__(self, maxs: List[int], show: bool, count=-1) -> None: """Initialization. Positional argument: maxs - List containing the max value of each progress bar @@ -129,6 +142,16 @@ def __init__(self, maxs: List[int], show: bool) -> None: if not show: return + num_workers = len(maxs) + self.bars_to_show = min(count, num_workers) if count > 0 else num_workers + self.map_worker_to_bar = { + index: (index % self.bars_to_show) for index in range(num_workers) + } + bar_maxs = ( + sum(maxs[k] for k, v in self.map_worker_to_bar.items() if v == bar_index) + for bar_index in range(self.bars_to_show) + ) + from IPython.display import display from ipywidgets import HBox, IntProgress, Label, VBox @@ -139,7 +162,7 @@ def __init__(self, maxs: List[int], show: bool) -> None: Label("{} / {}".format(0, max)), ] ) - for max in maxs + for max in bar_maxs ] display(VBox(self.__bars)) @@ -152,7 +175,11 @@ def update(self, values: List[int]) -> None: if not self.__show: return - for index, value in enumerate(values): + bar_values = [0] * self.bars_to_show + for worker_index, value in enumerate(values): + bar_values[self.map_worker_to_bar[worker_index]] += value + + for index, value in enumerate(bar_values): bar, label = self.__bars[index].children bar.value = value @@ -168,17 +195,17 @@ def set_error(self, index: int) -> None: if not self.__show: return - bar, _ = self.__bars[index].children + bar, _ = self.__bars[self.map_worker_to_bar[index]].children bar.bar_style = "danger" def get_progress_bars( - maxs: List[int], show + maxs: List[int], show, count=-1 ) -> Union[ProgressBarsNotebookLab, ProgressBarsConsole]: return ( - ProgressBarsNotebookLab(maxs, show) + ProgressBarsNotebookLab(maxs, show, count) if is_notebook_lab() - else ProgressBarsConsole(maxs, show) + else ProgressBarsConsole(maxs, show, count) ) diff --git a/tests/test_pandarallel.py b/tests/test_pandarallel.py index 0f66fcf..39e17d3 100644 --- a/tests/test_pandarallel.py +++ b/tests/test_pandarallel.py @@ -12,7 +12,7 @@ def df_size(request): return request.param -@pytest.fixture(params=(False, True)) +@pytest.fixture(params=(False, True, 1)) def progress_bar(request): return request.param @@ -358,6 +358,38 @@ def test_dataframe_axis_1_no_reduction( assert res.equals(res_parallel) + +def test_limit_number_of_progress_bars(): + from pandarallel.progress_bars import get_progress_bars + progresses_length = [2, 3, 2] + show_progress_bars = True + max_progress_bars = 3 + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) + mangled_attr_name = f"_{progress_bars.__class__.__name__}__bars" + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 3 + max_progress_bars = 4 + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 3 + max_progress_bars = 2 + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 2 + max_progress_bars = 1 + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 1 + max_progress_bars = -100 + progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars) + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 3 + show_progress_bars = False + progress_bars = get_progress_bars(progresses_length * 2, show_progress_bars, max_progress_bars) + __bars = getattr(progress_bars, mangled_attr_name) + assert len(__bars) == 6 + + def test_memory_fs_root_environment_variable(monkeypatch): monkeypatch.setenv("MEMORY_FS_ROOT", "/test") from pandarallel import core