diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a7ed66 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ + +\.idea/ + +ipfn/__pycache__/ + +tests/__pycache__/ + +.pytest_cache/ +.vscode/ diff --git a/ipfn/ipfn.py b/ipfn/ipfn.py old mode 100755 new mode 100644 index 2f0637f..ee61f4c --- a/ipfn/ipfn.py +++ b/ipfn/ipfn.py @@ -8,9 +8,17 @@ class ipfn(object): - - def __init__(self, original, aggregates, dimensions, weight_col='total', - convergence_rate=1e-5, max_iteration=500, verbose=0, rate_tolerance=1e-8): + def __init__( + self, + original, + aggregates, + dimensions, + weight_col="total", + convergence_rate=1e-5, + max_iteration=500, + verbose=0, + rate_tolerance=1e-8, + ): """ Initialize the ipfn class @@ -49,15 +57,15 @@ def index_axis_elem(dims, axes, elems): inc_axis = 0 idx = () for dim in range(dims): - if (inc_axis < len(axes)): - if (dim == axes[inc_axis]): + if inc_axis < len(axes): + if dim == axes[inc_axis]: idx += (elems[inc_axis],) inc_axis += 1 else: idx += (np.s_[:],) return idx - def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): + def ipfn_np(self, m, aggregates, dimensions, weight_col="total"): """ Runs the ipfn method from a matrix m, aggregates/marginals and the dimension(s) preserved. For example: @@ -77,16 +85,16 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): inc = 0 for aggregate in aggregates: if not isinstance(aggregate, np.ndarray): - aggregate = np.array(aggregate).astype(np.float) + aggregate = np.array(aggregate).astype(float) aggregates[inc] = aggregate - elif aggregate.dtype not in [np.float, float]: - aggregate = aggregate.astype(np.float) + elif aggregate.dtype != float: + aggregate = aggregate.astype(float) aggregates[inc] = aggregate inc += 1 if not isinstance(m, np.ndarray): m = np.array(m) - elif m.dtype not in [np.float, float]: - m = m.astype(np.float) + elif m.dtype != float: + m = m.astype(float) steps = len(aggregates) dim = len(m.shape) @@ -149,7 +157,7 @@ def ipfn_np(self, m, aggregates, dimensions, weight_col='total'): return m, max_conv - def ipfn_df(self, df, aggregates, dimensions, weight_col='total'): + def ipfn_df(self, df, aggregates, dimensions): """ Runs the ipfn method from a dataframe df, aggregates/marginals and the dimension(s) preserved. For example: @@ -181,108 +189,69 @@ def ipfn_df(self, df, aggregates, dimensions, weight_col='total'): print(df) print(df.groupby('age')['total'].sum(), xip)""" - steps = len(aggregates) - tables = [df] - for inc in range(steps - 1): - tables.append(df.copy()) - original = df.copy() - - # Calculate the new weights for each dimension - inc = 0 - for features in dimensions: - if inc == (steps - 1): - table_update = df - table_current = tables[inc].copy() + factors = [] + index_names = df.index.names + + for k, d in enumerate(dimensions): + dfg = df.groupby(level=d).sum() + f = aggregates[k].div(dfg) + # Requires pandas >= 0.24 + if len(d) > 1: + rem_index = [lvl for lvl in index_names if lvl in d] + df = df.multiply(f.reorder_levels(rem_index), axis=0).reorder_levels( + index_names + ) else: - table_update = tables[inc + 1] - table_current = tables[inc] + df = df.multiply(f, fill_value=0) - tmp = table_current.groupby(features)[weight_col].sum() - xijk = aggregates[inc] + f = f.sub(1).abs().max() + factors.append(f) - feat_l = [] - for feature in features: - feat_l.append(np.unique(table_current[feature])) - table_update.set_index(features, inplace=True) - table_current.set_index(features, inplace=True) + # Check for convergence + max_conv = max(factors) - multi_index_flag = isinstance(table_update.index, pd.MultiIndex) - if multi_index_flag: - if not table_update.index.is_lexsorted(): - table_update.sort_index(inplace=True) - if not table_current.index.is_lexsorted(): - table_current.sort_index(inplace=True) - - for feature in product(*feat_l): - den = tmp.loc[feature] - # calculate new weight for this iteration - - if not multi_index_flag: - msk = table_update.index == feature[0] - else: - msk = feature - - if den == 0: - table_update.loc[msk, weight_col] =\ - table_current.loc[feature, weight_col] *\ - xijk.loc[feature] - else: - table_update.loc[msk, weight_col] = \ - table_current.loc[feature, weight_col].astype(float) * \ - xijk.loc[feature] / den - - table_update.reset_index(inplace=True) - table_current.reset_index(inplace=True) - inc += 1 - feat_l = [] - - # Calculate the max convergence rate - max_conv = 0 - inc = 0 - for features in dimensions: - tmp = table_update.groupby(features)[weight_col].sum() - ori_ijk = aggregates[inc] - temp_conv = max(abs(tmp / ori_ijk - 1)) - if temp_conv > max_conv: - max_conv = temp_conv - inc += 1 - - return table_update, max_conv + return df, max_conv def iteration(self): """ Runs the ipfn algorithm. Automatically detects of working with numpy ndarray or pandas dataframes. """ + def _prepare_df_format(df): + # Add index + idxcols = list(set(x for l in self.dimensions for x in l)) + df = df.reset_index().set_index(idxcols) + # Turn to series + df = df[self.weight_col] + return df + i = 0 - conv = np.inf - old_conv = -np.inf - conv_list = [] + conv = self.conv_rate * 100 + conv_progress = [] m = self.original # If the original data input is in pandas DataFrame format if isinstance(self.original, pd.DataFrame): + m = _prepare_df_format(m) ipfn_method = self.ipfn_df elif isinstance(self.original, np.ndarray): ipfn_method = self.ipfn_np - self.original = self.original.astype('float64') + self.original = self.original.astype("float64") else: - print('Data input instance not recognized') + print("Data input instance not recognized") sys.exit(0) - while ((i <= self.max_itr and conv > self.conv_rate) and (i <= self.max_itr and abs(conv - old_conv) > self.rate_tolerance)): - old_conv = conv - m, conv = ipfn_method(m, self.aggregates, self.dimensions, self.weight_col) - conv_list.append(conv) + + while (i <= self.max_itr) and (conv > self.conv_rate): + m, conv = ipfn_method(m, self.aggregates, self.dimensions) + conv_progress.append(conv) i += 1 - converged = 1 + if i <= self.max_itr: - if (not conv > self.conv_rate) & (self.verbose > 1): - print('ipfn converged: convergence_rate below threshold') - elif not abs(conv - old_conv) > self.rate_tolerance: - print('ipfn converged: convergence_rate not updating or below rate_tolerance') + converged = True + print("ipfn converged") else: - print('Maximum iterations reached') - converged = 0 + print("Maximum iterations reached") + converged = False # Handle the verbose if self.verbose == 0: @@ -290,7 +259,10 @@ def iteration(self): elif self.verbose == 1: return m, converged elif self.verbose == 2: - return m, converged, pd.DataFrame({'iteration': range(i), 'conv': conv_list}).set_index('iteration') + conv_progress = pd.DataFrame( + {"iteration": range(i), "convergence": conv_progress} + ).set_index("iteration") + return m, converged, conv_progress else: - print('wrong verbose input, return None') + print("wrong verbose input, return None") sys.exit(0) diff --git a/tests/tests.py b/tests/tests.py index b4e480b..bd5c03d 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -254,7 +254,7 @@ def test_pandas_3D(self): for marginal, vertical in marginals1D: features = marginal.index.tolist() for feature in features: - assert round(df.groupby(vertical)['total'].sum().loc[feature], 2) == round(marginal.loc[feature], 2) + assert round(df.groupby(vertical).sum().loc[feature], 2) == round(marginal.loc[feature], 2) m_inc += 1 marginals2D = [(xijp, ['dma', 'size']), (xpjk, ['size', 'age'])] @@ -262,5 +262,5 @@ def test_pandas_3D(self): for marginal, vertical in marginals2D: features = marginal.index.tolist() for feature in features: - assert round(df.groupby(vertical)['total'].sum().loc[feature], 2) == round(marginal.loc[feature], 2) + assert round(df.groupby(vertical).sum().loc[feature], 2) == round(marginal.loc[feature], 2) m_inc += 1