From 90d1016bfe62b67a41370b06cb66b08cff567240 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Thu, 5 Feb 2026 22:03:05 +0800 Subject: [PATCH 1/3] [Iceberg] Support nested column paths in keep/drop configuration This change fixes the validation logic in IcebergScanConfig to support nested column paths using dot notation (e.g., "data.name"). Previously, the validation only checked top-level column names, causing nested paths like "colA.colB" to fail with an "unknown field(s)" error. The fix uses Iceberg's Schema.findField(), which natively resolves dot-notation paths for nested fields. Fixes #37486 --- .../sdk/io/iceberg/IcebergScanConfig.java | 4 +++- .../sdk/io/iceberg/IcebergIOReadTest.java | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 3829baa43665..51591e2f8476 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -327,7 +327,9 @@ void validate(Table table) { param = "drop"; fieldsSpecified = newHashSet(checkNotNull(drop)); } - table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name())); + // Use findField() to support nested column paths (e.g., "colA.colB") + // Iceberg's Schema.findField() resolves dot-notation paths for nested fields + fieldsSpecified.removeIf(name -> table.schema().findField(name) != null); checkArgument( fieldsSpecified.isEmpty(), diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index bb303ea9c305..dd2bf3037888 100644 --- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -273,6 +273,29 @@ public void testProjectedSchema() { assertTrue(projectKeep.sameSchema(expectedKeep)); } + @Test + public void testNestedColumnPruningValidation() { + // Test that nested column paths (dot notation) are accepted in keep/drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test that nested column path "data.name" is valid and can be selected + org.apache.iceberg.Schema projectNestedKeep = + resolveSchema(schemaWithNested, asList("id", "data.name"), null); + + // Verify the projected schema contains the nested field + assertTrue(projectNestedKeep.findField("id") != null); + assertTrue(projectNestedKeep.findField("data.name") != null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId = From 4b938659176a311d4db5fe989936d280aa2d0568 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Thu, 5 Feb 2026 22:40:21 +0800 Subject: [PATCH 2/3] [Iceberg] Also fix drop functionality for nested column paths - Use TypeUtil.indexByName() to enumerate all field paths - Only select leaf fields to prevent parent struct from including dropped children - Add test for nested drop validation --- .../sdk/io/iceberg/IcebergScanConfig.java | 27 ++++++++++++++++--- .../sdk/io/iceberg/IcebergIOReadTest.java | 26 ++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 51591e2f8476..011d17c06579 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -93,10 +93,29 @@ static org.apache.iceberg.Schema resolveSchema( if (keep != null && !keep.isEmpty()) { selectedFieldsBuilder.addAll(keep); } else if (drop != null && !drop.isEmpty()) { - Set<String> fields = - schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet()); - drop.forEach(fields::remove); - selectedFieldsBuilder.addAll(fields); + // Get all field paths including nested ones + java.util.List<String> allPaths = new java.util.ArrayList<>( + org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); + java.util.Collections.sort(allPaths); + + // Identify leaf fields only (fields that are not parents of other fields) + // This prevents selecting a parent struct from implicitly including dropped children + java.util.Set<String> leaves = new java.util.HashSet<>(); + for (int i = 0; i < allPaths.size(); i++) { + String path = allPaths.get(i); + // If the next path starts with "path.", then "path" is a parent - skip it + if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) { + continue; + } + leaves.add(path); + } + + // Remove fields that are dropped or are children of dropped fields + for (String d : drop) { + leaves.removeIf(f -> f.equals(d) || f.startsWith(d + ".")); + } + + selectedFieldsBuilder.addAll(leaves); } else { // default: include all columns return schema; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index dd2bf3037888..6e823cb894bf 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -296,6 +296,32 @@ public void testNestedColumnPruningValidation() { assertTrue(projectNestedKeep.findField("data.name") != null); } + @Test + public void testNestedColumnDropValidation() { + // Test that nested column paths work correctly with drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test dropping a nested field "data.name" - should keep id, data.value, metadata + org.apache.iceberg.Schema projectNestedDrop = + resolveSchema(schemaWithNested, null, asList("data.name")); + + // Verify "data.name" is NOT in the projected schema + assertTrue(projectNestedDrop.findField("id") != null); + assertTrue(projectNestedDrop.findField("data.value") != null); + assertTrue(projectNestedDrop.findField("metadata") != null); + // data.name should be dropped + assertTrue(projectNestedDrop.findField("data.name") == null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId = From 83c7a3c5dfcc6d6d78b4183bbf9664746a99b0e7 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Fri, 6 Feb 2026 03:43:51 +0800 Subject: [PATCH 3/3] Apply Spotless formatting --- .../org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java | 7 +++---- .../org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java | 6 ++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 011d17c06579..c1a324ad844c 100644 --- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -37,7 +36,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.types.Types; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -94,8 +92,9 @@ static org.apache.iceberg.Schema resolveSchema( selectedFieldsBuilder.addAll(keep); } else if (drop != null && !drop.isEmpty()) { // Get all field paths including nested ones - java.util.List<String> allPaths = new java.util.ArrayList<>( - org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); + java.util.List<String> allPaths = + new java.util.ArrayList<>( + org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); java.util.Collections.sort(allPaths); // Identify leaf fields only (fields that are not parents of other fields) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 6e823cb894bf..8b1037168037 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -283,8 +283,7 @@ public void testNestedColumnPruningValidation() { 2, "data", 
StructType.of( - required(3, "name", StringType.get()), - required(4, "value", StringType.get()))), + required(3, "name", StringType.get()), required(4, "value", StringType.get()))), required(5, "metadata", StringType.get())); // Test that nested column path "data.name" is valid and can be selected @@ -306,8 +305,7 @@ public void testNestedColumnDropValidation() { 2, "data", StructType.of( - required(3, "name", StringType.get()), - required(4, "value", StringType.get()))), + required(3, "name", StringType.get()), required(4, "value", StringType.get()))), required(5, "metadata", StringType.get())); // Test dropping a nested field "data.name" - should keep id, data.value, metadata