diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 1c7e7cd0419e2..dd928dd911304 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -208,19 +208,6 @@ public Coordinator( this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings); this.electionStrategy = electionStrategy; - this.joinHelper = new JoinHelper( - settings, - allocationService, - clusterManagerService, - transportService, - this::getCurrentTerm, - this::getStateForClusterManagerService, - this::handleJoinRequest, - this::joinLeaderInTerm, - this.onJoinValidators, - rerouteService, - nodeHealthService - ); this.persistedStateSupplier = persistedStateSupplier; this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); @@ -244,6 +231,20 @@ public Coordinator( new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver ); + this.joinHelper = new JoinHelper( + settings, + allocationService, + clusterManagerService, + transportService, + this::getCurrentTerm, + this::getStateForClusterManagerService, + this::handleJoinRequest, + this::joinLeaderInTerm, + this.onJoinValidators, + rerouteService, + nodeHealthService, + peerFinder.nodeCommissionedListener() + ); this.publicationHandler = new PublicationTransportHandler( transportService, namedWriteableRegistry, @@ -1438,6 +1439,11 @@ private void startElectionScheduler() { public void run() { synchronized (mutex) { if (mode == Mode.CANDIDATE) { + if(peerFinder.localNodeDecommissioned()) { + logger.debug("skip prevoting as local node is decommissioned"); + return; + } + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); if (localNodeMayWinElection(lastAcceptedState) == false) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 656e6d220720f..9e30b9a3f7ee3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; @@ -57,6 +58,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportRequest; @@ -113,6 +115,9 @@ public class JoinHelper { private final TimeValue joinTimeout; // only used for Zen1 joining private final NodeHealthService nodeHealthService; + public boolean isDecommissioned; + private final ActionListener nodeCommissionedListener; + private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); private final AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); @@ -130,12 +135,14 @@ public class JoinHelper { Function joinLeaderInTerm, Collection> joinValidators, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + ActionListener nodeCommissionedListener ) { this.clusterManagerService = clusterManagerService; this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); + this.nodeCommissionedListener = nodeCommissionedListener; this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) { private final long term = currentTermSupplier.getAsLong(); @@ -343,11 +350,22 @@ public void handleResponse(Empty response) { logger.debug("successfully joined {} with {}", destination, joinRequest); lastFailedJoinAttempt.set(null); onCompletion.run(); + if (isDecommissioned) { + isDecommissioned = false; + nodeCommissionedListener.onResponse(null); + } } @Override public void handleException(TransportException exp) { pendingOutgoingJoins.remove(dedupKey); + if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) { + logger.info("local node is decommissioned. Will not be able to join the cluster"); + if (!isDecommissioned) { + isDecommissioned = true; + nodeCommissionedListener.onFailure(exp); + } + } logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); attempt.logNow(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1665614c18496..54579031aac08 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) { PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/discovery/PeerFinder.java b/server/src/main/java/org/opensearch/discovery/PeerFinder.java index a601a6fbe4d82..0cd7169ff191e 100644 --- a/server/src/main/java/org/opensearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/opensearch/discovery/PeerFinder.java @@ -84,6 +84,14 @@ public abstract class PeerFinder { Setting.Property.NodeScope ); + // the time between attempts to find all peers when node is in decommissioned state, default set to 2 minutes + public static final Setting DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting( + "discovery.find_peers_interval_during_decommission", + TimeValue.timeValueMinutes(2L), + TimeValue.timeValueMillis(1000), + Setting.Property.NodeScope + ); + public static final Setting DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting( "discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), @@ -91,7 +99,8 @@ public abstract class PeerFinder { Setting.Property.NodeScope ); - private final TimeValue findPeersInterval; + private final Settings settings; + private TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; private final Object mutex = new Object(); @@ -101,6 +110,7 @@ public abstract class PeerFinder { private volatile long currentTerm; private boolean active; + private boolean localNodeDecommissioned = false; private DiscoveryNodes lastAcceptedNodes; private final Map peersByAddress = new LinkedHashMap<>(); private Optional leader = Optional.empty(); @@ -112,6 +122,7 @@ public PeerFinder( TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver ) { + this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; @@ -128,6 +139,32 @@ public PeerFinder( ); } + public ActionListener nodeCommissionedListener() { + return new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.info("setting findPeersInterval to [{}], due to recommissioning", findPeersInterval); + assert localNodeDecommissioned; // TODO: Do we need this? + localNodeDecommissioned = false; + findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); + + } + + @Override + public void onFailure(Exception e) { + logger.info("setting findPeersInterval to [{}], due to decommissioning", + DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings)); + assert !localNodeDecommissioned; + localNodeDecommissioned = true; + findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings); + } + }; + } + + public boolean localNodeDecommissioned() { + return localNodeDecommissioned; + } + public void activate(final DiscoveryNodes lastAcceptedNodes) { logger.trace("activating with {}", lastAcceptedNodes); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index a3c945cdbac3a..50e18f25aad5b 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.Level; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; @@ -55,6 +56,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import static org.mockito.Mockito.mock; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -90,7 +92,8 @@ public void testJoinDeduplication() { startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - () -> new StatusInfo(HEALTHY, "info") + () -> new StatusInfo(HEALTHY, "info"), + mock(ActionListener.class) ); transportService.start(); @@ -230,7 +233,8 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - null + null, + mock(ActionListener.class) ); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); @@ -284,7 +288,8 @@ public void testJoinFailureOnUnhealthyNodes() { startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - () -> nodeHealthServiceStatus.get() + () -> nodeHealthServiceStatus.get(), + mock(ActionListener.class) ); transportService.start();