Support from_protobuf expression #14354
Conversation
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptileai full review
Greptile Summary

This PR adds GPU acceleration for Spark's `from_protobuf` expression. The prior review rounds addressed an extensive set of critical bugs (proto3 acceptance on reflection failure, among others). Key remaining items:
Confidence Score: 2/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Catalyst as Catalyst Optimizer
    participant Shim as ProtobufExprShims (tagExprForGpu)
    participant Compat as SparkProtobufCompat
    participant Extractor as ProtobufSchemaExtractor
    participant Validator as ProtobufSchemaValidator
    participant GPU as GpuFromProtobuf (doColumnar)
    participant JNI as Protobuf.decodeToStruct (JNI)
    Catalyst->>Shim: tagExprForGpu(ProtobufDataToCatalyst)
    Shim->>Compat: extractExprInfo(expr) → ProtobufExprInfo
    Compat-->>Shim: messageName, descriptorSource, options
    Shim->>Compat: resolveMessageDescriptor(exprInfo) → ProtobufMessageDescriptor
    Compat-->>Shim: ReflectiveMessageDescriptor (via reflection)
    Shim->>Extractor: analyzeAllFields(schema, msgDesc, enumsAsInts)
    Extractor-->>Shim: Map[fieldName → ProtobufFieldInfo]
    Shim->>Shim: analyzeRequiredFields() → Set[requiredFieldNames]
    Note over Shim: Schema pruning: only required fields decoded
    Shim->>Shim: registerPrunedOrdinals() on GetStructField/GetArrayStructFields
    Note over Shim: PRUNED_ORDINAL_TAG set on downstream extractors
    loop for each required field
        Shim->>Validator: toFlattenedFieldDescriptor(path, field, info)
        Validator-->>Shim: FlattenedFieldDescriptor
    end
    Shim->>Validator: validateFlattenedSchema(flatFields)
    Shim->>Shim: convertToGpu() → GpuFromProtobuf
    Catalyst->>GPU: doColumnar(inputBinaryColumn)
    GPU->>JNI: Protobuf.decodeToStruct(input, ProtobufSchemaDescriptor, failOnErrors)
    JNI-->>GPU: cudf.ColumnVector (struct)
    GPU->>GPU: mergeAndSetValidity (apply input nulls)
    GPU-->>Catalyst: decoded StructType column
```
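The pruning and retagging steps in the diagram can be sketched in isolation. The following is a minimal, self-contained illustration — `PruneSketch`, `prunedOrdinals`, and the field names are hypothetical, not the plugin's actual API — of why downstream extractors need new ordinals after pruning:

```scala
// Sketch: after schema pruning, the decoded struct contains only the required
// fields, so an extractor that referenced a field by its ordinal in the full
// schema must be retagged with the field's position in the pruned schema.
object PruneSketch {
  // Map each kept field name to its ordinal in the pruned (decoded) struct,
  // preserving the original declaration order.
  def prunedOrdinals(allFields: Seq[String], required: Set[String]): Map[String, Int] =
    allFields.filter(required.contains).zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    val all = Seq("id", "name", "payload", "ts") // full protobuf message fields
    val req = Set("name", "ts")                  // fields the query actually reads
    val ords = prunedOrdinals(all, req)
    // "ts" was ordinal 3 in the full schema but is ordinal 1 after pruning.
    println(ords("name")) // 0
    println(ords("ts"))   // 1
  }
}
```

This mirrors the diagram's `registerPrunedOrdinals()` step conceptually: each surviving field's new position is what `PRUNED_ORDINAL_TAG` would carry.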
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptileai full review
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFromProtobuf.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptileai full review
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptileai full review again
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
@greptileai full review again
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile please check again
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile please check again
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile please check again
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFromProtobuf.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile please check again
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/protobuf/ProtobufSchemaValidator.scala
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkProtobufCompat.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFromProtobuf.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile review
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/protobuf/ProtobufSchemaModel.scala
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile review
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkProtobufCompat.scala
@greptile review
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile review
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/protobuf/ProtobufSchemaExtractor.scala
sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile review
addFieldWithChildren: nestedMsgDesc naming is confusing for doubly-nested structs, though the logic is correct

In addChildFieldsFromStruct, every child addFieldWithChildren call passes parentIdx — the parent index received by addChildFieldsFromStruct — as the parent for all children of the struct. This is correct for one level of nesting. When addFieldWithChildren is called recursively for a deeper struct child, it calls back into addChildFieldsFromStruct passing currentIdx as the new parentIdx, so the grandchildren receive currentIdx (the deep struct's own flat-schema index) as their parent, which is also correct.

The subtlety is in the descriptor argument. For an array-of-struct child the code does:

```scala
case ArrayType(st: StructType, _) if nestedMsgDesc != null =>
  addChildFieldsFromStruct(st, nestedMsgDesc, sf.name, currentIdx, depth, pathPrefix)
```

Here nestedMsgDesc is the descriptor of the message that contains sf, not sf's own message descriptor. addChildFieldsFromStruct then calls parentMsgDesc.findField(sf.name) to locate sf within its container, and uses parentField.get.messageDescriptor to resolve sf's own message type when looking up its children. For top-level calls the container is msgDesc (the root); for recursive calls, childMsgDesc — sf's own descriptor — is passed down and correctly becomes the container descriptor for sf's children.

So the traversal is correct, but the recursive semantics of nestedMsgDesc are counter-intuitive: at each level it carries the descriptor of the current struct (needed to resolve its own fields), not the parent's. This only works because addFieldWithChildren immediately passes the same nestedMsgDesc downward as the descriptor of the struct it just emitted. Consider renaming nestedMsgDesc to containingMsgDesc and adding a clarifying comment to avoid future mistakes.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
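To make the suggested rename concrete, here is a minimal, self-contained model of the recursion. The case classes and names (`MsgDesc`, `FlattenSketch`) are illustrative stand-ins, not the plugin's real types: at every level the descriptor parameter is the one that contains the fields being expanded, which is exactly what `containingMsgDesc` conveys.

```scala
// Toy model of the flattening recursion. MsgDesc stands in for a protobuf
// message descriptor; a field mapped to Some(desc) is itself a nested message.
case class MsgDesc(name: String, fields: Map[String, Option[MsgDesc]])

object FlattenSketch {
  // Flatten field paths. `containingMsgDesc` is always the descriptor of the
  // message that contains the fields we are iterating — the rename suggested
  // above makes this invariant explicit.
  def flatten(containingMsgDesc: MsgDesc, prefix: String = ""): Seq[String] =
    containingMsgDesc.fields.toSeq.sortBy(_._1).flatMap { case (name, child) =>
      val path = if (prefix.isEmpty) name else s"$prefix.$name"
      child match {
        // For a nested-message field, recurse with the field's own descriptor,
        // which becomes the containing descriptor for the next level down.
        case Some(childDesc) => path +: flatten(childDesc, path)
        case None            => Seq(path)
      }
    }
}
```

With `MsgDesc("Root", Map("a" -> None, "b" -> Some(MsgDesc("B", Map("c" -> None)))))`, `flatten` yields `Seq("a", "b", "b.c")`: the descriptor handed to each recursive call is the one containing the fields about to be walked.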
@@ -76,7 +121,7 @@ class GpuProjectExecMeta
ENABLE_PROTOBUF_BATCH_MERGE_AFTER_PROJECT defaults to false, silently disabling the post-project coalesce optimization
The config's default is false and is marked internal(). This means users who deploy protobuf decoding with schema pruning will not benefit from the post-project coalesce even though it is the correct behavior to request a coalesce after producing small projected batches. The intent of this config appears to be a safety valve rather than a permanent opt-in gate.
Consider whether the default should be true once the feature is considered stable, and document in the config's doc string what conditions must hold before enabling it, so operators know when it is safe to flip. Otherwise, production deployments will silently miss the optimization without any indication in the logs that it is disabled.
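If the default were flipped once the feature is stable, the registration might look like this — a sketch only, assuming the RapidsConf-style builder used in this codebase; the config key name and doc text here are illustrative, not the PR's actual code:

```scala
// Illustrative config registration (hypothetical key and doc text).
val ENABLE_PROTOBUF_BATCH_MERGE_AFTER_PROJECT =
  conf("spark.rapids.sql.protobuf.batchMergeAfterProject.enabled")
    .doc("When true, request a coalesce after a project that performs protobuf " +
      "decoding with schema pruning, so that small projected batches are merged " +
      "before reaching downstream operators. Disable only as a safety valve if " +
      "the extra coalesce causes regressions.")
    .internal()
    .booleanConf
    .createWithDefault(true) // proposed default once the feature is stable
```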
```scala
      isRequired = fieldDescriptor.isRequired,
      defaultValue = defaultValue,
      enumMetadata = fieldDescriptor.enumMetadata,
      isRepeated = fieldDescriptor.isRepeated
    )
  }
}
```
extractFieldInfo silently drops the checkFieldSupport unsupported reason when defaultValueResult is Left
When checkFieldSupport returns (false, Some("type mismatch: ..."), ...) but fieldDescriptor.defaultValueResult is also Left("reflection failure for default value"), the function returns Left("reflection failure for default value"). In analyzeAllFields, this Left becomes the unsupportedReason in unsupportedFieldInfo, discarding the type-mismatch reason entirely.
While not a correctness bug (the field is still correctly marked unsupported), the error message surfaced to the user will say "reflection failure" instead of the actionable "type mismatch: Spark X vs Protobuf Y", which hinders debugging.
Consider building the ProtobufFieldInfo directly with the known isSupported/unsupportedReason from checkFieldSupport before inspecting defaultValueResult, so the primary unsupported reason is never lost:
```scala
val (isSupported, unsupportedReason, encoding) = checkFieldSupport(...)
val defaultValue = fieldDescriptor.defaultValueResult match {
  case Right(v) => v
  case Left(err) if !isSupported => None // ignore, field is already unsupported
  case Left(err) => return Left(err)     // only propagate if the field was otherwise OK
}
Right(ProtobufFieldInfo(..., isSupported = isSupported, ...))
```

```scala
        val runtimeOrd = orderedChildren.indexOf(childName)
        if (runtimeOrd >= 0) {
          gsf.setTagValue(ProtobufExprShims.PRUNED_ORDINAL_TAG, runtimeOrd)
        }
      }
    }
  case Some(parentPath) if parentPath.isEmpty =>
    val runtimeOrd = topLevelIndices.indexOf(ordinal)
    if (runtimeOrd >= 0) {
      gsf.setTagValue(ProtobufExprShims.PRUNED_ORDINAL_TAG, runtimeOrd)
    }
  case _ =>
}
case gasf @ GetArrayStructFields(childExpr, field, _, _, _) =>
```
analyzeRequiredFields short-circuits on collectedExprs.isEmpty even when fieldReqs is already populated
The guard condition

```scala
if (!safeToPrune || collectedExprs.isEmpty || hasDirectStructRef || fieldReqs.isEmpty) {
```

evaluates collectedExprs.isEmpty before fieldReqs.isEmpty. Because fieldReqs is populated solely via collectStructFieldReferences calls on the expressions in collectedExprs, collectedExprs cannot be empty while fieldReqs is non-empty. The extra check is harmless but misleading: it suggests that collecting expressions and populating requirements are independent paths. Consider simplifying to just fieldReqs.isEmpty (which subsumes the collectedExprs.isEmpty case), with a comment explaining that an empty fieldReqs covers both "no expressions found" and "no protobuf fields referenced".
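The subsumption argument can be seen in a tiny model. The names mirror the review comment, but `GuardSketch` and its types are illustrative, not the plugin's code: because the requirements set is built only by folding over the collected expressions, an empty collection always yields an empty set.

```scala
object GuardSketch {
  // Stand-in for the real type: an "expression" contributes zero or more
  // protobuf field requirements.
  type Expr = Seq[String]

  // fieldReqs is populated solely from collectedExprs, mirroring how
  // collectStructFieldReferences is only invoked on collected expressions:
  // empty input necessarily produces empty output.
  def fieldReqs(collectedExprs: Seq[Expr]): Set[String] =
    collectedExprs.flatten.toSet

  def main(args: Array[String]): Unit = {
    // collectedExprs.isEmpty implies fieldReqs.isEmpty, so checking
    // fieldReqs.isEmpty alone subsumes both guard conditions.
    println(fieldReqs(Seq.empty).isEmpty)             // true
    println(fieldReqs(Seq(Seq("a"), Seq())).nonEmpty) // true
  }
}
```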
Fixes #14069.
Description
This is a large PR that adds support for a (big) subset of the from_protobuf expression.
I will add documentation, performance numbers, and other information to this PR very soon.
I expect this PR will be split into smaller ones that are merged over time.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)