diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 3829baa43665..c1a324ad844c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -37,7 +36,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.types.Types; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -93,10 +91,30 @@ static org.apache.iceberg.Schema resolveSchema( if (keep != null && !keep.isEmpty()) { selectedFieldsBuilder.addAll(keep); } else if (drop != null && !drop.isEmpty()) { - Set fields = - schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet()); - drop.forEach(fields::remove); - selectedFieldsBuilder.addAll(fields); + // Get all field paths including nested ones + java.util.List allPaths = + new java.util.ArrayList<>( + org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); + java.util.Collections.sort(allPaths); + + // Identify leaf fields only (fields that are not parents of other fields) + // This prevents selecting a parent struct from implicitly including dropped children + java.util.Set leaves = new java.util.HashSet<>(); + for (int i = 0; i < allPaths.size(); i++) { + String path = allPaths.get(i); + // If the next path starts with "path.", then "path" is a parent - skip it + if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) { + continue; + } + leaves.add(path); + } + + // Remove fields that are dropped or are children of dropped fields + for (String d : drop) { + leaves.removeIf(f -> f.equals(d) || f.startsWith(d + ".")); + } + + selectedFieldsBuilder.addAll(leaves); } else { // default: include all columns return schema; @@ -327,7 +345,9 @@ void validate(Table table) { param = "drop"; fieldsSpecified = newHashSet(checkNotNull(drop)); } - table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name())); + // Use findField() to support nested column paths (e.g., "colA.colB") + // Iceberg's Schema.findField() resolves dot-notation paths for nested fields + fieldsSpecified.removeIf(name -> table.schema().findField(name) != null); checkArgument( fieldsSpecified.isEmpty(), diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index bb303ea9c305..8b1037168037 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -273,6 +273,53 @@ public void testProjectedSchema() { assertTrue(projectKeep.sameSchema(expectedKeep)); } + @Test + public void testNestedColumnPruningValidation() { + // Test that nested column paths (dot notation) are accepted in keep/drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test that nested column path "data.name" is valid and can be selected + org.apache.iceberg.Schema projectNestedKeep = + resolveSchema(schemaWithNested, asList("id", "data.name"), null); + + // Verify the projected schema contains the nested field + assertTrue(projectNestedKeep.findField("id") != null); + assertTrue(projectNestedKeep.findField("data.name") != null); + } + + @Test + public void testNestedColumnDropValidation() { + // Test that nested column paths work correctly with drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test dropping a nested field "data.name" - should keep id, data.value, metadata + org.apache.iceberg.Schema projectNestedDrop = + resolveSchema(schemaWithNested, null, asList("data.name")); + + // Verify "data.name" is NOT in the projected schema + assertTrue(projectNestedDrop.findField("id") != null); + assertTrue(projectNestedDrop.findField("data.value") != null); + assertTrue(projectNestedDrop.findField("metadata") != null); + // data.name should be dropped + assertTrue(projectNestedDrop.findField("data.name") == null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId =