diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index dd2bc8b2f9c7..576016e5fab2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; @@ -437,8 +436,8 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr return; } HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(rpcTimeoutNs, remainingNs), - calcPriority(serverReq.getPriority(), tableName), tableName); + resetController(controller, Math.min(rpcTimeoutNs, remainingNs), serverReq.getPriority(), + tableName); controller.setRequestAttributes(requestAttributes); if (!cells.isEmpty()) { controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cells)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index 1c2c27fdc3f1..b57b154912ce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -20,7 +20,9 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; @@ -42,16 +44,25 @@ public interface Callable { CompletableFuture call(HBaseRpcController controller, MasterService.Interface stub); } + private final Optional tableName; + private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, - int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + Callable callable, TableName tableName, int priority, long pauseNs, + long pauseNsForServerOverloaded, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); + this.tableName = Optional.ofNullable(tableName); this.callable = callable; } + @Override + protected Optional getTableName() { + return tableName; + } + private void clearMasterStubCacheOnError(MasterService.Interface stub, Throwable error) { // ServerNotRunningYetException may because it is the backup master. if ( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 32da6eedd10f..cabb5dfc971f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -89,7 +89,6 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr this.startLogErrorsCnt = startLogErrorsCnt; this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); - this.controller.setPriority(priority); this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 1ea2a1ad7dd4..4f2bb3a114c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; -import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; @@ -160,7 +159,6 @@ private void preCheck() { checkNotNull(row, "row is null"); checkNotNull(locateType, "locateType is null"); checkNotNull(callable, "action is null"); - this.priority = calcPriority(priority, tableName); } public AsyncSingleRequestRpcRetryingCaller build() { @@ -303,7 +301,6 @@ private void preCheck() { checkNotNull(consumer, "consumer is null"); checkNotNull(stub, "stub is null"); checkNotNull(loc, "location is null"); - this.priority = calcPriority(priority, loc.getRegion().getTable()); } public AsyncScanSingleRegionRpcRetryingCaller build() { @@ -411,6 +408,8 @@ public class MasterRequestCallerBuilder extends BuilderBase { private int priority = PRIORITY_UNSET; + private TableName tableName; + public MasterRequestCallerBuilder action(AsyncMasterRequestRpcRetryingCaller.Callable callable) { this.callable = callable; @@ -447,13 +446,13 @@ public MasterRequestCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { return this; } - public MasterRequestCallerBuilder priority(TableName tableName) { - this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); + public MasterRequestCallerBuilder tableName(TableName tableName) { + this.tableName = tableName; return this; } public MasterRequestCallerBuilder priority(int priority) { - this.priority = Math.max(this.priority, priority); + this.priority = priority; return this; } @@ -463,9 +462,9 @@ private void preCheck() { public AsyncMasterRequestRpcRetryingCaller build() { preCheck(); - return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, - pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, tableName, + priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 51a9a07c9a2c..3b8951789761 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -358,7 +358,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.future = new CompletableFuture<>(); this.priority = priority; this.controller = conn.rpcControllerFactory.newController(); - this.controller.setPriority(priority); this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.pauseManager = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 40e205ddca86..42ce44ee31d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -205,7 +205,7 @@ static void resetController(HBaseRpcController controller, long timeoutNs, int p controller.setCallTimeout( (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); } - controller.setPriority(priority); + controller.setPriority(priority, tableName); if (tableName != null) { controller.setTableName(tableName); } @@ -504,34 +504,6 @@ static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValu } } - /** - * Select the priority for the rpc call. - *

- * The rules are: - *

    - *
  1. If user set a priority explicitly, then just use it.
  2. - *
  3. For system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  4. - *
  5. For other tables, use {@link HConstants#NORMAL_QOS}.
  6. - *
- * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. - * @param tableName the table we operate on - */ - static int calcPriority(int priority, TableName tableName) { - if (priority != HConstants.PRIORITY_UNSET) { - return priority; - } else { - return getPriority(tableName); - } - } - - static int getPriority(TableName tableName) { - if (tableName.isSystemTable()) { - return HConstants.SYSTEMTABLE_QOS; - } else { - return HConstants.NORMAL_QOS; - } - } - static CompletableFuture getOrFetch(AtomicReference cacheRef, AtomicReference> futureRef, boolean reload, Supplier> fetch, Predicate validator, String type) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index f4a474957a2f..ea51d27b99a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -542,7 +542,7 @@ private CompletableFuture procedureCall(TableName tableName, private CompletableFuture procedureCall(TableName tableName, PREQ preq, MasterRpcCall rpcCall, Converter respConverter, Converter resultConverter, ProcedureBiConsumer consumer) { - return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter, + return procedureCall(b -> b.tableName(tableName), preq, rpcCall, respConverter, resultConverter, consumer); } @@ -686,7 +686,7 @@ ListTableNamesByNamespaceResponse, List> call(controller, stub, @Override public CompletableFuture getDescriptor(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); - addListener(this.> newMasterCaller().priority(tableName) + addListener(this.> newMasterCaller().tableName(tableName) .action((controller, stub) -> this.> call(controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), @@ -1764,7 +1764,7 @@ public CompletableFuture assign(byte[] regionName) { return; } addListener( - this. newMasterCaller().priority(regionInfo.getTable()) + this. newMasterCaller().tableName(regionInfo.getTable()) .action((controller, stub) -> this. call( controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)) @@ -1789,7 +1789,7 @@ public CompletableFuture unassign(byte[] regionName) { return; } addListener( - this. newMasterCaller().priority(regionInfo.getTable()) + this. newMasterCaller().tableName(regionInfo.getTable()) .action((controller, stub) -> this. call(controller, stub, RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()), @@ -1814,7 +1814,7 @@ public CompletableFuture offline(byte[] regionName) { future.completeExceptionally(err); return; } - addListener(this. newMasterCaller().priority(regionInfo.getTable()) + addListener(this. newMasterCaller().tableName(regionInfo.getTable()) .action((controller, stub) -> this. call( controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)) @@ -1876,7 +1876,7 @@ public CompletableFuture move(byte[] regionName, ServerName destServerName } private CompletableFuture moveRegion(RegionInfo regionInfo, MoveRegionRequest request) { - return this. newMasterCaller().priority(regionInfo.getTable()) + return this. newMasterCaller().tableName(regionInfo.getTable()) .action( (controller, stub) -> this. call(controller, stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index 5b220a24ec56..a99c20817474 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -21,7 +21,9 @@ import java.util.Map; import org.apache.hadoop.hbase.ExtendedCellScanner; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.yetus.audience.InterfaceAudience; +import org.checkerframework.checker.nullness.qual.Nullable; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; @@ -93,6 +95,21 @@ public void setPriority(TableName tn) { delegate.setPriority(tn); } + @Override + public void setPriority(int priority, @Nullable TableName tableName) { + delegate.setPriority(priority, tableName); + } + + @Override + public boolean hasRegionInfo() { + return delegate.hasRegionInfo(); + } + + @Override + public RegionInfo getRegionInfo() { + return delegate.getRegionInfo(); + } + @Override public int getPriority() { return delegate.getPriority(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 8fe44ca59cfc..ec0a7da0075d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.checkerframework.checker.nullness.qual.Nullable; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; @@ -54,15 +55,41 @@ public interface HBaseRpcController extends RpcController, ExtendedCellScannable * Set the priority for this operation. * @param priority Priority for this request; should fall roughly in the range * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} + * @deprecated Since 3.0.0, will be remove in 4.0.0. Use {@link #setPriority(int, TableName)} + * instead. */ + @Deprecated void setPriority(int priority); /** * Set the priority for this operation. * @param tn Set priority based off the table we are going against. + * @deprecated Since 3.0.0, will be remove in 4.0.0. Use {@link #setPriority(int, TableName)} + * instead. */ + @Deprecated void setPriority(final TableName tn); + /** + * Set the priority for this rpc request. + *

+ * For keep compatibility, here we declare the default method where we call + * {@link #setPriority(int)} and then {@link #setPriority(TableName)}. + *

+ * The default implementation in HBase follow the below rules: + *

    + *
  1. If user set a priority explicitly, then just use it.
  2. + *
  3. For system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  4. + *
  5. For other tables, use {@link HConstants#NORMAL_QOS}.
  6. + *
+ * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. + * @param tableName the table we operate on, can be null. + */ + default void setPriority(int priority, @Nullable TableName tableName) { + setPriority(priority); + setPriority(tableName); + } + /** Returns The priority of this request */ int getPriority(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 0667ce2ee627..3cc01d2b52b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.yetus.audience.InterfaceAudience; +import org.checkerframework.checker.nullness.qual.Nullable; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; @@ -133,6 +134,33 @@ public void setPriority(final TableName tn) { tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); } + static int calcPriority(int priority, TableName tableName) { + if (priority != HConstants.PRIORITY_UNSET) { + return priority; + } else { + return getPriority(tableName); + } + } + + static int getPriority(TableName tableName) { + if (tableName.isSystemTable()) { + return HConstants.SYSTEMTABLE_QOS; + } else { + return HConstants.NORMAL_QOS; + } + } + + @Override + public void setPriority(int priority, @Nullable TableName tableName) { + if (priority != HConstants.PRIORITY_UNSET) { + this.priority = priority; + } else if (tableName != null && tableName.isSystemTable()) { + this.priority = HConstants.SYSTEMTABLE_QOS; + } else { + this.priority = HConstants.NORMAL_QOS; + } + } + @Override public int getPriority() { return priority < 0 ? HConstants.NORMAL_QOS : priority; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index a0ca5b990dd1..e3c82ccfe346 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -22,9 +22,12 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Optional; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ExtendedCellScanner; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -45,6 +48,8 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller private final Entry[] entries; + private final Optional tableName; + // whether to use replay instead of replicateToReplica, during rolling upgrading if the target // region server has not been upgraded then it will not have the replicateToReplica method, so we // could use replay method first, though it is not perfect. @@ -53,14 +58,19 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs, RegionInfo replica, List entries) { - super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), - conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, - operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(), - Collections.emptyMap()); + super(retryTimer, conn, HConstants.PRIORITY_UNSET, conn.connConf.getPauseNs(), + conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, operationTimeoutNs, rpcTimeoutNs, + conn.connConf.getStartLogErrorsCnt(), Collections.emptyMap()); this.replica = replica; + this.tableName = Optional.of(replica.getTable()); this.entries = entries.toArray(new Entry[0]); } + @Override + protected Optional getTableName() { + return tableName; + } + @Override protected Throwable preProcessError(Throwable error) { if (