88 changes: 88 additions & 0 deletions cpp/src/arrow/util/rle_encoding_internal.h
@@ -308,6 +308,20 @@ class RleRunDecoder {
return to_read;
}

/// Get a batch of values and count how many equal match_value.
/// For RLE runs, counting is O(1) because all values in the run are identical.
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
rle_size_t value_bit_width,
value_type match_value, int64_t* out_count) {
// Save value_ before GetBatch; it is the same for every value in the run, so one comparison suffices.
const auto current_value = value_;
const auto to_read = GetBatch(out, batch_size, value_bit_width);
if (current_value == match_value) {
*out_count += to_read;
}
return to_read;
}

private:
value_type value_ = {};
rle_size_t remaining_count_ = 0;
@@ -377,6 +391,19 @@ class BitPackedRunDecoder {
return steps;
}

/// Get a batch of values and count how many equal match_value.
/// Note: For bit-packed runs, we use std::count after GetBatch since it's
/// highly optimized by the compiler. The fused approach is only beneficial
/// for RLE runs where counting is O(1).
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
rle_size_t value_bit_width,
value_type match_value, int64_t* out_count) {
const auto steps = GetBatch(out, batch_size, value_bit_width);
// std::count is highly optimized (SIMD) by modern compilers
*out_count += std::count(out, out + steps, match_value);
Comment on lines +401 to +403

Contributor:

I have been working on the unpack function used in GetBatch, and my intuition is also that this function could not easily be extended to count at the same time as it extracts (not impossible, but it would require heavy changes).

Still, this could provide better data locality when processing run by run.

Member:

The typical batch size for levels is probably small, so it would fit at least in L2 cache and perhaps L1. Not sure it's worth trying to do it while decoding.
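As a rough illustration of the trade-off discussed above, here is a minimal standalone sketch; the 1-bit unpack loop and the function names (UnpackThenCount, UnpackAndCount) are toy stand-ins, not the kernels used by GetBatch:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Approach taken in this PR for bit-packed runs: unpack the whole batch first,
// then count matches in a second pass that std::count / the compiler vectorize well.
int64_t UnpackThenCount(const uint8_t* packed, int num_values, int32_t match_value,
                        std::vector<int32_t>* out) {
  out->resize(num_values);
  for (int i = 0; i < num_values; ++i) {
    (*out)[i] = (packed[i / 8] >> (i % 8)) & 1;  // toy 1-bit unpack
  }
  return std::count(out->begin(), out->end(), match_value);
}

// Hypothetical fused variant discussed above: count while unpacking. It makes a
// single pass over the data, but the inner loop is harder to keep SIMD-friendly.
int64_t UnpackAndCount(const uint8_t* packed, int num_values, int32_t match_value,
                       std::vector<int32_t>* out) {
  out->resize(num_values);
  int64_t count = 0;
  for (int i = 0; i < num_values; ++i) {
    const int32_t v = (packed[i / 8] >> (i % 8)) & 1;
    (*out)[i] = v;
    count += (v == match_value);
  }
  return count;
}
```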

return steps;
}

private:
/// The pointer to the beginning of the run
const uint8_t* data_ = nullptr;
@@ -438,6 +465,10 @@ class RleBitPackedDecoder {
/// left or if an error occurred.
[[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size);

/// Get a batch of values and count how many equal match_value.
[[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size,
value_type match_value, int64_t* out_count);

/// Like GetBatch but add spacing for null entries.
///
/// Null entries will be set to an arbitrary value to avoid leaking private data.
@@ -483,6 +514,18 @@ class RleBitPackedDecoder {
decoder_);
}

/// Get a batch of values from the current run and return the number of elements read.
[[nodiscard]] rle_size_t RunGetBatchWithCount(value_type* out, rle_size_t batch_size,
value_type match_value,
int64_t* out_count) {
return std::visit(
[&](auto& dec) {
return dec.GetBatchWithCount(out, batch_size, value_bit_width_, match_value,
out_count);
},
decoder_);
}

/// Call the parser with a single callable for all event types.
template <typename Callable>
void ParseWithCallable(Callable&& func);
@@ -1474,4 +1517,49 @@ inline void RleBitPackedEncoder::Clear() {
bit_writer_.Clear();
}

template <typename T>
auto RleBitPackedDecoder<T>::GetBatchWithCount(value_type* out, rle_size_t batch_size,
value_type match_value, int64_t* out_count)
-> rle_size_t {
using ControlFlow = RleBitPackedParser::ControlFlow;

rle_size_t values_read = 0;

// Handle values remaining from a previous call that left a run partially read.
if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
const auto read = RunGetBatchWithCount(out, batch_size, match_value, out_count);
values_read += read;
out += read;

// Either we filled the whole batch or we finished the remaining run.
if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
return values_read;
}
ARROW_DCHECK(run_remaining() == 0);
}

ParseWithCallable([&](auto run) {
using RunDecoder = typename decltype(run)::template DecoderType<value_type>;

ARROW_DCHECK_LT(values_read, batch_size);
RunDecoder decoder(run, value_bit_width_);
const auto read =
decoder.GetBatchWithCount(out, batch_size - values_read, value_bit_width_,
match_value, out_count);
ARROW_DCHECK_LE(read, batch_size - values_read);
values_read += read;
out += read;

// Stop reading and store the partially consumed decoder
if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {
decoder_ = std::move(decoder);
return ControlFlow::Break;
}

return ControlFlow::Continue;
});

return values_read;
}

} // namespace arrow::util
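To make the O(1) counting argument for RLE runs concrete, a minimal standalone sketch follows; RleRun and CountMatches are illustrative names, not the Arrow classes above:

```cpp
#include <cstdint>

// Toy RLE run: `value` repeated `length` times.
struct RleRun {
  int32_t value;
  int64_t length;
};

// Counting matches costs one comparison regardless of run length; a bit-packed
// run has no such shortcut, hence the std::count fallback after GetBatch.
int64_t CountMatches(const RleRun& run, int32_t match_value) {
  return run.value == match_value ? run.length : 0;
}
```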
44 changes: 44 additions & 0 deletions cpp/src/arrow/util/rle_encoding_test.cc
@@ -1280,4 +1280,48 @@ TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) {
DoTestGetBatchSpacedRoundtrip<uint64_t>();
}

TEST(Rle, GetBatchWithCount) {
const int bit_width = 1;
uint8_t buffer[100];
RleBitPackedEncoder encoder(buffer, sizeof(buffer), bit_width);

// 30 1s
for (int i = 0; i < 30; ++i) {
bool result = encoder.Put(1);
EXPECT_TRUE(result);
}
// 20 0s
for (int i = 0; i < 20; ++i) {
bool result = encoder.Put(0);
EXPECT_TRUE(result);
}
// 10 1s
for (int i = 0; i < 10; ++i) {
bool result = encoder.Put(1);
EXPECT_TRUE(result);
}
encoder.Flush();

RleBitPackedDecoder<int> decoder(buffer, sizeof(buffer), bit_width);
std::vector<int> values(60);
int64_t count = 0;

// Read first 40 values. Should be 30 1s and 10 0s.
// Count 1s.
int read = decoder.GetBatchWithCount(values.data(), 40, 1, &count);
EXPECT_EQ(read, 40);
EXPECT_EQ(count, 30);
for (int i = 0; i < 30; ++i) EXPECT_EQ(values[i], 1);
for (int i = 30; i < 40; ++i) EXPECT_EQ(values[i], 0);

// Read next 20 values. Should be 10 0s and 10 1s.
// Count 1s.
count = 0;
read = decoder.GetBatchWithCount(values.data(), 20, 1, &count);
EXPECT_EQ(read, 20);
EXPECT_EQ(count, 10);
for (int i = 0; i < 10; ++i) EXPECT_EQ(values[i], 0);
for (int i = 10; i < 20; ++i) EXPECT_EQ(values[i], 1);
}

} // namespace arrow::util
33 changes: 27 additions & 6 deletions cpp/src/parquet/column_reader.cc
@@ -191,6 +191,30 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) {
return num_decoded;
}

int LevelDecoder::DecodeAndCount(int batch_size, int16_t* levels, int64_t* count) {
int num_decoded = 0;

int num_values = std::min(num_values_remaining_, batch_size);
if (encoding_ == Encoding::RLE) {
num_decoded =
rle_decoder_->GetBatchWithCount(levels, num_values, max_level_, count);
} else {
num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
*count += std::count(levels, levels + num_decoded, max_level_);
}
if (num_decoded > 0) {
internal::MinMax min_max = internal::FindMinMax(levels, num_decoded);
if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) {
std::stringstream ss;
ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max
<< " out of range. Max Level: " << max_level_;
throw ParquetException(ss.str());
}
}
num_values_remaining_ -= num_decoded;
return num_decoded;
}

ReaderProperties default_reader_properties() {
static ReaderProperties default_reader_properties;
return default_reader_properties;
@@ -1006,14 +1030,11 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
if (ARROW_PREDICT_FALSE(this->definition_level_decoder_.DecodeAndCount(
batch_size, def_levels, non_null_values_to_read) != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
*num_def_levels = batch_size;
} else {
// Required field, read all values
if (num_def_levels != nullptr) {
4 changes: 4 additions & 0 deletions cpp/src/parquet/column_reader.h
@@ -92,6 +92,10 @@ class PARQUET_EXPORT LevelDecoder {
// Decodes a batch of levels into an array and returns the number of levels decoded
int Decode(int batch_size, int16_t* levels);

/// Decodes a batch of levels into an array and counts the number of levels equal to
/// max_level_.
int DecodeAndCount(int batch_size, int16_t* levels, int64_t* count);

private:
int bit_width_;
int num_values_remaining_;
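A rough usage sketch of the new LevelDecoder::DecodeAndCount API, mirroring the decode loop the benchmark below uses; CountNonNullLevels, the buffer arguments, and the 1024-entry scratch vector are illustrative choices:

```cpp
#include <cstdint>
#include <vector>

#include "parquet/column_reader.h"

// Decode all definition levels from an RLE-encoded buffer and count how many
// equal max_level (i.e. how many non-null values to expect).
int64_t CountNonNullLevels(const uint8_t* data, int32_t data_size, int num_levels,
                           int16_t max_level) {
  parquet::LevelDecoder decoder;
  decoder.SetData(parquet::Encoding::RLE, max_level, num_levels, data, data_size);

  std::vector<int16_t> levels(1024);
  int64_t non_null_count = 0;
  while (true) {
    const int decoded = decoder.DecodeAndCount(static_cast<int>(levels.size()),
                                               levels.data(), &non_null_count);
    if (decoded == 0) break;
  }
  return non_null_count;
}
```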
93 changes: 93 additions & 0 deletions cpp/src/parquet/column_reader_benchmark.cc
@@ -336,6 +336,77 @@ static void DecodeLevels(Encoding::type level_encoding, int16_t max_level, int n
state.SetItemsProcessed(state.iterations() * num_levels);
}

// Benchmark that simulates the old approach: Decode + separate count
static void DecodeLevelsAndCountSeparate(Encoding::type level_encoding, int16_t max_level,
int num_levels, int batch_size,
int level_repeat_count,
::benchmark::State& state) {
std::vector<uint8_t> bytes;
{
std::vector<int16_t> input_levels;
GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level,
num_levels, &input_levels);
EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes);
}

LevelDecoder decoder;
std::vector<int16_t> output_levels(batch_size);
for (auto _ : state) {
state.PauseTiming();
decoder.SetData(level_encoding, max_level, num_levels, bytes.data(),
static_cast<int>(bytes.size()));
int64_t total_count = 0;
state.ResumeTiming();
// Decode + count separately (old approach)
while (true) {
int levels_decoded = decoder.Decode(batch_size, output_levels.data());
if (levels_decoded == 0) {
break;
}
// Separate count pass (simulating the old approach)
total_count +=
std::count(output_levels.data(), output_levels.data() + levels_decoded, max_level);
}
DoNotOptimize(total_count);
}
state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t));
state.SetItemsProcessed(state.iterations() * num_levels);
}

// Benchmark that uses the new fused DecodeAndCount approach
static void DecodeLevelsAndCountFused(Encoding::type level_encoding, int16_t max_level,
int num_levels, int batch_size,
int level_repeat_count, ::benchmark::State& state) {
std::vector<uint8_t> bytes;
{
std::vector<int16_t> input_levels;
GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level,
num_levels, &input_levels);
EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes);
}

LevelDecoder decoder;
std::vector<int16_t> output_levels(batch_size);
for (auto _ : state) {
state.PauseTiming();
decoder.SetData(level_encoding, max_level, num_levels, bytes.data(),
static_cast<int>(bytes.size()));
int64_t total_count = 0;
state.ResumeTiming();
// Fused decode + count (new approach)
while (true) {
int levels_decoded =
decoder.DecodeAndCount(batch_size, output_levels.data(), &total_count);
if (levels_decoded == 0) {
break;
}
}
DoNotOptimize(total_count);
}
state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t));
state.SetItemsProcessed(state.iterations() * num_levels);
}

static void ReadLevels_Rle(::benchmark::State& state) {
int16_t max_level = static_cast<int16_t>(state.range(0));
int num_levels = static_cast<int>(state.range(1));
@@ -354,6 +425,26 @@ static void ReadLevels_BitPack(::benchmark::State& state) {
level_repeat_count, state);
}

// Benchmark: Decode + Count separately (old approach)
static void ReadLevels_RleCountSeparate(::benchmark::State& state) {
int16_t max_level = static_cast<int16_t>(state.range(0));
int num_levels = static_cast<int>(state.range(1));
int batch_size = static_cast<int>(state.range(2));
int level_repeat_count = static_cast<int>(state.range(3));
DecodeLevelsAndCountSeparate(Encoding::RLE, max_level, num_levels, batch_size,
level_repeat_count, state);
}

// Benchmark: Fused DecodeAndCount (new approach)
static void ReadLevels_RleCountFused(::benchmark::State& state) {
int16_t max_level = static_cast<int16_t>(state.range(0));
int num_levels = static_cast<int>(state.range(1));
int batch_size = static_cast<int>(state.range(2));
int level_repeat_count = static_cast<int>(state.range(3));
DecodeLevelsAndCountFused(Encoding::RLE, max_level, num_levels, batch_size,
level_repeat_count, state);
}

static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) {
b->ArgNames({"MaxLevel", "NumLevels", "BatchSize", "LevelRepeatCount"})
->Args({1, 8096, 1024, 1})
@@ -367,6 +458,8 @@ static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) {

BENCHMARK(ReadLevels_Rle)->Apply(ReadLevelsArguments);
BENCHMARK(ReadLevels_BitPack)->Apply(ReadLevelsArguments);
BENCHMARK(ReadLevels_RleCountSeparate)->Apply(ReadLevelsArguments);
BENCHMARK(ReadLevels_RleCountFused)->Apply(ReadLevelsArguments);

} // namespace benchmarks
} // namespace parquet