diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 6441524fc847..a4916e7b1253 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -29,11 +29,13 @@ import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.NotFoundException; import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.ColumnRange; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Row; import com.google.bigtable.v2.RowFilter; import com.google.cloud.bigtable.config.BigtableOptions; +import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.KeyOffset; @@ -47,6 +49,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; @@ -54,6 +57,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; @@ -98,6 +103,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.ToStringHelper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -610,6 +616,35 @@ public Read withRowFilter(RowFilter filter) { return withRowFilter(StaticValueProvider.of(filter)); } + /** + * Convenient method to return a new {@link BigtableIO.Read} that will filter the rows read from + * Cloud Bigtable using the given columns. The columns should be in the format of + * "family1:*,family2:!{qualifier1|qualifier2}". + * + * + * + * If {@link #withRowFilter(RowFilter)} is also present, the filter will be chained. + * + *

Does not modify this object. + */ + public Read withColumns(ValueProvider columns) { + checkArgumentNotNull(columns, "columns cannot be null"); + BigtableReadOptions readOptions = getBigtableReadOptions(); + return toBuilder() + .setBigtableReadOptions(readOptions.toBuilder().setColumns(columns).build()) + .build(); + } + /** * Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches. * This function will switch the base BigtableIO.Reader class to using the SegmentReader. If @@ -1568,6 +1603,10 @@ public String toString() { } ////// Private state and internal implementation details ////// + private static final Pattern COLUMN_REGEX = + Pattern.compile("(?[^:]+):(?.*)"); + private static final Pattern QUALIFIER_REGEX = Pattern.compile("!?\\{(?.*)\\}"); + private final BigtableConfig config; private final BigtableReadOptions readOptions; private @Nullable Long estimatedSizeBytes; @@ -1938,8 +1977,106 @@ public List getRanges() { } public @Nullable RowFilter getRowFilter() { - ValueProvider rowFilter = readOptions.getRowFilter(); - return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null; + RowFilter rowFilter = null; + if (readOptions.getRowFilter() != null && readOptions.getRowFilter().isAccessible()) { + rowFilter = readOptions.getRowFilter().get(); + } + + RowFilter columnsFilter = null; + if (readOptions.getColumns() != null && readOptions.getColumns().isAccessible()) { + columnsFilter = parseColumns(readOptions.getColumns().get()); + } + + if (rowFilter != null && columnsFilter != null) { + return RowFilter.newBuilder() + .setChain(RowFilter.Chain.newBuilder().addFilters(rowFilter).addFilters(columnsFilter)) + .build(); + } else if (rowFilter != null) { + return rowFilter; + } else if (columnsFilter != null) { + return columnsFilter; + } + return null; + } + + private RowFilter parseColumns(String input) { + RowFilter.Interleave.Builder filterBuilder = RowFilter.Interleave.newBuilder(); + // qualifier can contain ",", the content "," should be escaped + Iterable columns = Splitter.onPattern("(? qualifiers = Splitter.onPattern("(? qualifiers) { + TreeSet sorted = + new TreeSet<>( + (a, b) -> { + ByteString a1 = ByteString.copyFromUtf8(a); + ByteString b1 = ByteString.copyFromUtf8(b); + return ByteStringComparator.INSTANCE.compare(a1, b1); + }); + qualifiers.forEach(sorted::add); + + ByteString prev = ByteString.EMPTY; + + for (String q : qualifiers) { + q = cleanupQualifier(q); + builder.addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName(family) + .setStartQualifierOpen(prev) + .setEndQualifierOpen(ByteString.copyFromUtf8(q)) + .build())); + prev = ByteString.copyFromUtf8(q); + } + builder.addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName(family) + .setStartQualifierOpen(prev) + .build())); + } + + private void addIncludeQualifiers( + RowFilter.Interleave.Builder builder, String family, Iterable qualifiers) { + for (String q : qualifiers) { + q = cleanupQualifier(q); + builder.addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName(family) + .setStartQualifierClosed(ByteString.copyFromUtf8(q)) + .setEndQualifierClosed(ByteString.copyFromUtf8(q)) + .build())); + } + } + + private String cleanupQualifier(String input) { + return input.replace("\\|", "|").replace("\\,", ","); } public @Nullable Integer getMaxBufferElementCount() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java index 46834cc9756f..5e323330d30a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java @@ -43,6 +43,8 @@ abstract class BigtableReadOptions implements Serializable { /** Returns the row filter to use. */ abstract @Nullable ValueProvider getRowFilter(); + abstract @Nullable ValueProvider getColumns(); + /** Returns the key ranges to read. */ abstract @Nullable ValueProvider> getKeyRanges(); @@ -73,6 +75,8 @@ abstract static class Builder { abstract Builder setRowFilter(ValueProvider rowFilter); + abstract Builder setColumns(ValueProvider columns); + abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount); abstract Builder setKeyRanges(ValueProvider> keyRanges); @@ -110,6 +114,7 @@ void populateDisplayData(DisplayData.Builder builder) { builder .addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id")) .addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter")) + .addIfNotNull(DisplayData.item("columns", getColumns()).withLabel("Columns to read")) .addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges")) .addIfNotNull( DisplayData.item("attemptTimeout", getAttemptTimeout()) @@ -127,6 +132,11 @@ void validate() { if (getRowFilter() != null && getRowFilter().isAccessible()) { checkArgument(getRowFilter().get() != null, "rowFilter can not be null"); } + + if (getColumns() != null && getColumns().isAccessible()) { + checkArgument(getColumns().get() != null, "columns cannot be null"); + } + if (getMaxBufferElementCount() != null) { checkArgument( getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 2065772a9a4f..74a693f03cfc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -44,6 +44,7 @@ import com.google.auto.value.AutoValue; import com.google.bigtable.v2.Cell; import com.google.bigtable.v2.Column; +import com.google.bigtable.v2.ColumnRange; import com.google.bigtable.v2.Family; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; @@ -677,9 +678,154 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception { service.setupSampleRowKeys(table, 5, 10L); runReadTest( - defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)), - Lists.newArrayList(filteredRows)); + defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows)); } + + @Test + public void testParseColumnFilter() { + String test1 = "cf1:*,cf2:*"; + RowFilter expected1 = + RowFilter.newBuilder() + .setInterleave( + RowFilter.Interleave.newBuilder() + .addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf1").build()) + .addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf2").build())) + .build(); + BigtableSource source = + new BigtableSource( + factory, + configId, + config, + BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test1)).build(), + null); + assertEquals(expected1, source.getRowFilter()); + + String test2 = "cf1:*,cf2:!{b|a}"; + RowFilter expected2 = + RowFilter.newBuilder() + .setInterleave( + RowFilter.Interleave.newBuilder() + .addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf1").build()) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf2") + .setStartQualifierOpen(ByteString.EMPTY) + .setEndQualifierOpen(ByteString.copyFromUtf8("a")) + .build())) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf2") + .setStartQualifierOpen(ByteString.copyFromUtf8("a")) + .setEndQualifierOpen(ByteString.copyFromUtf8("b")) + .build())) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf2") + .setStartQualifierOpen(ByteString.copyFromUtf8("b")) + .build())) + .build()) + .build(); + + source = + new BigtableSource( + factory, + configId, + config, + BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test2)).build(), + null); + assertEquals(expected2, source.getRowFilter()); + + String test3 = "cf1:{a|b},cf2:*,cf3:!{a\\,b\\|c}"; + RowFilter expected3 = + RowFilter.newBuilder() + .setInterleave( + RowFilter.Interleave.newBuilder() + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf1") + .setStartQualifierClosed(ByteString.copyFromUtf8("a")) + .setEndQualifierClosed(ByteString.copyFromUtf8("a")) + .build())) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf1") + .setStartQualifierClosed(ByteString.copyFromUtf8("b")) + .setEndQualifierClosed(ByteString.copyFromUtf8("b")) + .build())) + .addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf2").build()) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf3") + .setStartQualifierOpen(ByteString.EMPTY) + .setEndQualifierOpen(ByteString.copyFromUtf8("a,b|c")) + .build()) + .build()) + .addFilters( + RowFilter.newBuilder() + .setColumnRangeFilter( + ColumnRange.newBuilder() + .setFamilyName("cf3") + .setStartQualifierOpen(ByteString.copyFromUtf8("a,b|c")) + .build())) + .build()) + .build(); + source = + new BigtableSource( + factory, + configId, + config, + BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test3)).build(), + null); + assertEquals(expected3, source.getRowFilter()); + + String test4Columns = "cf:*"; + RowFilter test4Filter = + RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*")).build(); + RowFilter expected4 = + RowFilter.newBuilder() + .setChain( + RowFilter.Chain.newBuilder() + .addFilters( + RowFilter.newBuilder() + .setRowKeyRegexFilter(ByteString.copyFromUtf8(".*")) + .build()) + .addFilters( + RowFilter.newBuilder() + .setInterleave( + RowFilter.Interleave.newBuilder() + .addFilters( + RowFilter.newBuilder() + .setFamilyNameRegexFilter("cf") + .build()) + .build())) + .build()) + .build(); + source = + new BigtableSource( + factory, + configId, + config, + BigtableReadOptions.builder() + .setRowFilter(StaticValueProvider.of(test4Filter)) + .setColumns(StaticValueProvider.of(test4Columns)) + .build(), + null); + + assertEquals(expected4, source.getRowFilter()); + } + /** Tests dynamic work rebalancing exhaustively. */ @Test public void testReadingSplitAtFractionExhaustive() throws Exception {