diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index f41e14e..65ecd47 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -17,32 +17,15 @@ jobs: matrix: os: ["ubuntu-20.04", "macos-latest", "windows-latest"] # Remove Python 3.7.9 on 27 Jun 2023: https://endoflife.date/python - python-version: ["3.7.9", "3.8.10", "3.9.13", "3.10.8"] - pandas-version: ["1.0.5", "1.1.5", "1.2.5", "1.3.5", "1.4.3", "2.0", ""] + python-version: ["3.9.20", "3.10.15","3.11.11","3.12.8","3.13.1"] + pandas-version: ["1.2.5", "1.3.5", "1.4.3", "2.0.3","2.1.4" , ""] exclude: - # see https://github.com/nalepae/pandarallel/pull/211#issuecomment-1362647674 - - python-version: "3.8.10" - pandas-version: "1.0.5" - # Pandas 2.0 requires Python >= 3.8 - - python-version: "3.7.9" - pandas-version: "2.0" - # Pandas 1.4.3 requires Python >= 3.8 - - python-version: "3.7.9" - pandas-version: "1.4.3" - # Pandas 1.0.5 has to be fully rebuilt with Python >= 3.9.13 (taking > 10 min) - - python-version: "3.9.13" - pandas-version: "1.0.5" - # Pandas 1.0.5 has to be fully rebuilt with Python >= 3.9.13 (taking > 10 min) - - python-version: "3.10.8" - pandas-version: "1.0.5" - # Pandas 1.1.5 has to be fully rebuilt with Python >= 3.10.5 (taking > 10 min) - - python-version: "3.10.8" - pandas-version: "1.1.5" # Pandas 1.2.5 has to be fully rebuilt with Python >= 3.10.5 (taking > 10 min) - python-version: "3.10.8" pandas-version: "1.2.5" - + + steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} diff --git a/docs/examples_mac_linux.ipynb b/docs/examples_mac_linux.ipynb index 9a7e304..99ecbbf 100644 --- a/docs/examples_mac_linux.ipynb +++ b/docs/examples_mac_linux.ipynb @@ -104,7 +104,66 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# DataFrame.applymap" + "# DataFrame.map\n", + "map was introduced with pandas 2.1. 
Use applymap for earlier versions instead" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_size = int(1e7)\n", + "df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n", + " b=np.random.rand(df_size)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def func(x):\n", + " return math.sin(x**2) - math.cos(x**2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res = df.map(func)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res_parallel = df.parallel_map(func)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res.equals(res_parallel)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# DataFrame.applymap\n", + "applymap was deprecated in pandas 2.1; use map instead" ] }, { @@ -157,6 +216,54 @@ "res.equals(res_parallel)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# DataFrame.agg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_size = int(5e4)\n", + "df = pd.DataFrame(dict(a=np.random.rand(df_size),\n", + " b=np.random.rand(df_size),\n", + " c=np.random.rand(df_size)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res = df.agg(['min','max'],axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res_parallel = df.parallel_agg(['min','max'],axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res.equals(res_parallel)" + ] + }, { "cell_type": "markdown", 
"metadata": {}, @@ -539,7 +646,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.6" + "version": "3.12.3" }, "mimetype": "text/x-python", "name": "python", diff --git a/docs/examples_windows.ipynb b/docs/examples_windows.ipynb index a149db9..ead28df 100644 --- a/docs/examples_windows.ipynb +++ b/docs/examples_windows.ipynb @@ -134,7 +134,67 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# DataFrame.applymap" + "# DataFrame.map\n", + "map was introduced with pandas 2.1. Use applymap for earlier versions instead" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_size = int(1e7)\n", + "df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),\n", + " b=np.random.rand(df_size)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def func(x):\n", + " import math\n", + " return math.sin(x**2) - math.cos(x**2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res = df.map(func)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res_parallel = df.parallel_map(func)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res.equals(res_parallel)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# DataFrame.applymap\n", + "applymap was deprecated in pandas 2.1; use map instead" ] }, { @@ -188,6 +248,54 @@ "res.equals(res_parallel)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# DataFrame.agg" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df_size = int(5e4)\n", + "df = pd.DataFrame(dict(a=np.random.rand(df_size),\n", + " 
b=np.random.rand(df_size),\n", + " c=np.random.rand(df_size)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res = df.agg(['min','max'],axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "res_parallel = df.parallel_agg(['min','max'],axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res.equals(res_parallel)" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/pandarallel/core.py b/pandarallel/core.py index c54ec14..d1a46ff 100644 --- a/pandarallel/core.py +++ b/pandarallel/core.py @@ -523,6 +523,15 @@ def initialize( pd.DataFrame.parallel_apply = parallelize( nb_workers, DataFrame.Apply, progress_bars_in_user_defined_function ) + pd.DataFrame.parallel_map = parallelize( + nb_workers, + DataFrame.Map, + progress_bars_in_user_defined_function_multiply_by_number_of_columns, + ) + pd.DataFrame.parallel_agg = parallelize( + nb_workers, DataFrame.Agg, progress_bars_in_user_defined_function + ) + # applymap is outdated, might get removed in future pd.DataFrame.parallel_applymap = parallelize( nb_workers, DataFrame.ApplyMap, diff --git a/pandarallel/data_types/dataframe.py b/pandarallel/data_types/dataframe.py index 29c7e50..6de0dfa 100644 --- a/pandarallel/data_types/dataframe.py +++ b/pandarallel/data_types/dataframe.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Dict, Iterable, Iterator +from typing import Any, Callable, Dict, List, Iterable, Iterator from types import GeneratorType import pandas as pd @@ -50,6 +50,7 @@ def reduce( axis = 0 if isinstance(datas[0], pd.Series) else 1 - extra["axis"] return pd.concat(datas, copy=False, axis=axis) + # applymap is outdated might be removed in future class ApplyMap(DataType): @staticmethod def get_chunks( @@ -73,3 +74,70 @@ def reduce( datas: Iterable[pd.DataFrame], extra: 
Dict[str, Any] ) -> pd.DataFrame: return pd.concat(datas, copy=False) + + + class Map(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: pd.DataFrame, **kwargs + ) -> Iterator[pd.DataFrame]: + for chunk_ in chunk(data.shape[0], nb_workers): + yield data.iloc[chunk_] + + @staticmethod + def work( + data: pd.DataFrame, + user_defined_function: Callable, + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.DataFrame: + return data.map(user_defined_function) + + @staticmethod + def reduce( + datas: Iterable[pd.DataFrame], extra: Dict[str, Any] + ) -> pd.DataFrame: + return pd.concat(datas, copy=False) + + class Agg(DataType): + @staticmethod + def get_chunks( + nb_workers: int, data: pd.DataFrame, **kwargs + ) -> Iterator[pd.DataFrame]: + user_defined_function_kwargs = kwargs["user_defined_function_kwargs"] + + axis_int = get_axis_int(user_defined_function_kwargs) + opposite_axis_int = 1 - axis_int + + for chunk_ in chunk(data.shape[opposite_axis_int], nb_workers): + yield data.iloc[chunk_] if axis_int == 1 else data.iloc[:, chunk_] + + @staticmethod + def work( + data: pd.DataFrame, + user_defined_functions: List[Callable], + user_defined_function_args: tuple, + user_defined_function_kwargs: Dict[str, Any], + extra: Dict[str, Any], + ) -> pd.DataFrame: + return data.agg( + user_defined_functions, + *user_defined_function_args, + **user_defined_function_kwargs, + ) + + @staticmethod + def get_reduce_extra( + data: Any, user_defined_function_kwargs: Dict[str, Any] + ) -> Dict[str, Any]: + return {"axis": get_axis_int(user_defined_function_kwargs)} + + @staticmethod + def reduce( + datas: Iterable[pd.DataFrame], extra: Dict[str, Any] + ) -> pd.DataFrame: + if isinstance(datas, GeneratorType): + datas = list(datas) + axis = 0 if isinstance(datas[0], pd.Series) else 1 - extra["axis"] + return pd.concat(datas, copy=False, axis=axis) \ No newline at end of file diff --git 
a/tests/test_pandarallel.py b/tests/test_pandarallel.py index 0f91c32..d82263b 100644 --- a/tests/test_pandarallel.py +++ b/tests/test_pandarallel.py @@ -6,6 +6,8 @@ import pytest from pandarallel import pandarallel +from packaging.version import Version + @pytest.fixture(params=(1000, 1)) def df_size(request): @@ -54,6 +56,33 @@ def func(x): request.param ] +@pytest.fixture(params=("named", "anonymous")) +def func_dataframe_map(request): + def func(x): + return math.sin(x**2) - math.cos(x**2) + + return dict(named=func, anonymous=lambda x: math.sin(x**2) - math.cos(x**2))[ + request.param + ] + + +@pytest.fixture(params=("named", "anonymous")) +def func_dataframe_agg_axis_0(request): + def func(x): + return max(x) - min(x) + + return dict(named=func, anonymous=lambda x: max(x) - min(x))[request.param] + + +@pytest.fixture(params=("named", "anonymous")) +def func_dataframe_agg_axis_1(request): + def func(x): + return math.sin(x.a**2) + math.sin(x.b**2) + + return dict( + named=func, anonymous=lambda x: math.sin(x.a**2) + math.sin(x.b**2) + )[request.param] + @pytest.fixture(params=("named", "anonymous")) def func_series_map(request): @@ -204,6 +233,35 @@ def test_dataframe_apply_axis_1(pandarallel_init, func_dataframe_apply_axis_1, d res_parallel = df.parallel_apply(func_dataframe_apply_axis_1, axis=1) assert res.equals(res_parallel) +def test_dataframe_agg_axis_0(pandarallel_init, func_dataframe_agg_axis_0, df_size): + df = pd.DataFrame( + dict( + a=np.random.randint(1, 8, df_size), + b=np.random.rand(df_size), + c=np.random.randint(1, 8, df_size), + d=np.random.rand(df_size), + e=np.random.randint(1, 8, df_size), + f=np.random.rand(df_size), + g=np.random.randint(1, 8, df_size), + h=np.random.rand(df_size), + ) + ) + df.index = [item / 10 for item in df.index] + + res = df.agg(func_dataframe_agg_axis_0) + res_parallel = df.parallel_agg(func_dataframe_agg_axis_0) + assert res.equals(res_parallel) + + +def test_dataframe_agg_axis_1(pandarallel_init, 
func_dataframe_agg_axis_1, df_size): + df = pd.DataFrame( + dict(a=np.random.randint(1, 8, df_size), b=np.random.rand(df_size)) + ) + df.index = [item / 10 for item in df.index] + + res = df.agg(func_dataframe_agg_axis_1, axis=1) + res_parallel = df.parallel_agg(func_dataframe_agg_axis_1, axis=1) + assert res.equals(res_parallel) def test_dataframe_apply_invalid_axis(pandarallel_init): df = pd.DataFrame(dict(a=[1, 2, 3, 4])) @@ -225,7 +283,7 @@ def test_empty_dataframe_apply_axis_1(pandarallel_init, func_dataframe_apply_axi res_parallel = df.parallel_apply(func_dataframe_apply_axis_1) assert res.equals(res_parallel) - +@pytest.mark.skipif(Version(pd.__version__) >= Version("2.1"), reason="applymap is deprecated for pandas >=2.1") def test_dataframe_applymap(pandarallel_init, func_dataframe_applymap, df_size): df = pd.DataFrame( dict(a=np.random.randint(1, 8, df_size), b=np.random.rand(df_size)) @@ -237,6 +295,18 @@ def test_dataframe_applymap(pandarallel_init, func_dataframe_applymap, df_size): assert res.equals(res_parallel) +@pytest.mark.skipif(Version(pd.__version__) < Version("2.1"), reason="dataframe.map was introduced with pandas 2.1") +def test_dataframe_map(pandarallel_init, func_dataframe_map, df_size): + df = pd.DataFrame( + dict(a=np.random.randint(1, 8, df_size), b=np.random.rand(df_size)) + ) + df.index = [item / 10 for item in df.index] + + res = df.map(func_dataframe_map) + res_parallel = df.parallel_map(func_dataframe_map) + assert res.equals(res_parallel) + + def test_series_map(pandarallel_init, func_series_map, df_size): df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1)) @@ -245,6 +315,21 @@ def test_series_map(pandarallel_init, func_series_map, df_size): assert res.equals(res_parallel) +def test_empty_dataframe_agg_axis_0(pandarallel_init, func_dataframe_agg_axis_0): + df = pd.DataFrame() + + res = df.agg(func_dataframe_agg_axis_0) + res_parallel = df.parallel_agg(func_dataframe_agg_axis_0) + assert res.equals(res_parallel) + +def 
test_empty_dataframe_agg_axis_1(pandarallel_init, func_dataframe_agg_axis_1): + df = pd.DataFrame() + + res = df.agg(func_dataframe_agg_axis_1) + res_parallel = df.parallel_agg(func_dataframe_agg_axis_1) + assert res.equals(res_parallel) + + def test_series_apply(pandarallel_init, func_series_apply, df_size): df = pd.DataFrame(dict(a=np.random.rand(df_size) + 1))