From cdf7c2aa53db7bffc1ed99bb0755d346f7b03184 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:01:34 +0100 Subject: [PATCH 1/8] iterblocks: factorize_str --- bquery/ctable_ext.pyx | 32 ++++++------------------ bquery/templates/ctable_ext.template.pyx | 32 ++++++------------------ 2 files changed, 14 insertions(+), 50 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 38666f8..3296e4e 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -74,8 +74,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray in_buffer ndarray[npy_uint64] out_buffer @@ -85,20 +84,17 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -107,21 +103,7 @@ def factorize_str(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_str(table) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 51e9e68..b80eb20 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -74,8 +74,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray in_buffer ndarray[npy_uint64] out_buffer @@ -85,20 +84,17 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -107,21 +103,7 @@ def factorize_str(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_str(table) From b61ef88b7a2ddeb31a21d97176980a4447430c45 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:09:07 +0100 Subject: [PATCH 2/8] iterblocks: factorize_xxx --- bquery/ctable_ext.pyx | 96 ++++++------------------ bquery/templates/ctable_ext.template.pyx | 32 ++------ 2 files changed, 28 insertions(+), 100 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 3296e4e..f51f99b 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -144,8 +144,7 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int64] in_buffer ndarray[npy_uint64] out_buffer @@ -155,20 +154,17 @@ def factorize_int64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int64') table = kh_init_int64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -177,21 +173,7 @@ def factorize_int64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int64(table) @@ -232,8 +214,7 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int32(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int32] in_buffer ndarray[npy_uint64] out_buffer @@ -243,20 +224,17 @@ def factorize_int32(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int32') table = kh_init_int32() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int32_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int32_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -265,21 +243,7 @@ def factorize_int32(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int32_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int32(table) @@ -320,8 +284,7 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_float64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_float64] in_buffer ndarray[npy_uint64] out_buffer @@ -331,20 +294,17 @@ def factorize_float64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='float64') table = kh_init_float64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_float64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_float64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -353,21 +313,7 @@ def factorize_float64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_float64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_float64(table) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index b80eb20..eff5434 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -145,8 +145,7 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_{{ factor_type }}(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_{{ factor_type }}] in_buffer ndarray[npy_uint64] out_buffer @@ -156,20 +155,17 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='{{ factor_type }}') table = kh_init_{{ factor_type }}() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_{{ factor_type }}_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_{{ factor_type }}_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -178,21 +174,7 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_{{ factor_type }}_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_{{ factor_type }}(table) From 873acbc7f9464df0cdb6117204ca8d63a24a4e9f Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:15:03 +0100 Subject: [PATCH 3/8] iterblocks: translate_int64 --- bquery/ctable_ext.pyx | 22 +++++----------------- bquery/templates/ctable_ext.template.pyx | 22 +++++----------------- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index f51f99b..9d3ad50 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -337,32 +337,20 @@ cpdef factorize(carray carray_, carray labels=None): @cython.boundscheck(False) cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1): cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t chunklen, leftover_elements, len_in_buffer ndarray[npy_int64] in_buffer ndarray[npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) # --------------------------------------------------------------------------- # Aggregation Section (old) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index eff5434..5657b62 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -200,32 +200,20 @@ cpdef factorize(carray carray_, carray labels=None): @cython.boundscheck(False) cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1): cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t chunklen, leftover_elements, len_in_buffer ndarray[npy_int64] in_buffer ndarray[npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) # --------------------------------------------------------------------------- # Aggregation Section (old) From 905af17b58836cb835a3fcd7bf0f9ee9934a708e Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:21:11 +0100 Subject: [PATCH 4/8] iterblocks: groupsort_indexer --- bquery/ctable_ext.pyx | 20 ++++---------------- bquery/templates/ctable_ext.template.pyx | 20 ++++---------------- 2 files changed, 8 insertions(+), 32 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 9d3ad50..deaf81c 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -383,7 +383,7 @@ def agg_sum(iter_): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t i, label, n, len_in_buffer ndarray[int64_t] counts, where, np_result # -- carray c_result @@ -400,22 +400,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 5657b62..233ec7d 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -246,7 +246,7 @@ def agg_sum(iter_): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t i, label, n, len_in_buffer ndarray[int64_t] counts, where, np_result # -- carray c_result @@ -263,22 +263,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data From 29a6ebfea6199e6eedad7f5f84f7f7c3b71ced17 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:30:34 +0100 Subject: [PATCH 5/8] iterblocks: sum_xxx (1st step) --- bquery/ctable_ext.pyx | 181 ++--------------------- bquery/templates/ctable_ext.template.pyx | 66 +-------- 2 files changed, 20 insertions(+), 227 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index deaf81c..64e5122 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -504,8 +504,8 @@ cdef count_unique_int32(ndarray[int32_t] values): cpdef sum_float64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -537,8 +537,6 @@ cpdef sum_float64(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='float64') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -551,13 +549,11 @@ cpdef sum_float64(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='float64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -599,53 +595,6 @@ cpdef sum_float64(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='float64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) @@ -657,8 +606,8 @@ cpdef sum_float64(carray ca_input, carray ca_factor, cpdef sum_int32(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -690,8 +639,6 @@ cpdef sum_int32(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int32') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -704,13 +651,11 @@ cpdef sum_int32(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int32') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -751,52 +696,6 @@ cpdef sum_int32(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int32') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) @@ -808,8 +707,8 @@ cpdef sum_int32(carray ca_input, carray ca_factor, cpdef sum_int64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -841,8 +740,6 @@ cpdef sum_int64(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int64') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -855,13 +752,11 @@ cpdef sum_int64(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -902,52 +797,6 @@ cpdef sum_int64(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 233ec7d..a3f36cd 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -325,8 +325,8 @@ cdef count_unique_{{ count_unique_type }}(ndarray[{{ count_unique_type }}_t] val cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -358,8 +358,6 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='{{ sum_type }}') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -372,13 +370,11 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='{{ sum_type }}') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -424,58 +420,6 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: -{%- if sum_type == "float64" %} - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 -{%- else %} - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 -{%- endif %} - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='{{ sum_type }}') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) From 3cdf7372845a221ecf7fe84a4d44277336e26128 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:49:50 +0100 Subject: [PATCH 6/8] iterblocks: sum_xxx (2nd step) --- bquery/ctable_ext.pyx | 78 ++++++++---------------- bquery/templates/ctable_ext.template.pyx | 26 +++----- 2 files changed, 32 insertions(+), 72 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 64e5122..56984c6 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -504,10 +504,9 @@ cdef count_unique_int32(ndarray[int32_t] values): cpdef sum_float64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_float64] in_buffer ndarray[npy_int64] factor_buffer @@ -521,6 +520,7 @@ cpdef sum_float64(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -537,15 +537,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='float64') @@ -556,13 +550,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -606,10 +596,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, cpdef sum_int32(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int32] in_buffer ndarray[npy_int64] factor_buffer @@ -623,6 +612,7 @@ cpdef sum_int32(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -639,15 +629,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int32') @@ -658,13 +642,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -707,10 +687,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, cpdef sum_int64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int64] in_buffer ndarray[npy_int64] factor_buffer @@ -724,6 +703,7 @@ cpdef sum_int64(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -740,15 +720,9 @@ cpdef sum_int64(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int64') @@ -759,13 +733,9 @@ cpdef sum_int64(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index a3f36cd..97d5736 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -325,10 +325,9 @@ cdef count_unique_{{ count_unique_type }}(ndarray[{{ count_unique_type }}_t] val cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_{{ sum_type }}] in_buffer ndarray[npy_int64] factor_buffer @@ -342,6 +341,7 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -358,15 +358,9 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='{{ sum_type }}') @@ -377,13 +371,9 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index From 5befa23ea369683d0a8021b4c0cabf656c2fe622 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:55:37 +0100 Subject: [PATCH 7/8] iterblocks: groupby_value (1st step) --- bquery/ctable_ext.pyx | 41 ++++-------------------- bquery/templates/ctable_ext.template.pyx | 41 ++++-------------------- 2 files changed, 12 insertions(+), 70 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 56984c6..ee2131c 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -777,10 +777,10 @@ cpdef sum_int64(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer ndarray[npy_int64] factor_buffer @@ -790,8 +790,6 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ ret = 0 reverse = {} - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -804,37 +802,10 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) - for i in range(leftover_elements): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 97d5736..5621bf9 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -422,10 +422,10 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer ndarray[npy_int64] factor_buffer @@ -435,8 +435,6 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ ret = 0 reverse = {} - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -449,37 +447,10 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) - for i in range(leftover_elements): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: From 0ad69ea5198764c7cf7647e51c85d022ed82c9e4 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 12:11:25 +0100 Subject: [PATCH 8/8] iterblocks: groupby_value (2nd step) --- bquery/ctable_ext.pyx | 24 ++++++++---------------- bquery/templates/ctable_ext.template.pyx | 24 ++++++++---------------- 2 files changed, 16 insertions(+), 32 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index ee2131c..acdcebc 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -777,9 +777,8 @@ cpdef sum_int64(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer @@ -789,16 +788,13 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) @@ -808,13 +804,9 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 5621bf9..34b81b9 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -422,9 +422,8 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer @@ -434,16 +433,13 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) @@ -453,13 +449,9 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index