diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index 6c530a76e18..c738d13b054 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -174,6 +174,8 @@ add_arrow_compute_test(row_test EXTRA_LINK_LIBS arrow_compute_testing) +add_arrow_compute_benchmark(exec_benchmark) + add_arrow_compute_benchmark(function_benchmark) add_subdirectory(kernels) diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 1be398fdae9..199e43a262f 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -50,6 +50,7 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/thread_pool.h" #include "arrow/util/vector.h" +#include "arrow/visit_data_inline.h" namespace arrow { @@ -359,6 +360,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize, have_all_scalars_ = CheckIfAllScalar(batch); promote_if_all_scalars_ = promote_if_all_scalars; position_ = 0; + selection_position_ = 0; length_ = batch.length; chunk_indexes_.clear(); chunk_indexes_.resize(args_->size(), 0); @@ -367,6 +369,12 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize, value_offsets_.clear(); value_offsets_.resize(args_->size(), 0); max_chunksize_ = std::min(length_, max_chunksize); + selection_vector_ = batch.selection_vector.get(); + if (selection_vector_) { + selection_length_ = selection_vector_->length(); + } else { + selection_length_ = 0; + } return Status::OK(); } @@ -403,7 +411,7 @@ int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* spa return iteration_size; } -bool ExecSpanIterator::Next(ExecSpan* span) { +bool ExecSpanIterator::Next(ExecSpan* span, SelectionVectorSpan* selection_span) { if (!initialized_) { span->length = 0; @@ -442,6 +450,13 @@ bool ExecSpanIterator::Next(ExecSpan* span) { PromoteExecSpanScalars(span); } + if (!have_all_scalars_ || promote_if_all_scalars_) { + if (selection_vector_) { + DCHECK_NE(selection_span, nullptr); + *selection_span = SelectionVectorSpan(selection_vector_->indices()); + } + } + initialized_ = true; } else if (position_ == length_) { // We've emitted at least one span and we're at the end so we are done @@ -465,6 +480,20 @@ bool ExecSpanIterator::Next(ExecSpan* span) { } } + // Then the selection span + if (selection_vector_) { + DCHECK_NE(selection_span, nullptr); + auto indices_begin = selection_vector_->indices() + selection_position_; + auto indices_end = selection_vector_->indices() + selection_vector_->length(); + DCHECK_LE(indices_begin, indices_end); + auto indices_limit = std::lower_bound( + indices_begin, indices_end, static_cast(position_ + iteration_size)); + int64_t num_indices = indices_limit - indices_begin; + selection_span->SetSlice(selection_position_, num_indices, + static_cast(position_)); + selection_position_ += num_indices; + } + position_ += iteration_size; DCHECK_LE(position_, length_); return true; @@ -694,7 +723,14 @@ std::shared_ptr ToChunkedArray(const std::vector& values, // Skip empty chunks continue; } - arrays.emplace_back(val.make_array()); + if (val.is_chunked_array()) { + for (const auto& chunk : val.chunked_array()->chunks()) { + arrays.emplace_back(chunk); + } + } else { + DCHECK(val.is_array()); + arrays.emplace_back(val.make_array()); + } } return std::make_shared(std::move(arrays), type.GetSharedPtr()); } @@ -781,17 +817,45 @@ class KernelExecutorImpl : public KernelExecutor { class ScalarExecutor : public KernelExecutorImpl { public: Status 
Execute(const ExecBatch& batch, ExecListener* listener) override { - RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize())); - if (batch.length == 0) { // For zero-length batches, we do nothing except return a zero-length // array of the correct output type ARROW_ASSIGN_OR_RAISE(std::shared_ptr result, MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0, exec_context()->memory_pool())); + RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize())); return EmitResult(result->data(), listener); } + if (batch.selection_vector && !kernel_->selective_exec) { + // If the batch contains a selection vector but the kernel does not support + // selective execution, we need to execute the batch in a "dense" manner. + return ExecuteSelectiveDense(batch, listener); + } + + return ExecuteBatch(batch, listener); + } + + Datum WrapResults(const std::vector& inputs, + const std::vector& outputs) override { + // If execution yielded multiple chunks (because large arrays were split + // based on the ExecContext parameters, then the result is a ChunkedArray + if (HaveChunkedArray(inputs) || outputs.size() > 1) { + return ToChunkedArray(outputs, output_type_); + } else { + // Outputs have just one element + return outputs[0]; + } + } + + protected: + // Execute a single batch either non-selectively (batch doesn't contain a selection + // vector) or selectively (kernel supports selective execution). + Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) { + DCHECK(!batch.selection_vector || kernel_->selective_exec); + + RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize())); + // If the executor is configured to produce a single large Array output for // kernels supporting preallocation, then we do so up front and then // iterate over slices of that large array. Otherwise, we preallocate prior @@ -811,19 +875,46 @@ class ScalarExecutor : public KernelExecutorImpl { } } - Datum WrapResults(const std::vector& inputs, - const std::vector& outputs) override { - // If execution yielded multiple chunks (because large arrays were split - // based on the ExecContext parameters, then the result is a ChunkedArray - if (HaveChunkedArray(inputs) || outputs.size() > 1) { - return ToChunkedArray(outputs, output_type_); - } else { - // Outputs have just one element - return outputs[0]; + // Execute a single batch with a selection vector "densely" for a kernel that doesn't + // support selective execution. "Densely" here means that we first gather the rows + // indicated by the selection vector into a contiguous ExecBatch, execute that, and + // then scatter the result back to the original row positions in the output. + Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) { + DCHECK(batch.selection_vector && !kernel_->selective_exec); + + if (CheckIfAllScalar(batch)) { + // For all-scalar batch, we can skip the gather/scatter steps as if there is no + // selection vector - the result is a scalar anyway. + ExecBatch input = batch; + input.selection_vector = nullptr; + return ExecuteBatch(input, listener); } + + std::vector values(batch.num_values()); + for (int i = 0; i < batch.num_values(); ++i) { + if (batch[i].is_scalar()) { + // XXX: Skip gather for scalars since it is not currently supported by Take. 
+ values[i] = batch[i]; + } else { + ARROW_ASSIGN_OR_RAISE(values[i], + Take(batch[i], *batch.selection_vector->data(), + TakeOptions{/*boundcheck=*/false}, exec_context())); + } + } + ARROW_ASSIGN_OR_RAISE( + ExecBatch input, + ExecBatch::Make(std::move(values), batch.selection_vector->length())); + + DatumAccumulator dense_listener; + RETURN_NOT_OK(ExecuteBatch(input, &dense_listener)); + Datum dense_result = WrapResults(input.values, dense_listener.values()); + + ARROW_ASSIGN_OR_RAISE(auto result, + Scatter(dense_result, *batch.selection_vector->data(), + ScatterOptions{/*max_index=*/batch.length - 1})); + return listener->OnResult(std::move(result)); } - protected: Status EmitResult(std::shared_ptr out, ExecListener* listener) { if (span_iterator_.have_all_scalars()) { // ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to @@ -842,6 +933,11 @@ class ScalarExecutor : public KernelExecutorImpl { // eventually skip the creation of ArrayData altogether std::shared_ptr preallocation; ExecSpan input; + SelectionVectorSpan selection; + SelectionVectorSpan* selection_ptr = nullptr; + if (span_iterator_.have_selection_vector()) { + selection_ptr = &selection; + } ExecResult output; ArraySpan* output_span = output.array_span_mutable(); @@ -853,10 +949,10 @@ class ScalarExecutor : public KernelExecutorImpl { output_span->SetMembers(*preallocation); output_span->offset = 0; int64_t result_offset = 0; - while (span_iterator_.Next(&input)) { + while (span_iterator_.Next(&input, selection_ptr)) { // Set absolute output span position and length output_span->SetSlice(result_offset, input.length); - RETURN_NOT_OK(ExecuteSingleSpan(input, &output)); + RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output)); result_offset = span_iterator_.position(); } @@ -866,10 +962,10 @@ class ScalarExecutor : public KernelExecutorImpl { // Fully preallocating, but not contiguously // We preallocate (maybe) only for the output of processing the current // chunk - while (span_iterator_.Next(&input)) { + while (span_iterator_.Next(&input, selection_ptr)) { ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(input.length)); output_span->SetMembers(*preallocation); - RETURN_NOT_OK(ExecuteSingleSpan(input, &output)); + RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output)); // Emit the result for this chunk RETURN_NOT_OK(EmitResult(std::move(preallocation), listener)); } @@ -877,7 +973,8 @@ class ScalarExecutor : public KernelExecutorImpl { } } - Status ExecuteSingleSpan(const ExecSpan& input, ExecResult* out) { + Status ExecuteSingleSpan(const ExecSpan& input, const SelectionVectorSpan* selection, + ExecResult* out) { ArraySpan* result_span = out->array_span_mutable(); if (output_type_.type->id() == Type::NA) { result_span->null_count = result_span->length; @@ -888,7 +985,7 @@ class ScalarExecutor : public KernelExecutorImpl { } else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) { result_span->null_count = 0; } - RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out)); + RETURN_NOT_OK(ExecuteKernel(input, selection, out)); // Output type didn't change DCHECK(out->is_array_span()); return Status::OK(); @@ -903,8 +1000,13 @@ class ScalarExecutor : public KernelExecutorImpl { // We will eventually delete the Scalar output path per // ARROW-16757. 
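An aside on the ExecuteSelectiveDense fallback above: the gather/evaluate/scatter round trip it performs can be pictured with the public API alone. The sketch below is illustrative only — DenseNegate and its variable names are hypothetical, it uses the existing Take kernel plus a hand-rolled int32 scatter in place of the new Scatter call, and it assumes "negate" as a stand-in for a kernel without a selective implementation.

```cpp
#include <optional>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/api.h"

namespace cp = arrow::compute;

// Illustrative dense evaluation of "negate" under a selection: gather the
// selected rows, run the kernel on the contiguous values, then scatter the
// dense results back to their original row positions (other rows become null).
arrow::Result<std::shared_ptr<arrow::Array>> DenseNegate(
    const std::shared_ptr<arrow::Array>& values,
    const std::shared_ptr<arrow::Array>& selection /* sorted int32 indices */) {
  // 1. Gather the selected rows into a contiguous array.
  ARROW_ASSIGN_OR_RAISE(arrow::Datum dense,
                        cp::Take(arrow::Datum(values), arrow::Datum(selection)));
  // 2. Evaluate the kernel on the dense values only.
  ARROW_ASSIGN_OR_RAISE(arrow::Datum dense_result,
                        cp::CallFunction("negate", {dense}));
  auto dense_array = dense_result.make_array();
  const auto& dense_ints = static_cast<const arrow::Int32Array&>(*dense_array);
  const auto& indices = static_cast<const arrow::Int32Array&>(*selection);

  // 3. Scatter: dense row i goes back to original row indices[i]; every other
  //    row becomes null (mirroring ScatterOptions{max_index = batch.length - 1}).
  std::vector<std::optional<int32_t>> slots(values->length());
  for (int64_t i = 0; i < indices.length(); ++i) {
    slots[indices.Value(i)] = dense_ints.Value(i);
  }
  arrow::Int32Builder builder;
  for (const auto& slot : slots) {
    if (slot.has_value()) {
      ARROW_RETURN_NOT_OK(builder.Append(*slot));
    } else {
      ARROW_RETURN_NOT_OK(builder.AppendNull());
    }
  }
  return builder.Finish();
}
```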
ExecSpan input; + SelectionVectorSpan selection; + SelectionVectorSpan* selection_ptr = nullptr; + if (span_iterator_.have_selection_vector()) { + selection_ptr = &selection; + } ExecResult output; - while (span_iterator_.Next(&input)) { + while (span_iterator_.Next(&input, selection_ptr)) { ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length)); DCHECK(output.is_array_data()); @@ -917,7 +1019,7 @@ class ScalarExecutor : public KernelExecutorImpl { out_arr->null_count = 0; } - RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output)); + RETURN_NOT_OK(ExecuteKernel(input, selection_ptr, &output)); // Output type didn't change DCHECK(output.is_array_data()); @@ -983,6 +1085,17 @@ class ScalarExecutor : public KernelExecutorImpl { return Status::OK(); } + // Actually invoke the kernel on the given input span, either selectively if there is a + // selection or non-selectively otherwise. + Status ExecuteKernel(const ExecSpan& input, const SelectionVectorSpan* selection, + ExecResult* out) { + if (selection) { + DCHECK_NE(kernel_->selective_exec, nullptr); + return kernel_->selective_exec(kernel_ctx_, input, *selection, out); + } + return kernel_->exec(kernel_ctx_, input, out); + } + // Used to account for the case where we do not preallocate a // validity bitmap because the inputs are all non-null and we're // using NullHandling::INTERSECTION to compute the validity bitmap @@ -1345,18 +1458,53 @@ const CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); } SelectionVector::SelectionVector(std::shared_ptr data) : data_(std::move(data)) { - DCHECK_EQ(Type::INT32, data_->type->id()); - DCHECK_EQ(0, data_->GetNullCount()); + DCHECK_NE(data_, nullptr); + DCHECK_EQ(data_->type->id(), Type::INT32); indices_ = data_->GetValues(1); } SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {} -int32_t SelectionVector::length() const { return static_cast(data_->length); } +int64_t SelectionVector::length() const { return data_->length; } + +Status SelectionVector::Validate(int64_t values_length) const { + if (data_ == nullptr) { + return Status::Invalid("SelectionVector not initialized"); + } + ARROW_CHECK_NE(indices_, nullptr); + if (data_->type->id() != Type::INT32) { + return Status::Invalid("SelectionVector must be of type int32"); + } + if (data_->GetNullCount() != 0) { + return Status::Invalid("SelectionVector cannot contain nulls"); + } + for (int64_t i = 1; i < length(); ++i) { + if (indices_[i - 1] > indices_[i]) { + return Status::Invalid("SelectionVector indices must be sorted"); + } + } + for (int64_t i = 0; i < length(); ++i) { + if (indices_[i] < 0) { + return Status::Invalid("SelectionVector indices must be non-negative"); + } + } + if (values_length >= 0) { + for (int64_t i = 0; i < length(); ++i) { + if (indices_[i] >= values_length) { + return Status::Invalid("SelectionVector index ", indices_[i], + " >= values length ", values_length); + } + } + } + return Status::OK(); +} -Result> SelectionVector::FromMask( - const BooleanArray& arr) { - return Status::NotImplemented("FromMask"); +void SelectionVectorSpan::SetSlice(int64_t offset, int64_t length, + int32_t index_back_shift) { + DCHECK_NE(indices_, nullptr); + offset_ = offset; + length_ = length; + index_back_shift_ = index_back_shift; } Result CallFunction(const std::string& func_name, const std::vector& args, diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index dae7e1ea686..7ab78163861 100644 --- a/cpp/src/arrow/compute/exec.h +++ 
b/cpp/src/arrow/compute/exec.h @@ -131,8 +131,6 @@ class ARROW_EXPORT ExecContext { /// implementations. This is especially relevant for aggregations but also /// applies to scalar operations. /// -/// We are not yet using this so this is mostly a placeholder for now. -/// /// [1]: http://cidrdb.org/cidr2005/papers/P19.pdf class ARROW_EXPORT SelectionVector { public: @@ -140,17 +138,53 @@ class ARROW_EXPORT SelectionVector { explicit SelectionVector(const Array& arr); - /// \brief Create SelectionVector from boolean mask - static Result> FromMask(const BooleanArray& arr); - + std::shared_ptr data() const { return data_; } const int32_t* indices() const { return indices_; } - int32_t length() const; + int64_t length() const; + + Status Validate(int64_t values_length = -1) const; private: std::shared_ptr data_; const int32_t* indices_; }; +/// \brief A span of a SelectionVector's indices. Can represent a slice of the +/// underlying indices. +/// +/// Note that as an indirection of indices to the data in an ExecBatch, when sliced +/// along with the batch, the indices themselves need to be back-shifted to be relative to +/// the batch slice (ExecSpan). For example, consider an ExecBatch of 10 rows with a +/// SelectionVector [0, 1, 9] is to be executed per-8-rows. The first slice of the batch +/// will have row 0 to row 7 of the original batch with selection slice [0, 1]. The second +/// slice of the batch will have row 8 and row 9 of the original batch however they are +/// referred to as row 0 and row 1 by the kernel. Therefore the second selection slice +/// should be [9 - 8] = [1]. This is done by setting index_back_shift to 8 for the second +/// selection slice. +class ARROW_EXPORT SelectionVectorSpan { + public: + explicit SelectionVectorSpan(const int32_t* indices = NULLPTR, int64_t length = 0, + int64_t offset = 0, int32_t index_back_shift = 0) + : indices_(indices), + length_(length), + offset_(offset), + index_back_shift_(index_back_shift) {} + + void SetSlice(int64_t offset, int64_t length, int32_t index_back_shift = 0); + + int32_t operator[](int64_t i) const { + return indices_[i + offset_] - index_back_shift_; + } + + int64_t length() const { return length_; } + + private: + const int32_t* indices_; + int64_t length_; + int64_t offset_; + int32_t index_back_shift_; +}; + /// An index to represent that a batch does not belong to an ordered stream constexpr int64_t kUnsequencedIndex = -1; @@ -173,8 +207,11 @@ constexpr int64_t kUnsequencedIndex = -1; struct ARROW_EXPORT ExecBatch { ExecBatch() = default; - ExecBatch(std::vector values, int64_t length) - : values(std::move(values)), length(length) {} + ExecBatch(std::vector values, int64_t length, + std::shared_ptr selection_vector = NULLPTR) + : values(std::move(values)), + length(length), + selection_vector(std::move(selection_vector)) {} explicit ExecBatch(const RecordBatch& batch); @@ -196,13 +233,6 @@ struct ARROW_EXPORT ExecBatch { /// exec function for processing. std::vector values; - /// A deferred filter represented as an array of indices into the values. - /// - /// For example, the filter [true, true, false, true] would be represented as - /// the selection vector [0, 1, 3]. When the selection vector is set, - /// ExecBatch::length is equal to the length of this array. - std::shared_ptr selection_vector; - /// A predicate Expression guaranteed to evaluate to true for all rows in this batch. Expression guarantee = literal(true); @@ -218,6 +248,13 @@ struct ARROW_EXPORT ExecBatch { /// whether any values are Scalar. 
int64_t length = 0; + /// A deferred filter represented as an array of indices into the values. + /// + /// For example, the filter [true, true, false, true] would be represented as + /// the selection vector [0, 1, 3]. When the selection vector is set, + /// ExecBatch::length is equal to the length of this array. + std::shared_ptr selection_vector; + /// \brief index of this batch in a sorted stream of batches /// /// This index must be strictly monotonic starting at 0 without gaps or diff --git a/cpp/src/arrow/compute/exec_benchmark.cc b/cpp/src/arrow/compute/exec_benchmark.cc new file mode 100644 index 00000000000..8900d1567dd --- /dev/null +++ b/cpp/src/arrow/compute/exec_benchmark.cc @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/compute/exec_internal.h" +#include "arrow/compute/expression.h" +#include "arrow/compute/function_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/compute/registry.h" +#include "arrow/testing/generator.h" +#include "arrow/util/logging.h" + +namespace arrow::compute { + +namespace { + +// A trivial kernel that just keeps the CPU busy for a specified number of iterations per +// input row. Has both regular and selective variants. Used to benchmark the overhead of +// the execution framework. + +struct SpinOptions : public FunctionOptions { + explicit SpinOptions(int64_t count = 0); + static constexpr char kTypeName[] = "SpinOptions"; + static SpinOptions Defaults() { return SpinOptions(); } + int64_t count = 0; +}; + +static auto kSpinOptionsType = internal::GetFunctionOptionsType( + arrow::internal::DataMember("count", &SpinOptions::count)); + +SpinOptions::SpinOptions(int64_t count) + : FunctionOptions(kSpinOptionsType), count(count) {} + +const SpinOptions* GetDefaultSpinOptions() { + static const auto kDefaultSpinOptions = SpinOptions::Defaults(); + return &kDefaultSpinOptions; +} + +using SpinState = internal::OptionsWrapper; + +inline void Spin(volatile int64_t count) { + while (count-- > 0) { + // Do nothing, just burn CPU cycles. 
+ } +} + +Status SpinExec(KernelContext* ctx, const ExecSpan& span, ExecResult* out) { + ARROW_CHECK_EQ(span.num_values(), 1); + const auto& arg = span[0]; + ARROW_CHECK(arg.is_array()); + + int64_t count = SpinState::Get(ctx).count; + for (int64_t i = 0; i < arg.length(); ++i) { + Spin(count); + } + *out->array_data_mutable() = *arg.array.ToArrayData(); + return Status::OK(); +} + +Status SpinSelectiveExec(KernelContext* ctx, const ExecSpan& span, + const SelectionVectorSpan& selection_span, ExecResult* out) { + ARROW_CHECK_EQ(span.num_values(), 1); + const auto& arg = span[0]; + ARROW_CHECK(arg.is_array()); + + int64_t count = SpinState::Get(ctx).count; + detail::VisitSelectionVectorSpanInline(selection_span, [&](int64_t i) { Spin(count); }); + *out->array_data_mutable() = *arg.array.ToArrayData(); + return Status::OK(); +} + +Status RegisterSpinFunction() { + auto registry = GetFunctionRegistry(); + + if (registry->CanAddFunctionOptionsType(kSpinOptionsType).ok()) { + RETURN_NOT_OK(registry->AddFunctionOptionsType(kSpinOptionsType)); + } + + auto register_spin_function = [&](std::string name, ArrayKernelExec exec, + ArrayKernelSelectiveExec selective_exec) { + auto func = std::make_shared( + std::move(name), Arity::Unary(), FunctionDoc::Empty(), GetDefaultSpinOptions()); + ScalarKernel kernel({InputType::Any()}, internal::FirstType, exec, selective_exec, + SpinState::Init); + kernel.can_write_into_slices = false; + kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; + kernel.mem_allocation = MemAllocation::NO_PREALLOCATE; + RETURN_NOT_OK(func->AddKernel(kernel)); + if (registry->CanAddFunction(func, /*allow_overwrite=*/false).ok()) { + RETURN_NOT_OK(registry->AddFunction(std::move(func))); + } + return Status::OK(); + }; + + // Register two variants, one with selective exec and one without. + RETURN_NOT_OK(register_spin_function("spin_selective", SpinExec, SpinSelectiveExec)); + RETURN_NOT_OK(register_spin_function("spin", SpinExec, /*selective_exec=*/nullptr)); + + return Status::OK(); +} + +std::shared_ptr MakeSelectionVectorTo(int64_t length) { + auto res = gen::Step()->Generate(length); + ARROW_CHECK_OK(res.status()); + auto arr = res.ValueUnsafe(); + return std::make_shared(*arr); +} + +} // namespace + +void BenchmarkExec(benchmark::State& state, std::string spin_function, + int64_t kernel_intensity, std::shared_ptr input, + std::shared_ptr selection = nullptr) { + static auto registered = RegisterSpinFunction(); + ARROW_CHECK_OK(registered); + + auto expr = + call(std::move(spin_function), {field_ref(0)}, SpinOptions(kernel_intensity)); + auto bound = expr.Bind(*schema({field("", input->type())})).ValueOrDie(); + auto length = input->length(); + auto batch = ExecBatch{{std::move(input)}, length, std::move(selection)}; + + for (auto _ : state) { + ARROW_CHECK_OK(ExecuteScalarExpression(bound, batch).status()); + } + + state.SetItemsProcessed(state.iterations() * length); +} + +// Baseline: Run the spin kernel without selection vector. +static void BM_ExecBaseline(benchmark::State& state) { + const int64_t kernel_intensity = state.range(0); + const int64_t num_rows = state.range(1); + + auto input = ConstantArrayGenerator::Int32(num_rows, 0); + BenchmarkExec(state, "spin", kernel_intensity, std::move(input)); +} + +// Selective: Run the spin kernel with a selection vector, either sparsely or densely, +// depending on whether the spin kernel has a selective exec implementation. 
+static void BM_ExecSelective(benchmark::State& state, std::string spin_function) { + const int64_t selectivity = state.range(0); + ARROW_CHECK(selectivity >= 0 && selectivity <= 100); + const int64_t kernel_intensity = state.range(1); + const int64_t num_rows = state.range(2); + + auto input = ConstantArrayGenerator::Int32(num_rows, 0); + auto selection = + MakeSelectionVectorTo(static_cast(num_rows * selectivity / 100)); + BenchmarkExec(state, std::move(spin_function), kernel_intensity, std::move(input), + std::move(selection)); +} + +const char* kSelectivityArgName = "selectivity"; +const std::vector kSelectivityArg{0, 20, 50, 100}; +const char* kKernelIntensityArgName = "kernel_intensity"; +const std::vector kKernelIntensityArg = benchmark::CreateDenseRange(0, 100, 20); +const char* kNumRowsArgName = "num_rows"; +const std::vector kNumRowsArg{4096}; + +BENCHMARK(BM_ExecBaseline) + ->ArgNames({kKernelIntensityArgName, kNumRowsArgName}) + ->ArgsProduct({kKernelIntensityArg, kNumRowsArg}); + +BENCHMARK_CAPTURE(BM_ExecSelective, sparse, "spin_selective") + ->ArgNames({kSelectivityArgName, kKernelIntensityArgName, kNumRowsArgName}) + ->ArgsProduct({kSelectivityArg, kKernelIntensityArg, kNumRowsArg}); + +BENCHMARK_CAPTURE(BM_ExecSelective, dense, "spin") + ->ArgNames({kSelectivityArgName, kKernelIntensityArgName, kNumRowsArgName}) + ->ArgsProduct({kSelectivityArg, kKernelIntensityArg, kNumRowsArg}); + +} // namespace arrow::compute diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h index 7e4f364a928..744764e639a 100644 --- a/cpp/src/arrow/compute/exec_internal.h +++ b/cpp/src/arrow/compute/exec_internal.h @@ -28,6 +28,7 @@ #include "arrow/compute/exec.h" #include "arrow/compute/kernel.h" #include "arrow/status.h" +#include "arrow/util/functional.h" #include "arrow/util/visibility.h" namespace arrow { @@ -57,21 +58,22 @@ class ARROW_EXPORT ExecSpanIterator { Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize, bool promote_if_all_scalars = true); - /// \brief Compute the next span by updating the state of the - /// previous span object. You must keep passing in the previous - /// value for the results to be consistent. If you need to process - /// in parallel, make a copy of the in-use ExecSpan while it's being - /// used by another thread and pass it into Next. This function - /// always populates at least one span. If you call this function - /// with a blank ExecSpan after the first iteration, it will not - /// work correctly (maybe we will change this later). Return false - /// if the iteration is exhausted - bool Next(ExecSpan* span); + /// \brief Compute the next span of the data and optionally the selection by updating + /// the state of the previous span objects. You must keep passing in the previous value + /// for the results to be consistent. If you need to process in parallel, make a copy of + /// the in-use ExecSpan while it's being used by another thread and pass it into Next. + /// This function always populates at least one span. If you call this function with a + /// blank ExecSpan after the first iteration, it will not work correctly (maybe we will + /// change this later). 
Return false if the iteration is exhausted + bool Next(ExecSpan* span, SelectionVectorSpan* selection_span = NULLPTR); int64_t length() const { return length_; } + int64_t selection_length() const { return selection_length_; } int64_t position() const { return position_; } + int64_t selection_position() const { return selection_position_; } bool have_all_scalars() const { return have_all_scalars_; } + bool have_selection_vector() const { return selection_vector_ != NULLPTR; } private: ExecSpanIterator(const std::vector& args, int64_t length, int64_t max_chunksize); @@ -83,6 +85,7 @@ class ARROW_EXPORT ExecSpanIterator { bool have_all_scalars_ = false; bool promote_if_all_scalars_ = true; const std::vector* args_; + SelectionVector* selection_vector_ = NULLPTR; std::vector chunk_indexes_; std::vector value_positions_; @@ -93,6 +96,8 @@ class ARROW_EXPORT ExecSpanIterator { std::vector value_offsets_; int64_t position_ = 0; int64_t length_ = 0; + int64_t selection_length_ = 0; + int64_t selection_position_ = 0; int64_t max_chunksize_; }; @@ -148,6 +153,7 @@ class ARROW_EXPORT KernelExecutor { static std::unique_ptr MakeScalarAggregate(); }; +ARROW_EXPORT int64_t InferBatchLength(const std::vector& values, bool* all_same); /// \brief Populate validity bitmap with the intersection of the nullity of the @@ -165,6 +171,25 @@ Status PropagateNulls(KernelContext* ctx, const ExecSpan& batch, ArrayData* out) ARROW_EXPORT void PropagateNullsSpans(const ExecSpan& batch, ArraySpan* out); +template +typename ::arrow::internal::call_traits::enable_if_return::type +VisitSelectionVectorSpanInline(const SelectionVectorSpan& selection, + OnSelectionFn&& on_selection) { + for (int64_t i = 0; i < selection.length(); ++i) { + RETURN_NOT_OK(on_selection(selection[i])); + } + return Status::OK(); +} + +template +typename ::arrow::internal::call_traits::enable_if_return::type +VisitSelectionVectorSpanInline(const SelectionVectorSpan& selection, + OnSelectionFn&& on_selection) { + for (int64_t i = 0; i < selection.length(); ++i) { + on_selection(selection[i]); + } +} + } // namespace detail } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 8314ad1d5c3..25f77b4a023 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -30,11 +30,13 @@ #include "arrow/chunked_array.h" #include "arrow/compute/exec.h" #include "arrow/compute/exec_internal.h" +#include "arrow/compute/expression.h" #include "arrow/compute/function.h" #include "arrow/compute/function_internal.h" #include "arrow/compute/kernel.h" #include "arrow/compute/ordering.h" #include "arrow/compute/registry.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/scalar.h" @@ -146,11 +148,64 @@ TEST(ExecContext, BasicWorkings) { } TEST(SelectionVector, Basics) { - auto indices = ArrayFromJSON(int32(), "[0, 3]"); - auto sel_vector = std::make_shared(*indices); + auto sel_vector = SelectionVectorFromJSON("[0, 42]"); - ASSERT_EQ(indices->length(), sel_vector->length()); - ASSERT_EQ(3, sel_vector->indices()[1]); + ASSERT_EQ(sel_vector->length(), 2); + ASSERT_EQ(sel_vector->indices()[0], 0); + ASSERT_EQ(sel_vector->indices()[1], 42); +} + +TEST(SelectionVector, Validate) { + { + auto sel_vector = SelectionVectorFromJSON("[]"); + ASSERT_OK(sel_vector->Validate()); + } + { + auto sel_vector = SelectionVectorFromJSON("[0, null, 42]"); + ASSERT_RAISES(Invalid, 
sel_vector->Validate()); + } + { + auto sel_vector = SelectionVectorFromJSON("[42, 0]"); + ASSERT_RAISES(Invalid, sel_vector->Validate()); + } + { + auto sel_vector = SelectionVectorFromJSON("[-42, 0]"); + ASSERT_RAISES(Invalid, sel_vector->Validate()); + } + { + auto sel_vector = SelectionVectorFromJSON("[]"); + ASSERT_OK(sel_vector->Validate(/*values_length=*/0)); + } + { + auto sel_vector = SelectionVectorFromJSON("[0]"); + ASSERT_RAISES(Invalid, sel_vector->Validate(/*values_length=*/0)); + } + { + auto sel_vector = SelectionVectorFromJSON("[0, 41]"); + ASSERT_OK(sel_vector->Validate(/*values_length=*/42)); + } + { + auto sel_vector = SelectionVectorFromJSON("[0, 42]"); + ASSERT_RAISES(Invalid, sel_vector->Validate(/*values_length=*/42)); + } +} + +TEST(SelectionVectorSpan, Basics) { + auto indices = ArrayFromJSON(int32(), "[0, 3, 7]"); + SelectionVectorSpan sel_span(indices->data()->GetValues(1), + indices->length() - 1, + /*offset=*/1, /*index_back_shift=*/1); + ASSERT_EQ(sel_span[0], 2); + ASSERT_EQ(sel_span[1], 6); + + sel_span.SetSlice(/*offset=*/1, /*length=*/2, /*index_back_shift=*/0); + ASSERT_EQ(sel_span[0], 3); + ASSERT_EQ(sel_span[1], 7); + + sel_span.SetSlice(/*offset=*/0, /*length=*/3); + ASSERT_EQ(sel_span[0], 0); + ASSERT_EQ(sel_span[1], 3); + ASSERT_EQ(sel_span[2], 7); } void AssertValidityZeroExtraBits(const uint8_t* data, int64_t length, int64_t offset) { @@ -732,13 +787,23 @@ class TestExecSpanIterator : public TestComputeInternals { } void CheckIteration(const ExecBatch& input, int chunksize, const std::vector& ex_batch_sizes) { + ASSERT_EQ(input.selection_vector, nullptr); + std::vector ex_selection_sizes(ex_batch_sizes.size(), 0); + return CheckIteration(input, chunksize, ex_batch_sizes, ex_selection_sizes); + } + void CheckIteration(const ExecBatch& input, int chunksize, + const std::vector& ex_batch_sizes, + const std::vector& ex_selection_sizes) { SetupIterator(input, chunksize); ExecSpan batch; - int64_t position = 0; + SelectionVectorSpan selection; + int64_t position = 0, selection_position = 0; for (size_t i = 0; i < ex_batch_sizes.size(); ++i) { ASSERT_EQ(position, iterator_.position()); - ASSERT_TRUE(iterator_.Next(&batch)); + ASSERT_EQ(selection_position, iterator_.selection_position()); + ASSERT_TRUE(iterator_.Next(&batch, &selection)); ASSERT_EQ(ex_batch_sizes[i], batch.length); + ASSERT_EQ(ex_selection_sizes[i], selection.length()); for (size_t j = 0; j < input.values.size(); ++j) { switch (input[j].kind()) { @@ -764,12 +829,22 @@ class TestExecSpanIterator : public TestComputeInternals { break; } } + if (iterator_.have_selection_vector()) { + for (int64_t j = 0; j < selection.length(); ++j) { + ASSERT_EQ(input.selection_vector->indices()[selection_position + j] - position, + selection[j]); + ASSERT_GE(selection[j], 0); + ASSERT_LT(selection[j], batch.length); + } + } position += ex_batch_sizes[i]; + selection_position += ex_selection_sizes[i]; } // Ensure that the iterator is exhausted - ASSERT_FALSE(iterator_.Next(&batch)); + ASSERT_FALSE(iterator_.Next(&batch, &selection)); ASSERT_EQ(iterator_.length(), iterator_.position()); + ASSERT_EQ(iterator_.selection_length(), iterator_.selection_position()); } protected: @@ -881,9 +956,158 @@ TEST_F(TestExecSpanIterator, ZeroLengthInputs) { CheckArgs(input); } +TEST_F(TestExecSpanIterator, SelectionSpanBasic) { + ExecBatch batch( + {Datum(GetInt32Array(30)), Datum(GetInt32Array(30)), + Datum(std::make_shared(5)), Datum(MakeNullScalar(boolean()))}, + 30, SelectionVectorFromJSON("[1, 2, 7, 29]")); + + 
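Reader aid for the expectations checked next (not part of the patch): the names kIndices and sel below are hypothetical; the snippet only restates what the iterator is expected to produce for the chunksize-7 case, using the SelectionVectorSpan interface added in exec.h.

```cpp
// For the 30-row batch above iterated in spans [0,7), [7,14), [14,21),
// [21,28), [28,30), the selection [1, 2, 7, 29] splits into per-span slices
// of length {2, 1, 0, 0, 1}. After back-shifting, the kernel sees
// span-relative indices:
static const int32_t kIndices[] = {1, 2, 7, 29};
SelectionVectorSpan sel(kIndices, /*length=*/4);

sel.SetSlice(/*offset=*/0, /*length=*/2, /*index_back_shift=*/0);   // span [0,7):   {1, 2}
sel.SetSlice(/*offset=*/2, /*length=*/1, /*index_back_shift=*/7);   // span [7,14):  {0}
sel.SetSlice(/*offset=*/3, /*length=*/0, /*index_back_shift=*/14);  // span [14,21): {}
sel.SetSlice(/*offset=*/3, /*length=*/0, /*index_back_shift=*/21);  // span [21,28): {}
sel.SetSlice(/*offset=*/3, /*length=*/1, /*index_back_shift=*/28);  // span [28,30): {1}
```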
CheckIteration(batch, /*chunksize=*/7, {7, 7, 7, 7, 2}, {2, 1, 0, 0, 1}); + CheckIteration(batch, /*chunksize=*/10, {10, 10, 10}, {3, 0, 1}); + CheckIteration(batch, /*chunksize=*/20, {20, 10}, {3, 1}); + CheckIteration(batch, /*chunksize=*/30, {30}, {4}); +} + +TEST_F(TestExecSpanIterator, SelectionSpanChunked) { + ExecBatch batch({Datum(GetInt32Chunked({0, 20, 10})), Datum(GetInt32Chunked({15, 15})), + Datum(GetInt32Array(30)), Datum(std::make_shared(5)), + Datum(MakeNullScalar(boolean()))}, + 30, SelectionVectorFromJSON("[1, 2, 7, 29]")); + + CheckIteration(batch, /*chunksize=*/7, {7, 7, 1, 5, 7, 3}, {2, 1, 0, 0, 0, 1}); + CheckIteration(batch, /*chunksize=*/10, {10, 5, 5, 10}, {3, 0, 0, 1}); + CheckIteration(batch, /*chunksize=*/20, {15, 5, 10}, {3, 0, 1}); + CheckIteration(batch, /*chunksize=*/30, {15, 5, 10}, {3, 0, 1}); +} + // ---------------------------------------------------------------------- // Scalar function execution +template +void VisitIndicesWithSelection(int64_t length, const SelectionVectorSpan& selection, + OnSelectedFn&& on_selected, + OnNonSelectedFn&& on_non_selected) { + int64_t selected = 0; + for (int64_t i = 0; i < length; ++i) { + if (selected < selection.length() && i == selection[selected]) { + on_selected(i); + ++selected; + } else { + on_non_selected(i); + } + } +} + +constexpr uint8_t kNonSelectedByte = 0xFE; + +void AssertArraysEqualSparseWithSelection(const Array& src, + const SelectionVectorSpan& selection, + const Array& dst) { + ASSERT_EQ(src.length(), dst.length()); + ASSERT_EQ(src.type()->id(), dst.type()->id()); + + int value_size = src.type()->byte_width(); + const uint8_t* src_validity = src.data()->buffers[0]->data(); + const uint8_t* dst_validity = dst.data()->buffers[0]->data(); + const uint8_t* src_data = src.data()->buffers[1]->data(); + const uint8_t* dst_data = dst.data()->buffers[1]->data(); + int64_t src_offset = src.data()->offset; + int64_t dst_offset = dst.data()->offset; + + VisitIndicesWithSelection( + src.length(), selection, + [&](int64_t i) { + // Selected values should match + ASSERT_EQ(bit_util::GetBit(src_validity, src_offset + i), + bit_util::GetBit(dst_validity, dst_offset + i)); + if (bit_util::GetBit(src_validity, src_offset + i)) { + ASSERT_EQ(memcmp(src_data + (src_offset + i) * value_size, + dst_data + (dst_offset + i) * value_size, value_size), + 0); + } + }, + [&](int64_t i) { + // Non-selected values should be the valid special value in the output + ASSERT_TRUE(bit_util::GetBit(dst_validity, dst_offset + i)); + for (int j = 0; j < value_size; ++j) { + ASSERT_EQ(dst_data[(dst_offset + i) * value_size + j], kNonSelectedByte); + } + }); +} + +void AssertArraysEqualDenseWithSelection(const Array& src, + const SelectionVectorSpan& selection, + const Array& dst) { + ASSERT_EQ(src.length(), dst.length()); + ASSERT_EQ(src.type()->id(), dst.type()->id()); + + int value_size = src.type()->byte_width(); + const uint8_t* src_validity = src.data()->buffers[0]->data(); + const uint8_t* dst_validity = dst.data()->buffers[0]->data(); + const uint8_t* src_data = src.data()->buffers[1]->data(); + const uint8_t* dst_data = dst.data()->buffers[1]->data(); + int64_t src_offset = src.data()->offset; + int64_t dst_offset = dst.data()->offset; + + VisitIndicesWithSelection( + src.length(), selection, + [&](int64_t i) { + // Selected values should match + ASSERT_EQ(bit_util::GetBit(src_validity, src_offset + i), + bit_util::GetBit(dst_validity, dst_offset + i)); + if (bit_util::GetBit(src_validity, src_offset + i)) { + 
ASSERT_EQ(memcmp(src_data + (src_offset + i) * value_size, + dst_data + (dst_offset + i) * value_size, value_size), + 0); + } + }, + [&](int64_t i) { + // Non-selected values should be invalid in the output + ASSERT_FALSE(bit_util::GetBit(dst_validity, dst_offset + i)); + }); +} + +void AssertChunkedExecResultsEqualSparseWithSelection(int64_t exec_chunksize, + const Array& input, + const SelectionVector* selection, + const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + const ChunkedArray& carr = *result.chunked_array(); + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ASSERT_EQ(bit_util::CeilDiv(input.length(), exec_chunksize), carr.num_chunks()); + int64_t selection_idx = 0; + for (int i = 0; i < carr.num_chunks(); ++i) { + auto next_selection_idx = selection_idx; + while (next_selection_idx < selection->length() && + selection->indices()[next_selection_idx] < exec_chunksize * (i + 1)) { + ++next_selection_idx; + } + selection_span.SetSlice(selection_idx, next_selection_idx - selection_idx, + static_cast(exec_chunksize * i)); + selection_idx = next_selection_idx; + AssertArraysEqualSparseWithSelection( + *input.Slice(exec_chunksize * i, + std::min(exec_chunksize, input.length() - exec_chunksize * i)), + selection_span, *carr.chunk(i)); + } +} + +void AssertChunkedExecResultsEqualDenseWithSelection(int64_t exec_chunksize, + const Array& input, + const SelectionVector* selection, + const Datum& result) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + if (selection_span.length() <= exec_chunksize) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualDenseWithSelection(input, selection_span, *result.make_array()); + } else { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + const ChunkedArray& carr = *result.chunked_array(); + ASSERT_EQ(1, carr.num_chunks()); + AssertArraysEqualDenseWithSelection(input, selection_span, *carr.chunk(0)); + } +} + Status ExecCopyArrayData(KernelContext*, const ExecSpan& batch, ExecResult* out) { DCHECK_EQ(1, batch.num_values()); int value_size = batch[0].type()->byte_width(); @@ -896,6 +1120,32 @@ Status ExecCopyArrayData(KernelContext*, const ExecSpan& batch, ExecResult* out) return Status::OK(); } +Status SelectiveExecCopyArrayData(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, ExecResult* out) { + DCHECK_EQ(1, batch.num_values()); + int value_size = batch[0].type()->byte_width(); + + const ArraySpan& arg0 = batch[0].array; + ArrayData* out_arr = out->array_data().get(); + uint8_t* dst_validity = out_arr->buffers[0]->mutable_data(); + int64_t dst_validity_offset = out_arr->offset; + uint8_t* dst = out_arr->buffers[1]->mutable_data() + out_arr->offset * value_size; + const uint8_t* src = arg0.buffers[1].data + arg0.offset * value_size; + VisitIndicesWithSelection( + batch.length, selection, + [&](int64_t i) { + // Copy the selected value + std::memcpy(dst + i * value_size, src + i * value_size, value_size); + }, + [&](int64_t i) { + // Set the non-selected as valid (regardless of its precomputed validity) and set + // its values with a special value + bit_util::SetBit(dst_validity, dst_validity_offset + i); + std::memset(dst + i * value_size, kNonSelectedByte, value_size); + }); + return Status::OK(); +} + Status ExecCopyArraySpan(KernelContext*, const ExecSpan& batch, ExecResult* out) { DCHECK_EQ(1, batch.num_values()); int value_size = batch[0].type()->byte_width(); @@ -907,6 +1157,31 @@ Status 
ExecCopyArraySpan(KernelContext*, const ExecSpan& batch, ExecResult* out) return Status::OK(); } +Status SelectiveExecCopyArraySpan(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, ExecResult* out) { + DCHECK_EQ(1, batch.num_values()); + int value_size = batch[0].type()->byte_width(); + const ArraySpan& arg0 = batch[0].array; + ArraySpan* out_arr = out->array_span_mutable(); + uint8_t* dst_validity = out_arr->buffers[0].data; + int64_t dst_validity_offset = out_arr->offset; + uint8_t* dst = out_arr->buffers[1].data + out_arr->offset * value_size; + const uint8_t* src = arg0.buffers[1].data + arg0.offset * value_size; + VisitIndicesWithSelection( + batch.length, selection, + [&](int64_t i) { + // Copy the selected value + std::memcpy(dst + i * value_size, src + i * value_size, value_size); + }, + [&](int64_t i) { + // Set the non-selected as valid (regardless of its precomputed validity) and set + // its values with a special value + bit_util::SetBit(dst_validity, dst_validity_offset + i); + std::memset(dst + i * value_size, kNonSelectedByte, value_size); + }); + return Status::OK(); +} + Status ExecComputedBitmap(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { // Propagate nulls not used. Check that the out bitmap isn't the same already // as the input bitmap @@ -923,6 +1198,24 @@ Status ExecComputedBitmap(KernelContext* ctx, const ExecSpan& batch, ExecResult* return ExecCopyArraySpan(ctx, batch, out); } +Status SelectiveExecComputedBitmap(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, + ExecResult* out) { + // Propagate nulls not used. Check that the out bitmap isn't the same already + // as the input bitmap + const ArraySpan& arg0 = batch[0].array; + ArraySpan* out_arr = out->array_span_mutable(); + if (CountSetBits(arg0.buffers[0].data, arg0.offset, batch.length) > 0) { + // Check that the bitmap has not been already copied over + DCHECK(!BitmapEquals(arg0.buffers[0].data, arg0.offset, out_arr->buffers[0].data, + out_arr->offset, batch.length)); + } + + CopyBitmap(arg0.buffers[0].data, arg0.offset, batch.length, out_arr->buffers[0].data, + out_arr->offset); + return SelectiveExecCopyArraySpan(ctx, batch, selection, out); +} + Status ExecNoPreallocatedData(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { // Validity preallocated, but not the data @@ -934,6 +1227,18 @@ Status ExecNoPreallocatedData(KernelContext* ctx, const ExecSpan& batch, return ExecCopyArrayData(ctx, batch, out); } +Status SelectiveExecNoPreallocatedData(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, + ExecResult* out) { + // Validity preallocated, but not the data + ArrayData* out_arr = out->array_data().get(); + DCHECK_EQ(0, out_arr->offset); + int value_size = batch[0].type()->byte_width(); + Status s = (ctx->Allocate(out_arr->length * value_size).Value(&out_arr->buffers[1])); + DCHECK_OK(s); + return SelectiveExecCopyArrayData(ctx, batch, selection, out); +} + Status ExecNoPreallocatedAnything(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { // Neither validity nor data preallocated @@ -949,6 +1254,22 @@ Status ExecNoPreallocatedAnything(KernelContext* ctx, const ExecSpan& batch, return ExecNoPreallocatedData(ctx, batch, out); } +Status SelectiveExecNoPreallocatedAnything(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, + ExecResult* out) { + // Neither validity nor data preallocated + ArrayData* out_arr = out->array_data().get(); + 
DCHECK_EQ(0, out_arr->offset); + Status s = (ctx->AllocateBitmap(out_arr->length).Value(&out_arr->buffers[0])); + DCHECK_OK(s); + const ArraySpan& arg0 = batch[0].array; + CopyBitmap(arg0.buffers[0].data, arg0.offset, batch.length, + out_arr->buffers[0]->mutable_data(), /*offset=*/0); + + // Reuse the kernel that allocates the data + return SelectiveExecNoPreallocatedData(ctx, batch, selection, out); +} + class ExampleOptions : public FunctionOptions { public: explicit ExampleOptions(std::shared_ptr value); @@ -1003,6 +1324,33 @@ Status ExecStateful(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) return Status::OK(); } +Status SelectiveExecStateful(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, ExecResult* out) { + // We take the value from the state and multiply the data in batch[0] with it + ExampleState* state = static_cast(ctx->state()); + int32_t multiplier = checked_cast(*state->value).value; + + const ArraySpan& arg0 = batch[0].array; + ArraySpan* out_arr = out->array_span_mutable(); + const int32_t* arg0_data = arg0.GetValues(1); + uint8_t* dst_validity = out_arr->buffers[0].data; + int64_t dst_validity_offset = out_arr->offset; + int32_t* dst = out_arr->GetValues(1); + VisitIndicesWithSelection( + batch.length, selection, + [&](int64_t i) { + // Copy the selected value + dst[i] = arg0_data[i] * multiplier; + }, + [&](int64_t i) { + // Set the non-selected as valid (regardless of its precomputed validity) and set + // its values with a special value + bit_util::SetBit(dst_validity, dst_validity_offset + i); + memset(dst + i, kNonSelectedByte, sizeof(int32_t)); + }); + return Status::OK(); +} + Status ExecAddInt32(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { const int32_t* left_data = batch[0].array.GetValues(1); const int32_t* right_data = batch[1].array.GetValues(1); @@ -1013,6 +1361,29 @@ Status ExecAddInt32(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) return Status::OK(); } +Status SelectiveExecAddInt32(KernelContext* ctx, const ExecSpan& batch, + const SelectionVectorSpan& selection, ExecResult* out) { + const int32_t* left_data = batch[0].array.GetValues(1); + const int32_t* right_data = batch[1].array.GetValues(1); + ArraySpan* out_arr = out->array_span_mutable(); + uint8_t* dst_validity = out_arr->buffers[0].data; + int64_t dst_validity_offset = out_arr->offset; + int32_t* out_data = out_arr->GetValues(1); + VisitIndicesWithSelection( + batch.length, selection, + [&](int64_t i) { + // Copy the selected value + out_data[i] = left_data[i] + right_data[i]; + }, + [&](int64_t i) { + // Set the non-selected as valid (regardless of its precomputed validity) and set + // its values with a special value + bit_util::SetBit(dst_validity, dst_validity_offset + i); + memset(out_data + i, kNonSelectedByte, sizeof(int32_t)); + }); + return Status::OK(); +} + class TestCallScalarFunction : public TestComputeInternals { protected: static bool initialized_; @@ -1023,9 +1394,13 @@ class TestCallScalarFunction : public TestComputeInternals { if (!initialized_) { initialized_ = true; AddCopyFunctions(); + AddSelectiveCopyFunctions(); AddNoPreallocateFunctions(); + AddSelectiveNoPreallocateFunctions(); AddStatefulFunction(); + AddSelectiveStatefulFunction(); AddScalarFunction(); + AddSelectiveScalarFunction(); } } @@ -1052,6 +1427,34 @@ class TestCallScalarFunction : public TestComputeInternals { ASSERT_OK(registry->AddFunction(func2)); } + void AddSelectiveCopyFunctions() { + auto registry = GetFunctionRegistry(); + 
+ // This function simply copies memory from the input argument into the + // (preallocated) output + auto func = std::make_shared("test_copy_selective", Arity::Unary(), + /*doc=*/FunctionDoc::Empty()); + + // Add a few kernels. Our implementation only accepts arrays + ASSERT_OK(func->AddKernel({uint8()}, uint8(), ExecCopyArraySpan, + SelectiveExecCopyArraySpan)); + ASSERT_OK(func->AddKernel({int32()}, int32(), ExecCopyArraySpan, + SelectiveExecCopyArraySpan)); + ASSERT_OK(func->AddKernel({float64()}, float64(), ExecCopyArraySpan, + SelectiveExecCopyArraySpan)); + ASSERT_OK(registry->AddFunction(func)); + + // A version which doesn't want the executor to call PropagateNulls + auto func2 = + std::make_shared("test_copy_computed_bitmap_selective", + Arity::Unary(), /*doc=*/FunctionDoc::Empty()); + ScalarKernel kernel({uint8()}, uint8(), ExecComputedBitmap, + SelectiveExecComputedBitmap); + kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE; + ASSERT_OK(func2->AddKernel(kernel)); + ASSERT_OK(registry->AddFunction(func2)); + } + void AddNoPreallocateFunctions() { auto registry = GetFunctionRegistry(); @@ -1074,6 +1477,32 @@ class TestCallScalarFunction : public TestComputeInternals { ASSERT_OK(registry->AddFunction(f2)); } + void AddSelectiveNoPreallocateFunctions() { + auto registry = GetFunctionRegistry(); + + // A function that allocates its own output memory. We have cases for both + // non-preallocated data and non-preallocated validity bitmap + auto f1 = + std::make_shared("test_nopre_data_selective", Arity::Unary(), + /*doc=*/FunctionDoc::Empty()); + auto f2 = + std::make_shared("test_nopre_validity_or_data_selective", + Arity::Unary(), /*doc=*/FunctionDoc::Empty()); + + ScalarKernel kernel({uint8()}, uint8(), ExecNoPreallocatedData, + SelectiveExecNoPreallocatedData); + kernel.mem_allocation = MemAllocation::NO_PREALLOCATE; + ASSERT_OK(f1->AddKernel(kernel)); + + kernel.exec = ExecNoPreallocatedAnything; + kernel.selective_exec = SelectiveExecNoPreallocatedAnything; + kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE; + ASSERT_OK(f2->AddKernel(kernel)); + + ASSERT_OK(registry->AddFunction(f1)); + ASSERT_OK(registry->AddFunction(f2)); + } + void AddStatefulFunction() { auto registry = GetFunctionRegistry(); @@ -1087,6 +1516,21 @@ class TestCallScalarFunction : public TestComputeInternals { ASSERT_OK(registry->AddFunction(func)); } + void AddSelectiveStatefulFunction() { + auto registry = GetFunctionRegistry(); + + // This function's behavior depends on a static parameter that is made + // available to the kernel's execution function through its Options object + auto func = + std::make_shared("test_stateful_selective", Arity::Unary(), + /*doc=*/FunctionDoc::Empty()); + + ScalarKernel kernel({int32()}, int32(), ExecStateful, SelectiveExecStateful, + InitStateful); + ASSERT_OK(func->AddKernel(kernel)); + ASSERT_OK(registry->AddFunction(func)); + } + void AddScalarFunction() { auto registry = GetFunctionRegistry(); @@ -1095,6 +1539,17 @@ class TestCallScalarFunction : public TestComputeInternals { ASSERT_OK(func->AddKernel({int32(), int32()}, int32(), ExecAddInt32)); ASSERT_OK(registry->AddFunction(func)); } + + void AddSelectiveScalarFunction() { + auto registry = GetFunctionRegistry(); + + auto func = std::make_shared("test_scalar_add_int32_selective", + Arity::Binary(), + /*doc=*/FunctionDoc::Empty()); + ASSERT_OK(func->AddKernel({int32(), int32()}, int32(), ExecAddInt32, + SelectiveExecAddInt32)); + ASSERT_OK(registry->AddFunction(func)); + } }; bool 
TestCallScalarFunction::initialized_ = false; @@ -1103,11 +1558,23 @@ class FunctionCaller { public: virtual ~FunctionCaller() = default; + virtual std::string name() const = 0; + + virtual Result Call(const std::vector& args, + std::shared_ptr selection, + const FunctionOptions* options = NULLPTR, + ExecContext* ctx = NULLPTR) const = 0; + virtual Result Call(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx = NULLPTR) = 0; + ExecContext* ctx = NULLPTR) const { + return Call(args, nullptr, options, ctx); + } + virtual Result Call(const std::vector& args, - ExecContext* ctx = NULLPTR) = 0; + ExecContext* ctx = NULLPTR) const { + return Call(args, /*options=*/nullptr, ctx); + } }; using FunctionCallerMaker = std::function>( @@ -1117,6 +1584,8 @@ class SimpleFunctionCaller : public FunctionCaller { public: explicit SimpleFunctionCaller(const std::string& func_name) : func_name(func_name) {} + std::string name() const override { return "simple_caller"; } + static Result> Make(const std::string& func_name) { return std::make_shared(func_name); } @@ -1126,13 +1595,13 @@ class SimpleFunctionCaller : public FunctionCaller { return Make(func_name); } - Result Call(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) override { + Result Call(const std::vector& args, + std::shared_ptr selection, + const FunctionOptions* options, ExecContext* ctx) const override { + ARROW_RETURN_IF(selection != nullptr, + Status::Invalid("Selection vector not supported")); return CallFunction(func_name, args, options, ctx); } - Result Call(const std::vector& args, ExecContext* ctx) override { - return CallFunction(func_name, args, ctx); - } std::string func_name; }; @@ -1142,6 +1611,8 @@ class ExecFunctionCaller : public FunctionCaller { explicit ExecFunctionCaller(std::shared_ptr func_exec) : func_exec(std::move(func_exec)) {} + std::string name() const override { return "exec_caller"; } + static Result> Make( const std::string& func_name, const std::vector& args, const FunctionOptions* options = nullptr, @@ -1165,235 +1636,673 @@ class ExecFunctionCaller : public FunctionCaller { return Make(func_name, std::move(in_types)); } - Result Call(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) override { + Result Call(const std::vector& args, + std::shared_ptr selection, + const FunctionOptions* options, ExecContext* ctx) const override { + ARROW_RETURN_IF(selection != nullptr, + Status::Invalid("Selection vector not supported")); ARROW_RETURN_NOT_OK(func_exec->Init(options, ctx)); return func_exec->Execute(args); } - Result Call(const std::vector& args, ExecContext* ctx) override { - return Call(args, nullptr, ctx); - } std::shared_ptr func_exec; }; -class TestCallScalarFunctionArgumentValidation : public TestCallScalarFunction { - protected: - void DoTest(FunctionCallerMaker caller_maker); -}; +// Call the function via expression with an optional selection vector. 
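Before the expression-based caller below, a hypothetical end-user sketch of the same call path: AddOneSelected is an illustrative name, the snippet assumes the ArrayFromJSON testing helper, and it assumes "add" has no selective kernel so the dense fallback applies (non-selected rows come back null, per the dense-path tests in this file).

```cpp
#include <memory>

#include "arrow/api.h"
#include "arrow/compute/api.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/testing/gtest_util.h"  // ArrayFromJSON

namespace cp = arrow::compute;

// Evaluate x + 1 over a 5-row batch, but only for rows 0, 2 and 4.
arrow::Result<arrow::Datum> AddOneSelected() {
  auto values = arrow::ArrayFromJSON(arrow::int32(), "[10, 20, 30, 40, 50]");
  auto indices = arrow::ArrayFromJSON(arrow::int32(), "[0, 2, 4]");
  auto selection = std::make_shared<cp::SelectionVector>(*indices);

  // Attach the selection vector to the batch and evaluate a bound expression.
  cp::ExecBatch batch({arrow::Datum(values)}, values->length(), selection);
  cp::Expression expr = cp::call("add", {cp::field_ref(0), cp::literal(1)});
  ARROW_ASSIGN_OR_RAISE(
      auto bound, expr.Bind(*arrow::schema({arrow::field("x", arrow::int32())})));
  return cp::ExecuteScalarExpression(bound, batch);
}
```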
+class ExpressionFunctionCaller : public FunctionCaller { + public: + ExpressionFunctionCaller(std::string func_name, const std::vector& in_types) + : func_name_(std::move(func_name)) { + std::vector> fields(in_types.size()); + for (size_t i = 0; i < in_types.size(); ++i) { + fields[i] = field("arg" + std::to_string(i), in_types[i].GetSharedPtr()); + } + schema_ = schema(std::move(fields)); + } -void TestCallScalarFunctionArgumentValidation::DoTest(FunctionCallerMaker caller_maker) { - ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker("test_copy", {int32()})); + std::string name() const override { return "expression_caller"; } - // Copy accepts only a single array argument - Datum d1(GetInt32Array(10)); + static Result> Make(std::string func_name, + std::vector in_types) { + return std::make_shared(std::move(func_name), + std::move(in_types)); + } - // Too many args - std::vector args = {d1, d1}; - ASSERT_RAISES(Invalid, test_copy->Call(args)); + Result Call(const std::vector& args, + std::shared_ptr selection, + const FunctionOptions* options, ExecContext* ctx) const override { + bool all_same = false; + auto length = InferBatchLength(args, &all_same); + ExecBatch batch(args, length, std::move(selection)); + std::vector expr_args(args.size()); + for (int i = 0; i < static_cast(args.size()); ++i) { + expr_args[i] = field_ref(i); + } + Expression expr = + call(func_name_, std::move(expr_args), options ? options->Copy() : nullptr); + ARROW_ASSIGN_OR_RAISE(auto bound, expr.Bind(*schema_, ctx)); + return ExecuteScalarExpression(bound, batch, ctx); + } - // Too few - args = {}; - ASSERT_RAISES(Invalid, test_copy->Call(args)); + Result Call(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const override { + return Call(args, /*selection=*/nullptr, options, ctx); + } - // Cannot do scalar - Datum d1_scalar(std::make_shared(5)); - ASSERT_OK_AND_ASSIGN(auto result, test_copy->Call({d1})); - ASSERT_OK_AND_ASSIGN(result, test_copy->Call({d1_scalar})); -} + static Result> Maker(const std::string& func_name, + std::vector in_types) { + return Make(func_name, std::move(in_types)); + } -TEST_F(TestCallScalarFunctionArgumentValidation, SimpleCall) { - TestCallScalarFunctionArgumentValidation::DoTest(SimpleFunctionCaller::Maker); -} + private: + std::string func_name_; + std::shared_ptr schema_; +}; -TEST_F(TestCallScalarFunctionArgumentValidation, ExecCall) { - TestCallScalarFunctionArgumentValidation::DoTest(ExecFunctionCaller::Maker); -} +class TestCallScalarFunctionArgumentValidation : public TestCallScalarFunction {}; -class TestCallScalarFunctionPreallocationCases : public TestCallScalarFunction { - protected: - void DoTest(FunctionCallerMaker caller_maker); -}; +TEST_F(TestCallScalarFunctionArgumentValidation, Basic) { + for (const auto& caller_maker : {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker("test_copy", {int32()})); + ARROW_SCOPED_TRACE(test_copy->name()); + ResetContexts(); -void TestCallScalarFunctionPreallocationCases::DoTest(FunctionCallerMaker caller_maker) { - double null_prob = 0.2; + // Copy accepts only a single array argument + Datum d1(GetInt32Array(10)); - auto arr = GetUInt8Array(100, null_prob); + // Too many args + std::vector args = {d1, d1}; + ASSERT_RAISES(Invalid, test_copy->Call(args)); - auto CheckFunction = [&](std::shared_ptr test_copy) { - ResetContexts(); + // Too few + args = {}; + ASSERT_RAISES(Invalid, test_copy->Call(args)); + + // 
Cannot do scalar + Datum d1_scalar(std::make_shared(5)); + ASSERT_OK_AND_ASSIGN(auto result, test_copy->Call({d1})); + ASSERT_OK_AND_ASSIGN(result, test_copy->Call({d1_scalar})); + } +} +class TestCallScalarFunctionPreallocationCases : public TestCallScalarFunction { + protected: + std::shared_ptr GetTestArray() { return GetUInt8Array(100, 0.2); } + + std::vector> GetTestSelectionVectors() { + return {SelectionVectorFromJSON("[]"), + SelectionVectorFromJSON("[0]"), + SelectionVectorFromJSON("[42]"), + SelectionVectorFromJSON("[99]"), + SelectionVectorFromJSON("[0, 1, 2, 3, 4]"), + SelectionVectorFromJSON("[0, 42, 99]"), + MakeSelectionVectorTo(40), + MakeSelectionVectorTo(41), + MakeSelectionVectorTo(99), + MakeSelectionVectorTo(100)}; + } + + template + void DoTestBasic(const FunctionCaller* caller, const Array& input, + std::shared_ptr selection, CheckFunc&& check_func) { // The default should be a single array output { - std::vector args = {Datum(arr)}; - ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args)); - ASSERT_EQ(Datum::ARRAY, result.kind()); - AssertArraysEqual(*arr, *result.make_array()); + std::vector args = {Datum(input)}; + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection)); + check_func(result); } // Set the exec_chunksize to be smaller, so now we have several invocations // of the kernel, but still the output is one array { - std::vector args = {Datum(arr)}; + std::vector args = {Datum(input)}; exec_ctx_->set_exec_chunksize(80); - ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); - AssertArraysEqual(*arr, *result.make_array()); + ASSERT_OK_AND_ASSIGN( + Datum result, + caller->Call(args, selection, /*options=*/nullptr, exec_ctx_.get())); + check_func(result); } { // Chunksize not multiple of 8 - std::vector args = {Datum(arr)}; + std::vector args = {Datum(input)}; exec_ctx_->set_exec_chunksize(11); - ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); - AssertArraysEqual(*arr, *result.make_array()); + ASSERT_OK_AND_ASSIGN( + Datum result, + caller->Call(args, selection, /*options=*/nullptr, exec_ctx_.get())); + check_func(result); } + } + template + void DoTestChunked(const FunctionCaller* caller, const ChunkedArray& input, + std::shared_ptr selection, CheckFunc&& check_func) { // Input is chunked, output has one big chunk - { - auto carr = - std::make_shared(ArrayVector{arr->Slice(0, 10), arr->Slice(10)}); - std::vector args = {Datum(carr)}; - ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); - std::shared_ptr actual = result.chunked_array(); - ASSERT_EQ(1, actual->num_chunks()); - AssertChunkedEquivalent(*carr, *actual); - } + std::vector args = {Datum(input)}; + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection, /*options=*/nullptr, + exec_ctx_.get())); + check_func(result); + } + template + void DoTestIndependentPreallocate(const FunctionCaller* caller, int64_t exec_chunksize, + const Array& input, + std::shared_ptr selection, + CheckFunc&& check_func) { // Preallocate independently for each batch - { - std::vector args = {Datum(arr)}; - exec_ctx_->set_preallocate_contiguous(false); - exec_ctx_->set_exec_chunksize(40); - ASSERT_OK_AND_ASSIGN(Datum result, test_copy->Call(args, exec_ctx_.get())); - ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); - const ChunkedArray& carr = *result.chunked_array(); - ASSERT_EQ(3, carr.num_chunks()); - AssertArraysEqual(*arr->Slice(0, 40), *carr.chunk(0)); - AssertArraysEqual(*arr->Slice(40, 40), *carr.chunk(1)); - 
AssertArraysEqual(*arr->Slice(80), *carr.chunk(2)); + std::vector args = {Datum(input)}; + exec_ctx_->set_preallocate_contiguous(false); + exec_ctx_->set_exec_chunksize(exec_chunksize); + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection, /*options=*/nullptr, + exec_ctx_.get())); + check_func(result); + } +}; + +TEST_F(TestCallScalarFunctionPreallocationCases, Basic) { + auto arr = GetTestArray(); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + for (const auto& caller_maker : + {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker(name, {uint8()})); + ARROW_SCOPED_TRACE(test_copy->name()); + ResetContexts(); + + DoTestBasic(test_copy.get(), *arr, /*selection=*/nullptr, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqual(*arr, *result.make_array()); + }); } - }; + } +} - ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker("test_copy", {uint8()})); - CheckFunction(test_copy); - ASSERT_OK_AND_ASSIGN(auto test_copy_computed_bitmap, - caller_maker("test_copy_computed_bitmap", {uint8()})); - CheckFunction(test_copy_computed_bitmap); +TEST_F(TestCallScalarFunctionPreallocationCases, BasicSelectiveSparse) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : + {"test_copy_selective", "test_copy_computed_bitmap_selective"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(test_copy.get(), *arr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualSparseWithSelection(*arr, selection_span, *result.make_array()); + }); + } + } } -TEST_F(TestCallScalarFunctionPreallocationCases, SimpleCaller) { - TestCallScalarFunctionPreallocationCases::DoTest(SimpleFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionPreallocationCases, BasicSelectiveDense) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(test_copy.get(), *arr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualDenseWithSelection(*arr, selection_span, *result.make_array()); + }); + } + } } -TEST_F(TestCallScalarFunctionPreallocationCases, ExecCaller) { - TestCallScalarFunctionPreallocationCases::DoTest(ExecFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionPreallocationCases, Chunked) { + auto arr = GetTestArray(); + auto carr = + std::make_shared(ArrayVector{arr->Slice(0, 10), arr->Slice(10)}); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + for (const auto& caller_maker : + {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker(name, {uint8()})); + ARROW_SCOPED_TRACE(test_copy->name()); + ResetContexts(); + + DoTestChunked(test_copy.get(), *carr, 
/*selection=*/nullptr, + [&](const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + std::shared_ptr actual = result.chunked_array(); + ASSERT_EQ(1, actual->num_chunks()); + AssertChunkedEquivalent(*carr, *actual); + }); + } + } } +TEST_F(TestCallScalarFunctionPreallocationCases, ChunkedSelectiveSparse) { + auto arr = GetTestArray(); + auto carr = + std::make_shared(ArrayVector{arr->Slice(0, 10), arr->Slice(10)}); + auto selections = GetTestSelectionVectors(); + for (const auto& name : + {"test_copy_selective", "test_copy_computed_bitmap_selective"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestChunked(test_copy.get(), *carr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + std::shared_ptr actual = result.chunked_array(); + ASSERT_EQ(1, actual->num_chunks()); + AssertArraysEqualSparseWithSelection(*arr, selection_span, *actual->chunk(0)); + }); + } + } +} + +TEST_F(TestCallScalarFunctionPreallocationCases, ChunkedSelectiveDense) { + auto arr = GetTestArray(); + auto carr = + std::make_shared(ArrayVector{arr->Slice(0, 10), arr->Slice(10)}); + auto selections = GetTestSelectionVectors(); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestChunked(test_copy.get(), *carr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + std::shared_ptr actual = result.chunked_array(); + ASSERT_EQ(1, actual->num_chunks()); + AssertArraysEqualDenseWithSelection(*arr, selection_span, *actual->chunk(0)); + }); + } + } +} + +TEST_F(TestCallScalarFunctionPreallocationCases, IndependentPreallocate) { + auto arr = GetTestArray(); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + for (const auto& caller_maker : + {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_copy, caller_maker(name, {uint8()})); + ARROW_SCOPED_TRACE(test_copy->name()); + ResetContexts(); + + DoTestIndependentPreallocate( + test_copy.get(), /*exec_chunksize=*/40, *arr, /*selection=*/nullptr, + [&](const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + const ChunkedArray& carr = *result.chunked_array(); + ASSERT_EQ(3, carr.num_chunks()); + AssertArraysEqual(*arr->Slice(0, 40), *carr.chunk(0)); + AssertArraysEqual(*arr->Slice(40, 40), *carr.chunk(1)); + AssertArraysEqual(*arr->Slice(80), *carr.chunk(2)); + }); + } + } +} + +TEST_F(TestCallScalarFunctionPreallocationCases, IndependentPreallocateSelectiveSparse) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : + {"test_copy_selective", "test_copy_computed_bitmap_selective"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + const int64_t exec_chunksize = 40; + ResetContexts(); + + DoTestIndependentPreallocate(test_copy.get(), exec_chunksize, *arr, selection, + [&](const 
Datum& result) { + AssertChunkedExecResultsEqualSparseWithSelection( + exec_chunksize, *arr, selection.get(), result); + }); + } + } +} + +TEST_F(TestCallScalarFunctionPreallocationCases, IndependentPreallocateSelectiveDense) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : {"test_copy", "test_copy_computed_bitmap"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_copy, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + const int64_t exec_chunksize = 40; + ResetContexts(); + + DoTestIndependentPreallocate(test_copy.get(), exec_chunksize, *arr, selection, + [&](const Datum& result) { + AssertChunkedExecResultsEqualDenseWithSelection( + exec_chunksize, *arr, selection.get(), result); + }); + } + } +} + +// Test a handful of cases +// +// * Validity bitmap computed by kernel rather than using PropagateNulls +// * Data not pre-allocated +// * Validity bitmap not pre-allocated class TestCallScalarFunctionBasicNonStandardCases : public TestCallScalarFunction { protected: - void DoTest(FunctionCallerMaker caller_maker); -}; + std::shared_ptr GetTestArray() { return GetUInt8Array(1000, 0.2); } -void TestCallScalarFunctionBasicNonStandardCases::DoTest( - FunctionCallerMaker caller_maker) { - // Test a handful of cases - // - // * Validity bitmap computed by kernel rather than using PropagateNulls - // * Data not pre-allocated - // * Validity bitmap not pre-allocated + std::vector> GetTestSelectionVectors() { + return {SelectionVectorFromJSON("[]"), SelectionVectorFromJSON("[0]"), + SelectionVectorFromJSON("[999]"), MakeSelectionVectorTo(400), + MakeSelectionVectorTo(401), MakeSelectionVectorTo(1000)}; + } - double null_prob = 0.2; + template + void DoTestBasic(const FunctionCaller* caller, const Array& input, + std::shared_ptr selection, CheckFunc&& check_func) { + // The default should be a single array output + std::vector args = {Datum(input)}; + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection)); + check_func(result); + } - auto arr = GetUInt8Array(1000, null_prob); - std::vector args = {Datum(arr)}; + template + void DoTestSplitExecution(const FunctionCaller* caller, int64_t exec_chunksize, + const Array& input, + std::shared_ptr selection, + CheckFunc&& check_func) { + // Split execution into several chunks + std::vector args = {Datum(input)}; + exec_ctx_->set_exec_chunksize(exec_chunksize); + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection, /*options=*/nullptr, + exec_ctx_.get())); + check_func(result); + } +}; - auto CheckFunction = [&](std::shared_ptr test_nopre) { - ResetContexts(); +TEST_F(TestCallScalarFunctionBasicNonStandardCases, Basic) { + auto arr = GetTestArray(); + for (const auto& name : {"test_nopre_data", "test_nopre_validity_or_data"}) { + ARROW_SCOPED_TRACE(name); + for (const auto& caller_maker : + {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_nopre, caller_maker(name, {uint8()})); + ARROW_SCOPED_TRACE(test_nopre->name()); + ResetContexts(); + + DoTestBasic(test_nopre.get(), *arr, /*selection=*/nullptr, + [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqual(*arr, *result.make_array(), /*verbose=*/true); + }); + } + } +} - // The default should be a single array output - { - ASSERT_OK_AND_ASSIGN(Datum result, test_nopre->Call(args)); - AssertArraysEqual(*arr, *result.make_array(), true); 
+TEST_F(TestCallScalarFunctionBasicNonStandardCases, BasicSelectiveSparse) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : + {"test_nopre_data_selective", "test_nopre_validity_or_data_selective"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_nopre, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(test_nopre.get(), *arr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualSparseWithSelection(*arr, selection_span, *result.make_array()); + }); } + } +} - // Split execution into 3 chunks - { - exec_ctx_->set_exec_chunksize(400); - ASSERT_OK_AND_ASSIGN(Datum result, test_nopre->Call(args, exec_ctx_.get())); - ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); - const ChunkedArray& carr = *result.chunked_array(); - ASSERT_EQ(3, carr.num_chunks()); - AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0)); - AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1)); - AssertArraysEqual(*arr->Slice(800), *carr.chunk(2)); +TEST_F(TestCallScalarFunctionBasicNonStandardCases, BasicSelectiveDense) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : {"test_nopre_data", "test_nopre_validity_or_data"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_nopre, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(test_nopre.get(), *arr, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualDenseWithSelection(*arr, selection_span, *result.make_array()); + }); } - }; + } +} - ASSERT_OK_AND_ASSIGN(auto test_nopre_data, caller_maker("test_nopre_data", {uint8()})); - CheckFunction(test_nopre_data); - ASSERT_OK_AND_ASSIGN(auto test_nopre_validity_or_data, - caller_maker("test_nopre_validity_or_data", {uint8()})); - CheckFunction(test_nopre_validity_or_data); +TEST_F(TestCallScalarFunctionBasicNonStandardCases, SplitExecution) { + auto arr = GetTestArray(); + for (const auto& name : {"test_nopre_data", "test_nopre_validity_or_data"}) { + ARROW_SCOPED_TRACE(name); + for (const auto& caller_maker : + {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_nopre, caller_maker(name, {uint8()})); + ARROW_SCOPED_TRACE(test_nopre->name()); + ResetContexts(); + + DoTestSplitExecution(test_nopre.get(), /*exec_chunksize=*/400, *arr, + /*selection=*/nullptr, [&](const Datum& result) { + ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind()); + const ChunkedArray& carr = *result.chunked_array(); + ASSERT_EQ(3, carr.num_chunks()); + AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0)); + AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1)); + AssertArraysEqual(*arr->Slice(800), *carr.chunk(2)); + }); + } + } } -TEST_F(TestCallScalarFunctionBasicNonStandardCases, SimpleCall) { - TestCallScalarFunctionBasicNonStandardCases::DoTest(SimpleFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionBasicNonStandardCases, SplitExecutionSelectiveSparse) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : + {"test_nopre_data_selective", "test_nopre_validity_or_data_selective"}) { 
+ ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_nopre, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + const int64_t exec_chunksize = 400; + ResetContexts(); + + DoTestSplitExecution(test_nopre.get(), exec_chunksize, *arr, selection, + [&](const Datum& result) { + AssertChunkedExecResultsEqualSparseWithSelection( + exec_chunksize, *arr, selection.get(), result); + }); + } + } } -TEST_F(TestCallScalarFunctionBasicNonStandardCases, ExecCall) { - TestCallScalarFunctionBasicNonStandardCases::DoTest(ExecFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionBasicNonStandardCases, SplitExecutionSelectiveDense) { + auto arr = GetTestArray(); + auto selections = GetTestSelectionVectors(); + for (const auto& name : {"test_nopre_data", "test_nopre_validity_or_data"}) { + ARROW_SCOPED_TRACE(name); + ASSERT_OK_AND_ASSIGN(auto test_nopre, + ExpressionFunctionCaller::Maker(name, {uint8()})); + for (const auto& selection : selections) { + const int64_t exec_chunksize = 400; + ResetContexts(); + + DoTestSplitExecution(test_nopre.get(), exec_chunksize, *arr, selection, + [&](const Datum& result) { + AssertChunkedExecResultsEqualDenseWithSelection( + exec_chunksize, *arr, selection.get(), result); + }); + } + } } class TestCallScalarFunctionStatefulKernel : public TestCallScalarFunction { protected: - void DoTest(FunctionCallerMaker caller_maker); -}; + std::shared_ptr GetTestArray() { + return ArrayFromJSON(int32(), "[1, 2, 3, null, 5]"); + } + + static constexpr int32_t kMultiplier = 2; + + std::shared_ptr GetExpected() { + return ArrayFromJSON(int32(), "[2, 4, 6, null, 10]"); + } -void TestCallScalarFunctionStatefulKernel::DoTest(FunctionCallerMaker caller_maker) { - ASSERT_OK_AND_ASSIGN(auto test_stateful, caller_maker("test_stateful", {int32()})); + std::vector> GetTestSelectionVectors() { + return {SelectionVectorFromJSON("[]"), SelectionVectorFromJSON("[0]"), + SelectionVectorFromJSON("[4]"), MakeSelectionVectorTo(2), + MakeSelectionVectorTo(5)}; + } - auto input = ArrayFromJSON(int32(), "[1, 2, 3, null, 5]"); - auto multiplier = std::make_shared(2); - auto expected = ArrayFromJSON(int32(), "[2, 4, 6, null, 10]"); + template + void DoTestBasic(const FunctionCaller* caller, const Array& input, + std::shared_ptr multiplier, + std::shared_ptr selection, CheckFunc&& check_func) { + ExampleOptions options(multiplier); + std::vector args = {Datum(input)}; + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, selection, &options)); + check_func(result); + } +}; - ExampleOptions options(multiplier); - std::vector args = {Datum(input)}; - ASSERT_OK_AND_ASSIGN(Datum result, test_stateful->Call(args, &options)); - AssertArraysEqual(*expected, *result.make_array()); +TEST_F(TestCallScalarFunctionStatefulKernel, Basic) { + auto input = GetTestArray(); + auto multiplier = std::make_shared(kMultiplier); + auto expected = GetExpected(); + for (const auto& caller_maker : {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_stateful, caller_maker("test_stateful", {int32()})); + ARROW_SCOPED_TRACE(test_stateful->name()); + ResetContexts(); + + DoTestBasic( + test_stateful.get(), *input, multiplier, /*selection=*/nullptr, + [&](const Datum& result) { AssertArraysEqual(*expected, *result.make_array()); }); + } } -TEST_F(TestCallScalarFunctionStatefulKernel, Simplecall) { - TestCallScalarFunctionStatefulKernel::DoTest(SimpleFunctionCaller::Maker); 
+TEST_F(TestCallScalarFunctionStatefulKernel, BasicSelectiveSparse) { + auto input = GetTestArray(); + auto multiplier = std::make_shared(kMultiplier); + auto selections = GetTestSelectionVectors(); + auto expected = GetExpected(); + ASSERT_OK_AND_ASSIGN( + auto caller, ExpressionFunctionCaller::Maker("test_stateful_selective", {int32()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(caller.get(), *input, multiplier, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualSparseWithSelection(*expected, selection_span, + *result.make_array()); + }); + } } -TEST_F(TestCallScalarFunctionStatefulKernel, ExecCall) { - TestCallScalarFunctionStatefulKernel::DoTest(ExecFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionStatefulKernel, BasicSelectiveDense) { + auto input = GetTestArray(); + auto multiplier = std::make_shared(kMultiplier); + auto selections = GetTestSelectionVectors(); + auto expected = GetExpected(); + ASSERT_OK_AND_ASSIGN(auto caller, + ExpressionFunctionCaller::Maker("test_stateful", {int32()})); + for (const auto& selection : selections) { + SelectionVectorSpan selection_span(selection->indices(), selection->length()); + ResetContexts(); + + DoTestBasic(caller.get(), *input, multiplier, selection, [&](const Datum& result) { + ASSERT_EQ(Datum::ARRAY, result.kind()); + AssertArraysEqualDenseWithSelection(*expected, selection_span, + *result.make_array()); + }); + } } class TestCallScalarFunctionScalarFunction : public TestCallScalarFunction { protected: - void DoTest(FunctionCallerMaker caller_maker); -}; + std::vector GetTestArgs() { + return {Datum(std::make_shared(5)), + Datum(std::make_shared(7))}; + } -void TestCallScalarFunctionScalarFunction::DoTest(FunctionCallerMaker caller_maker) { - ASSERT_OK_AND_ASSIGN(auto test_scalar_add_int32, - caller_maker("test_scalar_add_int32", {int32(), int32()})); + static constexpr int32_t kExpectedResult = 12; + + std::vector> GetTestSelectionVectors() { + return {SelectionVectorFromJSON("[]"), SelectionVectorFromJSON("[0]"), + SelectionVectorFromJSON("[0, 42]")}; + } - std::vector args = {Datum(std::make_shared(5)), - Datum(std::make_shared(7))}; - ASSERT_OK_AND_ASSIGN(Datum result, test_scalar_add_int32->Call(args)); - ASSERT_EQ(Datum::SCALAR, result.kind()); + // Despite the existence of a selection vector, the result is always a scalar when all + // arguments are scalars. 
+ void DoTestBasic(const FunctionCaller* caller, const std::vector& args, + std::shared_ptr selection) { + ASSERT_OK_AND_ASSIGN(Datum result, caller->Call(args, std::move(selection))); + ASSERT_EQ(Datum::SCALAR, result.kind()); - auto expected = std::make_shared(12); - ASSERT_TRUE(expected->Equals(*result.scalar())); + auto expected = std::make_shared(kExpectedResult); + ASSERT_TRUE(expected->Equals(*result.scalar())); + } +}; + +TEST_F(TestCallScalarFunctionScalarFunction, Basic) { + auto args = GetTestArgs(); + for (const auto& caller_maker : {SimpleFunctionCaller::Maker, ExecFunctionCaller::Maker, + ExpressionFunctionCaller::Maker}) { + ASSERT_OK_AND_ASSIGN(auto test_scalar_add_int32, + caller_maker("test_scalar_add_int32", {int32(), int32()})); + ARROW_SCOPED_TRACE(test_scalar_add_int32->name()); + ResetContexts(); + + DoTestBasic(test_scalar_add_int32.get(), args, /*selection=*/nullptr); + } } -TEST_F(TestCallScalarFunctionScalarFunction, SimpleCall) { - TestCallScalarFunctionScalarFunction::DoTest(SimpleFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionScalarFunction, BasicSelectiveSparse) { + auto args = GetTestArgs(); + auto selections = GetTestSelectionVectors(); + ASSERT_OK_AND_ASSIGN(auto test_scalar_add_int32, + ExpressionFunctionCaller::Maker("test_scalar_add_int32_selective", + {int32(), int32()})); + for (const auto& selection : selections) { + ResetContexts(); + + DoTestBasic(test_scalar_add_int32.get(), GetTestArgs(), selection); + } } -TEST_F(TestCallScalarFunctionScalarFunction, ExecCall) { - TestCallScalarFunctionScalarFunction::DoTest(ExecFunctionCaller::Maker); +TEST_F(TestCallScalarFunctionScalarFunction, BasicSelectiveDense) { + auto args = GetTestArgs(); + auto selections = GetTestSelectionVectors(); + ASSERT_OK_AND_ASSIGN( + auto test_scalar_add_int32, + ExpressionFunctionCaller::Maker("test_scalar_add_int32", {int32(), int32()})); + for (const auto& selection : selections) { + ResetContexts(); + + DoTestBasic(test_scalar_add_int32.get(), GetTestArgs(), selection); + } } TEST(Ordering, IsSuborderOf) { diff --git a/cpp/src/arrow/compute/expression.cc b/cpp/src/arrow/compute/expression.cc index 3c2ec100402..d1b512f705e 100644 --- a/cpp/src/arrow/compute/expression.cc +++ b/cpp/src/arrow/compute/expression.cc @@ -770,12 +770,19 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i } int64_t input_length; + std::shared_ptr input_selection_vector = nullptr; if (!arguments.empty() && all_scalar) { // all inputs are scalar, so use a 1-long batch to avoid // computing input.length equivalent outputs input_length = 1; } else { input_length = input.length; + input_selection_vector = input.selection_vector; +#ifndef NDEBUG + if (input_selection_vector) { + RETURN_NOT_OK(input_selection_vector->Validate(input.length)); + } +#endif } auto executor = compute::detail::KernelExecutor::MakeScalar(); @@ -789,7 +796,8 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i RETURN_NOT_OK(executor->Init(&kernel_context, {kernel, types, options})); compute::detail::DatumAccumulator listener; - RETURN_NOT_OK(executor->Execute(ExecBatch(arguments, input_length), &listener)); + RETURN_NOT_OK(executor->Execute( + ExecBatch(arguments, input_length, std::move(input_selection_vector)), &listener)); const auto out = executor->WrapResults(arguments, listener.values()); #ifndef NDEBUG DCHECK_OK(executor->CheckResultType(out, call->function_name.c_str())); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 
b0b12a690f8..b9c557aed27 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -412,6 +412,14 @@ Status Function::Validate() const { Status ScalarFunction::AddKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init, std::shared_ptr constraint) { + return AddKernel(std::move(in_types), std::move(out_type), std::move(exec), + /*selective_exec=*/nullptr, std::move(init), std::move(constraint)); +} + +Status ScalarFunction::AddKernel(std::vector in_types, OutputType out_type, + ArrayKernelExec exec, + ArrayKernelSelectiveExec selective_exec, KernelInit init, + std::shared_ptr constraint) { RETURN_NOT_OK(CheckArity(in_types.size())); if (arity_.is_varargs && in_types.size() != 1) { @@ -419,7 +427,7 @@ Status ScalarFunction::AddKernel(std::vector in_types, OutputType out } auto sig = KernelSignature::Make(std::move(in_types), std::move(out_type), arity_.is_varargs, std::move(constraint)); - kernels_.emplace_back(std::move(sig), exec, init); + kernels_.emplace_back(std::move(sig), exec, selective_exec, init); return Status::OK(); } diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 399081e2a73..96845219654 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -304,13 +304,21 @@ class ARROW_EXPORT ScalarFunction : public detail::FunctionImpl { std::move(doc), default_options), is_pure_(is_pure) {} - /// \brief Add a kernel with given input/output types, no required state - /// initialization, preallocation for fixed-width types, and default null - /// handling (intersect validity bitmaps of inputs). + /// \brief Add a kernel with given input/output types and exec API, no selective exec + /// API, no required state initialization, preallocation for fixed-width types, and + /// default null handling (intersect validity bitmaps of inputs). Status AddKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR, std::shared_ptr constraint = NULLPTR); + /// \brief Add a kernel with given input/output types, exec and selective exec APIs, no + /// required state initialization, preallocation for fixed-width types, and default null + /// handling (intersect validity bitmaps of inputs). + Status AddKernel(std::vector in_types, OutputType out_type, + ArrayKernelExec exec, ArrayKernelSelectiveExec selective_exec, + KernelInit init = NULLPTR, + std::shared_ptr constraint = NULLPTR); + /// \brief Add a kernel (function implementation). Returns error if the /// kernel's signature does not match the function's arity. Status AddKernel(ScalarKernel kernel); diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 0d4f9d6ff43..1a76414f484 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -555,19 +555,39 @@ struct ARROW_EXPORT Kernel { /// employed this may not be possible. using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*); +/// \brief The optional scalar kernel selective execution API for SCALAR kernel types. +/// It's like ArrayKernelExec but with an additional SelectionVectorSpan argument. When a +/// selection vector is specified in the batch, this API will be preferred, if provided, +/// over ArrayKernelExec. +using ArrayKernelSelectiveExec = Status (*)(KernelContext*, const ExecSpan&, + const SelectionVectorSpan&, ExecResult*); + /// \brief Kernel data structure for implementations of ScalarFunction. 
In /// addition to the members found in Kernel, contains the null handling /// and memory pre-allocation preferences. struct ARROW_EXPORT ScalarKernel : public Kernel { ScalarKernel() = default; + ScalarKernel(std::shared_ptr sig, ArrayKernelExec exec, + ArrayKernelSelectiveExec selective_exec, KernelInit init = NULLPTR) + : Kernel(std::move(sig), std::move(init)), + exec(std::move(exec)), + selective_exec(std::move(selective_exec)) {} + + ScalarKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, + ArrayKernelSelectiveExec selective_exec, KernelInit init = NULLPTR) + : Kernel(std::move(in_types), std::move(out_type), std::move(init)), + exec(std::move(exec)), + selective_exec(std::move(selective_exec)) {} + ScalarKernel(std::shared_ptr sig, ArrayKernelExec exec, KernelInit init = NULLPTR) - : Kernel(std::move(sig), init), exec(exec) {} + : ScalarKernel(std::move(sig), std::move(exec), NULLPTR, std::move(init)) {} ScalarKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, KernelInit init = NULLPTR) - : Kernel(std::move(in_types), std::move(out_type), std::move(init)), exec(exec) {} + : ScalarKernel(std::move(in_types), std::move(out_type), std::move(exec), NULLPTR, + std::move(init)) {} /// \brief Perform a single invocation of this kernel. Depending on the /// implementation, it may only write into preallocated memory, while in some @@ -575,6 +595,13 @@ struct ARROW_EXPORT ScalarKernel : public Kernel { /// through the KernelContext. ArrayKernelExec exec; + /// \brief Optional and similar to `exec`, but a specialized implementation that takes + /// a selection vector argument and performs the computation only on the selected + /// indices. When this specialized kernel is not provided, we fall back to gathering + /// the selected values into a dense array, calling `exec` on it, and then scattering + /// the results back to the selected positions of the output array. + ArrayKernelSelectiveExec selective_exec; + /// \brief Writing execution results into larger contiguous allocations /// requires that the kernel be able to write into sliced output ArrayData*, /// including sliced output validity bitmaps.
Some kernel implementations may diff --git a/cpp/src/arrow/compute/test_util_internal.cc b/cpp/src/arrow/compute/test_util_internal.cc index 219028943cb..6407799399c 100644 --- a/cpp/src/arrow/compute/test_util_internal.cc +++ b/cpp/src/arrow/compute/test_util_internal.cc @@ -24,6 +24,7 @@ #include "arrow/record_batch.h" #include "arrow/scalar.h" #include "arrow/table.h" +#include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" #include "arrow/util/logging_internal.h" @@ -120,4 +121,15 @@ void ValidateOutput(const Datum& output) { } } +std::shared_ptr SelectionVectorFromJSON(const std::string& json) { + return std::make_shared(*ArrayFromJSON(int32(), json)); +} + +std::shared_ptr MakeSelectionVectorTo(int64_t length) { + auto res = gen::Step()->Generate(length); + DCHECK_OK(res.status()); + auto arr = res.ValueUnsafe(); + return std::make_shared(*arr); +} + } // namespace arrow::compute diff --git a/cpp/src/arrow/compute/test_util_internal.h b/cpp/src/arrow/compute/test_util_internal.h index 6a172b07692..b7f4e126199 100644 --- a/cpp/src/arrow/compute/test_util_internal.h +++ b/cpp/src/arrow/compute/test_util_internal.h @@ -39,4 +39,8 @@ ExecBatch ExecBatchFromJSON(const std::vector& types, void ValidateOutput(const Datum& output); +std::shared_ptr SelectionVectorFromJSON(const std::string& json); + +std::shared_ptr MakeSelectionVectorTo(int64_t length); + } // namespace arrow::compute
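
Appendix (not part of the patch): a minimal caller-side sketch of how the selection-vector plumbing added above is exercised, mirroring the ExpressionFunctionCaller path in the test changes. The helper name CopySelectedRows, the example indices, and the use of the test-only "test_copy" function (registered only by exec_test.cc) are illustrative assumptions; the SelectionVector class, the three-argument ExecBatch constructor, and ExecuteScalarExpression are taken from the code in this diff.

// Sketch only: evaluate a unary scalar function over just the rows named by a
// selection vector. Assumptions: CopySelectedRows, the indices, and "test_copy".
#include <memory>

#include "arrow/api.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/testing/gtest_util.h"  // ArrayFromJSON, a test-only convenience

namespace cp = arrow::compute;

arrow::Result<arrow::Datum> CopySelectedRows(const std::shared_ptr<arrow::Array>& values) {
  // Row positions to evaluate; the remaining positions are either skipped by a
  // selective kernel (sparse output) or handled by the dense gather/scatter fallback.
  auto indices = arrow::ArrayFromJSON(arrow::int32(), "[0, 2, 5]");
  auto selection = std::make_shared<cp::SelectionVector>(*indices);

  // Attach the selection vector to the batch, as ExpressionFunctionCaller does.
  cp::ExecBatch batch({arrow::Datum(values)}, values->length(), std::move(selection));

  // Bind a call expression against a one-field schema and execute it.
  // "test_copy" is the test-only function from exec_test.cc; any registered
  // unary scalar function could be substituted.
  auto input_schema = arrow::schema({arrow::field("arg0", values->type())});
  cp::Expression expr = cp::call("test_copy", {cp::field_ref(0)});
  ARROW_ASSIGN_OR_RAISE(auto bound, expr.Bind(*input_schema));
  return cp::ExecuteScalarExpression(bound, batch);
}

Whether execution is sparse (the kernel's selective_exec receives the SelectionVectorSpan and touches only the selected slots) or dense (ScalarExecutor::ExecuteSelectiveDense gathers, runs `exec`, and scatters the result back) is decided per kernel; the caller only attaches the selection vector to the batch.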