diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 310736c014cc..bda1beb2b71c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -104,6 +104,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */ @Internal @@ -113,6 +115,8 @@ }) public class DoFnSignatures { + private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class); + private DoFnSignatures() {} /** @@ -2327,12 +2331,77 @@ private static Map analyzeStateDeclarati (TypeDescriptor) TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType); + // Warn if ValueState contains a collection type that could benefit from specialized state + warnIfValueStateContainsCollection(fnClazz, id, stateType); + declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType)); } return ImmutableMap.copyOf(declarations); } + /** + * Warns if a ValueState is declared with a collection type (Map, List, Set) that could benefit + * from using specialized state types (MapState, BagState, SetState) for better performance. + */ + private static void warnIfValueStateContainsCollection( + Class fnClazz, String stateId, TypeDescriptor stateType) { + if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) { + return; + } + + try { + // Get the type directly and extract ValueState's type parameter + Type type = stateType.getType(); + if (!(type instanceof ParameterizedType)) { + return; + } + + // Find ValueState in the type hierarchy and get its type argument + Type valueType = null; + ParameterizedType pType = (ParameterizedType) type; + if (pType.getRawType() == ValueState.class) { + valueType = pType.getActualTypeArguments()[0]; + } else { + // For subtypes of ValueState, we need to resolve the type parameter + return; + } + + if (valueType == null + || valueType instanceof java.lang.reflect.TypeVariable + || valueType instanceof java.lang.reflect.WildcardType) { + // Cannot determine actual type, skip warning + return; + } + + TypeDescriptor valueTypeDescriptor = TypeDescriptor.of(valueType); + Class rawType = valueTypeDescriptor.getRawType(); + + String recommendation = null; + if (Map.class.isAssignableFrom(rawType)) { + recommendation = "MapState"; + } else if (List.class.isAssignableFrom(rawType)) { + recommendation = "BagState or OrderedListState"; + } else if (java.util.Set.class.isAssignableFrom(rawType)) { + recommendation = "SetState"; + } + + if (recommendation != null) { + LOG.warn( + "DoFn {} declares ValueState '{}' with type {}. " + + "Storing collections in ValueState requires reading and writing the entire " + + "collection on each access, which can cause performance issues. " + + "Consider using {} instead for better performance with large collections.", + fnClazz.getSimpleName(), + stateId, + rawType.getSimpleName(), + recommendation); + } + } catch (Exception e) { + // If we can't determine the type, don't warn - it's just an optimization hint + } + } + private static @Nullable Method findAnnotatedMethod( ErrorReporter errors, Class anno, Class fnClazz, boolean required) { Collection matches = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index de4a622e03d7..a394b23cd7a0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -1700,4 +1700,65 @@ public void onMyTimer() {} @Override public void processWithTimer(ProcessContext context, Timer timer) {} } + + // Test DoFns for ValueState collection warning tests + private static class DoFnWithMapValueState extends DoFn { + @StateId("mapState") + private final StateSpec>> mapState = + StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithListValueState extends DoFn { + @StateId("listState") + private final StateSpec>> listState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSetValueState extends DoFn { + @StateId("setState") + private final StateSpec>> setState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + private static class DoFnWithSimpleValueState extends DoFn { + @StateId("simpleState") + private final StateSpec> simpleState = StateSpecs.value(); + + @ProcessElement + public void process() {} + } + + @Test + public void testValueStateWithMapLogsWarning() { + // This test verifies that the signature can be parsed for DoFns with collection ValueState. + // The warning is logged but doesn't prevent the signature from being created. + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithMapValueState.class); + assertThat(signature.stateDeclarations().get("mapState"), notNullValue()); + } + + @Test + public void testValueStateWithListLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithListValueState.class); + assertThat(signature.stateDeclarations().get("listState"), notNullValue()); + } + + @Test + public void testValueStateWithSetLogsWarning() { + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSetValueState.class); + assertThat(signature.stateDeclarations().get("setState"), notNullValue()); + } + + @Test + public void testValueStateWithSimpleTypeNoWarning() { + // Simple types should not trigger any warning + DoFnSignature signature = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class); + assertThat(signature.stateDeclarations().get("simpleState"), notNullValue()); + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 3829baa43665..011d17c06579 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -93,10 +93,29 @@ static org.apache.iceberg.Schema resolveSchema( if (keep != null && !keep.isEmpty()) { selectedFieldsBuilder.addAll(keep); } else if (drop != null && !drop.isEmpty()) { - Set fields = - schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet()); - drop.forEach(fields::remove); - selectedFieldsBuilder.addAll(fields); + // Get all field paths including nested ones + java.util.List allPaths = new java.util.ArrayList<>( + org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet()); + java.util.Collections.sort(allPaths); + + // Identify leaf fields only (fields that are not parents of other fields) + // This prevents selecting a parent struct from implicitly including dropped children + java.util.Set leaves = new java.util.HashSet<>(); + for (int i = 0; i < allPaths.size(); i++) { + String path = allPaths.get(i); + // If the next path starts with "path.", then "path" is a parent - skip it + if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) { + continue; + } + leaves.add(path); + } + + // Remove fields that are dropped or are children of dropped fields + for (String d : drop) { + leaves.removeIf(f -> f.equals(d) || f.startsWith(d + ".")); + } + + selectedFieldsBuilder.addAll(leaves); } else { // default: include all columns return schema; @@ -327,7 +346,9 @@ void validate(Table table) { param = "drop"; fieldsSpecified = newHashSet(checkNotNull(drop)); } - table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name())); + // Use findField() to support nested column paths (e.g., "colA.colB") + // Iceberg's Schema.findField() resolves dot-notation paths for nested fields + fieldsSpecified.removeIf(name -> table.schema().findField(name) != null); checkArgument( fieldsSpecified.isEmpty(), diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index bb303ea9c305..6e823cb894bf 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -273,6 +273,55 @@ public void testProjectedSchema() { assertTrue(projectKeep.sameSchema(expectedKeep)); } + @Test + public void testNestedColumnPruningValidation() { + // Test that nested column paths (dot notation) are accepted in keep/drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test that nested column path "data.name" is valid and can be selected + org.apache.iceberg.Schema projectNestedKeep = + resolveSchema(schemaWithNested, asList("id", "data.name"), null); + + // Verify the projected schema contains the nested field + assertTrue(projectNestedKeep.findField("id") != null); + assertTrue(projectNestedKeep.findField("data.name") != null); + } + + @Test + public void testNestedColumnDropValidation() { + // Test that nested column paths work correctly with drop configuration + org.apache.iceberg.Schema schemaWithNested = + new org.apache.iceberg.Schema( + required(1, "id", StringType.get()), + required( + 2, + "data", + StructType.of( + required(3, "name", StringType.get()), + required(4, "value", StringType.get()))), + required(5, "metadata", StringType.get())); + + // Test dropping a nested field "data.name" - should keep id, data.value, metadata + org.apache.iceberg.Schema projectNestedDrop = + resolveSchema(schemaWithNested, null, asList("data.name")); + + // Verify "data.name" is NOT in the projected schema + assertTrue(projectNestedDrop.findField("id") != null); + assertTrue(projectNestedDrop.findField("data.value") != null); + assertTrue(projectNestedDrop.findField("metadata") != null); + // data.name should be dropped + assertTrue(projectNestedDrop.findField("data.name") == null); + } + @Test public void testSimpleScan() throws Exception { TableIdentifier tableId =