diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index b73af5e61a43..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/.gitignore b/.gitignore index 9c6e68f4ce59..e9fe331cb316 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,13 @@ # is an input to 'maven-assembly-plugin' that generates source distribution. # This is typically in files named 'src.xml' throughout this repository. +# Ignore IDE files +.codex/ +.trae/ +.cursor/ +.windsurf/ +.claude/ + # Ignore any offline repositories the user may have created. **/offline-repository/**/* diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index fd008701c548..855fcf7d1e19 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -66,8 +66,6 @@ /** * Utilities that convert between a SQL filter expression and an Iceberg {@link Expression}. Uses * Apache Calcite semantics. - * - *

Note: Only supports top-level fields (i.e. cannot reference nested fields). */ @Internal public class FilterUtils { @@ -112,7 +110,7 @@ static Set getReferencedFieldNames(@Nullable String filter) { private static void extractFieldNames(SqlNode node, Set fieldNames) { if (node instanceof SqlIdentifier) { - fieldNames.add(((SqlIdentifier) node).getSimple()); + fieldNames.add(getFieldName((SqlIdentifier) node)); } else if (node instanceof SqlBasicCall) { // recursively check operands SqlBasicCall call = (SqlBasicCall) node; @@ -133,9 +131,6 @@ private static void extractFieldNames(SqlNode node, Set fieldNames) { /** * parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for * data pruning. - * - *

Note: This utility currently supports only top-level fields within the filter expression. - * Nested field references are not supported. */ static Expression convert(@Nullable String filter, Schema schema) { if (filter == null) { @@ -154,7 +149,7 @@ static Expression convert(@Nullable String filter, Schema schema) { private static Expression convert(SqlNode expression, Schema schema) throws SqlParseException { if (expression instanceof SqlIdentifier) { - String fieldName = ((SqlIdentifier) expression).getSimple(); + String fieldName = getFieldName((SqlIdentifier) expression); Types.NestedField field = schema.caseInsensitiveFindField(fieldName); if (field.type().equals(Types.BooleanType.get())) { return Expressions.equal(field.name(), true); @@ -242,7 +237,14 @@ private static String getOnlyChildName(SqlBasicCall call) { SqlNode ref = call.operand(0); Preconditions.checkState( ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref); - return ((SqlIdentifier) ref).getSimple(); + return getFieldName((SqlIdentifier) ref); + } + + private static String getFieldName(SqlIdentifier identifier) { + if (identifier.isSimple()) { + return identifier.getSimple(); + } + return String.join(".", identifier.names); } private static SqlNode getLeftChild(SqlBasicCall call) { @@ -285,9 +287,9 @@ private static Expression convertFieldInLiteral(Operation op, SqlBasicCall call, checkArgument( value instanceof SqlNodeList, "Expected right hand side to be a list but got " + value.getClass()); - String caseInsensitiveName = ((SqlIdentifier) term).getSimple(); + String caseInsensitiveName = getFieldName((SqlIdentifier) term); Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName); - String name = field.name(); + String name = schema.findColumnName(field.fieldId()); TypeID type = field.type().typeId(); List list = ((SqlNodeList) value) @@ -313,16 +315,16 @@ private static Expression convertFieldAndLiteral( SqlNode left = getLeftChild(call); SqlNode right = getRightChild(call); if (left instanceof SqlIdentifier && right instanceof SqlLiteral) { - String caseInsensitiveName = ((SqlIdentifier) left).getSimple(); + String caseInsensitiveName = getFieldName((SqlIdentifier) left); Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName); - String name = field.name(); + String name = schema.findColumnName(field.fieldId()); TypeID type = field.type().typeId(); Object value = convertLiteral((SqlLiteral) right, name, type); return convertLR.apply(name, value); } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) { - String caseInsensitiveName = ((SqlIdentifier) right).getSimple(); + String caseInsensitiveName = getFieldName((SqlIdentifier) right); Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName); - String name = field.name(); + String name = schema.findColumnName(field.fieldId()); TypeID type = field.type().typeId(); Object value = convertLiteral((SqlLiteral) left, name, type); return convertRL.apply(name, value); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java index 591467ce0d05..12acfff7bf0a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java @@ -169,6 +169,21 @@ public void testLessThan() { .validate(); } + @Test + public void testNestedField() { + // nested integer + TestCase.expecting(lessThan("nested.field", 30)) + .fromFilter("\"nested\".\"field\" < 30") + .withSchema( + new Schema( + Types.NestedField.required( + 1, + "nested", + Types.StructType.of( + Types.NestedField.required(2, "field", Types.IntegerType.get()))))) + .validate(); + } + @Test public void testLessThanOrEqual() { // integer @@ -726,6 +741,7 @@ public void testReferencedFieldsInFilter() { Pair.of("field_1 < 35", Sets.newHashSet("FIELD_1")), Pair.of("\"field_1\" in (1, 2, 3)", Sets.newHashSet("field_1")), Pair.of("field_1 < 35 and \"fiELd_2\" = TRUE", Sets.newHashSet("FIELD_1", "fiELd_2")), + Pair.of("\"nested\".\"inner\" = 'abc'", Sets.newHashSet("nested.inner")), Pair.of( "(\"field_1\" < 35 and \"field_2\" = TRUE) or \"field_3\" in ('a', 'b')", Sets.newHashSet("field_1", "field_2", "field_3"))); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index 9e6aa5913cc5..c2c5dc0b8f4a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -488,6 +488,25 @@ public void testReadWithFilterAndColumnPruning_keep() throws Exception { pipeline.run().waitUntilFinish(); } + @Test + public void testReadWithNestedFieldFilter() throws Exception { + Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); + + List expectedRows = + populateTable(table).stream() + .filter(row -> row.getRow("row").getInt32("nested_int") < 350) + .collect(Collectors.toList()); + + Map config = new HashMap<>(managedIcebergConfig(tableId())); + config.put("filter", "\"row\".\"nested_int\" < 350"); + + PCollection rows = + pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection(); + + PAssert.that(rows).containsInAnyOrder(expectedRows); + pipeline.run().waitUntilFinish(); + } + @Test public void testStreamingReadWithFilter() throws Exception { Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);