Add BigtableIO.Read#withRowFilterTextProto: a read-side row filter supplied as protobuf text.

What this patch does:
  * BigtableIO.Read#withRowFilterTextProto stores a ValueProvider<String> in BigtableReadOptions.
  * getRowFilter() parses that text with TextFormat and, when an explicit RowFilter is also
    accessible, wraps both in a RowFilter.Chain (explicit filter first, text filter second).
  * BigtableReadOptions exposes the new option in display data and validates its value.
  * BigtableIOTest covers the text filter alone and combined with an explicit filter; the fake
    reader evaluates every filter of a chain.

NOTE(review): the blob ids on the "index" lines were not regenerated after the hunks were
edited; treat them as informational only.

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 6441524fc847..58d73af7effb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -22,6 +22,7 @@
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.api.gax.batching.BatchingException;
@@ -38,6 +39,7 @@
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -610,6 +612,22 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using a {@link RowFilter} supplied in protobuf {@link TextFormat}. If {@link
+     * #withRowFilter(RowFilter)} is also set, the two filters are chained.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withRowFilterTextProto(ValueProvider<String> filter) {
+      checkNotNull(filter, "filter can not be null");
+      BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
+      return toBuilder()
+          .setBigtableReadOptions(
+              bigtableReadOptions.toBuilder().setRowFilterTextProto(filter).build())
+          .build();
+    }
+
     /**
      * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
      * This function will switch the base BigtableIO.Reader class to using the SegmentReader. If
@@ -1938,8 +1956,34 @@ public List<ByteKeyRange> getRanges() {
     }
 
+    /**
+     * Returns the row filter to apply: the explicit {@link RowFilter}, the filter parsed from
+     * the text proto, a chain of the two when both are accessible, or {@code null} when neither.
+     *
+     * @throws IllegalArgumentException if the text proto cannot be parsed as a {@link RowFilter}
+     */
     public @Nullable RowFilter getRowFilter() {
-      ValueProvider<RowFilter> rowFilter = readOptions.getRowFilter();
-      return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
+      RowFilter.Chain.Builder chain = RowFilter.Chain.newBuilder();
+      ValueProvider<RowFilter> rowFilterValueProvider = readOptions.getRowFilter();
+      if (rowFilterValueProvider != null && rowFilterValueProvider.isAccessible()) {
+        chain.addFilters(rowFilterValueProvider.get());
+      }
+      ValueProvider<String> textFilterValueProvider = readOptions.getRowFilterTextProto();
+      if (textFilterValueProvider != null && textFilterValueProvider.isAccessible()) {
+        try {
+          chain.addFilters(TextFormat.parse(textFilterValueProvider.get(), RowFilter.class));
+        } catch (TextFormat.ParseException e) {
+          throw new IllegalArgumentException("Failed to parse row filter text proto", e);
+        }
+      }
+
+      switch (chain.getFiltersCount()) {
+        case 0:
+          return null;
+        case 1:
+          return chain.getFilters(0);
+        default:
+          return RowFilter.newBuilder().setChain(chain.build()).build();
+      }
     }
 
     public @Nullable Integer getMaxBufferElementCount() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
index 46834cc9756f..af4b7da18b17 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.bigtable.v2.RowFilter;
@@ -43,6 +44,9 @@ abstract class BigtableReadOptions implements Serializable {
   /** Returns the row filter to use. */
   abstract @Nullable ValueProvider<RowFilter> getRowFilter();
 
+  /** Returns the row filter to use, given as a {@code RowFilter} in protobuf text format. */
+  abstract @Nullable ValueProvider<String> getRowFilterTextProto();
+
   /** Returns the key ranges to read. */
   abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();
 
@@ -73,6 +77,8 @@ abstract static class Builder {
 
     abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
 
+    abstract Builder setRowFilterTextProto(ValueProvider<String> rowFilterTextProto);
+
     abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);
 
     abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
@@ -110,6 +116,9 @@ void populateDisplayData(DisplayData.Builder builder) {
     builder
         .addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
         .addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
+        .addIfNotNull(
+            DisplayData.item("rowFilterTextProto", getRowFilterTextProto())
+                .withLabel("Row Filter Text Proto"))
         .addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges"))
         .addIfNotNull(
             DisplayData.item("attemptTimeout", getAttemptTimeout())
@@ -127,6 +136,11 @@ void validate() {
     if (getRowFilter() != null && getRowFilter().isAccessible()) {
       checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
     }
+
+    if (getRowFilterTextProto() != null && getRowFilterTextProto().isAccessible()) {
+      checkNotNull(getRowFilterTextProto().get(), "rowFilterTextProto can not be null");
+    }
+
     if (getMaxBufferElementCount() != null) {
       checkArgument(
           getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 2065772a9a4f..a654f63deea7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -680,6 +680,69 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception {
         defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)),
         Lists.newArrayList(filteredRows));
   }
+
+  /** Tests reading rows using a text proto filter provided through ValueProvider. */
+  @Test
+  public void testReadingWithRowFilterTextProto() throws Exception {
+    final String table = "TEST-FILTER-TABLE";
+    final int numRows = 1001;
+    List<Row> testRows = makeTableData(table, numRows);
+    String regex = ".*17.*";
+    final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
+    Iterable<Row> filteredRows =
+        testRows.stream()
+            .filter(
+                input -> {
+                  verifyNotNull(input, "input");
+                  return keyPredicate.apply(input.getKey());
+                })
+            .collect(Collectors.toList());
+
+    String filter = "row_key_regex_filter: \"" + regex + "\"";
+    service.setupSampleRowKeys(table, 5, 10L);
+
+    runReadTest(
+        defaultRead.withTableId(table).withRowFilterTextProto(StaticValueProvider.of(filter)),
+        Lists.newArrayList(filteredRows));
+  }
+
+  /** Tests reading rows using both filter providers. */
+  @Test
+  public void testReadingWithBothTextFilterAndRowFilter() throws Exception {
+    final String table = "TEST-FILTER-TABLE";
+    final int numRows = 1001;
+    List<Row> testRows = makeTableData(table, numRows);
+    String regex1 = ".*17.*";
+    String regex2 = ".*2.*";
+    final KeyMatchesRegex keyPredicate1 = new KeyMatchesRegex(regex1);
+    final KeyMatchesRegex keyPredicate2 = new KeyMatchesRegex(regex2);
+    Iterable<Row> filteredRows =
+        testRows.stream()
+            .filter(
+                input -> {
+                  verifyNotNull(input, "input");
+                  return keyPredicate1.apply(input.getKey());
+                })
+            .filter(
+                input -> {
+                  verifyNotNull(input, "input");
+                  return keyPredicate2.apply(input.getKey());
+                })
+            .collect(Collectors.toList());
+
+    String filter = "row_key_regex_filter: \"" + regex1 + "\"";
+    RowFilter rowFilter =
+        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex2)).build();
+    service.setupSampleRowKeys(table, 5, 10L);
+
+    runReadTest(
+        defaultRead
+            .withTableId(table)
+            .withRowFilterTextProto(StaticValueProvider.of(filter))
+            .withRowFilter(rowFilter),
+        Lists.newArrayList(filteredRows));
+  }
+
   /** Tests dynamic work rebalancing exhaustively. */
   @Test
   public void testReadingSplitAtFractionExhaustive() throws Exception {
@@ -1778,17 +1841,25 @@ private static class FakeBigtableReader implements BigtableService.Reader {
     private final FakeBigtableService service;
     private Iterator<Map.Entry<ByteString, ByteString>> rows;
     private Row currentRow;
-    private final Predicate<ByteString> filter;
+    private final List<Predicate<ByteString>> filters = new ArrayList<>();
 
     public FakeBigtableReader(BigtableSource source, FakeBigtableService service) {
       this.source = source;
       this.service = service;
       if (source.getRowFilter() == null) {
-        filter = Predicates.alwaysTrue();
+        filters.add(Predicates.alwaysTrue());
       } else {
-        ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
-        checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
-        filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
+        RowFilter rowFilter = source.getRowFilter();
+        // A chained filter contributes one predicate per element; any other filter stands alone.
+        List<RowFilter> rowFilters =
+            rowFilter.hasChain()
+                ? rowFilter.getChain().getFiltersList()
+                : Lists.newArrayList(rowFilter);
+        for (RowFilter filter : rowFilters) {
+          ByteString keyRegex = filter.getRowKeyRegexFilter();
+          checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
+          filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8()));
+        }
       }
       service.verifyTableExists(source.getTableId().get());
     }
@@ -1809,7 +1880,14 @@ public boolean advance() throws IOException {
       Map.Entry<ByteString, ByteString> entry = null;
       while (rows.hasNext()) {
         entry = rows.next();
-        if (!filter.apply(entry.getKey())
+        boolean predicateMatched = true;
+        for (Predicate<ByteString> predicate : filters) {
+          if (!predicate.apply(entry.getKey())) {
+            predicateMatched = false;
+            break;
+          }
+        }
+        if (!predicateMatched
             || !rangesContainsKey(source.getRanges(), makeByteKey(entry.getKey()))) {
           // Does not match row filter or does not match source range. Skip.
           entry = null;