diff --git a/bquery/ctable.py b/bquery/ctable.py
index 65f893b..295315b 100644
--- a/bquery/ctable.py
+++ b/bquery/ctable.py
@@ -2,6 +2,8 @@
 import ctable_ext
 
 # external imports
+from multiprocessing.pool import ThreadPool
+from multiprocessing import cpu_count
 import numpy as np
 import bcolz
 from collections import namedtuple
@@ -10,6 +12,9 @@
 SUM, COUNT, COUNT_NA, COUNT_DISTINCT, SORTED_COUNT_DISTINCT
 
+NUM_PROC = cpu_count()
+
+
 class ctable(bcolz.ctable):
     def cache_valid(self, col):
         """
@@ -110,35 +115,49 @@ def unique(self, col_or_col_list):
         return output
 
-    def aggregate_groups_by_iter_2(self, ct_agg, nr_groups, skip_key,
-                                   factor_carray, groupby_cols, output_agg_ops,
-                                   bool_arr=None,
-                                   agg_method=ctable_ext.SUM):
+    def helper_agg_groups_by_iter(self, ct_agg=None, col=None, factor_carray=None, nr_groups=None,
+                                  skip_key=None, agg_method=None):
+        # TODO: input vs output column
+        col_dtype = ct_agg[col].dtype
+
+        if col_dtype == np.float64:
+            r = ctable_ext.sum_float64(self[col], factor_carray, nr_groups,
+                                       skip_key, agg_method=agg_method)
+        elif col_dtype == np.int64:
+            r = ctable_ext.sum_int64(self[col], factor_carray, nr_groups,
+                                     skip_key, agg_method=agg_method)
+        elif col_dtype == np.int32:
+            r = ctable_ext.sum_int32(self[col], factor_carray, nr_groups,
+                                     skip_key, agg_method=agg_method)
+        else:
+            raise NotImplementedError(
+                'Column dtype ({0}) not supported for aggregation yet '
+                '(only int32, int64 & float64)'.format(str(col_dtype)))
+
+        return r
+
+    def agg_groups_by_iter(self, ct_agg, nr_groups, skip_key,
+                           factor_carray, groupby_cols, output_agg_ops,
+                           bool_arr=None,
+                           agg_method=ctable_ext.SUM):
         total = []
+        results = []
+        pool = ThreadPool(processes=NUM_PROC)
 
         for col in groupby_cols:
            total.append(ctable_ext.groupby_value(self[col], factor_carray,
                                                  nr_groups, skip_key))
 
         for col, agg_op in output_agg_ops:
-            # TODO: input vs output column
-            col_dtype = ct_agg[col].dtype
-
-            if col_dtype == np.float64:
-                r = ctable_ext.sum_float64(self[col], factor_carray, nr_groups,
-                                           skip_key, agg_method=agg_method)
-            elif col_dtype == np.int64:
-                r = ctable_ext.sum_int64(self[col], factor_carray, nr_groups,
-                                         skip_key, agg_method=agg_method)
-            elif col_dtype == np.int32:
-                r = ctable_ext.sum_int32(self[col], factor_carray, nr_groups,
-                                         skip_key, agg_method=agg_method)
-            else:
-                raise NotImplementedError(
-                    'Column dtype ({0}) not supported for aggregation yet '
-                    '(only int32, int64 & float64)'.format(str(col_dtype)))
+            kwds = {"ct_agg": ct_agg, "col": col,
+                    "factor_carray": factor_carray,
+                    "nr_groups": nr_groups, "skip_key": skip_key,
+                    "agg_method": agg_method}
+            results.append(pool.apply_async(self.helper_agg_groups_by_iter,
+                                            kwds=kwds))
 
-            total.append(r)
+        for r in results:
+            total.append(r.get())
 
         # TODO: fix ugly fix?
         if bool_arr is not None:
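The aggregation fan-out above is the stock ThreadPool recipe: one `apply_async` task per output column, collected with `.get()` in submission order so results stay aligned with the column list. A minimal, self-contained sketch of that pattern follows; `sum_by_group` and the sample data are hypothetical stand-ins for the `ctable_ext` kernels, and threads only pay off when the real kernel releases the GIL.

```python
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

import numpy as np


def sum_by_group(values, factors, nr_groups):
    # hypothetical stand-in for ctable_ext.sum_*; real speedups need
    # kernels that release the GIL (e.g. the nogil Cython loops below)
    return np.bincount(factors, weights=values, minlength=nr_groups)


columns = {'a': np.arange(10.0), 'b': np.ones(10)}
factors = np.array([0, 1] * 5)

pool = ThreadPool(processes=cpu_count())
# submit one task per column; apply_async returns immediately
results = [pool.apply_async(sum_by_group, (columns[col], factors, 2))
           for col in ('a', 'b')]
# .get() blocks per task and re-raises any worker exception here
totals = [r.get() for r in results]
pool.close()
pool.join()
```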
@@ -207,15 +226,28 @@ def groupby(self, groupby_cols, agg_list, bool_arr=None, rootdir=None,
         self.create_agg_ctable(groupby_cols, agg_list, nr_groups, rootdir)
 
         # perform aggregation
-        self.aggregate_groups_by_iter_2(ct_agg, nr_groups, skip_key,
-                                        factor_carray, groupby_cols,
-                                        agg_ops,
-                                        bool_arr= bool_arr,
-                                        agg_method=_agg_method)
+        self.agg_groups_by_iter(ct_agg, nr_groups, skip_key, factor_carray,
+                                groupby_cols, agg_ops, bool_arr=bool_arr,
+                                agg_method=_agg_method)
 
         return ct_agg
 
     # groupby helper functions
+    def helper_factorize_groupby_cols(self, col):
+        if self.cache_valid(col):
+            col_rootdir = self[col].rootdir
+            col_factor_rootdir = col_rootdir + '.factor'
+            col_values_rootdir = col_rootdir + '.values'
+            col_factor_carray = bcolz.carray(rootdir=col_factor_rootdir,
+                                             mode='r')
+            col_values_carray = bcolz.carray(rootdir=col_values_rootdir,
+                                             mode='r')
+        else:
+            col_factor_carray, values = ctable_ext.factorize(self[col])
+            col_values_carray = \
+                bcolz.carray(values.values(), dtype=self[col].dtype)
+        return col_factor_carray, col_values_carray
+
     def factorize_groupby_cols(self, groupby_cols):
         """
@@ -225,23 +257,19 @@ def factorize_groupby_cols(self, groupby_cols):
         """
         # unless we need to refresh the cache
         factor_list = []
         values_list = []
+        results = []
 
         # factorize the groupby columns
-        for col in groupby_cols:
+        pool = ThreadPool(processes=NUM_PROC)
 
-            if self.cache_valid(col):
-                col_rootdir = self[col].rootdir
-                col_factor_rootdir = col_rootdir + '.factor'
-                col_values_rootdir = col_rootdir + '.values'
-                col_factor_carray = \
-                    bcolz.carray(rootdir=col_factor_rootdir, mode='r')
-                col_values_carray = \
-                    bcolz.carray(rootdir=col_values_rootdir, mode='r')
-            else:
-                col_factor_carray, values = ctable_ext.factorize(self[col])
-                col_values_carray = \
-                    bcolz.carray(values.values(), dtype=self[col].dtype)
+        for col in groupby_cols:
+            results.append(pool.apply_async(self.helper_factorize_groupby_cols,
+                                            args=(col, )))
 
+        for r in results:
+            _r = r.get()
+            col_factor_carray = _r[0]
+            col_values_carray = _r[1]
             factor_list.append(col_factor_carray)
             values_list.append(col_values_carray)
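`helper_factorize_groupby_cols` leans on an on-disk naming convention: a column's cached factorization is stored beside the source carray as two sibling carrays rooted at `<rootdir>.factor` and `<rootdir>.values`. A rough sketch of that cache check with plain bcolz; the paths are hypothetical and `np.unique` stands in for `ctable_ext.factorize`, so label numbering may differ from first-occurrence order.

```python
import os

import bcolz
import numpy as np

col_rootdir = '/tmp/my_table/mycol'  # hypothetical column rootdir
factor_dir = col_rootdir + '.factor'
values_dir = col_rootdir + '.values'

if os.path.exists(factor_dir) and os.path.exists(values_dir):
    # cache hit: open both carrays read-only, no factorization needed
    factor = bcolz.carray(rootdir=factor_dir, mode='r')
    values = bcolz.carray(rootdir=values_dir, mode='r')
else:
    # cache miss: factorize in memory (stand-in for ctable_ext.factorize)
    col = np.array([10, 20, 10, 30])
    uniques, labels = np.unique(col, return_inverse=True)
    factor = bcolz.carray(labels.astype(np.int64))
    values = bcolz.carray(uniques)
```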
diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx
index 38666f8..8be6686 100644
--- a/bquery/ctable_ext.pyx
+++ b/bquery/ctable_ext.pyx
@@ -1,6 +1,6 @@
 import numpy as np
 from numpy cimport ndarray, dtype, npy_intp, npy_int32, \
-    npy_uint64, npy_int64, npy_float64, npy_bool
+    npy_uint64, npy_int64, npy_float64, npy_bool, uint64_t
 
 import cython
 import bcolz as bz
@@ -10,6 +10,7 @@
 import itertools as itt
 from libc.stdlib cimport malloc
 from libc.string cimport strcpy
+from libcpp.vector cimport vector
 from khash cimport *
 
 # ----------------------------------------------------------------------------
@@ -74,8 +75,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range,
 @cython.boundscheck(False)
 def factorize_str(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer
         dict reverse
         ndarray in_buffer
         ndarray[npy_uint64] out_buffer
@@ -85,20 +85,17 @@ def factorize_str(carray carray_, carray labels=None):
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype=carray_.dtype)
 
     table = kh_init_str()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_str_helper(chunklen,
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        _factorize_str_helper(len_in_buffer,
                               carray_.dtype.itemsize + 1,
                               in_buffer,
                               out_buffer,
@@ -107,21 +104,7 @@ def factorize_str(carray carray_, carray labels=None):
                               reverse,
                               )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_str_helper(leftover_elements,
-                              carray_.dtype.itemsize + 1,
-                              carray_.leftover_array,
-                              out_buffer,
-                              table,
-                              &count,
-                              reverse,
-                              )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
 
     kh_destroy_str(table)
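The mechanical change repeated throughout this file is the swap from manual chunk handling (decompress each chunk, then special-case `leftover_array`) to `bcolz.iterblocks`, which yields every block as a decompressed numpy array, short final block included. A small sketch of the new loop shape:

```python
import bcolz
import numpy as np

ca = bcolz.carray(np.arange(100003), chunklen=4096)

total = 0
for block in bcolz.iterblocks(ca):
    # each block is a plain numpy array; only the last may be shorter
    # than ca.chunklen, so len(block) replaces the old
    # chunklen/leftover_elements bookkeeping
    total += block.sum()

assert total == ca.sum()
```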
@@ -130,13 +113,11 @@
 @cython.wraparound(False)
 @cython.boundscheck(False)
 cdef void _factorize_int64_helper(Py_ssize_t iter_range,
-                                  Py_ssize_t allocation_size,
-                                  ndarray[npy_int64] in_buffer,
-                                  ndarray[npy_uint64] out_buffer,
+                                  npy_int64[:] in_buffer,
+                                  uint64_t[:] out_buffer,
                                   kh_int64_t *table,
                                   Py_ssize_t * count,
-                                  dict reverse,
-                                  ):
+                                  ) nogil:
     cdef:
         Py_ssize_t i, idx
         int ret
@@ -154,7 +135,6 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range,
         else:
             k = kh_put_int64(table, element, &ret)
             table.vals[k] = idx = count[0]
-            reverse[count[0]] = element
             count[0] += 1
 
         out_buffer[i] = idx
@@ -162,55 +142,46 @@
 @cython.boundscheck(False)
 def factorize_int64(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer, i
         dict reverse
         ndarray[npy_int64] in_buffer
         ndarray[npy_uint64] out_buffer
         kh_int64_t *table
+        npy_int64[:] in_buffer_view
+        uint64_t[:] out_buffer_view
 
     count = 0
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype='int64')
+    out_buffer_view = out_buffer
 
     table = kh_init_int64()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_int64_helper(chunklen,
-                                carray_.dtype.itemsize + 1,
-                                in_buffer,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        in_buffer_view = in_buffer
+        with nogil:
+            _factorize_int64_helper(len_in_buffer,
+                                    in_buffer_view,
+                                    out_buffer_view,
+                                    table,
+                                    &count
+                                    )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_int64_helper(leftover_elements,
-                                carray_.dtype.itemsize + 1,
-                                carray_.leftover_array,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
-
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
+
+    # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21
+    # construct python dict from vectors and free element memory
+    for i in range(table.n_buckets):
+        if not kh_exist_int64(table, i):  # adjust function name to hash-table data-type
+            continue
+        reverse[table.vals[i]] = table.keys[i]
     kh_destroy_int64(table)
 
     return labels, reverse
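Dropping `reverse[count[0]] = element` from the inner loop is what lets the helper run under `nogil`: the loop now touches only the C hash table, and the label-to-value dict is rebuilt once at the end by scanning the table's buckets. The same two-phase idea in plain Python, with a dict standing in for the khash table:

```python
import numpy as np

values = np.array([30, 10, 30, 20, 10], dtype=np.int64)

# phase 1: the tight loop only maintains the value -> label table
table = {}
labels = np.empty(len(values), dtype=np.int64)
for i, v in enumerate(values):
    if v not in table:
        table[v] = len(table)  # next free label
    labels[i] = table[v]

# phase 2: invert once, outside the hot loop, to get label -> value
reverse = {label: value for value, label in table.items()}

assert reverse[labels[0]] == 30
```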
@@ -218,13 +189,11 @@ def factorize_int64(carray carray_, carray labels=None):
 @cython.wraparound(False)
 @cython.boundscheck(False)
 cdef void _factorize_int32_helper(Py_ssize_t iter_range,
-                                  Py_ssize_t allocation_size,
-                                  ndarray[npy_int32] in_buffer,
-                                  ndarray[npy_uint64] out_buffer,
+                                  npy_int32[:] in_buffer,
+                                  uint64_t[:] out_buffer,
                                   kh_int32_t *table,
                                   Py_ssize_t * count,
-                                  dict reverse,
-                                  ):
+                                  ) nogil:
     cdef:
         Py_ssize_t i, idx
         int ret
@@ -242,7 +211,6 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range,
         else:
             k = kh_put_int32(table, element, &ret)
             table.vals[k] = idx = count[0]
-            reverse[count[0]] = element
             count[0] += 1
 
         out_buffer[i] = idx
@@ -250,55 +218,46 @@
 @cython.boundscheck(False)
 def factorize_int32(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer, i
         dict reverse
         ndarray[npy_int32] in_buffer
         ndarray[npy_uint64] out_buffer
         kh_int32_t *table
+        npy_int32[:] in_buffer_view
+        uint64_t[:] out_buffer_view
 
     count = 0
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype='int32')
+    out_buffer_view = out_buffer
 
     table = kh_init_int32()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_int32_helper(chunklen,
-                                carray_.dtype.itemsize + 1,
-                                in_buffer,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        in_buffer_view = in_buffer
+        with nogil:
+            _factorize_int32_helper(len_in_buffer,
+                                    in_buffer_view,
+                                    out_buffer_view,
+                                    table,
+                                    &count
+                                    )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_int32_helper(leftover_elements,
-                                carray_.dtype.itemsize + 1,
-                                carray_.leftover_array,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
-
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
+
+    # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21
+    # construct python dict from vectors and free element memory
+    for i in range(table.n_buckets):
+        if not kh_exist_int32(table, i):  # adjust function name to hash-table data-type
+            continue
+        reverse[table.vals[i]] = table.keys[i]
     kh_destroy_int32(table)
 
     return labels, reverse
@@ -306,13 +265,11 @@
 @cython.wraparound(False)
 @cython.boundscheck(False)
 cdef void _factorize_float64_helper(Py_ssize_t iter_range,
-                                    Py_ssize_t allocation_size,
-                                    ndarray[npy_float64] in_buffer,
-                                    ndarray[npy_uint64] out_buffer,
+                                    npy_float64[:] in_buffer,
+                                    uint64_t[:] out_buffer,
                                     kh_float64_t *table,
                                     Py_ssize_t * count,
-                                    dict reverse,
-                                    ):
+                                    ) nogil:
     cdef:
         Py_ssize_t i, idx
         int ret
@@ -330,7 +287,6 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range,
         else:
             k = kh_put_float64(table, element, &ret)
             table.vals[k] = idx = count[0]
-            reverse[count[0]] = element
             count[0] += 1
 
         out_buffer[i] = idx
@@ -338,55 +294,46 @@
 @cython.boundscheck(False)
 def factorize_float64(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer, i
         dict reverse
         ndarray[npy_float64] in_buffer
         ndarray[npy_uint64] out_buffer
         kh_float64_t *table
+        npy_float64[:] in_buffer_view
+        uint64_t[:] out_buffer_view
 
     count = 0
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype='float64')
+    out_buffer_view = out_buffer
 
     table = kh_init_float64()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_float64_helper(chunklen,
-                                  carray_.dtype.itemsize + 1,
-                                  in_buffer,
-                                  out_buffer,
-                                  table,
-                                  &count,
-                                  reverse,
-                                  )
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        in_buffer_view = in_buffer
+        with nogil:
+            _factorize_float64_helper(len_in_buffer,
+                                      in_buffer_view,
+                                      out_buffer_view,
+                                      table,
+                                      &count
+                                      )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_float64_helper(leftover_elements,
-                                  carray_.dtype.itemsize + 1,
-                                  carray_.leftover_array,
-                                  out_buffer,
-                                  table,
-                                  &count,
-                                  reverse,
-                                  )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
-
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
+
+    # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21
+    # construct python dict from vectors and free element memory
+    for i in range(table.n_buckets):
+        if not kh_exist_float64(table, i):  # adjust function name to hash-table data-type
+            continue
+        reverse[table.vals[i]] = table.keys[i]
     kh_destroy_float64(table)
 
     return labels, reverse
 
@@ -409,32 +356,20 @@ cpdef factorize(carray carray_, carray labels=None):
 @cython.boundscheck(False)
 cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1):
     cdef:
-        chunk chunk_
-        Py_ssize_t i, chunklen, leftover_elements
+        Py_ssize_t chunklen, leftover_elements, len_in_buffer
         ndarray[npy_int64] in_buffer
         ndarray[npy_int64] out_buffer
 
     chunklen = input_.chunklen
     out_buffer = np.empty(chunklen, dtype='int64')
-    in_buffer = np.empty(chunklen, dtype='int64')
 
-    for i in range(input_.nchunks):
-        chunk_ = input_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        for i in range(chunklen):
+    for in_buffer in bz.iterblocks(input_):
+        len_in_buffer = len(in_buffer)
+        for i in range(len_in_buffer):
             element = in_buffer[i]
             out_buffer[i] = lookup.get(element, default)
         # compress out_buffer into labels
-        output_.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(input_.leftover, input_.atomsize)
-    if leftover_elements > 0:
-        in_buffer = input_.leftover_array
-        for i in range(leftover_elements):
-            element = in_buffer[i]
-            out_buffer[i] = lookup.get(element, default)
-        output_.append(out_buffer[:leftover_elements].astype(np.int64))
+        output_.append(out_buffer[:len_in_buffer].astype(np.int64))
 
 # ---------------------------------------------------------------------------
 # Aggregation Section (old)
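`translate_int64` is a streaming dictionary lookup: each block of int64 codes is mapped through a Python dict, falling back to `default` for missing keys. A plain-Python sketch of the same behaviour (the sample data and the -1 default are illustrative):

```python
import bcolz
import numpy as np

codes = bcolz.carray(np.array([0, 1, 2, 1, 0], dtype=np.int64))
lookup = {0: 100, 1: 200}  # hypothetical translation table
out = bcolz.carray([], dtype='int64', expectedlen=len(codes))

for block in bcolz.iterblocks(codes):
    # dict.get supplies the default (-1) for codes missing from lookup
    out.append(np.array([lookup.get(int(v), -1) for v in block],
                        dtype=np.int64))

# out -> [100, 200, -1, 200, 100]
```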
@@ -467,7 +402,7 @@ def agg_sum(iter_):
 @cython.wraparound(False)
 def groupsort_indexer(carray index, Py_ssize_t ngroups):
     cdef:
-        Py_ssize_t i, label, n
+        Py_ssize_t i, label, n, len_in_buffer
         ndarray[int64_t] counts, where, np_result
         # --
         carray c_result
@@ -484,22 +419,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups):
     counts = np.zeros(ngroups + 1, dtype=np.int64)
     n = len(index)
 
-    for index_chunk_nr in range(index.nchunks):
-        # fill input buffer
-        input_chunk = index.chunks[index_chunk_nr]
-        input_chunk._getitem(0, index_chunk_len, in_buffer.data)
-
-        # loop through rows
-        for i in range(index_chunk_len):
-            counts[index[i] + 1] += 1
-
-    leftover_elements = cython.cdiv(index.leftover, index.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = index.leftover_array
-
+    for in_buffer in bz.iterblocks(index):
+        len_in_buffer = len(in_buffer)
         # loop through rows
-        for i in range(leftover_elements):
+        for i in range(len_in_buffer):
             counts[index[i] + 1] += 1
 
     # mark the start of each contiguous group of like-indexed data
@@ -600,10 +523,9 @@ cdef count_unique_int32(ndarray[int32_t] values):
 cpdef sum_float64(carray ca_input, carray ca_factor,
                   Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, j, end_counts, start_counts
 
         ndarray[npy_float64] in_buffer
         ndarray[npy_int64] factor_buffer
@@ -617,6 +539,7 @@ cpdef sum_float64(carray ca_input, carray ca_factor,
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
 
     if agg_method == _COUNT_DISTINCT:
         num_uniques = carray([], dtype='int64')
@@ -633,36 +556,22 @@ cpdef sum_float64(carray ca_input, carray ca_factor,
 
         return num_uniques
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype='float64')
-    factor_chunk_len = ca_factor.chunklen
-    factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype='float64')
 
-    for input_chunk_nr in range(ca_input.nchunks):
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
         # loop through rows
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
@@ -695,53 +604,6 @@ cpdef sum_float64(carray ca_input, carray ca_factor,
         else:
             raise NotImplementedError('sumtype not supported')
 
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = ca_input.leftover_array
-
-        # loop through rows
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                if agg_method == _SUM:
-                    out_buffer[current_index] += in_buffer[i]
-                elif agg_method == _COUNT:
-                    out_buffer[current_index] += 1
-                elif agg_method == _COUNT_NA:
-                    v = in_buffer[i]
-                    if v == v:  # skip NA values
-                        out_buffer[current_index] += 1
-                elif agg_method == _SORTED_COUNT_DISTINCT:
-                    v = in_buffer[i]
-                    if not count_distinct_started:
-                        count_distinct_started = 1
-                        last_values = np.zeros(nr_groups, dtype='float64')
-                        last_values[0] = v
-                        out_buffer[0] = 1
-                    else:
-                        if v != last_values[current_index]:
-                            out_buffer[current_index] += 1
-
-                        last_values[current_index] = v
-                else:
-                    raise NotImplementedError('sumtype not supported')
-
     # check whether a row has to be removed if it was meant to be skipped
     if skip_key < nr_groups:
         np.delete(out_buffer, skip_key)
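`sum_float64` and its siblings walk two block streams in lockstep: the value column drives the outer loop while `factor_chunk_row` cursors through the factor blocks, pulling the next block on demand. This stays aligned even when the two carrays have different chunk lengths, because both hold the same number of rows. The cursor logic in plain Python:

```python
import bcolz
import numpy as np

values = bcolz.carray(np.arange(10.0), chunklen=3)
factor = bcolz.carray(np.array([0, 1] * 5, dtype=np.int64), chunklen=4)

out = np.zeros(2)  # nr_groups == 2

factor_blocks = bcolz.iterblocks(factor)
factor_buffer = next(factor_blocks)
row = 0  # cursor into the current factor block

for block in bcolz.iterblocks(values):
    for v in block:
        if row == len(factor_buffer):
            # exhausted this factor block: fetch the next one
            factor_buffer = next(factor_blocks)
            row = 0
        out[factor_buffer[row]] += v
        row += 1

# out[g] now holds the sum of the values whose factor equals g
```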
@@ -753,10 +615,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor,
 cpdef sum_int32(carray ca_input, carray ca_factor,
                 Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, j, end_counts, start_counts
 
         ndarray[npy_int32] in_buffer
         ndarray[npy_int64] factor_buffer
@@ -770,6 +631,7 @@ cpdef sum_int32(carray ca_input, carray ca_factor,
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
 
     if agg_method == _COUNT_DISTINCT:
         num_uniques = carray([], dtype='int64')
@@ -786,36 +648,22 @@ cpdef sum_int32(carray ca_input, carray ca_factor,
 
         return num_uniques
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype='int32')
-    factor_chunk_len = ca_factor.chunklen
-    factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype='int32')
 
-    for input_chunk_nr in range(ca_input.nchunks):
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
         # loop through rows
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
@@ -847,52 +695,6 @@ cpdef sum_int32(carray ca_input, carray ca_factor,
         else:
             raise NotImplementedError('sumtype not supported')
 
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = ca_input.leftover_array
-
-        # loop through rows
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                if agg_method == _SUM:
-                    out_buffer[current_index] += in_buffer[i]
-                elif agg_method == _COUNT:
-                    out_buffer[current_index] += 1
-                elif agg_method == _COUNT_NA:
-                    # TODO: Warning: int does not support NA values, is this what we need?
-                    out_buffer[current_index] += 1
-                elif agg_method == _SORTED_COUNT_DISTINCT:
-                    v = in_buffer[i]
-                    if not count_distinct_started:
-                        count_distinct_started = 1
-                        last_values = np.zeros(nr_groups, dtype='int32')
-                        last_values[0] = v
-                        out_buffer[0] = 1
-                    else:
-                        if v != last_values[current_index]:
-                            out_buffer[current_index] += 1
-
-                        last_values[current_index] = v
-                else:
-                    raise NotImplementedError('sumtype not supported')
-
     # check whether a row has to be removed if it was meant to be skipped
     if skip_key < nr_groups:
         np.delete(out_buffer, skip_key)
@@ -904,10 +706,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor,
 cpdef sum_int64(carray ca_input, carray ca_factor,
                 Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, j, end_counts, start_counts
 
         ndarray[npy_int64] in_buffer
         ndarray[npy_int64] factor_buffer
@@ -921,6 +722,7 @@ cpdef sum_int64(carray ca_input, carray ca_factor,
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
 
     if agg_method == _COUNT_DISTINCT:
         num_uniques = carray([], dtype='int64')
@@ -937,36 +739,22 @@ cpdef sum_int64(carray ca_input, carray ca_factor,
 
         return num_uniques
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype='int64')
-    factor_chunk_len = ca_factor.chunklen
-    factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype='int64')
 
-    for input_chunk_nr in range(ca_input.nchunks):
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
         # loop through rows
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
@@ -998,52 +786,6 @@ cpdef sum_int64(carray ca_input, carray ca_factor,
         else:
             raise NotImplementedError('sumtype not supported')
 
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = ca_input.leftover_array
-
-        # loop through rows
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                if agg_method == _SUM:
-                    out_buffer[current_index] += in_buffer[i]
-                elif agg_method == _COUNT:
-                    out_buffer[current_index] += 1
-                elif agg_method == _COUNT_NA:
-                    # TODO: Warning: int does not support NA values, is this what we need?
-                    out_buffer[current_index] += 1
-                elif agg_method == _SORTED_COUNT_DISTINCT:
-                    v = in_buffer[i]
-                    if not count_distinct_started:
-                        count_distinct_started = 1
-                        last_values = np.zeros(nr_groups, dtype='int64')
-                        last_values[0] = v
-                        out_buffer[0] = 1
-                    else:
-                        if v != last_values[current_index]:
-                            out_buffer[current_index] += 1
-
-                        last_values[current_index] = v
-                else:
-                    raise NotImplementedError('sumtype not supported')
-
     # check whether a row has to be removed if it was meant to be skipped
     if skip_key < nr_groups:
         np.delete(out_buffer, skip_key)
@@ -1054,10 +796,9 @@ cpdef sum_int64(carray ca_input, carray ca_factor,
 @cython.boundscheck(False)
 cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, factor_total_chunks
 
         ndarray in_buffer
         ndarray[npy_int64] factor_buffer
@@ -1066,61 +807,25 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
+
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype)
-    factor_chunk_len = ca_factor.chunklen
     factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype)
 
-    for input_chunk_nr in range(ca_input.nchunks):
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
-
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                out_buffer[current_index] = in_buffer[i]
-
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        in_buffer = ca_input.leftover_array
-
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
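`groupby_value` keeps one representative value per group, later rows overwriting earlier ones; bquery uses it to materialise the groupby columns themselves. A plain-numpy sketch (the `skip_key` filtering is omitted for brevity):

```python
import numpy as np

col = np.array([10, 11, 12, 13], dtype=np.int64)
factor = np.array([0, 1, 0, 1], dtype=np.int64)
nr_groups = 2

out = np.zeros(nr_groups, dtype=col.dtype)
for v, g in zip(col, factor):
    out[g] = v  # later rows overwrite earlier ones

# out == [12, 13]: one representative value per group
```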
diff --git a/bquery/khash.pxd b/bquery/khash.pxd
index a8fd51a..de7b6a1 100644
--- a/bquery/khash.pxd
+++ b/bquery/khash.pxd
@@ -65,9 +65,9 @@ cdef extern from "khash_python.h":
     inline kh_int64_t* kh_init_int64()
     inline void kh_destroy_int64(kh_int64_t*)
     inline void kh_clear_int64(kh_int64_t*)
-    inline khint_t kh_get_int64(kh_int64_t*, int64_t)
+    inline khint_t kh_get_int64(kh_int64_t*, int64_t) nogil
     inline void kh_resize_int64(kh_int64_t*, khint_t)
-    inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*)
+    inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*) nogil
     inline void kh_del_int64(kh_int64_t*, khiter_t)
 
     bint kh_exist_int64(kh_int64_t*, khiter_t)
@@ -81,9 +81,9 @@ cdef extern from "khash_python.h":
     inline kh_float64_t* kh_init_float64()
     inline void kh_destroy_float64(kh_float64_t*)
     inline void kh_clear_float64(kh_float64_t*)
-    inline khint_t kh_get_float64(kh_float64_t*, float64_t)
+    inline khint_t kh_get_float64(kh_float64_t*, float64_t) nogil
     inline void kh_resize_float64(kh_float64_t*, khint_t)
-    inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*)
+    inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*) nogil
     inline void kh_del_float64(kh_float64_t*, khiter_t)
 
     bint kh_exist_float64(kh_float64_t*, khiter_t)
@@ -97,9 +97,9 @@ cdef extern from "khash_python.h":
     inline kh_int32_t* kh_init_int32()
     inline void kh_destroy_int32(kh_int32_t*)
     inline void kh_clear_int32(kh_int32_t*)
-    inline khint_t kh_get_int32(kh_int32_t*, int32_t)
+    inline khint_t kh_get_int32(kh_int32_t*, int32_t) nogil
     inline void kh_resize_int32(kh_int32_t*, khint_t)
-    inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*)
+    inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*) nogil
     inline void kh_del_int32(kh_int32_t*, khiter_t)
 
     bint kh_exist_int32(kh_int32_t*, khiter_t)
diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx
index 51e9e68..99a3a20 100644
--- a/bquery/templates/ctable_ext.template.pyx
+++ b/bquery/templates/ctable_ext.template.pyx
@@ -1,6 +1,6 @@
 import numpy as np
 from numpy cimport ndarray, dtype, npy_intp, npy_int32, \
-    npy_uint64, npy_int64, npy_float64, npy_bool
+    npy_uint64, npy_int64, npy_float64, npy_bool, uint64_t
 
 import cython
 import bcolz as bz
@@ -10,6 +10,7 @@
 import itertools as itt
 from libc.stdlib cimport malloc
 from libc.string cimport strcpy
+from libcpp.vector cimport vector
 from khash cimport *
 
 # ----------------------------------------------------------------------------
@@ -74,8 +75,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range,
 @cython.boundscheck(False)
 def factorize_str(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer
         dict reverse
         ndarray in_buffer
         ndarray[npy_uint64] out_buffer
@@ -85,20 +85,17 @@ def factorize_str(carray carray_, carray labels=None):
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype=carray_.dtype)
 
     table = kh_init_str()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_str_helper(chunklen,
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        _factorize_str_helper(len_in_buffer,
                               carray_.dtype.itemsize + 1,
                               in_buffer,
                               out_buffer,
@@ -107,21 +104,7 @@ def factorize_str(carray carray_, carray labels=None):
                               reverse,
                               )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_str_helper(leftover_elements,
-                              carray_.dtype.itemsize + 1,
-                              carray_.leftover_array,
-                              out_buffer,
-                              table,
-                              &count,
-                              reverse,
-                              )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
 
     kh_destroy_str(table)
@@ -131,13 +114,11 @@
 @cython.wraparound(False)
 @cython.boundscheck(False)
 cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range,
-                                  Py_ssize_t allocation_size,
-                                  ndarray[npy_{{ factor_type }}] in_buffer,
-                                  ndarray[npy_uint64] out_buffer,
+                                  npy_{{ factor_type }}[:] in_buffer,
+                                  uint64_t[:] out_buffer,
                                   kh_{{ factor_type }}_t *table,
                                   Py_ssize_t * count,
-                                  dict reverse,
-                                  ):
+                                  ) nogil:
     cdef:
         Py_ssize_t i, idx
         int ret
@@ -155,7 +136,6 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range,
         else:
             k = kh_put_{{ factor_type }}(table, element, &ret)
             table.vals[k] = idx = count[0]
-            reverse[count[0]] = element
             count[0] += 1
 
         out_buffer[i] = idx
@@ -163,55 +143,46 @@
 @cython.boundscheck(False)
 def factorize_{{ factor_type }}(carray carray_, carray labels=None):
     cdef:
-        chunk chunk_
-        Py_ssize_t n, i, count, chunklen, leftover_elements
+        Py_ssize_t len_carray, count, chunklen, len_in_buffer, i
         dict reverse
         ndarray[npy_{{ factor_type }}] in_buffer
         ndarray[npy_uint64] out_buffer
         kh_{{ factor_type }}_t *table
+        npy_{{ factor_type }}[:] in_buffer_view
+        uint64_t[:] out_buffer_view
 
     count = 0
     ret = 0
     reverse = {}
 
-    n = len(carray_)
+    len_carray = len(carray_)
     chunklen = carray_.chunklen
     if labels is None:
-        labels = carray([], dtype='int64', expectedlen=n)
+        labels = carray([], dtype='int64', expectedlen=len_carray)
     # in-buffer isn't typed, because cython doesn't support string arrays (?)
     out_buffer = np.empty(chunklen, dtype='uint64')
-    in_buffer = np.empty(chunklen, dtype='{{ factor_type }}')
+    out_buffer_view = out_buffer
 
     table = kh_init_{{ factor_type }}()
 
-    for i in range(carray_.nchunks):
-        chunk_ = carray_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        _factorize_{{ factor_type }}_helper(chunklen,
-                                carray_.dtype.itemsize + 1,
-                                in_buffer,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
+    for in_buffer in bz.iterblocks(carray_):
+        len_in_buffer = len(in_buffer)
+        in_buffer_view = in_buffer
+        with nogil:
+            _factorize_{{ factor_type }}_helper(len_in_buffer,
+                                    in_buffer_view,
+                                    out_buffer_view,
+                                    table,
+                                    &count
+                                    )
         # compress out_buffer into labels
-        labels.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
-    if leftover_elements > 0:
-        _factorize_{{ factor_type }}_helper(leftover_elements,
-                                carray_.dtype.itemsize + 1,
-                                carray_.leftover_array,
-                                out_buffer,
-                                table,
-                                &count,
-                                reverse,
-                                )
-
-        # compress out_buffer into labels
-        labels.append(out_buffer[:leftover_elements].astype(np.int64))
-
+        labels.append(out_buffer[:len_in_buffer].astype(np.int64))
+
+    # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21
+    # construct python dict from vectors and free element memory
+    for i in range(table.n_buckets):
+        if not kh_exist_{{ factor_type }}(table, i):  # adjust function name to hash-table data-type
+            continue
+        reverse[table.vals[i]] = table.keys[i]
     kh_destroy_{{ factor_type }}(table)
 
     return labels, reverse
 
@@ -236,32 +207,20 @@ cpdef factorize(carray carray_, carray labels=None):
 @cython.boundscheck(False)
 cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1):
     cdef:
-        chunk chunk_
-        Py_ssize_t i, chunklen, leftover_elements
+        Py_ssize_t chunklen, leftover_elements, len_in_buffer
         ndarray[npy_int64] in_buffer
         ndarray[npy_int64] out_buffer
 
     chunklen = input_.chunklen
     out_buffer = np.empty(chunklen, dtype='int64')
-    in_buffer = np.empty(chunklen, dtype='int64')
 
-    for i in range(input_.nchunks):
-        chunk_ = input_.chunks[i]
-        # decompress into in_buffer
-        chunk_._getitem(0, chunklen, in_buffer.data)
-        for i in range(chunklen):
+    for in_buffer in bz.iterblocks(input_):
+        len_in_buffer = len(in_buffer)
+        for i in range(len_in_buffer):
            element = in_buffer[i]
            out_buffer[i] = lookup.get(element, default)
         # compress out_buffer into labels
-        output_.append(out_buffer.astype(np.int64))
-
-    leftover_elements = cython.cdiv(input_.leftover, input_.atomsize)
-    if leftover_elements > 0:
-        in_buffer = input_.leftover_array
-        for i in range(leftover_elements):
-            element = in_buffer[i]
-            out_buffer[i] = lookup.get(element, default)
-        output_.append(out_buffer[:leftover_elements].astype(np.int64))
+        output_.append(out_buffer[:len_in_buffer].astype(np.int64))
 
 # ---------------------------------------------------------------------------
 # Aggregation Section (old)
@@ -294,7 +253,7 @@ def agg_sum(iter_):
 @cython.wraparound(False)
 def groupsort_indexer(carray index, Py_ssize_t ngroups):
     cdef:
-        Py_ssize_t i, label, n
+        Py_ssize_t i, label, n, len_in_buffer
         ndarray[int64_t] counts, where, np_result
         # --
         carray c_result
@@ -311,22 +270,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups):
     counts = np.zeros(ngroups + 1, dtype=np.int64)
     n = len(index)
 
-    for index_chunk_nr in range(index.nchunks):
-        # fill input buffer
-        input_chunk = index.chunks[index_chunk_nr]
-        input_chunk._getitem(0, index_chunk_len, in_buffer.data)
-
-        # loop through rows
-        for i in range(index_chunk_len):
-            counts[index[i] + 1] += 1
-
-    leftover_elements = cython.cdiv(index.leftover, index.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = index.leftover_array
-
+    for in_buffer in bz.iterblocks(index):
+        len_in_buffer = len(in_buffer)
         # loop through rows
-        for i in range(leftover_elements):
+        for i in range(len_in_buffer):
             counts[index[i] + 1] += 1
 
     # mark the start of each contiguous group of like-indexed data
@@ -385,10 +332,9 @@ cdef count_unique_{{ count_unique_type }}(ndarray[{{ count_unique_type }}_t] val
 cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor,
                          Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, j, end_counts, start_counts
 
         ndarray[npy_{{ sum_type }}] in_buffer
         ndarray[npy_int64] factor_buffer
@@ -402,6 +348,7 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor,
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
 
     if agg_method == _COUNT_DISTINCT:
         num_uniques = carray([], dtype='int64')
@@ -418,36 +365,22 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor,
 
         return num_uniques
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype='{{ sum_type }}')
-    factor_chunk_len = ca_factor.chunklen
-    factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype='{{ sum_type }}')
 
-    for input_chunk_nr in range(ca_input.nchunks):
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
         # loop through rows
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
@@ -484,58 +417,6 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor,
         else:
             raise NotImplementedError('sumtype not supported')
 
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        # fill input buffer
-        in_buffer = ca_input.leftover_array
-
-        # loop through rows
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                if agg_method == _SUM:
-                    out_buffer[current_index] += in_buffer[i]
-                elif agg_method == _COUNT:
-                    out_buffer[current_index] += 1
-                elif agg_method == _COUNT_NA:
-{%- if sum_type == "float64" %}
-                    v = in_buffer[i]
-                    if v == v:  # skip NA values
-                        out_buffer[current_index] += 1
-{%- else %}
-                    # TODO: Warning: int does not support NA values, is this what we need?
-                    out_buffer[current_index] += 1
-{%- endif %}
-                elif agg_method == _SORTED_COUNT_DISTINCT:
-                    v = in_buffer[i]
-                    if not count_distinct_started:
-                        count_distinct_started = 1
-                        last_values = np.zeros(nr_groups, dtype='{{ sum_type }}')
-                        last_values[0] = v
-                        out_buffer[0] = 1
-                    else:
-                        if v != last_values[current_index]:
-                            out_buffer[current_index] += 1
-
-                        last_values[current_index] = v
-                else:
-                    raise NotImplementedError('sumtype not supported')
-
     # check whether a row has to be removed if it was meant to be skipped
     if skip_key < nr_groups:
         np.delete(out_buffer, skip_key)
@@ -548,10 +429,9 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor,
 @cython.boundscheck(False)
 cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key):
     cdef:
-        chunk input_chunk, factor_chunk
-        Py_ssize_t input_chunk_nr, input_chunk_len
-        Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row
-        Py_ssize_t current_index, i, factor_total_chunks, leftover_elements
+        Py_ssize_t in_buffer_len, factor_buffer_len
+        Py_ssize_t factor_chunk_nr, factor_chunk_row
+        Py_ssize_t current_index, i, factor_total_chunks
 
         ndarray in_buffer
         ndarray[npy_int64] factor_buffer
@@ -560,61 +440,25 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_
     count = 0
     ret = 0
     reverse = {}
+    iter_ca_factor = bz.iterblocks(ca_factor)
+
 
-    input_chunk_len = ca_input.chunklen
-    in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype)
-    factor_chunk_len = ca_factor.chunklen
     factor_total_chunks = ca_factor.nchunks
     factor_chunk_nr = 0
-    factor_buffer = np.empty(factor_chunk_len, dtype='int64')
-    if factor_total_chunks > 0:
-        factor_chunk = ca_factor.chunks[factor_chunk_nr]
-        factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-    else:
-        factor_buffer = ca_factor.leftover_array
+    factor_buffer = iter_ca_factor.next()
+    factor_buffer_len = len(factor_buffer)
     factor_chunk_row = 0
     out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype)
 
-    for input_chunk_nr in range(ca_input.nchunks):
-
-        # fill input buffer
-        input_chunk = ca_input.chunks[input_chunk_nr]
-        input_chunk._getitem(0, input_chunk_len, in_buffer.data)
+    for in_buffer in bz.iterblocks(ca_input):
+        in_buffer_len = len(in_buffer)
 
-        for i in range(input_chunk_len):
+        for i in range(in_buffer_len):
 
             # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
+            if factor_chunk_row == factor_buffer_len:
                 factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
-                factor_chunk_row = 0
-
-            # retrieve index
-            current_index = factor_buffer[factor_chunk_row]
-            factor_chunk_row += 1
-
-            # update value if it's not an invalid index
-            if current_index != skip_key:
-                out_buffer[current_index] = in_buffer[i]
-
-    leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize)
-    if leftover_elements > 0:
-        in_buffer = ca_input.leftover_array
-
-        for i in range(leftover_elements):
-
-            # go to next factor buffer if necessary
-            if factor_chunk_row == factor_chunk_len:
-                factor_chunk_nr += 1
-                if factor_chunk_nr < factor_total_chunks:
-                    factor_chunk = ca_factor.chunks[factor_chunk_nr]
-                    factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data)
-                else:
-                    factor_buffer = ca_factor.leftover_array
+                factor_buffer = iter_ca_factor.next()
                 factor_chunk_row = 0
 
             # retrieve index
diff --git a/setup.py b/setup.py
index fa4c243..1a02cca 100644
--- a/setup.py
+++ b/setup.py
@@ -93,7 +93,7 @@ def check_import(pkgname, pkgver):
 ########### Project specific command line options ###########
 
-class bquery_build_ext(build_ext): 
+class bquery_build_ext(build_ext):
     user_options = build_ext.user_options + \
         [
             ('from-templates', None,
@@ -166,7 +166,8 @@ def run(self):
                   library_dirs=lib_dirs,
                   libraries=libs,
                   extra_link_args=LFLAGS,
-                  extra_compile_args=CFLAGS),
+                  extra_compile_args=CFLAGS,
+                  language='c++'),
     ],
     packages=['bquery', 'bquery.tests'],
 )
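The new `language='c++'` flag is required because the extension now cimports `libcpp.vector`: Cython has to emit C++ and the build must link against the C++ runtime. A minimal sketch of that requirement; the module and file names are illustrative only.

```python
# minimal sketch: build a Cython extension as C++ so that libcpp
# cimports (e.g. libcpp.vector) compile and link correctly
from setuptools import setup, Extension
from Cython.Build import cythonize

ext = Extension('my_ext',               # hypothetical module name
                sources=['my_ext.pyx'],  # hypothetical source file
                language='c++')          # emit and compile C++ instead of C

setup(ext_modules=cythonize([ext]))
```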