From 3abbb58de7c22842e18b67835c997856e9929ea5 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Fri, 16 Jan 2026 16:31:51 -0800 Subject: [PATCH 1/8] CEP-45: Incremental Repair Blocking Wait for offsets --- ...MutationTrackingIncrementalRepairTask.java | 207 ++++++++++++++++ .../cassandra/repair/RepairCoordinator.java | 10 +- .../replication/BroadcastLogOffsets.java | 6 + .../replication/MutationTrackingService.java | 54 +++++ .../MutationTrackingSyncCoordinator.java | 229 ++++++++++++++++++ .../apache/cassandra/replication/Shard.java | 18 ++ ...tionTrackingIncrementalRepairTaskTest.java | 171 +++++++++++++ .../MutationTrackingSyncCoordinatorTest.java | 196 +++++++++++++++ 8 files changed, 890 insertions(+), 1 deletion(-) create mode 100644 src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java create mode 100644 src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java diff --git a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java new file mode 100644 index 000000000000..d70746408adf --- /dev/null +++ b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +/** Incremental repair task for keyspaces using mutation tracking */ +public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask +{ + private static final long SYNC_TIMEOUT_MINUTES = 30; + + private final TimeUUID parentSession; + private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges; + private final String[] cfnames; + + protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator, + TimeUUID parentSession, + RepairCoordinator.NeighborsAndRanges neighborsAndRanges, + String[] cfnames) + { + super(coordinator); + this.parentSession = parentSession; + this.neighborsAndRanges = neighborsAndRanges; + this.cfnames = cfnames; + } + + @Override + public String name() + { + return "MutationTrackingIncrementalRepair"; + } + + @Override + public Future performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) + { + List allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); + + if (allRanges.isEmpty()) + { + logger.info("No common ranges to repair for keyspace {}", keyspace); + return new AsyncPromise().setSuccess(CoordinatedRepairResult.create(List.of(), List.of())); + } + + List syncCoordinators = new ArrayList<>(); + List>> rangeCollections = new ArrayList<>(); + + for (CommonRange commonRange : allRanges) + { + for (Range range : commonRange.ranges) + { + MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range); + syncCoordinator.start(); + syncCoordinators.add(syncCoordinator); + rangeCollections.add(List.of(range)); + + logger.info("Started mutation tracking sync for range {}", range); + } + } + + coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges"); + + AsyncPromise resultPromise = new AsyncPromise<>(); + + executor.execute(() -> { + try + { + waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise); + } + catch (Exception e) + { + logger.error("Error during mutation tracking repair", e); + resultPromise.tryFailure(e); + } + }); + + return resultPromise; + } + + private void waitForSyncCompletion(List syncCoordinators, + ExecutorPlus executor, + Scheduler validationScheduler, + List allRanges, + List>> rangeCollections, + AsyncPromise resultPromise) throws InterruptedException + { + boolean allSucceeded = true; + for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators) + { + boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES); + if (!completed) + { + logger.warn("Mutation tracking sync timed out for keyspace {} range {}", + keyspace, syncCoordinator.getRange()); + syncCoordinator.cancel(); + allSucceeded = false; + } + } + + if (!allSucceeded) + { + resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges")); + return; + } + + coordinator.notifyProgress("Mutation tracking sync completed for all ranges"); + + if (requiresTraditionalRepair(keyspace)) + { + runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise); + } + else + { + // Pure mutation tracking - create successful result + resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of())); + } + } + + private void runTraditionalRepairForMigration(ExecutorPlus executor, + Scheduler validationScheduler, + List allRanges, + AsyncPromise resultPromise) + { + coordinator.notifyProgress("Running traditional repair for migration"); + + // Use the inherited runRepair method from AbstractRepairTask + Future traditionalRepair = runRepair(parentSession, true, executor, + validationScheduler, allRanges, + neighborsAndRanges.shouldExcludeDeadParticipants, + cfnames); + + traditionalRepair.addListener(f -> { + try + { + CoordinatedRepairResult result = (CoordinatedRepairResult) f.get(); + resultPromise.setSuccess(result); + } + catch (Exception e) + { + resultPromise.setFailure(e); + } + }); + } + + /** + * Determines if this keyspace should use mutation tracking incremental repair. + * Returns true if: + * - Keyspace uses mutation tracking replication, OR + * - Keyspace is currently migrating (either direction) + */ + public static boolean shouldUseMutationTrackingRepair(String keyspace) + { + ClusterMetadata metadata = ClusterMetadata.current(); + KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null); + if (ksm == null) + return false; + + // Check if keyspace uses mutation tracking + if (ksm.useMutationTracking()) + return true; + + // Check if keyspace is in migration (either direction) + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace); + return migrationInfo != null; + } + + /** + * Determines if we also need to run traditional repair. + * Returns true during migration: + * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data + * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency + */ + public static boolean requiresTraditionalRepair(String keyspace) + { + ClusterMetadata metadata = ClusterMetadata.current(); + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace); + return migrationInfo != null; + } +} diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java index b511d081c984..55274dd7b996 100644 --- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java +++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java @@ -503,7 +503,15 @@ private Future>> repair(String[] } else if (state.options.isIncremental()) { - task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + // For keyspaces using mutation tracking, use the mutation tracking repair task + if (MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(state.keyspace)) + { + task = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + } + else + { + task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + } } else { diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java index abd23dc3f218..6246a7286859 100644 --- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.cassandra.db.TypeSizes; @@ -51,6 +52,11 @@ boolean isEmpty() return replicatedOffsets.isEmpty(); } + public List getOffsets() + { + return Collections.unmodifiableList(replicatedOffsets); + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 18394bdf5390..38128628aa2d 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -131,6 +131,8 @@ public class MutationTrackingService private final IncomingMutations incomingMutations = new IncomingMutations(); private final OutgoingMutations outgoingMutations = new OutgoingMutations(); + private final Map> syncCoordinatorsByKeyspace = new ConcurrentHashMap<>(); + private volatile boolean started = false; private MutationTrackingService() @@ -305,6 +307,19 @@ public void updateReplicatedOffsets(String keyspace, Range range, List coordinators = syncCoordinatorsByKeyspace.get(keyspace); + if (coordinators != null) + { + for (MutationTrackingSyncCoordinator coordinator : coordinators) + { + if (range.intersects(coordinator.getRange())) + { + coordinator.onOffsetsReceived(); + } + } + } } finally { @@ -367,6 +382,30 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta return incomingMutations.subscribe(mutationId, callback); } + /** + * Register a sync coordinator to be notified when offset updates arrive. + */ + public void registerSyncCoordinator(MutationTrackingSyncCoordinator coordinator) + { + syncCoordinatorsByKeyspace.computeIfAbsent(coordinator.getKeyspace(), k -> ConcurrentHashMap.newKeySet()) + .add(coordinator); + } + + /** + * Unregister a sync coordinator. + */ + public void unregisterSyncCoordinator(MutationTrackingSyncCoordinator coordinator) + { + Set coordinators = syncCoordinatorsByKeyspace.get(coordinator.getKeyspace()); + if (coordinators != null) + { + coordinators.remove(coordinator); + + if (coordinators.isEmpty()) + syncCoordinatorsByKeyspace.remove(coordinator.getKeyspace(), coordinators); + } + } + public void executeTransfers(String keyspace, Set sstables, ConsistencyLevel cl) { shardLock.readLock().lock(); @@ -495,6 +534,21 @@ public Iterable getShards() return shards; } + public void forEachShardInKeyspace(String keyspace, Consumer consumer) + { + shardLock.readLock().lock(); + try + { + KeyspaceShards ksShards = keyspaceShards.get(keyspace); + if (ksShards != null) + ksShards.forEachShard(consumer); + } + finally + { + shardLock.readLock().unlock(); + } + } + public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into) { shardLock.readLock().lock(); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java new file mode 100644 index 000000000000..09a33c59a8f4 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +public class MutationTrackingSyncCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); + + private final String keyspace; + private final Range range; + private final AsyncPromise completionFuture = new AsyncPromise<>(); + + // Per-shard state: tracks what each node has reported for that shard + private final Map, ShardSyncState> shardStates = new ConcurrentHashMap<>(); + + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean completed = new AtomicBoolean(false); + + public MutationTrackingSyncCoordinator(String keyspace, Range range) + { + this.keyspace = keyspace; + this.range = range; + } + + public void start() + { + if (!started.compareAndSet(false, true)) + throw new IllegalStateException("Sync coordinator already started"); + + List overlappingShards; + + overlappingShards = new ArrayList<>(); + MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.intersects(range)) + overlappingShards.add(shard); + }); + + if (overlappingShards.isEmpty()) + { + completionFuture.setSuccess(null); + return; + } + + // Register to receive offset updates + MutationTrackingService.instance.registerSyncCoordinator(this); + + // Initialize state for each shard and capture targets + for (Shard shard : overlappingShards) + { + ShardSyncState state = new ShardSyncState(shard); + state.captureTargets(); + shardStates.put(shard.range, state); + } + + if (checkIfComplete()) + { + complete(); + return; + } + + logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards", + keyspace, range, overlappingShards.size()); + } + + private void complete() + { + if (!completed.compareAndSet(false, true)) + return; + MutationTrackingService.instance.unregisterSyncCoordinator(this); + completionFuture.setSuccess(null); + } + + private boolean checkIfComplete() + { + for (ShardSyncState state : shardStates.values()) + { + if (!state.isComplete()) + return false; + } + return true; + } + + public void onOffsetsReceived() + { + if (completed.get()) + return; + + // The underlying CoordinatorLog already updates its reconciled offsets. + // We just need to re-check if we're now complete. + if (checkIfComplete()) + { + complete(); + } + } + + public String getKeyspace() + { + return keyspace; + } + + public Range getRange() + { + return range; + } + + public Future awaitCompletion() + { + return completionFuture; + } + + /** + * Blocks until sync completes or timeout is reached. + * + * @param timeout Maximum time to wait + * @param unit Time unit + * @return true if completed, false if timed out + */ + public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException + { + try + { + completionFuture.get(timeout, unit); + return true; + } + catch (java.util.concurrent.TimeoutException e) + { + return false; + } + catch (java.util.concurrent.ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } + } + + public void cancel() + { + if (completed.compareAndSet(false, true)) + { + MutationTrackingService.instance.unregisterSyncCoordinator(this); + completionFuture.setFailure(new RuntimeException("Sync cancelled")); + } + } + + /** + * Tracks sync state for a single shard. + */ + private static class ShardSyncState + { + private final Shard shard; + + // Target offsets: LogId -> the offsets we're waiting for all nodes to have + private final Map targets = new ConcurrentHashMap<>(); + + ShardSyncState(Shard shard) + { + this.shard = shard; + } + + void captureTargets() + { + BroadcastLogOffsets current = shard.collectReplicatedOffsets(false); + for (Offsets.Immutable logOffsets : current.getOffsets()) + { + targets.put(logOffsets.logId(), logOffsets); + } + } + + boolean isComplete() + { + Map currentReconciled = shard.collectReconciledOffsetsPerLog(); + + for (Map.Entry entry : targets.entrySet()) + { + CoordinatorLogId logId = entry.getKey(); + Offsets.Immutable target = entry.getValue(); + + Offsets.Immutable reconciled = currentReconciled.get(logId); + if (reconciled == null) + return false; + + // Check if reconciled contains all offsets in target + if (!containsAll(reconciled, target)) + return false; + } + return true; + } + + private boolean containsAll(Offsets reconciled, Offsets target) + { + for (ShortMutationId id : target) + { + if (!reconciled.contains(id.offset())) + return false; + } + return true; + } + } +} diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 5f7f1e7ee641..9bb9646ab14b 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -406,6 +408,22 @@ void collectShardReconciledOffsetsToBuilder(ReconciledKeyspaceOffsets.Builder ke logs.values().forEach(log -> keyspaceBuilder.put(log.logId, log.collectReconciledOffsets(), range)); } + /** + * Returns the reconciled offsets for each coordinator log in this shard. + * Reconciled offsets are the intersection of what all participants have. + */ + public Map collectReconciledOffsetsPerLog() + { + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable reconciled = log.collectReconciledOffsets(); + if (reconciled != null && !reconciled.isEmpty()) + result.put(log.logId, reconciled); + } + return result; + } + public DebugInfo getDebugInfo() { SortedMap logDebugState = new TreeMap<>(Comparator.comparing(CoordinatorLogId::asLong)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java new file mode 100644 index 000000000000..5383fea0d79e --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask; + +import static org.junit.Assert.*; + +/** + * Tests for MutationTrackingIncrementalRepairTask. + * Tests the decision logic for when to use mutation tracking repair. + * + * Uses a shared cluster across all tests to minimize overhead. + */ +public class MutationTrackingIncrementalRepairTaskTest extends TestBaseImpl +{ + private static Cluster CLUSTER; + private static final AtomicInteger ksCounter = new AtomicInteger(); + + @BeforeClass + public static void setupCluster() throws IOException + { + CLUSTER = Cluster.build() + .withNodes(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set("mutation_tracking_enabled", true)) + .start(); + } + + @AfterClass + public static void teardownCluster() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + private static String nextKsName() + { + return "mtirt_ks" + ksCounter.incrementAndGet(); + } + + @Test + public void testShouldUseMutationTrackingRepairForTrackedKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUse); + } + + @Test + public void testShouldNotUseMutationTrackingRepairForUntrackedKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUse); + } + + @Test + public void testRequiresTraditionalRepairReturnsFalseForNonMigratingKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean requiresTraditional = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + + assertFalse("Non-migrating keyspace should not require traditional repair", requiresTraditional); + } + + @Test + public void testShouldUseMutationTrackingRepairForNonexistentKeyspace() throws Throwable + { + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair("nonexistent_ks_xyz")); + + assertFalse("Nonexistent keyspace should return false", shouldUse); + } + + @Test + public void testMigrationFromUntrackedToTracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUseBefore); + + Boolean requiresBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertFalse("Non-migrating keyspace should not require traditional repair", requiresBefore); + + // Trigger migration by altering to tracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + // Verify migration state - both methods should now return true + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Migrating keyspace should use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Migrating keyspace should require traditional repair", requiresAfter); + } + + @Test + public void testMigrationFromTrackedToUntracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUseBefore); + + Boolean requiresBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertFalse("Non-migrating tracked keyspace should not require traditional repair", requiresBefore); + + // Migrate back to untracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + // During reverse migration, both should still apply + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Keyspace migrating from tracked should still use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Keyspace migrating from tracked should require traditional repair", requiresAfter); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java new file mode 100644 index 000000000000..7c29ccc121d2 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.replication; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; + +import static org.junit.Assert.*; + +/** + * Distributed tests for MutationTrackingSyncCoordinator. + * + * Tests that the sync coordinator correctly waits for offset convergence + * across all nodes in a cluster. + */ +public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl +{ + private static final String KS_NAME = "sync_test_ks"; + private static final String TBL_NAME = "sync_test_tbl"; + + @Test + public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + // Create a tracked keyspace + cluster.schemaChange("CREATE KEYSPACE " + KS_NAME + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + KS_NAME + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)"); + + // Create a sync coordinator for a range that has no data + // It should complete immediately since there are no offsets to sync + Boolean completed = cluster.get(1).callOnInstance(() -> { + Range fullRange = new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullRange); + coordinator.start(); + + try + { + return coordinator.awaitCompletion(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync coordinator should complete when there are no pending offsets", completed); + } + } + + @Test + public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable + { + try (Cluster cluster = builder().withNodes(6).start()) + { + // Create a tracked keyspace + cluster.schemaChange("CREATE KEYSPACE " + KS_NAME + "2 WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + KS_NAME + "2.tbl (k int PRIMARY KEY, v int)"); + + // Insert some data to create mutations + for (int i = 0; i < 10000; i++) + { + cluster.coordinator(1).execute( + "INSERT INTO " + KS_NAME + "2.tbl (k, v) VALUES (?, ?)", + ConsistencyLevel.ALL, i, i + ); + } + + Thread.sleep(500); // Wait for offset broadcasts to propagate + + // Create a sync coordinator - should complete since all data is synced (CL.ALL) + Boolean completed = cluster.get(1).callOnInstance(() -> { + Range fullRange = new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullRange); + coordinator.start(); + + try + { + // Give it enough time for broadcasts to arrive + return coordinator.awaitCompletion(15, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync coordinator should complete after data is fully replicated", completed); + } + } + + @Test + public void testSyncCoordinatorCancel() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + // Create a tracked keyspace with data so there are shards to sync + cluster.schemaChange("CREATE KEYSPACE cancel_test_ks WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE cancel_test_ks.tbl (k int PRIMARY KEY, v int)"); + + // Pause offset broadcasts on all nodes to prevent sync from completing + for (int i = 1; i <= 3; i++) + { + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(true)); + } + + for (int i = 0; i < 100; i++) + { + cluster.coordinator(1).execute( + "INSERT INTO cancel_test_ks.tbl (k, v) VALUES (?, ?)", + ConsistencyLevel.ONE, i, i); + } + + // Start coordinator - it will be stuck waiting for offsets + Boolean wasCancelled = cluster.get(1).callOnInstance(() -> { + Range fullRange = new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator("cancel_test_ks", fullRange); + coordinator.start(); + + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + + coordinator.cancel(); // Cancel it + + // Verify it was cancelled + try + { + coordinator.awaitCompletion(1, TimeUnit.SECONDS); + return false; // Should have thrown + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + catch (RuntimeException e) + { + return e.getMessage() != null && e.getMessage().contains("cancelled"); + } + }); + assertTrue("Sync coordinator should be cancelled", wasCancelled); + } + } +} From 65471d8bc616500b819a2af39125133bebdb4982 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Wed, 21 Jan 2026 15:27:25 -0800 Subject: [PATCH 2/8] end--no-edit --- .../cassandra/replication/CoordinatorLog.java | 18 ++++++++++++++ .../replication/MutationTrackingService.java | 24 +++++++++---------- .../MutationTrackingSyncCoordinator.java | 13 ++-------- .../replication/Node2OffsetsMap.java | 13 ++++++++++ .../apache/cassandra/replication/Shard.java | 16 +++++++++++++ 5 files changed, 61 insertions(+), 23 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 56180c25945a..50ec6c24d8b2 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets() } } + /** + * Returns the UNION of all witnessed offsets from all participants. + * This represents all offsets that ANY replica has witnessed. + */ + Offsets.Immutable collectUnionOfWitnessedOffsets() + { + lock.readLock().lock(); + try + { + Offsets.Mutable union = witnessedOffsets.union(); + return union.isEmpty() ? null : Offsets.Immutable.copy(union); + } + finally + { + lock.readLock().unlock(); + } + } + public long getUnreconciledCount() { lock.readLock().lock(); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 38128628aa2d..e28fe825f61e 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -307,24 +307,24 @@ public void updateReplicatedOffsets(String keyspace, Range range, List coordinators = syncCoordinatorsByKeyspace.get(keyspace); - if (coordinators != null) + // Notify any registered sync coordinators about the offset update + Set coordinators = syncCoordinatorsByKeyspace.get(keyspace); + if (coordinators != null) + { + for (MutationTrackingSyncCoordinator coordinator : coordinators) { - for (MutationTrackingSyncCoordinator coordinator : coordinators) + if (range.intersects(coordinator.getRange())) { - if (range.intersects(coordinator.getRange())) - { - coordinator.onOffsetsReceived(); - } + coordinator.onOffsetsReceived(); } } } - finally - { - shardLock.readLock().unlock(); - } } public void recordFullyReconciledOffsets(ReconciledLogSnapshot reconciledSnapshot) diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java index 09a33c59a8f4..4210d4638306 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -31,7 +31,6 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; public class MutationTrackingSyncCoordinator { @@ -134,11 +133,6 @@ public Range getRange() return range; } - public Future awaitCompletion() - { - return completionFuture; - } - /** * Blocks until sync completes or timeout is reached. * @@ -189,11 +183,8 @@ private static class ShardSyncState void captureTargets() { - BroadcastLogOffsets current = shard.collectReplicatedOffsets(false); - for (Offsets.Immutable logOffsets : current.getOffsets()) - { - targets.put(logOffsets.logId(), logOffsets); - } + Map unionOffsets = shard.collectUnionOfWitnessedOffsetsPerLog(); + targets.putAll(unionOffsets); } boolean isComplete() diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java index ac6fcc0dafae..8d943feb1050 100644 --- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java +++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java @@ -73,6 +73,19 @@ Offsets.Mutable intersection() return intersection; } + Offsets.Mutable union() + { + Iterator iter = offsetsMap.values().iterator(); + if (offsetsMap.size() == 1) + return Offsets.Mutable.copy(iter.next()); + + Offsets.Mutable union = Offsets.Mutable.copy(iter.next()); + while (iter.hasNext()) + union.addAll(iter.next()); + + return union; + } + public void add(int node, Offsets offsets) { Offsets.Mutable current = offsetsMap.get(node); diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 9bb9646ab14b..f164d3d4e678 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -424,6 +424,22 @@ public Map collectReconciledOffsetsPerLog() return result; } + /** + * Returns the UNION of witnessed offsets from all participants for each coordinator log. + * Union = all offsets that ANY replica has witnessed. + */ + public Map collectUnionOfWitnessedOffsetsPerLog() + { + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable union = log.collectUnionOfWitnessedOffsets(); + if (union != null && !union.isEmpty()) + result.put(log.logId, union); + } + return result; + } + public DebugInfo getDebugInfo() { SortedMap logDebugState = new TreeMap<>(Comparator.comparing(CoordinatorLogId::asLong)); From 8408e13699fe4b191f6deb1510bba20424b6a3d6 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Wed, 21 Jan 2026 16:03:18 -0800 Subject: [PATCH 3/8] Create new test to validate inc repair on ALL replicas --- .../MutationTrackingSyncCoordinatorTest.java | 122 ++++++++++++------ 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java index 7c29ccc121d2..4ca689666f26 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -43,26 +43,45 @@ public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl private static final String KS_NAME = "sync_test_ks"; private static final String TBL_NAME = "sync_test_tbl"; + private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix) + { + String ksName = KS_NAME + keyspaceSuffix; + cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)"); + } + + private String tableName(String suffix) + { + return KS_NAME + suffix + '.' + TBL_NAME; + } + + private void pauseOffsetBroadcasts(Cluster cluster, boolean pause) + { + for (int i = 1; i <= cluster.size(); i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(pause)); + } + + private static Range fullTokenRange() + { + return new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + } + @Test public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable { try (Cluster cluster = builder().withNodes(3).start()) { - // Create a tracked keyspace - cluster.schemaChange("CREATE KEYSPACE " + KS_NAME + " WITH replication = " + - "{'class': 'SimpleStrategy', 'replication_factor': 3} " + - "AND replication_type='tracked'"); - cluster.schemaChange("CREATE TABLE " + KS_NAME + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)"); + createTrackedKeyspace(cluster, ""); // Create a sync coordinator for a range that has no data // It should complete immediately since there are no offsets to sync Boolean completed = cluster.get(1).callOnInstance(() -> { - Range fullRange = new Range<>( - new Murmur3Partitioner.LongToken(Long.MIN_VALUE), - new Murmur3Partitioner.LongToken(Long.MAX_VALUE) - ); - - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullRange); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange()); coordinator.start(); try @@ -85,17 +104,13 @@ public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable { try (Cluster cluster = builder().withNodes(6).start()) { - // Create a tracked keyspace - cluster.schemaChange("CREATE KEYSPACE " + KS_NAME + "2 WITH replication = " + - "{'class': 'SimpleStrategy', 'replication_factor': 3} " + - "AND replication_type='tracked'"); - cluster.schemaChange("CREATE TABLE " + KS_NAME + "2.tbl (k int PRIMARY KEY, v int)"); + createTrackedKeyspace(cluster, "2"); // Insert some data to create mutations for (int i = 0; i < 10000; i++) { cluster.coordinator(1).execute( - "INSERT INTO " + KS_NAME + "2.tbl (k, v) VALUES (?, ?)", + "INSERT INTO " + tableName("2") + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, i, i ); } @@ -104,12 +119,7 @@ public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable // Create a sync coordinator - should complete since all data is synced (CL.ALL) Boolean completed = cluster.get(1).callOnInstance(() -> { - Range fullRange = new Range<>( - new Murmur3Partitioner.LongToken(Long.MIN_VALUE), - new Murmur3Partitioner.LongToken(Long.MAX_VALUE) - ); - - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullRange); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullTokenRange()); coordinator.start(); try @@ -128,38 +138,70 @@ public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable } } + @Test + public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable + { + try (Cluster cluster = builder().withNodes(6).start()) + { + createTrackedKeyspace(cluster, "3"); + + // Pause broadcasts so nodes don't share offsets yet + pauseOffsetBroadcasts(cluster, true); + + // Write from different nodes with CL.ONE - each node has different mutations + // Different coordinators create mutations that only their local replica group knows about initially + cluster.coordinator(1).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)", ConsistencyLevel.ONE); + cluster.coordinator(2).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (2, 2)", ConsistencyLevel.ONE); + cluster.coordinator(3).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (3, 3)", ConsistencyLevel.ONE); + + // Resume broadcasts so nodes can share their offsets + pauseOffsetBroadcasts(cluster, false); + + // Trigger broadcasts to share offsets between nodes + for (int i = 1; i <= 6; i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + + Thread.sleep(500); // Wait for broadcasts to propagate + + Boolean completed = cluster.get(4).callOnInstance(() -> { + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "3", fullTokenRange()); + coordinator.start(); + + try + { + return coordinator.awaitCompletion(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + assertTrue("Sync should complete after all mutations from all nodes are reconciled", completed); + } + } + @Test public void testSyncCoordinatorCancel() throws Throwable { try (Cluster cluster = builder().withNodes(3).start()) { - // Create a tracked keyspace with data so there are shards to sync - cluster.schemaChange("CREATE KEYSPACE cancel_test_ks WITH replication = " + - "{'class': 'SimpleStrategy', 'replication_factor': 3} " + - "AND replication_type='tracked'"); - cluster.schemaChange("CREATE TABLE cancel_test_ks.tbl (k int PRIMARY KEY, v int)"); + createTrackedKeyspace(cluster, "4"); // Pause offset broadcasts on all nodes to prevent sync from completing - for (int i = 1; i <= 3; i++) - { - cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(true)); - } + pauseOffsetBroadcasts(cluster, true); for (int i = 0; i < 100; i++) { cluster.coordinator(1).execute( - "INSERT INTO cancel_test_ks.tbl (k, v) VALUES (?, ?)", + "INSERT INTO " + tableName("4") + " (k, v) VALUES (?, ?)", ConsistencyLevel.ONE, i, i); } // Start coordinator - it will be stuck waiting for offsets Boolean wasCancelled = cluster.get(1).callOnInstance(() -> { - Range fullRange = new Range<>( - new Murmur3Partitioner.LongToken(Long.MIN_VALUE), - new Murmur3Partitioner.LongToken(Long.MAX_VALUE) - ); - - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator("cancel_test_ks", fullRange); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "4", fullTokenRange()); coordinator.start(); try @@ -187,7 +229,7 @@ public void testSyncCoordinatorCancel() throws Throwable } catch (RuntimeException e) { - return e.getMessage() != null && e.getMessage().contains("cancelled"); + return e.getMessage() != null && e.getMessage().contains("cancelled"); } }); assertTrue("Sync coordinator should be cancelled", wasCancelled); From 619fe4a4ca2167cfaef05a8befc160de9064b13a Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Mon, 26 Jan 2026 10:56:25 -0800 Subject: [PATCH 4/8] SyncCoordinatorTest file fix --- .../replication/MutationTrackingService.java | 2 +- .../MutationTrackingSyncCoordinator.java | 110 +++++++++++++-- .../MutationTrackingSyncCoordinatorTest.java | 125 ++++++++++-------- 3 files changed, 166 insertions(+), 71 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index e28fe825f61e..1ed2a1c8237e 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -321,7 +321,7 @@ public void updateReplicatedOffsets(String keyspace, Range range, List range; private final AsyncPromise completionFuture = new AsyncPromise<>(); + private volatile long startTimeMs; // Per-shard state: tracks what each node has reported for that shard private final Map, ShardSyncState> shardStates = new ConcurrentHashMap<>(); @@ -46,6 +56,9 @@ public class MutationTrackingSyncCoordinator private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean completed = new AtomicBoolean(false); + private final Set allParticipants = new HashSet<>(); + private final Set reportedParticipants = ConcurrentHashMap.newKeySet(); + public MutationTrackingSyncCoordinator(String keyspace, Range range) { this.keyspace = keyspace; @@ -57,6 +70,8 @@ public void start() if (!started.compareAndSet(false, true)) throw new IllegalStateException("Sync coordinator already started"); + startTimeMs = System.currentTimeMillis(); + List overlappingShards; overlappingShards = new ArrayList<>(); @@ -71,25 +86,35 @@ public void start() return; } + InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); + for (Shard shard : overlappingShards) + { + allParticipants.addAll(shard.remoteReplicas()); + allParticipants.add(localAddress); + } + // Register to receive offset updates MutationTrackingService.instance.registerSyncCoordinator(this); - // Initialize state for each shard and capture targets + // Initialize state for each shard for (Shard shard : overlappingShards) { ShardSyncState state = new ShardSyncState(shard); - state.captureTargets(); shardStates.put(shard.range, state); } - if (checkIfComplete()) - { - complete(); - return; - } + // Mark self as reported and capture local targets + reportedParticipants.add(localAddress); + recaptureTargets(); + + logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards, waiting for {} participants", + keyspace, range, overlappingShards.size(), allParticipants.size()); - logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards", - keyspace, range, overlappingShards.size()); + // Check if we're the only participant and already complete + checkIfReadyToComplete(); + + // Schedule a delayed check for the empty targets timeout case + scheduler.schedule(this::checkIfReadyToComplete, EMPTY_TARGETS_TIMEOUT_MS + 100, TimeUnit.MILLISECONDS); } private void complete() @@ -110,19 +135,80 @@ private boolean checkIfComplete() return true; } - public void onOffsetsReceived() + private void recaptureTargets() + { + for (ShardSyncState state : shardStates.values()) + { + state.captureTargets(); + } + } + + /** + * Check if we're ready to complete. We can complete when: + * 1. All participants have reported their offsets AND all targets are reconciled, OR + * 2. No targets have been discovered after the timeout (no data to sync anywhere) + */ + private void checkIfReadyToComplete() { if (completed.get()) return; - // The underlying CoordinatorLog already updates its reconciled offsets. - // We just need to re-check if we're now complete. + if (hasNoTargets() && (System.currentTimeMillis() - startTimeMs) > EMPTY_TARGETS_TIMEOUT_MS) + { + logger.info("Sync coordinator completed for keyspace {} range {} - no targets discovered after {}ms", + keyspace, range, EMPTY_TARGETS_TIMEOUT_MS); + complete(); + return; + } + + // Wait until all participants have reported + if (!reportedParticipants.containsAll(allParticipants)) + { + logger.trace("Sync coordinator waiting for participants. Reported: {}, All: {}", + reportedParticipants.size(), allParticipants.size()); + return; + } + + // All participants have reported, check if targets are reconciled if (checkIfComplete()) { + logger.info("Sync coordinator completed for keyspace {} range {}", keyspace, range); complete(); } } + private boolean hasNoTargets() + { + for (ShardSyncState state : shardStates.values()) + { + if (!state.targets.isEmpty()) + return false; + } + return true; + } + + /** + * Called when offset updates are received from a participant. + * @param from The participant that sent the offsets + */ + public void onOffsetsReceived(InetAddressAndPort from) + { + if (completed.get()) + return; + + boolean newParticipant = reportedParticipants.add(from); + + if (newParticipant) + { + logger.trace("Sync coordinator received offsets from new participant {}. Reported: {}/{}", + from, reportedParticipants.size(), allParticipants.size()); + } + + recaptureTargets(); // Recapture targets to include any new coordinator logs + + checkIfReadyToComplete(); + } + public String getKeyspace() { return keyspace; diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java index 4ca689666f26..b2a5cbfe5ae0 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.distributed.test.replication; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -29,6 +31,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.awaitility.Awaitility; import static org.junit.Assert.*; @@ -100,85 +103,91 @@ public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable } @Test - public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable + public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable { - try (Cluster cluster = builder().withNodes(6).start()) + try (Cluster cluster = builder().withNodes(3).start()) { - createTrackedKeyspace(cluster, "2"); + createTrackedKeyspace(cluster, "3"); - // Insert some data to create mutations - for (int i = 0; i < 10000; i++) - { - cluster.coordinator(1).execute( - "INSERT INTO " + tableName("2") + " (k, v) VALUES (?, ?)", - ConsistencyLevel.ALL, i, i - ); - } + // Block all messages FROM node 1 to prevent write replication + // This ensures that write only succeeds locally on node 1 + cluster.filters().allVerbs().from(1).drop(); - Thread.sleep(500); // Wait for offset broadcasts to propagate + cluster.coordinator(1).execute( + "INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)", + ConsistencyLevel.ONE + ); - // Create a sync coordinator - should complete since all data is synced (CL.ALL) - Boolean completed = cluster.get(1).callOnInstance(() -> { - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullTokenRange()); + // Start MutationTrackingSyncCoordinator on node 2 in a separate thread + // It should wait for offsets to sync since node 1's data hasn't propagated yet + CompletableFuture coordinatorFuture = CompletableFuture.supplyAsync(() -> cluster.get(2).callOnInstance(() -> { + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '3', fullTokenRange()); coordinator.start(); try { - // Give it enough time for broadcasts to arrive - return coordinator.awaitCompletion(15, TimeUnit.SECONDS); + return coordinator.awaitCompletion(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } - }); - - assertTrue("Sync coordinator should complete after data is fully replicated", completed); - } - } - - @Test - public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable - { - try (Cluster cluster = builder().withNodes(6).start()) - { - createTrackedKeyspace(cluster, "3"); - - // Pause broadcasts so nodes don't share offsets yet - pauseOffsetBroadcasts(cluster, true); + })); + + // Wait until node 1 has the data + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted(() -> { + Object[][] results = cluster.get(1).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1"); + assertEquals("Node 1 should have the data", 1, results.length); + }); + + // Verify other nodes shouldn't have the data yet since we have blocked messages + for (int i = 2; i <= 3; i++) + { + Object[][] results = cluster.get(i).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1" + ); + assertEquals("Node " + i + " should not have data yet", 0, results.length); + } - // Write from different nodes with CL.ONE - each node has different mutations - // Different coordinators create mutations that only their local replica group knows about initially - cluster.coordinator(1).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)", ConsistencyLevel.ONE); - cluster.coordinator(2).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (2, 2)", ConsistencyLevel.ONE); - cluster.coordinator(3).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (3, 3)", ConsistencyLevel.ONE); + // Verify coordinator stays blocked for at least 2 seconds + Awaitility.await() + .during(Duration.ofSeconds(2)) + .atMost(Duration.ofSeconds(3)) + .until(() -> !coordinatorFuture.isDone()); - // Resume broadcasts so nodes can share their offsets - pauseOffsetBroadcasts(cluster, false); + cluster.filters().reset(); - // Trigger broadcasts to share offsets between nodes - for (int i = 1; i <= 6; i++) + for (int i = 1; i <= 3; i++) cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); - Thread.sleep(500); // Wait for broadcasts to propagate + // Wait for coordinator to complete + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofMillis(200)) + .until(coordinatorFuture::isDone); - Boolean completed = cluster.get(4).callOnInstance(() -> { - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "3", fullTokenRange()); - coordinator.start(); - - try - { - return coordinator.awaitCompletion(30, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - return false; - } - }); + assertTrue("Coordinator should complete successfully", coordinatorFuture.get()); - assertTrue("Sync should complete after all mutations from all nodes are reconciled", completed); + // Verify data propagated to all replicas + for (int i = 1; i <= 3; i++) + { + final int nodeId = i; + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted(() -> { + Object[][] results = cluster.get(nodeId).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1"); + assertEquals("Node " + nodeId + " should have the data", 1, results.length); + assertEquals(1, results[0][0]); + assertEquals(1, results[0][1]); + }); + } } } @@ -201,7 +210,7 @@ public void testSyncCoordinatorCancel() throws Throwable // Start coordinator - it will be stuck waiting for offsets Boolean wasCancelled = cluster.get(1).callOnInstance(() -> { - MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "4", fullTokenRange()); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '4', fullTokenRange()); coordinator.start(); try From 86ad36f1a145a266fb88c2b8a00c9788920fe12a Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Mon, 26 Jan 2026 14:41:37 -0800 Subject: [PATCH 5/8] Change shardStates from CHM -> HM --- .../replication/MutationTrackingSyncCoordinator.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java index 1ab1d459ff92..7d6f59858e94 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -51,7 +52,7 @@ public class MutationTrackingSyncCoordinator private volatile long startTimeMs; // Per-shard state: tracks what each node has reported for that shard - private final Map, ShardSyncState> shardStates = new ConcurrentHashMap<>(); + private final Map, ShardSyncState> shardStates = new HashMap<>(); private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean completed = new AtomicBoolean(false); @@ -93,9 +94,6 @@ public void start() allParticipants.add(localAddress); } - // Register to receive offset updates - MutationTrackingService.instance.registerSyncCoordinator(this); - // Initialize state for each shard for (Shard shard : overlappingShards) { @@ -103,6 +101,9 @@ public void start() shardStates.put(shard.range, state); } + // Register to receive offset updates + MutationTrackingService.instance.registerSyncCoordinator(this); + // Mark self as reported and capture local targets reportedParticipants.add(localAddress); recaptureTargets(); From ce56492d0fe71027c786e4fe8483679f66ecc925 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Mon, 26 Jan 2026 15:06:08 -0800 Subject: [PATCH 6/8] Fix possible shard staleness --- .../MutationTrackingSyncCoordinator.java | 48 +++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java index 7d6f59858e94..818718dcb3ed 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -92,11 +92,7 @@ public void start() { allParticipants.addAll(shard.remoteReplicas()); allParticipants.add(localAddress); - } - // Initialize state for each shard - for (Shard shard : overlappingShards) - { ShardSyncState state = new ShardSyncState(shard); shardStates.put(shard.range, state); } @@ -138,12 +134,54 @@ private boolean checkIfComplete() private void recaptureTargets() { + if (checkForTopologyChange()) + return; + for (ShardSyncState state : shardStates.values()) { state.captureTargets(); } } + /** + * Checks if any of the shards we're tracking have changed due to topology updates. + * @return true if topology changed (and repair was failed), false if all shards are still current + */ + private boolean checkForTopologyChange() + { + for (ShardSyncState state : shardStates.values()) + { + Shard currentShard = getCurrentShard(state.shard.range); + if (currentShard != state.shard) + { + failWithTopologyChange(); + return true; + } + } + return false; + } + + private Shard getCurrentShard(Range shardRange) + { + Shard[] result = new Shard[1]; + MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.equals(shardRange)) + result[0] = shard; + }); + return result[0]; + } + + private void failWithTopologyChange() + { + if (completed.compareAndSet(false, true)) + { + logger.warn("Sync coordinator for keyspace {} range {} failed due to topology change", + keyspace, range); + MutationTrackingService.instance.unregisterSyncCoordinator(this); + completionFuture.setFailure(new RuntimeException("Repair failed: topology changed during sync")); + } + } + /** * Check if we're ready to complete. We can complete when: * 1. All participants have reported their offsets AND all targets are reconciled, OR @@ -151,7 +189,7 @@ private void recaptureTargets() */ private void checkIfReadyToComplete() { - if (completed.get()) + if (completed.get() || checkForTopologyChange()) return; if (hasNoTargets() && (System.currentTimeMillis() - startTimeMs) > EMPTY_TARGETS_TIMEOUT_MS) From ff1bf5f002efa770710e04825ea0b838cbdce132 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Wed, 28 Jan 2026 15:29:18 -0800 Subject: [PATCH 7/8] Fix for happens-before --- .../MutationTrackingSyncCoordinator.java | 47 +++++++++++++--- .../MutationTrackingSyncCoordinatorTest.java | 54 +++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java index 818718dcb3ed..0dd3935aead5 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -44,6 +44,10 @@ public class MutationTrackingSyncCoordinator private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); private static final long EMPTY_TARGETS_TIMEOUT_MS = 3000; + // Must be >= TRANSIENT_BROADCAST_INTERVAL_MILLIS (200ms) + network buffer + // to ensure we receive at least one fresh broadcast from each participant + private static final long MIN_BROADCAST_WAIT_MS = 300; + private static final long PARTICIPANT_TIMEOUT_MS = 10000; private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final String keyspace; @@ -192,7 +196,10 @@ private void checkIfReadyToComplete() if (completed.get() || checkForTopologyChange()) return; - if (hasNoTargets() && (System.currentTimeMillis() - startTimeMs) > EMPTY_TARGETS_TIMEOUT_MS) + long elapsedMs = System.currentTimeMillis() - startTimeMs; + + // Handle the empty targets + if (hasNoTargets() && elapsedMs > EMPTY_TARGETS_TIMEOUT_MS) { logger.info("Sync coordinator completed for keyspace {} range {} - no targets discovered after {}ms", keyspace, range, EMPTY_TARGETS_TIMEOUT_MS); @@ -200,20 +207,48 @@ private void checkIfReadyToComplete() return; } - // Wait until all participants have reported - if (!reportedParticipants.containsAll(allParticipants)) + // Ensuring we have waited long enough for fresh broadcasts from all replicas - happens-before situation + if (elapsedMs < MIN_BROADCAST_WAIT_MS) { - logger.trace("Sync coordinator waiting for participants. Reported: {}, All: {}", - reportedParticipants.size(), allParticipants.size()); + long remainingMs = MIN_BROADCAST_WAIT_MS - elapsedMs + 10; + logger.trace("Sync coordinator waiting for broadcast cycle. Elapsed: {}ms, Required: {}ms", + elapsedMs, MIN_BROADCAST_WAIT_MS); + scheduler.schedule(this::checkIfReadyToComplete, remainingMs, TimeUnit.MILLISECONDS); return; } - // All participants have reported, check if targets are reconciled + // Wait until all participants have reported or timeout + if (!reportedParticipants.containsAll(allParticipants)) + { + if (elapsedMs < PARTICIPANT_TIMEOUT_MS) + { + logger.trace("Sync coordinator waiting for participants. Reported: {}, All: {}", + reportedParticipants.size(), allParticipants.size()); + // Schedule a retry to check again after timeout + long remainingMs = PARTICIPANT_TIMEOUT_MS - elapsedMs + 100; + scheduler.schedule(this::checkIfReadyToComplete, remainingMs, TimeUnit.MILLISECONDS); + return; + } + + Set missing = new HashSet<>(allParticipants); + missing.removeAll(reportedParticipants); + logger.warn("Sync coordinator timed out waiting for participants: {}. Proceeding with available offsets.", + missing); + } + + // All participants have reported (or timed out), check if targets are reconciled if (checkIfComplete()) { logger.info("Sync coordinator completed for keyspace {} range {}", keyspace, range); complete(); } + else if (elapsedMs >= PARTICIPANT_TIMEOUT_MS) + { + // Participant timeout reached but targets not reconciled - complete anyway + logger.warn("Sync coordinator completing for keyspace {} range {} after timeout, some targets may not be reconciled", + keyspace, range); + complete(); + } } private boolean hasNoTargets() diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java index b2a5cbfe5ae0..5b2d5cd55b8d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -120,6 +120,7 @@ public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable // Start MutationTrackingSyncCoordinator on node 2 in a separate thread // It should wait for offsets to sync since node 1's data hasn't propagated yet + long syncStartTime = System.currentTimeMillis(); CompletableFuture coordinatorFuture = CompletableFuture.supplyAsync(() -> cluster.get(2).callOnInstance(() -> { MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '3', fullTokenRange()); coordinator.start(); @@ -188,6 +189,11 @@ public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable assertEquals(1, results[0][1]); }); } + + // Verify the sync respected the minimum broadcast wait time (MIN_BROADCAST_WAIT_MS = 300ms) + long syncDuration = System.currentTimeMillis() - syncStartTime; + assertTrue("Sync should wait at least MIN_BROADCAST_WAIT_MS (300ms). Actual: " + syncDuration + "ms", + syncDuration >= 300); } } @@ -244,4 +250,52 @@ public void testSyncCoordinatorCancel() throws Throwable assertTrue("Sync coordinator should be cancelled", wasCancelled); } } + + @Test + public void testSyncCoordinatorTimesOutOnUnresponsiveParticipant() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + createTrackedKeyspace(cluster, "5"); + + cluster.coordinator(1).execute( + "INSERT INTO " + tableName("5") + " (k, v) VALUES (1, 1)", + ConsistencyLevel.ALL + ); + + // Broadcast from all nodes first so they're in sync + for (int i = 1; i <= cluster.size(); i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + + // Block all messages FROM node 3 permanently - it will never report + cluster.filters().allVerbs().from(3).drop(); + + long syncStartTime = System.currentTimeMillis(); + + // Start sync coordinator on node 1 - it should time out waiting for node 3 + Boolean completed = cluster.get(1).callOnInstance(() -> { + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator( + KS_NAME + '5', fullTokenRange()); + coordinator.start(); + + try + { + // Wait longer than PARTICIPANT_TIMEOUT_MS (10s) + buffer + return coordinator.awaitCompletion(20, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + }); + + long syncDuration = System.currentTimeMillis() - syncStartTime; + + assertTrue("Sync coordinator should complete after timeout", completed); + // Should have taken at least PARTICIPANT_TIMEOUT_MS (10s) + assertTrue("Sync should have timed out waiting for participant. Actual: " + syncDuration + "ms", + syncDuration >= 10000); + } + } } From a94886dfa882dcfc726bcb9151665476b21a9869 Mon Sep 17 00:00:00 2001 From: Aparna Naik Date: Thu, 29 Jan 2026 11:39:50 -0800 Subject: [PATCH 8/8] Fix MutationTrackingIncrementalRepairTask file --- .../cassandra/config/CassandraRelevantProperties.java | 1 + .../repair/MutationTrackingIncrementalRepairTask.java | 11 +++++++++-- .../MutationTrackingSyncCoordinatorTest.java | 9 +++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a518497e8f3c..0613c102dd0a 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -445,6 +445,7 @@ public enum CassandraRelevantProperties REPAIR_FAIL_TIMEOUT_SECONDS("cassandra.repair_fail_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)))), REPAIR_MUTATION_REPAIR_ROWS_PER_BATCH("cassandra.repair.mutation_repair_rows_per_batch", "100"), REPAIR_STATUS_CHECK_TIMEOUT_SECONDS("cassandra.repair_status_check_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)))), + REPAIR_SYNC_TIMEOUT_MINUTES("cassandra.repair_sync_timeout_minutes", "30"), /** * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known * but the current state isn't known. If the host replacement is needed to repair this state, this property must diff --git a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java index d70746408adf..54ba5bcfacba 100644 --- a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java +++ b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; @@ -36,7 +37,7 @@ /** Incremental repair task for keyspaces using mutation tracking */ public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask { - private static final long SYNC_TIMEOUT_MINUTES = 30; + private static final long SYNC_TIMEOUT_MINUTES = CassandraRelevantProperties.REPAIR_SYNC_TIMEOUT_MINUTES.getLong(); private final TimeUUID parentSession; private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges; @@ -140,7 +141,13 @@ private void waitForSyncCompletion(List syncCoo else { // Pure mutation tracking - create successful result - resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of())); + List results = new ArrayList<>(); + for (int i = 0; i < rangeCollections.size(); i++) + { + Collection> ranges = rangeCollections.get(i); + results.add(new RepairSessionResult(parentSession, keyspace, ranges, List.of(), false)); + } + resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, results)); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java index 5b2d5cd55b8d..9ab6cb8113ab 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -263,11 +263,16 @@ public void testSyncCoordinatorTimesOutOnUnresponsiveParticipant() throws Throwa ConsistencyLevel.ALL ); - // Broadcast from all nodes first so they're in sync + // Pause broadcasts on node 3 BEFORE any broadcasts - this ensures node 3 + // never sends any offset broadcasts to the coordinator, simulating an unresponsive node + cluster.get(3).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(true)); + + // Broadcast from all nodes - node 3's broadcast is a no-op because it's paused for (int i = 1; i <= cluster.size(); i++) cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); - // Block all messages FROM node 3 permanently - it will never report + // Also block messages FROM node 3 to ensure even if periodic broadcasts resume, + // they won't reach the coordinator cluster.filters().allVerbs().from(3).drop(); long syncStartTime = System.currentTimeMillis();