From 687181123c58c3b2c55282787a83feec193bf29d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 17 Feb 2026 18:35:27 -0500 Subject: [PATCH 1/5] Bigtable: add row filter text proto for template --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 34 ++++++++++++++++++- .../io/gcp/bigtable/BigtableReadOptions.java | 12 +++++++ .../sdk/io/gcp/bigtable/BigtableIOTest.java | 26 ++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 6441524fc847..1eb5c2f3f6c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.gax.batching.BatchingException; @@ -38,6 +39,7 @@ import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.protobuf.ByteString; +import com.google.protobuf.TextFormat; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -610,6 +612,22 @@ public Read withRowFilter(RowFilter filter) { return withRowFilter(StaticValueProvider.of(filter)); } + /** + * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable + * using the given {@link TextFormat} row filter. If {@link #withRowFilter(RowFilter)} is also + * set, {@link #withRowFilter(RowFilter)} is used. + * + *

Does not modify this object. + */ + public Read withRowFilterTextProto(ValueProvider filter) { + checkNotNull(filter, "filter can not be null"); + BigtableReadOptions bigtableReadOptions = getBigtableReadOptions(); + return toBuilder() + .setBigtableReadOptions( + bigtableReadOptions.toBuilder().setRowFilterTextProto(filter).build()) + .build(); + } + /** * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches. * This function will switch the base BigtableIO.Reader class to using the SegmentReader. If @@ -1939,7 +1957,21 @@ public List getRanges() { public @Nullable RowFilter getRowFilter() { ValueProvider rowFilter = readOptions.getRowFilter(); - return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null; + if (rowFilter != null && rowFilter.isAccessible()) { + return rowFilter.get(); + } + + ValueProvider rowFilterTextProto = readOptions.getRowFilterTextProto(); + if (rowFilterTextProto != null && rowFilterTextProto.isAccessible()) { + try { + RowFilter parsed = TextFormat.parse(rowFilterTextProto.get(), RowFilter.class); + return parsed; + } catch (TextFormat.ParseException e) { + throw new RuntimeException("Failed to parse row filter text proto", e); + } + } + + return null; } public @Nullable Integer getMaxBufferElementCount() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java index 46834cc9756f..af4b7da18b17 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigtable; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.RowFilter; @@ -43,6 +44,8 @@ abstract class BigtableReadOptions implements Serializable { /** Returns the row filter to use. */ abstract @Nullable ValueProvider getRowFilter(); + abstract @Nullable ValueProvider getRowFilterTextProto(); + /** Returns the key ranges to read. */ abstract @Nullable ValueProvider> getKeyRanges(); @@ -73,6 +76,8 @@ abstract static class Builder { abstract Builder setRowFilter(ValueProvider rowFilter); + abstract Builder setRowFilterTextProto(ValueProvider rowFilter); + abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount); abstract Builder setKeyRanges(ValueProvider> keyRanges); @@ -110,6 +115,8 @@ void populateDisplayData(DisplayData.Builder builder) { builder .addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id")) .addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter")) + .addIfNotNull( + DisplayData.item("rowFilterTextProto", getRowFilterTextProto()).withLabel("Row Filter")) .addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges")) .addIfNotNull( DisplayData.item("attemptTimeout", getAttemptTimeout()) @@ -127,6 +134,11 @@ void validate() { if (getRowFilter() != null && getRowFilter().isAccessible()) { checkArgument(getRowFilter().get() != null, "rowFilter can not be null"); } + + if (getRowFilterTextProto() != null && getRowFilterTextProto().isAccessible()) { + checkNotNull(getRowFilterTextProto(), "rowFilter can not be null"); + } + if (getMaxBufferElementCount() != null) { checkArgument( getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 2065772a9a4f..8abec5409d7a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -680,6 +680,32 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception { defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)), Lists.newArrayList(filteredRows)); } + + /** Tests reading rows using a text proto filter provided through ValueProvider. */ + @Test + public void testReadingWithRowFilterTextProto() throws Exception { + final String table = "TEST-FILTER-TABLE"; + final int numRows = 1001; + List testRows = makeTableData(table, numRows); + String regex = ".*17.*"; + final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex); + Iterable filteredRows = + testRows.stream() + .filter( + input -> { + verifyNotNull(input, "input"); + return keyPredicate.apply(input.getKey()); + }) + .collect(Collectors.toList()); + + String filter = "row_key_regex_filter: \".*17.*\""; + service.setupSampleRowKeys(table, 5, 10L); + + runReadTest( + defaultRead.withTableId(table).withRowFilterTextProto(StaticValueProvider.of(filter)), + Lists.newArrayList(filteredRows)); + } + /** Tests dynamic work rebalancing exhaustively. */ @Test public void testReadingSplitAtFractionExhaustive() throws Exception { From 61113c106a872cbd76fde7802290ff284b0f0bbf Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 17 Feb 2026 18:38:37 -0500 Subject: [PATCH 2/5] remove redundent code --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 1eb5c2f3f6c9..0cbff2d3607e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1964,8 +1964,7 @@ public List getRanges() { ValueProvider rowFilterTextProto = readOptions.getRowFilterTextProto(); if (rowFilterTextProto != null && rowFilterTextProto.isAccessible()) { try { - RowFilter parsed = TextFormat.parse(rowFilterTextProto.get(), RowFilter.class); - return parsed; + return TextFormat.parse(rowFilterTextProto.get(), RowFilter.class); } catch (TextFormat.ParseException e) { throw new RuntimeException("Failed to parse row filter text proto", e); } From a9b08f5dc95d5b4e8e589ac9969b45d066a639c4 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 18 Feb 2026 10:39:23 -0500 Subject: [PATCH 3/5] chain filters --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 0cbff2d3607e..b93265ac077b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -615,7 +615,7 @@ public Read withRowFilter(RowFilter filter) { /** * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable * using the given {@link TextFormat} row filter. If {@link #withRowFilter(RowFilter)} is also - * set, {@link #withRowFilter(RowFilter)} is used. + * set, the filters are chained. * *

Does not modify this object. */ @@ -1956,20 +1956,34 @@ public List getRanges() { } public @Nullable RowFilter getRowFilter() { - ValueProvider rowFilter = readOptions.getRowFilter(); - if (rowFilter != null && rowFilter.isAccessible()) { - return rowFilter.get(); + ValueProvider rowFilterValueProvider = readOptions.getRowFilter(); + RowFilter rowFilter = null; + if (rowFilterValueProvider != null && rowFilterValueProvider.isAccessible()) { + rowFilter = rowFilterValueProvider.get(); } - ValueProvider rowFilterTextProto = readOptions.getRowFilterTextProto(); - if (rowFilterTextProto != null && rowFilterTextProto.isAccessible()) { + ValueProvider textFilterValueProvider = readOptions.getRowFilterTextProto(); + RowFilter textFilter = null; + if (textFilterValueProvider != null && textFilterValueProvider.isAccessible()) { try { - return TextFormat.parse(rowFilterTextProto.get(), RowFilter.class); + textFilter = TextFormat.parse(textFilterValueProvider.get(), RowFilter.class); } catch (TextFormat.ParseException e) { throw new RuntimeException("Failed to parse row filter text proto", e); } } + if (rowFilter != null && textFilter != null) { + RowFilter.Chain chain = + RowFilter.Chain.newBuilder().addFilters(rowFilter).addFilters(textFilter).build(); + return RowFilter.newBuilder().setChain(chain).build(); + } + if (rowFilter != null) { + return rowFilter; + } + if (textFilter != null) { + return textFilter; + } + return null; } From 46c6711d41e799061e3d261d0337d9be692fcc31 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 18 Feb 2026 10:49:15 -0500 Subject: [PATCH 4/5] add a test --- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 65 +++++++++++++++++-- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 8abec5409d7a..a654f63deea7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -706,6 +706,43 @@ public void testReadingWithRowFilterTextProto() throws Exception { Lists.newArrayList(filteredRows)); } + /** Tests reading rows using both filter providers. */ + @Test + public void testReadingWithBothTextFilterAndRowFilter() throws Exception { + final String table = "TEST-FILTER-TABLE"; + final int numRows = 1001; + List testRows = makeTableData(table, numRows); + String regex1 = ".*17.*"; + String regex2 = ".*2.*"; + final KeyMatchesRegex keyPredicate1 = new KeyMatchesRegex(regex1); + final KeyMatchesRegex keyPredicate2 = new KeyMatchesRegex(regex2); + Iterable filteredRows = + testRows.stream() + .filter( + input -> { + verifyNotNull(input, "input"); + return keyPredicate1.apply(input.getKey()); + }) + .filter( + input -> { + verifyNotNull(input, "input"); + return keyPredicate2.apply(input.getKey()); + }) + .collect(Collectors.toList()); + + String filter = "row_key_regex_filter: \".*17.*\""; + RowFilter rowFilter = + RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex2)).build(); + service.setupSampleRowKeys(table, 5, 10L); + + runReadTest( + defaultRead + .withTableId(table) + .withRowFilterTextProto(StaticValueProvider.of(filter)) + .withRowFilter(rowFilter), + Lists.newArrayList(filteredRows)); + } + /** Tests dynamic work rebalancing exhaustively. */ @Test public void testReadingSplitAtFractionExhaustive() throws Exception { @@ -1804,17 +1841,27 @@ private static class FakeBigtableReader implements BigtableService.Reader { private final FakeBigtableService service; private Iterator> rows; private Row currentRow; - private final Predicate filter; + private final List> filters = new ArrayList<>(); public FakeBigtableReader(BigtableSource source, FakeBigtableService service) { this.source = source; this.service = service; if (source.getRowFilter() == null) { - filter = Predicates.alwaysTrue(); + filters.add(Predicates.alwaysTrue()); } else { - ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter(); - checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported"); - filter = new KeyMatchesRegex(keyRegex.toStringUtf8()); + RowFilter rowFilter = source.getRowFilter(); + if (rowFilter.hasChain()) { + RowFilter.Chain chain = rowFilter.getChain(); + for (RowFilter filter : chain.getFiltersList()) { + ByteString keyRegex = filter.getRowKeyRegexFilter(); + checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported"); + filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8())); + } + } else { + ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter(); + checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported"); + filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8())); + } } service.verifyTableExists(source.getTableId().get()); } @@ -1835,7 +1882,13 @@ public boolean advance() throws IOException { Map.Entry entry = null; while (rows.hasNext()) { entry = rows.next(); - if (!filter.apply(entry.getKey()) + boolean predicateMatched = true; + for (Predicate predicate : filters) { + if (!predicate.apply(entry.getKey())) { + predicateMatched = false; + } + } + if (!predicateMatched || !rangesContainsKey(source.getRanges(), makeByteKey(entry.getKey()))) { // Does not match row filter or does not match source range. Skip. entry = null; From 50984c6fd199fb081c86f8296171a7a692345eff Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 18 Feb 2026 11:10:52 -0500 Subject: [PATCH 5/5] simplify logic --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index b93265ac077b..58d73af7effb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1956,35 +1956,28 @@ public List getRanges() { } public @Nullable RowFilter getRowFilter() { + RowFilter.Chain.Builder chain = RowFilter.Chain.newBuilder(); ValueProvider rowFilterValueProvider = readOptions.getRowFilter(); - RowFilter rowFilter = null; if (rowFilterValueProvider != null && rowFilterValueProvider.isAccessible()) { - rowFilter = rowFilterValueProvider.get(); + chain.addFilters(rowFilterValueProvider.get()); } - ValueProvider textFilterValueProvider = readOptions.getRowFilterTextProto(); - RowFilter textFilter = null; if (textFilterValueProvider != null && textFilterValueProvider.isAccessible()) { try { - textFilter = TextFormat.parse(textFilterValueProvider.get(), RowFilter.class); + chain.addFilters(TextFormat.parse(textFilterValueProvider.get(), RowFilter.class)); } catch (TextFormat.ParseException e) { throw new RuntimeException("Failed to parse row filter text proto", e); } } - if (rowFilter != null && textFilter != null) { - RowFilter.Chain chain = - RowFilter.Chain.newBuilder().addFilters(rowFilter).addFilters(textFilter).build(); - return RowFilter.newBuilder().setChain(chain).build(); - } - if (rowFilter != null) { - return rowFilter; + switch (chain.getFiltersCount()) { + case 0: + return null; + case 1: + return chain.getFilters(0); + default: + return RowFilter.newBuilder().setChain(chain.build()).build(); } - if (textFilter != null) { - return textFilter; - } - - return null; } public @Nullable Integer getMaxBufferElementCount() {