From cdf7c2aa53db7bffc1ed99bb0755d346f7b03184 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:01:34 +0100 Subject: [PATCH 01/17] iterblocks: factorize_str --- bquery/ctable_ext.pyx | 32 ++++++------------------ bquery/templates/ctable_ext.template.pyx | 32 ++++++------------------ 2 files changed, 14 insertions(+), 50 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 38666f8..3296e4e 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -74,8 +74,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray in_buffer ndarray[npy_uint64] out_buffer @@ -85,20 +84,17 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -107,21 +103,7 @@ def factorize_str(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_str(table) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 51e9e68..b80eb20 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -74,8 +74,7 @@ cdef void _factorize_str_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_str(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray in_buffer ndarray[npy_uint64] out_buffer @@ -85,20 +84,17 @@ def factorize_str(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype=carray_.dtype) table = kh_init_str() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_str_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_str_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -107,21 +103,7 @@ def factorize_str(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_str_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_str(table) From b61ef88b7a2ddeb31a21d97176980a4447430c45 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:09:07 +0100 Subject: [PATCH 02/17] iterblocks: factorize_xxx --- bquery/ctable_ext.pyx | 96 ++++++------------------ bquery/templates/ctable_ext.template.pyx | 32 ++------ 2 files changed, 28 insertions(+), 100 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 3296e4e..f51f99b 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -144,8 +144,7 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int64] in_buffer ndarray[npy_uint64] out_buffer @@ -155,20 +154,17 @@ def factorize_int64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int64') table = kh_init_int64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -177,21 +173,7 @@ def factorize_int64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int64(table) @@ -232,8 +214,7 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int32(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_int32] in_buffer ndarray[npy_uint64] out_buffer @@ -243,20 +224,17 @@ def factorize_int32(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='int32') table = kh_init_int32() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_int32_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_int32_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -265,21 +243,7 @@ def factorize_int32(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_int32_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int32(table) @@ -320,8 +284,7 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_float64(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_float64] in_buffer ndarray[npy_uint64] out_buffer @@ -331,20 +294,17 @@ def factorize_float64(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='float64') table = kh_init_float64() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_float64_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_float64_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -353,21 +313,7 @@ def factorize_float64(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_float64_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_float64(table) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index b80eb20..eff5434 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -145,8 +145,7 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_{{ factor_type }}(carray carray_, carray labels=None): cdef: - chunk chunk_ - Py_ssize_t n, i, count, chunklen, leftover_elements + Py_ssize_t len_carray, count, chunklen, len_in_buffer dict reverse ndarray[npy_{{ factor_type }}] in_buffer ndarray[npy_uint64] out_buffer @@ -156,20 +155,17 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): ret = 0 reverse = {} - n = len(carray_) + len_carray = len(carray_) chunklen = carray_.chunklen if labels is None: - labels = carray([], dtype='int64', expectedlen=n) + labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') - in_buffer = np.empty(chunklen, dtype='{{ factor_type }}') table = kh_init_{{ factor_type }}() - for i in range(carray_.nchunks): - chunk_ = carray_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - _factorize_{{ factor_type }}_helper(chunklen, + for in_buffer in bz.iterblocks(carray_): + len_in_buffer = len(in_buffer) + _factorize_{{ factor_type }}_helper(len_in_buffer, carray_.dtype.itemsize + 1, in_buffer, out_buffer, @@ -178,21 +174,7 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): reverse, ) # compress out_buffer into labels - labels.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize) - if leftover_elements > 0: - _factorize_{{ factor_type }}_helper(leftover_elements, - carray_.dtype.itemsize + 1, - carray_.leftover_array, - out_buffer, - table, - &count, - reverse, - ) - - # compress out_buffer into labels - labels.append(out_buffer[:leftover_elements].astype(np.int64)) + labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_{{ factor_type }}(table) From 873acbc7f9464df0cdb6117204ca8d63a24a4e9f Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:15:03 +0100 Subject: [PATCH 03/17] iterblocks: translate_int64 --- bquery/ctable_ext.pyx | 22 +++++----------------- bquery/templates/ctable_ext.template.pyx | 22 +++++----------------- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index f51f99b..9d3ad50 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -337,32 +337,20 @@ cpdef factorize(carray carray_, carray labels=None): @cython.boundscheck(False) cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1): cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t chunklen, leftover_elements, len_in_buffer ndarray[npy_int64] in_buffer ndarray[npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) # --------------------------------------------------------------------------- # Aggregation Section (old) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index eff5434..5657b62 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -200,32 +200,20 @@ cpdef factorize(carray carray_, carray labels=None): @cython.boundscheck(False) cpdef translate_int64(carray input_, carray output_, dict lookup, npy_int64 default=-1): cdef: - chunk chunk_ - Py_ssize_t i, chunklen, leftover_elements + Py_ssize_t chunklen, leftover_elements, len_in_buffer ndarray[npy_int64] in_buffer ndarray[npy_int64] out_buffer chunklen = input_.chunklen out_buffer = np.empty(chunklen, dtype='int64') - in_buffer = np.empty(chunklen, dtype='int64') - for i in range(input_.nchunks): - chunk_ = input_.chunks[i] - # decompress into in_buffer - chunk_._getitem(0, chunklen, in_buffer.data) - for i in range(chunklen): + for in_buffer in bz.iterblocks(input_): + len_in_buffer = len(in_buffer) + for i in range(len_in_buffer): element = in_buffer[i] out_buffer[i] = lookup.get(element, default) # compress out_buffer into labels - output_.append(out_buffer.astype(np.int64)) - - leftover_elements = cython.cdiv(input_.leftover, input_.atomsize) - if leftover_elements > 0: - in_buffer = input_.leftover_array - for i in range(leftover_elements): - element = in_buffer[i] - out_buffer[i] = lookup.get(element, default) - output_.append(out_buffer[:leftover_elements].astype(np.int64)) + output_.append(out_buffer[:len_in_buffer].astype(np.int64)) # --------------------------------------------------------------------------- # Aggregation Section (old) From 905af17b58836cb835a3fcd7bf0f9ee9934a708e Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:21:11 +0100 Subject: [PATCH 04/17] iterblocks: groupsort_indexer --- bquery/ctable_ext.pyx | 20 ++++---------------- bquery/templates/ctable_ext.template.pyx | 20 ++++---------------- 2 files changed, 8 insertions(+), 32 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 9d3ad50..deaf81c 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -383,7 +383,7 @@ def agg_sum(iter_): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t i, label, n, len_in_buffer ndarray[int64_t] counts, where, np_result # -- carray c_result @@ -400,22 +400,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 5657b62..233ec7d 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -246,7 +246,7 @@ def agg_sum(iter_): @cython.wraparound(False) def groupsort_indexer(carray index, Py_ssize_t ngroups): cdef: - Py_ssize_t i, label, n + Py_ssize_t i, label, n, len_in_buffer ndarray[int64_t] counts, where, np_result # -- carray c_result @@ -263,22 +263,10 @@ def groupsort_indexer(carray index, Py_ssize_t ngroups): counts = np.zeros(ngroups + 1, dtype=np.int64) n = len(index) - for index_chunk_nr in range(index.nchunks): - # fill input buffer - input_chunk = index.chunks[index_chunk_nr] - input_chunk._getitem(0, index_chunk_len, in_buffer.data) - - # loop through rows - for i in range(index_chunk_len): - counts[index[i] + 1] += 1 - - leftover_elements = cython.cdiv(index.leftover, index.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = index.leftover_array - + for in_buffer in bz.iterblocks(index): + len_in_buffer = len(in_buffer) # loop through rows - for i in range(leftover_elements): + for i in range(len_in_buffer): counts[index[i] + 1] += 1 # mark the start of each contiguous group of like-indexed data From 29a6ebfea6199e6eedad7f5f84f7f7c3b71ced17 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:30:34 +0100 Subject: [PATCH 05/17] iterblocks: sum_xxx (1st step) --- bquery/ctable_ext.pyx | 181 ++--------------------- bquery/templates/ctable_ext.template.pyx | 66 +-------- 2 files changed, 20 insertions(+), 227 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index deaf81c..64e5122 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -504,8 +504,8 @@ cdef count_unique_int32(ndarray[int32_t] values): cpdef sum_float64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -537,8 +537,6 @@ cpdef sum_float64(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='float64') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -551,13 +549,11 @@ cpdef sum_float64(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='float64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -599,53 +595,6 @@ cpdef sum_float64(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='float64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) @@ -657,8 +606,8 @@ cpdef sum_float64(carray ca_input, carray ca_factor, cpdef sum_int32(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -690,8 +639,6 @@ cpdef sum_int32(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int32') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -704,13 +651,11 @@ cpdef sum_int32(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int32') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -751,52 +696,6 @@ cpdef sum_int32(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int32') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) @@ -808,8 +707,8 @@ cpdef sum_int32(carray ca_input, carray ca_factor, cpdef sum_int64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -841,8 +740,6 @@ cpdef sum_int64(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='int64') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -855,13 +752,11 @@ cpdef sum_int64(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int64') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -902,52 +797,6 @@ cpdef sum_int64(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='int64') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 233ec7d..a3f36cd 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -325,8 +325,8 @@ cdef count_unique_{{ count_unique_type }}(ndarray[{{ count_unique_type }}_t] val cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements @@ -358,8 +358,6 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, return num_uniques - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype='{{ sum_type }}') factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -372,13 +370,11 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='{{ sum_type }}') - for input_chunk_nr in range(ca_input.nchunks): - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) # loop through rows - for i in range(input_chunk_len): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: @@ -424,58 +420,6 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, else: raise NotImplementedError('sumtype not supported') - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - # fill input buffer - in_buffer = ca_input.leftover_array - - # loop through rows - for i in range(leftover_elements): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - if agg_method == _SUM: - out_buffer[current_index] += in_buffer[i] - elif agg_method == _COUNT: - out_buffer[current_index] += 1 - elif agg_method == _COUNT_NA: -{%- if sum_type == "float64" %} - v = in_buffer[i] - if v == v: # skip NA values - out_buffer[current_index] += 1 -{%- else %} - # TODO: Warning: int does not support NA values, is this what we need? - out_buffer[current_index] += 1 -{%- endif %} - elif agg_method == _SORTED_COUNT_DISTINCT: - v = in_buffer[i] - if not count_distinct_started: - count_distinct_started = 1 - last_values = np.zeros(nr_groups, dtype='{{ sum_type }}') - last_values[0] = v - out_buffer[0] = 1 - else: - if v != last_values[current_index]: - out_buffer[current_index] += 1 - - last_values[current_index] = v - else: - raise NotImplementedError('sumtype not supported') - # check whether a row has to be removed if it was meant to be skipped if skip_key < nr_groups: np.delete(out_buffer, skip_key) From 3cdf7372845a221ecf7fe84a4d44277336e26128 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:49:50 +0100 Subject: [PATCH 06/17] iterblocks: sum_xxx (2nd step) --- bquery/ctable_ext.pyx | 78 ++++++++---------------- bquery/templates/ctable_ext.template.pyx | 26 +++----- 2 files changed, 32 insertions(+), 72 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 64e5122..56984c6 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -504,10 +504,9 @@ cdef count_unique_int32(ndarray[int32_t] values): cpdef sum_float64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_float64] in_buffer ndarray[npy_int64] factor_buffer @@ -521,6 +520,7 @@ cpdef sum_float64(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -537,15 +537,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='float64') @@ -556,13 +550,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -606,10 +596,9 @@ cpdef sum_float64(carray ca_input, carray ca_factor, cpdef sum_int32(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int32] in_buffer ndarray[npy_int64] factor_buffer @@ -623,6 +612,7 @@ cpdef sum_int32(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -639,15 +629,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int32') @@ -658,13 +642,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index @@ -707,10 +687,9 @@ cpdef sum_int32(carray ca_input, carray ca_factor, cpdef sum_int64(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_int64] in_buffer ndarray[npy_int64] factor_buffer @@ -724,6 +703,7 @@ cpdef sum_int64(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -740,15 +720,9 @@ cpdef sum_int64(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='int64') @@ -759,13 +733,9 @@ cpdef sum_int64(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index a3f36cd..97d5736 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -325,10 +325,9 @@ cdef count_unique_{{ count_unique_type }}(ndarray[{{ count_unique_type }}_t] val cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key, agg_method=_SUM): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, j, end_counts, start_counts, factor_total_chunks, leftover_elements + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row + Py_ssize_t current_index, i, j, end_counts, start_counts ndarray[npy_{{ sum_type }}] in_buffer ndarray[npy_int64] factor_buffer @@ -342,6 +341,7 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) if agg_method == _COUNT_DISTINCT: num_uniques = carray([], dtype='int64') @@ -358,15 +358,9 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, return num_uniques - factor_chunk_len = ca_factor.chunklen - factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype='{{ sum_type }}') @@ -377,13 +371,9 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index From 5befa23ea369683d0a8021b4c0cabf656c2fe622 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 11:55:37 +0100 Subject: [PATCH 07/17] iterblocks: groupby_value (1st step) --- bquery/ctable_ext.pyx | 41 ++++-------------------- bquery/templates/ctable_ext.template.pyx | 41 ++++-------------------- 2 files changed, 12 insertions(+), 70 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index 56984c6..ee2131c 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -777,10 +777,10 @@ cpdef sum_int64(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer ndarray[npy_int64] factor_buffer @@ -790,8 +790,6 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ ret = 0 reverse = {} - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -804,37 +802,10 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) - for i in range(leftover_elements): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 97d5736..5621bf9 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -422,10 +422,10 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk input_chunk, factor_chunk - Py_ssize_t input_chunk_nr, input_chunk_len + chunk factor_chunk + Py_ssize_t in_buffer_len Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row - Py_ssize_t current_index, i, factor_total_chunks, leftover_elements + Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer ndarray[npy_int64] factor_buffer @@ -435,8 +435,6 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ ret = 0 reverse = {} - input_chunk_len = ca_input.chunklen - in_buffer = np.empty(input_chunk_len, dtype=ca_input.dtype) factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 @@ -449,37 +447,10 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) - for input_chunk_nr in range(ca_input.nchunks): - - # fill input buffer - input_chunk = ca_input.chunks[input_chunk_nr] - input_chunk._getitem(0, input_chunk_len, in_buffer.data) - - for i in range(input_chunk_len): - - # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: - factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array - factor_chunk_row = 0 - - # retrieve index - current_index = factor_buffer[factor_chunk_row] - factor_chunk_row += 1 - - # update value if it's not an invalid index - if current_index != skip_key: - out_buffer[current_index] = in_buffer[i] - - leftover_elements = cython.cdiv(ca_input.leftover, ca_input.atomsize) - if leftover_elements > 0: - in_buffer = ca_input.leftover_array + for in_buffer in bz.iterblocks(ca_input): + in_buffer_len = len(in_buffer) - for i in range(leftover_elements): + for i in range(in_buffer_len): # go to next factor buffer if necessary if factor_chunk_row == factor_chunk_len: From 0ad69ea5198764c7cf7647e51c85d022ed82c9e4 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 10 Mar 2015 12:11:25 +0100 Subject: [PATCH 08/17] iterblocks: groupby_value (2nd step) --- bquery/ctable_ext.pyx | 24 ++++++++---------------- bquery/templates/ctable_ext.template.pyx | 24 ++++++++---------------- 2 files changed, 16 insertions(+), 32 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index ee2131c..acdcebc 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -777,9 +777,8 @@ cpdef sum_int64(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer @@ -789,16 +788,13 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) @@ -808,13 +804,9 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 5621bf9..34b81b9 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -422,9 +422,8 @@ cpdef sum_{{ sum_type }}(carray ca_input, carray ca_factor, @cython.boundscheck(False) cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ssize_t skip_key): cdef: - chunk factor_chunk - Py_ssize_t in_buffer_len - Py_ssize_t factor_chunk_nr, factor_chunk_len, factor_chunk_row + Py_ssize_t in_buffer_len, factor_buffer_len + Py_ssize_t factor_chunk_nr, factor_chunk_row Py_ssize_t current_index, i, factor_total_chunks ndarray in_buffer @@ -434,16 +433,13 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ count = 0 ret = 0 reverse = {} + iter_ca_factor = bz.iterblocks(ca_factor) + - factor_chunk_len = ca_factor.chunklen factor_total_chunks = ca_factor.nchunks factor_chunk_nr = 0 - factor_buffer = np.empty(factor_chunk_len, dtype='int64') - if factor_total_chunks > 0: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() + factor_buffer_len = len(factor_buffer) factor_chunk_row = 0 out_buffer = np.zeros(nr_groups, dtype=ca_input.dtype) @@ -453,13 +449,9 @@ cpdef groupby_value(carray ca_input, carray ca_factor, Py_ssize_t nr_groups, Py_ for i in range(in_buffer_len): # go to next factor buffer if necessary - if factor_chunk_row == factor_chunk_len: + if factor_chunk_row == factor_buffer_len: factor_chunk_nr += 1 - if factor_chunk_nr < factor_total_chunks: - factor_chunk = ca_factor.chunks[factor_chunk_nr] - factor_chunk._getitem(0, factor_chunk_len, factor_buffer.data) - else: - factor_buffer = ca_factor.leftover_array + factor_buffer = iter_ca_factor.next() factor_chunk_row = 0 # retrieve index From 6fa57ed2f34e049b1f319893b022503d0503f92f Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Mon, 18 May 2015 19:40:15 +0200 Subject: [PATCH 09/17] cpu_count --- bquery/ctable.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bquery/ctable.py b/bquery/ctable.py index 65f893b..9028192 100644 --- a/bquery/ctable.py +++ b/bquery/ctable.py @@ -2,6 +2,8 @@ import ctable_ext # external imports +from multiprocessing.pool import ThreadPool +from multiprocessing import cpu_count import numpy as np import bcolz from collections import namedtuple @@ -10,6 +12,9 @@ SUM, COUNT, COUNT_NA, COUNT_DISTINCT, SORTED_COUNT_DISTINCT +NUM_PROC = cpu_count() + + class ctable(bcolz.ctable): def cache_valid(self, col): """ From e0f2fef6b3a9e2ed868acfddcbe0f065064ff762 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 12 May 2015 10:05:29 +0200 Subject: [PATCH 10/17] [Threads] agg_groups_by_iter --- bquery/ctable.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/bquery/ctable.py b/bquery/ctable.py index 9028192..ef59de5 100644 --- a/bquery/ctable.py +++ b/bquery/ctable.py @@ -115,7 +115,7 @@ def unique(self, col_or_col_list): return output - def aggregate_groups_by_iter_2(self, ct_agg, nr_groups, skip_key, + def agg_groups_by_iter(self, ct_agg, nr_groups, skip_key, factor_carray, groupby_cols, output_agg_ops, bool_arr=None, agg_method=ctable_ext.SUM): @@ -212,11 +212,9 @@ def groupby(self, groupby_cols, agg_list, bool_arr=None, rootdir=None, self.create_agg_ctable(groupby_cols, agg_list, nr_groups, rootdir) # perform aggregation - self.aggregate_groups_by_iter_2(ct_agg, nr_groups, skip_key, - factor_carray, groupby_cols, - agg_ops, - bool_arr= bool_arr, - agg_method=_agg_method) + self.agg_groups_by_iter(ct_agg, nr_groups, skip_key, factor_carray, + groupby_cols, agg_ops, bool_arr= bool_arr, + agg_method=_agg_method) return ct_agg From c539a50015da710f1b5b45f436587414c2d22388 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Mon, 18 May 2015 19:18:05 +0200 Subject: [PATCH 11/17] [Threads] agg_groups_by_iter --- bquery/ctable.py | 56 ++++++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/bquery/ctable.py b/bquery/ctable.py index ef59de5..73c0d83 100644 --- a/bquery/ctable.py +++ b/bquery/ctable.py @@ -115,35 +115,49 @@ def unique(self, col_or_col_list): return output + def helper_agg_groups_by_iter(self, ct_agg=None, col=None, factor_carray=None, nr_groups=None, + skip_key=None, agg_method=None): + # TODO: input vs output column + col_dtype = ct_agg[col].dtype + + if col_dtype == np.float64: + r = ctable_ext.sum_float64(self[col], factor_carray, nr_groups, + skip_key, agg_method=agg_method) + elif col_dtype == np.int64: + r = ctable_ext.sum_int64(self[col], factor_carray, nr_groups, + skip_key, agg_method=agg_method) + elif col_dtype == np.int32: + r = ctable_ext.sum_int32(self[col], factor_carray, nr_groups, + skip_key, agg_method=agg_method) + else: + raise NotImplementedError( + 'Column dtype ({0}) not supported for aggregation yet ' + '(only int32, int64 & float64)'.format(str(col_dtype))) + + return r + def agg_groups_by_iter(self, ct_agg, nr_groups, skip_key, - factor_carray, groupby_cols, output_agg_ops, - bool_arr=None, - agg_method=ctable_ext.SUM): + factor_carray, groupby_cols, output_agg_ops, + bool_arr=None, + agg_method=ctable_ext.SUM): total = [] + results = [] + pool = ThreadPool(processes=NUM_PROC) for col in groupby_cols: total.append(ctable_ext.groupby_value(self[col], factor_carray, nr_groups, skip_key)) for col, agg_op in output_agg_ops: - # TODO: input vs output column - col_dtype = ct_agg[col].dtype - - if col_dtype == np.float64: - r = ctable_ext.sum_float64(self[col], factor_carray, nr_groups, - skip_key, agg_method=agg_method) - elif col_dtype == np.int64: - r = ctable_ext.sum_int64(self[col], factor_carray, nr_groups, - skip_key, agg_method=agg_method) - elif col_dtype == np.int32: - r = ctable_ext.sum_int32(self[col], factor_carray, nr_groups, - skip_key, agg_method=agg_method) - else: - raise NotImplementedError( - 'Column dtype ({0}) not supported for aggregation yet ' - '(only int32, int64 & float64)'.format(str(col_dtype))) - - total.append(r) + kwds = {"ct_agg": ct_agg, "col": col, + "factor_carray": factor_carray, + "nr_groups": nr_groups, "skip_key": skip_key, + "agg_method": agg_method} + results.append(pool.apply_async(self.helper_agg_groups_by_iter, + kwds=kwds)) + + for r in results: + total.append(r.get()) # TODO: fix ugly fix? if bool_arr is not None: From 12223ea3600a0421e7f8bb4a8aba992ec773a239 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Mon, 18 May 2015 19:37:25 +0200 Subject: [PATCH 12/17] [Threads] factorize_groupby_cols --- bquery/ctable.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/bquery/ctable.py b/bquery/ctable.py index 73c0d83..295315b 100644 --- a/bquery/ctable.py +++ b/bquery/ctable.py @@ -233,6 +233,21 @@ def groupby(self, groupby_cols, agg_list, bool_arr=None, rootdir=None, return ct_agg # groupby helper functions + def helper_factorize_groupby_cols(self, col): + if self.cache_valid(col): + col_rootdir = self[col].rootdir + col_factor_rootdir = col_rootdir + '.factor' + col_values_rootdir = col_rootdir + '.values' + col_factor_carray = bcolz.carray(rootdir=col_factor_rootdir, + mode='r') + col_values_carray = bcolz.carray(rootdir=col_values_rootdir, + mode='r') + else: + col_factor_carray, values = ctable_ext.factorize(self[col]) + col_values_carray = \ + bcolz.carray(values.values(), dtype=self[col].dtype) + return col_factor_carray, col_values_carray + def factorize_groupby_cols(self, groupby_cols): """ @@ -242,23 +257,19 @@ def factorize_groupby_cols(self, groupby_cols): # unless we need to refresh the cache factor_list = [] values_list = [] + results = [] # factorize the groupby columns - for col in groupby_cols: + pool = ThreadPool(processes=NUM_PROC) - if self.cache_valid(col): - col_rootdir = self[col].rootdir - col_factor_rootdir = col_rootdir + '.factor' - col_values_rootdir = col_rootdir + '.values' - col_factor_carray = \ - bcolz.carray(rootdir=col_factor_rootdir, mode='r') - col_values_carray = \ - bcolz.carray(rootdir=col_values_rootdir, mode='r') - else: - col_factor_carray, values = ctable_ext.factorize(self[col]) - col_values_carray = \ - bcolz.carray(values.values(), dtype=self[col].dtype) + for col in groupby_cols: + results.append(pool.apply_async(self.helper_factorize_groupby_cols, + args=(col, ))) + for r in results: + _r = r.get() + col_factor_carray = _r[0] + col_values_carray = _r[1] factor_list.append(col_factor_carray) values_list.append(col_values_carray) From aef265800c052a2ca05fda9e46f5b2053807a8d7 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 19 May 2015 17:07:13 +0200 Subject: [PATCH 13/17] [overcome gil] khash --- bquery/khash.pxd | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bquery/khash.pxd b/bquery/khash.pxd index a8fd51a..de7b6a1 100644 --- a/bquery/khash.pxd +++ b/bquery/khash.pxd @@ -65,9 +65,9 @@ cdef extern from "khash_python.h": inline kh_int64_t* kh_init_int64() inline void kh_destroy_int64(kh_int64_t*) inline void kh_clear_int64(kh_int64_t*) - inline khint_t kh_get_int64(kh_int64_t*, int64_t) + inline khint_t kh_get_int64(kh_int64_t*, int64_t) nogil inline void kh_resize_int64(kh_int64_t*, khint_t) - inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*) + inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*) nogil inline void kh_del_int64(kh_int64_t*, khint_t) bint kh_exist_int64(kh_int64_t*, khiter_t) @@ -81,9 +81,9 @@ cdef extern from "khash_python.h": inline kh_float64_t* kh_init_float64() inline void kh_destroy_float64(kh_float64_t*) inline void kh_clear_float64(kh_float64_t*) - inline khint_t kh_get_float64(kh_float64_t*, float64_t) + inline khint_t kh_get_float64(kh_float64_t*, float64_t) nogil inline void kh_resize_float64(kh_float64_t*, khint_t) - inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*) + inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*) nogil inline void kh_del_float64(kh_float64_t*, khint_t) bint kh_exist_float64(kh_float64_t*, khiter_t) @@ -97,9 +97,9 @@ cdef extern from "khash_python.h": inline kh_int32_t* kh_init_int32() inline void kh_destroy_int32(kh_int32_t*) inline void kh_clear_int32(kh_int32_t*) - inline khint_t kh_get_int32(kh_int32_t*, int32_t) + inline khint_t kh_get_int32(kh_int32_t*, int32_t) nogil inline void kh_resize_int32(kh_int32_t*, khint_t) - inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*) + inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*) nogil inline void kh_del_int32(kh_int32_t*, khint_t) bint kh_exist_int32(kh_int32_t*, khiter_t) From ef38b8067c1bb9fb95607b26a725a9a1710bf2a0 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 19 May 2015 17:08:14 +0200 Subject: [PATCH 14/17] c++ stl --- bquery/templates/ctable_ext.template.pyx | 3 ++- setup.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 34b81b9..ac327d5 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -1,6 +1,6 @@ import numpy as np from numpy cimport ndarray, dtype, npy_intp, npy_int32, \ - npy_uint64, npy_int64, npy_float64, npy_bool + npy_uint64, npy_int64, npy_float64, npy_bool, uint64_t import cython import bcolz as bz @@ -10,6 +10,7 @@ import itertools as itt from libc.stdlib cimport malloc from libc.string cimport strcpy +from libcpp.vector cimport vector from khash cimport * # ---------------------------------------------------------------------------- diff --git a/setup.py b/setup.py index fa4c243..1a02cca 100644 --- a/setup.py +++ b/setup.py @@ -93,7 +93,7 @@ def check_import(pkgname, pkgver): ########### Project specific command line options ########### -class bquery_build_ext(build_ext): +class bquery_build_ext(build_ext): user_options = build_ext.user_options + \ [ ('from-templates', None, @@ -166,7 +166,8 @@ def run(self): library_dirs=lib_dirs, libraries=libs, extra_link_args=LFLAGS, - extra_compile_args=CFLAGS), + extra_compile_args=CFLAGS, + language='c++'), ], packages=['bquery', 'bquery.tests'], ) From 357e2831127c04932d17312d9bf71944d52b0216 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Tue, 19 May 2015 17:09:34 +0200 Subject: [PATCH 15/17] [overcome gil] factorize --- bquery/ctable_ext.pyx | 93 +++++++++++++++--------- bquery/templates/ctable_ext.template.pyx | 30 +++++--- 2 files changed, 78 insertions(+), 45 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index acdcebc..fa89c14 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -1,6 +1,6 @@ import numpy as np from numpy cimport ndarray, dtype, npy_intp, npy_int32, \ - npy_uint64, npy_int64, npy_float64, npy_bool + npy_uint64, npy_int64, npy_float64, npy_bool, uint64_t import cython import bcolz as bz @@ -10,6 +10,7 @@ import itertools as itt from libc.stdlib cimport malloc from libc.string cimport strcpy +from libcpp.vector cimport vector from khash cimport * # ---------------------------------------------------------------------------- @@ -112,13 +113,12 @@ def factorize_str(carray carray_, carray labels=None): @cython.wraparound(False) @cython.boundscheck(False) cdef void _factorize_int64_helper(Py_ssize_t iter_range, - Py_ssize_t allocation_size, - ndarray[npy_int64] in_buffer, - ndarray[npy_uint64] out_buffer, + npy_int64[:] in_buffer, + uint64_t[:] out_buffer, kh_int64_t *table, Py_ssize_t * count, - dict reverse, - ): + vector[npy_int64] & reverse_values, + ) nogil: cdef: Py_ssize_t i, idx int ret @@ -136,7 +136,7 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, else: k = kh_put_int64(table, element, &ret) table.vals[k] = idx = count[0] - reverse[count[0]] = element + reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -144,11 +144,14 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int64(carray carray_, carray labels=None): cdef: - Py_ssize_t len_carray, count, chunklen, len_in_buffer + Py_ssize_t len_carray, count, chunklen, len_in_buffer, i dict reverse ndarray[npy_int64] in_buffer ndarray[npy_uint64] out_buffer kh_int64_t *table + vector[npy_int64] reverse_values + npy_int64[:] in_buffer_view + uint64_t[:] out_buffer_view count = 0 ret = 0 @@ -160,35 +163,40 @@ def factorize_int64(carray carray_, carray labels=None): labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') + out_buffer_view = out_buffer table = kh_init_int64() for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + in_buffer_view = in_buffer _factorize_int64_helper(len_in_buffer, - carray_.dtype.itemsize + 1, - in_buffer, - out_buffer, + in_buffer_view, + out_buffer_view, table, &count, - reverse, + reverse_values ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int64(table) + # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 + # # construct python dict from vectors + for i in range(reverse_values.size()): + reverse[i] = reverse_values[i] + return labels, reverse @cython.wraparound(False) @cython.boundscheck(False) cdef void _factorize_int32_helper(Py_ssize_t iter_range, - Py_ssize_t allocation_size, - ndarray[npy_int32] in_buffer, - ndarray[npy_uint64] out_buffer, + npy_int32[:] in_buffer, + uint64_t[:] out_buffer, kh_int32_t *table, Py_ssize_t * count, - dict reverse, - ): + vector[npy_int32] & reverse_values, + ) nogil: cdef: Py_ssize_t i, idx int ret @@ -206,7 +214,7 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, else: k = kh_put_int32(table, element, &ret) table.vals[k] = idx = count[0] - reverse[count[0]] = element + reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -214,11 +222,14 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_int32(carray carray_, carray labels=None): cdef: - Py_ssize_t len_carray, count, chunklen, len_in_buffer + Py_ssize_t len_carray, count, chunklen, len_in_buffer, i dict reverse ndarray[npy_int32] in_buffer ndarray[npy_uint64] out_buffer kh_int32_t *table + vector[npy_int32] reverse_values + npy_int32[:] in_buffer_view + uint64_t[:] out_buffer_view count = 0 ret = 0 @@ -230,35 +241,40 @@ def factorize_int32(carray carray_, carray labels=None): labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') + out_buffer_view = out_buffer table = kh_init_int32() for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + in_buffer_view = in_buffer _factorize_int32_helper(len_in_buffer, - carray_.dtype.itemsize + 1, - in_buffer, - out_buffer, + in_buffer_view, + out_buffer_view, table, &count, - reverse, + reverse_values ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_int32(table) + # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 + # # construct python dict from vectors + for i in range(reverse_values.size()): + reverse[i] = reverse_values[i] + return labels, reverse @cython.wraparound(False) @cython.boundscheck(False) cdef void _factorize_float64_helper(Py_ssize_t iter_range, - Py_ssize_t allocation_size, - ndarray[npy_float64] in_buffer, - ndarray[npy_uint64] out_buffer, + npy_float64[:] in_buffer, + uint64_t[:] out_buffer, kh_float64_t *table, Py_ssize_t * count, - dict reverse, - ): + vector[npy_float64] & reverse_values, + ) nogil: cdef: Py_ssize_t i, idx int ret @@ -276,7 +292,7 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, else: k = kh_put_float64(table, element, &ret) table.vals[k] = idx = count[0] - reverse[count[0]] = element + reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -284,11 +300,14 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_float64(carray carray_, carray labels=None): cdef: - Py_ssize_t len_carray, count, chunklen, len_in_buffer + Py_ssize_t len_carray, count, chunklen, len_in_buffer, i dict reverse ndarray[npy_float64] in_buffer ndarray[npy_uint64] out_buffer kh_float64_t *table + vector[npy_float64] reverse_values + npy_float64[:] in_buffer_view + uint64_t[:] out_buffer_view count = 0 ret = 0 @@ -300,23 +319,29 @@ def factorize_float64(carray carray_, carray labels=None): labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') + out_buffer_view = out_buffer table = kh_init_float64() for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + in_buffer_view = in_buffer _factorize_float64_helper(len_in_buffer, - carray_.dtype.itemsize + 1, - in_buffer, - out_buffer, + in_buffer_view, + out_buffer_view, table, &count, - reverse, + reverse_values ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_float64(table) + # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 + # # construct python dict from vectors + for i in range(reverse_values.size()): + reverse[i] = reverse_values[i] + return labels, reverse cpdef factorize(carray carray_, carray labels=None): diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index ac327d5..6db3740 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -114,13 +114,12 @@ def factorize_str(carray carray_, carray labels=None): @cython.wraparound(False) @cython.boundscheck(False) cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, - Py_ssize_t allocation_size, - ndarray[npy_{{ factor_type }}] in_buffer, - ndarray[npy_uint64] out_buffer, + npy_{{ factor_type }}[:] in_buffer, + uint64_t[:] out_buffer, kh_{{ factor_type }}_t *table, Py_ssize_t * count, - dict reverse, - ): + vector[npy_{{ factor_type }}] & reverse_values, + ) nogil: cdef: Py_ssize_t i, idx int ret @@ -138,7 +137,7 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, else: k = kh_put_{{ factor_type }}(table, element, &ret) table.vals[k] = idx = count[0] - reverse[count[0]] = element + reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -146,11 +145,14 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, @cython.boundscheck(False) def factorize_{{ factor_type }}(carray carray_, carray labels=None): cdef: - Py_ssize_t len_carray, count, chunklen, len_in_buffer + Py_ssize_t len_carray, count, chunklen, len_in_buffer, i dict reverse ndarray[npy_{{ factor_type }}] in_buffer ndarray[npy_uint64] out_buffer kh_{{ factor_type }}_t *table + vector[npy_{{ factor_type }}] reverse_values + npy_{{ factor_type }}[:] in_buffer_view + uint64_t[:] out_buffer_view count = 0 ret = 0 @@ -162,23 +164,29 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): labels = carray([], dtype='int64', expectedlen=len_carray) # in-buffer isn't typed, because cython doesn't support string arrays (?) out_buffer = np.empty(chunklen, dtype='uint64') + out_buffer_view = out_buffer table = kh_init_{{ factor_type }}() for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) + in_buffer_view = in_buffer _factorize_{{ factor_type }}_helper(len_in_buffer, - carray_.dtype.itemsize + 1, - in_buffer, - out_buffer, + in_buffer_view, + out_buffer_view, table, &count, - reverse, + reverse_values ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) kh_destroy_{{ factor_type }}(table) + # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 + # # construct python dict from vectors + for i in range(reverse_values.size()): + reverse[i] = reverse_values[i] + return labels, reverse {% endfor -%} From 2fb5d7a89531804d49689f53624ddc524a519fbf Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Thu, 21 May 2015 13:54:11 +0200 Subject: [PATCH 16/17] reverse values from hash table --- bquery/ctable_ext.pyx | 51 ++++++++++-------------- bquery/templates/ctable_ext.template.pyx | 17 ++++---- 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index fa89c14..cc92801 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -117,7 +117,6 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, uint64_t[:] out_buffer, kh_int64_t *table, Py_ssize_t * count, - vector[npy_int64] & reverse_values, ) nogil: cdef: Py_ssize_t i, idx @@ -136,7 +135,6 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range, else: k = kh_put_int64(table, element, &ret) table.vals[k] = idx = count[0] - reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -149,7 +147,6 @@ def factorize_int64(carray carray_, carray labels=None): ndarray[npy_int64] in_buffer ndarray[npy_uint64] out_buffer kh_int64_t *table - vector[npy_int64] reverse_values npy_int64[:] in_buffer_view uint64_t[:] out_buffer_view @@ -173,18 +170,18 @@ def factorize_int64(carray carray_, carray labels=None): in_buffer_view, out_buffer_view, table, - &count, - reverse_values + &count ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) - kh_destroy_int64(table) - # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 - # # construct python dict from vectors - for i in range(reverse_values.size()): - reverse[i] = reverse_values[i] + # construct python dict from vectors and free element memory + for i in range(table.n_buckets): + if not kh_exist_int64(table, i): # adjust function name to hash-table data-type + continue + reverse[table.vals[i]] = table.keys[i] + kh_destroy_int64(table) return labels, reverse @@ -195,7 +192,6 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, uint64_t[:] out_buffer, kh_int32_t *table, Py_ssize_t * count, - vector[npy_int32] & reverse_values, ) nogil: cdef: Py_ssize_t i, idx @@ -214,7 +210,6 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range, else: k = kh_put_int32(table, element, &ret) table.vals[k] = idx = count[0] - reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -227,7 +222,6 @@ def factorize_int32(carray carray_, carray labels=None): ndarray[npy_int32] in_buffer ndarray[npy_uint64] out_buffer kh_int32_t *table - vector[npy_int32] reverse_values npy_int32[:] in_buffer_view uint64_t[:] out_buffer_view @@ -251,18 +245,18 @@ def factorize_int32(carray carray_, carray labels=None): in_buffer_view, out_buffer_view, table, - &count, - reverse_values + &count ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) - kh_destroy_int32(table) - # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 - # # construct python dict from vectors - for i in range(reverse_values.size()): - reverse[i] = reverse_values[i] + # construct python dict from vectors and free element memory + for i in range(table.n_buckets): + if not kh_exist_int32(table, i): # adjust function name to hash-table data-type + continue + reverse[table.vals[i]] = table.keys[i] + kh_destroy_int32(table) return labels, reverse @@ -273,7 +267,6 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, uint64_t[:] out_buffer, kh_float64_t *table, Py_ssize_t * count, - vector[npy_float64] & reverse_values, ) nogil: cdef: Py_ssize_t i, idx @@ -292,7 +285,6 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range, else: k = kh_put_float64(table, element, &ret) table.vals[k] = idx = count[0] - reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -305,7 +297,6 @@ def factorize_float64(carray carray_, carray labels=None): ndarray[npy_float64] in_buffer ndarray[npy_uint64] out_buffer kh_float64_t *table - vector[npy_float64] reverse_values npy_float64[:] in_buffer_view uint64_t[:] out_buffer_view @@ -329,18 +320,18 @@ def factorize_float64(carray carray_, carray labels=None): in_buffer_view, out_buffer_view, table, - &count, - reverse_values + &count ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) - kh_destroy_float64(table) - # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 - # # construct python dict from vectors - for i in range(reverse_values.size()): - reverse[i] = reverse_values[i] + # construct python dict from vectors and free element memory + for i in range(table.n_buckets): + if not kh_exist_float64(table, i): # adjust function name to hash-table data-type + continue + reverse[table.vals[i]] = table.keys[i] + kh_destroy_float64(table) return labels, reverse diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index 6db3740..c7c018d 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -118,7 +118,6 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, uint64_t[:] out_buffer, kh_{{ factor_type }}_t *table, Py_ssize_t * count, - vector[npy_{{ factor_type }}] & reverse_values, ) nogil: cdef: Py_ssize_t i, idx @@ -137,7 +136,6 @@ cdef void _factorize_{{ factor_type }}_helper(Py_ssize_t iter_range, else: k = kh_put_{{ factor_type }}(table, element, &ret) table.vals[k] = idx = count[0] - reverse_values.push_back(element) count[0] += 1 out_buffer[i] = idx @@ -150,7 +148,6 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): ndarray[npy_{{ factor_type }}] in_buffer ndarray[npy_uint64] out_buffer kh_{{ factor_type }}_t *table - vector[npy_{{ factor_type }}] reverse_values npy_{{ factor_type }}[:] in_buffer_view uint64_t[:] out_buffer_view @@ -174,18 +171,18 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): in_buffer_view, out_buffer_view, table, - &count, - reverse_values + &count ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) - kh_destroy_{{ factor_type }}(table) - # TODO: many thanks https://github.com/visualfabriq/bquery/pull/21 - # # construct python dict from vectors - for i in range(reverse_values.size()): - reverse[i] = reverse_values[i] + # construct python dict from vectors and free element memory + for i in range(table.n_buckets): + if not kh_exist_{{ factor_type }}(table, i): # adjust function name to hash-table data-type + continue + reverse[table.vals[i]] = table.keys[i] + kh_destroy_{{ factor_type }}(table) return labels, reverse From 743ee4850df3afb3adf3829d9ef0e5d184bb2003 Mon Sep 17 00:00:00 2001 From: Francesc Elies Henar Date: Thu, 21 May 2015 17:37:42 +0200 Subject: [PATCH 17/17] nogil --- bquery/ctable_ext.pyx | 39 +++++++++++++----------- bquery/templates/ctable_ext.template.pyx | 13 ++++---- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/bquery/ctable_ext.pyx b/bquery/ctable_ext.pyx index cc92801..8be6686 100644 --- a/bquery/ctable_ext.pyx +++ b/bquery/ctable_ext.pyx @@ -166,12 +166,13 @@ def factorize_int64(carray carray_, carray labels=None): for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) in_buffer_view = in_buffer - _factorize_int64_helper(len_in_buffer, - in_buffer_view, - out_buffer_view, - table, - &count - ) + with nogil: + _factorize_int64_helper(len_in_buffer, + in_buffer_view, + out_buffer_view, + table, + &count + ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) @@ -241,12 +242,13 @@ def factorize_int32(carray carray_, carray labels=None): for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) in_buffer_view = in_buffer - _factorize_int32_helper(len_in_buffer, - in_buffer_view, - out_buffer_view, - table, - &count - ) + with nogil: + _factorize_int32_helper(len_in_buffer, + in_buffer_view, + out_buffer_view, + table, + &count + ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) @@ -316,12 +318,13 @@ def factorize_float64(carray carray_, carray labels=None): for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) in_buffer_view = in_buffer - _factorize_float64_helper(len_in_buffer, - in_buffer_view, - out_buffer_view, - table, - &count - ) + with nogil: + _factorize_float64_helper(len_in_buffer, + in_buffer_view, + out_buffer_view, + table, + &count + ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64)) diff --git a/bquery/templates/ctable_ext.template.pyx b/bquery/templates/ctable_ext.template.pyx index c7c018d..99a3a20 100644 --- a/bquery/templates/ctable_ext.template.pyx +++ b/bquery/templates/ctable_ext.template.pyx @@ -167,12 +167,13 @@ def factorize_{{ factor_type }}(carray carray_, carray labels=None): for in_buffer in bz.iterblocks(carray_): len_in_buffer = len(in_buffer) in_buffer_view = in_buffer - _factorize_{{ factor_type }}_helper(len_in_buffer, - in_buffer_view, - out_buffer_view, - table, - &count - ) + with nogil: + _factorize_{{ factor_type }}_helper(len_in_buffer, + in_buffer_view, + out_buffer_view, + table, + &count + ) # compress out_buffer into labels labels.append(out_buffer[:len_in_buffer].astype(np.int64))