diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h index 2c084e0b4c8..59eef304edc 100644 --- a/cpp/src/arrow/util/rle_encoding_internal.h +++ b/cpp/src/arrow/util/rle_encoding_internal.h @@ -308,6 +308,20 @@ class RleRunDecoder { return to_read; } + /// Get a batch of values and count how many equal match_value. + /// For RLE runs, counting is O(1) because all values in the run are identical. + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + rle_size_t value_bit_width, + value_type match_value, int64_t* out_count) { + // Save value_ before GetBatch since it doesn't change during the call. + const auto current_value = value_; + const auto to_read = GetBatch(out, batch_size, value_bit_width); + if (current_value == match_value) { + *out_count += to_read; + } + return to_read; + } + private: value_type value_ = {}; rle_size_t remaining_count_ = 0; @@ -377,6 +391,19 @@ class BitPackedRunDecoder { return steps; } + /// Get a batch of values and count how many equal match_value + /// Note: For bit-packed runs, we use std::count after GetBatch since it's + /// highly optimized by the compiler. The fused approach is only beneficial + /// for RLE runs where counting is O(1). + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + rle_size_t value_bit_width, + value_type match_value, int64_t* out_count) { + const auto steps = GetBatch(out, batch_size, value_bit_width); + // std::count is highly optimized (SIMD) by modern compilers + *out_count += std::count(out, out + steps, match_value); + return steps; + } + private: /// The pointer to the beginning of the run const uint8_t* data_ = nullptr; @@ -438,6 +465,10 @@ class RleBitPackedDecoder { /// left or if an error occurred. [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size); + /// Get a batch of values and count how many equal match_value + [[nodiscard]] rle_size_t GetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, int64_t* out_count); + /// Like GetBatch but add spacing for null entries. /// /// Null entries will be set to an arbistrary value to avoid leaking private data. @@ -483,6 +514,18 @@ class RleBitPackedDecoder { decoder_); } + /// Get a batch of values from the current run and return the number elements read. + [[nodiscard]] rle_size_t RunGetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, + int64_t* out_count) { + return std::visit( + [&](auto& dec) { + return dec.GetBatchWithCount(out, batch_size, value_bit_width_, match_value, + out_count); + }, + decoder_); + } + /// Call the parser with a single callable for all event types. template void ParseWithCallable(Callable&& func); @@ -1474,4 +1517,49 @@ inline void RleBitPackedEncoder::Clear() { bit_writer_.Clear(); } +template +auto RleBitPackedDecoder::GetBatchWithCount(value_type* out, rle_size_t batch_size, + value_type match_value, int64_t* out_count) + -> rle_size_t { + using ControlFlow = RleBitPackedParser::ControlFlow; + + rle_size_t values_read = 0; + + // Remaining from a previous call that would have left some unread data from a run. + if (ARROW_PREDICT_FALSE(run_remaining() > 0)) { + const auto read = RunGetBatchWithCount(out, batch_size, match_value, out_count); + values_read += read; + out += read; + + // Either we fulfilled all the batch to be read or we finished remaining run. + if (ARROW_PREDICT_FALSE(values_read == batch_size)) { + return values_read; + } + ARROW_DCHECK(run_remaining() == 0); + } + + ParseWithCallable([&](auto run) { + using RunDecoder = typename decltype(run)::template DecoderType; + + ARROW_DCHECK_LT(values_read, batch_size); + RunDecoder decoder(run, value_bit_width_); + const auto read = + decoder.GetBatchWithCount(out, batch_size - values_read, value_bit_width_, + match_value, out_count); + ARROW_DCHECK_LE(read, batch_size - values_read); + values_read += read; + out += read; + + // Stop reading and store remaining decoder + if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) { + decoder_ = std::move(decoder); + return ControlFlow::Break; + } + + return ControlFlow::Continue; + }); + + return values_read; +} + } // namespace arrow::util diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc index b2d4f7df6f1..327ccfea281 100644 --- a/cpp/src/arrow/util/rle_encoding_test.cc +++ b/cpp/src/arrow/util/rle_encoding_test.cc @@ -1280,4 +1280,48 @@ TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) { DoTestGetBatchSpacedRoundtrip(); } +TEST(Rle, GetBatchWithCount) { + const int bit_width = 1; + uint8_t buffer[100]; + RleBitPackedEncoder encoder(buffer, sizeof(buffer), bit_width); + + // 30 1s + for (int i = 0; i < 30; ++i) { + bool result = encoder.Put(1); + EXPECT_TRUE(result); + } + // 20 0s + for (int i = 0; i < 20; ++i) { + bool result = encoder.Put(0); + EXPECT_TRUE(result); + } + // 10 1s + for (int i = 0; i < 10; ++i) { + bool result = encoder.Put(1); + EXPECT_TRUE(result); + } + encoder.Flush(); + + RleBitPackedDecoder decoder(buffer, sizeof(buffer), bit_width); + std::vector values(60); + int64_t count = 0; + + // Read first 40 values. Should be 30 1s and 10 0s. + // Count 1s. + int read = decoder.GetBatchWithCount(values.data(), 40, 1, &count); + EXPECT_EQ(read, 40); + EXPECT_EQ(count, 30); + for (int i = 0; i < 30; ++i) EXPECT_EQ(values[i], 1); + for (int i = 30; i < 40; ++i) EXPECT_EQ(values[i], 0); + + // Read next 20 values. Should be 10 0s and 10 1s. + // Count 1s. + count = 0; + read = decoder.GetBatchWithCount(values.data(), 20, 1, &count); + EXPECT_EQ(read, 20); + EXPECT_EQ(count, 10); + for (int i = 0; i < 10; ++i) EXPECT_EQ(values[i], 0); + for (int i = 10; i < 20; ++i) EXPECT_EQ(values[i], 1); +} + } // namespace arrow::util diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 79b837f755c..2480ab6069f 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -191,6 +191,30 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) { return num_decoded; } +int LevelDecoder::DecodeAndCount(int batch_size, int16_t* levels, int64_t* count) { + int num_decoded = 0; + + int num_values = std::min(num_values_remaining_, batch_size); + if (encoding_ == Encoding::RLE) { + num_decoded = + rle_decoder_->GetBatchWithCount(levels, num_values, max_level_, count); + } else { + num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); + *count += std::count(levels, levels + num_decoded, max_level_); + } + if (num_decoded > 0) { + internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); + if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { + std::stringstream ss; + ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max + << " out of range. Max Level: " << max_level_; + throw ParquetException(ss.str()); + } + } + num_values_remaining_ -= num_decoded; + return num_decoded; +} + ReaderProperties default_reader_properties() { static ReaderProperties default_reader_properties; return default_reader_properties; @@ -1006,14 +1030,11 @@ class TypedColumnReaderImpl : public TypedColumnReader, // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { - *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); - if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { + if (ARROW_PREDICT_FALSE(this->definition_level_decoder_.DecodeAndCount( + batch_size, def_levels, non_null_values_to_read) != batch_size)) { throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } - // TODO(wesm): this tallying of values-to-decode can be performed with better - // cache-efficiency if fused with the level decoding. - *non_null_values_to_read += - std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); + *num_def_levels = batch_size; } else { // Required field, read all values if (num_def_levels != nullptr) { diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index ac4469b1904..0a9ac49ec60 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -92,6 +92,10 @@ class PARQUET_EXPORT LevelDecoder { // Decodes a batch of levels into an array and returns the number of levels decoded int Decode(int batch_size, int16_t* levels); + /// Decodes a batch of levels into an array and counts the number of levels equal to + /// max_level_ + int DecodeAndCount(int batch_size, int16_t* levels, int64_t* count); + private: int bit_width_; int num_values_remaining_; diff --git a/cpp/src/parquet/column_reader_benchmark.cc b/cpp/src/parquet/column_reader_benchmark.cc index 83f661bf9ad..50f7a910636 100644 --- a/cpp/src/parquet/column_reader_benchmark.cc +++ b/cpp/src/parquet/column_reader_benchmark.cc @@ -336,6 +336,77 @@ static void DecodeLevels(Encoding::type level_encoding, int16_t max_level, int n state.SetItemsProcessed(state.iterations() * num_levels); } +// Benchmark that simulates the old approach: Decode + separate count +static void DecodeLevelsAndCountSeparate(Encoding::type level_encoding, int16_t max_level, + int num_levels, int batch_size, + int level_repeat_count, + ::benchmark::State& state) { + std::vector bytes; + { + std::vector input_levels; + GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level, + num_levels, &input_levels); + EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes); + } + + LevelDecoder decoder; + std::vector output_levels(batch_size); + for (auto _ : state) { + state.PauseTiming(); + decoder.SetData(level_encoding, max_level, num_levels, bytes.data(), + static_cast(bytes.size())); + int64_t total_count = 0; + state.ResumeTiming(); + // Decode + count separately (old approach) + while (true) { + int levels_decoded = decoder.Decode(batch_size, output_levels.data()); + if (levels_decoded == 0) { + break; + } + // Separate count pass (simulating the old approach) + total_count += + std::count(output_levels.data(), output_levels.data() + levels_decoded, max_level); + } + DoNotOptimize(total_count); + } + state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t)); + state.SetItemsProcessed(state.iterations() * num_levels); +} + +// Benchmark that uses the new fused DecodeAndCount approach +static void DecodeLevelsAndCountFused(Encoding::type level_encoding, int16_t max_level, + int num_levels, int batch_size, + int level_repeat_count, ::benchmark::State& state) { + std::vector bytes; + { + std::vector input_levels; + GenerateLevels(/*level_repeats=*/level_repeat_count, /*max_repeat_factor=*/max_level, + num_levels, &input_levels); + EncodeLevels(level_encoding, max_level, num_levels, input_levels.data(), &bytes); + } + + LevelDecoder decoder; + std::vector output_levels(batch_size); + for (auto _ : state) { + state.PauseTiming(); + decoder.SetData(level_encoding, max_level, num_levels, bytes.data(), + static_cast(bytes.size())); + int64_t total_count = 0; + state.ResumeTiming(); + // Fused decode + count (new approach) + while (true) { + int levels_decoded = + decoder.DecodeAndCount(batch_size, output_levels.data(), &total_count); + if (levels_decoded == 0) { + break; + } + } + DoNotOptimize(total_count); + } + state.SetBytesProcessed(state.iterations() * num_levels * sizeof(int16_t)); + state.SetItemsProcessed(state.iterations() * num_levels); +} + static void ReadLevels_Rle(::benchmark::State& state) { int16_t max_level = static_cast(state.range(0)); int num_levels = static_cast(state.range(1)); @@ -354,6 +425,26 @@ static void ReadLevels_BitPack(::benchmark::State& state) { level_repeat_count, state); } +// Benchmark: Decode + Count separately (old approach) +static void ReadLevels_RleCountSeparate(::benchmark::State& state) { + int16_t max_level = static_cast(state.range(0)); + int num_levels = static_cast(state.range(1)); + int batch_size = static_cast(state.range(2)); + int level_repeat_count = static_cast(state.range(3)); + DecodeLevelsAndCountSeparate(Encoding::RLE, max_level, num_levels, batch_size, + level_repeat_count, state); +} + +// Benchmark: Fused DecodeAndCount (new approach) +static void ReadLevels_RleCountFused(::benchmark::State& state) { + int16_t max_level = static_cast(state.range(0)); + int num_levels = static_cast(state.range(1)); + int batch_size = static_cast(state.range(2)); + int level_repeat_count = static_cast(state.range(3)); + DecodeLevelsAndCountFused(Encoding::RLE, max_level, num_levels, batch_size, + level_repeat_count, state); +} + static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) { b->ArgNames({"MaxLevel", "NumLevels", "BatchSize", "LevelRepeatCount"}) ->Args({1, 8096, 1024, 1}) @@ -367,6 +458,8 @@ static void ReadLevelsArguments(::benchmark::internal::Benchmark* b) { BENCHMARK(ReadLevels_Rle)->Apply(ReadLevelsArguments); BENCHMARK(ReadLevels_BitPack)->Apply(ReadLevelsArguments); +BENCHMARK(ReadLevels_RleCountSeparate)->Apply(ReadLevelsArguments); +BENCHMARK(ReadLevels_RleCountFused)->Apply(ReadLevelsArguments); } // namespace benchmarks } // namespace parquet