From 3cf26d1e23f41cc948f55afba2a510dade4f8a10 Mon Sep 17 00:00:00 2001 From: hasanpandher Date: Mon, 8 Sep 2025 16:34:07 +0530 Subject: [PATCH 1/2] Algo optimisations and polars compatibility --- .idea/.gitignore | 8 + .../inspectionProfiles/profiles_settings.xml | 7 + .idea/ipfn.iml | 13 + .idea/misc.xml | 10 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + benchmark.py | 243 ++++++++++++++ ipfn.egg-info/PKG-INFO | 239 +++++++++++++ ipfn.egg-info/SOURCES.txt | 12 + ipfn.egg-info/dependency_links.txt | 1 + ipfn.egg-info/requires.txt | 2 + ipfn.egg-info/top_level.txt | 1 + ipfn/__init__.py | 2 +- ipfn/ipfn.py | 314 ++++++++++++++---- ipfn_contributors.txt | 5 + pytest.ini | 6 + tests/context.py | 2 +- 17 files changed, 808 insertions(+), 71 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/ipfn.iml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 benchmark.py create mode 100644 ipfn.egg-info/PKG-INFO create mode 100644 ipfn.egg-info/SOURCES.txt create mode 100644 ipfn.egg-info/dependency_links.txt create mode 100644 ipfn.egg-info/requires.txt create mode 100644 ipfn.egg-info/top_level.txt create mode 100644 pytest.ini diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..dd4c951 --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,7 @@ + + + + \ No newline at end of file diff --git a/.idea/ipfn.iml b/.idea/ipfn.iml new file mode 100644 index 0000000..014f736 --- /dev/null +++ b/.idea/ipfn.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..a8f3eb3 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,10 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..f6758a3 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 0000000..fba81c1 --- /dev/null +++ b/benchmark.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +import time +import numpy as np +import pandas as pd +from ipfn import ipfn + +# Optional Polars import to benchmark polars backend when available +try: + import polars as pl # type: ignore +except Exception: + pl = None + + +def time_run(name, builder_fn, repeats=1000, algorithms=("legacy", "optimized")): + print(f"\n=== {name} ===") + times = {algo: [] for algo in algorithms} + results = {} + for algo in algorithms: + for _ in range(repeats): + original, aggregates, dimensions, weight_col = builder_fn() + # If benchmarking polars and original is pandas, convert to Polars to begin with + if (pl is not None) and (algo == 'polars'): + if isinstance(original, pd.DataFrame): + try: + original = pl.from_pandas(original) + except Exception: + 
original = pl.DataFrame(original) + IPF = ipfn(original, aggregates, dimensions, convergence_rate=1e-6, algorithm=algo) + t0 = time.perf_counter() + out = IPF.iteration() + t1 = time.perf_counter() + times[algo].append(t1 - t0) + results[algo] = out + print(f"{algo:9s} avg time over {repeats}: {np.mean(times[algo]):.6f}s (min {np.min(times[algo]):.6f}s, max {np.max(times[algo]):.6f}s)") + return results + + +def wikipedia_2d_example(): + def build(): + m = np.array([[40, 30, 20, 10], [35, 50, 100, 75], [30, 80, 70, 120], [20, 30, 40, 50]], dtype=float) + xip = np.array([150, 300, 400, 150], dtype=float) + xpj = np.array([200, 300, 400, 100], dtype=float) + aggregates = [xip, xpj] + dimensions = [[0], [1]] + return m.copy(), aggregates, dimensions, 'total' + return build + + +def three_d_example(): + def build(): + m = np.zeros((2, 4, 3), dtype=float) + m[0,0,0] = 1; m[0,0,1] = 2; m[0,0,2] = 1 + m[0,1,0] = 3; m[0,1,1] = 5; m[0,1,2] = 5 + m[0,2,0] = 6; m[0,2,1] = 2; m[0,2,2] = 2 + m[0,3,0] = 1; m[0,3,1] = 7; m[0,3,2] = 2 + m[1,0,0] = 5; m[1,0,1] = 4; m[1,0,2] = 2 + m[1,1,0] = 5; m[1,1,1] = 5; m[1,1,2] = 5 + m[1,2,0] = 3; m[1,2,1] = 8; m[1,2,2] = 7 + m[1,3,0] = 2; m[1,3,1] = 7; m[1,3,2] = 6 + xipp = np.array([52, 48], dtype=float) + xpjp = np.array([20, 30, 35, 15], dtype=float) + xppk = np.array([35, 40, 25], dtype=float) + xijp = np.array([[9, 17, 19, 7], [11, 13, 16, 8]], dtype=float) + xpjk = np.array([[7, 9, 4], [8, 12, 10], [15, 12, 8], [5, 7, 3]], dtype=float) + aggregates = [xipp, xpjp, xppk, xijp, xpjk] + dimensions = [[0], [1], [2], [0, 1], [1, 2]] + return m.copy(), aggregates, dimensions, 'total' + return build + + +def pandas_example(): + def build(): + age = [30, 30, 30, 30, 40, 40, 40, 40, 50, 50, 50, 50] + distance = [10, 20, 30, 40, 10, 20, 30, 40, 10, 20, 30, 40] + m = [8., 4., 6., 7., 3., 6., 5., 2., 9., 11., 3., 1.] 
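+        # The marginal targets below are overridden so they no longer match the
+        # seed totals, giving IPF a non-trivial adjustment to perform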
+        df = pd.DataFrame({'age': age, 'distance': distance, 'total': m})
+        xip = df.groupby('age')['total'].sum()
+        xip.loc[30] = 20
+        xip.loc[40] = 18
+        xip.loc[50] = 22
+        xpj = df.groupby('distance')['total'].sum()
+        xpj.loc[10] = 18
+        xpj.loc[20] = 16
+        xpj.loc[30] = 12
+        xpj.loc[40] = 14
+        dimensions = [['age'], ['distance']]
+        aggregates = [xip, xpj]
+        return df.copy(), aggregates, dimensions, 'total'
+    return build
+
+
+def main():
+    print("Benchmarking IPFN: legacy vs optimized (+ polars if available)")
+    time_run('Wikipedia 2D (NumPy)', wikipedia_2d_example(), repeats=10)
+    time_run('3D example (NumPy)', three_d_example(), repeats=10)
+    algos_pd = ("legacy", "optimized") + (("polars",) if pl is not None else tuple())
+    time_run('Pandas example', pandas_example(), repeats=10, algorithms=algos_pd)
+    # 6D / 1500-row benchmark with 100 repetitions and time/CPU/RAM metrics
+    benchmark_6d_1500(repeats=100)
+
+
+# ---- 6D 1500-rows benchmark with time/CPU/RAM measurements ----
+
+def _build_6d_dataset_1500(seed: int = 42):
+    import numpy as np
+    import pandas as pd
+
+    rng = np.random.RandomState(seed)
+    n_rows = 1500
+    # Define 6 dimensions with varying numbers of categories
+    dims = {
+        'd1': [f'a{i}' for i in range(5)],
+        'd2': [f'b{i}' for i in range(5)],
+        'd3': [f'c{i}' for i in range(5)],
+        'd4': [f'd{i}' for i in range(5)],
+        'd5': [f'e{i}' for i in range(3)],
+        'd6': [f'f{i}' for i in range(4)],
+    }
+    data = {}
+    for k, cats in dims.items():
+        data[k] = rng.choice(cats, size=n_rows, replace=True)
+    # Positive weights
+    data['total'] = rng.gamma(shape=2.0, scale=5.0, size=n_rows).astype(float)
+    df = pd.DataFrame(data)
+
+    # Build single-dimension marginals with a small random perturbation and rescale to preserve the total
+    total_sum = df['total'].sum()
+    aggregates = []
+    dimensions = []
+    for k in dims.keys():
+        grp = df.groupby(k)['total'].sum()
+        # Perturb each group by up to ±10%
+        noise = rng.uniform(0.9, 1.1, size=len(grp))
+        target = grp.values * noise
+        # Rescale to match the original total sum for stability
+        target *= (total_sum / target.sum())
+        agg = pd.Series(target, index=grp.index, name='total')
+        aggregates.append(agg)
+        dimensions.append([k])
+
+    return df, aggregates, dimensions, 'total'
+
+
+def _measure_ipfn_run(algo: str, original, aggregates, dimensions, weight_col='total'):
+    import time
+    import copy as _copy
+    import tracemalloc
+
+    # Optional psutil import
+    try:
+        import psutil  # type: ignore
+        proc = psutil.Process()
+    except Exception:
+        psutil = None  # type: ignore
+        proc = None
+
+    # Prepare input per algorithm (convert to Polars if requested and available).
+    # Note: numpy arrays do not accept copy(deep=True), so deep-copy those instead.
+    local_original = original.copy(deep=True) if isinstance(original, pd.DataFrame) else _copy.deepcopy(original)
+    if (pl is not None) and (algo == 'polars'):
+        try:
+            import polars as _pl  # type: ignore
+            if not isinstance(local_original, _pl.DataFrame):
+                try:
+                    local_original = _pl.from_pandas(local_original)
+                except Exception:
+                    local_original = _pl.DataFrame(local_original)
+        except Exception:
+            pass
+
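+    # Note: tracemalloc only tracks allocations made through the Python allocator;
+    # RSS (via psutil, when available) covers the whole process, including buffers
+    # NumPy allocates outside of it, so the two metrics can diverge substantially.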
+    # Measurements
+    cpu_t0 = time.process_time()
+    wall_t0 = time.perf_counter()
+
+    tracemalloc.start()
+    rss_before = proc.memory_info().rss if proc else None
+
+    IPF = ipfn(local_original, _copy.deepcopy(aggregates), _copy.deepcopy(dimensions), convergence_rate=1e-6, algorithm=algo)
+    _ = IPF.iteration()
+
+    current, peak = tracemalloc.get_traced_memory()
+    tracemalloc.stop()
+
+    wall_t1 = time.perf_counter()
+    cpu_t1 = time.process_time()
+    rss_after = proc.memory_info().rss if proc else None
+
+    return {
+        'wall_time_s': wall_t1 - wall_t0,
+        'cpu_time_s': cpu_t1 - cpu_t0,
+        'peak_tracemalloc_bytes': int(peak),
+        'rss_before_bytes': int(rss_before) if rss_before is not None else None,
+        'rss_after_bytes': int(rss_after) if rss_after is not None else None,
+        'rss_delta_bytes': (int(rss_after) - int(rss_before)) if (rss_before is not None and rss_after is not None) else None,
+    }
+
+
+def benchmark_6d_1500(repeats: int = 100):
+    print(f"\nBenchmark: 6D dataset with 1500 rows, {repeats} runs per algorithm")
+    # Determine algorithms: always legacy and optimized; include polars when available
+    algos = ["legacy", "optimized"] + (["polars"] if pl is not None else [])
+
+    original, aggregates, dimensions, weight_col = _build_6d_dataset_1500()
+
+    import numpy as _np
+    results_summary = {}
+    for algo in algos:
+        wall_times = []
+        cpu_times = []
+        peaks = []
+        rss_deltas = []
+        rss_after = []
+        for _ in range(repeats):
+            m = _measure_ipfn_run(algo, original, aggregates, dimensions, weight_col)
+            wall_times.append(m['wall_time_s'])
+            cpu_times.append(m['cpu_time_s'])
+            peaks.append(m['peak_tracemalloc_bytes'])
+            if m['rss_delta_bytes'] is not None:
+                rss_deltas.append(m['rss_delta_bytes'])
+            if m['rss_after_bytes'] is not None:
+                rss_after.append(m['rss_after_bytes'])
+        summary = {
+            'wall_time_avg_s': float(_np.mean(wall_times)),
+            'wall_time_min_s': float(_np.min(wall_times)),
+            'wall_time_max_s': float(_np.max(wall_times)),
+            'cpu_time_avg_s': float(_np.mean(cpu_times)),
+            'peak_tracemalloc_avg_bytes': int(_np.mean(peaks)),
+        }
+        if rss_deltas:
+            summary['rss_delta_avg_bytes'] = float(_np.mean(rss_deltas))
+        if rss_after:
+            summary['rss_after_avg_bytes'] = float(_np.mean(rss_after))
+        results_summary[algo] = summary
+
+        # Pretty-print summary
+        print(f"{algo:9s} | wall avg {summary['wall_time_avg_s']:.6f}s (min {summary['wall_time_min_s']:.6f}, max {summary['wall_time_max_s']:.6f}) | "
+              f"cpu avg {summary['cpu_time_avg_s']:.6f}s | peak_py_mem avg {summary['peak_tracemalloc_avg_bytes']/1_048_576:.3f} MiB"
+              + (f" | rss_delta avg {summary['rss_delta_avg_bytes']/1_048_576:.3f} MiB | rss_after avg {summary['rss_after_avg_bytes']/1_048_576:.3f} MiB" if 'rss_delta_avg_bytes' in summary else ""))
+
+    return results_summary
+
+
+# Execute only when run as a script; the guard sits after all definitions so that
+# main() can resolve the 6D benchmark helpers defined above.
+if __name__ == '__main__':
+    main()
diff --git a/ipfn.egg-info/PKG-INFO b/ipfn.egg-info/PKG-INFO
new file mode 100644
index 0000000..dade528
--- /dev/null
+++ b/ipfn.egg-info/PKG-INFO
@@ -0,0 +1,239 @@
+Metadata-Version: 2.4
+Name: ipfn
+Version: 1.4.4
+Summary: Iterative Proportional Fitting with N dimensions, for python
+Home-page: https://github.com/Dirguis/ipfn.git
+Author: Damien Forthommme
+Author-email: damien2227@hotmail.com
+License: MIT
+Keywords: iterative proportional fitting ipfp biproportional ras raking scaling
+Platform: Any
+Classifier: Development Status :: 4 - Beta
+Classifier: Intended Audience :: Science/Research
+Classifier: Topic :: Software Development :: Build Tools
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Programming Language :: Python :: 3
+Requires-Dist: pandas>=0.22.0
+Requires-Dist: numpy
+Dynamic: author
+Dynamic: author-email
+Dynamic: classifier
+Dynamic: description
+Dynamic: home-page
+Dynamic: keywords
+Dynamic: license
+Dynamic: platform
+Dynamic: requires-dist
+Dynamic: summary
+
+ipfn
+=======================
+
+Iterative proportional fitting is an algorithm used in many different fields, such as economics or the social sciences, to alter results in such a way that aggregates along one or several dimensions match known marginals (or aggregates along these same dimensions).
+
+The algorithm exists in two versions:
+
+* a numpy version, which is by far the fastest
+* a pandas version, which is much slower but easier to use than the numpy version
+
+The algorithm recognizes the input variable type and uses the appropriate version to solve the problem. To install the package:
+
+* pip install ipfn
+* pip install git+http://github.com/dirguis/ipfn@master
+
+For more information and examples, please visit:
+
+* `wikipedia page on ipf `_
+* `slides explaining the methodology and links to specific examples `_
+* https://github.com/Dirguis/ipfn
+
+----
+
+If you want to test the package, clone the repo and, from the main folder, run:
+
+* py.test --verbose --color=yes tests/tests.py
+
+----
+
+The project is similar to the ipfp package available for R, and tests have been run to ensure both produce the same results.
+
+----
+
+Input Variables:
+ * original: numpy ndarray or pandas DataFrame to perform the ipfn on.
+ * aggregates: list of numpy arrays or pandas DataFrames/Series. The aggregates are the same as the marginals: they are the target values we want when aggregating along one or several axes.
+ * dimensions: list of lists of integers if working with numpy objects, or of column names if working with pandas objects. These are the preserved dimensions along which we sum to get the corresponding aggregates.
+ * convergence_rate: if there are many aggregates/marginals, it can be useful to loosen the convergence criterion.
+ * max_iteration: integer. Maximum number of iterations allowed.
+ * verbose: integer 0, 1 or 2. Each case number includes the outputs of the previous case numbers.
+
+   * 0: updated matrix returned.
+
+   * 1: flag with the output status (0 for failure and 1 for success).
+
+   * 2: dataframe with iteration numbers and convergence rate information at all steps.
+
+ * rate_tolerance: float value. If above 0.0, like 0.001, the algorithm will stop once the difference between the conv_rate variable of 2 consecutive iterations is below that specified value.
+
+
+Wikipedia example with Numpy:
+-----------------------------
+To illustrate Iterative Proportional Fitting, Wikipedia uses an example `here `_
+
+Below is that example solved with IPFN::
+
+    import numpy as np
+    from ipfn import ipfn
+
+    m = [[40, 30, 20, 10], [35, 50, 100, 75], [30, 80, 70, 120], [20, 30, 40, 50]]
+    m = np.array(m)
+    xip = np.array([150, 300, 400, 150])
+    xpj = np.array([200, 300, 400, 100])
+
+    aggregates = [xip, xpj]
+    dimensions = [[0], [1]]
+
+    IPF = ipfn(m, aggregates, dimensions, convergence_rate=1e-6)
+    m = IPF.iteration()
+    print(m)
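+
+By default (verbose=0) only the fitted matrix is returned. If you also want the
+convergence diagnostics described above, raise the verbose flag; a small sketch,
+assuming the return layout documented for verbose=2::
+
+    IPF = ipfn(m, aggregates, dimensions, convergence_rate=1e-6, verbose=2)
+    m, converged, conv_info = IPF.iteration()
+    print(converged)   # 1 on success, 0 otherwise
+    print(conv_info)   # iteration number and convergence rate at each step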
+
+
+Example with the numpy version of the algorithm:
+------------------------------------------------
+Please follow the example below to run the package. Several additional examples, beyond the one listed below, can be found in the ipfn.py script. This example is taken from ``_
+
+First, let us define a matrix of N=3 dimensions, of size 2*4*3, and populate it with some values::
+
+    from ipfn import ipfn
+    import numpy as np
+    import pandas as pd
+
+    m = np.zeros((2,4,3))
+    m[0,0,0] = 1
+    m[0,0,1] = 2
+    m[0,0,2] = 1
+    m[0,1,0] = 3
+    m[0,1,1] = 5
+    m[0,1,2] = 5
+    m[0,2,0] = 6
+    m[0,2,1] = 2
+    m[0,2,2] = 2
+    m[0,3,0] = 1
+    m[0,3,1] = 7
+    m[0,3,2] = 2
+    m[1,0,0] = 5
+    m[1,0,1] = 4
+    m[1,0,2] = 2
+    m[1,1,0] = 5
+    m[1,1,1] = 5
+    m[1,1,2] = 5
+    m[1,2,0] = 3
+    m[1,2,1] = 8
+    m[1,2,2] = 7
+    m[1,3,0] = 2
+    m[1,3,1] = 7
+    m[1,3,2] = 6
+
+Now, let us define some marginals::
+
+    xipp = np.array([52, 48])
+    xpjp = np.array([20, 30, 35, 15])
+    xppk = np.array([35, 40, 25])
+    xijp = np.array([[9, 17, 19, 7], [11, 13, 16, 8]])
+    xpjk = np.array([[7, 9, 4], [8, 12, 10], [15, 12, 8], [5, 7, 3]])
+
+I used the letter p to denote the dimension(s) being summed over.
+
+For this specific example, each marginal must have fewer than N=3 dimensions and be consistent with the dimensions of the contingency table m. For example, the marginal along the first dimension is made of 2 elements: we want the sum of the elements of m over dimensions 2 and 3 to equal that marginal::
+
+    m[0,:,:].sum() == xipp[0]
+    m[1,:,:].sum() == xipp[1]
+
+Define the aggregates list and the corresponding list of dimensions to indicate to the algorithm which dimension(s) to sum over for each aggregate::
+
+    aggregates = [xipp, xpjp, xppk, xijp, xpjk]
+    dimensions = [[0], [1], [2], [0, 1], [1, 2]]
+
+Finally, run the algorithm::
+
+    IPF = ipfn(m, aggregates, dimensions)
+    m = IPF.iteration()
+    print(xijp[0,0])
+    print(m[0, 0, :].sum())
+
+
+Example with the pandas version of the algorithm:
+-------------------------------------------------
+In the same fashion, we can run a similar example, but using a dataframe::
+
+    from ipfn import ipfn
+    import numpy as np
+    import pandas as pd
+
+    m = np.array([1., 2., 1., 3., 5., 5., 6., 2., 2., 1., 7., 2.,
+                  5., 4., 2., 5., 5., 5., 3., 8., 7., 2., 7., 6.], )
+    dma_l = [501, 501, 501, 501, 501, 501, 501, 501, 501, 501, 501, 501,
+             502, 502, 502, 502, 502, 502, 502, 502, 502, 502, 502, 502]
+    size_l = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4,
+              1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
+
+    age_l = ['20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45',
+             '20-25','30-35','40-45']
+
+    df = pd.DataFrame()
+    df['dma'] = dma_l
+    df['size'] = size_l
+    df['age'] = age_l
+    df['total'] = m
+
+    xipp = df.groupby('dma')['total'].sum()
+    xpjp = df.groupby('size')['total'].sum()
+    xppk = df.groupby('age')['total'].sum()
+    xijp = df.groupby(['dma', 'size'])['total'].sum()
+    xpjk = df.groupby(['size', 'age'])['total'].sum()
+
+    xipp.loc[501] = 52
+    xipp.loc[502] = 48
+
+    xpjp.loc[1] = 20
+    xpjp.loc[2] = 30
+    xpjp.loc[3] = 35
+    xpjp.loc[4] = 15
+
+    xppk.loc['20-25'] = 35
+    xppk.loc['30-35'] = 40
+    xppk.loc['40-45'] = 25
+
+    xijp.loc[501] = [9, 17, 19, 7]
+    xijp.loc[502] = [11, 13, 16, 8]
+
+    xpjk.loc[1] = [7, 9, 4]
+    xpjk.loc[2] = [8, 12, 10]
+    xpjk.loc[3] = [15, 12, 8]
+    xpjk.loc[4] = [5, 7, 3]
+
+    aggregates = [xipp, xpjp, xppk, xijp, xpjk]
+    dimensions = [['dma'], ['size'], ['age'], ['dma', 'size'], ['size', 'age']]
+
+    IPF = ipfn(df, aggregates, dimensions)
+    df = IPF.iteration()
+
+    print(df)
+    print(df.groupby('size')['total'].sum(), xpjp)
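+
+Using the Polars backend:
+-------------------------
+The same tabular example can also be routed through the Polars backend by passing
+algorithm='polars' (requires the polars package). A minimal sketch: a pandas input
+is converted to Polars internally and converted back before being returned::
+
+    IPF = ipfn(df, aggregates, dimensions, algorithm='polars')
+    df = IPF.iteration()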
+
+Added notes:
+------------
+
+To call the algorithm in a program, execute::
+
+    from ipfn import ipfn
diff --git a/ipfn.egg-info/SOURCES.txt b/ipfn.egg-info/SOURCES.txt
new file mode 100644
index 0000000..3bd2981
--- /dev/null
+++ b/ipfn.egg-info/SOURCES.txt
@@ -0,0 +1,12 @@
+MANIFEST.in
+README.rst
+setup.cfg
+setup.py
+ipfn/__init__.py
+ipfn/ipfn.py
+ipfn.egg-info/PKG-INFO
+ipfn.egg-info/SOURCES.txt
+ipfn.egg-info/dependency_links.txt
+ipfn.egg-info/requires.txt
+ipfn.egg-info/top_level.txt
+tests/tests.py
\ No newline at end of file
diff --git a/ipfn.egg-info/dependency_links.txt b/ipfn.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/ipfn.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/ipfn.egg-info/requires.txt b/ipfn.egg-info/requires.txt
new file mode 100644
index 0000000..de86843
--- /dev/null
+++ b/ipfn.egg-info/requires.txt
@@ -0,0 +1,2 @@
+pandas>=0.22.0
+numpy
diff --git a/ipfn.egg-info/top_level.txt b/ipfn.egg-info/top_level.txt
new file mode 100644
index 0000000..ecb7b5f
--- /dev/null
+++ b/ipfn.egg-info/top_level.txt
@@ -0,0 +1 @@
+ipfn
diff --git a/ipfn/__init__.py b/ipfn/__init__.py
index b2ac7d2..5a54c47 100644
--- a/ipfn/__init__.py
+++ b/ipfn/__init__.py
@@ -1,3 +1,3 @@
-from ipfn import ipfn
+from .ipfn import ipfn
 
 __all__ = ['ipfn']
diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py
index 6b2181b..a09b9cb 100755
--- a/ipfn/ipfn.py
+++ b/ipfn/ipfn.py
@@ -5,11 +5,17 @@
 from itertools import product
 import copy
 
+# Optional Polars support
+try:
+    import polars as pl  # type: ignore
+except Exception:  # pragma: no cover - polars optional at runtime
+    pl = None
+
 
 class ipfn(object):
 
     def __init__(self, original, aggregates, dimensions, weight_col='total',
-                 convergence_rate=1e-5, max_iteration=500, verbose=0, rate_tolerance=1e-8):
+                 convergence_rate=1e-5, max_iteration=500, verbose=0, rate_tolerance=1e-8, algorithm='optimized'):
         """
         Initialize the ipfn class
 
@@ -32,6 +38,8 @@ def __init__(self, original, aggregates, dimensions, weight_col='total',
         rate_tolerance: float value. If above 0.0, like 0.001, the algorithm will stop once the difference between the conv_rate variable of 2 consecutive iterations is below that specified value
 
+        algorithm: 'optimized' (default) selects the vectorized numpy/pandas implementations; 'legacy' selects the original, slower implementations (retained for benchmarking); 'polars' selects the Polars backend for tabular inputs.
+
         For examples, please open the ipfn script or look for help on functions ipfn_np and ipfn_df
         """
         self.original = original
@@ -44,6 +52,9 @@ def __init__(self, original, aggregates, dimensions, weight_col='total',
             raise(ValueError(f"wrong verbose input, must be either 0, 1 or 2 but got {verbose}"))
         self.verbose = verbose
         self.rate_tolerance = rate_tolerance
+        if algorithm not in ('optimized', 'legacy', 'polars'):
+            raise(ValueError(f"algorithm must be 'optimized', 'legacy' or 'polars' but got {algorithm}"))
+        self.algorithm = algorithm
 
     @staticmethod
     def index_axis_elem(dims, axes, elems):
@@ -58,20 +69,9 @@ def index_axis_elem(dims, axes, elems):
                 idx += (np.s_[:],)
         return idx
 
-    def ipfn_np(self, m, aggregates, dimensions, weight_col='total'):
+    def _ipfn_np_legacy(self, m, aggregates, dimensions, weight_col='total'):
         """
-        Runs the ipfn method from a matrix m, aggregates/marginals and the dimension(s) preserved.
-        For example:
-        from ipfn import ipfn
-        import numpy as np
-        m = np.array([[8., 4., 6., 7.], [3., 6., 5., 2.], [9., 11., 3., 1.]], )
-        xip = np.array([20., 18., 22.])
-        xpj = np.array([18., 16., 12., 14.])
-        aggregates = [xip, xpj]
-        dimensions = [[0], [1]]
-
-        IPF = ipfn(m, aggregates, dimensions)
-        m = IPF.iteration()
+        Legacy numpy implementation retained for benchmarking.
         """
 
         # Check that the inputs are numpy arrays of floats
@@ -93,13 +93,10 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'):
         dim = len(m.shape)
         product_elem = []
         tables = [m]
-        # TODO: do we need to persist all these dataframe? Or maybe we just need to persist the table_update and table_current
-        # and then update the table_current to the table_update to the latest we have. And create an empty zero dataframe for table_update (Evelyn)
         for inc in range(steps - 1):
             tables.append(np.array(np.zeros(m.shape)))
         original = copy.copy(m)
 
-        # Calculate the new weights for each dimension
         for inc in range(steps):
             if inc == (steps - 1):
                 table_update = m
@@ -113,28 +110,16 @@
                 idx = self.index_axis_elem(dim, dimensions[inc], item)
                 table_current_slice = table_current[idx]
                 mijk = table_current_slice.sum()
-                # TODO: Directly put it as xijk = aggregates[inc][item] (Evelyn)
                 xijk = aggregates[inc]
                 xijk = xijk[item]
                 if mijk == 0:
-                    # table_current_slice += 1e-5
-                    # TODO: Basically, this part would remain 0 as always right? Cause if the sum of the slice is zero, then we only have zeros in this slice.
-                    # TODO: you could put it as table_update[idx] = table_current_slice (since multiplication on zero is still zero)
                     table_update[idx] = table_current_slice
                 else:
-                    # TODO: when inc == steps - 1, this part is also directly updating the dataframe m (Evelyn)
-                    # If we are not going to persist every table generated, we could still keep this part to directly update dataframe m
                     table_update[idx] = table_current_slice * 1.0 * xijk / mijk
-                # For debug purposes
-                # if np.isnan(table_update).any():
-                #     print(idx)
-                #     sys.exit(0)
             product_elem = []
 
-        # Check the convergence rate for each dimension
         max_conv = 0
         for inc in range(steps):
-            # TODO: this part already generated before, we could somehow persist it. But it's not important (Evelyn)
             for dimension in dimensions[inc]:
                 product_elem.append(range(m.shape[dimension]))
             for item in product(*product_elem):
@@ -142,46 +127,80 @@
                 idx = self.index_axis_elem(dim, dimensions[inc], item)
                 ori_ijk = aggregates[inc][item]
                 m_slice = m[idx]
                 m_ijk = m_slice.sum()
-                # print('Current vs original', abs(m_ijk/ori_ijk - 1))
                 if abs(m_ijk / ori_ijk - 1) > max_conv:
                     max_conv = abs(m_ijk / ori_ijk - 1)
-
             product_elem = []
 
         return m, max_conv
 
-    def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
+    def ipfn_np(self, m, aggregates, dimensions, weight_col='total'):
         """
-        Runs the ipfn method from a dataframe df, aggregates/marginals and the dimension(s) preserved.
-        For example:
-        from ipfn import ipfn
-        import pandas as pd
-        age = [30, 30, 30, 30, 40, 40, 40, 40, 50, 50, 50, 50]
-        distance = [10,20,30,40,10,20,30,40,10,20,30,40]
-        m = [8., 4., 6., 7., 3., 6., 5., 2., 9., 11., 3., 1.]
-        df = pd.DataFrame()
-        df['age'] = age
-        df['distance'] = distance
-        df['total'] = m
-
-        xip = df.groupby('age')['total'].sum()
-        xip.loc[30] = 20
-        xip.loc[40] = 18
-        xip.loc[50] = 22
-        xpj = df.groupby('distance')['total'].sum()
-        xpj.loc[10] = 18
-        xpj.loc[20] = 16
-        xpj.loc[30] = 12
-        xpj.loc[40] = 14
-        dimensions = [['age'], ['distance']]
-        aggregates = [xip, xpj]
-
-        IPF = ipfn(df, aggregates, dimensions)
-        df = IPF.iteration()
-
-        print(df)
-        print(df.groupby('age')['total'].sum(), xip)"""
+        Optimized numpy implementation using vectorized broadcasting.
+        """
+        # Ensure numpy arrays of floats
+        for i, aggregate in enumerate(aggregates):
+            if not isinstance(aggregate, np.ndarray):
+                aggregates[i] = np.array(aggregate, dtype=float)
+            elif aggregate.dtype != float:
+                aggregates[i] = aggregate.astype(float)
+        if not isinstance(m, np.ndarray):
+            m = np.array(m, dtype=float)
+        elif m.dtype != float:
+            m = m.astype(float)
+
+        steps = len(aggregates)
+        dim = m.ndim
+
+        # Main proportional fitting passes
+        for inc in range(steps):
+            axes_keep = dimensions[inc]
+            # axes to sum (all except axes_keep)
+            sum_axes = tuple(ax for ax in range(dim) if ax not in axes_keep)
+
+            # Current marginal sums with dimensions kept for broadcasting
+            denom = m.sum(axis=sum_axes, keepdims=True)
+
+            # Target reshaped for broadcasting on original m shape
+            target = np.array(aggregates[inc], dtype=float)
+            reshape_shape = [1] * dim
+            for k, ax in enumerate(axes_keep):
+                reshape_shape[ax] = target.shape[k]
+            target_b = target.reshape(reshape_shape)
+
+            # Compute ratio target/denom into the denom buffer to avoid an extra
+            # allocation. Where the current marginal is zero the slice is all zeros,
+            # so keep ratio 1.0 there (no change); masking up front also preserves a
+            # legitimate zero ratio when a target marginal is zero.
+            zero_mask = denom == 0
+            np.divide(target_b, denom, out=denom, where=~zero_mask)
+            denom[zero_mask] = 1.0
+
+            # Scale m in-place
+            m *= denom
+
+        # Convergence computation
+        max_conv = 0.0
+        for inc in range(steps):
+            axes_keep = dimensions[inc]
+            sum_axes = tuple(ax for ax in range(dim) if ax not in axes_keep)
+            current = m.sum(axis=sum_axes)
+            target = np.array(aggregates[inc], dtype=float)
+            with np.errstate(divide='ignore', invalid='ignore'):
+                diff = np.abs(current / target - 1.0)
+            if np.any(target == 0):
+                # If target==0 and current>0, set inf; if both 0, set 0
+                zmask = target == 0
+                zval = np.where(current == 0, 0.0, np.inf)
+                diff = np.where(zmask, zval, diff)
+            # nanmax handles potential NaNs from 0/0
+            local_max = np.nanmax(diff)
+            if local_max > max_conv:
+                max_conv = float(local_max)
+
+        return m, max_conv
+
+    def _ipfn_df_legacy(self, df, aggregates, dimensions, weight_col='total'):
+        """
+        Legacy pandas implementation retained for benchmarking.
+        """
         steps = len(aggregates)
         tables = [df]
         for inc in range(steps - 1):
@@ -235,7 +254,6 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
         table_update.reset_index(inplace=True)
         table_current.reset_index(inplace=True)
         inc += 1
-        feat_l = []
 
         # Calculate the max convergence rate
         max_conv = 0
@@ -250,9 +268,140 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
 
         return table_update, max_conv
 
+    def ipfn_df(self, df, aggregates, dimensions, weight_col='total'):
+        """
+        Runs the ipfn method from a dataframe df, aggregates/marginals and the dimension(s) preserved.
+        Vectorized implementation to reduce Python-level loops and memory usage.
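+
+        A minimal usage sketch (illustrative values, mirroring the legacy docstring):
+            import pandas as pd
+            from ipfn import ipfn
+            df = pd.DataFrame({'age':      [30, 30, 40, 40],
+                               'distance': [10, 20, 10, 20],
+                               'total':    [8., 4., 3., 6.]})
+            xip = df.groupby('age')['total'].sum()
+            xip.loc[30], xip.loc[40] = 10., 11.
+            xpj = df.groupby('distance')['total'].sum()
+            aggregates = [xip, xpj]
+            dimensions = [['age'], ['distance']]
+            IPF = ipfn(df, aggregates, dimensions)
+            df = IPF.iteration()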
+ """ + steps = len(aggregates) + + # Calculate the new weights for each dimension directly on df + for inc, features in enumerate(dimensions): + # Current group sums + tmp = df.groupby(features, sort=False)[weight_col].sum() + xijk = aggregates[inc] + # Align target to current group index if needed + if isinstance(xijk, (pd.Series, pd.DataFrame)): + if isinstance(xijk, pd.DataFrame): + if xijk.shape[1] != 1: + raise(ValueError('Aggregate DataFrame must have a single column')) + xijk = xijk.iloc[:, 0] + xijk_aligned = xijk.reindex(tmp.index) + else: + xijk_aligned = pd.Series(np.array(xijk, dtype=float).ravel(), index=tmp.index) + + ratio = pd.Series(1.0, index=tmp.index, dtype=float) + nonzero = tmp != 0 + ratio.loc[nonzero] = (xijk_aligned.loc[nonzero].astype(float) / tmp.loc[nonzero].astype(float)) + + ratio_df = ratio.rename('ipf_ratio').reset_index() + merged = df.merge(ratio_df, on=features, how='left', sort=False) + df[weight_col] = merged[weight_col].astype(float) * merged['ipf_ratio'].astype(float) + + # Calculate the max convergence rate + max_conv = 0 + for inc, features in enumerate(dimensions): + tmp = df.groupby(features, sort=False)[weight_col].sum() + ori_ijk = aggregates[inc] + if isinstance(ori_ijk, pd.DataFrame): + ori_ijk = ori_ijk.iloc[:, 0] + if isinstance(ori_ijk, pd.Series): + ori_ijk = ori_ijk.reindex(tmp.index) + with np.errstate(divide='ignore', invalid='ignore'): + temp_conv = np.nanmax(np.abs(tmp.values / np.array(ori_ijk, dtype=float) - 1)) + if temp_conv > max_conv: + max_conv = temp_conv + + return df, max_conv + + @staticmethod + def _aggregate_to_pl_df(xijk, features, target_col: str = 'target'): + """Convert aggregate (pandas Series/DataFrame or numpy) to a Polars DataFrame with given feature columns and target column.""" + if pl is None: + raise RuntimeError("Polars is not available") + # pandas Series/DataFrame path + if isinstance(xijk, pd.DataFrame): + if xijk.shape[1] != 1: + raise(ValueError('Aggregate DataFrame must have a single column')) + xijk = xijk.iloc[:, 0] + if isinstance(xijk, pd.Series): + df_pd = xijk.rename(target_col).reset_index() + # Ensure columns order matches features then target + # If features do not match, rely on column names after reset_index + # Rename target to target_col already done + return pl.DataFrame(df_pd) + # numpy array path (fallback): build cartesian from features unique values is not available here + # We only support Series/DataFrame aggregates for DataFrame backend + arr = np.array(xijk, dtype=float).ravel() + # Create a single dummy column when features is single and length matches + if len(features) == 1: + return pl.DataFrame({features[0]: range(len(arr)), target_col: arr}) + else: + raise ValueError("When using polars backend with DataFrame inputs, aggregates should be pandas Series/DataFrame") + + def ipfn_pl(self, df_pl, aggregates, dimensions, weight_col='total'): + """ + Polars-based implementation of IPFN for tabular inputs. + Accepts a Polars DataFrame and aggregates as pandas Series/DataFrame or Polars DataFrames. + Returns a Polars DataFrame and the max convergence. + """ + if pl is None: + raise RuntimeError("Polars is not installed. 
+    def ipfn_pl(self, df_pl, aggregates, dimensions, weight_col='total'):
+        """
+        Polars-based implementation of IPFN for tabular inputs.
+        Accepts a Polars DataFrame and aggregates as pandas Series/DataFrame or Polars DataFrames.
+        Returns a Polars DataFrame and the max convergence.
+        """
+        if pl is None:
+            raise RuntimeError("Polars is not installed. Please install polars to use algorithm='polars'.")
+
+        steps = len(aggregates)
+        df = df_pl
+        # Ensure weight column is float
+        if weight_col not in df.columns:
+            raise ValueError(f"weight_col '{weight_col}' not found in DataFrame")
+        df = df.with_columns(pl.col(weight_col).cast(pl.Float64))
+
+        for inc, features in enumerate(dimensions):
+            # Group sums
+            grouped = df.group_by(features).agg(pl.col(weight_col).sum().alias('sum_w'))
+            # Target as Polars
+            xijk = aggregates[inc]
+            if isinstance(xijk, pl.DataFrame):
+                target_df = xijk.rename({xijk.columns[-1]: 'target'})
+            else:
+                target_df = self._aggregate_to_pl_df(xijk, features, target_col='target')
+            # Join and compute ratio
+            ratio_tbl = grouped.join(target_df, on=features, how='left')
+            ratio_tbl = ratio_tbl.with_columns(
+                pl.when(pl.col('sum_w') != 0)
+                .then(pl.col('target').cast(pl.Float64) / pl.col('sum_w'))
+                .otherwise(1.0)
+                .alias('ipf_ratio')
+            ).select(features + ['ipf_ratio'])
+            # Apply ratio to weights
+            df = df.join(ratio_tbl, on=features, how='left')
+            df = df.with_columns((pl.col(weight_col) * pl.col('ipf_ratio')).alias(weight_col))
+            df = df.drop('ipf_ratio')
+
+        # Convergence computation
+        max_conv = 0.0
+        for inc, features in enumerate(dimensions):
+            grouped = df.group_by(features).agg(pl.col(weight_col).sum().alias('sum_w'))
+            xijk = aggregates[inc]
+            if isinstance(xijk, pl.DataFrame):
+                target_df = xijk.rename({xijk.columns[-1]: 'target'})
+            else:
+                target_df = self._aggregate_to_pl_df(xijk, features, target_col='target')
+            joined = grouped.join(target_df, on=features, how='left')
+            joined = joined.with_columns(
+                pl.when(pl.col('target') == 0)
+                .then(pl.when(pl.col('sum_w') == 0).then(0.0).otherwise(float('inf')))
+                .otherwise((pl.col('sum_w') / pl.col('target') - 1.0).abs())
+                .alias('conv_abs')
+            )
+            local_max = joined.select(pl.max('conv_abs')).item()
+            if local_max is not None and float(local_max) > max_conv:
+                max_conv = float(local_max)
+
+        return df, max_conv
+
     def iteration(self):
         """
-        Runs the ipfn algorithm. Automatically detects of working with numpy ndarray or pandas dataframes.
+        Runs the ipfn algorithm. Automatically detects whether it is working with a numpy ndarray or a (pandas|polars) DataFrame.
         """
 
         i = 0
@@ -261,14 +410,33 @@ def iteration(self):
         conv_list = []
         m = self.original
 
-        # If the original data input is in pandas DataFrame format
-        if isinstance(self.original, pd.DataFrame):
-            ipfn_method = self.ipfn_df
-        elif isinstance(self.original, np.ndarray):
-            ipfn_method = self.ipfn_np
+        used_polars_backend = False
+
+        # Determine backend based on input type and requested algorithm
+        if isinstance(m, pd.DataFrame):
+            if self.algorithm == 'legacy':
+                ipfn_method = self._ipfn_df_legacy
+            elif self.algorithm == 'polars':
+                if pl is None:
+                    raise RuntimeError("algorithm='polars' requested but Polars is not installed")
+                # convert pandas to polars
+                try:
+                    m = pl.from_pandas(m)
+                except Exception:
+                    m = pl.DataFrame(m)
+                ipfn_method = self.ipfn_pl
+                used_polars_backend = True
+            else:
+                ipfn_method = self.ipfn_df
+        elif isinstance(m, np.ndarray):
+            ipfn_method = self._ipfn_np_legacy if self.algorithm == 'legacy' else self.ipfn_np
             self.original = self.original.astype('float64')
+        elif (pl is not None) and isinstance(m, pl.DataFrame):
+            ipfn_method = self.ipfn_pl
+            used_polars_backend = True
         else:
-            raise(ValueError(f'Data input instance not recognized. The input matrix is not a numpy array or pandas DataFrame'))
+            raise(ValueError('Data input instance not recognized. The input matrix is not a numpy array or (pandas|polars) DataFrame'))
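+
+        # Dispatch summary: a pandas input honours algorithm= ('legacy', 'polars',
+        # or the default optimized path); a numpy input only distinguishes 'legacy'
+        # (so 'polars' falls back to the optimized numpy path); a native Polars
+        # DataFrame always uses ipfn_pl.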
+
         while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)):
             old_conv = conv
             m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col)
@@ -284,6 +452,14 @@ def iteration(self):
             print('Maximum iterations reached')
             converged = 0
 
+        # Convert back to a pandas DataFrame if we used the polars backend but the original was pandas
+        if used_polars_backend and (isinstance(self.original, pd.DataFrame)):
+            try:
+                m = m.to_pandas()
+            except Exception:
+                # Fallback: construct via rows
+                m = pd.DataFrame(m.to_dicts())
+
         # Handle the verbose
         if self.verbose == 0:
             return m
diff --git a/ipfn_contributors.txt b/ipfn_contributors.txt
index 7a59a7f..1b6c1c6 100644
--- a/ipfn_contributors.txt
+++ b/ipfn_contributors.txt
@@ -19,3 +19,8 @@ https://github.com/harisbal
 
 mattswoon
 https://github.com/mattswoon
+
+dhruv-panther
+hasan@panther-home.com
+https://github.com/dhruv-panther
+Added new algorithms and polars support
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..e6cc686
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,6 @@
+[pytest]
+# Ensure pytest discovers our legacy-named tests file
+python_files = tests.py test_*.py *_test.py
+python_classes = Test*
+python_functions = test_*
+testpaths = tests
diff --git a/tests/context.py b/tests/context.py
index 3399491..f8163fd 100644
--- a/tests/context.py
+++ b/tests/context.py
@@ -1,4 +1,4 @@
 import os
 import sys
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-from ipfn import ipfn
+import ipfn

From 54ff622c15a8c287d57219a06777cf5d8e67d192 Mon Sep 17 00:00:00 2001
From: hasanpandher
Date: Mon, 8 Sep 2025 16:43:11 +0530
Subject: [PATCH 2/2] Algo optimisations and polars compatibility

---
 .gitignore | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)
 create mode 100644 .gitignore

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b23483f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,51 @@
+# Python bytecode and cache
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Virtual environments
+.venv/
+venv/
+env/
+.python-version
+
+# Environment variables/secrets
+.env
+.env.*
+
+# Build and distribution artifacts
+build/
+dist/
+.eggs/
+*.egg-info/
+
+# Testing and coverage
+.pytest_cache/
+.mypy_cache/
+.tox/
+.nox/
+.coverage
+.coverage.*
+coverage.xml
+htmlcov/
+
+# IDE/editor settings
+.idea/
+.vscode/
+.ropeproject/
+
+# OS files
+.DS_Store
+Thumbs.db
+
+# Jupyter
+.ipynb_checkpoints/
+
+# Caches
+.cache/
+
+# PIP and packaging
+pip-wheel-metadata/
+
+# Misc
+*.log