Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ public enum CassandraRelevantProperties
REPAIR_FAIL_TIMEOUT_SECONDS("cassandra.repair_fail_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.DAYS.toSeconds(1)))),
REPAIR_MUTATION_REPAIR_ROWS_PER_BATCH("cassandra.repair.mutation_repair_rows_per_batch", "100"),
REPAIR_STATUS_CHECK_TIMEOUT_SECONDS("cassandra.repair_status_check_timeout_seconds", convertToString(Ints.checkedCast(TimeUnit.HOURS.toSeconds(1)))),
/** Timeout, in minutes, that mutation tracking incremental repair waits for a range's sync coordinator to complete (defaults to 30). */
REPAIR_SYNC_TIMEOUT_MINUTES("cassandra.repair_sync_timeout_minutes", "30"),
/**
* When doing a host replacement it's possible that the gossip state is "empty" meaning that the endpoint is known
* but the current state isn't known. If the host replacement is needed to repair this state, this property must
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;

/**
 * Incremental repair task for keyspaces using mutation tracking.
 * <p>
 * One {@link MutationTrackingSyncCoordinator} is started per repaired range and the task waits for each of them
 * to complete. When the keyspace has an entry in the mutation tracking migration state, a traditional repair is
 * run afterwards over the same common ranges.
 */
public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
{
    /** Maximum time to wait on each individual sync coordinator; read once when this class is initialised. */
    private static final long SYNC_TIMEOUT_MINUTES = CassandraRelevantProperties.REPAIR_SYNC_TIMEOUT_MINUTES.getLong();

    private final TimeUUID parentSession;
    private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges;
    private final String[] cfnames;

    /**
     * @param coordinator        the repair coordinator driving this task
     * @param parentSession      id of the parent repair session, attached to every produced session result
     * @param neighborsAndRanges source of the common ranges to repair
     * @param cfnames            table names passed through to range filtering and to the traditional repair
     */
    protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator,
                                                    TimeUUID parentSession,
                                                    RepairCoordinator.NeighborsAndRanges neighborsAndRanges,
                                                    String[] cfnames)
    {
        super(coordinator);
        this.parentSession = parentSession;
        this.neighborsAndRanges = neighborsAndRanges;
        this.cfnames = cfnames;
    }

    @Override
    public String name()
    {
        return "MutationTrackingIncrementalRepair";
    }

    /**
     * Starts one sync coordinator per range and hands the (blocking) wait for their completion to {@code executor}.
     *
     * @param executor            executor that runs the blocking wait and any follow-up traditional repair
     * @param validationScheduler scheduler forwarded to the traditional repair, if one is required
     * @return a future completed with the repair result, or failed if a sync times out, the waiting thread is
     *         interrupted, or the follow-up traditional repair fails
     */
    @Override
    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
    {
        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);

        if (allRanges.isEmpty())
        {
            logger.info("No common ranges to repair for keyspace {}", keyspace);
            return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
        }

        List<MutationTrackingSyncCoordinator> syncCoordinators = new ArrayList<>();
        List<Collection<Range<Token>>> rangeCollections = new ArrayList<>();
        AsyncPromise<CoordinatedRepairResult> resultPromise = new AsyncPromise<>();

        try
        {
            for (CommonRange commonRange : allRanges)
            {
                for (Range<Token> range : commonRange.ranges)
                {
                    MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range);
                    syncCoordinator.start();
                    syncCoordinators.add(syncCoordinator);
                    rangeCollections.add(List.of(range));

                    logger.info("Started mutation tracking sync for range {}", range);
                }
            }

            coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges");

            executor.execute(() -> {
                try
                {
                    waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise);
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt status so the executor thread's owner can observe the interruption
                    Thread.currentThread().interrupt();
                    logger.error("Error during mutation tracking repair", e);
                    resultPromise.tryFailure(e);
                }
                catch (Exception e)
                {
                    logger.error("Error during mutation tracking repair", e);
                    resultPromise.tryFailure(e);
                }
            });
        }
        catch (RuntimeException | Error e)
        {
            // Nobody will ever wait on the coordinators that were already started; cancel them before propagating
            cancelAll(syncCoordinators);
            throw e;
        }

        return resultPromise;
    }

    /**
     * Blocks until every sync coordinator has completed or timed out, then completes {@code resultPromise}
     * (directly, or through a follow-up traditional repair when the keyspace is migrating).
     * <p>
     * NOTE(review): the timeout is applied to each coordinator in turn, so the worst-case total wait grows with
     * the number of ranges - confirm this is the intended meaning of the timeout property.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting; coordinators that have not
     *                              been waited on yet are cancelled before the exception propagates
     */
    private void waitForSyncCompletion(List<MutationTrackingSyncCoordinator> syncCoordinators,
                                       ExecutorPlus executor,
                                       Scheduler validationScheduler,
                                       List<CommonRange> allRanges,
                                       List<Collection<Range<Token>>> rangeCollections,
                                       AsyncPromise<CoordinatedRepairResult> resultPromise) throws InterruptedException
    {
        boolean allSucceeded = true;
        boolean waitFinished = false;
        int next = 0;
        try
        {
            for (; next < syncCoordinators.size(); next++)
            {
                MutationTrackingSyncCoordinator syncCoordinator = syncCoordinators.get(next);
                boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
                if (!completed)
                {
                    logger.warn("Mutation tracking sync timed out for keyspace {} range {}",
                                keyspace, syncCoordinator.getRange());
                    syncCoordinator.cancel();
                    allSucceeded = false;
                }
            }
            waitFinished = true;
        }
        finally
        {
            // On interruption or any other abnormal exit, cancel the coordinator being waited on and all
            // coordinators after it; the ones before it have already completed or been cancelled above.
            if (!waitFinished)
                cancelAll(syncCoordinators.subList(next, syncCoordinators.size()));
        }

        if (!allSucceeded)
        {
            resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges"));
            return;
        }

        coordinator.notifyProgress("Mutation tracking sync completed for all ranges");

        if (requiresTraditionalRepair(keyspace))
        {
            runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise);
        }
        else
        {
            // Pure mutation tracking - create one successful session result per synced range
            List<RepairSessionResult> results = new ArrayList<>(rangeCollections.size());
            for (Collection<Range<Token>> ranges : rangeCollections)
                results.add(new RepairSessionResult(parentSession, keyspace, ranges, List.of(), false));

            resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, results));
        }
    }

    /**
     * Runs a traditional repair over {@code allRanges} and forwards its outcome to {@code resultPromise}.
     */
    private void runTraditionalRepairForMigration(ExecutorPlus executor,
                                                  Scheduler validationScheduler,
                                                  List<CommonRange> allRanges,
                                                  AsyncPromise<CoordinatedRepairResult> resultPromise)
    {
        coordinator.notifyProgress("Running traditional repair for migration");

        // Use the inherited runRepair method from AbstractRepairTask
        Future<CoordinatedRepairResult> traditionalRepair = runRepair(parentSession, true, executor,
                                                                      validationScheduler, allRanges,
                                                                      neighborsAndRanges.shouldExcludeDeadParticipants,
                                                                      cfnames);

        traditionalRepair.addListener(f -> {
            try
            {
                CoordinatedRepairResult result = (CoordinatedRepairResult) f.get();
                resultPromise.trySuccess(result);
            }
            catch (Exception e)
            {
                // Report the underlying failure rather than the ExecutionException wrapper added by get(),
                // matching the other failure paths of this task which fail the promise with the raw exception
                Throwable failure = (e instanceof ExecutionException && e.getCause() != null) ? e.getCause() : e;
                resultPromise.tryFailure(failure);
            }
        });
    }

    /**
     * Cancels every given sync coordinator, logging (rather than propagating) failures so that one failing
     * cancellation does not prevent the remaining coordinators from being cancelled.
     */
    private void cancelAll(List<MutationTrackingSyncCoordinator> toCancel)
    {
        for (MutationTrackingSyncCoordinator syncCoordinator : toCancel)
        {
            try
            {
                syncCoordinator.cancel();
            }
            catch (RuntimeException e)
            {
                logger.warn("Failed to cancel mutation tracking sync for keyspace {} range {}",
                            keyspace, syncCoordinator.getRange(), e);
            }
        }
    }

    /**
     * Determines if this keyspace should use mutation tracking incremental repair.
     * Returns true if:
     * - Keyspace uses mutation tracking replication, OR
     * - Keyspace is currently migrating (either direction)
     * Returns false when the keyspace is not present in the current cluster metadata schema.
     */
    public static boolean shouldUseMutationTrackingRepair(String keyspace)
    {
        // Use a single metadata snapshot for both checks
        ClusterMetadata metadata = ClusterMetadata.current();
        KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null);
        if (ksm == null)
            return false;

        return ksm.useMutationTracking() || isMigrating(metadata, keyspace);
    }

    /**
     * Determines if we also need to run traditional repair.
     * Returns true during migration:
     * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data
     * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency
     */
    public static boolean requiresTraditionalRepair(String keyspace)
    {
        return isMigrating(ClusterMetadata.current(), keyspace);
    }

    /** Returns true when the given metadata snapshot holds mutation tracking migration info for the keyspace. */
    private static boolean isMigrating(ClusterMetadata metadata, String keyspace)
    {
        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
        return migrationInfo != null;
    }
}
10 changes: 9 additions & 1 deletion src/java/org/apache/cassandra/repair/RepairCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,15 @@ private Future<Pair<CoordinatedRepairResult, Supplier<String>>> repair(String[]
}
else if (state.options.isIncremental())
{
task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
// For keyspaces using mutation tracking, use the mutation tracking repair task
if (MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(state.keyspace))
{
task = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
}
else
{
task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.db.TypeSizes;
Expand Down Expand Up @@ -51,6 +52,11 @@ boolean isEmpty()
return replicatedOffsets.isEmpty();
}

/**
 * Exposes the replicated offsets held by this object as a read-only list.
 * The returned list is an unmodifiable wrapper around the internal list, not a copy.
 */
public List<Offsets.Immutable> getOffsets()
{
    final List<Offsets.Immutable> readOnlyView = Collections.unmodifiableList(replicatedOffsets);
    return readOnlyView;
}

@Override
public String toString()
{
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/replication/CoordinatorLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets()
}
}

/**
* Returns the UNION of all witnessed offsets from all participants.
* This represents all offsets that ANY replica has witnessed.
*/
Offsets.Immutable collectUnionOfWitnessedOffsets()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is operating at the MutationTrackingSyncCoordinator which is just one node's view of the offsets from the coordinators that can be arbitrarily behind or incomplete.

To collect the offsets I think the best thing to do is contact every replica of the shard and ask them for all the coordinators they know about and what the maximum offset seen was and union the result of that.

@bdeggleston do I have this right? I don't think the repair will have a happens-before relationship with acknowledged writes without it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's right. We'd need to either proactively contact each replica, or just listen to what offsets the other nodes are broadcasting starting after the IR starts.

You'd need to timeout if you don't get offsets from a participant after some amount of time too. I think this might be a bit easier by listening to incoming offsets and waiting on a future that times out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often will they broadcast if we just listen? I'm just wondering whether, in the interest of repairs starting quickly (so that things like tests run quickly), we should either proactively message or maybe change the broadcast interval for tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with Blake: We are currently planning to go with just listening to what offsets the other nodes are broadcasting starting after the IR starts, i.e. broadcasts that happen every 200ms. We might change our approach if it turns out to be an issue later.

Also, as Blake stated:

waiting a few seconds for incremental repair to start and a few more for it to complete is totally within the realm of reasonable for a repair process

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is the plan to wait until the broadcast arrives? If it doesn't arrive we won't move forward? It would be a correctness issue to move forward without receiving the broadcast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm, yea for that, I have added a timeout of 10s - if we don't receive the broadcast within that time we continue towards completing the repair. This is a buffer for about 50 broadcast cycles - do you think this is sufficient?
Do you think we should not move forward instead of completing the repair?

Copy link
Contributor

@aweisberg aweisberg Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spotted another wrinkle. Network messages can be delayed arbitrarily. So you can wait 300 milliseconds, but when you receive the broadcast you don't know what is included in that broadcast. Both the message and its contents can be arbitrarily delayed because what you don't know is when it was actually collected from the data structures at the node sending the message.

In a request response scenario you know when it was sent (some time after the request). The broadcast message could include the current time, but we don't have hard guarantees on clock sync and in simulator we actively bring clocks out of sync and this would probably cause visible failures.

You can do the "wait on broadcast" thing, but it has to include something from sync coordinator node in the broadcast message that shows the sync coordinator that the information was collected after the sync coordinator started.

{
    // The union is computed and copied while the read lock is held
    lock.readLock().lock();
    try
    {
        Offsets.Mutable combined = witnessedOffsets.union();

        // An empty union is reported as null rather than as an empty immutable instance
        if (combined.isEmpty())
            return null;

        return Offsets.Immutable.copy(combined);
    }
    finally
    {
        lock.readLock().unlock();
    }
}

public long getUnreconciledCount()
{
lock.readLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class MutationTrackingService
private final IncomingMutations incomingMutations = new IncomingMutations();
private final OutgoingMutations outgoingMutations = new OutgoingMutations();

// Sync coordinators registered to be notified of replicated offset updates, grouped by keyspace name
private final Map<String, Set<MutationTrackingSyncCoordinator>> syncCoordinatorsByKeyspace = new ConcurrentHashMap<>();

private volatile boolean started = false;

private MutationTrackingService()
Expand Down Expand Up @@ -310,6 +312,19 @@ public void updateReplicatedOffsets(String keyspace, Range<Token> range, List<?
{
shardLock.readLock().unlock();
}

// Notify any registered sync coordinators about the offset update
Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(keyspace);
if (coordinators != null)
{
for (MutationTrackingSyncCoordinator coordinator : coordinators)
{
if (range.intersects(coordinator.getRange()))
{
coordinator.onOffsetsReceived(onHost);
}
}
}
}

public void recordFullyReconciledOffsets(ReconciledLogSnapshot reconciledSnapshot)
Expand Down Expand Up @@ -367,6 +382,30 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta
return incomingMutations.subscribe(mutationId, callback);
}

/**
 * Register a sync coordinator to be notified when offset updates arrive.
 * <p>
 * The per-keyspace set is looked up (or created) and populated inside a single atomic {@code compute} call.
 * Adding to the set outside of the map operation would race with {@link #unregisterSyncCoordinator}: that method
 * removes empty sets from the map, so the coordinator could end up in a set that is no longer reachable and
 * would never be notified.
 *
 * @param coordinator the coordinator to register; it is keyed by {@code coordinator.getKeyspace()}
 */
public void registerSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
{
    syncCoordinatorsByKeyspace.compute(coordinator.getKeyspace(), (ks, coordinators) -> {
        if (coordinators == null)
            coordinators = ConcurrentHashMap.newKeySet();
        coordinators.add(coordinator);
        return coordinators;
    });
}

/**
 * Unregister a sync coordinator.
 * <p>
 * Removal of the coordinator and the clean-up of an emptied per-keyspace set happen atomically with respect to
 * {@link #registerSyncCoordinator}. Does nothing if no coordinator is registered for the keyspace.
 *
 * @param coordinator the coordinator to remove
 */
public void unregisterSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
{
    syncCoordinatorsByKeyspace.computeIfPresent(coordinator.getKeyspace(), (ks, coordinators) -> {
        coordinators.remove(coordinator);
        // Returning null drops the mapping once the last coordinator for the keyspace is gone
        return coordinators.isEmpty() ? null : coordinators;
    });
}

public void executeTransfers(String keyspace, Set<SSTableReader> sstables, ConsistencyLevel cl)
{
shardLock.readLock().lock();
Expand Down Expand Up @@ -495,6 +534,21 @@ public Iterable<Shard> getShards()
return shards;
}

/**
 * Hands {@code consumer} to the shards tracked for {@code keyspace} while holding the shard read lock.
 * Does nothing when no shards are tracked for that keyspace.
 *
 * @param keyspace name of the keyspace whose shards should be visited
 * @param consumer callback forwarded to {@code KeyspaceShards.forEachShard}; it runs under the read lock
 */
public void forEachShardInKeyspace(String keyspace, Consumer<Shard> consumer)
{
    shardLock.readLock().lock();
    try
    {
        KeyspaceShards tracked = keyspaceShards.get(keyspace);
        if (tracked == null)
            return;

        tracked.forEachShard(consumer);
    }
    finally
    {
        shardLock.readLock().unlock();
    }
}

public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
{
shardLock.readLock().lock();
Expand Down
Loading