diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index a518497e8f3c..0613c102dd0a 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -445,6 +445,7 @@ public enum CassandraRelevantProperties
     REPAIR_FAIL_TIMEOUT_SECONDS("cassandra.repair_fail_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)))),
     REPAIR_MUTATION_REPAIR_ROWS_PER_BATCH("cassandra.repair.mutation_repair_rows_per_batch", "100"),
     REPAIR_STATUS_CHECK_TIMEOUT_SECONDS("cassandra.repair_status_check_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)))),
+    REPAIR_SYNC_TIMEOUT_MINUTES("cassandra.repair_sync_timeout_minutes", "30"),
     /**
      * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known
      * but the current state isn't known. If the host replacement is needed to repair this state, this property must
diff --git a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java
new file mode 100644
index 000000000000..54ba5bcfacba
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+/**
+ * Incremental repair task for keyspaces using mutation tracking.
+ * <p>
+ * Starts one {@link MutationTrackingSyncCoordinator} per repaired range, waits for each of them to complete and,
+ * when {@link #requiresTraditionalRepair(String)} says so, follows up with a traditional incremental repair.
+ */
+public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
+{
+    private static final long SYNC_TIMEOUT_MINUTES = CassandraRelevantProperties.REPAIR_SYNC_TIMEOUT_MINUTES.getLong();
+
+    private final TimeUUID parentSession;
+    private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges;
+    private final String[] cfnames;
+
+    protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator,
+                                                    TimeUUID parentSession,
+                                                    RepairCoordinator.NeighborsAndRanges neighborsAndRanges,
+                                                    String[] cfnames)
+    {
+        super(coordinator);
+        this.parentSession = parentSession;
+        this.neighborsAndRanges = neighborsAndRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "MutationTrackingIncrementalRepair";
+    }
+
+    /**
+     * Starts a sync coordinator for every range to repair and waits for all of them on {@code executor}.
+     *
+     * @return a future completed with the repair result, or failed when a sync fails, times out or is interrupted
+     */
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
+    {
+        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+        if (allRanges.isEmpty())
+        {
+            logger.info("No common ranges to repair for keyspace {}", keyspace);
+            return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
+        }
+
+        List<MutationTrackingSyncCoordinator> syncCoordinators = new ArrayList<>();
+        List<Collection<Range<Token>>> rangeCollections = new ArrayList<>();
+
+        for (CommonRange commonRange : allRanges)
+        {
+            for (Range<Token> range : commonRange.ranges)
+            {
+                MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range);
+                syncCoordinator.start();
+                syncCoordinators.add(syncCoordinator);
+                rangeCollections.add(List.of(range));
+
+                logger.info("Started mutation tracking sync for range {}", range);
+            }
+        }
+
+        coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges");
+
+        AsyncPromise<CoordinatedRepairResult> resultPromise = new AsyncPromise<>();
+
+        executor.execute(() -> {
+            try
+            {
+                waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise);
+            }
+            catch (Exception e)
+            {
+                // Restore the interrupt flag so the code that owns this thread can still observe the interruption
+                if (e instanceof InterruptedException)
+                    Thread.currentThread().interrupt();
+                logger.error("Error during mutation tracking repair", e);
+                resultPromise.tryFailure(e);
+            }
+        });
+
+        return resultPromise;
+    }
+
+    /**
+     * Blocks until every sync coordinator has completed, waiting up to {@link #SYNC_TIMEOUT_MINUTES} for each one in
+     * turn, then completes {@code resultPromise} (directly, or through a traditional repair when one is required).
+     *
+     * @throws InterruptedException if the waiting thread is interrupted; all coordinators are cancelled first
+     */
+    private void waitForSyncCompletion(List<MutationTrackingSyncCoordinator> syncCoordinators,
+                                       ExecutorPlus executor,
+                                       Scheduler validationScheduler,
+                                       List<CommonRange> allRanges,
+                                       List<Collection<Range<Token>>> rangeCollections,
+                                       AsyncPromise<CoordinatedRepairResult> resultPromise) throws InterruptedException
+    {
+        boolean allSucceeded = true;
+        try
+        {
+            for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators)
+            {
+                boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+                if (!completed)
+                {
+                    logger.warn("Mutation tracking sync timed out for keyspace {} range {}",
+                                keyspace, syncCoordinator.getRange());
+                    syncCoordinator.cancel();
+                    allSucceeded = false;
+                }
+            }
+        }
+        catch (InterruptedException | RuntimeException e)
+        {
+            // awaitCompletion() rethrows a failed sync as a RuntimeException. The coordinators that were not awaited
+            // yet would otherwise keep running and stay registered for offset updates although the repair is over.
+            cancelAll(syncCoordinators, e);
+            throw e;
+        }
+
+        if (!allSucceeded)
+        {
+            resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges"));
+            return;
+        }
+
+        coordinator.notifyProgress("Mutation tracking sync completed for all ranges");
+
+        if (requiresTraditionalRepair(keyspace))
+        {
+            runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise);
+        }
+        else
+        {
+            // Pure mutation tracking - create successful result
+            List<RepairSessionResult> results = new ArrayList<>();
+            for (int i = 0; i < rangeCollections.size(); i++)
+            {
+                Collection<Range<Token>> ranges = rangeCollections.get(i);
+                results.add(new RepairSessionResult(parentSession, keyspace, ranges, List.of(), false));
+            }
+            resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, results));
+        }
+    }
+
+    /** Cancels every coordinator; a failure to cancel one is attached to {@code cause} instead of masking it. */
+    private static void cancelAll(List<MutationTrackingSyncCoordinator> syncCoordinators, Throwable cause)
+    {
+        for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators)
+        {
+            try
+            {
+                syncCoordinator.cancel();
+            }
+            catch (RuntimeException e)
+            {
+                cause.addSuppressed(e);
+            }
+        }
+    }
+
+    /**
+     * Runs a traditional incremental repair over {@code allRanges} and forwards its outcome to {@code resultPromise}.
+     */
+    private void runTraditionalRepairForMigration(ExecutorPlus executor,
+                                                  Scheduler validationScheduler,
+                                                  List<CommonRange> allRanges,
+                                                  AsyncPromise<CoordinatedRepairResult> resultPromise)
+    {
+        coordinator.notifyProgress("Running traditional repair for migration");
+
+        // Use the inherited runRepair method from AbstractRepairTask
+        Future<CoordinatedRepairResult> traditionalRepair = runRepair(parentSession, true, executor,
+                                                                      validationScheduler, allRanges,
+                                                                      neighborsAndRanges.shouldExcludeDeadParticipants,
+                                                                      cfnames);
+
+        traditionalRepair.addListener(f -> {
+            try
+            {
+                CoordinatedRepairResult result = (CoordinatedRepairResult) f.get();
+                resultPromise.trySuccess(result);
+            }
+            catch (java.util.concurrent.ExecutionException e)
+            {
+                // Report the repair's own failure rather than the ExecutionException wrapper added by get()
+                resultPromise.tryFailure(e.getCause() != null ? e.getCause() : e);
+            }
+            catch (Exception e)
+            {
+                resultPromise.tryFailure(e);
+            }
+        });
+    }
+
+    /**
+     * Determines if this keyspace should use mutation tracking incremental repair.
+     * Returns true when the keyspace exists in the current cluster metadata and either:
+     * - Keyspace uses mutation tracking replication, OR
+     * - Keyspace has mutation tracking migration info, i.e. is currently migrating (either direction)
+     */
+    public static boolean shouldUseMutationTrackingRepair(String keyspace)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null);
+        if (ksm == null)
+            return false;
+
+        // Check if keyspace uses mutation tracking
+        if (ksm.useMutationTracking())
+            return true;
+
+        // Not tracked itself: still use this task if migration info is recorded for the keyspace (either direction)
+        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
+        return migrationInfo != null;
+    }
+
+    /**
+     * Determines if we also need to run traditional repair.
+     * Returns true exactly when migration info is recorded for the keyspace, which covers:
+     * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data
+     * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency
+     */
+    public static boolean requiresTraditionalRepair(String keyspace)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
+        return migrationInfo != null;
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
index b511d081c984..55274dd7b996 100644
--- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java
+++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
@@ -503,7 +503,15 @@ private Future>> repair(String[]
         }
         else if (state.options.isIncremental())
         {
-            task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            // Keyspaces that use mutation tracking, or are migrating to/from it, get the mutation tracking repair task
+            if (MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(state.keyspace))
+            {
+                task = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            }
+            else
+            {
+                task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            }
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
index abd23dc3f218..6246a7286859 100644
--- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
+++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -51,6 +52,11 @@ boolean isEmpty()
         return replicatedOffsets.isEmpty();
     }
 
+    public List getOffsets()
+    {
+        return Collections.unmodifiableList(replicatedOffsets);
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 56180c25945a..50ec6c24d8b2 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets()
         }
     }
 
+    /**
+     * Returns the UNION of all witnessed offsets from all participants, i.e. all offsets that ANY replica has witnessed.
+     * Computed under the read lock; returns null (not an empty set) when that union is empty.
+     */
+    Offsets.Immutable collectUnionOfWitnessedOffsets()
+    {
+        lock.readLock().lock();
+        try
+        {
+            Offsets.Mutable union = witnessedOffsets.union();
+            return union.isEmpty() ? null : Offsets.Immutable.copy(union);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     public long getUnreconciledCount()
     {
         lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 18394bdf5390..1ed2a1c8237e 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -131,6 +131,8 @@ public class MutationTrackingService
     private final IncomingMutations incomingMutations = new IncomingMutations();
     private final OutgoingMutations outgoingMutations = new OutgoingMutations();
 
+    private final Map> syncCoordinatorsByKeyspace = new ConcurrentHashMap<>();
+
     private volatile boolean started = false;
 
     private MutationTrackingService()
@@ -310,6 +312,19 @@ public void updateReplicatedOffsets(String keyspace, Range range, List coordinators = syncCoordinatorsByKeyspace.get(keyspace);
+        if (coordinators != null)
+        {
+            for (MutationTrackingSyncCoordinator coordinator : coordinators)
+            {
+                if (range.intersects(coordinator.getRange()))
+                {
+                    coordinator.onOffsetsReceived(onHost);
+                }
+            }
+        }
     }
 
     public void recordFullyReconciledOffsets(ReconciledLogSnapshot reconciledSnapshot)
@@ -367,6 +382,30 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta
         return incomingMutations.subscribe(mutationId, callback);
     }
 
+    /**
+     * Registers a sync coordinator, under its keyspace, to be notified when offset updates arrive.
+     */
+    public void registerSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
+    {
+        syncCoordinatorsByKeyspace.computeIfAbsent(coordinator.getKeyspace(), k -> ConcurrentHashMap.newKeySet())
+                                  .add(coordinator);
+    }
+
+    /**
+     * Unregister a sync coordinator.
+     */
+    public void unregisterSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
+    {
+        // The emptied set is deliberately left in the map. Removing it would race with
+        // registerSyncCoordinator(), which adds to the set only after looking it up: a coordinator
+        // could end up in a set that was just detached from the map and would never be notified.
+        // At most one (possibly empty) set is retained per keyspace.
+        Set<MutationTrackingSyncCoordinator> coordinators =
+            syncCoordinatorsByKeyspace.get(coordinator.getKeyspace());
+        if (coordinators != null)
+            coordinators.remove(coordinator);
+    }
+
     public void executeTransfers(String keyspace, Set sstables, ConsistencyLevel cl)
     {
         shardLock.readLock().lock();
@@ -495,6 +534,25 @@ public Iterable getShards()
         return shards;
     }
 
+    /**
+     * Applies {@code consumer} to every shard of {@code keyspace} while holding the shard read lock.
+     * Does nothing when no shards are known for the keyspace.
+     */
+    public void forEachShardInKeyspace(String keyspace, Consumer<Shard> consumer)
+    {
+        shardLock.readLock().lock();
+        try
+        {
+            KeyspaceShards ksShards = keyspaceShards.get(keyspace);
+            if (ksShards != null)
+                ksShards.forEachShard(consumer);
+        }
+        finally
+        {
+            shardLock.readLock().unlock();
+        }
+    }
+
     public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
     {
         shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java
new file mode 100644
index 000000000000..0dd3935aead5
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+/** Waits, for one keyspace and token range, until the offsets witnessed by any replica are reconciled. */
+public class MutationTrackingSyncCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class);
+
+    private static final long EMPTY_TARGETS_TIMEOUT_MS = 3000;
+    // Must be >= TRANSIENT_BROADCAST_INTERVAL_MILLIS (200ms) + network buffer
+    // to ensure we receive at least one fresh broadcast from each participant
+    private static final long MIN_BROADCAST_WAIT_MS = 300;
+    private static final long PARTICIPANT_TIMEOUT_MS = 10000;
+    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();
+    private volatile long startTimeMs;
+
+    // Per-shard state, keyed by shard range: tracks the target offsets captured for that shard
+    private final Map<Range<Token>, ShardSyncState> shardStates = new HashMap<>();
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+
+    private final Set<InetAddressAndPort> allParticipants = new HashSet<>();
+    private final Set<InetAddressAndPort> reportedParticipants = ConcurrentHashMap.newKeySet();
+
+    public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range)
+    {
+        this.keyspace = keyspace;
+        this.range = range;
+    }
+
+    /** Captures the shards overlapping the range, registers for offset updates and schedules completion checks. */
+    public void start()
+    {
+        if (!started.compareAndSet(false, true))
+            throw new IllegalStateException("Sync coordinator already started");
+
+        startTimeMs = System.currentTimeMillis();
+
+        List<Shard> overlappingShards = new ArrayList<>();
+        MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> {
+            if (shard.range.intersects(range))
+                overlappingShards.add(shard);
+        });
+
+        if (overlappingShards.isEmpty())
+        {
+            // Nothing to sync. Go through complete() so the completed flag is set as well; otherwise a
+            // later cancel() would try to fail a future that has already succeeded.
+            complete();
+            return;
+        }
+
+        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+        for (Shard shard : overlappingShards)
+        {
+            allParticipants.addAll(shard.remoteReplicas());
+            allParticipants.add(localAddress);
+
+            ShardSyncState state = new ShardSyncState(shard);
+            shardStates.put(shard.range, state);
+        }
+
+        // Register to receive offset updates
+        MutationTrackingService.instance.registerSyncCoordinator(this);
+
+        // Mark self as reported and capture local targets
+        reportedParticipants.add(localAddress);
+        recaptureTargets();
+
+        logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards, waiting for {} participants",
+                    keyspace, range, overlappingShards.size(), allParticipants.size());
+
+        // Check if we're the only participant and already complete
+        checkIfReadyToComplete();
+
+        // Schedule a delayed check for the empty targets timeout case
+        scheduler.schedule(this::checkIfReadyToComplete, EMPTY_TARGETS_TIMEOUT_MS + 100, TimeUnit.MILLISECONDS);
+    }
+
+    /** Marks the sync as successfully finished, at most once, and unregisters from offset updates. */
+    private void complete()
+    {
+        if (!completed.compareAndSet(false, true))
+            return;
+        MutationTrackingService.instance.unregisterSyncCoordinator(this);
+        completionFuture.setSuccess(null);
+    }
+
+    private boolean checkIfComplete()
+    {
+        for (ShardSyncState state : shardStates.values())
+            if (!state.isComplete())
+                return false;
+        return true;
+    }
+
+    /** Re-reads the target offsets of every tracked shard, unless a topology change has failed the sync. */
+    private void recaptureTargets()
+    {
+        if (checkForTopologyChange())
+            return;
+
+        for (ShardSyncState state : shardStates.values())
+            state.captureTargets();
+    }
+
+    /**
+     * Checks if any of the shards we're tracking have changed due to topology updates.
+     * @return true if topology changed (and repair was failed), false if all shards are still current
+     */
+    private boolean checkForTopologyChange()
+    {
+        for (ShardSyncState state : shardStates.values())
+        {
+            Shard currentShard = getCurrentShard(state.shard.range);
+            if (currentShard != state.shard)
+            {
+                failWithTopologyChange();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private Shard getCurrentShard(Range<Token> shardRange)
+    {
+        Shard[] result = new Shard[1];
+        MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> {
+            if (shard.range.equals(shardRange))
+                result[0] = shard;
+        });
+        return result[0];
+    }
+
+    private void failWithTopologyChange()
+    {
+        if (completed.compareAndSet(false, true))
+        {
+            logger.warn("Sync coordinator for keyspace {} range {} failed due to topology change",
+                        keyspace, range);
+            MutationTrackingService.instance.unregisterSyncCoordinator(this);
+            completionFuture.setFailure(new RuntimeException("Repair failed: topology changed during sync"));
+        }
+    }
+
+    /**
+     * Check if we're ready to complete. We can complete when:
+     * 1. All participants have reported their offsets AND all targets are reconciled, OR
+     * 2.
No targets have been discovered after the timeout (no data to sync anywhere)
+     */
+    private void checkIfReadyToComplete()
+    {
+        if (completed.get() || checkForTopologyChange()) // a detected topology change has already failed the sync
+            return;
+
+        long elapsedMs = System.currentTimeMillis() - startTimeMs;
+
+        // Case 2: no shard has any target offsets and the empty-targets grace period has elapsed
+        if (hasNoTargets() && elapsedMs > EMPTY_TARGETS_TIMEOUT_MS)
+        {
+            logger.info("Sync coordinator completed for keyspace {} range {} - no targets discovered after {}ms",
+                        keyspace, range, EMPTY_TARGETS_TIMEOUT_MS);
+            complete();
+            return;
+        }
+
+        // Too early to decide: re-check once the minimum broadcast wait (plus a 10ms margin) has passed since start
+        if (elapsedMs < MIN_BROADCAST_WAIT_MS)
+        {
+            long remainingMs = MIN_BROADCAST_WAIT_MS - elapsedMs + 10;
+            logger.trace("Sync coordinator waiting for broadcast cycle. Elapsed: {}ms, Required: {}ms",
+                         elapsedMs, MIN_BROADCAST_WAIT_MS);
+            scheduler.schedule(this::checkIfReadyToComplete, remainingMs, TimeUnit.MILLISECONDS);
+            return;
+        }
+
+        // Wait until all participants have reported, or proceed without the missing ones after the timeout
+        if (!reportedParticipants.containsAll(allParticipants))
+        {
+            if (elapsedMs < PARTICIPANT_TIMEOUT_MS)
+            {
+                logger.trace("Sync coordinator waiting for participants. Reported: {}, All: {}",
+                             reportedParticipants.size(), allParticipants.size());
+                // Re-check shortly (100ms) after the participant timeout expires
+                long remainingMs = PARTICIPANT_TIMEOUT_MS - elapsedMs + 100;
+                scheduler.schedule(this::checkIfReadyToComplete, remainingMs, TimeUnit.MILLISECONDS);
+                return;
+            }
+
+            Set missing = new HashSet<>(allParticipants);
+            missing.removeAll(reportedParticipants);
+            logger.warn("Sync coordinator timed out waiting for participants: {}. Proceeding with available offsets.",
+                        missing);
+        }
+
+        // All participants have reported (or timed out), check if targets are reconciled
+        if (checkIfComplete())
+        {
+            logger.info("Sync coordinator completed for keyspace {} range {}", keyspace, range);
+            complete();
+        }
+        else if (elapsedMs >= PARTICIPANT_TIMEOUT_MS)
+        {
+            // Timeout reached with unreconciled targets: still completes successfully, the gap is only logged
+            logger.warn("Sync coordinator completing for keyspace {} range {} after timeout, some targets may not be reconciled",
+                        keyspace, range);
+            complete();
+        }
+    }
+
+    private boolean hasNoTargets() // true when no tracked shard has captured any target offsets
+    {
+        for (ShardSyncState state : shardStates.values())
+        {
+            if (!state.targets.isEmpty())
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Called when offset updates are received from a participant; ignored once the sync has completed.
+     * @param from The participant that sent the offsets; recorded as having reported
+     */
+    public void onOffsetsReceived(InetAddressAndPort from)
+    {
+        if (completed.get())
+            return;
+
+        boolean newParticipant = reportedParticipants.add(from);
+
+        if (newParticipant)
+        {
+            logger.trace("Sync coordinator received offsets from new participant {}. Reported: {}/{}",
+                         from, reportedParticipants.size(), allParticipants.size());
+        }
+
+        recaptureTargets(); // Recapture targets to include any new coordinator logs
+
+        checkIfReadyToComplete();
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public Range getRange()
+    {
+        return range;
+    }
+
+    /**
+     * Blocks until sync completes or timeout is reached.
+     *
+     * @param timeout Maximum time to wait
+     * @param unit Time unit
+     * @return true if completed, false if timed out
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws RuntimeException wrapping the cause if the sync failed or was cancelled
+     */
+    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        try
+        {
+            completionFuture.get(timeout, unit);
+            return true;
+        }
+        catch (java.util.concurrent.TimeoutException e)
+        {
+            return false;
+        }
+        catch (java.util.concurrent.ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    /** Fails the sync with "Sync cancelled" and unregisters it, unless it has already finished. */
+    public void cancel()
+    {
+        if (completed.compareAndSet(false, true))
+        {
+            MutationTrackingService.instance.unregisterSyncCoordinator(this);
+            // tryFailure rather than setFailure: never attempt to overwrite an outcome the future already has
+            completionFuture.tryFailure(new RuntimeException("Sync cancelled"));
+        }
+    }
+
+    /** Tracks sync state for a single shard. */
+    private static class ShardSyncState
+    {
+        private final Shard shard;
+
+        // Target offsets: LogId -> the offsets we're waiting for all nodes to have
+        private final Map<CoordinatorLogId, Offsets.Immutable> targets = new ConcurrentHashMap<>();
+
+        ShardSyncState(Shard shard)
+        {
+            this.shard = shard;
+        }
+
+        void captureTargets()
+        {
+            Map<CoordinatorLogId, Offsets.Immutable> unionOffsets = shard.collectUnionOfWitnessedOffsetsPerLog();
+            targets.putAll(unionOffsets);
+        }
+
+        boolean isComplete()
+        {
+            Map<CoordinatorLogId, Offsets.Immutable> currentReconciled = shard.collectReconciledOffsetsPerLog();
+
+            for (Map.Entry<CoordinatorLogId, Offsets.Immutable> entry : targets.entrySet())
+            {
+                CoordinatorLogId logId = entry.getKey();
+                Offsets.Immutable target = entry.getValue();
+
+                Offsets.Immutable reconciled = currentReconciled.get(logId);
+                if (reconciled == null)
+                    return false;
+
+                // Check if reconciled contains all offsets in target
+                if (!containsAll(reconciled, target))
+                    return false;
+            }
+            return true;
+        }
+
+        private boolean containsAll(Offsets reconciled, Offsets target)
+        {
+            for (ShortMutationId id : target)
+                if (!reconciled.contains(id.offset()))
+                    return false;
+            return true;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
index
ac6fcc0dafae..8d943feb1050 100644
--- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
@@ -73,6 +73,20 @@ Offsets.Mutable intersection()
         return intersection;
     }
 
+    /**
+     * Returns a new mutable copy holding every offset recorded for at least one node.
+     * The map must hold at least one entry: the first element is read from the iterator unconditionally.
+     */
+    Offsets.Mutable union()
+    {
+        Iterator<Offsets.Mutable> iter = offsetsMap.values().iterator();
+        Offsets.Mutable union = Offsets.Mutable.copy(iter.next());
+        while (iter.hasNext())
+            union.addAll(iter.next());
+
+        return union;
+    }
+
     public void add(int node, Offsets offsets)
     {
         Offsets.Mutable current = offsetsMap.get(node);
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 5f7f1e7ee641..f164d3d4e678 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -20,7 +20,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -406,6 +408,40 @@ void collectShardReconciledOffsetsToBuilder(ReconciledKeyspaceOffsets.Builder ke
         logs.values().forEach(log -> keyspaceBuilder.put(log.logId, log.collectReconciledOffsets(), range));
     }
 
+    /**
+     * Returns the reconciled offsets for each coordinator log in this shard.
+     * Reconciled offsets are the intersection of what all participants have.
+     * @return reconciled offsets keyed by log id; logs with null or empty reconciled offsets are omitted
+     */
+    public Map<CoordinatorLogId, Offsets.Immutable> collectReconciledOffsetsPerLog()
+    {
+        Map<CoordinatorLogId, Offsets.Immutable> result = new HashMap<>();
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable reconciled = log.collectReconciledOffsets();
+            if (reconciled != null && !reconciled.isEmpty())
+                result.put(log.logId, reconciled);
+        }
+        return result;
+    }
+
+    /**
+     * Returns the UNION of witnessed offsets from all participants for each coordinator log.
+     * Union = all offsets that ANY replica has witnessed.
+     * @return witnessed offsets keyed by log id; logs with a null or empty union are omitted
+     */
+    public Map<CoordinatorLogId, Offsets.Immutable> collectUnionOfWitnessedOffsetsPerLog()
+    {
+        Map<CoordinatorLogId, Offsets.Immutable> result = new HashMap<>();
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable union = log.collectUnionOfWitnessedOffsets();
+            if (union != null && !union.isEmpty())
+                result.put(log.logId, union);
+        }
+        return result;
+    }
+
     public DebugInfo getDebugInfo()
     {
         SortedMap logDebugState = new TreeMap<>(Comparator.comparing(CoordinatorLogId::asLong));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java
new file mode 100644
index 000000000000..5383fea0d79e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask; + +import static org.junit.Assert.*; + +/** + * Tests for MutationTrackingIncrementalRepairTask. + * Tests the decision logic for when to use mutation tracking repair. + * + * Uses a shared cluster across all tests to minimize overhead. + */ +public class MutationTrackingIncrementalRepairTaskTest extends TestBaseImpl +{ + private static Cluster CLUSTER; + private static final AtomicInteger ksCounter = new AtomicInteger(); + + @BeforeClass + public static void setupCluster() throws IOException + { + CLUSTER = Cluster.build() + .withNodes(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set("mutation_tracking_enabled", true)) + .start(); + } + + @AfterClass + public static void teardownCluster() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + private static String nextKsName() + { + return "mtirt_ks" + ksCounter.incrementAndGet(); + } + + @Test + public void testShouldUseMutationTrackingRepairForTrackedKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUse); + } + + @Test + public void testShouldNotUseMutationTrackingRepairForUntrackedKeyspace() throws Throwable + { + String ksName = 
nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUse); + } + + @Test + public void testRequiresTraditionalRepairReturnsFalseForNonMigratingKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean requiresTraditional = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + + assertFalse("Non-migrating keyspace should not require traditional repair", requiresTraditional); + } + + @Test + public void testShouldUseMutationTrackingRepairForNonexistentKeyspace() throws Throwable + { + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair("nonexistent_ks_xyz")); + + assertFalse("Nonexistent keyspace should return false", shouldUse); + } + + @Test + public void testMigrationFromUntrackedToTracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUseBefore); + + Boolean requiresBefore = 
CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertFalse("Non-migrating keyspace should not require traditional repair", requiresBefore); + + // Trigger migration by altering to tracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + // Verify migration state - both methods should now return true + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Migrating keyspace should use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Migrating keyspace should require traditional repair", requiresAfter); + } + + @Test + public void testMigrationFromTrackedToUntracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUseBefore); + + Boolean requiresBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertFalse("Non-migrating tracked keyspace should not require traditional repair", requiresBefore); + + // Migrate back to untracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + 
+ "AND replication_type='untracked'"); + + // During reverse migration, both should still apply + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Keyspace migrating from tracked should still use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Keyspace migrating from tracked should require traditional repair", requiresAfter); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java new file mode 100644 index 000000000000..9ab6cb8113ab --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.cassandra.distributed.test.replication;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+import org.awaitility.Awaitility;
+
+import static org.junit.Assert.*;
+
+/**
+ * Distributed tests for MutationTrackingSyncCoordinator.
+ *
+ * Tests that the sync coordinator correctly waits for offset convergence
+ * across all nodes in a cluster.
+ *
+ * Every test starts its own three-node cluster and creates a tracked keyspace
+ * named {@code KS_NAME + suffix}, so each test works against its own keyspace.
+ */
+public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
+{
+    private static final String KS_NAME = "sync_test_ks";
+    private static final String TBL_NAME = "sync_test_tbl";
+
+    /**
+     * Creates the keyspace {@code KS_NAME + keyspaceSuffix} (SimpleStrategy, RF 3,
+     * {@code replication_type='tracked'}) and the test table inside it.
+     *
+     * @param cluster        cluster on which the schema changes are executed
+     * @param keyspaceSuffix suffix appended to {@link #KS_NAME}; may be empty
+     */
+    private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix)
+    {
+        String ksName = KS_NAME + keyspaceSuffix;
+        cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " +
+                             "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+        cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)");
+    }
+
+    /**
+     * @param suffix the keyspace suffix that was passed to {@link #createTrackedKeyspace}
+     * @return the keyspace-qualified name of the test table
+     */
+    private String tableName(String suffix)
+    {
+        return KS_NAME + suffix + '.' + TBL_NAME;
+    }
+
+    /**
+     * Calls {@code MutationTrackingService.instance.pauseOffsetBroadcast(pause)} on every node of the cluster.
+     *
+     * @param cluster cluster whose instances are updated
+     * @param pause   value forwarded to {@code pauseOffsetBroadcast} on each instance
+     */
+    private void pauseOffsetBroadcasts(Cluster cluster, boolean pause)
+    {
+        for (int i = 1; i <= cluster.size(); i++)
+            cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(pause));
+    }
+
+    /**
+     * @return a range from the Murmur3 token {@code Long.MIN_VALUE} to the Murmur3 token {@code Long.MAX_VALUE}
+     */
+    private static Range<Token> fullTokenRange()
+    {
+        return new Range<>(
+            new Murmur3Partitioner.LongToken(Long.MIN_VALUE),
+            new Murmur3Partitioner.LongToken(Long.MAX_VALUE)
+        );
+    }
+
+    /**
+     * Starts a coordinator on node 1 for a keyspace that has received no writes and
+     * expects {@code awaitCompletion} to return true within 5 seconds.
+     */
+    @Test
+    public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "");
+
+            // Create a sync coordinator for a range that has no data
+            // It should complete immediately since there are no offsets to sync
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(5, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete when there are no pending offsets", completed);
+        }
+    }
+
+    /**
+     * Writes a row on node 1 while all messages from node 1 are dropped, starts a coordinator
+     * on node 2, checks that it stays incomplete for 2 seconds, then removes the message filter,
+     * triggers offset broadcasts and expects the coordinator to complete and the row to be
+     * readable on all three nodes.
+     */
+    @Test
+    public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "3");
+
+            // Block all messages FROM node 1 to prevent write replication
+            // This ensures that write only succeeds locally on node 1
+            cluster.filters().allVerbs().from(1).drop();
+
+            cluster.coordinator(1).execute(
+                "INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)",
+                ConsistencyLevel.ONE
+            );
+
+            // Start MutationTrackingSyncCoordinator on node 2 in a separate thread
+            // It should wait for offsets to sync since node 1's data hasn't propagated yet
+            long syncStartTime = System.currentTimeMillis();
+            // Typed as CompletableFuture<Boolean> so that get() can be passed to assertTrue below
+            CompletableFuture<Boolean> coordinatorFuture = CompletableFuture.supplyAsync(() -> cluster.get(2).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '3', fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(10, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            }));
+
+            // Wait until node 1 has the data
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .untilAsserted(() -> {
+                          Object[][] results = cluster.get(1).executeInternal(
+                              "SELECT k, v FROM " + tableName("3") + " WHERE k = 1");
+                          assertEquals("Node 1 should have the data", 1, results.length);
+                      });
+
+            // Verify other nodes shouldn't have the data yet since we have blocked messages
+            for (int i = 2; i <= 3; i++)
+            {
+                Object[][] results = cluster.get(i).executeInternal(
+                    "SELECT k, v FROM " + tableName("3") + " WHERE k = 1"
+                );
+                assertEquals("Node " + i + " should not have data yet", 0, results.length);
+            }
+
+            // Verify coordinator stays blocked for at least 2 seconds
+            Awaitility.await()
+                      .during(Duration.ofSeconds(2))
+                      .atMost(Duration.ofSeconds(3))
+                      .until(() -> !coordinatorFuture.isDone());
+
+            cluster.filters().reset();
+
+            for (int i = 1; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait for coordinator to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(30))
+                      .pollInterval(Duration.ofMillis(200))
+                      .until(coordinatorFuture::isDone);
+
+            assertTrue("Coordinator should complete successfully", coordinatorFuture.get());
+
+            // Verify data propagated to all replicas
+            for (int i = 1; i <= 3; i++)
+            {
+                final int nodeId = i;
+                Awaitility.await()
+                          .atMost(Duration.ofSeconds(10))
+                          .pollInterval(Duration.ofMillis(100))
+                          .untilAsserted(() -> {
+                              Object[][] results = cluster.get(nodeId).executeInternal(
+                                  "SELECT k, v FROM " + tableName("3") + " WHERE k = 1");
+                              assertEquals("Node " + nodeId + " should have the data", 1, results.length);
+                              assertEquals(1, results[0][0]);
+                              assertEquals(1, results[0][1]);
+                          });
+            }
+
+            // Verify the sync respected the minimum broadcast wait time (MIN_BROADCAST_WAIT_MS = 300ms)
+            // NOTE(review): the 2 second during(...) wait above already elapses between syncStartTime
+            // and this point, so this lower bound cannot fail on its own - consider a tighter measurement.
+            long syncDuration = System.currentTimeMillis() - syncStartTime;
+            assertTrue("Sync should wait at least MIN_BROADCAST_WAIT_MS (300ms). Actual: " + syncDuration + "ms",
+                       syncDuration >= 300);
+        }
+    }
+
+    /**
+     * Pauses offset broadcasts on every node, writes 100 rows, starts a coordinator on node 1,
+     * cancels it after 100 ms and expects {@code awaitCompletion} to throw a RuntimeException
+     * whose message contains "cancelled".
+     */
+    @Test
+    public void testSyncCoordinatorCancel() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "4");
+
+            // Pause offset broadcasts on all nodes to prevent sync from completing
+            pauseOffsetBroadcasts(cluster, true);
+
+            for (int i = 0; i < 100; i++)
+            {
+                cluster.coordinator(1).execute(
+                    "INSERT INTO " + tableName("4") + " (k, v) VALUES (?, ?)",
+                    ConsistencyLevel.ONE, i, i);
+            }
+
+            // Start coordinator - it will be stuck waiting for offsets
+            Boolean wasCancelled = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '4', fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+
+                coordinator.cancel(); // Cancel it
+
+                // Verify it was cancelled
+                try
+                {
+                    coordinator.awaitCompletion(1, TimeUnit.SECONDS);
+                    return false; // Should have thrown
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+                catch (RuntimeException e)
+                {
+                    return e.getMessage() != null && e.getMessage().contains("cancelled");
+                }
+            });
+            assertTrue("Sync coordinator should be cancelled", wasCancelled);
+        }
+    }
+
+    /**
+     * Writes a row at ConsistencyLevel.ALL, pauses offset broadcasts on node 3 and drops all
+     * messages from it, then expects a coordinator on node 1 to report completion only after
+     * at least 10 seconds have elapsed.
+     */
+    @Test
+    public void testSyncCoordinatorTimesOutOnUnresponsiveParticipant() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "5");
+
+            cluster.coordinator(1).execute(
+                "INSERT INTO " + tableName("5") + " (k, v) VALUES (1, 1)",
+                ConsistencyLevel.ALL
+            );
+
+            // Pause broadcasts on node 3 BEFORE any broadcasts - this ensures node 3
+            // never sends any offset broadcasts to the coordinator, simulating an unresponsive node
+            cluster.get(3).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(true));
+
+            // Broadcast from all nodes - node 3's broadcast is a no-op because it's paused
+            for (int i = 1; i <= cluster.size(); i++)
+                cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Also block messages FROM node 3 to ensure even if periodic broadcasts resume,
+            // they won't reach the coordinator
+            cluster.filters().allVerbs().from(3).drop();
+
+            long syncStartTime = System.currentTimeMillis();
+
+            // Start sync coordinator on node 1 - it should time out waiting for node 3
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(
+                    KS_NAME + '5', fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    // Wait longer than PARTICIPANT_TIMEOUT_MS (10s) + buffer
+                    return coordinator.awaitCompletion(20, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            long syncDuration = System.currentTimeMillis() - syncStartTime;
+
+            assertTrue("Sync coordinator should complete after timeout", completed);
+            // Should have taken at least PARTICIPANT_TIMEOUT_MS (10s)
+            assertTrue("Sync should have timed out waiting for participant. Actual: " + syncDuration + "ms",
+                       syncDuration >= 10000);
+        }
+    }
+}