Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,48 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
}
}

test("test write parquet with custom block size, block rows and page size") {
  // Writer settings under test, keyed by their Gluten config names and passed as decimal strings.
  val writerOptions = Map(
    GlutenConfig.PARQUET_BLOCK_SIZE -> (64L * 1024 * 1024).toString, // 64 * 1024 * 1024 bytes
    GlutenConfig.PARQUET_BLOCK_ROWS -> 10000000L.toString, // 10 million rows
    GlutenConfig.PARQUET_DATAPAGE_SIZE -> (1024L * 1024).toString // 1024 * 1024 bytes
  )

  withTempPath {
    outputDir =>
      val targetPath = outputDir.getCanonicalPath

      // Write ids 0 until 100 as parquet with the custom options applied.
      spark
        .range(100)
        .toDF("id")
        .write
        .format("parquet")
        .options(writerOptions)
        .save(targetPath)

      // Round trip: what is read back must equal the rows that were written.
      val expected = spark.range(100).toDF("id")
      checkAnswer(spark.read.parquet(targetPath), expected)
  }
}

test("test write parquet with block size, block rows and page size exceeding INT_MAX") {
  // Every value below is larger than Int.MaxValue (2147483647), so the options only make sense
  // when handled as 64-bit numbers.
  val threeGiB = 3L * 1024 * 1024 * 1024
  val threeBillionRows = 3L * 1000 * 1000 * 1000
  val oversizedOptions = Map(
    GlutenConfig.PARQUET_BLOCK_SIZE -> threeGiB.toString,
    GlutenConfig.PARQUET_BLOCK_ROWS -> threeBillionRows.toString,
    GlutenConfig.PARQUET_DATAPAGE_SIZE -> threeGiB.toString
  )

  withTempPath {
    outputDir =>
      val targetPath = outputDir.getCanonicalPath

      // Write ids 0 until 100 as parquet with the oversized options applied.
      spark
        .range(100)
        .toDF("id")
        .write
        .format("parquet")
        .options(oversizedOptions)
        .save(targetPath)

      // The write must succeed and the data must read back unchanged.
      val expected = spark.range(100).toDF("id")
      checkAnswer(spark.read.parquet(targetPath), expected)
  }
}
}
6 changes: 3 additions & 3 deletions cpp/velox/utils/VeloxWriterUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const std::unordered_map<s
int64_t maxRowGroupBytes = 134217728; // 128MB
int64_t maxRowGroupRows = 100000000; // 100M
if (auto it = sparkConfs.find(kParquetBlockSize); it != sparkConfs.end()) {
maxRowGroupBytes = static_cast<int64_t>(stoi(it->second));
maxRowGroupBytes = std::stoll(it->second);
}
if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) {
maxRowGroupRows = static_cast<int64_t>(stoi(it->second));
maxRowGroupRows = std::stoll(it->second);
}
auto writeOption = std::make_unique<WriterOptions>();
writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
Expand Down Expand Up @@ -93,7 +93,7 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const std::unordered_map<s
writeOption->arrowMemoryPool =
getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool");
if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) {
auto dataPageSize = std::stoi(it->second);
auto dataPageSize = std::stoll(it->second);
writeOption->dataPageSize = dataPageSize;
}
if (auto it = sparkConfs.find(kParquetWriterVersion); it != sparkConfs.end()) {
Expand Down