Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions pandarallel/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def parallelize_with_memory_file_system(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
max_progress_bars=-1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a type hint to this parameter, e.g. `max_progress_bars: int = -1`.

):
def closure(
data: Any,
Expand Down Expand Up @@ -239,7 +240,7 @@ def closure(

show_progress_bars = progress_bars_type != ProgressBarsType.No

progress_bars = get_progress_bars(progresses_length, show_progress_bars)
progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the line length — I think this repo (generally) follows the PEP 8 limit of 79 characters per line. @nalepae, do we have an auto-formatter that we use?

progresses = [0] * nb_workers
workers_status = [WorkerStatus.Running] * nb_workers

Expand Down Expand Up @@ -355,6 +356,7 @@ def parallelize_with_pipe(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
max_progress_bars=-1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a type hint to this parameter, e.g. `max_progress_bars: int = -1`.

):
def closure(
data: Any,
Expand Down Expand Up @@ -391,7 +393,7 @@ def closure(

show_progress_bars = progress_bars_type != ProgressBarsType.No

progress_bars = get_progress_bars(progresses_length, show_progress_bars)
progress_bars = get_progress_bars(progresses_length, show_progress_bars, max_progress_bars)
progresses = [0] * nb_workers
workers_status = [WorkerStatus.Running] * nb_workers

Expand Down Expand Up @@ -458,7 +460,8 @@ def initialize(
verbose=2,
use_memory_fs: Optional[bool] = None,
) -> None:
show_progress_bars = progress_bar
show_progress_bars = bool(progress_bar)
max_progress_bars = int(progress_bar) if not isinstance(progress_bar, bool) else nb_workers
is_memory_fs_available = Path(MEMORY_FS_ROOT).exists()

use_memory_fs = (
Expand Down Expand Up @@ -521,36 +524,37 @@ def initialize(

# DataFrame
pd.DataFrame.parallel_apply = parallelize(
nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function
nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function, max_progress_bars
)
pd.DataFrame.parallel_applymap = parallelize(
nb_workers,
DataFrame.ApplyMap,
progress_bars_in_user_defined_function_multiply_by_number_of_columns,
max_progress_bars,
)

# DataFrame GroupBy
PandaDataFrameGroupBy.parallel_apply = parallelize(
nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function
nb_workers, DataFrameGroupBy.Apply, progress_bars_in_user_defined_function, max_progress_bars
)

# Expanding GroupBy
PandasExpandingGroupby.parallel_apply = parallelize(
nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function
nb_workers, ExpandingGroupBy.Apply, progress_bars_in_work_function, max_progress_bars
)

# Rolling GroupBy
PandasRollingGroupby.parallel_apply = parallelize(
nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function
nb_workers, RollingGroupBy.Apply, progress_bars_in_work_function, max_progress_bars
)

# Series
pd.Series.parallel_apply = parallelize(
nb_workers, Series.Apply, progress_bars_in_user_defined_function
nb_workers, Series.Apply, progress_bars_in_user_defined_function, max_progress_bars
)
pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars)
pd.Series.parallel_map = parallelize(nb_workers, Series.Map, show_progress_bars, max_progress_bars)

# Series Rolling
pd.core.window.Rolling.parallel_apply = parallelize(
nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function
nb_workers, SeriesRolling.Apply, progress_bars_in_user_defined_function, max_progress_bars
)
47 changes: 37 additions & 10 deletions pandarallel/progress_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ def is_notebook_lab() -> bool:


class ProgressBarsConsole(ProgressBars):
def __init__(self, maxs: List[int], show: bool) -> None:
def __init__(self, maxs: List[int], show: bool, count=-1) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a type hint to the new parameter, e.g. `count: int = -1`.

self.__show = show
self.__bars = [[0, max] for max in maxs]
num_workers = len(maxs)
self.bars_to_show = min(count, num_workers) if count > 0 else num_workers
self.map_worker_to_bar = {
index: (index % self.bars_to_show) for index in range(num_workers)
}
bar_maxs = (
sum(maxs[k] for k, v in self.map_worker_to_bar.items() if v == bar_index)
for bar_index in range(self.bars_to_show)
)
self.__bars = [[0, max] for max in bar_maxs]
self.__width = self.__get_width()

self.__lines = self.__update_lines()
Expand Down Expand Up @@ -108,7 +117,11 @@ def update(self, values: List[int]) -> None:
if not self.__show:
return

for index, value in enumerate(values):
bar_values = [0] * self.bars_to_show
for worker_index, value in enumerate(values):
bar_values[self.map_worker_to_bar[worker_index]] += value

for index, value in enumerate(bar_values):
self.__bars[index][0] = value

self.__remove_displayed_lines()
Expand All @@ -119,7 +132,7 @@ def update(self, values: List[int]) -> None:


class ProgressBarsNotebookLab(ProgressBars):
def __init__(self, maxs: List[int], show: bool) -> None:
def __init__(self, maxs: List[int], show: bool, count=-1) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a type hint to the new parameter, e.g. `count: int = -1`.

"""Initialization.
Positional argument:
maxs - List containing the max value of each progress bar
Expand All @@ -129,6 +142,16 @@ def __init__(self, maxs: List[int], show: bool) -> None:
if not show:
return

num_workers = len(maxs)
self.bars_to_show = min(count, num_workers) if count > 0 else num_workers
self.map_worker_to_bar = {
index: (index % self.bars_to_show) for index in range(num_workers)
}
bar_maxs = (
sum(maxs[k] for k, v in self.map_worker_to_bar.items() if v == bar_index)
for bar_index in range(self.bars_to_show)
)

from IPython.display import display
from ipywidgets import HBox, IntProgress, Label, VBox

Expand All @@ -139,7 +162,7 @@ def __init__(self, maxs: List[int], show: bool) -> None:
Label("{} / {}".format(0, max)),
]
)
for max in maxs
for max in bar_maxs
]

display(VBox(self.__bars))
Expand All @@ -152,7 +175,11 @@ def update(self, values: List[int]) -> None:
if not self.__show:
return

for index, value in enumerate(values):
bar_values = [0] * self.bars_to_show
for worker_index, value in enumerate(values):
bar_values[self.map_worker_to_bar[worker_index]] += value

for index, value in enumerate(bar_values):
bar, label = self.__bars[index].children

bar.value = value
Expand All @@ -168,17 +195,17 @@ def set_error(self, index: int) -> None:
if not self.__show:
return

bar, _ = self.__bars[index].children
bar, _ = self.__bars[self.map_worker_to_bar[index]].children
bar.bar_style = "danger"


def get_progress_bars(
maxs: List[int], show
maxs: List[int], show, count=-1
) -> Union[ProgressBarsNotebookLab, ProgressBarsConsole]:
return (
ProgressBarsNotebookLab(maxs, show)
ProgressBarsNotebookLab(maxs, show, count)
if is_notebook_lab()
else ProgressBarsConsole(maxs, show)
else ProgressBarsConsole(maxs, show, count)
)


Expand Down
34 changes: 33 additions & 1 deletion tests/test_pandarallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def df_size(request):
return request.param


@pytest.fixture(params=(False, True))
@pytest.fixture(params=(False, True, 1))
def progress_bar(request):
return request.param

Expand Down Expand Up @@ -358,6 +358,38 @@ def test_dataframe_axis_1_no_reduction(

assert res.equals(res_parallel)


def test_limit_number_of_progress_bars():
    """Check how many bars ``get_progress_bars`` builds for a requested limit.

    With three workers, a positive limit caps the number of bars at that
    limit, a limit above the worker count yields one bar per worker, and a
    negative limit also yields one bar per worker. The final scenario
    repeats the negative limit with six workers and display disabled.
    """
    from pandarallel.progress_bars import get_progress_bars

    worker_maxs = [2, 3, 2]

    # Each row: (per-worker maxs, show flag, requested limit, expected bars).
    scenarios = [
        (worker_maxs, True, 3, 3),
        (worker_maxs, True, 4, 3),
        (worker_maxs, True, 2, 2),
        (worker_maxs, True, 1, 1),
        (worker_maxs, True, -100, 3),
        (worker_maxs * 2, False, -100, 6),
    ]

    # The bars live in a double-underscore (name-mangled) attribute; the
    # mangled name is derived once, from the first object returned.
    bars_attr = None
    for maxs, show, requested, expected in scenarios:
        bars_obj = get_progress_bars(maxs, show, requested)
        if bars_attr is None:
            bars_attr = f"_{type(bars_obj).__name__}__bars"
        assert len(getattr(bars_obj, bars_attr)) == expected


def test_memory_fs_root_environment_variable(monkeypatch):
monkeypatch.setenv("MEMORY_FS_ROOT", "/test")
from pandarallel import core
Expand Down