Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for working with {@link DoFnSignature}. See {@link #getSignature}. */
@Internal
Expand All @@ -113,6 +115,8 @@
})
public class DoFnSignatures {

private static final Logger LOG = LoggerFactory.getLogger(DoFnSignatures.class);

private DoFnSignatures() {}

/**
Expand Down Expand Up @@ -2327,12 +2331,77 @@ private static Map<String, DoFnSignature.StateDeclaration> analyzeStateDeclarati
(TypeDescriptor<? extends State>)
TypeDescriptor.of(fnClazz).resolveType(unresolvedStateType);

// Warn if ValueState contains a collection type that could benefit from specialized state
warnIfValueStateContainsCollection(fnClazz, id, stateType);

declarations.put(id, DoFnSignature.StateDeclaration.create(id, field, stateType));
}

return ImmutableMap.copyOf(declarations);
}

/**
 * Logs a warning if a {@link ValueState} is declared with a collection value type (Map, List, or
 * Set) that could instead use a specialized state type ({@code MapState}, {@code BagState} /
 * {@code OrderedListState}, or {@code SetState}) for better performance.
 *
 * <p>This check is purely advisory: it never affects signature analysis, and it silently skips
 * cases where the value type cannot be determined (raw types, type variables, wildcards, or
 * subtypes of {@code ValueState} whose type parameter would require full generic resolution).
 *
 * @param fnClazz the DoFn class declaring the state; used only in the log message
 * @param stateId the declared state id; used only in the log message
 * @param stateType the resolved state type of the {@code @StateId}-annotated field
 */
private static void warnIfValueStateContainsCollection(
    Class<?> fnClazz, String stateId, TypeDescriptor<? extends State> stateType) {
  if (!stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) {
    return;
  }

  try {
    Type type = stateType.getType();
    if (!(type instanceof ParameterizedType)) {
      // Raw ValueState: no type argument to inspect.
      return;
    }

    ParameterizedType pType = (ParameterizedType) type;
    if (pType.getRawType() != ValueState.class) {
      // A subtype of ValueState would need full generic resolution to find the value type; skip.
      return;
    }
    Type valueType = pType.getActualTypeArguments()[0];

    if (valueType instanceof java.lang.reflect.TypeVariable
        || valueType instanceof java.lang.reflect.WildcardType) {
      // Cannot determine the actual value type, so no warning.
      return;
    }

    Class<?> rawType = TypeDescriptor.of(valueType).getRawType();

    final String recommendation;
    if (Map.class.isAssignableFrom(rawType)) {
      recommendation = "MapState";
    } else if (List.class.isAssignableFrom(rawType)) {
      recommendation = "BagState or OrderedListState";
    } else if (java.util.Set.class.isAssignableFrom(rawType)) {
      recommendation = "SetState";
    } else {
      // Not a collection type we have a specialized recommendation for.
      return;
    }

    LOG.warn(
        "DoFn {} declares ValueState '{}' with type {}. "
            + "Storing collections in ValueState requires reading and writing the entire "
            + "collection on each access, which can cause performance issues. "
            + "Consider using {} instead for better performance with large collections.",
        fnClazz.getSimpleName(),
        stateId,
        rawType.getSimpleName(),
        recommendation);
  } catch (Exception e) {
    // Best-effort optimization hint only: never let reflective type inspection break signature
    // analysis. Log at debug rather than swallowing silently so failures remain diagnosable.
    LOG.debug(
        "Unable to inspect ValueState value type for state id '{}' in {}", stateId, fnClazz, e);
  }
}

private static @Nullable Method findAnnotatedMethod(
ErrorReporter errors, Class<? extends Annotation> anno, Class<?> fnClazz, boolean required) {
Collection<Method> matches =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,4 +1700,65 @@ public void onMyTimer() {}
@Override
public void processWithTimer(ProcessContext context, Timer timer) {}
}

// Test DoFns for ValueState collection warning tests
/** DoFn declaring a ValueState that holds a Map; expected to trigger the collection warning. */
private static class DoFnWithMapValueState extends DoFn<String, String> {
  @StateId("mapState")
  private final StateSpec<ValueState<java.util.Map<String, String>>> spec = StateSpecs.value();

  @ProcessElement
  public void process() {}
}

/** DoFn declaring a ValueState that holds a List; expected to trigger the collection warning. */
private static class DoFnWithListValueState extends DoFn<String, String> {
  @StateId("listState")
  private final StateSpec<ValueState<java.util.List<String>>> spec = StateSpecs.value();

  @ProcessElement
  public void process() {}
}

/** DoFn declaring a ValueState that holds a Set; expected to trigger the collection warning. */
private static class DoFnWithSetValueState extends DoFn<String, String> {
  @StateId("setState")
  private final StateSpec<ValueState<java.util.Set<String>>> spec = StateSpecs.value();

  @ProcessElement
  public void process() {}
}

/** DoFn declaring a ValueState with a plain (non-collection) value type; no warning expected. */
private static class DoFnWithSimpleValueState extends DoFn<String, String> {
  @StateId("simpleState")
  private final StateSpec<ValueState<String>> spec = StateSpecs.value();

  @ProcessElement
  public void process() {}
}

@Test
public void testValueStateWithMapLogsWarning() {
  // Signature analysis must still succeed when a ValueState holds a Map; the collection
  // warning is advisory only. NOTE(review): the warning's log output itself is not asserted.
  DoFnSignature sig = DoFnSignatures.getSignature(DoFnWithMapValueState.class);
  assertThat(sig.stateDeclarations().get("mapState"), notNullValue());
}

@Test
public void testValueStateWithListLogsWarning() {
  // A List-valued ValueState still parses; the warning is advisory and not asserted here.
  DoFnSignature sig = DoFnSignatures.getSignature(DoFnWithListValueState.class);
  assertThat(sig.stateDeclarations().get("listState"), notNullValue());
}

@Test
public void testValueStateWithSetLogsWarning() {
  // A Set-valued ValueState still parses; the warning is advisory and not asserted here.
  DoFnSignature sig = DoFnSignatures.getSignature(DoFnWithSetValueState.class);
  assertThat(sig.stateDeclarations().get("setState"), notNullValue());
}

@Test
public void testValueStateWithSimpleTypeNoWarning() {
  // A non-collection value type must parse cleanly and should produce no warning.
  DoFnSignature sig = DoFnSignatures.getSignature(DoFnWithSimpleValueState.class);
  assertThat(sig.stateDeclarations().get("simpleState"), notNullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,29 @@ static org.apache.iceberg.Schema resolveSchema(
if (keep != null && !keep.isEmpty()) {
selectedFieldsBuilder.addAll(keep);
} else if (drop != null && !drop.isEmpty()) {
Set<String> fields =
schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toSet());
drop.forEach(fields::remove);
selectedFieldsBuilder.addAll(fields);
// Get all field paths including nested ones
java.util.List<String> allPaths = new java.util.ArrayList<>(
org.apache.iceberg.types.TypeUtil.indexByName(schema.asStruct()).keySet());
java.util.Collections.sort(allPaths);

// Identify leaf fields only (fields that are not parents of other fields)
// This prevents selecting a parent struct from implicitly including dropped children
java.util.Set<String> leaves = new java.util.HashSet<>();
for (int i = 0; i < allPaths.size(); i++) {
String path = allPaths.get(i);
// If the next path starts with "path.", then "path" is a parent - skip it
if (i + 1 < allPaths.size() && allPaths.get(i + 1).startsWith(path + ".")) {
continue;
}
leaves.add(path);
}

// Remove fields that are dropped or are children of dropped fields
for (String d : drop) {
leaves.removeIf(f -> f.equals(d) || f.startsWith(d + "."));
}

selectedFieldsBuilder.addAll(leaves);
} else {
// default: include all columns
return schema;
Expand Down Expand Up @@ -327,7 +346,9 @@ void validate(Table table) {
param = "drop";
fieldsSpecified = newHashSet(checkNotNull(drop));
}
table.schema().columns().forEach(nf -> fieldsSpecified.remove(nf.name()));
// Use findField() to support nested column paths (e.g., "colA.colB")
// Iceberg's Schema.findField() resolves dot-notation paths for nested fields
fieldsSpecified.removeIf(name -> table.schema().findField(name) != null);

checkArgument(
fieldsSpecified.isEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,55 @@ public void testProjectedSchema() {
assertTrue(projectKeep.sameSchema(expectedKeep));
}

@Test
public void testNestedColumnPruningValidation() {
  // Dot-notation paths into struct fields must be accepted by the "keep" configuration.
  org.apache.iceberg.Schema nested =
      new org.apache.iceberg.Schema(
          required(1, "id", StringType.get()),
          required(
              2,
              "data",
              StructType.of(
                  required(3, "name", StringType.get()),
                  required(4, "value", StringType.get()))),
          required(5, "metadata", StringType.get()));

  org.apache.iceberg.Schema projected = resolveSchema(nested, asList("id", "data.name"), null);

  // Both the top-level column and the nested selection survive the projection.
  assertTrue(projected.findField("id") != null);
  assertTrue(projected.findField("data.name") != null);
}

@Test
public void testNestedColumnDropValidation() {
  // Dropping a nested path "data.name" must keep id, data.value, and metadata.
  org.apache.iceberg.Schema nested =
      new org.apache.iceberg.Schema(
          required(1, "id", StringType.get()),
          required(
              2,
              "data",
              StructType.of(
                  required(3, "name", StringType.get()),
                  required(4, "value", StringType.get()))),
          required(5, "metadata", StringType.get()));

  org.apache.iceberg.Schema projected = resolveSchema(nested, null, asList("data.name"));

  // Every surviving field is present...
  assertTrue(projected.findField("id") != null);
  assertTrue(projected.findField("data.value") != null);
  assertTrue(projected.findField("metadata") != null);
  // ...and only the dropped nested field is absent.
  assertTrue(projected.findField("data.name") == null);
}

@Test
public void testSimpleScan() throws Exception {
TableIdentifier tableId =
Expand Down
Loading