From 16b207dc4a932264528d82e14742ae22a3e7636f Mon Sep 17 00:00:00 2001
From: Yang Zhang
Date: Sun, 21 Dec 2025 17:54:00 +0800
Subject: [PATCH] fix

Replace the START_TIMING/END_TIMING macro pairs in the Velox shuffle
writers with explicit blocks that use SCOPED_TIMER, and remove the
START_TIMING/END_TIMING definitions from utils/Common.h.

SCOPED_TIMER declares its timer directly in the caller's block, so the
timer stays alive for every statement that follows it in that block.

---
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc   | 56 ++++++++++---------
 .../shuffle/VeloxRssSortShuffleWriter.cc      | 35 ++++++------
 cpp/velox/utils/Common.h                      | 18 ++----
 3 files changed, 55 insertions(+), 54 deletions(-)

diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index 6a1298d743e6..dda2c42a2e7d 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -269,13 +269,14 @@ arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr cb, i
     VELOX_CHECK(numColumns >= 2);
     auto pidBatch = veloxColumnBatch->select(veloxPool_.get(), {0});
     auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
-    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-    std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
-    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), row2Partition_));
-    for (auto& pid : row2Partition_) {
-      partition2RowCount_[pid]++;
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+      std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), row2Partition_));
+      for (auto& pid : row2Partition_) {
+        partition2RowCount_[pid]++;
+      }
     }
-    END_TIMING();
     std::vector range;
     range.reserve(numColumns);
     for (int32_t i = 1; i < numColumns; i++) {
@@ -289,9 +290,10 @@ arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr cb, i
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
     facebook::velox::RowVectorPtr rv;
-    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
-    rv = veloxColumnBatch->getFlattenedRowVector();
-    END_TIMING();
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingFlattenRV]);
+      rv = veloxColumnBatch->getFlattenedRowVector();
+    }
     if (isExtremelyLargeBatch(rv)) {
       auto numRows = rv->size();
       int32_t offset = 0;
@@ -313,23 +315,25 @@ arrow::Status VeloxHashShuffleWriter::partitioningAndDoSplit(facebook::velox::Ro
   std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
   if (partitioner_->hasPid()) {
     auto pidArr = getFirstColumn(*rv);
-    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-    RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), row2Partition_));
-    for (auto& pid : row2Partition_) {
-      partition2RowCount_[pid]++;
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), row2Partition_));
+      for (auto& pid : row2Partition_) {
+        partition2RowCount_[pid]++;
+      }
     }
-    END_TIMING();
     auto strippedRv = getStrippedRowVector(*rv);
     RETURN_NOT_OK(initFromRowVector(*strippedRv));
     RETURN_NOT_OK(doSplit(*strippedRv, memLimit));
   } else {
     RETURN_NOT_OK(initFromRowVector(*rv));
-    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-    RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), row2Partition_));
-    for (auto& pid : row2Partition_) {
-      partition2RowCount_[pid]++;
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+      RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), row2Partition_));
+      for (auto& pid : row2Partition_) {
+        partition2RowCount_[pid]++;
+      }
     }
-    END_TIMING();
     RETURN_NOT_OK(doSplit(*rv, memLimit));
   }
   return arrow::Status::OK();
@@ -416,13 +420,13 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector&
   RETURN_NOT_OK(buildPartition2Row(rowNum));
   RETURN_NOT_OK(updateInputHasNull(rv));
 
-  START_TIMING(cpuWallTimingList_[CpuWallTimingIteratePartitions]);
-
-  setSplitState(SplitState::kPreAlloc);
-  // Calculate buffer size based on available offheap memory, history average bytes per row and options_.bufferSize.
-  auto preAllocBufferSize = calculatePartitionBufferSize(rv, memLimit);
-  RETURN_NOT_OK(preAllocPartitionBuffers(preAllocBufferSize));
-  END_TIMING();
+  {
+    SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingIteratePartitions]);
+    setSplitState(SplitState::kPreAlloc);
+    // Calculate buffer size based on available offheap memory, history average bytes per row and options_.bufferSize.
+    auto preAllocBufferSize = calculatePartitionBufferSize(rv, memLimit);
+    RETURN_NOT_OK(preAllocPartitionBuffers(preAllocBufferSize));
+  }
 
   printPartitionBuffer();
 
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 54e253972037..0f9297ac5f9b 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -21,7 +21,6 @@
 #include "shuffle/ShuffleSchema.h"
 #include "utils/Common.h"
 #include "utils/Macros.h"
-#include "utils/VeloxArrowUtils.h"
 #include "velox/common/base/Nulls.h"
 #include "velox/type/Type.h"
 
@@ -79,10 +78,11 @@ arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr cb
     VELOX_CHECK(numColumns >= 2);
     auto pidBatch = veloxColumnBatch->select(veloxPool_.get(), {0});
     auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
-    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-    setSortState(RssSortState::kSort);
-    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), batches_.size(), rowVectorIndexMap_));
-    END_TIMING();
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+      setSortState(RssSortState::kSort);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), batches_.size(), rowVectorIndexMap_));
+    }
     std::vector range;
     range.reserve(numColumns);
     for (int32_t i = 1; i < numColumns; i++) {
@@ -96,24 +96,27 @@ arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr cb
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
     facebook::velox::RowVectorPtr rv;
-    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
-    rv = veloxColumnBatch->getFlattenedRowVector();
-    END_TIMING();
+    {
+      SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingFlattenRV]);
+      rv = veloxColumnBatch->getFlattenedRowVector();
+    }
     if (partitioner_->hasPid()) {
       auto pidArr = getFirstColumn(*rv);
-      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-      setSortState(RssSortState::kSort);
-      RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), rowVectorIndexMap_));
-      END_TIMING();
+      {
+        SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+        setSortState(RssSortState::kSort);
+        RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      }
       auto strippedRv = getStrippedRowVector(*rv);
       RETURN_NOT_OK(initFromRowVector(*strippedRv));
       RETURN_NOT_OK(doSort(strippedRv, sortBufferMaxSize_));
     } else {
       RETURN_NOT_OK(initFromRowVector(*rv));
-      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
-      setSortState(RssSortState::kSort);
-      RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), batches_.size(), rowVectorIndexMap_));
-      END_TIMING();
+      {
+        SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCompute]);
+        setSortState(RssSortState::kSort);
+        RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), batches_.size(), rowVectorIndexMap_));
+      }
       RETURN_NOT_OK(doSort(rv, sortBufferMaxSize_));
     }
   }
diff --git a/cpp/velox/utils/Common.h b/cpp/velox/utils/Common.h
index eaa551da0566..6dd92b409d86 100644
--- a/cpp/velox/utils/Common.h
+++ b/cpp/velox/utils/Common.h
@@ -35,17 +35,11 @@ static inline void fastCopy(void* dst, const void* src, size_t n) {
   facebook::velox::simd::memcpy(dst, src, n);
 }
 
-#define START_TIMING(timing)                  \
-  {                                           \
-    auto ptiming = &timing;                   \
-    facebook::velox::DeltaCpuWallTimer timer{ \
-        [ptiming](const facebook::velox::CpuWallTiming& delta) { ptiming->add(delta); }};
-
-#define END_TIMING() }
-
-#define SCOPED_TIMER(timing)                \
-  auto ptiming = &timing;                   \
-  facebook::velox::DeltaCpuWallTimer timer{ \
-      [ptiming](const facebook::velox::CpuWallTiming& delta) { ptiming->add(delta); }};
+// Declares a timer in the caller's block; its callback adds the measured delta to `timing`.
+// Do not wrap this in do { } while (0): the timer would be destroyed before the timed statements run.
+#define SCOPED_TIMER(timing)                \
+  auto ptiming = &(timing);                 \
+  facebook::velox::DeltaCpuWallTimer timer{ \
+      [ptiming](const facebook::velox::CpuWallTiming& delta) { ptiming->add(delta); }};
 
 } // namespace gluten