diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index 1b35423fe93b..e1059f485e8e 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -36,8 +36,6 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.ownership.DataPlacement; -import org.apache.cassandra.utils.FBUtilities; public class DiskBoundaryManager { @@ -143,22 +141,17 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs, IParti private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMetadata metadata) { - RangesAtEndpoint localRanges; - DataPlacement placement; - if (StorageService.instance.isBootstrapMode() - && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally + if (StorageService.instance.isBootstrapMode() && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally { - placement = metadata.placements.get(cfs.keyspace.getMetadata().params.replication); + return metadata.localWriteRanges(cfs.keyspace.getMetadata()); } else { // Reason we use the future settled metadata is that if we decommission a node, we want to stream // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. 
// We do this to minimize the amount of data we need to move in rebalancedisks once everything settled - placement = metadata.writePlacementAllSettled(cfs.keyspace.getMetadata()); + return metadata.localWriteRangesAllSettled(cfs.keyspace.getMetadata()); } - localRanges = placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()); - return localRanges; } /** diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index dc9d9e3f680b..112367997985 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -415,7 +415,7 @@ public List> getLocalRanges(String ks) public Collection> getLocalAndPendingRanges(String ks) { - return ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata()); + return ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata()).ranges(); } public OwnedRanges getNormalizedLocalRanges(String keyspaceName, InetAddressAndPort broadcastAddress) diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java index fb92038cbf9b..660dca619b26 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java @@ -119,7 +119,7 @@ private void finish(Ballot lowBound) private static boolean isOutOfRange(SharedContext ctx, String ksName, Collection> repairRanges) { Keyspace keyspace = Keyspace.open(ksName); - Collection> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort())); + Collection> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort()).ranges()); for (Range repairRange : Range.normalize(repairRanges)) { diff --git 
a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index fa2d8f5ee5f6..f2013acf2b70 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.slf4j.Logger; @@ -52,6 +53,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Locator; import org.apache.cassandra.locator.MetaStrategy; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.CMSIdentifierMismatchException; import org.apache.cassandra.schema.DistributedSchema; @@ -73,7 +75,6 @@ import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator; import org.apache.cassandra.tcm.ownership.ReplicaGroups; @@ -119,7 +120,8 @@ public class ClusterMetadata private EndpointsForRange fullCMSReplicas; private Set fullCMSEndpoints; private Set fullCMSIds; - private DataPlacements writePlacementAllSettled; + private volatile Map localRangesAllSettled = null; + private static final RangesAtEndpoint EMPTY_LOCAL_RANGES = RangesAtEndpoint.empty(FBUtilities.getBroadcastAddressAndPort()); public ClusterMetadata(IPartitioner partitioner) { @@ -317,21 +319,72 @@ public Epoch nextEpoch() return epoch.nextEpoch(); } - public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm) + public RangesAtEndpoint localWriteRangesAllSettled(KeyspaceMetadata 
ksm) { - if (writePlacementAllSettled == null) + // Local strategy ranges are constant + if (ksm.params.replication.isLocal()) + return localWriteRanges(ksm); + + if (localRangesAllSettled != null) + return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES); + + ClusterMetadata metadata = this; + NodeId localId = metadata.myNodeId(); + synchronized (this) { - ClusterMetadata metadata = this; - Iterator> iter = metadata.inProgressSequences.iterator(); - while (iter.hasNext()) + if (localRangesAllSettled != null) + return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES); + + Map builder = Maps.newHashMapWithExpectedSize(this.placements.size()); + DataPlacements settled = placementsAllSettledForNode(localId, metadata); + settled.forEach((replication, placement) -> { + builder.put(replication, placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort())); + }); + localRangesAllSettled = builder; + } + return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES); + } + + /** + * Run through all in-flight MultiStepOperations in the supplied metadata and if any impact the specified node, + * apply their metadata transformations. Only used outside of tests by {@link #localWriteRangesAllSettled} to + * identify how placements for the local node will be affected by in flight operations. In that case, the result + * is cached so this should be called at most once for a given ClusterMetadata instance. + */ + @VisibleForTesting + public static DataPlacements placementsAllSettledForNode(NodeId peer, ClusterMetadata metadata) + { + Iterator> iter = metadata.inProgressSequences.iterator(); + while (iter.hasNext()) + { + MultiStepOperation operation = iter.next(); + // Check whether the MSO materially affects the local ranges of the target node.
+ boolean isRelevantOperation = operationAffectsLocalRangesOfPeer(peer, + operation, + metadata.directory); + if (isRelevantOperation) { - Transformation.Result result = iter.next().applyTo(metadata); + logger.debug("Operation {} affects node {}, calculating local ranges after application", + operation.sequenceKey(), peer); + Transformation.Result result = operation.applyTo(metadata); assert result.isSuccess(); metadata = result.success().metadata; } - writePlacementAllSettled = metadata.placements; } - return writePlacementAllSettled.get(ksm.params.replication); + return metadata.placements; + } + + public static boolean operationAffectsLocalRangesOfPeer(NodeId peer, + MultiStepOperation operation, + Directory directory) + { + return operation.affectedPeers(directory).contains(peer); + } + + @VisibleForTesting + public void unsafeClearLocalRangesAllSettled() + { + localRangesAllSettled = null; } // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges @@ -352,14 +405,14 @@ public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endp return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint)); } - public Collection> localWriteRanges(KeyspaceMetadata metadata) + public RangesAtEndpoint localWriteRanges(KeyspaceMetadata metadata) { return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort()); } - public Collection> writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer) + public RangesAtEndpoint writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer) { - return placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges(); + return placements.get(metadata.params.replication).writes.byEndpoint().get(peer); } // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java index 
7083f27b648e..03164243b7a0 100644 --- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java +++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java @@ -19,9 +19,17 @@ package org.apache.cassandra.tcm; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.sequences.AddToCMS; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; @@ -57,6 +65,8 @@ */ public abstract class MultiStepOperation { + private static final Logger logger = LoggerFactory.getLogger(MultiStepOperation.class); + public enum Kind { @Deprecated(since = "CEP-21") @@ -155,6 +165,39 @@ public boolean finishDuringStartup() */ public abstract Transformation.Result applyTo(ClusterMetadata metadata); + /** + * Return the id of any peer known to be involved in the execution of the operation. + * For example, in the case of a new node bootstrapping this would include all current and proposed replicas of the + * affected ranges. + * Important: this currently requires a Directory to be supplied as many MSO implementations are endpoint-centric + * The directory is used to convert endpoints to node ids, but this will become unnecessary as placements & deltas + * evolve away from endpoints to use ids directly. + * @return Node ids of the peers involved in the operation + */ + public abstract Set affectedPeers(Directory directory); + + /** + * Helper method for affectedPeers implementations to convert from endpoints to node ids + * @return set of node ids which map to the supplied endpoints using the directory. 
Any endpoints without a + * corresponding id are ignored + */ + protected Set endpointsToIds(Set endpoints, Directory directory) + { + Set affectedNodes = Sets.newHashSetWithExpectedSize(endpoints.size()); + for (InetAddressAndPort endpoint : endpoints) + { + NodeId id = directory.peerId(endpoint); + // TODO should we error here? + if (id == null) + logger.warn("No node id found for endpoint {} in directory with epoch {} " + + "by MultiStepOperation {} with sequence key {}", + endpoint, directory.lastModified().getEpoch(), kind(), sequenceKey()); + else + affectedNodes.add(id); + } + return affectedNodes; + } + /** * Helper method for the standard applyTo implementations where we just execute a list of transformations, starting at `next` * @return diff --git a/src/java/org/apache/cassandra/tcm/ownership/Delta.java b/src/java/org/apache/cassandra/tcm/ownership/Delta.java index 9660bd255a81..38fcdfde9651 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/Delta.java +++ b/src/java/org/apache/cassandra/tcm/ownership/Delta.java @@ -19,10 +19,13 @@ package org.apache.cassandra.tcm.ownership; import java.io.IOException; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.RangesByEndpoint; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; @@ -52,6 +55,13 @@ public Delta onlyRemovals() return new Delta(removals, RangesByEndpoint.EMPTY); } + public Set allEndpoints() + { + Set endpoints = new HashSet<>(removals.keySet()); + endpoints.addAll(additions.keySet()); + return endpoints; + } + /** * Merges this delta with `other` * diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java index 
6ba80a854bdd..760cd0243f55 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.serialization.MetadataSerializer; @@ -152,6 +155,14 @@ public PlacementDelta onlyRemovals() return new PlacementDelta(reads.onlyRemovals(), writes.onlyRemovals()); } + // TODO deltas (& placements in general) should deal in node ids, not endpoints. + public Set affectedEndpoints() + { + Set affectedEndpoints = new HashSet<>(reads.allEndpoints()); + affectedEndpoints.addAll(writes.allEndpoints()); + return affectedEndpoints; + } + public DataPlacement apply(Epoch epoch, DataPlacement placement) { DataPlacement.Builder builder = placement.unbuild(); diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java index 0d5ee2f06710..ec516854a034 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java @@ -35,6 +35,7 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.MetadataSerializer; @@ -125,6 +126,12 @@ public 
Transformation.Result applyTo(ClusterMetadata metadata) return finishJoin.execute(metadata); } + @Override + public Set affectedPeers(Directory directory) + { + return Set.of(); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index b410097f84a8..d2909bd8737b 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -56,6 +57,7 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -185,6 +187,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startJoin, midJoin, finishJoin)); } + @Override + public Set affectedPeers(Directory directory) + { + Set affectedEndpoints = new HashSet<>(); + affectedEndpoints.addAll(startJoin.affectedEndpoints()); + affectedEndpoints.addAll(midJoin.affectedEndpoints()); + affectedEndpoints.addAll(finishJoin.affectedEndpoints()); + return endpointsToIds(affectedEndpoints, directory); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java index cb34179d4456..32afe234916d 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java +++ 
b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java @@ -53,6 +53,7 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -180,6 +181,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startReplace, midReplace, finishReplace)); } + @Override + public Set affectedPeers(Directory directory) + { + Set affectedEndpoints = new HashSet<>(); + affectedEndpoints.addAll(startReplace.affectedEndpoints()); + affectedEndpoints.addAll(midReplace.affectedEndpoints()); + affectedEndpoints.addAll(finishReplace.affectedEndpoints()); + return endpointsToIds(affectedEndpoints, directory); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java index 0a3512296aef..c95fe94435a1 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java +++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; @@ -39,6 +40,8 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import 
org.apache.cassandra.tcm.serialization.Version; @@ -129,6 +132,12 @@ public Transformation.Kind nextStep() return FINISH_DROP_ACCORD_TABLE; } + @Override + public Set affectedPeers(Directory directory) + { + return Set.of(); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index a26d6ded4500..a7f316e38fac 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.RangesByEndpoint; import org.apache.cassandra.locator.Replica; @@ -61,6 +62,7 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.DataPlacements; @@ -194,6 +196,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startMove, midMove, finishMove)); } + @Override + public Set affectedPeers(Directory directory) + { + Set affectedEndpoints = new HashSet<>(); + affectedEndpoints.addAll(startMove.affectedEndpoints()); + affectedEndpoints.addAll(midMove.affectedEndpoints()); + affectedEndpoints.addAll(finishMove.affectedEndpoints()); + return endpointsToIds(affectedEndpoints, directory); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java index ce2daf6b4670..39613ce7b86d 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java @@ -63,6 +63,7 @@ import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.Retry; import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.MovementMap; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; @@ -145,6 +146,12 @@ public MetadataSerializer keySerialize return next.kind(); } + @Override + public Set affectedPeers(Directory directory) + { + return Set.of(); + } + @Override public Transformation.Result applyTo(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java index 83d543a8dda1..da8ae74e6a34 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java +++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java @@ -19,7 +19,9 @@ package org.apache.cassandra.tcm.sequences; import java.io.IOException; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutionException; import com.google.common.annotations.VisibleForTesting; @@ -31,12 +33,14 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MultiStepOperation; import 
org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; @@ -156,6 +160,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata) return applyMultipleTransformations(metadata, next, of(startLeave, midLeave, finishLeave)); } + @Override + public Set affectedPeers(Directory directory) + { + Set affectedEndpoints = new HashSet<>(); + affectedEndpoints.addAll(startLeave.affectedEndpoints()); + affectedEndpoints.addAll(midLeave.affectedEndpoints()); + affectedEndpoints.addAll(finishLeave.affectedEndpoints()); + return endpointsToIds(affectedEndpoints, directory); + } + @Override public SequenceState executeNext() { diff --git a/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java b/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java index 2a1eaba7e0c4..10f48609d430 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java +++ b/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java @@ -19,11 +19,14 @@ package org.apache.cassandra.tcm.transformations; import java.io.IOException; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import org.apache.cassandra.exceptions.ExceptionCode; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.NodeId; @@ -60,6 +63,13 @@ public PlacementDeltas delta() return delta; } + public Set affectedEndpoints() + { + Set affectedEndpoints = new HashSet<>(); + delta.forEach((replication, d) -> affectedEndpoints.addAll(d.affectedEndpoints())); + return affectedEndpoints; + } + 
@Override public final Result execute(ClusterMetadata prev) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 063f94857cfc..1598c5ca8974 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -1018,6 +1018,12 @@ public static PrepareLeave prepareLeave(int idx) { return prepareLeave(nodeId(idx)); } + + public static PrepareLeave prepareLeave(int idx, boolean force) + { + return new PrepareLeave(nodeId(idx), force, new UniformRangePlacement(), LeaveStreams.Kind.UNBOOTSTRAP); + } + public static PrepareLeave prepareLeave(NodeId nodeId) { return new PrepareLeave(nodeId, @@ -1026,10 +1032,15 @@ public static PrepareLeave prepareLeave(NodeId nodeId) LeaveStreams.Kind.UNBOOTSTRAP); } + public static PrepareMove prepareMove(int idx, Token newToken) + { + return prepareMove(nodeId(idx), newToken); + } + public static PrepareMove prepareMove(NodeId id, Token newToken) { return new PrepareMove(id, - Collections.singleton(Murmur3Partitioner.instance.getRandomToken()), + Collections.singleton(newToken), new UniformRangePlacement(), false); } @@ -1103,6 +1114,11 @@ public static BootstrapAndReplace getReplacePlan(NodeId nodeId, ClusterMetadata return (BootstrapAndReplace) metadata.inProgressSequences.get(nodeId); } + public static Move getMovePlan(int peer) + { + return getMovePlan(addr(peer)); + } + public static Move getMovePlan(InetAddressAndPort addr) { return getMovePlan(ClusterMetadata.current().directory.peerId(addr)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java index 82e864b3ef2d..8223ca471f19 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java @@ -53,6 +53,7 @@ import org.apache.cassandra.locator.CMSPlacementStrategy; import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.ReplicationParams; @@ -66,6 +67,7 @@ import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.OwnershipUtils; import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.tcm.transformations.Register; @@ -930,14 +932,26 @@ public void testPlacementsAllSettled() throws Throwable state = SimulatedOperation.leave(sut, state, toLeave); KeyspaceMetadata ksm = sut.service.metadata().schema.getKeyspaces().get("test").get(); - DataPlacement allSettled = sut.service.metadata().writePlacementAllSettled(ksm); + // Calculate the full set of data placements when all in flight operations have been completed + DataPlacement allSettled = OwnershipUtils.placementsAllSettled(sut.service.metadata()).get(ksm.params.replication); Assert.assertEquals(4, state.inFlightOperations.size()); // make sure none was rejected while (!state.inFlightOperations.isEmpty()) { state = state.inFlightOperations.get(random.nextInt(state.inFlightOperations.size())).advance(state); - Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().writePlacementAllSettled(ksm))); + // for every node, ask ClusterMetadata for local ranges after all operations are complete. 
These + // should not change as we progress + for (Node n : state.currentNodes) + { + RangesAtEndpoint localRanges = ClusterMetadata.placementsAllSettledForNode(n.nodeId(), sut.service.metadata()) + .get(ksm.params.replication) + .writes + .byEndpoint() + .get(n.addr()); + Assert.assertEquals(localRanges, allSettled.writes.byEndpoint().get(n.addr())); + } validatePlacements(sut, state); } + // Finally verify that the predicted placements match the actual ones Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().placements.get(ksm.params.replication))); } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java b/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java new file mode 100644 index 000000000000..8b5917b420e8 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench; + +import java.io.FileInputStream; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.ownership.OwnershipUtils; +import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; +import org.apache.cassandra.utils.FBUtilities; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +@Measurement(iterations = 5, timeUnit = 
TimeUnit.MILLISECONDS, time = 5000) +public class LocalRangesAllSettledBench +{ + static ClusterMetadata metadata; + @Setup(Level.Trial) + public void setup() throws Exception + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + ClusterMetadataService.setInstance(ClusterMetadataTestHelper.syncInstanceForTest()); + metadata = metadata(); + } + + @Benchmark + public void benchLocalRangesOnlyWithRelevantMSOs() + { + Map settledByKeyspace = new HashMap<>(); + metadata.unsafeClearLocalRangesAllSettled(); + // This peer is involved in a MOVE operation + InetAddressAndPort local = InetAddressAndPort.getByNameUnchecked("10.10.9.129:7000"); + FBUtilities.setBroadcastInetAddressAndPort(local); + for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces()) + settledByKeyspace.put(ksm, metadata.localWriteRangesAllSettled(ksm)); + } + + @Benchmark + public void benchLocalRangesOnlyNoRelevantMSOs() + { + Map settledByKeyspace = new HashMap<>(); + metadata.unsafeClearLocalRangesAllSettled(); + // This peer has no involvement in any in-flight MSOs + InetAddressAndPort local = InetAddressAndPort.getByNameUnchecked("10.10.14.13:7000"); + FBUtilities.setBroadcastInetAddressAndPort(local); + for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces()) + settledByKeyspace.put(ksm, metadata.localWriteRangesAllSettled(ksm)); + } + + @Benchmark + public void benchPlacementsAllSettled() + { + // Emulates the previous implementation of ClusterMetadata::writePlacementsAllSettled + // which would be lazily computed on first access. + // As this fully applies all in-flight MSOs to derive the final settled placements, + // the local broadcast address is not significant.
+ DataPlacements placementAllSettled = OwnershipUtils.placementsAllSettled(metadata); + Map settledByKeyspace = new HashMap<>(); + for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces()) + { + DataPlacement placement = placementAllSettled.get(ksm.params.replication); + settledByKeyspace.put(ksm, placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort())); + } + } + + public ClusterMetadata metadata() throws Exception + { + Path p = Path.of(this.getClass().getClassLoader().getResource("cluster_metadata/CASSANDRA-21144_clustermetadata.gz").toURI()); + try (DataInputStreamPlus in = Util.DataInputStreamPlusImpl.wrap(new GZIPInputStream(new FileInputStream(p.toFile())))) + { + ClusterMetadata metadata = VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in); + return metadata; + } + } + + public static void main(String[] args) throws Exception + { + Options options = new OptionsBuilder() + .include(LocalRangesAllSettledBench.class.getSimpleName()) + .build(); + new Runner(options).run(); + } +/* +$ ant microbench -Dbenchmark.name=LocalRangesAllSettledBench + + [java] Benchmark Mode Cnt Score Error Units + [java] LocalRangesAllSettledBench.benchLocalRangesOnlyNoRelevantMSOs avgt 5 18.214 ± 4.350 ms/op + [java] LocalRangesAllSettledBench.benchLocalRangesOnlyWithRelevantMSOs avgt 5 274.931 ± 14.193 ms/op + [java] LocalRangesAllSettledBench.benchPlacementsAllSettled avgt 5 11465.778 ± 370.754 ms/op + + */ +} diff --git a/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz b/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz new file mode 100644 index 000000000000..0bc749ed5b95 Binary files /dev/null and b/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz differ diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java b/test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java similarity index 62% rename from 
test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java rename to test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java index ac3a278cf238..785053f9aec6 100644 --- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java +++ b/test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java @@ -30,17 +30,12 @@ import org.apache.cassandra.distributed.test.log.CMSTestBase; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.harry.model.TokenPlacementModel; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.SchemaTransformation; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; -import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.LockedRanges; -import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.tcm.transformations.Assassinate; @@ -48,11 +43,10 @@ import org.apache.cassandra.utils.CassandraVersion; import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr; -import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class ClusterMetadataTest +public class NewTransformationVersionCompatibilityTest { @BeforeClass public static void beforeClass() @@ -67,61 +61,6 @@ public void before() throws ExecutionException, InterruptedException new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, new 
TokenPlacementModel.SimpleReplicationFactor(3)); } - @Test - public void testWritePlacementAllSettledLeaving() - { - for (int i = 1; i <= 4; i++) - { - ClusterMetadataTestHelper.register(i); - ClusterMetadataTestHelper.join(i, i); - } - ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareLeave(3)); - UnbootstrapAndLeave plan = getLeavePlan(3); - - ClusterMetadataService.instance().commit(plan.startLeave); - KeyspaceMetadata ksm = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3)); - - DataPlacement writeAllSettled = ClusterMetadata.current().writePlacementAllSettled(ksm); - ClusterMetadataService.instance().commit(plan.midLeave); - ClusterMetadataService.instance().commit(plan.finishLeave); - - DataPlacement actualFinishedWritePlacements = ClusterMetadata.current().placements.get(ksm.params.replication); - - assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.removals.isEmpty()); - assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.additions.isEmpty()); - } - - @Test - public void testWritePlacementAllSettledJoining() - { - for (int i = 1; i <= 4; i++) - { - ClusterMetadataTestHelper.register(i); - ClusterMetadataTestHelper.join(i, i); - } - - ClusterMetadataTestHelper.register(10); - ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareJoin(10)); - - BootstrapAndJoin plan = ClusterMetadataTestHelper.getBootstrapPlan(10); - ClusterMetadataService.instance().commit(plan.startJoin); - KeyspaceMetadata ksm = KeyspaceMetadata.create("ks", KeyspaceParams.simple(3)); - DataPlacement writeAllSettled = ClusterMetadata.current().writePlacementAllSettled(ksm); - - ClusterMetadataService.instance().commit(plan.midJoin); - ClusterMetadataService.instance().commit(plan.finishJoin); - - DataPlacement actualFinishedWritePlacements = ClusterMetadata.current().placements.get(ksm.params.replication); - 
assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.removals.isEmpty()); - assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.additions.isEmpty()); - } - - @Test - public void testWritePlacementAllSettledMoving() - { - // todo - } - @Test public void testNewTransformationCommit() { diff --git a/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java b/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java new file mode 100644 index 000000000000..f9e8104ca9a0 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.CMSTestBase; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.harry.model.TokenPlacementModel; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.tcm.AtomicLongBackedProcessor; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; +import org.apache.cassandra.tcm.sequences.Move; +import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr; +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan; +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getMovePlan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class LocalRangesAllSettledTest +{ + private static final Logger logger = LoggerFactory.getLogger(LocalRangesAllSettledTest.class); + + private static final int[] INITIAL_NODES = new int[]{1, 2, 3, 4}; + + @BeforeClass + public static void beforeClass() + { + ServerTestUtils.prepareServerNoRegister(); + } + + @Before + public 
void before() throws ExecutionException, InterruptedException + { + ClusterMetadataService.unsetInstance(); + new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, new TokenPlacementModel.SimpleReplicationFactor(3)); + + // Join the first 4 nodes + for (int i : INITIAL_NODES) + { + ClusterMetadataTestHelper.register(i, "dc" + i % 3, "rack0"); + ClusterMetadataTestHelper.join(i, i); + } + + // Create keyspaces with various replication settings + for (int i = 1; i <= 3; i++) + { + ClusterMetadataTestHelper.createKeyspace("simple_" + i, KeyspaceParams.simple(i)); + ClusterMetadataTestHelper.createKeyspace("nts_" + i, KeyspaceParams.nts("dc0", i, "dc1", i, "dc2", i)); + } + + } + + @Test + public void testLeaving() + { + // Verify proposed ranges without any in flight operations + AllLocalRanges initial = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES); + AllLocalRanges proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES); + assertEquals(initial, proposed); + // Check against the actual write placements + assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, initial, INITIAL_NODES); + + // Initiate an operation which affects ownership. 
This will add the MultiStepOperation which encodes any + // necessary range movements so subsequent calls to ClusterMetadata::localRangesAllSettled + // should return the expected local ranges after the operation has completed + // pick a random node to leave (but not the CMS node (1), for simplicity's sake) + int leaving = INITIAL_NODES[Math.max(1, new Random().nextInt(4))]; + logger.info("Selected node {} to leave", leaving); + ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareLeave(leaving, true)); + proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES); + assertNotEquals(initial, proposed); + + // Step through execution of the MSO, verifying after each step that the proposed ranges don't change + UnbootstrapAndLeave plan = getLeavePlan(leaving); + ClusterMetadataService.instance().commit(plan.startLeave); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + ClusterMetadataService.instance().commit(plan.midLeave); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + ClusterMetadataService.instance().commit(plan.finishLeave); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + + // Verify that the final local ranges match what was proposed + AllLocalRanges finalized = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES); + assertEquals(proposed, finalized); + + // Finally, check against the actual write placements + assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, finalized, INITIAL_NODES); + } + + @Test + public void testJoining() + { + // Verify proposed ranges without any in flight operations + AllLocalRanges initial = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES); + AllLocalRanges proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES); + assertEquals(initial, proposed); + // Check against the actual write placements + 
assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, initial, INITIAL_NODES); + + // Initiate an operation which affects ownership. This will add the MultiStepOperation which encodes any + // necessary range movements so subsequent calls to ClusterMetadata::localRangesAllSettled + // should return the expected local ranges after the operation has completed + int newNode = 10; + ClusterMetadataTestHelper.register(newNode); + int[] expandedNodes = Arrays.copyOf(INITIAL_NODES, INITIAL_NODES.length + 1); + expandedNodes[expandedNodes.length - 1] = newNode; + ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareJoin(newNode)); + proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes); + assertNotEquals(initial, proposed); + + // Step through execution of the MSO, verifying after each step that the proposed ranges don't change + BootstrapAndJoin plan = ClusterMetadataTestHelper.getBootstrapPlan(newNode); + ClusterMetadataService.instance().commit(plan.startJoin); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes)); + ClusterMetadataService.instance().commit(plan.midJoin); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes)); + ClusterMetadataService.instance().commit(plan.finishJoin); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes)); + + // Verify that the final local ranges match what was proposed + AllLocalRanges finalized = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, expandedNodes); + assertEquals(proposed, finalized); + + // Finally, check against the actual write placements + assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, finalized, expandedNodes); + } + + @Test + public void testMoving() + { + // Verify proposed ranges without any in flight operations + AllLocalRanges initial = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES); + AllLocalRanges 
proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES); + assertEquals(initial, proposed); + // Check against the actual write placements + assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, initial, INITIAL_NODES); + + // Initiate an operation which affects ownership. This will add the MultiStepOperation which encodes any + // necessary range movements so subsequent calls to ClusterMetadata::localRangesAllSettled + // should return the expected local ranges after the operation has completed + // pick a random node to leave (but not the CMS node (1), for simplicity's sake) + int moving = INITIAL_NODES[Math.max(1, new Random().nextInt(4))]; + Token newToken = ClusterMetadata.current().partitioner.getRandomToken(); + while (ClusterMetadata.current().tokenMap.tokens().contains(newToken)) + newToken = ClusterMetadata.current().partitioner.getRandomToken(); + logger.info("Selected node {} to move to token {} ", moving, newToken); + ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareMove(moving, newToken)); + proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES); + assertNotEquals(initial, proposed); + + // Step through execution of the MSO, verifying after each step that the proposed ranges don't change + Move plan = getMovePlan(moving); + ClusterMetadataService.instance().commit(plan.startMove); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + ClusterMetadataService.instance().commit(plan.midMove); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + ClusterMetadataService.instance().commit(plan.finishMove); + assertEquals(proposed, snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES)); + + // Verify that the final local ranges match what was proposed + AllLocalRanges finalized = snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES); + assertEquals(proposed, finalized); + + // 
Finally, check against the actual write placements + assertLocalRangesMatchPlacements(ClusterMetadata.current().placements, finalized, INITIAL_NODES); + } + + private void assertLocalRangesMatchPlacements(DataPlacements placements, + AllLocalRanges allLocalRanges, + int... nodes) + { + for (int id : nodes) + { + RangeSetMap localRanges = allLocalRanges.get(id); + InetAddressAndPort endpoint = addr(id); + placements.forEach((replication, placement) -> { + Set> ranges = localRanges.get(replication); + Set> fromPlacement = placement.writes.byEndpoint().get(endpoint).ranges(); + assertEquals(ranges, fromPlacement); + }); + } + } + + private enum LocalRangeStatus { CURRENT, SETTLED } + private static AllLocalRanges snapshotAllLocalRanges(LocalRangeStatus status, int... nodes) + { + InetAddressAndPort realLocalAddress = FBUtilities.getBroadcastAddressAndPort(); + AllLocalRanges snapshot = new AllLocalRanges(); + ClusterMetadata metadata = ClusterMetadata.current(); + for (int id : nodes) + { + InetAddressAndPort address = addr(id); + // clear cached settled local ranges + metadata.unsafeClearLocalRangesAllSettled(); + // temporarily set broadcast address to infer which node is "local" + FBUtilities.setBroadcastInetAddressAndPort(address); + RangeSetMap.Builder localRanges = RangeSetMap.builder(); + for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces()) + { + Set> ranges = status == LocalRangeStatus.SETTLED + ? 
metadata.localWriteRangesAllSettled(ksm).ranges() + : metadata.localWriteRanges(ksm).ranges(); + localRanges.put(ksm.params.replication, ranges); + } + snapshot.put(id, localRanges.build()); + } + // restore local address + FBUtilities.setBroadcastInetAddressAndPort(realLocalAddress); + metadata.unsafeClearLocalRangesAllSettled(); + return snapshot; + } + + // A snapshot of the local ranges for each replication setting for each node + private static class AllLocalRanges + { + Map localWriteRanges = new HashMap<>(); + + void put(int nodeId, RangeSetMap ranges) + { + localWriteRanges.put(nodeId, ranges); + } + + RangeSetMap get(int nodeId) + { + return localWriteRanges.get(nodeId); + } + + public final boolean equals(Object o) + { + if (!(o instanceof AllLocalRanges)) return false; + + return Objects.equals(localWriteRanges, ((AllLocalRanges)o).localWriteRanges); + } + + public int hashCode() + { + return Objects.hashCode(localWriteRanges); + } + + public String toString() + { + return "AllLocalRanges{" + + "localWriteRanges=" + localWriteRanges + + '}'; + } + } + +} diff --git a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java index 0f29c460996a..897dea5e5eea 100644 --- a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java +++ b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; @@ -41,7 +42,10 @@ import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MultiStepOperation; +import org.apache.cassandra.tcm.Transformation; import 
org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.broadcastAddress; @@ -239,4 +243,17 @@ public static Set randomTokens(int numTokens, IPartitioner partitioner, R tokens.add(partitioner.getRandomToken(random)); return tokens; } + + public static DataPlacements placementsAllSettled(ClusterMetadata metadata) + { + ClusterMetadata workingMetadata = metadata; + Iterator> iter = metadata.inProgressSequences.iterator(); + while (iter.hasNext()) + { + Transformation.Result result = iter.next().applyTo(workingMetadata); + assert result.isSuccess(); + workingMetadata = result.success().metadata; + } + return workingMetadata.placements; + } } diff --git a/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java b/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java new file mode 100644 index 000000000000..a7357ebd9071 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Maps; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.ReplicationParams; + +public class RangeSetMap extends ReplicationMap>> +{ + public RangeSetMap() + { + super(new HashMap<>()); + } + + public RangeSetMap(Map>> map) + { + super(map); + } + + @Override + protected Set> defaultValue() + { + return Set.of(); + } + + @Override + protected Set> localOnly() + { + throw new UnsupportedOperationException(); + } + + public Builder unbuild() + { + return new Builder(map); + } + + public void clear() + { + map.clear(); + } + + public String toString() + { + return "RangeSetMap{" + + "map=" + map + + '}'; + } + + public static Builder builder() + { + return new Builder(new HashMap<>()); + } + + public static Builder builder(int expectedSize) + { + return new Builder(Maps.newHashMapWithExpectedSize(expectedSize)); + } + + public static class Builder + { + private final Map>> map; + private Builder(Map>> map) + { + this.map = map; + } + + public Builder put(ReplicationParams params, Set> ranges) + { + map.put(params, ranges); + return this; + } + + public RangeSetMap build() + { + return new RangeSetMap(map); + } + } +} diff --git a/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java b/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java index 470198cd5bbf..364b212f6c5e 100644 --- a/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java +++ b/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java @@ -96,11 +96,11 @@ public void e2e() cms.commit(new PrepareDropAccordTable(table)); - // This is only here because "applyTo" is not touched without it... 
- for (KeyspaceMetadata ks : cms.metadata().schema.getKeyspaces()) - cms.metadata().writePlacementAllSettled(ks); - Assertions.assertThat(cms.metadata().inProgressSequences.isEmpty()).isFalse(); + MultiStepOperation operation = cms.metadata().inProgressSequences.get(table); + Assertions.assertThat(operation).isNotNull(); + Assertions.assertThat(operation.kind() == MultiStepOperation.Kind.DROP_ACCORD_TABLE).isTrue(); + InProgressSequences.finishInProgressSequences(table); Assertions.assertThat(cms.metadata().inProgressSequences.isEmpty()).isTrue();