From 029213271c4386c48c89cf3651fdce0ba4c34cf6 Mon Sep 17 00:00:00 2001 From: Andrew Leng Date: Mon, 15 Dec 2025 18:50:59 -0500 Subject: [PATCH 1/4] GH-45847: [C++] Optimize Parquet column reader by fusing decoding and counting This PR implements the optimization to fuse definition level decoding with counting in the Parquet column reader, addressing the TODO in column_reader.cc. --- cpp/src/arrow/util/rle_encoding_internal.h | 87 ++++++++++++++++++++++ cpp/src/arrow/util/rle_encoding_test.cc | 44 +++++++++++ cpp/src/parquet/column_reader.cc | 33 ++++++-- cpp/src/parquet/column_reader.h | 4 + 4 files changed, 162 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h index 2c084e0b4c8..49e96d0daae 100644 --- a/cpp/src/arrow/util/rle_encoding_internal.h +++ b/cpp/src/arrow/util/rle_encoding_internal.h @@ -308,6 +308,23 @@ class RleRunDecoder { return to_read; } + /// Get a batch of values and count how many equal match_value + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + rle_size_t value_bit_width, + value_type match_value, int64_t* out_count) { + if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) { + return 0; + } + + const auto to_read = std::min(remaining_count_, batch_size); + std::fill(out, out + to_read, value_); + if (value_ == match_value) { + *out_count += to_read; + } + remaining_count_ -= to_read; + return to_read; + } + private: value_type value_ = {}; rle_size_t remaining_count_ = 0; @@ -377,6 +394,15 @@ class BitPackedRunDecoder { return steps; } + /// Get a batch of values and count how many equal match_value + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + rle_size_t value_bit_width, + value_type match_value, int64_t* out_count) { + auto steps = GetBatch(out, batch_size, value_bit_width); + *out_count += std::count(out, out + steps, match_value); + return steps; + } + private: /// The pointer to the beginning of the run const uint8_t* data_ = nullptr; @@ -438,6 +464,10 @@ class RleBitPackedDecoder { /// left or if an error occurred. [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size); + /// Get a batch of values and count how many equal match_value + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, int64_t* out_count); + /// Like GetBatch but add spacing for null entries. /// /// Null entries will be set to an arbistrary value to avoid leaking private data. @@ -483,6 +513,18 @@ class RleBitPackedDecoder { decoder_); } + /// Get a batch of values from the current run and return the number elements read. + [[nodiscard]] rle_size_t RunGetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, + int64_t* out_count) { + return std::visit( + [&](auto& dec) { + return dec.GetBatchWithCount(out, batch_size, value_bit_width_, match_value, + out_count); + }, + decoder_); + } + /// Call the parser with a single callable for all event types. template void ParseWithCallable(Callable&& func); @@ -1474,4 +1516,49 @@ inline void RleBitPackedEncoder::Clear() { bit_writer_.Clear(); } +template +auto RleBitPackedDecoder::GetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, int64_t* out_count) + -> rle_size_t { + using ControlFlow = RleBitPackedParser::ControlFlow; + + rle_size_t values_read = 0; + + // Remaining from a previous call that would have left some unread data from a run. + if (ARROW_PREDICT_FALSE(run_remaining() > 0)) { + const auto read = RunGetBatchWithCount(out, batch_size, match_value, out_count); + values_read += read; + out += read; + + // Either we fulfilled all the batch to be read or we finished remaining run. + if (ARROW_PREDICT_FALSE(values_read == batch_size)) { + return values_read; + } + ARROW_DCHECK(run_remaining() == 0); + } + + ParseWithCallable([&](auto run) { + using RunDecoder = typename decltype(run)::template DecoderType; + + ARROW_DCHECK_LT(values_read, batch_size); + RunDecoder decoder(run, value_bit_width_); + const auto read = + decoder.GetBatchWithCount(out, batch_size - values_read, value_bit_width_, + match_value, out_count); + ARROW_DCHECK_LE(read, batch_size - values_read); + values_read += read; + out += read; + + // Stop reading and store remaining decoder + if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) { + decoder_ = std::move(decoder); + return ControlFlow::Break; + } + + return ControlFlow::Continue; + }); + + return values_read; +} + } // namespace arrow::util diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index b2d4f7df6f1..327ccfea281 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -1280,4 +1280,48 @@ TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) { DoTestGetBatchSpacedRoundtrip(); } +TEST(Rle, GetBatchWithCount) { + const int bit_width = 1; + uint8_t buffer[100]; + RleBitPackedEncoder encoder(buffer, sizeof(buffer), bit_width); + + // 30 1s + for (int i = 0; i < 30; ++i) { + bool result = encoder.Put(1); + EXPECT_TRUE(result); + } + // 20 0s + for (int i = 0; i < 20; ++i) { + bool result = encoder.Put(0); + EXPECT_TRUE(result); + } + // 10 1s + for (int i = 0; i < 10; ++i) { + bool result = encoder.Put(1); + EXPECT_TRUE(result); + } + encoder.Flush(); + + RleBitPackedDecoder decoder(buffer, sizeof(buffer), bit_width); + std::vector values(60); + int64_t count = 0; + + // Read first 40 values. Should be 30 1s and 10 0s. + // Count 1s. + int read = decoder.GetBatchWithCount(values.data(), 40, 1, &count); + EXPECT_EQ(read, 40); + EXPECT_EQ(count, 30); + for (int i = 0; i < 30; ++i) EXPECT_EQ(values[i], 1); + for (int i = 30; i < 40; ++i) EXPECT_EQ(values[i], 0); + + // Read next 20 values. Should be 10 0s and 10 1s. + // Count 1s. + count = 0; + read = decoder.GetBatchWithCount(values.data(), 20, 1, &count); + EXPECT_EQ(read, 20); + EXPECT_EQ(count, 10); + for (int i = 0; i < 10; ++i) EXPECT_EQ(values[i], 0); + for (int i = 10; i < 20; ++i) EXPECT_EQ(values[i], 1); +} + } // namespace arrow::util diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 79b837f755c..2480ab6069f 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -191,6 +191,30 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) { return num_decoded; } +int LevelDecoder::DecodeAndCount(int batch_size, int16_t* levels, int64_t* count) { + int num_decoded = 0; + + int num_values = std::min(num_values_remaining_, batch_size); + if (encoding_ == Encoding::RLE) { + num_decoded = + rle_decoder_->GetBatchWithCount(levels, num_values, max_level_, count); + } else { + num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); + *count += std::count(levels, levels + num_decoded, max_level_); + } + if (num_decoded > 0) { + internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); + if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { + std::stringstream ss; + ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max + << " out of range. Max Level: " << max_level_; + throw ParquetException(ss.str()); + } + } + num_values_remaining_ -= num_decoded; + return num_decoded; +} + ReaderProperties default_reader_properties() { static ReaderProperties default_reader_properties; return default_reader_properties; @@ -1006,14 +1030,11 @@ class TypedColumnReaderImpl : public TypedColumnReader, // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { - *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); - if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { + if (ARROW_PREDICT_FALSE(this->definition_level_decoder_.DecodeAndCount( + batch_size, def_levels, non_null_values_to_read) != batch_size)) { throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } - // TODO(wesm): this tallying of values-to-decode can be performed with better - // cache-efficiency if fused with the level decoding. - *non_null_values_to_read += - std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); + *num_def_levels = batch_size; } else { // Required field, read all values if (num_def_levels != nullptr) { diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index ac4469b1904..0a9ac49ec60 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -92,6 +92,10 @@ class PARQUET_EXPORT LevelDecoder { // Decodes a batch of levels into an array and returns the number of levels decoded int Decode(int batch_size, int16_t* levels); + /// Decodes a batch of levels into an array and counts the number of levels equal to + /// max_level_ + int DecodeAndCount(int batch_size, int16_t* levels, int64_t* count); + private: int bit_width_; int num_values_remaining_; From 2280bb9b0c441a475ae7d408afa9b6141e73d893 Mon Sep 17 00:00:00 2001 From: Andrew Leng Date: Wed, 17 Dec 2025 19:28:19 -0500 Subject: [PATCH 2/4] Add benchmarks for fused DecodeAndCount optimization Add ReadLevels_RleCountSeparate and ReadLevels_RleCountFused benchmarks to compare the old approach (Decode + std::count) vs the new fused DecodeAndCount approach. Results show ~12% speedup for RLE-heavy data (high repeat counts) where counting is O(1) for entire runs. --- cpp/src/parquet/column_reader_benchmark.cc | 93 ++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/cpp/src/parquet/column_reader_benchmark.cc b/cpp/src/parquet/column_reader_benchmark.cc index 83f661bf9ad..50f7a910636 100644 --- a/cpp/src/parquet/column_reader_benchmark.cc +++ b/cpp/src/parquet/column_reader_benchmark.cc @@ -336,6 +336,77 @@ static void DecodeLevels(Encoding::type level_encoding, int16_t max_level, int n state.SetItemsProcessed(state.iterations() * num_levels); } +// Benchmark that simulates the old approach: Decode + separate count +static void DecodeLevelsAndCountSeparate(Encoding::type level_encoding, int16_t max_level, + int num_levels, int batch_size, + int level_repeat_count, + ::benchmark::State& state) { + std::vector bytes; + { + std::vector input_levels; + GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level, + num_levels, &input_levels); + EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes); + } + + LevelDecoder decoder; + std::vector output_levels(batch_size); + for (auto _ : state) { + state.PauseTiming(); + decoder.SetData(level_encoding, max_level, num_levels, bytes.data(), + static_cast(bytes.size())); + int64_t total_count = 0; + state.ResumeTiming(); + // Decode + count separately (old approach) + while (true) { + int levels_decoded = decoder.Decode(batch_size, output_levels.data()); + if (levels_decoded == 0) { + break; + } + // Separate count pass (simulating the old approach) + total_count += + std::count(output_levels.data(), output_levels.data() + levels_decoded, max_level); + } + DoNotOptimize(total_count); + } + state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t)); + state.SetItemsProcessed(state.iterations() * num_levels); +} + +// Benchmark that uses the new fused DecodeAndCount approach +static void DecodeLevelsAndCountFused(Encoding::type level_encoding, int16_t max_level, + int num_levels, int batch_size, + int level_repeat_count, ::benchmark::State& state) { + std::vector bytes; + { + std::vector input_levels; + GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level, + num_levels, &input_levels); + EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes); + } + + LevelDecoder decoder; + std::vector output_levels(batch_size); + for (auto _ : state) { + state.PauseTiming(); + decoder.SetData(level_encoding, max_level, num_levels, bytes.data(), + static_cast(bytes.size())); + int64_t total_count = 0; + state.ResumeTiming(); + // Fused decode + count (new approach) + while (true) { + int levels_decoded = + decoder.DecodeAndCount(batch_size, output_levels.data(), &total_count); + if (levels_decoded == 0) { + break; + } + } + DoNotOptimize(total_count); + } + state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t)); + state.SetItemsProcessed(state.iterations() * num_levels); +} + static void ReadLevels_Rle(::benchmark::State& state) { int16_t max_level = static_cast(state.range(0)); int num_levels = static_cast(state.range(1)); @@ -354,6 +425,26 @@ static void ReadLevels_BitPack(::benchmark::State& state) { level_repeat_count, state); } +// Benchmark: Decode + Count separately (old approach) +static void ReadLevels_RleCountSeparate(::benchmark::State& state) { + int16_t max_level = static_cast(state.range(0)); + int num_levels = static_cast(state.range(1)); + int batch_size = static_cast(state.range(2)); + int level_repeat_count = static_cast(state.range(3)); + DecodeLevelsAndCountSeparate(Encoding::RLE, max_level, num_levels, batch_size, + level_repeat_count, state); +} + +// Benchmark: Fused DecodeAndCount (new approach) +static void ReadLevels_RleCountFused(::benchmark::State& state) { + int16_t max_level = static_cast(state.range(0)); + int num_levels = static_cast(state.range(1)); + int batch_size = static_cast(state.range(2)); + int level_repeat_count = static_cast(state.range(3)); + DecodeLevelsAndCountFused(Encoding::RLE, max_level, num_levels, batch_size, + level_repeat_count, state); +} + static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) { b->ArgNames({"MaxLevel", "NumLevels", "BatchSize", "LevelRepeatCount"}) ->Args({1, 8096, 1024, 1}) @@ -367,6 +458,8 @@ static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) { BENCHMARK(ReadLevels_Rle)->Apply(ReadLevelsArguments); BENCHMARK(ReadLevels_BitPack)->Apply(ReadLevelsArguments); +BENCHMARK(ReadLevels_RleCountSeparate)->Apply(ReadLevelsArguments); +BENCHMARK(ReadLevels_RleCountFused)->Apply(ReadLevelsArguments); } // namespace benchmarks } // namespace parquet From e4f35d1d07006028fbdd36c5494796eea09a496b Mon Sep 17 00:00:00 2001 From: Andrew Leng Date: Wed, 17 Dec 2025 19:32:26 -0500 Subject: [PATCH 3/4] Add documentation comment explaining BitPackedRunDecoder::GetBatchWithCount design Clarify that the fused counting approach is only beneficial for RLE runs where counting is O(1). For bit-packed runs, std::count after GetBatch is used since it's highly optimized by modern compilers (SIMD). --- cpp/src/arrow/util/rle_encoding_internal.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h index 49e96d0daae..d2d38083a35 100644 --- a/cpp/src/arrow/util/rle_encoding_internal.h +++ b/cpp/src/arrow/util/rle_encoding_internal.h @@ -395,10 +395,14 @@ class BitPackedRunDecoder { } /// Get a batch of values and count how many equal match_value + /// Note: For bit-packed runs, we use std::count after GetBatch since it's + /// highly optimized by the compiler. The fused approach is only beneficial + /// for RLE runs where counting is O(1). [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, rle_size_t value_bit_width, value_type match_value, int64_t* out_count) { - auto steps = GetBatch(out, batch_size, value_bit_width); + const auto steps = GetBatch(out, batch_size, value_bit_width); + // std::count is highly optimized (SIMD) by modern compilers *out_count += std::count(out, out + steps, match_value); return steps; } From 6f4c7e1f62060f8355b7c3cdb48b7bce6f43785d Mon Sep 17 00:00:00 2001 From: Andrew Leng Date: Fri, 19 Dec 2025 23:53:08 -0500 Subject: [PATCH 4/4] Refactor RleRunDecoder::GetBatchWithCount to avoid code duplication Address review feedback by calling GetBatch instead of duplicating the fill logic. For RLE runs, counting remains O(1) since all values in the run are identical. --- cpp/src/arrow/util/rle_encoding_internal.h | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h index d2d38083a35..59eef304edc 100644 --- a/cpp/src/arrow/util/rle_encoding_internal.h +++ b/cpp/src/arrow/util/rle_encoding_internal.h @@ -308,20 +308,17 @@ class RleRunDecoder { return to_read; } - /// Get a batch of values and count how many equal match_value + /// Get a batch of values and count how many equal match_value. + /// For RLE runs, counting is O(1) because all values in the run are identical. [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, rle_size_t value_bit_width, value_type match_value, int64_t* out_count) { - if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) { - return 0; - } - - const auto to_read = std::min(remaining_count_, batch_size); - std::fill(out, out + to_read, value_); - if (value_ == match_value) { + // Save value_ before GetBatch since it doesn't change during the call. + const auto current_value = value_; + const auto to_read = GetBatch(out, batch_size, value_bit_width); + if (current_value == match_value) { *out_count += to_read; } - remaining_count_ -= to_read; return to_read; }