Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9b67bbb
Add ColumnarIndexShuffleBlockResolver
guowangy Jan 22, 2026
0be5dae
Add DiscontiguousFileRegion
guowangy Jan 22, 2026
74e9692
Add FileSegmentsBuffer#createInputStream
guowangy Jan 23, 2026
15f2226
Impl FileSegmentsBuffer#nioByteBuffer
guowangy Jan 23, 2026
54d9dd1
Add mmap opt for single large file
guowangy Jan 23, 2026
c072f0e
DiscontiguousFileRegion support lazy open
guowangy Jan 23, 2026
d534477
Impl FileSegmentsManagedBuffer#convertToNetty
guowangy Jan 23, 2026
f96ca3e
Add ColumnarIndexShuffleBlockResolver#getSegmentsFromIndex
guowangy Jan 23, 2026
4d1a015
Impl ColumnarIndexShuffleBlockResolver#getBlockData
guowangy Jan 26, 2026
d6159c1
ColumnarIndexShuffleBlockResolver: add limitation of using new format
guowangy Jan 27, 2026
6286505
Add an optional indexFile params to LocalPartitionWriter for multiple…
guowangy Jan 29, 2026
f24c45a
LocalPartitionWriter: support multiple segments of partition
guowangy Feb 1, 2026
5a2ea3e
Add LowCopyFileSegmentsJniByteInputStream to support native read
guowangy Feb 2, 2026
9ceb7fd
Add FileSegmentsInputStream to handle segments read
guowangy Feb 5, 2026
66d3742
LowCopyFileSegmentsJniByteInputStream use FileSegmentsInputStream
guowangy Feb 5, 2026
e8f1b54
Avoid frequently calling fp.Tell()
guowangy Feb 6, 2026
9cac127
LocalPartitionWriter: fix output format for sortEvict
guowangy Mar 3, 2026
93b3f53
Various fixes
guowangy Mar 5, 2026
5857915
FileSegmentsManagedBuffer: empty segments should work
guowangy Mar 6, 2026
ecde73f
Fixes
guowangy Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{SHUFFLE_COMPRESS, SHUFFLE_DISK_WRITE_BUFFER_SIZE, SHUFFLE_FILE_BUFFER_SIZE, SHUFFLE_SORT_INIT_BUFFER_SIZE, SHUFFLE_SORT_USE_RADIXSORT}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.sort.ColumnarIndexShuffleBlockResolver
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil, Utils}

Expand All @@ -42,6 +43,8 @@ class ColumnarShuffleWriter[K, V](
with Logging {

private val dep = handle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]]
private var columnarShuffleBlockResolver: ColumnarIndexShuffleBlockResolver = _
private var partitionUseMultipleSegments: Boolean = false

dep.shuffleWriterType match {
case HashShuffleWriterType | SortShuffleWriterType | GpuHashShuffleWriterType =>
Expand All @@ -52,6 +55,18 @@ class ColumnarShuffleWriter[K, V](
s"expected one of: ${HashShuffleWriterType.name}, ${SortShuffleWriterType.name}")
}

shuffleBlockResolver match {
  case resolver: ColumnarIndexShuffleBlockResolver =>
    // Multi-segment partition output requires the new index format and is
    // incompatible with dictionary encoding: the dictionary only finalizes after
    // all batches are processed, and it is required to be saved at the head of
    // the partition data, so partition data cannot be saved incrementally.
    if (resolver.canUseNewFormat() && !GlutenConfig.get.columnarShuffleEnableDictionary) {
      partitionUseMultipleSegments = true
      columnarShuffleBlockResolver = resolver
    }
  case _ => // Other resolvers keep the legacy single-segment output format.
}

protected val isSort: Boolean = dep.shuffleWriterType == SortShuffleWriterType

private val numPartitions: Int = dep.partitioner.numPartitions
Expand Down Expand Up @@ -136,6 +151,11 @@ class ColumnarShuffleWriter[K, V](
}

val tempDataFile = Utils.tempFileWith(shuffleBlockResolver.getDataFile(dep.shuffleId, mapId))
val tempIndexFile = if (partitionUseMultipleSegments) {
Utils.tempFileWith(columnarShuffleBlockResolver.getIndexFile(dep.shuffleId, mapId))
} else {
null
}

while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
Expand All @@ -155,6 +175,7 @@ class ColumnarShuffleWriter[K, V](
blockManager.subDirsPerLocalDir,
conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt,
tempDataFile.getAbsolutePath,
if (tempIndexFile != null) tempIndexFile.getAbsolutePath else "",
localDirs,
GlutenConfig.get.columnarShuffleEnableDictionary
)
Expand Down Expand Up @@ -264,16 +285,29 @@ class ColumnarShuffleWriter[K, V](

partitionLengths = splitResult.getPartitionLengths
try {
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
tempDataFile)
if (partitionUseMultipleSegments) {
columnarShuffleBlockResolver.writeIndexFileAndCommit(
dep.shuffleId,
mapId,
tempDataFile,
tempIndexFile,
numPartitions,
None)
} else {
shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
Array[Long](),
tempDataFile)
}
} finally {
if (tempDataFile.exists() && !tempDataFile.delete()) {
logError(s"Error while deleting temp file ${tempDataFile.getAbsolutePath}")
}
if (tempIndexFile != null && tempIndexFile.exists() && !tempIndexFile.delete()) {
logError(s"Error while deleting temp file ${tempIndexFile.getAbsolutePath}")
}
}

// The partitionLength is much more than vanilla spark partitionLengths,
Expand Down
5 changes: 4 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -943,13 +943,15 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
jint numSubDirs,
jint shuffleFileBufferSize,
jstring dataFileJstr,
jstring indexFileJstr,
jstring localDirsJstr,
jboolean enableDictionary) {
JNI_METHOD_START

const auto ctx = getRuntime(env, wrapper);

auto dataFile = jStringToCString(env, dataFileJstr);
auto indexFile = jStringToCString(env, indexFileJstr);
auto localDirs = splitPaths(jStringToCString(env, localDirsJstr));

auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>(
Expand All @@ -968,7 +970,8 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
ctx->memoryManager(),
partitionWriterOptions,
dataFile,
std::move(localDirs));
std::move(localDirs),
indexFile);

return ctx->saveObject(partitionWriter);
JNI_METHOD_END(kInvalidObjectHandle)
Expand Down
132 changes: 130 additions & 2 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,25 @@ class LocalPartitionWriter::PayloadCache {
return arrow::Status::OK();
}

// Immediately writes all cached payloads of `partitionId` to `os`, prefixing each
// payload with a one-byte kPlainPayload block-type tag, and accumulates the
// payloads' compress/write timings into the cache metrics.
// Returns true if at least one payload was written, false when the partition is
// currently in use or has nothing cached.
// Not supported with dictionary encoding: the dictionary only finalizes after all
// batches are processed and must precede the partition data.
arrow::Result<bool> writeIncremental(uint32_t partitionId, arrow::io::OutputStream* os) {
  GLUTEN_CHECK(!enableDictionary_, "Incremental write is not supported when dictionary is enabled.");
  // Skip the partition whose payloads are still being appended, or one with no cache.
  if ((partitionInUse_.has_value() && partitionInUse_.value() == partitionId) || !hasCachedPayloads(partitionId)) {
    return false;
  }

  auto& payloads = partitionCachedPayload_[partitionId];
  while (!payloads.empty()) {
    const auto payload = std::move(payloads.front());
    payloads.pop_front();
    // Each serialized payload is preceded by its block-type tag byte.
    const uint8_t blockType = static_cast<uint8_t>(BlockType::kPlainPayload);
    RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
    RETURN_NOT_OK(payload->serialize(os));
    compressTime_ += payload->getCompressTime();
    writeTime_ += payload->getWriteTime();
  }
  return true;
}

bool canSpill() {
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (partitionInUse_.has_value() && partitionInUse_.value() == pid) {
Expand Down Expand Up @@ -506,10 +525,12 @@ LocalPartitionWriter::LocalPartitionWriter(
MemoryManager* memoryManager,
const std::shared_ptr<LocalPartitionWriterOptions>& options,
const std::string& dataFile,
std::vector<std::string> localDirs)
std::vector<std::string> localDirs,
const std::string& indexFile)
: PartitionWriter(numPartitions, std::move(codec), memoryManager),
options_(options),
dataFile_(dataFile),
indexFile_(indexFile),
localDirs_(std::move(localDirs)) {
init();
}
Expand Down Expand Up @@ -562,6 +583,56 @@ void LocalPartitionWriter::init() {
std::default_random_engine engine(rd());
std::shuffle(localDirs_.begin(), localDirs_.end(), engine);
subDirSelection_.assign(localDirs_.size(), 0);

if (!indexFile_.empty()) {
usePartitionMultipleSegments_ = true;
partitionSegments_.resize(numPartitions_);
}
}

// Helper for big-endian conversion (network order)
#include <arpa/inet.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the header to the top of the file, and move htonll into an anonymous namespace after the headers.

// Converts a 64-bit integer from host byte order to network (big-endian) order.
// On little-endian hosts, swaps the two 32-bit halves via htonl; on big-endian
// hosts the value is already in network order and is returned unchanged.
static uint64_t htonll(uint64_t value) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
  const auto lowHalf = static_cast<uint32_t>(value & 0xFFFFFFFFULL);
  const auto highHalf = static_cast<uint32_t>(value >> 32);
  return (static_cast<uint64_t>(htonl(lowHalf)) << 32) | htonl(highHalf);
#else
  return value;
#endif
}

// Writes the partition-segment index file for the multi-segment output format.
//
// File layout (all integers written big-endian):
//   - (numPartitions_ + 1) int64 offsets: the byte offset within this index file
//     of each partition's segment list; the extra trailing entry marks the end
//     of the last partition's list.
//   - For each partition, its segments as (start, length) int64 pairs locating
//     the segment within the shuffle data file.
//   - A single terminating marker byte with value 1, so readers can detect a
//     fully-written index file.
//
// No-op unless multi-segment partitions are enabled (an indexFile_ was provided).
arrow::Status LocalPartitionWriter::writeIndexFile() {
  if (!usePartitionMultipleSegments_) {
    return arrow::Status::OK();
  }

  ARROW_ASSIGN_OR_RAISE(auto indexFileOs, openFile(indexFile_, options_->shuffleFileBufferSize));

  // Segment lists start right after the (numPartitions_ + 1)-entry offset table.
  uint64_t segmentOffset = (numPartitions_ + 1) * sizeof(int64_t);
  // Write the segment-list offset of each partition in big-endian.
  for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
    uint64_t beOffset = htonll(segmentOffset);
    RETURN_NOT_OK(indexFileOs->Write(reinterpret_cast<const uint8_t*>(&beOffset), sizeof(beOffset)));
    const auto& segments = partitionSegments_[pid];
    segmentOffset += (segments.size() * 2 * sizeof(int64_t));
  }
  // The trailing offset marks the end of the last partition's segment list.
  uint64_t beOffset = htonll(segmentOffset);
  RETURN_NOT_OK(indexFileOs->Write(reinterpret_cast<const uint8_t*>(&beOffset), sizeof(beOffset)));
  // Write each partition's (start, length) segment pairs in big-endian.
  for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
    const auto& segments = partitionSegments_[pid];
    for (const auto& segment : segments) {
      uint64_t beFirst = htonll(segment.first);
      uint64_t beSecond = htonll(segment.second);
      RETURN_NOT_OK(indexFileOs->Write(reinterpret_cast<const uint8_t*>(&beFirst), sizeof(beFirst)));
      RETURN_NOT_OK(indexFileOs->Write(reinterpret_cast<const uint8_t*>(&beSecond), sizeof(beSecond)));
    }
  }

  // Write an ending marker byte with value 1.
  const uint8_t marker = 1;
  RETURN_NOT_OK(indexFileOs->Write(&marker, 1));
  RETURN_NOT_OK(indexFileOs->Close());
  return arrow::Status::OK();
}

arrow::Result<int64_t> LocalPartitionWriter::mergeSpills(uint32_t partitionId, arrow::io::OutputStream* os) {
Expand Down Expand Up @@ -600,13 +671,61 @@ arrow::Status LocalPartitionWriter::writeCachedPayloads(uint32_t partitionId, ar
return arrow::Status::OK();
}

// Flushes spilled data and cached payloads of every partition to the final data
// file, recording each contiguous run of written bytes as a new (start, length)
// segment for the owning partition. Only used by the multi-segment output format.
// NOTE(review): assumes payloadCache_ is non-null whenever this is called — confirm
// against all call sites.
arrow::Status LocalPartitionWriter::flushCachedPayloads() {
  // Lazily open the final data file on first flush.
  if (dataFileOs_ == nullptr) {
    ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_, options_->shuffleFileBufferSize));
  }
  ARROW_ASSIGN_OR_RAISE(int64_t endInDataFile, dataFileOs_->Tell());
  for (auto pid = 0; pid < numPartitions_; ++pid) {
    // The previous iteration's end offset is this partition's start offset,
    // avoiding an extra Tell() per partition.
    auto startInDataFile = endInDataFile;
    // Append any spilled data for this partition first, then its cached payloads.
    ARROW_ASSIGN_OR_RAISE(int64_t spillWrittenBytes, mergeSpills(pid, dataFileOs_.get()));
    ARROW_ASSIGN_OR_RAISE(bool cachePayloadWritten, payloadCache_->writeIncremental(pid, dataFileOs_.get()));
    if (spillWrittenBytes > 0 || cachePayloadWritten) {
      // Something was written: record the new segment and grow the partition length.
      ARROW_ASSIGN_OR_RAISE(endInDataFile, dataFileOs_->Tell());
      auto bytesWritten = endInDataFile - startInDataFile;
      partitionSegments_[pid].emplace_back(startInDataFile, bytesWritten);
      partitionLengths_[pid] += bytesWritten;
    }
  }

  return arrow::Status::OK();
}

// Serializes one in-memory payload of `partitionId` directly to the final data
// file (compressed when a codec is configured), recording the written byte range
// as a new (start, length) segment and growing the partition's total length.
// Used by sortEvict() in the multi-segment format to write evicted data straight
// to the data file instead of going through spill files.
arrow::Status LocalPartitionWriter::writeMemoryPayload(uint32_t partitionId, std::unique_ptr<InMemoryPayload> payload) {
  // Lazily open the final data file on first write.
  if (dataFileOs_ == nullptr) {
    ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_, options_->shuffleFileBufferSize));
  }

  ARROW_ASSIGN_OR_RAISE(int64_t startOffset, dataFileOs_->Tell());
  if (codec_ != nullptr) {
    // Wrap the data file in a compressing stream scoped to this payload only.
    ARROW_ASSIGN_OR_RAISE(auto compressOs, ShuffleCompressedOutputStream::Make(codec_.get(), options_->compressionBufferSize, dataFileOs_, arrow::default_memory_pool()));
    RETURN_NOT_OK(payload->serialize(compressOs.get()));
    RETURN_NOT_OK(compressOs->Flush());
    compressTime_ += compressOs->compressTime();
    // NOTE(review): presumably Close() finalizes the compressed frame without
    // closing the underlying dataFileOs_, which is reused for later payloads — confirm.
    RETURN_NOT_OK(compressOs->Close());
  } else {
    RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
  }
  ARROW_ASSIGN_OR_RAISE(int64_t endOffset, dataFileOs_->Tell());
  auto bytesWritten = endOffset - startOffset;
  partitionSegments_[partitionId].emplace_back(startOffset, bytesWritten);
  partitionLengths_[partitionId] += bytesWritten;

  return arrow::Status::OK();
}

arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics, int64_t& evictBytes) {
if (stopped_) {
return arrow::Status::OK();
}
stopped_ = true;

if (useSpillFileAsDataFile_) {
if (usePartitionMultipleSegments_) {
RETURN_NOT_OK(finishSpill());
RETURN_NOT_OK(finishMerger());
RETURN_NOT_OK(flushCachedPayloads());
RETURN_NOT_OK(writeIndexFile());
} else if (useSpillFileAsDataFile_) {
ARROW_ASSIGN_OR_RAISE(auto spill, spiller_->finish());

// Merge the remaining partitions from spills.
Expand Down Expand Up @@ -750,6 +869,9 @@ arrow::Status LocalPartitionWriter::hashEvict(
for (auto& payload : merged) {
RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
if (usePartitionMultipleSegments_) {
RETURN_NOT_OK(flushCachedPayloads());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hashEvict is not only called for spilling. When the evictType is kCache, it tries to cache as much payload in memory as possible to reduce spilling.

And when the evictType is kSpill, the data will be written to a spilled data file. Two evict types can exist in the same job. Is evictType == kSpill being properly handled for multi-segment write?

}
merged.clear();
}
return arrow::Status::OK();
Expand All @@ -759,6 +881,12 @@ arrow::Status
LocalPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t& evictBytes) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();

if (usePartitionMultipleSegments_) {
// If multiple segments per partition is enabled, write directly to the final data file.
RETURN_NOT_OK(writeMemoryPayload(partitionId, std::move(inMemoryPayload)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain a bit more on how this can reduce memory usage? It looks like the memory still only gets reclaimed by OOM and spilling.

return arrow::Status::OK();
}

if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill());
Expand Down
15 changes: 14 additions & 1 deletion cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class LocalPartitionWriter : public PartitionWriter {
MemoryManager* memoryManager,
const std::shared_ptr<LocalPartitionWriterOptions>& options,
const std::string& dataFile,
std::vector<std::string> localDirs);
std::vector<std::string> localDirs,
const std::string& indexFile = "");

arrow::Status hashEvict(
uint32_t partitionId,
Expand Down Expand Up @@ -90,6 +91,8 @@ class LocalPartitionWriter : public PartitionWriter {

void init();

arrow::Status writeIndexFile();

arrow::Status requestSpill(bool isFinal);

arrow::Status finishSpill();
Expand All @@ -102,12 +105,17 @@ class LocalPartitionWriter : public PartitionWriter {

arrow::Status writeCachedPayloads(uint32_t partitionId, arrow::io::OutputStream* os) const;

arrow::Status flushCachedPayloads();

arrow::Status writeMemoryPayload(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload);

arrow::Status clearResource();

arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);

std::shared_ptr<LocalPartitionWriterOptions> options_;
std::string dataFile_;
std::string indexFile_;
std::vector<std::string> localDirs_;

bool stopped_{false};
Expand All @@ -128,6 +136,11 @@ class LocalPartitionWriter : public PartitionWriter {
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;

bool usePartitionMultipleSegments_{false};
// For each partition, record all segments' (start, length) in the final data file.
// partitionSegments_[pid] = [(start1, length1), (start2, length2), ...]
std::vector<std::vector<std::pair<int64_t, int64_t>>> partitionSegments_{};

int32_t lastEvictPid_{-1};
};
} // namespace gluten
7 changes: 7 additions & 0 deletions gluten-arrow/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-substrait</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>${sparkshim.artifactId}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public static JniByteInputStream create(InputStream in) {
if (LowCopyFileSegmentJniByteInputStream.isSupported(unwrapped)) {
return new LowCopyFileSegmentJniByteInputStream(in);
}
if (LowCopyFileSegmentsJniByteInputStream.isSupported(unwrapped)) {
return new LowCopyFileSegmentsJniByteInputStream(in);
}
return new OnHeapJniByteInputStream(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public native long createPartitionWriter(
int subDirsPerLocalDir,
int shuffleFileBufferSize,
String dataFile,
String indexFile,
String localDirs,
boolean enableDictionary);
}
Loading
Loading