2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/CMakeLists.txt
@@ -174,6 +174,8 @@ add_arrow_compute_test(row_test
EXTRA_LINK_LIBS
arrow_compute_testing)

add_arrow_compute_benchmark(exec_benchmark)

add_arrow_compute_benchmark(function_benchmark)

add_subdirectory(kernels)
204 changes: 176 additions & 28 deletions cpp/src/arrow/compute/exec.cc
@@ -50,6 +50,7 @@
#include "arrow/util/logging_internal.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"
#include "arrow/visit_data_inline.h"

namespace arrow {

@@ -359,6 +360,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
have_all_scalars_ = CheckIfAllScalar(batch);
promote_if_all_scalars_ = promote_if_all_scalars;
position_ = 0;
selection_position_ = 0;
length_ = batch.length;
chunk_indexes_.clear();
chunk_indexes_.resize(args_->size(), 0);
@@ -367,6 +369,12 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
value_offsets_.clear();
value_offsets_.resize(args_->size(), 0);
max_chunksize_ = std::min(length_, max_chunksize);
selection_vector_ = batch.selection_vector.get();
if (selection_vector_) {
selection_length_ = selection_vector_->length();
} else {
selection_length_ = 0;
Contributor: I think it's less confusing if, without a selection, the "length of the selection" were the length of the whole array.

Contributor (author): Hmm, I may see it otherwise. The naming of the three selection_*_ members implies they are tightly coupled (with selection_vector_ being the "leader"). If selection_vector_ is null, the value of selection_length_ makes no sense, so 0 is closer to the meaning of "nonsense" (though less so than -1), I guess?
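For context, a minimal sketch of the caller-side pattern (mirroring ExecuteSpans() further down in this diff, not itself part of the change): the selection span is only requested when have_selection_vector() is true, so selection_length_ is never consulted in the no-selection case.

ExecSpan input;
SelectionVectorSpan selection;
SelectionVectorSpan* selection_ptr = nullptr;
if (span_iterator_.have_selection_vector()) {
  // Only pass a span to be filled when a selection vector is actually present.
  selection_ptr = &selection;
}
while (span_iterator_.Next(&input, selection_ptr)) {
  // ... run the kernel on `input`, selectively iff selection_ptr != nullptr
}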

}
return Status::OK();
}

@@ -403,7 +411,7 @@ int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* spa
return iteration_size;
}

bool ExecSpanIterator::Next(ExecSpan* span) {
bool ExecSpanIterator::Next(ExecSpan* span, SelectionVectorSpan* selection_span) {
if (!initialized_) {
span->length = 0;

@@ -442,6 +450,13 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
PromoteExecSpanScalars(span);
}

if (!have_all_scalars_ || promote_if_all_scalars_) {
if (selection_vector_) {
DCHECK_NE(selection_span, nullptr);
*selection_span = SelectionVectorSpan(selection_vector_->indices());
}
}

initialized_ = true;
} else if (position_ == length_) {
// We've emitted at least one span and we're at the end so we are done
@@ -465,6 +480,20 @@
}
}

// Then the selection span
if (selection_vector_) {
DCHECK_NE(selection_span, nullptr);
auto indices_begin = selection_vector_->indices() + selection_position_;
auto indices_end = selection_vector_->indices() + selection_vector_->length();
DCHECK_LE(indices_begin, indices_end);
auto indices_limit = std::lower_bound(
indices_begin, indices_end, static_cast<int32_t>(position_ + iteration_size));
int64_t num_indices = indices_limit - indices_begin;
selection_span->SetSlice(selection_position_, num_indices,
static_cast<int32_t>(position_));
selection_position_ += num_indices;
}
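// Worked example (illustrative note, not part of the change): with sorted
// indices [1, 4, 7, 9], position_ == 4 and iteration_size == 4 (rows [4, 8)),
// lower_bound stops before index 9, so num_indices == 2 ({4, 7}); SetSlice
// rebases them by index_back_shift == 4, presumably exposing chunk-relative
// indices {0, 3} to the kernel.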

position_ += iteration_size;
DCHECK_LE(position_, length_);
return true;
@@ -694,7 +723,14 @@ std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
// Skip empty chunks
continue;
}
arrays.emplace_back(val.make_array());
if (val.is_chunked_array()) {
for (const auto& chunk : val.chunked_array()->chunks()) {
arrays.emplace_back(chunk);
}
} else {
DCHECK(val.is_array());
arrays.emplace_back(val.make_array());
}
Contributor: This seems unrelated. Why is it necessary?

Contributor (author): Plus, as a quite independent free function, I think there's no harm in extending it a little to support chunked arrays?
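As an illustration (a hypothetical usage sketch; the variable names are made up, and ToChunkedArray is the internal helper shown above), a mix of plain-array and already-chunked results now flattens into a single ChunkedArray:

// Hypothetical inputs: one plain Array datum plus one ChunkedArray datum.
std::vector<Datum> outputs = {Datum(plain_array), Datum(chunked_result)};
// The chunked datum is expanded into its chunks rather than passed through
// make_array(), so the result is one flat ChunkedArray over all the chunks.
std::shared_ptr<ChunkedArray> combined = ToChunkedArray(outputs, output_type);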

}
return std::make_shared<ChunkedArray>(std::move(arrays), type.GetSharedPtr());
}
@@ -781,17 +817,45 @@ class KernelExecutorImpl : public KernelExecutor {
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
public:
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));

if (batch.length == 0) {
// For zero-length batches, we do nothing except return a zero-length
// array of the correct output type
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> result,
MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0,
exec_context()->memory_pool()));
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
return EmitResult(result->data(), listener);
}

if (batch.selection_vector && !kernel_->selective_exec) {
// If the batch contains a selection vector but the kernel does not support
// selective execution, we need to execute the batch in a "dense" manner.
return ExecuteSelectiveDense(batch, listener);
}

return ExecuteBatch(batch, listener);
}

Datum WrapResults(const std::vector<Datum>& inputs,
Contributor: This one should be in the protected: section as well, right?

Contributor (author): It's an override of a public method of its parent class, KernelExecutor::WrapResults().
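For reference, the parent declaration looks roughly like this (reproduced from memory, so treat the exact signature as approximate):

class KernelExecutor {
 public:
  // ...
  virtual Datum WrapResults(const std::vector<Datum>& args,
                            const std::vector<Datum>& outputs) = 0;
};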

const std::vector<Datum>& outputs) override {
// If execution yielded multiple chunks (because large arrays were split
// based on the ExecContext parameters, then the result is a ChunkedArray
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
return ToChunkedArray(outputs, output_type_);
} else {
// Outputs have just one element
return outputs[0];
}
}

protected:
// Execute a single batch either non-selectively (batch doesn't contain a selection
// vector) or selectively (kernel supports selective execution).
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
DCHECK(!batch.selection_vector || kernel_->selective_exec);

RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));

// If the executor is configured to produce a single large Array output for
// kernels supporting preallocation, then we do so up front and then
// iterate over slices of that large array. Otherwise, we preallocate prior
@@ -811,19 +875,46 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
}
}

Datum WrapResults(const std::vector<Datum>& inputs,
const std::vector<Datum>& outputs) override {
// If execution yielded multiple chunks (because large arrays were split
// based on the ExecContext parameters, then the result is a ChunkedArray
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
return ToChunkedArray(outputs, output_type_);
} else {
// Outputs have just one element
return outputs[0];
// Execute a single batch with a selection vector "densely" for a kernel that doesn't
// support selective execution. "Densely" here means that we first gather the rows
// indicated by the selection vector into a contiguous ExecBatch, execute that, and
// then scatter the result back to the original row positions in the output.
Status ExecuteSelectiveDense(const ExecBatch& batch, ExecListener* listener) {
DCHECK(batch.selection_vector && !kernel_->selective_exec);

if (CheckIfAllScalar(batch)) {
// For all-scalar batch, we can skip the gather/scatter steps as if there is no
// selection vector - the result is a scalar anyway.
ExecBatch input = batch;
input.selection_vector = nullptr;
return ExecuteBatch(input, listener);
}

std::vector<Datum> values(batch.num_values());
for (int i = 0; i < batch.num_values(); ++i) {
if (batch[i].is_scalar()) {
// XXX: Skip gather for scalars since it is not currently supported by Take.
Contributor: The reason for not taking from the scalar is that it's not necessary, right?

Contributor (author): Technically it's not necessary. But the drawback is that we lose the ability to uniformly call Take on any Datum - we have to make sure it's not a scalar and go through a special path, like here, for scalars.

I think maybe we could simply return the scalar as-is from Take (to allow uniform invocation on an arbitrary Datum), or we insist that taking from a scalar makes no sense and do special checks everywhere.
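A minimal sketch of the first alternative (purely hypothetical; TakeAnyDatum is not an existing function, and the names are illustrative only):

// Hypothetical wrapper: a scalar is returned as-is (it "broadcasts" under a
// selection anyway); anything else defers to the regular Take function.
// Assumes the arrow and arrow::compute namespaces for Result, Datum, Take,
// TakeOptions and ExecContext.
Result<Datum> TakeAnyDatum(const Datum& values, const Datum& indices,
                           const TakeOptions& options, ExecContext* ctx) {
  if (values.is_scalar()) {
    return values;
  }
  return Take(values, indices, options, ctx);
}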

values[i] = batch[i];
} else {
ARROW_ASSIGN_OR_RAISE(values[i],
Take(batch[i], *batch.selection_vector->data(),
TakeOptions{/*boundcheck=*/false}, exec_context()));
}
}
ARROW_ASSIGN_OR_RAISE(
ExecBatch input,
ExecBatch::Make(std::move(values), batch.selection_vector->length()));

DatumAccumulator dense_listener;
RETURN_NOT_OK(ExecuteBatch(input, &dense_listener));
Datum dense_result = WrapResults(input.values, dense_listener.values());
Comment on lines +908 to +910

Contributor (author): Here we need to actually obtain the "dense" result Datum (as opposed to passing it into the eventual listener) in order to call Scatter on it, so we call WrapResults to fully reuse the existing result-obtaining logic. This applies one extra round of "faking a chunked array"; then, in the final WrapResults called by the caller (the function executor or expression evaluation), we may see an already-faked chunked array.

ARROW_ASSIGN_OR_RAISE(auto result,
Scatter(dense_result, *batch.selection_vector->data(),
ScatterOptions{/*max_index=*/batch.length - 1}));
return listener->OnResult(std::move(result));
}

protected:
Status EmitResult(std::shared_ptr<ArrayData> out, ExecListener* listener) {
if (span_iterator_.have_all_scalars()) {
// ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to
Expand All @@ -842,6 +933,11 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// eventually skip the creation of ArrayData altogether
std::shared_ptr<ArrayData> preallocation;
ExecSpan input;
SelectionVectorSpan selection;
SelectionVectorSpan* selection_ptr = nullptr;
if (span_iterator_.have_selection_vector()) {
selection_ptr = &selection;
}
ExecResult output;
ArraySpan* output_span = output.array_span_mutable();

@@ -853,10 +949,10 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
output_span->SetMembers(*preallocation);
output_span->offset = 0;
int64_t result_offset = 0;
while (span_iterator_.Next(&input)) {
while (span_iterator_.Next(&input, selection_ptr)) {
// Set absolute output span position and length
output_span->SetSlice(result_offset, input.length);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
result_offset = span_iterator_.position();
}

@@ -866,18 +962,19 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Fully preallocating, but not contiguously
// We preallocate (maybe) only for the output of processing the current
// chunk
while (span_iterator_.Next(&input)) {
while (span_iterator_.Next(&input, selection_ptr)) {
ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(input.length));
output_span->SetMembers(*preallocation);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
RETURN_NOT_OK(ExecuteSingleSpan(input, selection_ptr, &output));
// Emit the result for this chunk
RETURN_NOT_OK(EmitResult(std::move(preallocation), listener));
}
return Status::OK();
}
}

Status ExecuteSingleSpan(const ExecSpan& input, ExecResult* out) {
Status ExecuteSingleSpan(const ExecSpan& input, const SelectionVectorSpan* selection,
ExecResult* out) {
ArraySpan* result_span = out->array_span_mutable();
if (output_type_.type->id() == Type::NA) {
result_span->null_count = result_span->length;
@@ -888,7 +985,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
result_span->null_count = 0;
}
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out));
RETURN_NOT_OK(ExecuteKernel(input, selection, out));
// Output type didn't change
DCHECK(out->is_array_span());
return Status::OK();
@@ -903,8 +1000,13 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// We will eventually delete the Scalar output path per
// ARROW-16757.
ExecSpan input;
SelectionVectorSpan selection;
SelectionVectorSpan* selection_ptr = nullptr;
if (span_iterator_.have_selection_vector()) {
selection_ptr = &selection;
}
ExecResult output;
while (span_iterator_.Next(&input)) {
while (span_iterator_.Next(&input, selection_ptr)) {
ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length));
DCHECK(output.is_array_data());

@@ -917,7 +1019,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
out_arr->null_count = 0;
}

RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output));
RETURN_NOT_OK(ExecuteKernel(input, selection_ptr, &output));

// Output type didn't change
DCHECK(output.is_array_data());
@@ -983,6 +1085,17 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
return Status::OK();
}

// Actually invoke the kernel on the given input span, either selectively if there is a
// selection or non-selectively otherwise.
Status ExecuteKernel(const ExecSpan& input, const SelectionVectorSpan* selection,
ExecResult* out) {
if (selection) {
DCHECK_NE(kernel_->selective_exec, nullptr);
return kernel_->selective_exec(kernel_ctx_, input, *selection, out);
}
return kernel_->exec(kernel_ctx_, input, out);
}
Contributor: I know this is called twice, but it would probably be better to have it inlined at the call sites. The precondition that a non-null selection implies a non-null selective_exec is very specific.

Contributor (author): Sorry, I don't get it. Both call sites have the possibility that selection is non-null, and we need to make sure that selective_exec is also non-null. In other words, if we inlined it, the code would be exactly the same in those two places. Or are you suggesting something performance-wise?
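For reference, a sketch of what the reviewer's suggestion would look like inlined at one call site (equivalent to going through ExecuteKernel(); not a proposed change):

// Inlined at the call site instead of calling ExecuteKernel():
if (selection_ptr) {
  DCHECK_NE(kernel_->selective_exec, nullptr);
  RETURN_NOT_OK(kernel_->selective_exec(kernel_ctx_, input, *selection_ptr, &output));
} else {
  RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output));
}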


// Used to account for the case where we do not preallocate a
// validity bitmap because the inputs are all non-null and we're
// using NullHandling::INTERSECTION to compute the validity bitmap
@@ -1345,18 +1458,53 @@ const CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); }

SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
: data_(std::move(data)) {
DCHECK_EQ(Type::INT32, data_->type->id());
DCHECK_EQ(0, data_->GetNullCount());
DCHECK_NE(data_, nullptr);
DCHECK_EQ(data_->type->id(), Type::INT32);
indices_ = data_->GetValues<int32_t>(1);
}

SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}

int32_t SelectionVector::length() const { return static_cast<int32_t>(data_->length); }
int64_t SelectionVector::length() const { return data_->length; }

Status SelectionVector::Validate(int64_t values_length) const {
if (data_ == nullptr) {
return Status::Invalid("SelectionVector not initialized");
}
ARROW_CHECK_NE(indices_, nullptr);
if (data_->type->id() != Type::INT32) {
return Status::Invalid("SelectionVector must be of type int32");
}
if (data_->GetNullCount() != 0) {
return Status::Invalid("SelectionVector cannot contain nulls");
}
for (int64_t i = 1; i < length(); ++i) {
if (indices_[i - 1] > indices_[i]) {
return Status::Invalid("SelectionVector indices must be sorted");
}
}
for (int64_t i = 0; i < length(); ++i) {
if (indices_[i] < 0) {
return Status::Invalid("SelectionVector indices must be non-negative");
}
}
if (values_length >= 0) {
for (int64_t i = 0; i < length(); ++i) {
if (indices_[i] >= values_length) {
return Status::Invalid("SelectionVector index ", indices_[i],
" >= values length ", values_length);
}
}
}
return Status::OK();
}

Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
const BooleanArray& arr) {
return Status::NotImplemented("FromMask");
void SelectionVectorSpan::SetSlice(int64_t offset, int64_t length,
int32_t index_back_shift) {
DCHECK_NE(indices_, nullptr);
offset_ = offset;
length_ = length;
index_back_shift_ = index_back_shift;
}

Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,