From 9fbf56871ec54d449b839930f9dd70501c670aca Mon Sep 17 00:00:00 2001 From: Angad Singh Date: Sun, 29 Jun 2014 15:33:41 +0530 Subject: [PATCH] Hbase 0.90.x compatibility [remove dependency of fuzzy row filter] --- .../hraven/datasource/HdfsStatsService.java | 9 +- .../twitter/hraven/util/FuzzyRowFilter.java | 290 ++++++++++++++++++ 2 files changed, 294 insertions(+), 5 deletions(-) create mode 100644 hraven-core/src/main/java/com/twitter/hraven/util/FuzzyRowFilter.java diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/HdfsStatsService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/HdfsStatsService.java index f40dcdc..4470885 100644 --- a/hraven-core/src/main/java/com/twitter/hraven/datasource/HdfsStatsService.java +++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/HdfsStatsService.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -44,9 +43,9 @@ import com.twitter.hraven.HdfsStats; import com.twitter.hraven.HdfsStatsKey; import com.twitter.hraven.QualifiedPathKey; +import com.twitter.hraven.util.FuzzyRowFilter; import com.twitter.hraven.util.StringUtil; - /** * Service that accesses the hdfs stats tables and populates the HdfsStats object */ @@ -296,10 +295,10 @@ Scan GenerateScanFuzzy(long starttime, long endtime, String cluster, String path @SuppressWarnings("unchecked") FuzzyRowFilter rowFilter = - new FuzzyRowFilter(Arrays.asList( - new Pair(Bytes.toBytesBinary(rowKey), fuzzyInfo))); + new FuzzyRowFilter(Arrays.asList(new Pair(Bytes.toBytesBinary(rowKey), + fuzzyInfo))); - scan.setFilter(rowFilter); + scan.setFilter(rowFilter); String minStartKey = Long.toString(getEncodedRunId(starttime)); String maxEndKey = Long.toString(getEncodedRunId(endtime)); LOG.info(starttime + " " + getEncodedRunId(starttime) + " min " + minStartKey + " " + endtime diff --git a/hraven-core/src/main/java/com/twitter/hraven/util/FuzzyRowFilter.java b/hraven-core/src/main/java/com/twitter/hraven/util/FuzzyRowFilter.java new file mode 100644 index 0000000..8311f29 --- /dev/null +++ b/hraven-core/src/main/java/com/twitter/hraven/util/FuzzyRowFilter.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.hraven.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Filters data based on fuzzy row key. Performs fast-forwards during scanning. + * It takes pairs (row key, fuzzy info) to match row keys. Where fuzzy info is + * a byte array with 0 or 1 as its values: + *
    + *
  • + * 0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position + * must match + *
  • + *
  • + * 1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this + * position can be different from the one in provided row key + *
  • + *
+ * + * + * Example: + * Let's assume row key format is userId_actionId_year_month. Length of userId is fixed + * and is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. + * + * Let's assume that we need to fetch all users that performed certain action (encoded as "99") + * in Jan of any year. Then the pair (row key, fuzzy info) would be the following: + * row key = "????_99_????_01" (one can use any value instead of "?") + * fuzzy info = "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" + * + * I.e. fuzzy info tells the matching mask is "????_99_????_01", where at ? can be any value. + * + */ +public class FuzzyRowFilter extends FilterBase { + private List> fuzzyKeysData; + private boolean done = false; + + /** + * Used internally for reflection, do NOT use it directly + */ + public FuzzyRowFilter() { + } + + public FuzzyRowFilter(List> fuzzyKeysData) { + this.fuzzyKeysData = fuzzyKeysData; + } + + // TODO: possible improvement: save which fuzzy row key to use when providing a hint + @Override + public ReturnCode filterKeyValue(KeyValue kv) { + byte[] rowKey = kv.getRow(); + // assigning "worst" result first and looking for better options + SatisfiesCode bestOption = SatisfiesCode.NO_NEXT; + for (Pair fuzzyData : fuzzyKeysData) { + SatisfiesCode satisfiesCode = + satisfies(rowKey, fuzzyData.getFirst(), fuzzyData.getSecond()); + if (satisfiesCode == SatisfiesCode.YES) { + return ReturnCode.INCLUDE; + } + + if (satisfiesCode == SatisfiesCode.NEXT_EXISTS) { + bestOption = SatisfiesCode.NEXT_EXISTS; + } + } + + if (bestOption == SatisfiesCode.NEXT_EXISTS) { + return ReturnCode.SEEK_NEXT_USING_HINT; + } + + // the only unhandled SatisfiesCode is NO_NEXT, i.e. we are done + done = true; + return ReturnCode.NEXT_ROW; + } + + @Override + public KeyValue getNextKeyHint(KeyValue currentKV) { + byte[] rowKey = currentKV.getRow(); + byte[] nextRowKey = null; + // Searching for the "smallest" row key that satisfies at least one fuzzy row key + for (Pair fuzzyData : fuzzyKeysData) { + byte[] nextRowKeyCandidate = getNextForFuzzyRule(rowKey, + fuzzyData.getFirst(), fuzzyData.getSecond()); + if (nextRowKeyCandidate == null) { + continue; + } + if (nextRowKey == null || Bytes.compareTo(nextRowKeyCandidate, nextRowKey) < 0) { + nextRowKey = nextRowKeyCandidate; + } + } + + if (nextRowKey == null) { + // SHOULD NEVER happen + // TODO: is there a better way than throw exception? (stop the scanner?) + throw new IllegalStateException("No next row key that satisfies fuzzy exists when" + + " getNextKeyHint() is invoked." + + " Filter: " + this.toString() + + " currentKV: " + currentKV.toString()); + } + + return KeyValue.createFirstOnRow(nextRowKey); + } + + @Override + public boolean filterAllRemaining() { + return done; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(this.fuzzyKeysData.size()); + for (Pair fuzzyData : fuzzyKeysData) { + Bytes.writeByteArray(dataOutput, fuzzyData.getFirst()); + Bytes.writeByteArray(dataOutput, fuzzyData.getSecond()); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + int count = dataInput.readInt(); + this.fuzzyKeysData = new ArrayList>(count); + for (int i = 0; i < count; i++) { + byte[] keyBytes = Bytes.readByteArray(dataInput); + byte[] keyMeta = Bytes.readByteArray(dataInput); + this.fuzzyKeysData.add(new Pair(keyBytes, keyMeta)); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("FuzzyRowFilter"); + sb.append("{fuzzyKeysData="); + for (Pair fuzzyData : fuzzyKeysData) { + sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":"); + sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}'); + } + sb.append("}, "); + return sb.toString(); + } + + // Utility methods + + static enum SatisfiesCode { + // row satisfies fuzzy rule + YES, + // row doesn't satisfy fuzzy rule, but there's possible greater row that does + NEXT_EXISTS, + // row doesn't satisfy fuzzy rule and there's no greater row that does + NO_NEXT + } + + static SatisfiesCode satisfies(byte[] row, + byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { + return satisfies(row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); + } + + private static SatisfiesCode satisfies(byte[] row, int offset, int length, + byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { + if (row == null) { + // do nothing, let scan to proceed + return SatisfiesCode.YES; + } + + boolean nextRowKeyCandidateExists = false; + + for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) { + // First, checking if this position is fixed and not equals the given one + boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0; + boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset]; + if (fixedByteIncorrect) { + // in this case there's another row that satisfies fuzzy rule and bigger than this row + if (nextRowKeyCandidateExists) { + return SatisfiesCode.NEXT_EXISTS; + } + + // If this row byte is less than fixed then there's a byte array bigger than + // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array: + // this row is simply bigger than any byte array that satisfies the fuzzy rule + boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF); + return rowByteLessThanFixed ? SatisfiesCode.NEXT_EXISTS : SatisfiesCode.NO_NEXT; + } + + // Second, checking if this position is not fixed and byte value is not the biggest. In this + // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get + // bigger byte array that satisfies the rule we need to just increase this byte + // (see the code of getNextForFuzzyRule below) by one. + // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's + // bigger one that satisfies the rule as it can't be increased. + if (fuzzyKeyMeta[i] == 1 && !isMax(fuzzyKeyBytes[i])) { + nextRowKeyCandidateExists = true; + } + } + + return SatisfiesCode.YES; + } + + private static boolean isMax(byte fuzzyKeyByte) { + return (fuzzyKeyByte & 0xFF) == 255; + } + + static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { + return getNextForFuzzyRule(row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); + } + + /** + * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, + * null otherwise + */ + private static byte[] getNextForFuzzyRule(byte[] row, int offset, int length, + byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { + // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than + // the given one we do the following: + // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes + // 2. if during the first step given row did not increase, then we increase the value at + // the first "non-fixed" position (where it is not maximum already) + + // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position + // values than otherwise. + byte[] result = Arrays.copyOf(fuzzyKeyBytes, + length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length); + int toInc = -1; + + boolean increased = false; + for (int i = 0; i < result.length; i++) { + if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 1) { + result[i] = row[offset + i]; + if (!isMax(row[i])) { + // this is "non-fixed" position and is not at max value, hence we can increase it + toInc = i; + } + } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == 0) { + if ((row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF)) { + // if setting value for any fixed position increased the original array, + // we are OK + increased = true; + break; + } + if ((row[i + offset] & 0xFF) > (fuzzyKeyBytes[i] & 0xFF)) { + // if setting value for any fixed position makes array "smaller", then just stop: + // in case we found some non-fixed position to increase we will do it, otherwise + // there's no "next" row key that satisfies fuzzy rule and "greater" than given row + break; + } + } + } + + if (!increased) { + if (toInc < 0) { + return null; + } + result[toInc]++; + + // Setting all "non-fixed" positions to zeroes to the right of the one we increased so + // that found "next" row key is the smallest possible + for (int i = toInc + 1; i < result.length; i++) { + if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 1) { + result[i] = 0; + } + } + } + + return result; + } +}