diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index cf3bf8d4..b38e7fce 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -79,6 +79,8 @@ readonly ClientSession this.blobManager.TraceHelper; public int PageSizeBits => this.storelogsettings.PageSizeBits; @@ -492,33 +494,42 @@ long MinimalLogSize } } - public override long? GetCompactionTarget() + public override bool CompactionIsDue(out long target) { // TODO empiric validation of the heuristics var stats = (StatsState) this.singletons[(int)TrackedObjectKey.Stats.ObjectType]; long actualLogSize = this.fht.Log.TailAddress - this.fht.Log.BeginAddress; long minimalLogSize = this.MinimalLogSize; - long compactionAreaSize = Math.Min(50000, this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress); + long compactionAreaSize = Math.Min(200000, this.fht.Log.SafeReadOnlyAddress - this.fht.Log.BeginAddress); if (actualLogSize > 2 * minimalLogSize // there must be significant bloat - && compactionAreaSize >= 5000) // and enough compaction area to justify the overhead + && compactionAreaSize >= 10000) // and enough compaction area to justify the overhead { - return this.fht.Log.BeginAddress + compactionAreaSize; + target = this.fht.Log.BeginAddress + compactionAreaSize; + return true; } else { - this.TraceHelper.FasterCompactionProgress( - FasterTraceHelper.CompactionProgress.Skipped, - "", - this.Log.BeginAddress, - this.Log.SafeReadOnlyAddress, - this.Log.TailAddress, - minimalLogSize, - compactionAreaSize, - this.GetElapsedCompactionMilliseconds()); - return null; + // trace the skipped compaction + // but this method is called quite frequently, so limit the traces to once per minute + if (this.lastTraceForSkippedCompaction + TimeSpan.FromMinutes(1) < DateTime.UtcNow) + { + this.lastTraceForSkippedCompaction = DateTime.UtcNow; + 
this.TraceHelper.FasterCompactionProgress( + FasterTraceHelper.CompactionProgress.Skipped, + "", + this.Log.BeginAddress, + this.Log.SafeReadOnlyAddress, + this.Log.TailAddress, + minimalLogSize, + compactionAreaSize, + this.GetElapsedCompactionMilliseconds()); + } + + target = 0; + return false; } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index 99bb2f71..9271d6a9 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -6,6 +6,7 @@ namespace DurableTask.Netherite.Faster using DurableTask.Netherite.Scaling; using System; using System.Collections.Generic; + using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -362,23 +363,23 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil) { trigger = CheckpointTrigger.EventCount; } + else if (this.store.CompactionIsDue(out long compactionTarget)) + { + compactUntil = compactionTarget; + trigger = CheckpointTrigger.Compaction; + } else if (this.loadInfo.IsBusy() == null && DateTime.UtcNow > this.timeOfNextIdleCheckpoint) { // we have reached an idle point. 
this.ScheduleNextIdleCheckpointTime(); - - compactUntil = this.store.GetCompactionTarget(); - if (compactUntil.HasValue) - { - trigger = CheckpointTrigger.Compaction; - } - else if (this.numberEventsSinceLastCheckpoint > 0 || inputQueuePositionLag > 0) + + if (this.numberEventsSinceLastCheckpoint > 0 || inputQueuePositionLag > 0) { // we checkpoint even though not much has happened trigger = CheckpointTrigger.Idle; } } - + return trigger != CheckpointTrigger.None; } @@ -431,7 +432,7 @@ void StartCheckpointOrFailOnTimeout(Func checkpointRoutine, string message } } - async ValueTask RunCheckpointingStateMachine() + async ValueTask RunCheckpointingStateMachine() { // handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) if (this.pendingStoreCheckpoint != null) @@ -513,6 +514,13 @@ async ValueTask RunCheckpointingStateMachine() this.pendingCompaction = this.RunCompactionAsync(compactUntil); } + else + { + // there are no checkpoint or compaction operations in progress or due + return false; + } + + return true; // there is a checkpoint or compaction operation in progress. 
} protected override async Task Process(IList batch) @@ -573,12 +581,6 @@ protected override async Task Process(IList batch) markPartitionAsActive = markPartitionAsActive || partitionEvent.CountsAsPartitionActivity; } - // if we are processing events that count as activity, our latency category is at least "low" - if (markPartitionAsActive) - { - this.loadInfo.MarkActive(); - } - if (this.isShuttingDown || this.cancellationToken.IsCancellationRequested) { return; @@ -587,8 +589,17 @@ protected override async Task Process(IList batch) this.store.AdjustCacheSize(); // handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) - await this.RunCheckpointingStateMachine(); - + bool checkpointOrCompactionInProgress = await this.RunCheckpointingStateMachine(); + + // if a checkpoint or compaction is in progress, our latency category is at least "low" + markPartitionAsActive = markPartitionAsActive || checkpointOrCompactionInProgress; + + if (markPartitionAsActive) + { + // mark this partition as having activity; this shows up in the partition table, and influences the scale controller + this.loadInfo.MarkActive(); + } + // periodically publish the partition load information and the send/receive positions // also report checkpointing stats if (this.lastPublished + PublishInterval < DateTime.UtcNow) @@ -671,8 +682,22 @@ protected override async Task Process(IList batch) { if (target.HasValue) { + Stopwatch stopWatch = Stopwatch.StartNew(); + target = await this.store.RunCompactionAsync(target.Value); + this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler); + + // we mark the latency of the compaction in the partition table so that the scale controller + // will scale out if needed to handle the compaction load + if (stopWatch.Elapsed.TotalSeconds > 5) + { + this.loadInfo.MarkHighLatency(); + } + else if (stopWatch.Elapsed.TotalSeconds > 1) + { 
+ this.loadInfo.MarkMediumLatency(); + } } this.Notify(); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/TrackedObjectStore.cs b/src/DurableTask.Netherite/StorageLayer/Faster/TrackedObjectStore.cs index 02e842c2..d6876dd3 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/TrackedObjectStore.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/TrackedObjectStore.cs @@ -37,7 +37,7 @@ abstract class TrackedObjectStore public abstract Task FinalizeCheckpointCompletedAsync(Guid guid); - public abstract long? GetCompactionTarget(); + public abstract bool CompactionIsDue(out long target); public abstract Task RunCompactionAsync(long target);