CEP-45: Incremental Repair Blocking Wait for offsets #4569
Open
aparna0522 wants to merge 8 commits into apache:cep-45-mutation-tracking from aparna0522:incremental-repair-for-MT
Commits (8)
3abbb58  CEP-45: Incremental Repair Blocking Wait for offsets (aparnanaik0522)
65471d8  end--no-edit (aparnanaik0522)
8408e13  Create new test to validate inc repair on ALL replicas (aparnanaik0522)
619fe4a  SyncCoordinatorTest file fix (aparnanaik0522)
86ad36f  Change shardStates from CHM -> HM (aparnanaik0522)
ce56492  Fix possible shard staleness (aparnanaik0522)
ff1bf5f  Fix for happens-before (aparnanaik0522)
a94886d  Fix MutationTrackingIncrementalRepairTask file (aparnanaik0522)
src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java (214 additions, 0 deletions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;

/** Incremental repair task for keyspaces using mutation tracking */
public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
{
    private static final long SYNC_TIMEOUT_MINUTES = CassandraRelevantProperties.REPAIR_SYNC_TIMEOUT_MINUTES.getLong();

    private final TimeUUID parentSession;
    private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges;
    private final String[] cfnames;

    protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator,
                                                    TimeUUID parentSession,
                                                    RepairCoordinator.NeighborsAndRanges neighborsAndRanges,
                                                    String[] cfnames)
    {
        super(coordinator);
        this.parentSession = parentSession;
        this.neighborsAndRanges = neighborsAndRanges;
        this.cfnames = cfnames;
    }

    @Override
    public String name()
    {
        return "MutationTrackingIncrementalRepair";
    }

    @Override
    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
    {
        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);

        if (allRanges.isEmpty())
        {
            logger.info("No common ranges to repair for keyspace {}", keyspace);
            return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
        }

        List<MutationTrackingSyncCoordinator> syncCoordinators = new ArrayList<>();
        List<Collection<Range<Token>>> rangeCollections = new ArrayList<>();

        for (CommonRange commonRange : allRanges)
        {
            for (Range<Token> range : commonRange.ranges)
            {
                MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range);
                syncCoordinator.start();
                syncCoordinators.add(syncCoordinator);
                rangeCollections.add(List.of(range));

                logger.info("Started mutation tracking sync for range {}", range);
            }
        }

        coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges");

        AsyncPromise<CoordinatedRepairResult> resultPromise = new AsyncPromise<>();

        executor.execute(() -> {
            try
            {
                waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise);
            }
            catch (Exception e)
            {
                logger.error("Error during mutation tracking repair", e);
                resultPromise.tryFailure(e);
            }
        });

        return resultPromise;
    }

    private void waitForSyncCompletion(List<MutationTrackingSyncCoordinator> syncCoordinators,
                                       ExecutorPlus executor,
                                       Scheduler validationScheduler,
                                       List<CommonRange> allRanges,
                                       List<Collection<Range<Token>>> rangeCollections,
                                       AsyncPromise<CoordinatedRepairResult> resultPromise) throws InterruptedException
    {
        boolean allSucceeded = true;
        for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators)
        {
            boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            if (!completed)
            {
                logger.warn("Mutation tracking sync timed out for keyspace {} range {}",
                            keyspace, syncCoordinator.getRange());
                syncCoordinator.cancel();
                allSucceeded = false;
            }
        }

        if (!allSucceeded)
        {
            resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges"));
            return;
        }

        coordinator.notifyProgress("Mutation tracking sync completed for all ranges");

        if (requiresTraditionalRepair(keyspace))
        {
            runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise);
        }
        else
        {
            // Pure mutation tracking - create successful result
            List<RepairSessionResult> results = new ArrayList<>();
            for (int i = 0; i < rangeCollections.size(); i++)
            {
                Collection<Range<Token>> ranges = rangeCollections.get(i);
                results.add(new RepairSessionResult(parentSession, keyspace, ranges, List.of(), false));
            }
            resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, results));
        }
    }

    private void runTraditionalRepairForMigration(ExecutorPlus executor,
                                                  Scheduler validationScheduler,
                                                  List<CommonRange> allRanges,
                                                  AsyncPromise<CoordinatedRepairResult> resultPromise)
    {
        coordinator.notifyProgress("Running traditional repair for migration");

        // Use the inherited runRepair method from AbstractRepairTask
        Future<CoordinatedRepairResult> traditionalRepair = runRepair(parentSession, true, executor,
                                                                      validationScheduler, allRanges,
                                                                      neighborsAndRanges.shouldExcludeDeadParticipants,
                                                                      cfnames);

        traditionalRepair.addListener(f -> {
            try
            {
                CoordinatedRepairResult result = (CoordinatedRepairResult) f.get();
                resultPromise.setSuccess(result);
            }
            catch (Exception e)
            {
                resultPromise.setFailure(e);
            }
        });
    }

    /**
     * Determines if this keyspace should use mutation tracking incremental repair.
     * Returns true if:
     * - Keyspace uses mutation tracking replication, OR
     * - Keyspace is currently migrating (either direction)
     */
    public static boolean shouldUseMutationTrackingRepair(String keyspace)
    {
        ClusterMetadata metadata = ClusterMetadata.current();
        KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null);
        if (ksm == null)
            return false;

        // Check if keyspace uses mutation tracking
        if (ksm.useMutationTracking())
            return true;

        // Check if keyspace is in migration (either direction)
        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
        return migrationInfo != null;
    }

    /**
     * Determines if we also need to run traditional repair.
     * Returns true during migration:
     * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data
     * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency
     */
    public static boolean requiresTraditionalRepair(String keyspace)
    {
        ClusterMetadata metadata = ClusterMetadata.current();
        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
        return migrationInfo != null;
    }
}
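For context, a hypothetical call-site sketch showing how a caller in the same package might route a keyspace to this task using the static check above. The dispatch site in RepairCoordinator is not part of this diff; coordinator, parentSession, neighborsAndRanges, cfnames, executor and validationScheduler are assumed to come from the surrounding repair setup.

    // Illustrative only: choose the mutation tracking task when the keyspace
    // uses mutation tracking or is migrating, otherwise fall through to the
    // pre-existing incremental repair path.
    if (MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(keyspace))
    {
        AbstractRepairTask task = new MutationTrackingIncrementalRepairTask(coordinator, parentSession,
                                                                            neighborsAndRanges, cfnames);
        return task.performUnsafe(executor, validationScheduler);
    }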
Conversations
This is operating at the MutationTrackingSyncCoordinator level, which is just one node's view of the offsets from the coordinators, and that view can be arbitrarily behind or incomplete.
To collect the offsets, I think the best thing to do is contact every replica of the shard, ask each of them for all the coordinators they know about and the maximum offset seen from each, and union the results, as in the sketch below.
@bdeggleston do I have this right? I don't think the repair will have a happens-before relationship for acknowledged writes without it.
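A minimal sketch of that collection step, assuming hypothetical names (CoordinatorId, ReplicaOffsets, Replica, fetchOffsets) that are not APIs from this PR:

    import java.util.HashMap;
    import java.util.Map;

    // Ask every replica of the shard which coordinators it knows about and the
    // maximum offset it has seen from each, then union the per-coordinator maxima.
    Map<CoordinatorId, Long> unionMaxOffsets(Iterable<Replica> shardReplicas)
    {
        Map<CoordinatorId, Long> union = new HashMap<>();
        for (Replica replica : shardReplicas)
        {
            ReplicaOffsets offsets = fetchOffsets(replica); // hypothetical request/response call
            offsets.maxOffsetPerCoordinator().forEach(
                (coordinator, offset) -> union.merge(coordinator, offset, Math::max));
        }
        return union; // per coordinator: the highest offset any replica has seen
    }

The repair would then block until every replica has reconciled up to the unioned offsets.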
Yes, that's right. We'd need to either proactively contact each replica, or just listen to what offsets the other nodes are broadcasting, starting after the IR starts.
You'd also need to time out if you don't get offsets from a participant after some amount of time. I think this might be a bit easier by listening to incoming offsets and waiting on a future that times out, roughly as sketched below.
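A sketch of that wait, assuming a hypothetical broadcast listener that completes the promise once every participant has reported offsets observed after the IR started; ReceivedOffsets and the 10-second timeout are illustrative, not from this PR:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.cassandra.utils.concurrent.AsyncPromise;

    // Block until the broadcast listener has heard from every participant,
    // or give up after a bounded wait.
    ReceivedOffsets awaitBroadcasts(AsyncPromise<ReceivedOffsets> offsetsReceived)
            throws InterruptedException, ExecutionException
    {
        try
        {
            return offsetsReceived.get(10, TimeUnit.SECONDS); // illustrative timeout
        }
        catch (TimeoutException e)
        {
            // A participant never broadcast in time; fail the repair rather than
            // proceed without knowing its offsets.
            throw new RuntimeException("Timed out waiting for offset broadcasts", e);
        }
    }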

How often will they broadcast if we just listen? In the interest of repairs starting quickly, so that things like tests run quickly, I'm wondering whether we should either proactively message or change the broadcast interval for tests.

Discussed with Blake: we are currently planning to go with just listening to what offsets the other nodes are broadcasting after the IR starts, i.e. the broadcasts that happen every 200ms. We might change our approach if it turns out to be an issue later.
Also, as Blake stated:

So is the plan to wait until the broadcast arrives, and if it doesn't arrive we won't move forward? It would be a correctness issue to move forward without receiving the broadcast.

Umm, yeah. For that I have added a timeout of 10s: if we don't receive the broadcast within that time, we continue towards completing the repair. That is a buffer of about 50 broadcast cycles (10s at one broadcast every 200ms). Do you think this is sufficient?
Or do you think we should not move forward, instead of completing the repair?

Spotted another wrinkle: network messages can be delayed arbitrarily. So you can wait 300 milliseconds, but when you receive the broadcast you don't know what is included in it. Both the message and its contents can be arbitrarily delayed, because you don't know when the information was actually collected from the data structures at the node sending the message.
In a request-response scenario you know when it was sent (some time after the request). The broadcast message could include the current time, but we don't have hard guarantees on clock sync, and in the simulator we actively bring clocks out of sync, so this would probably cause visible failures.
You can do the "wait on broadcast" thing, but the broadcast message has to include something that shows the sync coordinator that the information was collected after the sync coordinator started.
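One hedged way to encode that happens-before relationship without relying on clocks, using invented names (OffsetBroadcast, CoordinatorId, observedSyncSessions): the sync coordinator announces its session id when the sync starts, replicas echo every session id they had observed when they collected their offsets, and the coordinator only accepts broadcasts that echo its own session.

    import java.util.Map;
    import java.util.Set;

    import org.apache.cassandra.utils.TimeUUID;

    // Hypothetical broadcast payload: per-coordinator max offsets, plus the sync
    // sessions this replica had already observed when the offsets were collected.
    class OffsetBroadcast
    {
        final Map<CoordinatorId, Long> maxOffsets;
        final Set<TimeUUID> observedSyncSessions;

        OffsetBroadcast(Map<CoordinatorId, Long> maxOffsets, Set<TimeUUID> observedSyncSessions)
        {
            this.maxOffsets = maxOffsets;
            this.observedSyncSessions = observedSyncSessions;
        }

        // A broadcast that echoes the session id must have been collected after
        // the sync coordinator started that session; no clock sync is required.
        boolean collectedAfter(TimeUUID syncSession)
        {
            return observedSyncSessions.contains(syncSession);
        }
    }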