From feffbecdebf1f1232e4949ed7043faa241a9464e Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Thu, 5 Feb 2026 22:03:05 +0800 Subject: [PATCH 1/4] [Iceberg] Support nested column paths in keep/drop configuration This change fixes the validation logic in IcebergScanConfig to support nested column paths using dot notation (e.g., "data.name"). Previously, the validation only checked top-level column names, causing nested paths like "colA.colB" to fail with "unknown field(s)" error. The fix uses Iceberg's Schema.findField() which natively resolves dot-notation paths for nested fields. Fixes #37486 --- .../sdk/io/iceberg/IcebergScanConfig.java | 4 +++- .../sdk/io/iceberg/IcebergIOReadTest.java | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 3829baa43665..51591e2f8476 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -327,7 +327,9 @@ void validate(Table table) { param = "drop"; fieldsSpecified = newHashSet(checkNotNull(drop)); } - table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name())); + // Use findField() to support nested column paths (e.g., "colA.colB") + // Iceberg's Schema.findField() resolves dot-notation paths for nested fields + fieldsSpecified.removeIf(name -> table.schema().findField(name) != null); checkArgument( fieldsSpecified.isEmpty(), diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index bb303ea9c305..dd2bf3037888 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -273,6 +273,29 @@ public void testProjectedSchema() { assertTrue(projectKeep.sameSchema(expectedKeep)); } + @Test + public void testNestedColumnPruningValidation() { + // Test that nested column paths (dot notation) are accepted in keep/drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test that nested column path "data.name" is valid and can be selected + org.apache.iceberg.Schema projectNestedKeep = + resolveSchema(schemaWithNested, asList("id", "data.name"), null); + + // Verify the projected schema contains the nested field + assertTrue(projectNestedKeep.findField("id") != null); + assertTrue(projectNestedKeep.findField("data.name") != null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId = From 90f2d10fb61944de6097f8ef97a3527bc60c8dbb Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Thu, 5 Feb 2026 22:40:21 +0800 Subject: [PATCH 2/4] [Iceberg] Also fix drop functionality for nested column paths - Use TypeUtil.indexByName() to enumerate all field paths - Only select leaf fields to prevent parent struct from including dropped children - Add test for nested drop validation --- .../sdk/io/iceberg/IcebergScanConfig.java | 27 ++++++++++++++++--- .../sdk/io/iceberg/IcebergIOReadTest.java | 26 ++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 51591e2f8476..011d17c06579 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -93,10 +93,29 @@ static org.apache.iceberg.Schema resolveSchema( if (keep != null && !keep.isEmpty()) { selectedFieldsBuilder.addAll(keep); } else if (drop != null && !drop.isEmpty()) { - Set fields = - schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet()); - drop.forEach(fields::remove); - selectedFieldsBuilder.addAll(fields); + // Get all field paths including nested ones + java.util.List allPaths = new java.util.ArrayList<>( + org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); + java.util.Collections.sort(allPaths); + + // Identify leaf fields only (fields that are not parents of other fields) + // This prevents selecting a parent struct from implicitly including dropped children + java.util.Set leaves = new java.util.HashSet<>(); + for (int i = 0; i < allPaths.size(); i++) { + String path = allPaths.get(i); + // If the next path starts with "path.", then "path" is a parent - skip it + if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) { + continue; + } + leaves.add(path); + } + + // Remove fields that are dropped or are children of dropped fields + for (String d : drop) { + leaves.removeIf(f -> f.equals(d) || f.startsWith(d + ".")); + } + + selectedFieldsBuilder.addAll(leaves); } else { // default: include all columns return schema; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index dd2bf3037888..6e823cb894bf 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -296,6 +296,32 @@ public void testNestedColumnPruningValidation() { assertTrue(projectNestedKeep.findField("data.name") != null); } + @Test + public void testNestedColumnDropValidation() { + // Test that nested column paths work correctly with drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test dropping a nested field "data.name" - should keep id, data.value, metadata + org.apache.iceberg.Schema projectNestedDrop = + resolveSchema(schemaWithNested, null, asList("data.name")); + + // Verify "data.name" is NOT in the projected schema + assertTrue(projectNestedDrop.findField("id") != null); + assertTrue(projectNestedDrop.findField("data.value") != null); + assertTrue(projectNestedDrop.findField("metadata") != null); + // data.name should be dropped + assertTrue(projectNestedDrop.findField("data.name") == null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId = From 59e8733dee2224010ba8f63aa5c3d1547e1ad952 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Fri, 6 Feb 2026 03:07:02 +0800 Subject: [PATCH 3/4] [Java SDK] Warn when ValueState contains collection types When users declare ValueState, ValueState, or ValueState, log a warning suggesting they use MapState, BagState, or SetState instead. Storing collections in ValueState requires reading and writing the entire collection on each access, which can cause performance issues for large collections. The specialized state types provide better performance. Fixes #36746 --- .../transforms/reflect/DoFnSignatures.java | 59 ++++++++++++++++++ .../reflect/DoFnSignaturesTest.java | 61 +++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 310736c014cc..9acfb25f02b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -104,6 +104,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */ @Internal @@ -113,6 +115,8 @@ }) public class DoFnSignatures { + private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class); + private DoFnSignatures() {} /** @@ -2327,12 +2331,67 @@ private static Map analyzeStateDeclarati (TypeDescriptor) TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType); + // Warn if ValueState contains a collection type that could benefit from specialized state + warnIfValueStateContainsCollection(fnClazz, id, stateType); + declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType)); } return ImmutableMap.copyOf(declarations); } + /** + * Warns if a ValueState is declared with a collection type (Map, List, Set) that could benefit + * from using specialized state types (MapState, BagState, SetState) for better performance. + */ + private static void warnIfValueStateContainsCollection( + Class fnClazz, String stateId, TypeDescriptor stateType) { + if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) { + return; + } + + try { + TypeDescriptor valueStateType = stateType.getSupertype(ValueState.class); + Type type = valueStateType.getType(); + if (!(type instanceof ParameterizedType)) { + return; + } + + Type valueType = ((ParameterizedType) type).getActualTypeArguments()[0]; + if (valueType instanceof java.lang.reflect.TypeVariable + || valueType instanceof java.lang.reflect.WildcardType) { + // Cannot determine actual type, skip warning + return; + } + + TypeDescriptor valueTypeDescriptor = TypeDescriptor.of(valueType); + Class rawType = valueTypeDescriptor.getRawType(); + + String recommendation = null; + if (Map.class.isAssignableFrom(rawType)) { + recommendation = "MapState"; + } else if (List.class.isAssignableFrom(rawType)) { + recommendation = "BagState or OrderedListState"; + } else if (java.util.Set.class.isAssignableFrom(rawType)) { + recommendation = "SetState"; + } + + if (recommendation != null) { + LOG.warn( + "DoFn {} declares ValueState '{}' with type {}. " + + "Storing collections in ValueState requires reading and writing the entire " + + "collection on each access, which can cause performance issues. " + + "Consider using {} instead for better performance with large collections.", + fnClazz.getSimpleName(), + stateId, + rawType.getSimpleName(), + recommendation); + } + } catch (Exception e) { + // If we can't determine the type, don't warn - it's just an optimization hint + } + } + private static @Nullable Method findAnnotatedMethod( ErrorReporter errors, Class anno, Class fnClazz, boolean required) { Collection matches = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index de4a622e03d7..a394b23cd7a0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -1700,4 +1700,65 @@ public void onMyTimer() {} @Override public void processWithTimer(ProcessContext context, Timer timer) {} } + + // Test DoFns for ValueState collection warning tests + private static class DoFnWithMapValueState extends DoFn { + @StateId("mapState") + private final StateSpec>> mapState = + StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithListValueState extends DoFn { + @StateId("listState") + private final StateSpec>> listState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSetValueState extends DoFn { + @StateId("setState") + private final StateSpec>> setState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSimpleValueState extends DoFn { + @StateId("simpleState") + private final StateSpec> simpleState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + @Test + public void testValueStateWithMapLogsWarning() { + // This test verifies that the signature can be parsed for DoFns with collection ValueState. + // The warning is logged but doesn't prevent the signature from being created. + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class); + assertThat(signature.stateDeclarations().get("mapState"), notNullValue()); + } + + @Test + public void testValueStateWithListLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class); + assertThat(signature.stateDeclarations().get("listState"), notNullValue()); + } + + @Test + public void testValueStateWithSetLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class); + assertThat(signature.stateDeclarations().get("setState"), notNullValue()); + } + + @Test + public void testValueStateWithSimpleTypeNoWarning() { + // Simple types should not trigger any warning + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class); + assertThat(signature.stateDeclarations().get("simpleState"), notNullValue()); + } } From a750a7578347465c9bd34a6569302c142e37c666 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Fri, 6 Feb 2026 03:27:34 +0800 Subject: [PATCH 4/4] Fix compilation error in warnIfValueStateContainsCollection --- .../sdk/transforms/reflect/DoFnSignatures.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 9acfb25f02b0..bda1beb2b71c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -2351,14 +2351,24 @@ private static void warnIfValueStateContainsCollection( } try { - TypeDescriptor valueStateType = stateType.getSupertype(ValueState.class); - Type type = valueStateType.getType(); + // Get the type directly and extract ValueState's type parameter + Type type = stateType.getType(); if (!(type instanceof ParameterizedType)) { return; } - Type valueType = ((ParameterizedType) type).getActualTypeArguments()[0]; - if (valueType instanceof java.lang.reflect.TypeVariable + // Find ValueState in the type hierarchy and get its type argument + Type valueType = null; + ParameterizedType pType = (ParameterizedType) type; + if (pType.getRawType() == ValueState.class) { + valueType = pType.getActualTypeArguments()[0]; + } else { + // For subtypes of ValueState, we need to resolve the type parameter + return; + } + + if (valueType == null + || valueType instanceof java.lang.reflect.TypeVariable || valueType instanceof java.lang.reflect.WildcardType) { // Cannot determine actual type, skip warning return;