diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala index 81c9870a5260..4bc20142a166 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala @@ -164,4 +164,48 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite { Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan)) } } + + test("test write parquet with custom block size, block rows and page size") { + val blockSize = 64 * 1024 * 1024L // 64MB + val blockRows = 10000000L // 10M + val pageSize = 1024 * 1024L // 1MB + + withTempPath { + f => + spark + .range(100) + .toDF("id") + .write + .format("parquet") + .option(GlutenConfig.PARQUET_BLOCK_SIZE, blockSize.toString) + .option(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows.toString) + .option(GlutenConfig.PARQUET_DATAPAGE_SIZE, pageSize.toString) + .save(f.getCanonicalPath) + + val parquetDf = spark.read.parquet(f.getCanonicalPath) + checkAnswer(parquetDf, spark.range(100).toDF("id")) + } + } + + test("test write parquet with block size, block rows and page size exceeding INT_MAX") { + val largeBlockSize = 3L * 1024 * 1024 * 1024 // 3GB + val largeBlockRows = 3L * 1000 * 1000 * 1000 // 3 billion + val largePageSize = 3L * 1024 * 1024 * 1024 // 3GB + + withTempPath { + f => + spark + .range(100) + .toDF("id") + .write + .format("parquet") + .option(GlutenConfig.PARQUET_BLOCK_SIZE, largeBlockSize.toString) + .option(GlutenConfig.PARQUET_BLOCK_ROWS, largeBlockRows.toString) + .option(GlutenConfig.PARQUET_DATAPAGE_SIZE, largePageSize.toString) + .save(f.getCanonicalPath) + + val parquetDf = spark.read.parquet(f.getCanonicalPath) + checkAnswer(parquetDf, spark.range(100).toDF("id")) + } + } } diff --git a/cpp/velox/utils/VeloxWriterUtils.cc b/cpp/velox/utils/VeloxWriterUtils.cc index fc4b810fb8f5..50e4ca601e16 100644 --- a/cpp/velox/utils/VeloxWriterUtils.cc +++ b/cpp/velox/utils/VeloxWriterUtils.cc @@ -42,10 +42,10 @@ std::unique_ptr makeParquetWriteOption(const std::unordered_map(stoi(it->second)); + maxRowGroupBytes = std::stoll(it->second); } if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) { - maxRowGroupRows = static_cast(stoi(it->second)); + maxRowGroupRows = std::stoll(it->second); } auto writeOption = std::make_unique(); writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/; @@ -93,7 +93,7 @@ std::unique_ptr makeParquetWriteOption(const std::unordered_maparrowMemoryPool = getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool"); if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) { - auto dataPageSize = std::stoi(it->second); + auto dataPageSize = std::stoll(it->second); writeOption->dataPageSize = dataPageSize; } if (auto it = sparkConfs.find(kParquetWriterVersion); it != sparkConfs.end()) {