13 changes: 3 additions & 10 deletions src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -36,8 +36,6 @@
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.utils.FBUtilities;

public class DiskBoundaryManager
{
@@ -143,22 +141,17 @@ private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs, IParti

private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMetadata metadata)
{
RangesAtEndpoint localRanges;
DataPlacement placement;
if (StorageService.instance.isBootstrapMode()
&& !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
if (StorageService.instance.isBootstrapMode() && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
{
placement = metadata.placements.get(cfs.keyspace.getMetadata().params.replication);
return metadata.localWriteRanges(cfs.keyspace.getMetadata());
}
else
{
// The reason we use the future settled metadata is that if we decommission a node, we want to stream
// from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
// We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled.
placement = metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
return metadata.localWriteRangesAllSettled(cfs.keyspace.getMetadata());
}
localRanges = placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort());
return localRanges;
}
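Stripped of the diff interleaving, the resulting method reads roughly as follows; this is a condensed sketch using only identifiers that appear in this hunk, not additional API:

private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMetadata metadata)
{
    KeyspaceMetadata ksm = cfs.keyspace.getMetadata();
    // While bootstrapping (and not replacing the same address), use the current local write ranges;
    // otherwise use the fully settled ranges so streamed files land where they will live once
    // in-flight operations complete.
    if (StorageService.instance.isBootstrapMode() && !StorageService.isReplacingSameAddress())
        return metadata.localWriteRanges(ksm);
    return metadata.localWriteRangesAllSettled(ksm);
}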

/**
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
@@ -415,7 +415,7 @@ public List<Range<Token>> getLocalRanges(String ks)

public Collection<Range<Token>> getLocalAndPendingRanges(String ks)
{
return ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata());
return ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata()).ranges();
}
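Because localWriteRanges and writeRanges now return RangesAtEndpoint rather than Collection<Range<Token>>, call sites that only need the token ranges unwrap them with .ranges(), as above. A minimal sketch of that caller-side adjustment, using only types and methods shown in this diff (the keyspace name is illustrative):

// The full RangesAtEndpoint is now returned, keeping replica details available; callers
// that only need token ranges unwrap them explicitly.
KeyspaceMetadata ksm = Keyspace.open("my_keyspace").getMetadata(); // "my_keyspace" is illustrative
RangesAtEndpoint localReplicas = ClusterMetadata.current().localWriteRanges(ksm);
Collection<Range<Token>> localTokenRanges = localReplicas.ranges();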

public OwnedRanges getNormalizedLocalRanges(String keyspaceName, InetAddressAndPort broadcastAddress)
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ private void finish(Ballot lowBound)
private static boolean isOutOfRange(SharedContext ctx, String ksName, Collection<Range<Token>> repairRanges)
{
Keyspace keyspace = Keyspace.open(ksName);
Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort()));
Collection<Range<Token>> localRanges = Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(), ctx.broadcastAddressAndPort()).ranges());

for (Range<Token> repairRange : Range.normalize(repairRanges))
{
79 changes: 66 additions & 13 deletions src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -32,6 +32,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
@@ -52,6 +53,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.CMSIdentifierMismatchException;
import org.apache.cassandra.schema.DistributedSchema;
@@ -73,7 +75,6 @@
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
@@ -119,7 +120,8 @@ public class ClusterMetadata
private EndpointsForRange fullCMSReplicas;
private Set<InetAddressAndPort> fullCMSEndpoints;
private Set<NodeId> fullCMSIds;
private DataPlacements writePlacementAllSettled;
private volatile Map<ReplicationParams, RangesAtEndpoint> localRangesAllSettled = null;
private static final RangesAtEndpoint EMPTY_LOCAL_RANGES = RangesAtEndpoint.empty(FBUtilities.getBroadcastAddressAndPort());

public ClusterMetadata(IPartitioner partitioner)
{
@@ -317,21 +319,72 @@ public Epoch nextEpoch()
return epoch.nextEpoch();
}

public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
public RangesAtEndpoint localWriteRangesAllSettled(KeyspaceMetadata ksm)
{
if (writePlacementAllSettled == null)
// Local strategy ranges are constant
if (ksm.params.replication.isLocal())
return localWriteRanges(ksm);

if (localRangesAllSettled != null)
return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES);

ClusterMetadata metadata = this;
NodeId localId = metadata.myNodeId();
synchronized (this)
{
ClusterMetadata metadata = this;
Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator();
while (iter.hasNext())
if (localRangesAllSettled != null)
return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES);

Map<ReplicationParams, RangesAtEndpoint> builder = Maps.newHashMapWithExpectedSize(this.placements.size());
DataPlacements settled = placementsAllSettledForNode(localId, metadata);
settled.forEach((replication, placement) -> {
builder.put(replication, placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()));
});
localRangesAllSettled = builder;
}
return localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES);
}

/**
* Run through all in-flight MultiStepOperations in the supplied metadata and, if any impact the specified node,
* apply their metadata transformations. Only used outside of tests by {@link #localWriteRangesAllSettled} to
* identify how placements for the local node will be affected by in-flight operations. In that case, the result
* is cached, so this should be called at most once for a given ClusterMetadata instance.
*/
@VisibleForTesting
public static DataPlacements placementsAllSettledForNode(NodeId peer, ClusterMetadata metadata)
{
Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator();
while (iter.hasNext())
{
MultiStepOperation<?> operation = iter.next();
// Check whether the MSO materially affects the local ranges of the target node.
boolean isRelevantOperation = operationAffectsLocalRangesOfPeer(peer,
operation,
metadata.directory);
if (isRelevantOperation)
{
Transformation.Result result = iter.next().applyTo(metadata);
logger.debug("Operation {} affects node {}, calculating local ranges after application",
operation.sequenceKey(), peer);
Transformation.Result result = operation.applyTo(metadata);
assert result.isSuccess();
metadata = result.success().metadata;
}
writePlacementAllSettled = metadata.placements;
}
return writePlacementAllSettled.get(ksm.params.replication);
return metadata.placements;
}

public static boolean operationAffectsLocalRangesOfPeer(NodeId peer,
MultiStepOperation<?> operation,
Directory directory)
{
return operation.affectedPeers(directory).contains(peer);
}

@VisibleForTesting
public void unsafeClearLocalRangesAllSettled()
{
localRangesAllSettled = null;
}
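A brief test-style sketch of how these @VisibleForTesting hooks might be exercised; the keyspace name and surrounding setup are assumptions for illustration, while all types and calls come from this diff:

ClusterMetadata metadata = ClusterMetadata.current();
KeyspaceMetadata ksm = Keyspace.open("my_keyspace").getMetadata(); // illustrative keyspace

// Placements as they will look once every in-flight operation touching this node has finished,
// computed directly and without going through the cached accessor.
DataPlacements settled = ClusterMetadata.placementsAllSettledForNode(metadata.myNodeId(), metadata);
RangesAtEndpoint settledLocal = settled.get(ksm.params.replication)
                                       .writes.byEndpoint()
                                       .get(FBUtilities.getBroadcastAddressAndPort());

// The cached accessor should agree; clearing the cache forces the next call to recompute it
// against the current set of in-progress sequences.
RangesAtEndpoint cached = metadata.localWriteRangesAllSettled(ksm);
metadata.unsafeClearLocalRangesAllSettled();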

// TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
@@ -352,14 +405,14 @@ public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endp
return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
}

public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
public RangesAtEndpoint localWriteRanges(KeyspaceMetadata metadata)
{
return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort());
}

public Collection<Range<Token>> writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer)
public RangesAtEndpoint writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer)
{
return placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges();
return placements.get(metadata.params.replication).writes.byEndpoint().get(peer);
}

// TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
43 changes: 43 additions & 0 deletions src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -19,9 +19,17 @@
package org.apache.cassandra.tcm;

import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.sequences.AddToCMS;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
@@ -57,6 +65,8 @@
*/
public abstract class MultiStepOperation<CONTEXT>
{
private static final Logger logger = LoggerFactory.getLogger(MultiStepOperation.class);

public enum Kind
{
@Deprecated(since = "CEP-21")
@@ -155,6 +165,39 @@ public boolean finishDuringStartup()
*/
public abstract Transformation.Result applyTo(ClusterMetadata metadata);

/**
* Return the ids of all peers known to be involved in the execution of the operation.
* For example, in the case of a new node bootstrapping this would include all current and proposed replicas of the
* affected ranges.
* Important: this currently requires a Directory to be supplied, as many MSO implementations are endpoint-centric.
* The directory is used to convert endpoints to node ids, but this will become unnecessary as placements & deltas
* evolve away from endpoints to use ids directly.
* @return Node ids of the peers involved in the operation
*/
public abstract Set<NodeId> affectedPeers(Directory directory);

/**
* Helper method for affectedPeers implementations to convert from endpoints to node ids.
* @return the set of node ids which map to the supplied endpoints using the directory; any endpoints without a
* corresponding id are ignored
*/
protected Set<NodeId> endpointsToIds(Set<InetAddressAndPort> endpoints, Directory directory)
{
Set<NodeId> affectedNodes = Sets.newHashSetWithExpectedSize(endpoints.size());
for (InetAddressAndPort endpoint : endpoints)
{
NodeId id = directory.peerId(endpoint);
// TODO should we error here?
if (id == null)
logger.warn("No node id found for endpoint {} in directory with epoch {} " +
"by MultiStepOperation {} with sequence key {}",
endpoint, directory.lastModified().getEpoch(), kind(), sequenceKey());
else
affectedNodes.add(id);
}
return affectedNodes;
}
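The concrete overrides added later in this diff (BootstrapAndJoin, BootstrapAndReplace) follow the same shape. As a preview, a hedged sketch of a typical implementation, where startStep/finishStep and their affectedEndpoints() accessors are stand-ins for an operation's real transformation steps:

@Override
public Set<NodeId> affectedPeers(Directory directory)
{
    // Union the endpoints touched by each step, then translate them to node ids via the helper above.
    Set<InetAddressAndPort> endpoints = new HashSet<>(startStep.affectedEndpoints());
    endpoints.addAll(finishStep.affectedEndpoints());
    return endpointsToIds(endpoints, directory);
}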

/**
* Helper method for the standard applyTo implementations where we just execute a list of transformations, starting at `next`
* @return
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/tcm/ownership/Delta.java
@@ -19,10 +19,13 @@
package org.apache.cassandra.tcm.ownership;

import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
@@ -52,6 +55,13 @@ public Delta onlyRemovals()
return new Delta(removals, RangesByEndpoint.EMPTY);
}

public Set<InetAddressAndPort> allEndpoints()
{
Set<InetAddressAndPort> endpoints = new HashSet<>(removals.keySet());
endpoints.addAll(additions.keySet());
return endpoints;
}

/**
* Merges this delta with `other`
*
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
@@ -21,12 +21,15 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -152,6 +155,14 @@ public PlacementDelta onlyRemovals()
return new PlacementDelta(reads.onlyRemovals(), writes.onlyRemovals());
}

// TODO deltas (& placements in general) should deal in node ids, not endpoints.
public Set<InetAddressAndPort> affectedEndpoints()
{
Set<InetAddressAndPort> affectedEndpoints = new HashSet<>(reads.allEndpoints());
affectedEndpoints.addAll(writes.allEndpoints());
return affectedEndpoints;
}

public DataPlacement apply(Epoch epoch, DataPlacement placement)
{
DataPlacement.Builder builder = placement.unbuild();
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -35,6 +35,7 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -125,6 +126,12 @@ public Transformation.Result applyTo(ClusterMetadata metadata)
return finishJoin.execute(metadata);
}

@Override
public Set<NodeId> affectedPeers(Directory directory)
{
return Set.of();
}

@Override
public SequenceState executeNext()
{
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -56,6 +57,7 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -185,6 +187,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata)
return applyMultipleTransformations(metadata, next, of(startJoin, midJoin, finishJoin));
}

@Override
public Set<NodeId> affectedPeers(Directory directory)
{
Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
affectedEndpoints.addAll(startJoin.affectedEndpoints());
affectedEndpoints.addAll(midJoin.affectedEndpoints());
affectedEndpoints.addAll(finishJoin.affectedEndpoints());
return endpointsToIds(affectedEndpoints, directory);
}

@Override
public SequenceState executeNext()
{
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -180,6 +181,16 @@ public Transformation.Result applyTo(ClusterMetadata metadata)
return applyMultipleTransformations(metadata, next, of(startReplace, midReplace, finishReplace));
}

@Override
public Set<NodeId> affectedPeers(Directory directory)
{
Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
affectedEndpoints.addAll(startReplace.affectedEndpoints());
affectedEndpoints.addAll(midReplace.affectedEndpoints());
affectedEndpoints.addAll(finishReplace.affectedEndpoints());
return endpointsToIds(affectedEndpoints, directory);
}

@Override
public SequenceState executeNext()
{