Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -610,6 +611,23 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}

/**
 * Returns a new {@link BigtableIO.Read} that filters the rows read from Cloud Bigtable with a
 * row filter supplied in the string form produced by {@link
 * RowUtils#encodeRowFilter(RowFilter)}.
 *
 * <p>If a filter is also configured through {@link #withRowFilter(RowFilter)}, the filter given
 * to {@link #withRowFilter(RowFilter)} is the one that is used.
 *
 * <p>Does not modify this object.
 */
public Read withEncodedRowFilter(ValueProvider<String> filter) {
  checkArgumentNotNull(filter, "filter can not be null");
  // Copy the current read options, swapping in the encoded filter.
  BigtableReadOptions updatedOptions =
      getBigtableReadOptions().toBuilder().setEncodedRowFilter(filter).build();
  return toBuilder().setBigtableReadOptions(updatedOptions).build();
}

/**
* Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
* This function will switch the base BigtableIO.Reader class to using the SegmentReader. If
Expand Down Expand Up @@ -1939,7 +1957,19 @@ public List<ByteKeyRange> getRanges() {

/**
 * Returns the row filter to apply to this read, or {@code null} when no filter is available.
 *
 * <p>A filter set directly on the read options takes precedence when it is present and
 * accessible. Otherwise the encoded filter string, when present and accessible, is decoded with
 * {@link RowUtils#decodeRowFilter(String)}.
 *
 * @throws RuntimeException if the encoded filter string is not valid Base64 or does not parse as
 *     a {@link RowFilter}
 */
public @Nullable RowFilter getRowFilter() {
  ValueProvider<RowFilter> rowFilter = readOptions.getRowFilter();
  if (rowFilter != null && rowFilter.isAccessible()) {
    return rowFilter.get();
  }
  ValueProvider<String> encoded = readOptions.getEncodedRowFilter();
  if (encoded != null && encoded.isAccessible()) {
    String filterString = encoded.get();
    try {
      return RowUtils.decodeRowFilter(filterString);
    } catch (InvalidProtocolBufferException | IllegalArgumentException e) {
      // Base64 decoding reports malformed input with IllegalArgumentException rather than
      // InvalidProtocolBufferException; surface both with the same contextual message.
      throw new RuntimeException("Failed to decode row filter string", e);
    }
  }
  return null;
}

public @Nullable Integer getMaxBufferElementCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ abstract class BigtableReadOptions implements Serializable {
/** Returns the row filter to use. */
abstract @Nullable ValueProvider<RowFilter> getRowFilter();

/**
 * Returns the row filter in the string form produced by {@link
 * RowUtils#encodeRowFilter(RowFilter)}. May be {@code null}.
 */
abstract @Nullable ValueProvider<String> getEncodedRowFilter();

/** Returns the key ranges to read. */
abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();

Expand Down Expand Up @@ -73,6 +76,8 @@ abstract static class Builder {

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);

/** Sets the row filter string encoded with {@link RowUtils#encodeRowFilter(RowFilter)}. */
abstract Builder setEncodedRowFilter(ValueProvider<String> serializedRowFilter);

abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);

abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
Expand Down Expand Up @@ -110,6 +115,9 @@ void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
.addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
.addIfNotNull(
DisplayData.item("encodedRowFilter", getEncodedRowFilter())
.withLabel("Encoded Row Filter"))
.addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges"))
.addIfNotNull(
DisplayData.item("attemptTimeout", getAttemptTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;

import com.google.bigtable.v2.RowFilter;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Base64;

public class RowUtils {
public static final String KEY = "key";
Expand All @@ -33,4 +36,15 @@ public static ByteString byteString(byte[] bytes) {
/** Builds a {@link ByteString} from {@code value} via {@link ByteString#copyFromUtf8(String)}. */
public static ByteString byteStringUtf8(String value) {
  final ByteString utf8Bytes = ByteString.copyFromUtf8(value);
  return utf8Bytes;
}

/**
 * Serializes a row filter to its byte-array form and renders those bytes as a Base64 string.
 *
 * @param filter the row filter to encode
 * @return the Base64 text of {@code filter.toByteArray()}
 */
public static String encodeRowFilter(RowFilter filter) {
  byte[] serializedFilter = filter.toByteArray();
  return Base64.getEncoder().encodeToString(serializedFilter);
}

/**
 * Rebuilds a row filter from a Base64 string, the inverse of {@link
 * #encodeRowFilter(RowFilter)}.
 *
 * @param serialized Base64 text holding the serialized bytes of a {@link RowFilter}
 * @return the filter parsed from the decoded bytes
 * @throws InvalidProtocolBufferException if the decoded bytes cannot be parsed as a {@link
 *     RowFilter}
 * @throws IllegalArgumentException if {@code serialized} is not valid Base64
 */
public static RowFilter decodeRowFilter(String serialized) throws InvalidProtocolBufferException {
  return RowFilter.parseFrom(Base64.getDecoder().decode(serialized));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,36 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception {
defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)),
Lists.newArrayList(filteredRows));
}

/** Tests reading rows from a table when the row filter is supplied in encoded string form. */
@Test
public void testReadingWithEncodedRowFilter() throws Exception {
  final String table = "TEST-FILTER-TABLE";
  final int numRows = 1001;
  final String regex = ".*17.*";

  // Expected output: every generated row whose key matches the regex.
  List<Row> allRows = makeTableData(table, numRows);
  final KeyMatchesRegex matcher = new KeyMatchesRegex(regex);
  Iterable<Row> expectedRows =
      allRows.stream()
          .filter(
              row -> {
                verifyNotNull(row, "input");
                return matcher.apply(row.getKey());
              })
          .collect(Collectors.toList());

  // The same key regex expressed as a RowFilter, then encoded to a string.
  ByteString regexBytes = ByteString.copyFromUtf8(regex);
  RowFilter keyRegexFilter = RowFilter.newBuilder().setRowKeyRegexFilter(regexBytes).build();
  String encodedFilter = RowUtils.encodeRowFilter(keyRegexFilter);

  service.setupSampleRowKeys(table, 5, 10L);

  runReadTest(
      defaultRead.withTableId(table).withEncodedRowFilter(StaticValueProvider.of(encodedFilter)),
      Lists.newArrayList(expectedRows));
}

/** Tests dynamic work rebalancing exhaustively. */
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {
Expand Down
Loading