Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ public final class ContainerBalancerConfiguration {
"data node is very high")
private boolean triggerDuEnable = false;

// Opt-in switch bound to the "hdds.container.balancer.include.non.standard.containers"
// key. It is off by default (defaultValue "false" and the field initializer agree).
@Config(key = "hdds.container.balancer.include.non.standard.containers", type = ConfigType.BOOLEAN,
defaultValue = "false", tags = {ConfigTag.BALANCER},
description = "Whether to include containers in non-standard states, such as " +
"over-replicated CLOSED containers with additional QUASI_CLOSED replicas " +
"or consistent QUASI_CLOSED containers.")
private boolean includeNonStandardContainers = false;

/**
* Gets the threshold value for Container Balancer.
*
Expand Down Expand Up @@ -432,6 +439,24 @@ public void setExcludeNodes(String excludeNodes) {
this.excludeNodes = excludeNodes;
}

/**
 * Reports whether Container Balancer is configured to also consider
 * containers in non-standard states.
 *
 * @return the current includeNonStandardContainers flag (boxed)
 */
public Boolean getIncludeNonStandardContainers() {
  return this.includeNonStandardContainers;
}

/**
 * Turns consideration of non-standard containers by Container Balancer
 * on or off.
 *
 * @param enable new value for the includeNonStandardContainers flag
 */
public void setIncludeNonStandardContainers(boolean enable) {
  this.includeNonStandardContainers = enable;
}

@Override
public String toString() {
return String.format("Container Balancer Configuration values:%n" +
Expand Down Expand Up @@ -478,7 +503,9 @@ public String toString() {
"Datanodes Specified to be Balanced",
includeNodes.equals("") ? "None" : includeNodes,
"Datanodes Excluded from Balancing",
excludeNodes.equals("") ? "None" : excludeNodes);
excludeNodes.equals("") ? "None" : excludeNodes,
"Whether to include non-standard containers (over-replicated, quasi-closed) for balancing",
includeNonStandardContainers);
}

public ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
Expand All @@ -500,7 +527,8 @@ public ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
.setExcludeDatanodes(excludeNodes)
.setMoveNetworkTopologyEnable(networkTopologyEnable)
.setTriggerDuBeforeMoveEnable(triggerDuEnable)
.setMoveReplicationTimeout(moveReplicationTimeout);
.setMoveReplicationTimeout(moveReplicationTimeout)
.setIncludeNonStandardContainers(includeNonStandardContainers);
return builder;
}

Expand Down Expand Up @@ -555,6 +583,9 @@ static ContainerBalancerConfiguration fromProtobuf(
if (proto.hasMoveReplicationTimeout()) {
config.setMoveReplicationTimeout(proto.getMoveReplicationTimeout());
}
if (proto.hasIncludeNonStandardContainers()) {
config.setIncludeNonStandardContainers(proto.getIncludeNonStandardContainers());
}
return config;
}
}
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ message ContainerBalancerConfigurationProto {
optional int32 nextIterationIndex = 19;
optional int64 moveReplicationTimeout = 20;
optional string includeContainers = 21;
optional bool includeNonStandardContainers = 22;
}

message TransferLeadershipRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,15 @@ public boolean shouldBeExcluded(ContainerID containerID,
return true;
}

return !isContainerClosed(container, node, replicas) ||
!isContainerHealthyForMove(container, replicas) ||
isContainerReplicatingOrDeleting(containerID);
if (balancerConfiguration.getIncludeNonStandardContainers()) {
return !isContainerClosedRelaxed(container, node, replicas) ||
!isContainerHealthyForMoveRelaxed(container, replicas) ||
isContainerReplicatingOrDeleting(containerID);
} else {
return !isContainerClosed(container, node, replicas) ||
!isContainerHealthyForMove(container, replicas) ||
isContainerReplicatingOrDeleting(containerID);
}
}

/**
Expand Down Expand Up @@ -242,6 +248,91 @@ private boolean isContainerHealthyForMove(ContainerInfo container, Set<Container
return true;
}

/**
 * Closed-state eligibility check applied when includeNonStandardContainers
 * is enabled; a looser counterpart of isContainerClosed.
 * <p>
 * The replica hosted on {@code datanodeDetails} is eligible when one of the
 * following holds:
 * <ul>
 *   <li>the container is CLOSED and that replica is CLOSED;</li>
 *   <li>the container is CLOSED, that replica is QUASI_CLOSED and non-empty,
 *   and the number of CLOSED replicas is at least the replication config's
 *   required node count;</li>
 *   <li>the container is QUASI_CLOSED, every replica is QUASI_CLOSED, and
 *   that replica is non-empty.</li>
 * </ul>
 * Any other combination, including the absence of a replica on the given
 * datanode, makes the replica ineligible.
 *
 * @param container container to check
 * @param datanodeDetails datanode whose replica of the container is examined
 * @param replicas all replicas of the container
 * @return true if the container and the replica on the datanode are eligible
 *         for balancing, else false
 */
private boolean isContainerClosedRelaxed(ContainerInfo container,
    DatanodeDetails datanodeDetails,
    Set<ContainerReplica> replicas) {
  final HddsProtos.LifeCycleState lifeCycle = container.getState();

  // Locate the replica that lives on the datanode in question.
  ContainerReplica candidate = null;
  for (ContainerReplica replica : replicas) {
    if (replica.getDatanodeDetails().equals(datanodeDetails)) {
      candidate = replica;
      break;
    }
  }
  if (candidate == null) {
    return false;
  }
  final ContainerReplicaProto.State candidateState = candidate.getState();

  // QUASI_CLOSED container: only acceptable when the replica set is uniformly
  // QUASI_CLOSED and the candidate actually holds data.
  if (lifeCycle == HddsProtos.LifeCycleState.QUASI_CLOSED) {
    for (ContainerReplica replica : replicas) {
      if (replica.getState() != ContainerReplicaProto.State.QUASI_CLOSED) {
        return false;
      }
    }
    return !candidate.isEmpty();
  }

  // Every remaining life cycle state other than CLOSED is rejected.
  if (lifeCycle != HddsProtos.LifeCycleState.CLOSED) {
    return false;
  }

  // CLOSED container with a CLOSED replica is the standard, always-eligible case.
  if (candidateState == ContainerReplicaProto.State.CLOSED) {
    return true;
  }
  if (candidateState != ContainerReplicaProto.State.QUASI_CLOSED) {
    return false;
  }

  // CLOSED container, QUASI_CLOSED replica: require enough CLOSED replicas
  // to meet the replication config before allowing a non-empty replica.
  long closedCount = 0;
  for (ContainerReplica replica : replicas) {
    if (replica.getState() == ContainerReplicaProto.State.CLOSED) {
      closedCount++;
    }
  }
  final int requiredNodes = container.getReplicationConfig().getRequiredNodes();
  return closedCount >= requiredNodes && !candidate.isEmpty();
}

/**
 * Health check applied when includeNonStandardContainers is enabled; a looser
 * counterpart of isContainerHealthyForMove.
 * <p>
 * The health state is obtained from the replication manager. HEALTHY and
 * OVER_REPLICATED are both accepted; every other state is rejected.
 *
 * @param container container to check
 * @param replicas the container's replicas
 * @return true if the container may be moved, false if it should not be
 */
private boolean isContainerHealthyForMoveRelaxed(ContainerInfo container, Set<ContainerReplica> replicas) {
  final ContainerHealthResult.HealthState health =
      replicationManager.getContainerReplicationHealth(container, replicas).getHealthState();
  final boolean overReplicated = health == ContainerHealthResult.HealthState.OVER_REPLICATED;

  if (health != ContainerHealthResult.HealthState.HEALTHY && !overReplicated) {
    LOG.debug("Excluding container {} with replicas {} as its health is {}.", container, replicas, health);
    return false;
  }

  if (overReplicated) {
    // Accepted only because the relaxed mode is active; record why.
    LOG.debug("Container {} is over-replicated but allowed for balancing with " +
        "includeNonStandardContainers enabled", container);
  }
  return true;
}

private boolean breaksMaxSizeToMoveLimit(ContainerID containerID,
long usedBytes,
long sizeMovedAlready) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ public ContainerBalancerTask(StorageContainerManager scm,
this.moveManager.setMoveTimeout(config.getMoveTimeout().toMillis());
this.moveManager.setReplicationTimeout(
config.getMoveReplicationTimeout().toMillis());
this.moveManager.setIncludeNonStandardContainers(
config.getIncludeNonStandardContainers());
this.delayStart = delayStart;
this.ozoneConfiguration = scm.getConfiguration();
this.containerBalancer = containerBalancer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand Down Expand Up @@ -63,6 +64,7 @@ public final class MoveManager implements
*/
private long moveTimeout = 1000 * 65 * 60;
private long replicationTimeout = 1000 * 50 * 60;
private boolean includeNonStandardContainers = false;

private final ReplicationManager replicationManager;
private final ContainerManager containerManager;
Expand Down Expand Up @@ -195,14 +197,22 @@ CompletableFuture<MoveResult> move(
If the container is under, over, or mis replicated, we should let
replication manager solve these issues first. Fail move for such a
container.

If includeNonStandardContainers is enabled, allow OVER_REPLICATED
containers to be moved by the balancer.
*/
ContainerHealthResult healthBeforeMove =
replicationManager.getContainerReplicationHealth(containerInfo,
currentReplicas);
if (healthBeforeMove.getHealthState() !=
ContainerHealthResult.HealthState.HEALTHY) {
ret.complete(MoveResult.REPLICATION_NOT_HEALTHY_BEFORE_MOVE);
return ret;
ContainerHealthResult.HealthState healthState = healthBeforeMove.getHealthState();

if (healthState != ContainerHealthResult.HealthState.HEALTHY) {
// Allow OVER_REPLICATED if config is enabled
if (!(healthState == ContainerHealthResult.HealthState.OVER_REPLICATED &&
includeNonStandardContainers)) {
ret.complete(MoveResult.REPLICATION_NOT_HEALTHY_BEFORE_MOVE);
return ret;
}
}

/*
Expand All @@ -226,10 +236,15 @@ CompletableFuture<MoveResult> move(
}

// Ensure the container is CLOSED
// If includeNonStandardContainers is enabled and ALL replicas
// are QUASI_CLOSED, allow moving QUASI_CLOSED containers
HddsProtos.LifeCycleState currentContainerStat = containerInfo.getState();
if (currentContainerStat != HddsProtos.LifeCycleState.CLOSED) {
ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
return ret;
// Allow QUASI_CLOSED if config is enabled and all replicas are QUASI_CLOSED
if (!isQuasiClosed(currentContainerStat, currentReplicas)) {
ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
return ret;
}
}

// Create a set of replicas that indicates how the container will look
Expand All @@ -239,10 +254,17 @@ CompletableFuture<MoveResult> move(
src, tgt, currentReplicas);
ContainerHealthResult healthResult = replicationManager
.getContainerReplicationHealth(containerInfo, replicasAfterMove);
if (healthResult.getHealthState()
!= ContainerHealthResult.HealthState.HEALTHY) {
ret.complete(MoveResult.REPLICATION_NOT_HEALTHY_AFTER_MOVE);
return ret;
ContainerHealthResult.HealthState healthAfterMove = healthResult.getHealthState();

if (healthAfterMove != ContainerHealthResult.HealthState.HEALTHY) {
// Allow OVER_REPLICATED after move if config is enabled
// This allows balancing over-replicated containers
// ReplicationManager will handle the excess replica deletion separately
if (!(healthAfterMove == ContainerHealthResult.HealthState.OVER_REPLICATED &&
includeNonStandardContainers)) {
ret.complete(MoveResult.REPLICATION_NOT_HEALTHY_AFTER_MOVE);
return ret;
}
}
startMove(containerInfo, src, tgt, ret);
LOG.debug("Processed a move request for container {}, from {} to {}",
Expand All @@ -251,6 +273,13 @@ CompletableFuture<MoveResult> move(
}
}

/**
 * Tells whether a container that is not CLOSED may still be moved because it
 * is consistently QUASI_CLOSED.
 *
 * @param lifeCycleState life cycle state of the container
 * @param replicas current replicas of the container
 * @return true only if the container is QUASI_CLOSED, the
 *         includeNonStandardContainers flag is set, and every replica is
 *         QUASI_CLOSED
 */
private boolean isQuasiClosed(HddsProtos.LifeCycleState lifeCycleState, Set<ContainerReplica> replicas) {
  if (lifeCycleState != HddsProtos.LifeCycleState.QUASI_CLOSED || !includeNonStandardContainers) {
    return false;
  }
  for (ContainerReplica replica : replicas) {
    if (replica.getState() != StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED) {
      return false;
    }
  }
  return true;
}

/**
* Notify Move Manager that a container op has been completed.
*
Expand Down Expand Up @@ -345,13 +374,17 @@ private void handleSuccessfulAdd(final ContainerID cid)

ContainerHealthResult healthResult = replicationManager
.getContainerReplicationHealth(containerInfo, futureReplicas);
ContainerHealthResult.HealthState futureHealthState = healthResult.getHealthState();

if (healthResult.getHealthState() ==
ContainerHealthResult.HealthState.HEALTHY) {
// Allow deletion if future state is HEALTHY, or if it's OVER_REPLICATED
// and includeNonStandardContainers is enabled (balancer can move over-replicated containers)
if (futureHealthState == ContainerHealthResult.HealthState.HEALTHY ||
(futureHealthState == ContainerHealthResult.HealthState.OVER_REPLICATED &&
includeNonStandardContainers)) {
sendDeleteCommand(containerInfo, src, moveOp.getMoveStartTime());
} else {
LOG.info("Cannot remove source replica as the container health would " +
"be {}", healthResult.getHealthState());
"be {}", futureHealthState);
completeMove(cid, MoveResult.DELETE_FAIL_POLICY);
}
}
Expand Down Expand Up @@ -447,6 +480,10 @@ void setReplicationTimeout(long replicationTimeout) {
this.replicationTimeout = replicationTimeout;
}

/**
 * Stores the flag that the move checks in this class consult when deciding
 * whether OVER_REPLICATED or QUASI_CLOSED containers may be moved.
 *
 * @param includeNonStandardContainers new value of the flag
 */
void setIncludeNonStandardContainers(final boolean includeNonStandardContainers) {
  this.includeNonStandardContainers = includeNonStandardContainers;
}

/**
* All details about a move operation.
*/
Expand Down
Loading