Skip to content
This repository was archived by the owner on Jun 19, 2025. It is now read-only.
106 changes: 67 additions & 39 deletions bquery/ctable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import ctable_ext

# external imports
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
import numpy as np
import bcolz
from collections import namedtuple
Expand All @@ -10,6 +12,9 @@
SUM, COUNT, COUNT_NA, COUNT_DISTINCT, SORTED_COUNT_DISTINCT


# Worker count handed to every ThreadPool(processes=NUM_PROC) created in this
# module; taken from multiprocessing.cpu_count() once, at import time.
NUM_PROC = cpu_count()


class ctable(bcolz.ctable):
def cache_valid(self, col):
"""
Expand Down Expand Up @@ -110,35 +115,49 @@ def unique(self, col_or_col_list):

return output

def aggregate_groups_by_iter_2(self, ct_agg, nr_groups, skip_key,
factor_carray, groupby_cols, output_agg_ops,
bool_arr=None,
agg_method=ctable_ext.SUM):
def helper_agg_groups_by_iter(self, ct_agg=None, col=None, factor_carray=None,
                              nr_groups=None, skip_key=None, agg_method=None):
    """
    Aggregate a single column with the ctable_ext kernel matching its dtype.

    The dtype is looked up on ``ct_agg[col]`` while the data that is fed to
    the kernel is ``self[col]``.

    :param ct_agg: container indexable by column name whose items expose
        ``.dtype``
    :param col: name of the column to aggregate
    :param factor_carray: passed through to the kernel unchanged
    :param nr_groups: passed through to the kernel unchanged
    :param skip_key: passed through to the kernel unchanged
    :param agg_method: forwarded to the kernel as ``agg_method``
    :return: whatever the selected ``ctable_ext.sum_*`` function returns
    :raises NotImplementedError: if the dtype is not float64, int64 or int32
    """
    # TODO: input vs output column
    dtype = ct_agg[col].dtype

    # Pick the kernel first; the kernel is only looked up for a supported
    # dtype, so the unsupported case never touches ctable_ext.
    if dtype == np.float64:
        kernel = ctable_ext.sum_float64
    elif dtype == np.int64:
        kernel = ctable_ext.sum_int64
    elif dtype == np.int32:
        kernel = ctable_ext.sum_int32
    else:
        raise NotImplementedError(
            'Column dtype ({0}) not supported for aggregation yet '
            '(only int32, int64 & float64)'.format(str(dtype)))

    return kernel(self[col], factor_carray, nr_groups, skip_key,
                  agg_method=agg_method)

def agg_groups_by_iter(self, ct_agg, nr_groups, skip_key,
factor_carray, groupby_cols, output_agg_ops,
bool_arr=None,
agg_method=ctable_ext.SUM):
total = []
results = []
pool = ThreadPool(processes=NUM_PROC)

for col in groupby_cols:
total.append(ctable_ext.groupby_value(self[col], factor_carray,
nr_groups, skip_key))

for col, agg_op in output_agg_ops:
# TODO: input vs output column
col_dtype = ct_agg[col].dtype

if col_dtype == np.float64:
r = ctable_ext.sum_float64(self[col], factor_carray, nr_groups,
skip_key, agg_method=agg_method)
elif col_dtype == np.int64:
r = ctable_ext.sum_int64(self[col], factor_carray, nr_groups,
skip_key, agg_method=agg_method)
elif col_dtype == np.int32:
r = ctable_ext.sum_int32(self[col], factor_carray, nr_groups,
skip_key, agg_method=agg_method)
else:
raise NotImplementedError(
'Column dtype ({0}) not supported for aggregation yet '
'(only int32, int64 & float64)'.format(str(col_dtype)))
kwds = {"ct_agg": ct_agg, "col": col,
"factor_carray": factor_carray,
"nr_groups": nr_groups, "skip_key": skip_key,
"agg_method": agg_method}
results.append(pool.apply_async(self.helper_agg_groups_by_iter,
kwds=kwds))

total.append(r)
for r in results:
total.append(r.get())

# TODO: fix ugly fix?
if bool_arr is not None:
Expand Down Expand Up @@ -207,15 +226,28 @@ def groupby(self, groupby_cols, agg_list, bool_arr=None, rootdir=None,
self.create_agg_ctable(groupby_cols, agg_list, nr_groups, rootdir)

# perform aggregation
self.aggregate_groups_by_iter_2(ct_agg, nr_groups, skip_key,
factor_carray, groupby_cols,
agg_ops,
bool_arr= bool_arr,
agg_method=_agg_method)
self.agg_groups_by_iter(ct_agg, nr_groups, skip_key, factor_carray,
groupby_cols, agg_ops, bool_arr= bool_arr,
agg_method=_agg_method)

return ct_agg

# groupby helper functions
def helper_factorize_groupby_cols(self, col):
    """
    Produce the (factor, values) carray pair for one groupby column.

    When ``self.cache_valid(col)`` is true, both carrays are opened
    read-only from ``<column rootdir>.factor`` and ``<column rootdir>.values``.
    Otherwise the column is factorized with ``ctable_ext.factorize`` and the
    values carray is built in memory with the column's own dtype.

    :param col: name of the column to factorize
    :return: tuple ``(col_factor_carray, col_values_carray)``
    """
    if not self.cache_valid(col):
        # No usable cache: factorize now and wrap the resulting values.
        factor_carray, lookup = ctable_ext.factorize(self[col])
        values_carray = bcolz.carray(lookup.values(),
                                     dtype=self[col].dtype)
        return factor_carray, values_carray

    # Cached results sit next to the column's own on-disk directory.
    base_dir = self[col].rootdir
    factor_carray = bcolz.carray(rootdir=base_dir + '.factor', mode='r')
    values_carray = bcolz.carray(rootdir=base_dir + '.values', mode='r')
    return factor_carray, values_carray

def factorize_groupby_cols(self, groupby_cols):
"""

Expand All @@ -225,23 +257,19 @@ def factorize_groupby_cols(self, groupby_cols):
# unless we need to refresh the cache
factor_list = []
values_list = []
results = []

# factorize the groupby columns
for col in groupby_cols:
pool = ThreadPool(processes=NUM_PROC)

if self.cache_valid(col):
col_rootdir = self[col].rootdir
col_factor_rootdir = col_rootdir + '.factor'
col_values_rootdir = col_rootdir + '.values'
col_factor_carray = \
bcolz.carray(rootdir=col_factor_rootdir, mode='r')
col_values_carray = \
bcolz.carray(rootdir=col_values_rootdir, mode='r')
else:
col_factor_carray, values = ctable_ext.factorize(self[col])
col_values_carray = \
bcolz.carray(values.values(), dtype=self[col].dtype)
for col in groupby_cols:
results.append(pool.apply_async(self.helper_factorize_groupby_cols,
args=(col, )))

for r in results:
_r = r.get()
col_factor_carray = _r[0]
col_values_carray = _r[1]
factor_list.append(col_factor_carray)
values_list.append(col_values_carray)

Expand Down
Loading