Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ class RowToColumnarIterator(
localGoal: CoalesceSizeGoal,
batchSizeBytes: Long,
converters: GpuRowToColumnConverter,
enableRetry: Boolean = false,
numInputRows: GpuMetric = NoopMetric,
numOutputRows: GpuMetric = NoopMetric,
numOutputBatches: GpuMetric = NoopMetric,
Expand Down Expand Up @@ -608,7 +609,6 @@ class RowToColumnarIterator(
if (localSchema.fields.isEmpty) {
// if there are no columns then we just default to a small number
// of rows for the first batch
// TODO do we even need to allocate anything here?
targetRows = 1024
initialRows = targetRows
} else {
Expand All @@ -623,18 +623,30 @@ class RowToColumnarIterator(
var rowCount = 0
// Double because validity can be < 1 byte, and this is just an estimate anyways
var byteCount: Double = 0
val converter = new RetryableRowConverter(builders, rowCopyProjection)
// read at least one row
while (rowIter.hasNext &&
(rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) {
converter.attempt(rowIter.next())
val bytesWritten = withRetryNoSplit {
withRestoreOnRetry(converter) {
converters.convert(converter.currentRow, builders)

if (enableRetry) {
val converter = new RetryableRowConverter(builders, rowCopyProjection)
// read at least one row
while (rowIter.hasNext &&
(rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) {
converter.attempt(rowIter.next())
val bytesWritten = withRetryNoSplit {
withRestoreOnRetry(converter) {
converters.convert(converter.currentRow, builders)
}
}
byteCount += bytesWritten
rowCount += 1
}
} else {
// Disabling R2C retry only removes the per-row retry wrapper around convert().
// The final builders.tryBuild(rowCount) call below still uses withRetryNoSplit.
while (rowIter.hasNext &&
(rowCount == 0 || rowCount < targetRows && byteCount < targetSizeBytes)) {
val row = rowIter.next()
byteCount += converters.convert(row, builders)
rowCount += 1
}
byteCount += bytesWritten
rowCount += 1
}

// enforce RequireSingleBatch limit
Expand Down Expand Up @@ -990,8 +1002,9 @@ case class GpuRowToColumnarExec(child: SparkPlan, goal: CoalesceSizeGoal)
val converters = new GpuRowToColumnConverter(localSchema)
val conf = new RapidsConf(child.conf)
val batchSizeBytes = conf.gpuTargetBatchSizeBytes
val enableR2cRetry = conf.isR2cRetryEnabled
rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
localSchema, localGoal, batchSizeBytes, converters,
localSchema, localGoal, batchSizeBytes, converters, enableR2cRetry,
numInputRows, numOutputRows, numOutputBatches, streamTime, opTime))
}
}
Expand Down
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ object RapidsConf extends Logging {
.integerConf
.createWithDefault(2)

// Internal flag surfaced as RapidsConf.isR2cRetryEnabled and forwarded to
// RowToColumnarIterator's enableRetry constructor parameter; defaults to false.
val ENABLE_R2C_RETRY = conf("spark.rapids.sql.rowToColumnar.retry.enabled")
  .doc("When true, the row-to-columnar conversion wraps each row's conversion with " +
    "retry logic so that host OOM during conversion can be recovered. This adds a small " +
    "per-row overhead. When false (default), the retry is disabled to avoid that overhead, " +
    "at the risk of failing the task on host OOM during R2C conversion.")
  .internal()
  .booleanConf
  .createWithDefault(false)

val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir")
.doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " +
"an exception. The URI can reference a distributed filesystem. The filename will be of the " +
Expand Down Expand Up @@ -3315,6 +3324,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val gpuOomMaxRetries: Int = get(GPU_OOM_MAX_RETRIES)

// Value of spark.rapids.sql.rowToColumnar.retry.enabled (default false); drives the
// enableRetry flag passed when constructing RowToColumnarIterator.
lazy val isR2cRetryEnabled: Boolean = get(ENABLE_R2C_RETRY)

lazy val gpuCoreDumpDir: Option[String] = get(GPU_COREDUMP_DIR)

lazy val gpuCoreDumpPipePattern: String = get(GPU_COREDUMP_PIPE_PATTERN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,9 +1335,10 @@ class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer {
val structSchema = schemaWithUnambiguousNames.toStructType
val converters = new GpuRowToColumnConverter(structSchema)
val batchSizeBytes = rapidsConf.gpuTargetBatchSizeBytes
val enableR2cRetry = rapidsConf.isR2cRetryEnabled
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, batchSizeBytes,
converters)
converters, enableR2cRetry)
})
columnarBatchRdd.flatMap(cb => {
withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
* Copyright (c) 2023-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,19 +16,34 @@

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, GpuSplitAndRetryOOM, RmmSpark}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
private val schema = StructType(Seq(StructField("a", IntegerType)))
private val batchSize = 1 * 1024 * 1024 * 1024

// Builds a single-int row whose first getInt() call throws CpuRetryOOM and whose
// subsequent calls succeed, simulating a one-shot host OOM during row conversion.
private def rowThatFailsOnceWithCpuRetryOOM(value: Int): InternalRow = {
  new GenericInternalRow(Array[Any](value.asInstanceOf[AnyRef])) {
    // Tracks whether the injected failure has already been delivered.
    private var hasThrown = false

    override def getInt(ordinal: Int): Int = {
      if (hasThrown) {
        super.getInt(ordinal)
      } else {
        hasThrown = true
        throw new CpuRetryOOM("Injected row conversion failure")
      }
    }
  }
}

test("test simple GPU OOM retry") {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema))
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema),
enableRetry = true)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
Arm.withResource(row2ColIter.next()) { batch =>
Expand All @@ -39,7 +54,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
test("test simple CPU OOM retry") {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema))
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema),
enableRetry = true)
// Inject CPU OOM after skipping the first few CPU allocations. The skipCount ensures
// the OOM is thrown at a point where our retry logic can handle it (during row conversion,
// after builder state has been captured).
Expand All @@ -50,10 +66,56 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
}
}

test("test CPU OOM retry preserves all rows for non-RequireSingleBatch") {
  // Inject a CPU OOM during conversion and verify that retry still produces
  // the complete set of rows when the iterator is allowed to emit multiple batches.
  val expectedRows = 10
  val inputRows: Iterator[InternalRow] = (1 to expectedRows).iterator.map(InternalRow(_))
  val iter = new RowToColumnarIterator(
    inputRows, schema, TargetSize(batchSize), batchSize, new GpuRowToColumnConverter(schema),
    enableRetry = true)
  RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
    RmmSpark.OomInjectionType.CPU.ordinal, 3)
  // Drain every batch the iterator emits and count the rows seen across all of them.
  var rowsSeen = 0
  while (iter.hasNext) {
    Arm.withResource(iter.next()) { batch =>
      rowsSeen += batch.numRows()
    }
  }
  assertResult(expectedRows)(rowsSeen)
}

test("test simple CPU OOM without retry fails") {
  // Use a row that throws CpuRetryOOM from getInt() on first access to verify
  // that a conversion-time CpuRetryOOM propagates directly to the caller when
  // per-row retry is disabled, instead of accidentally testing builders.tryBuild().
  val failingRows: Iterator[InternalRow] = Iterator(rowThatFailsOnceWithCpuRetryOOM(1))
  val iter = new RowToColumnarIterator(
    failingRows, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema),
    enableRetry = false)
  assertThrows[CpuRetryOOM](iter.next())
}

test("test simple CPU OOM with retry recovers") {
  // The row throws CpuRetryOOM on its first getInt() access; with enableRetry = true
  // the iterator still produces a complete one-row batch.
  val failingRows: Iterator[InternalRow] = Iterator(rowThatFailsOnceWithCpuRetryOOM(1))
  val iter = new RowToColumnarIterator(
    failingRows, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema),
    enableRetry = true)
  Arm.withResource(iter.next()) { batch =>
    assertResult(1)(batch.numRows())
  }
}

test("test simple OOM split and retry") {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema))
rowIter, schema, RequireSingleBatch, batchSize, new GpuRowToColumnConverter(schema),
enableRetry = true)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertThrows[GpuSplitAndRetryOOM] {
Expand Down
Loading