Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ColumnRange;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -47,13 +49,16 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
Expand Down Expand Up @@ -98,6 +103,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.ToStringHelper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
Expand Down Expand Up @@ -610,6 +616,35 @@ public Read withRowFilter(RowFilter filter) {
return withRowFilter(StaticValueProvider.of(filter));
}

/**
* Convenient method to return a new {@link BigtableIO.Read} that will filter the rows read from
* Cloud Bigtable using the given columns. The columns should be in the format of
* "family1:*,family2:!{qualifier1|qualifier2}".
*
* <ul>
* <li>Use "*" to include all the qualifiers in a family. For example, "cf1:*" includes all
* the qualifiers in family "cf1".
* <li>Use "!" to exclude some qualifiers from a family. Qualifiers are delimited by "|". For
* example, "cf1:!{q1|q2}" excludes qualifier "q1" and "q2" from family "cf1".
* <li>"cf1:{q1|q2}" includes qualifier "q1" and "q2" from family "cf1".
* <li>Family and qualifier groups are delimited by ",".
* <li>Only specified families are included.
* <li>If there's a "," or "|" in the qualifier name, escape it with "\". For example,
* "cf1:{q1_a\\,q1_b\\|q1_c} selects qualifier "q1_a,q1_b|q1_c" from family "cf1".
* </ul>
*
* If {@link #withRowFilter(RowFilter)} is also present, the filter will be chained.
*
* <p>Does not modify this object.
*/
public Read withColumns(ValueProvider<String> columns) {
checkArgumentNotNull(columns, "columns cannot be null");
BigtableReadOptions readOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(readOptions.toBuilder().setColumns(columns).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
* This function will switch the base BigtableIO.Reader class to using the SegmentReader. If
Expand Down Expand Up @@ -1568,6 +1603,10 @@ public String toString() {
}

////// Private state and internal implementation details //////
private static final Pattern COLUMN_REGEX =
Pattern.compile("(?<family>[^:]+):(?<qualifier>.*)");
private static final Pattern QUALIFIER_REGEX = Pattern.compile("!?\\{(?<q>.*)\\}");

private final BigtableConfig config;
private final BigtableReadOptions readOptions;
private @Nullable Long estimatedSizeBytes;
Expand Down Expand Up @@ -1938,8 +1977,106 @@ public List<ByteKeyRange> getRanges() {
}

public @Nullable RowFilter getRowFilter() {
ValueProvider<RowFilter> rowFilter = readOptions.getRowFilter();
return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
RowFilter rowFilter = null;
if (readOptions.getRowFilter() != null && readOptions.getRowFilter().isAccessible()) {
rowFilter = readOptions.getRowFilter().get();
}

RowFilter columnsFilter = null;
if (readOptions.getColumns() != null && readOptions.getColumns().isAccessible()) {
columnsFilter = parseColumns(readOptions.getColumns().get());
}

if (rowFilter != null && columnsFilter != null) {
return RowFilter.newBuilder()
.setChain(RowFilter.Chain.newBuilder().addFilters(rowFilter).addFilters(columnsFilter))
.build();
} else if (rowFilter != null) {
return rowFilter;
} else if (columnsFilter != null) {
return columnsFilter;
}
return null;
}

private RowFilter parseColumns(String input) {
RowFilter.Interleave.Builder filterBuilder = RowFilter.Interleave.newBuilder();
// qualifier can contain ",", the content "," should be escaped
Iterable<String> columns = Splitter.onPattern("(?<!\\\\),").split(input);

for (String column : columns) {
Matcher matcher = COLUMN_REGEX.matcher(column);
checkArgument(matcher.matches(), "Invalid column format: %s", column);
String family = matcher.group("family");
String qualifiersStr = matcher.group("qualifiers");
if (qualifiersStr.equals("*")) {
filterBuilder.addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter(family).build());
} else {
boolean exclude = qualifiersStr.startsWith("!");
Matcher qualifierMatcher = QUALIFIER_REGEX.matcher(qualifiersStr);
checkArgument(qualifierMatcher.matches(), "Invalid qualifier format: %s", column);
Iterable<String> qualifiers = Splitter.onPattern("(?<!\\\\)\\|").split(input);
if (exclude) {
addExcludeQualifiers(filterBuilder, family, qualifiers);
} else {
addIncludeQualifiers(filterBuilder, family, qualifiers);
}
}
}
return RowFilter.newBuilder().setInterleave(filterBuilder.build()).build();
}

private void addExcludeQualifiers(
RowFilter.Interleave.Builder builder, String family, Iterable<String> qualifiers) {
TreeSet<String> sorted =
new TreeSet<>(
(a, b) -> {
ByteString a1 = ByteString.copyFromUtf8(a);
ByteString b1 = ByteString.copyFromUtf8(b);
return ByteStringComparator.INSTANCE.compare(a1, b1);
});
qualifiers.forEach(sorted::add);

ByteString prev = ByteString.EMPTY;

for (String q : qualifiers) {
q = cleanupQualifier(q);
builder.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName(family)
.setStartQualifierOpen(prev)
.setEndQualifierOpen(ByteString.copyFromUtf8(q))
.build()));
prev = ByteString.copyFromUtf8(q);
}
builder.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName(family)
.setStartQualifierOpen(prev)
.build()));
}

private void addIncludeQualifiers(
RowFilter.Interleave.Builder builder, String family, Iterable<String> qualifiers) {
for (String q : qualifiers) {
q = cleanupQualifier(q);
builder.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName(family)
.setStartQualifierClosed(ByteString.copyFromUtf8(q))
.setEndQualifierClosed(ByteString.copyFromUtf8(q))
.build()));
}
}

private String cleanupQualifier(String input) {
return input.replace("\\|", "|").replace("\\,", ",");
}

public @Nullable Integer getMaxBufferElementCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ abstract class BigtableReadOptions implements Serializable {
/** Returns the row filter to use. */
abstract @Nullable ValueProvider<RowFilter> getRowFilter();

abstract @Nullable ValueProvider<String> getColumns();

/** Returns the key ranges to read. */
abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();

Expand Down Expand Up @@ -73,6 +75,8 @@ abstract static class Builder {

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);

abstract Builder setColumns(ValueProvider<String> columns);

abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);

abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
Expand Down Expand Up @@ -110,6 +114,7 @@ void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
.addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
.addIfNotNull(DisplayData.item("columns", getColumns()).withLabel("Columns to read"))
.addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges"))
.addIfNotNull(
DisplayData.item("attemptTimeout", getAttemptTimeout())
Expand All @@ -127,6 +132,11 @@ void validate() {
if (getRowFilter() != null && getRowFilter().isAccessible()) {
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
}

if (getColumns() != null && getColumns().isAccessible()) {
checkArgument(getColumns().get() != null, "columns cannot be null");
}

if (getMaxBufferElementCount() != null) {
checkArgument(
getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.ColumnRange;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
Expand Down Expand Up @@ -677,9 +678,154 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception {
service.setupSampleRowKeys(table, 5, 10L);

runReadTest(
defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)),
Lists.newArrayList(filteredRows));
defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows));
}

@Test
public void testParseColumnFilter() {
String test1 = "cf1:*,cf2:*";
RowFilter expected1 =
RowFilter.newBuilder()
.setInterleave(
RowFilter.Interleave.newBuilder()
.addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf1").build())
.addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf2").build()))
.build();
BigtableSource source =
new BigtableSource(
factory,
configId,
config,
BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test1)).build(),
null);
assertEquals(expected1, source.getRowFilter());

String test2 = "cf1:*,cf2:!{b|a}";
RowFilter expected2 =
RowFilter.newBuilder()
.setInterleave(
RowFilter.Interleave.newBuilder()
.addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf1").build())
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf2")
.setStartQualifierOpen(ByteString.EMPTY)
.setEndQualifierOpen(ByteString.copyFromUtf8("a"))
.build()))
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf2")
.setStartQualifierOpen(ByteString.copyFromUtf8("a"))
.setEndQualifierOpen(ByteString.copyFromUtf8("b"))
.build()))
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf2")
.setStartQualifierOpen(ByteString.copyFromUtf8("b"))
.build()))
.build())
.build();

source =
new BigtableSource(
factory,
configId,
config,
BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test2)).build(),
null);
assertEquals(expected2, source.getRowFilter());

String test3 = "cf1:{a|b},cf2:*,cf3:!{a\\,b\\|c}";
RowFilter expected3 =
RowFilter.newBuilder()
.setInterleave(
RowFilter.Interleave.newBuilder()
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf1")
.setStartQualifierClosed(ByteString.copyFromUtf8("a"))
.setEndQualifierClosed(ByteString.copyFromUtf8("a"))
.build()))
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf1")
.setStartQualifierClosed(ByteString.copyFromUtf8("b"))
.setEndQualifierClosed(ByteString.copyFromUtf8("b"))
.build()))
.addFilters(RowFilter.newBuilder().setFamilyNameRegexFilter("cf2").build())
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf3")
.setStartQualifierOpen(ByteString.EMPTY)
.setEndQualifierOpen(ByteString.copyFromUtf8("a,b|c"))
.build())
.build())
.addFilters(
RowFilter.newBuilder()
.setColumnRangeFilter(
ColumnRange.newBuilder()
.setFamilyName("cf3")
.setStartQualifierOpen(ByteString.copyFromUtf8("a,b|c"))
.build()))
.build())
.build();
source =
new BigtableSource(
factory,
configId,
config,
BigtableReadOptions.builder().setColumns(StaticValueProvider.of(test3)).build(),
null);
assertEquals(expected3, source.getRowFilter());

String test4Columns = "cf:*";
RowFilter test4Filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*")).build();
RowFilter expected4 =
RowFilter.newBuilder()
.setChain(
RowFilter.Chain.newBuilder()
.addFilters(
RowFilter.newBuilder()
.setRowKeyRegexFilter(ByteString.copyFromUtf8(".*"))
.build())
.addFilters(
RowFilter.newBuilder()
.setInterleave(
RowFilter.Interleave.newBuilder()
.addFilters(
RowFilter.newBuilder()
.setFamilyNameRegexFilter("cf")
.build())
.build()))
.build())
.build();
source =
new BigtableSource(
factory,
configId,
config,
BigtableReadOptions.builder()
.setRowFilter(StaticValueProvider.of(test4Filter))
.setColumns(StaticValueProvider.of(test4Columns))
.build(),
null);

assertEquals(expected4, source.getRowFilter());
}

/** Tests dynamic work rebalancing exhaustively. */
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {
Expand Down
Loading