diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java index aecab7f4450..b012d7ac040 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java @@ -110,29 +110,32 @@ private void compile() toSplit.forEach((replication, delta) -> { delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); }); toMaximal.forEach((replication, delta) -> { delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); addToWrites.put(replication, delta.onlyWrites()); moveReads.put(replication, delta.onlyReads()); }); toFinal.forEach((replication, delta) -> { delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); moveReads.put(replication, delta.onlyReads()); removeFromWrites.put(replication, delta.onlyWrites()); }); toMerged.forEach((replication, delta) -> { delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + delta.reads.removals.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); removeFromWrites.put(replication, delta); }); this.addToWrites = addToWrites.build(); this.moveReads = moveReads.build(); this.removeFromWrites = removeFromWrites.build(); this.affectedRanges = affectedRanges.build(); - } @Override diff --git a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java index 7b89acceb6e..db728ef67e4 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java +++ b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java @@ -172,6 +172,12 @@ public boolean await(ConsistencyLevel cl, ClusterMetadata metadata) { EndpointsForRange writes = metadata.placements.get(params).writes.matchRange(range).get().filter(r -> filter.test(r.endpoint())); EndpointsForRange reads = metadata.placements.get(params).reads.matchRange(range).get().filter(r -> filter.test(r.endpoint())); + // Affected ranges can contain ranges which are the results of merging or splitting and may not exist + // as keys in the existing ReplicaGroups. As such, no replicas will be found for these ranges and so no + // WaitFor is necessary. + if (reads.isEmpty() && writes.isEmpty()) + continue; + reads.stream().map(Replica::endpoint).forEach(superset::add); writes.stream().map(Replica::endpoint).forEach(superset::add); diff --git a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java index ba431db848a..cb4f15e5efa 100644 --- a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java +++ b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java @@ -50,6 +50,7 @@ import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -57,12 +58,16 @@ import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.transformations.PrepareJoin; +import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.UnsafeJoin; import org.apache.cassandra.utils.concurrent.Future; +import static org.junit.Assert.assertEquals; + public class ProgressBarrierTest extends CMSTestBase { static @@ -72,6 +77,48 @@ public class ProgressBarrierTest extends CMSTestBase DatabaseDescriptor.setProgressBarrierBackoff(5); } + @Test + public void testProgressBarrierWithMergingRanges() + { + TokenPlacementModel.ReplicationFactor rf = new TokenPlacementModel.SimpleReplicationFactor(1); + try (CMSTestBase.CMSSut sut = new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, rf)) + { + TokenPlacementModel.NodeFactory nodeFactory = TokenPlacementModel.nodeFactory(); + List nodes = new ArrayList<>(); + for (int i = 1; i < 6; i++) + { + TokenPlacementModel.Node node = nodeFactory.make(i, 1, 1).overrideToken(i*100); + nodes.add(node); + sut.service.commit(new Register(new NodeAddresses(node.addr()), + new Location(node.dc(), node.rack()), + NodeVersion.CURRENT)); + sut.service.commit(new UnsafeJoin(node.nodeId(), + Collections.singleton(node.longToken()), + ClusterMetadataService.instance().placementProvider())); + } + + // 6 node cluster, with a single RF1 keyspace + // node2 owns range (100,200] & node3 (200,300] + // if node2 leaves node3 will acquire its ranges and will then own (100, 300] + // no other peers are involved in this trivial operation + // the progress barrier should be looking for consensus from (nodes2, node3) + TokenPlacementModel.Node node2 = nodes.get(1); + TokenPlacementModel.Node node3 = nodes.get(2); + sut.service.commit(new PrepareLeave(node2.nodeId(), + true, + ClusterMetadataService.instance().placementProvider(), + LeaveStreams.Kind.UNBOOTSTRAP)); + UnbootstrapAndLeave leave = (UnbootstrapAndLeave) sut.service.metadata().inProgressSequences.get(node2.nodeId()); + + // Internally affectedRanges::toPeers uses the same logic as + // the progress barrier does to identify the consensus group + Set consensusGroup = leave.barrier().affectedRanges.toPeers(ReplicationParams.simple(1), + sut.service.metadata().placements, + sut.service.metadata().directory); + assertEquals(Set.of(node2.nodeId(), node3.nodeId()), consensusGroup); + } + } + @Test public void testProgressBarrier() throws Throwable { @@ -143,7 +190,7 @@ public void sendWithCallback(Message message, InetAddressAndPort if (respond.get()) { responded.add(to); - cb.onResponse((Message) message.responseWith(message.epoch())); + cb.onResponse((Message) message.responseWith(message.epoch()).withFrom(to)); } else {