Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.gax.batching.BatchingException;
Expand All @@ -38,6 +39,7 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -610,6 +612,22 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}

/**
* Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
* using the given {@link TextFormat} row filter. If {@link #withRowFilter(RowFilter)} is also
* set, the filters are chained.
*
* <p>Does not modify this object.
*/
public Read withRowFilterTextProto(ValueProvider<String> filter) {
checkNotNull(filter, "filter can not be null");
BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(
bigtableReadOptions.toBuilder().setRowFilterTextProto(filter).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
* This function will switch the base BigtableIO.Reader class to using the SegmentReader. If
Expand Down Expand Up @@ -1938,8 +1956,28 @@ public List<ByteKeyRange> getRanges() {
}

public @Nullable RowFilter getRowFilter() {
ValueProvider<RowFilter> rowFilter = readOptions.getRowFilter();
return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
RowFilter.Chain.Builder chain = RowFilter.Chain.newBuilder();
ValueProvider<RowFilter> rowFilterValueProvider = readOptions.getRowFilter();
if (rowFilterValueProvider != null && rowFilterValueProvider.isAccessible()) {
chain.addFilters(rowFilterValueProvider.get());
}
ValueProvider<String> textFilterValueProvider = readOptions.getRowFilterTextProto();
if (textFilterValueProvider != null && textFilterValueProvider.isAccessible()) {
try {
chain.addFilters(TextFormat.parse(textFilterValueProvider.get(), RowFilter.class));
} catch (TextFormat.ParseException e) {
throw new RuntimeException("Failed to parse row filter text proto", e);
}
}

switch (chain.getFiltersCount()) {
case 0:
return null;
case 1:
return chain.getFilters(0);
default:
return RowFilter.newBuilder().setChain(chain.build()).build();
}
}

public @Nullable Integer getMaxBufferElementCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigtable;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.RowFilter;
Expand All @@ -43,6 +44,8 @@ abstract class BigtableReadOptions implements Serializable {
/** Returns the row filter to use. */
abstract @Nullable ValueProvider<RowFilter> getRowFilter();

abstract @Nullable ValueProvider<String> getRowFilterTextProto();

/** Returns the key ranges to read. */
abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();

Expand Down Expand Up @@ -73,6 +76,8 @@ abstract static class Builder {

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);

abstract Builder setRowFilterTextProto(ValueProvider<String> rowFilter);

abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);

abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
Expand Down Expand Up @@ -110,6 +115,8 @@ void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
.addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
.addIfNotNull(
DisplayData.item("rowFilterTextProto", getRowFilterTextProto()).withLabel("Row Filter"))
.addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges"))
.addIfNotNull(
DisplayData.item("attemptTimeout", getAttemptTimeout())
Expand All @@ -127,6 +134,11 @@ void validate() {
if (getRowFilter() != null && getRowFilter().isAccessible()) {
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
}

if (getRowFilterTextProto() != null && getRowFilterTextProto().isAccessible()) {
checkNotNull(getRowFilterTextProto(), "rowFilter can not be null");
}

if (getMaxBufferElementCount() != null) {
checkArgument(
getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,69 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception {
defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)),
Lists.newArrayList(filteredRows));
}

/** Tests reading rows using a text proto filter provided through ValueProvider. */
@Test
public void testReadingWithRowFilterTextProto() throws Exception {
final String table = "TEST-FILTER-TABLE";
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
String regex = ".*17.*";
final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
Iterable<Row> filteredRows =
testRows.stream()
.filter(
input -> {
verifyNotNull(input, "input");
return keyPredicate.apply(input.getKey());
})
.collect(Collectors.toList());

String filter = "row_key_regex_filter: \".*17.*\"";
service.setupSampleRowKeys(table, 5, 10L);

runReadTest(
defaultRead.withTableId(table).withRowFilterTextProto(StaticValueProvider.of(filter)),
Lists.newArrayList(filteredRows));
}

/** Tests reading rows using both filter providers. */
@Test
public void testReadingWithBothTextFilterAndRowFilter() throws Exception {
final String table = "TEST-FILTER-TABLE";
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
String regex1 = ".*17.*";
String regex2 = ".*2.*";
final KeyMatchesRegex keyPredicate1 = new KeyMatchesRegex(regex1);
final KeyMatchesRegex keyPredicate2 = new KeyMatchesRegex(regex2);
Iterable<Row> filteredRows =
testRows.stream()
.filter(
input -> {
verifyNotNull(input, "input");
return keyPredicate1.apply(input.getKey());
})
.filter(
input -> {
verifyNotNull(input, "input");
return keyPredicate2.apply(input.getKey());
})
.collect(Collectors.toList());

String filter = "row_key_regex_filter: \".*17.*\"";
RowFilter rowFilter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex2)).build();
service.setupSampleRowKeys(table, 5, 10L);

runReadTest(
defaultRead
.withTableId(table)
.withRowFilterTextProto(StaticValueProvider.of(filter))
.withRowFilter(rowFilter),
Lists.newArrayList(filteredRows));
}

/** Tests dynamic work rebalancing exhaustively. */
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {
Expand Down Expand Up @@ -1778,17 +1841,27 @@ private static class FakeBigtableReader implements BigtableService.Reader {
private final FakeBigtableService service;
private Iterator<Map.Entry<ByteString, ByteString>> rows;
private Row currentRow;
private final Predicate<ByteString> filter;
private final List<Predicate<ByteString>> filters = new ArrayList<>();

public FakeBigtableReader(BigtableSource source, FakeBigtableService service) {
this.source = source;
this.service = service;
if (source.getRowFilter() == null) {
filter = Predicates.alwaysTrue();
filters.add(Predicates.alwaysTrue());
} else {
ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
RowFilter rowFilter = source.getRowFilter();
if (rowFilter.hasChain()) {
RowFilter.Chain chain = rowFilter.getChain();
for (RowFilter filter : chain.getFiltersList()) {
ByteString keyRegex = filter.getRowKeyRegexFilter();
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8()));
}
} else {
ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8()));
}
}
service.verifyTableExists(source.getTableId().get());
}
Expand All @@ -1809,7 +1882,13 @@ public boolean advance() throws IOException {
Map.Entry<ByteString, ByteString> entry = null;
while (rows.hasNext()) {
entry = rows.next();
if (!filter.apply(entry.getKey())
boolean predicateMatched = true;
for (Predicate<ByteString> predicate : filters) {
if (!predicate.apply(entry.getKey())) {
predicateMatched = false;
}
}
if (!predicateMatched
|| !rangesContainsKey(source.getRanges(), makeByteKey(entry.getKey()))) {
// Does not match row filter or does not match source range. Skip.
entry = null;
Expand Down
Loading