
[VL] Support multiple segments per partition in columnar shuffle#11722

Open
guowangy wants to merge 20 commits into apache:main from guowangy:partition-multi-segments

Conversation


@guowangy guowangy commented Mar 9, 2026

What changes are proposed in this pull request?

Introduces multi-segment-per-partition support in the Velox backend columnar shuffle writer, enabling incremental flushing of partition data to the final data file during processing. This reduces peak memory usage without requiring full in-memory buffering or temporary spill files. The implementation reduces total TPC-H (SF6T) latency by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.

New index file format (ColumnarIndexShuffleBlockResolver)

Extends IndexShuffleBlockResolver with a new index format supporting multiple (offset, length) segments per partition:

[Partition Index: (N+1) × 8-byte big-endian offsets]
[Segment Data: per-partition list of (data_offset, length) pairs, each 8 bytes]
[1-byte end marker]  ← distinguishes from legacy format (size always multiple of 8)
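The layout above can be sketched as follows. This is a minimal illustration of the format, not the resolver's actual code; serializeIndex and putBE64 are hypothetical names. The key property is that the trailing marker byte makes the total size not a multiple of 8, which is how the new format is told apart from the legacy one:

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

namespace {
// Append a 64-bit value in big-endian (network) byte order.
void putBE64(std::vector<uint8_t>& buf, uint64_t v) {
  for (int shift = 56; shift >= 0; shift -= 8) {
    buf.push_back(static_cast<uint8_t>(v >> shift));
  }
}
}  // namespace

// segments[i] holds the (data_offset, length) pairs for partition i.
std::vector<uint8_t> serializeIndex(
    const std::vector<std::vector<std::pair<uint64_t, uint64_t>>>& segments) {
  std::vector<uint8_t> buf;
  // Partition index: (N+1) cumulative counts delimiting each
  // partition's slice of the segment-data area.
  uint64_t cum = 0;
  putBE64(buf, cum);
  for (const auto& p : segments) {
    cum += p.size();
    putBE64(buf, cum);
  }
  // Segment data: per-partition (data_offset, length) pairs, 8 bytes each.
  for (const auto& p : segments) {
    for (const auto& [off, len] : p) {
      putBE64(buf, off);
      putBE64(buf, len);
    }
  }
  buf.push_back(0xFF);  // end marker: size is now not a multiple of 8
  return buf;
}
```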

ColumnarShuffleManager now uses this resolver. Multi-segment mode activates only when external shuffle service, push-based shuffle, and dictionary encoding are all disabled (dictionary encoding requires all batches to be complete before writing).

New I/O abstractions

  • FileSegmentsInputStream — InputStream over non-contiguous (offset, size) file segments; supports zero-copy native reads via read(destAddress, maxSize)
  • FileSegmentsManagedBuffer — ManagedBuffer backed by discontiguous segments; supports nioByteBuffer(), createInputStream(), convertToNetty()
  • DiscontiguousFileRegion — Netty FileRegion mapping a logical range to multiple physical segments for zero-copy network transfer
  • LowCopyFileSegmentsJniByteInputStream — zero-copy JNI wrapper over FileSegmentsInputStream; wired into JniByteInputStreams.create()
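The idea shared by these classes, presenting scattered (offset, size) ranges of one file as a single logical stream, can be sketched as follows. The actual classes live on the JVM side; this C++ SegmentsReader is a hypothetical illustration over an in-memory buffer standing in for the data file:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Presents several non-contiguous (offset, size) ranges of a backing
// buffer as one sequential stream, crossing segment boundaries on read.
class SegmentsReader {
 public:
  SegmentsReader(const uint8_t* file,
                 std::vector<std::pair<size_t, size_t>> segments)
      : file_(file), segments_(std::move(segments)) {}

  // Read up to maxSize bytes into dest; returns the number of bytes read.
  size_t read(uint8_t* dest, size_t maxSize) {
    size_t total = 0;
    while (total < maxSize && seg_ < segments_.size()) {
      auto [off, size] = segments_[seg_];
      size_t n = std::min(size - pos_, maxSize - total);
      std::memcpy(dest + total, file_ + off + pos_, n);
      total += n;
      pos_ += n;
      if (pos_ == size) {  // segment exhausted: advance to the next one
        ++seg_;
        pos_ = 0;
      }
    }
    return total;
  }

 private:
  const uint8_t* file_;                            // backing "data file"
  std::vector<std::pair<size_t, size_t>> segments_;  // (offset, size) list
  size_t seg_ = 0;  // current segment index
  size_t pos_ = 0;  // position within the current segment
};
```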

C++ LocalPartitionWriter changes

  • usePartitionMultipleSegments_ flag + partitionSegments_ vector tracking (start, length) per partition
  • flushCachedPayloads() — incremental flush after each hashEvict
  • writeMemoryPayload() — direct write to final data file during sortEvict
  • writeIndexFile() — serializes the new index at stop time
  • PayloadCache::writeIncremental() — flushes completed (non-active) partitions without touching the in-use partition
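A minimal sketch of the (start, length) bookkeeping described above, with file I/O simulated by a string. SegmentTracker and flushPayload are hypothetical names; only the partitionSegments_ member follows the PR. Each flush appends a payload to the end of the data file and records a new segment for that partition, so one partition accumulates several segments:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct SegmentTracker {
  std::string dataFile;  // stands in for the final shuffle data file
  // partitionSegments_[p] holds the (start, length) segments of partition p.
  std::vector<std::vector<std::pair<uint64_t, uint64_t>>> partitionSegments_;

  explicit SegmentTracker(size_t numPartitions)
      : partitionSegments_(numPartitions) {}

  void flushPayload(size_t partitionId, const std::string& payload) {
    uint64_t start = dataFile.size();
    dataFile += payload;  // incremental append to the final data file
    partitionSegments_[partitionId].emplace_back(start, payload.size());
  }
};
```

At stop time, writeIndexFile() would serialize partitionSegments_ into the index format above instead of a single contiguous (offset, length) per partition.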

JNI/JVM wiring

LocalPartitionWriterJniWrapper and JniWrapper.cc accept a new optional indexFile parameter; ColumnarShuffleWriter passes the temp index file path when multi-segment mode is active.

How was this patch tested?

New unit test suites:

  • ColumnarIndexShuffleBlockResolverSuite — index format read/write, format detection, multi-segment block lookup
  • FileSegmentsInputStreamSuite — sequential reads, multi-segment traversal, skip, zero-copy native reads
  • FileSegmentsManagedBufferSuite — nioByteBuffer, createInputStream, convertToNetty, EOF and mmap edge cases
  • DiscontiguousFileRegionSuite — Netty transfer across discontiguous segments, lazy open
  • LowCopyFileSegmentsJniByteInputStreamTest — JNI wrapper correctness for ByteInputStream

Was this patch authored or co-authored using generative AI tooling?

@github-actions github-actions bot added CORE works for Gluten Core VELOX labels Mar 9, 2026

github-actions bot commented Mar 9, 2026

Run Gluten Clickhouse CI on x86

@zhouyuan zhouyuan requested a review from marin-ma March 10, 2026 15:43

@marin-ma marin-ma left a comment


@guowangy Thanks for contributing this feature. Please check my comments below.

#endif
}

arrow::Status LocalPartitionWriter::writeIndexFile() {

Can you add some c++ unit tests for the multi-segment partition write?

}

// Helper for big-endian conversion (network order)
#include <arpa/inet.h>

Please move the header to the top, and move htoll into anonymous namespace after the headers.

// For dictionary encoding, the dict only finalizes after all batches are processed,
// and the dict is required to be saved at the head of the partition data.
// So we cannot use multiple segments to save partition data incrementally.
partitionUseMultipleSegments = true

Please add a configuration to enable this feature.


if (usePartitionMultipleSegments_) {
// If multiple segments per partition is enabled, write directly to the final data file.
RETURN_NOT_OK(writeMemoryPayload(partitionId, std::move(inMemoryPayload)));

Can you explain a bit more on how this reduces memory usage? It looks like the memory still only gets reclaimed by OOM and spilling.

RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
if (usePartitionMultipleSegments_) {
RETURN_NOT_OK(flushCachedPayloads());

The hashEvict is not only called for spilling. When the evictType is kCache, it tries to cache as much payload in memory as possible to reduce spilling.

And when the evictType is kSpill, the data is written to a spilled data file. Both evict types can occur in the same job. Is evictType == kSpill properly handled for multi-segment writes?

@marin-ma

The implementation reduces total TPC-H (SF6T) latency by ~16% with sort-based shuffle under low memory capacity on a 2-socket Xeon 6960P system.

Can you explain where this improvement mainly comes from?

Currently we follow the same file layout as vanilla Spark, with each partition's output contiguous. One major benefit of that design is reducing small random disk IO on the shuffle reader side. If memory is tight, spills are triggered more frequently, making small per-partition output blocks more likely. In that case this design will not be IO friendly.
