Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,29 +110,32 @@ private void compile()

toSplit.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
});

toMaximal.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
addToWrites.put(replication, delta.onlyWrites());
moveReads.put(replication, delta.onlyReads());
});

toFinal.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
moveReads.put(replication, delta.onlyReads());
removeFromWrites.put(replication, delta.onlyWrites());
});

toMerged.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
removeFromWrites.put(replication, delta);
});
this.addToWrites = addToWrites.build();
this.moveReads = moveReads.build();
this.removeFromWrites = removeFromWrites.build();
this.affectedRanges = affectedRanges.build();

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ public boolean await(ConsistencyLevel cl, ClusterMetadata metadata)
{
EndpointsForRange writes = metadata.placements.get(params).writes.matchRange(range).get().filter(r -> filter.test(r.endpoint()));
EndpointsForRange reads = metadata.placements.get(params).reads.matchRange(range).get().filter(r -> filter.test(r.endpoint()));
// Affected ranges can contain ranges which are the results of merging or splitting and may not exist
// as keys in the existing ReplicaGroups. As such, no replicas will be found for these ranges and so no
// WaitFor is necessary.
if (reads.isEmpty() && writes.isEmpty())
continue;

reads.stream().map(Replica::endpoint).forEach(superset::add);
writes.stream().map(Replica::endpoint).forEach(superset::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,24 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.UnsafeJoin;
import org.apache.cassandra.utils.concurrent.Future;

import static org.junit.Assert.assertEquals;

public class ProgressBarrierTest extends CMSTestBase
{
static
Expand All @@ -72,6 +77,48 @@ public class ProgressBarrierTest extends CMSTestBase
DatabaseDescriptor.setProgressBarrierBackoff(5);
}

@Test
public void testProgressBarrierWithMergingRanges()
{
    // Build a five node ring (tokens 100..500) with a single RF=1 keyspace.
    TokenPlacementModel.ReplicationFactor replication = new TokenPlacementModel.SimpleReplicationFactor(1);
    try (CMSTestBase.CMSSut sut = new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, replication))
    {
        TokenPlacementModel.NodeFactory factory = TokenPlacementModel.nodeFactory();
        List<TokenPlacementModel.Node> ring = new ArrayList<>();
        for (int i = 1; i < 6; i++)
        {
            // Register node i and join it at token i*100 without streaming.
            TokenPlacementModel.Node node = factory.make(i, 1, 1).overrideToken(i * 100);
            ring.add(node);
            sut.service.commit(new Register(new NodeAddresses(node.addr()),
                                            new Location(node.dc(), node.rack()),
                                            NodeVersion.CURRENT));
            sut.service.commit(new UnsafeJoin(node.nodeId(),
                                              Collections.singleton(node.longToken()),
                                              ClusterMetadataService.instance().placementProvider()));
        }

        // With RF=1 the node at token 200 owns (100, 200] and the node at token 300
        // owns (200, 300]. If the former leaves, its range merges into the latter's,
        // which then owns (100, 300]. No other peer is involved in this trivial
        // operation, so the progress barrier's consensus group should be exactly
        // the leaving node and its successor.
        TokenPlacementModel.Node leaving = ring.get(1);
        TokenPlacementModel.Node successor = ring.get(2);
        sut.service.commit(new PrepareLeave(leaving.nodeId(),
                                            true,
                                            ClusterMetadataService.instance().placementProvider(),
                                            LeaveStreams.Kind.UNBOOTSTRAP));
        UnbootstrapAndLeave leave = (UnbootstrapAndLeave) sut.service.metadata().inProgressSequences.get(leaving.nodeId());

        // affectedRanges::toPeers applies the same logic the progress barrier
        // itself uses to identify the consensus group.
        Set<NodeId> consensusGroup = leave.barrier().affectedRanges.toPeers(ReplicationParams.simple(1),
                                                                            sut.service.metadata().placements,
                                                                            sut.service.metadata().directory);
        assertEquals(Set.of(leaving.nodeId(), successor.nodeId()), consensusGroup);
    }
}

@Test
public void testProgressBarrier() throws Throwable
{
Expand Down Expand Up @@ -143,7 +190,7 @@ public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort
if (respond.get())
{
responded.add(to);
cb.onResponse((Message<RSP>) message.responseWith(message.epoch()));
cb.onResponse((Message<RSP>) message.responseWith(message.epoch()).withFrom(to));
}
else
{
Expand Down