diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index a35d370b647..60303ed50c9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -578,6 +578,7 @@ class RowToColumnarIterator( localGoal: CoalesceSizeGoal, batchSizeBytes: Long, converters: GpuRowToColumnConverter, + enableRetry: Boolean = false, numInputRows: GpuMetric = NoopMetric, numOutputRows: GpuMetric = NoopMetric, numOutputBatches: GpuMetric = NoopMetric, @@ -608,7 +609,6 @@ class RowToColumnarIterator( if (localSchema.fields.isEmpty) { // if there are no columns then we just default to a small number // of rows for the first batch - // TODO do we even need to allocate anything here? targetRows = 1024 initialRows = targetRows } else { @@ -623,18 +623,30 @@ class RowToColumnarIterator( var rowCount = 0 // Double because validity can be < 1 byte, and this is just an estimate anyways var byteCount: Double = 0 - val converter = new RetryableRowConverter(builders, rowCopyProjection) - // read at least one row - while (rowIter.hasNext && - (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { - converter.attempt(rowIter.next()) - val bytesWritten = withRetryNoSplit { - withRestoreOnRetry(converter) { - converters.convert(converter.currentRow, builders) + + if (enableRetry) { + val converter = new RetryableRowConverter(builders, rowCopyProjection) + // read at least one row + while (rowIter.hasNext && + (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { + converter.attempt(rowIter.next()) + val bytesWritten = withRetryNoSplit { + withRestoreOnRetry(converter) { + converters.convert(converter.currentRow, builders) + } } + byteCount += bytesWritten + rowCount += 1 + } + } else { + // Disabling R2C retry only removes the per-row retry wrapper around convert(). + // The final builders.tryBuild(rowCount) call below still uses withRetryNoSplit. + while (rowIter.hasNext && + (rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) { + val row = rowIter.next() + byteCount += converters.convert(row, builders) + rowCount += 1 } - byteCount += bytesWritten - rowCount += 1 } // enforce RequireSingleBatch limit @@ -990,8 +1002,9 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal) val converters = new GpuRowToColumnConverter(localSchema) val conf = new RapidsConf(child.conf) val batchSizeBytes = conf.gpuTargetBatchSizeBytes + val enableR2cRetry = conf.isR2cRetryEnabled rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter, - localSchema, localGoal, batchSizeBytes, converters, + localSchema, localGoal, batchSizeBytes, converters, enableR2cRetry, numInputRows, numOutputRows, numOutputBatches, streamTime, opTime)) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a0412fad91f..2ca50eaeff2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -438,6 +438,15 @@ object RapidsConf extends Logging { .integerConf .createWithDefault(2) + val ENABLE_R2C_RETRY = conf("spark.rapids.sql.rowToColumnar.retry.enabled") + .doc("When true, the row-to-columnar conversion wraps each row's conversion with " + + "retry logic so that host OOM during conversion can be recovered. This adds a small " + + "per-row overhead. When false (default), the retry is disabled to avoid that overhead, " + + "at the risk of failing the task on host OOM during R2C conversion.") + .internal() + .booleanConf + .createWithDefault(false) + val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir") .doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " + "an exception. The URI can reference a distributed filesystem. The filename will be of the " + @@ -3315,6 +3324,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val gpuOomMaxRetries: Int = get(GPU_OOM_MAX_RETRIES) + lazy val isR2cRetryEnabled: Boolean = get(ENABLE_R2C_RETRY) + lazy val gpuCoreDumpDir: Option[String] = get(GPU_COREDUMP_DIR) lazy val gpuCoreDumpPipePattern: String = get(GPU_COREDUMP_PIPE_PATTERN) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala index 6f8ef5f3670..3aa42fb9a8d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/parquet/ParquetCachedBatchSerializer.scala @@ -1335,9 +1335,10 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer { val structSchema = schemaWithUnambiguousNames.toStructType val converters = new GpuRowToColumnConverter(structSchema) val batchSizeBytes = rapidsConf.gpuTargetBatchSizeBytes + val enableR2cRetry = rapidsConf.isR2cRetryEnabled val columnarBatchRdd = input.mapPartitions(iter => { new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, batchSizeBytes, - converters) + converters, enableR2cRetry) }) columnarBatchRdd.flatMap(cb => { withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala index 042281875a1..37d1c63ff4d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RowToColumnarIteratorRetrySuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,34 @@ package com.nvidia.spark.rapids -import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark} +import com.nvidia.spark.rapids.jni.{CpuRetryOOM, GpuSplitAndRetryOOM, RmmSpark} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.types._ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { private val schema = StructType(Seq(StructField("a", IntegerType))) private val batchSize = 1 * 1024 * 1024 * 1024 + private def rowThatFailsOnceWithCpuRetryOOM(value: Int): InternalRow = { + var failed = false + new GenericInternalRow(Array[Any](value.asInstanceOf[AnyRef])) { + override def getInt(ordinal: Int): Int = { + if (!failed) { + failed = true + throw new CpuRetryOOM("Injected row conversion failure") + } + super.getInt(ordinal) + } + } + } + test("test simple GPU OOM retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) Arm.withResource(row2ColIter.next()) { batch => @@ -39,7 +54,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { test("test simple CPU OOM retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) // Inject CPU OOM after skipping the first few CPU allocations. The skipCount ensures // the OOM is thrown at a point where our retry logic can handle it (during row conversion, // after builder state has been captured). @@ -50,10 +66,56 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase { } } + test("test CPU OOM retry preserves all rows for non-RequireSingleBatch") { + val totalRows = 10 + val rowIter: Iterator[InternalRow] = (1 to totalRows).map(InternalRow(_)).toIterator + val goal = TargetSize(batchSize) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, goal, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) + // Inject a CPU OOM during conversion and verify that retry still produces + // the complete set of rows when the iterator is allowed to emit multiple batches. + RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1, + RmmSpark.OomInjectionType.CPU.ordinal, 3) + var totalRowsSeen = 0 + while (row2ColIter.hasNext) { + Arm.withResource(row2ColIter.next()) { batch => + totalRowsSeen += batch.numRows() + } + } + assertResult(totalRows)(totalRowsSeen) + } + + test("test simple CPU OOM without retry fails") { + val rowIter: Iterator[InternalRow] = + Iterator(rowThatFailsOnceWithCpuRetryOOM(1)) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = false) + // Use a row that throws CpuRetryOOM from getInt() on first access to verify + // that a conversion-time CpuRetryOOM propagates directly to the caller when + // per-row retry is disabled, instead of accidentally testing builders.tryBuild(). + assertThrows[CpuRetryOOM] { + row2ColIter.next() + } + } + + test("test simple CPU OOM with retry recovers") { + val rowIter: Iterator[InternalRow] = + Iterator(rowThatFailsOnceWithCpuRetryOOM(1)) + val row2ColIter = new RowToColumnarIterator( + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) + Arm.withResource(row2ColIter.next()) { batch => + assertResult(1)(batch.numRows()) + } + } + test("test simple OOM split and retry") { val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator val row2ColIter = new RowToColumnarIterator( - rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema)) + rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema), + enableRetry = true) RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) assertThrows[GpuSplitAndRetryOOM] {