Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
Expand Down Expand Up @@ -437,8 +436,8 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName), tableName);
resetController(controller, Math.min(rpcTimeoutNs, remainingNs), serverReq.getPriority(),
tableName);
controller.setRequestAttributes(requestAttributes);
if (!cells.isEmpty()) {
controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cells));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
Expand All @@ -42,16 +44,25 @@ public interface Callable<T> {
CompletableFuture<T> call(HBaseRpcController controller, MasterService.Interface stub);
}

private final Optional<TableName> tableName;

private final Callable<T> callable;

public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
Callable<T> callable, TableName tableName, int priority, long pauseNs,
long pauseNsForServerOverloaded, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
this.tableName = Optional.ofNullable(tableName);
this.callable = callable;
}

@Override
protected Optional<TableName> getTableName() {
return tableName;
}

private void clearMasterStubCacheOnError(MasterService.Interface stub, Throwable error) {
// ServerNotRunningYetException may because it is the backup master.
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
this.startLogErrorsCnt = startLogErrorsCnt;
this.future = new CompletableFuture<>();
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.controller.setRequestAttributes(requestAttributes);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -160,7 +159,6 @@ private void preCheck() {
checkNotNull(row, "row is null");
checkNotNull(locateType, "locateType is null");
checkNotNull(callable, "action is null");
this.priority = calcPriority(priority, tableName);
}

public AsyncSingleRequestRpcRetryingCaller<T> build() {
Expand Down Expand Up @@ -303,7 +301,6 @@ private void preCheck() {
checkNotNull(consumer, "consumer is null");
checkNotNull(stub, "stub is null");
checkNotNull(loc, "location is null");
this.priority = calcPriority(priority, loc.getRegion().getTable());
}

public AsyncScanSingleRegionRpcRetryingCaller build() {
Expand Down Expand Up @@ -411,6 +408,8 @@ public class MasterRequestCallerBuilder<T> extends BuilderBase {

private int priority = PRIORITY_UNSET;

private TableName tableName;

public MasterRequestCallerBuilder<T>
action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
Expand Down Expand Up @@ -447,13 +446,13 @@ public MasterRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

public MasterRequestCallerBuilder<T> priority(TableName tableName) {
this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName));
public MasterRequestCallerBuilder<T> tableName(TableName tableName) {
this.tableName = tableName;
return this;
}

public MasterRequestCallerBuilder<T> priority(int priority) {
this.priority = Math.max(this.priority, priority);
this.priority = priority;
return this;
}

Expand All @@ -463,9 +462,9 @@ private void preCheck() {

public AsyncMasterRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, tableName,
priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.future = new CompletableFuture<>();
this.priority = priority;
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.controller.setRequestAttributes(requestAttributes);
this.exceptions = new ArrayList<>();
this.pauseManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ static void resetController(HBaseRpcController controller, long timeoutNs, int p
controller.setCallTimeout(
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
}
controller.setPriority(priority);
controller.setPriority(priority, tableName);
if (tableName != null) {
controller.setTableName(tableName);
}
Expand Down Expand Up @@ -504,34 +504,6 @@ static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValu
}
}

/**
* Select the priority for the rpc call.
* <p/>
* The rules are:
* <ol>
* <li>If user set a priority explicitly, then just use it.</li>
* <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
* </ol>
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
* @param tableName the table we operate on
*/
static int calcPriority(int priority, TableName tableName) {
if (priority != HConstants.PRIORITY_UNSET) {
return priority;
} else {
return getPriority(tableName);
}
}

static int getPriority(TableName tableName) {
if (tableName.isSystemTable()) {
return HConstants.SYSTEMTABLE_QOS;
} else {
return HConstants.NORMAL_QOS;
}
}

static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
AtomicReference<CompletableFuture<T>> futureRef, boolean reload,
Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName,
private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter,
return procedureCall(b -> b.tableName(tableName), preq, rpcCall, respConverter, resultConverter,
consumer);
}

Expand Down Expand Up @@ -686,7 +686,7 @@ ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
@Override
public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
addListener(this.<List<TableSchema>> newMasterCaller().tableName(tableName)
.action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
List<TableSchema>> call(controller, stub,
RequestConverter.buildGetTableDescriptorsRequest(tableName),
Expand Down Expand Up @@ -1764,7 +1764,7 @@ public CompletableFuture<Void> assign(byte[] regionName) {
return;
}
addListener(
this.<Void> newMasterCaller().priority(regionInfo.getTable())
this.<Void> newMasterCaller().tableName(regionInfo.getTable())
.action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
(s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
Expand All @@ -1789,7 +1789,7 @@ public CompletableFuture<Void> unassign(byte[] regionName) {
return;
}
addListener(
this.<Void> newMasterCaller().priority(regionInfo.getTable())
this.<Void> newMasterCaller().tableName(regionInfo.getTable())
.action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
Void> call(controller, stub,
RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
Expand All @@ -1814,7 +1814,7 @@ public CompletableFuture<Void> offline(byte[] regionName) {
future.completeExceptionally(err);
return;
}
addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
addListener(this.<Void> newMasterCaller().tableName(regionInfo.getTable())
.action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
(s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
Expand Down Expand Up @@ -1876,7 +1876,7 @@ public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName
}

private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
return this.<Void> newMasterCaller().priority(regionInfo.getTable())
return this.<Void> newMasterCaller().tableName(regionInfo.getTable())
.action(
(controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.Map;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.checkerframework.checker.nullness.qual.Nullable;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

Expand Down Expand Up @@ -93,6 +95,21 @@ public void setPriority(TableName tn) {
delegate.setPriority(tn);
}

@Override
public void setPriority(int priority, @Nullable TableName tableName) {
delegate.setPriority(priority, tableName);
}

@Override
public boolean hasRegionInfo() {
return delegate.hasRegionInfo();
}

@Override
public RegionInfo getRegionInfo() {
return delegate.getRegionInfo();
}

@Override
public int getPriority() {
return delegate.getPriority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.checkerframework.checker.nullness.qual.Nullable;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
Expand Down Expand Up @@ -54,15 +55,41 @@ public interface HBaseRpcController extends RpcController, ExtendedCellScannable
* Set the priority for this operation.
* @param priority Priority for this request; should fall roughly in the range
* {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
* @deprecated Since 3.0.0, will be remove in 4.0.0. Use {@link #setPriority(int, TableName)}
* instead.
*/
@Deprecated
void setPriority(int priority);

/**
* Set the priority for this operation.
* @param tn Set priority based off the table we are going against.
* @deprecated Since 3.0.0, will be remove in 4.0.0. Use {@link #setPriority(int, TableName)}
* instead.
*/
@Deprecated
void setPriority(final TableName tn);

/**
* Set the priority for this rpc request.
* <p>
* For keep compatibility, here we declare the default method where we call
* {@link #setPriority(int)} and then {@link #setPriority(TableName)}.
* <p>
* The default implementation in HBase follow the below rules:
* <ol>
* <li>If user set a priority explicitly, then just use it.</li>
* <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
* </ol>
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
* @param tableName the table we operate on, can be null.
*/
default void setPriority(int priority, @Nullable TableName tableName) {
setPriority(priority);
setPriority(tableName);
}

/** Returns The priority of this request */
int getPriority();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.checkerframework.checker.nullness.qual.Nullable;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

Expand Down Expand Up @@ -133,6 +134,33 @@ public void setPriority(final TableName tn) {
tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
}

static int calcPriority(int priority, TableName tableName) {
if (priority != HConstants.PRIORITY_UNSET) {
return priority;
} else {
return getPriority(tableName);
}
}

static int getPriority(TableName tableName) {
if (tableName.isSystemTable()) {
return HConstants.SYSTEMTABLE_QOS;
} else {
return HConstants.NORMAL_QOS;
}
}

@Override
public void setPriority(int priority, @Nullable TableName tableName) {
if (priority != HConstants.PRIORITY_UNSET) {
this.priority = priority;
} else if (tableName != null && tableName.isSystemTable()) {
this.priority = HConstants.SYSTEMTABLE_QOS;
} else {
this.priority = HConstants.NORMAL_QOS;
}
}

@Override
public int getPriority() {
return priority < 0 ? HConstants.NORMAL_QOS : priority;
Expand Down
Loading