From a3f34e9aadd2c94fcc586c8ec61480ceeac23eed Mon Sep 17 00:00:00 2001 From: Haoyang Li Date: Wed, 11 Mar 2026 05:23:30 +0800 Subject: [PATCH] Add a config to disable R2C Host retry (#14373) Contributes to https://github.com/NVIDIA/spark-rapids/issues/14368 ### Description This PR adds a config `spark.rapids.sql.rowToColumnar.retry.enabled` to disable the per-row retry introduced in #13842, which caused performance issues in some cases. Also added a test. ### Checklists - [ ] This PR has added documentation for new or modified features or behaviors. - [x] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.) - [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description. --------- Signed-off-by: Haoyang Li Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Sameer Raheja --- .../spark/rapids/GpuRowToColumnarExec.scala | 37 ++++++---- .../com/nvidia/spark/rapids/RapidsConf.scala | 11 +++ .../ParquetCachedBatchSerializer.scala | 3 +- .../RowToColumnarIteratorRetrySuite.scala | 72 +++++++++++++++++-- 4 files changed, 105 insertions(+), 18 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index a35d370b647..60303ed50c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -578,6 +578,7 @@ class RowToColumnarIterator( localGoal: CoalesceSizeGoal, batchSizeBytes: Long, converters: GpuRowToColumnConverter, + enableRetry: Boolean = false, numInputRows: GpuMetric = NoopMetric, numOutputRows: GpuMetric = NoopMetric, 
numOutputBatches: GpuMetric = NoopMetric, @@ -608,7 +609,6 @@ class RowToColumnarIterator( if (localSchema.fields.isEmpty) { // if there are no columns then we just default to a small number // of rows for the first batch - // TODO do we even need to allocate anything here? targetRows = 1024 initialRows = targetRows } else { @@ -623,18 +623,30 @@ class RowToColumnarIterator( var rowCount = 0 // Double because validity can be < 1 byte, and this is just an estimate anyways var byteCount: Double = 0 - val converter = new RetryableRowConverter(builders, rowCopyProjection) - // read at least one row - while (rowIter.hasNext && - (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { - converter.attempt(rowIter.next()) - val bytesWritten = withRetryNoSplit { - withRestoreOnRetry(converter) { - converters.convert(converter.currentRow, builders) + + if (enableRetry) { + val converter = new RetryableRowConverter(builders, rowCopyProjection) + // read at least one row + while (rowIter.hasNext && + (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { + converter.attempt(rowIter.next()) + val bytesWritten = withRetryNoSplit { + withRestoreOnRetry(converter) { + converters.convert(converter.currentRow, builders) + } } + byteCount += bytesWritten + rowCount += 1 + } + } else { + // Disabling R2C retry only removes the per-row retry wrapper around convert(). + // The final builders.tryBuild(rowCount) call below still uses withRetryNoSplit. 
+ while (rowIter.hasNext && + (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { + val row = rowIter.next() + byteCount += converters.convert(row, builders) + rowCount += 1 } - byteCount += bytesWritten - rowCount += 1 } // enforce RequireSingleBatch limit @@ -990,8 +1002,9 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal) val converters = new GpuRowToColumnConverter(localSchema) val conf = new RapidsConf(child.conf) val batchSizeBytes = conf.gpuTargetBatchSizeBytes + val enableR2cRetry = conf.isR2cRetryEnabled rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter, - localSchema, localGoal, batchSizeBytes, converters, + localSchema, localGoal, batchSizeBytes, converters, enableR2cRetry, numInputRows, numOutputRows, numOutputBatches, streamTime, opTime)) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a0412fad91f..2ca50eaeff2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -438,6 +438,15 @@ object RapidsConf extends Logging { .integerConf .createWithDefault(2) + val ENABLE_R2C_RETRY = conf("spark.rapids.sql.rowToColumnar.retry.enabled") + .doc("When true, the row-to-columnar conversion wraps each row's conversion with " + + "retry logic so that host OOM during conversion can be recovered. This adds a small " + + "per-row overhead. When false (default), the retry is disabled to avoid that overhead, " + + "at the risk of failing the task on host OOM during R2C conversion.") + .internal() + .booleanConf + .createWithDefault(false) + val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir") .doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " + "an exception. The URI can reference a distributed filesystem. 
The filename will be of the " + @@ -3315,6 +3324,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val gpuOomMaxRetries: Int = get(GPU_OOM_MAX_RETRIES) + lazy val isR2cRetryEnabled: Boolean = get(ENABLE_R2C_RETRY) + lazy val gpuCoreDumpDir: Option[String] = get(GPU_COREDUMP_DIR) lazy val gpuCoreDumpPipePattern: String = get(GPU_COREDUMP_PIPE_PATTERN) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala index 6f8ef5f3670..3aa42fb9a8d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala @@ -1335,9 +1335,10 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer { val structSchema = schemaWithUnambiguousNames.toStructType val converters = new GpuRowToColumnConverter(structSchema) val batchSizeBytes = rapidsConf.gpuTargetBatchSizeBytes + val enableR2cRetry = rapidsConf.isR2cRetryEnabled val columnarBatchRdd = input.mapPartitions(iter => { new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, batchSizeBytes, - converters) + converters, enableR2cRetry) }) columnarBatchRdd.flatMap(cb => { withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala index 042281875a1..37d1c63ff4d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,34 @@ package com.nvidia.spark.rapids -import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark} +import com.nvidia.spark.rapids.jni.{CpuRetryOOM, GpuSplitAndRetryOOM, RmmSpark} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.types._ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { private val schema = StructType(Seq(StructField("a", IntegerType))) private val batchSize = 1 * 1024 * 1024 * 1024 + private def rowThatFailsOnceWithCpuRetryOOM(value: Int): InternalRow = { + var failed = false + new GenericInternalRow(Array[Any](value.asInstanceOf[AnyRef])) { + override def getInt(ordinal: Int): Int = { + if (!failed) { + failed = true + throw new CpuRetryOOM("Injected row conversion failure") + } + super.getInt(ordinal) + } + } + } + test("test simple GPU OOM retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) Arm.withResource(row2ColIter.next()) { batch => @@ -39,7 +54,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { test("test simple CPU OOM retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) // Inject CPU OOM after skipping the 
first few CPU allocations. The skipCount ensures // the OOM is thrown at a point where our retry logic can handle it (during row conversion, // after builder state has been captured). @@ -50,10 +66,56 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { } } + test("test CPU OOM retry preserves all rows for non-RequireSingleBatch") { + val totalRows = 10 + val rowIter: Iterator[InternalRow] = (1 to totalRows).map(InternalRow(_)).toIterator + val goal = TargetSize(batchSize) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, goal, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) + // Inject a CPU OOM during conversion and verify that retry still produces + // the complete set of rows when the iterator is allowed to emit multiple batches. + RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1, + RmmSpark.OomInjectionType.CPU.ordinal, 3) + var totalRowsSeen = 0 + while (row2ColIter.hasNext) { + Arm.withResource(row2ColIter.next()) { batch => + totalRowsSeen += batch.numRows() + } + } + assertResult(totalRows)(totalRowsSeen) + } + + test("test simple CPU OOM without retry fails") { + val rowIter: Iterator[InternalRow] = + Iterator(rowThatFailsOnceWithCpuRetryOOM(1)) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = false) + // Use a row that throws CpuRetryOOM from getInt() on first access to verify + // that a conversion-time CpuRetryOOM propagates directly to the caller when + // per-row retry is disabled, instead of accidentally testing builders.tryBuild(). 
+ assertThrows[CpuRetryOOM] { + row2ColIter.next() + } + } + + test("test simple CPU OOM with retry recovers") { + val rowIter: Iterator[InternalRow] = + Iterator(rowThatFailsOnceWithCpuRetryOOM(1)) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) + Arm.withResource(row2ColIter.next()) { batch => + assertResult(1)(batch.numRows()) + } + } + test("test simple OOM split and retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) assertThrows[GpuSplitAndRetryOOM] {