diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs
index d465d848..d3173a46 100644
--- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs
+++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProviderFactory.cs
@@ -116,10 +116,18 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
             this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
             this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;
 
-            // copy all applicable fields from both the options and the storageProvider options
-            JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings);
-            JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options.StorageProvider), netheriteSettings);
-
+            // collect the settings from the two places whose contents we want to import into the Netherite settings
+            string durableExtensionSettings = JsonConvert.SerializeObject(this.options);
+            string storageProviderSettings = JsonConvert.SerializeObject(this.options.StorageProvider);
+
+            // copy all applicable settings into the Netherite settings, based on matching property names
+            JsonConvert.PopulateObject(durableExtensionSettings, netheriteSettings);
+            JsonConvert.PopulateObject(storageProviderSettings, netheriteSettings);
+
+            // copy the storage provider settings into the FASTER tuning parameters, based on matching property names
+            netheriteSettings.FasterTuningParameters ??= new Faster.BlobManager.FasterTuningParameters();
+            JsonConvert.PopulateObject(storageProviderSettings, netheriteSettings.FasterTuningParameters);
+
             // configure the cache size if not already configured
             netheriteSettings.InstanceCacheSizeMB ??= (this.inConsumption ? 100 : 200 * Environment.ProcessorCount);
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
index b963a914..e4b15c8b 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/AzureStorageDevice.cs
@@ -62,8 +62,6 @@ struct RemoveRequestInfo
         const uint MAX_UPLOAD_SIZE = 1024 * 1024;
         const uint MAX_DOWNLOAD_SIZE = 1024 * 1024;
 
-        const long MAX_PAGEBLOB_SIZE = 512L * 1024 * 1024 * 1024; // set this at 512 GB for now TODO consider implications
-
         /// <summary>
         /// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
        /// </summary>
@@ -424,8 +422,8 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti
         {
             var pageBlob = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId));
 
-            // If segment size is -1 we use a default
-            var size = this.segmentSize == -1 ? AzureStorageDevice.MAX_PAGEBLOB_SIZE : this.segmentSize;
+            // If the segment size is -1, we use the default starting size for auto-expanding page blobs.
+            var size = this.segmentSize == -1 ? this.BlobManager.StartingPageBlobSize : this.segmentSize;
 
             // If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
             // After creation is done, we can call write.
@@ -469,15 +467,55 @@ await this.BlobManager.PerformWithRetriesAsync(
                 {
                     var client = numAttempts > 2 ?
                        blobEntry.PageBlob.Default : blobEntry.PageBlob.Aggressive;
 
-                    var response = await client.UploadPagesAsync(
-                        content: stream,
-                        offset: destinationAddress + offset,
-                        transactionalContentHash: null,
-                        conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
-                        progressHandler: null,
-                        cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);
+                    try
+                    {
+                        var response = await client.UploadPagesAsync(
+                            content: stream,
+                            offset: destinationAddress + offset,
+                            transactionalContentHash: null,
+                            conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
+                            progressHandler: null,
+                            cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);
+
+                        blobEntry.ETag = response.Value.ETag;
+                    }
+                    catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidPageRange")
+                    {
+                        // this kind of error can indicate that the page blob is too small.
+                        // from the perspective of FASTER, this storage device is infinite, so it may write past the end of the blob.
+                        // to deal with this situation, we dynamically enlarge this device as needed.
+
+                        // first, compute the size to request: keep doubling the current size until the write fits
+                        long currentSize = (await client.GetPropertiesAsync().ConfigureAwait(false)).Value.ContentLength;
+                        long sizeToRequest = currentSize;
+                        long sizeToAccommodate = destinationAddress + offset + length + 1;
+                        while (sizeToAccommodate > sizeToRequest)
+                        {
+                            sizeToRequest <<= 1;
+                        }
 
-                    blobEntry.ETag = response.Value.ETag;
+                        if (sizeToRequest <= currentSize)
+                        {
+                            throw; // the blob is already big enough, so this exception was thrown for some other reason
+                        }
+                        else
+                        {
+                            if (sizeToRequest > this.BlobManager.MaxPageBlobSize)
+                            {
+                                throw new InvalidOperationException($"cannot expand page blob {blobEntry.PageBlob.Default.Name} beyond maximum size {this.BlobManager.MaxPageBlobSize}");
+                            }
+
+                            // enlarge the blob so it can accommodate the write
+                            await client.ResizeAsync(
+                                sizeToRequest,
+                                conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
+                                cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);
+
+                            // force a retry of the write;
+                            // this also generates a warning in the traces, containing the information about what happened
+                            throw new BlobUtils.ForceRetryException($"page blob was enlarged from {currentSize} to {sizeToRequest}", e);
+                        }
+                    }
                 }
 
                 return (long)length;
@@ -500,11 +538,17 @@ async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlob
         {
             using (stream)
             {
+                // we use this cap to avoid reading past the end of the page blob;
+                // for performance reasons (an Azure Storage call is required to determine the current size of the page blob),
+                // we set it lazily, i.e. only after a read request has failed
+                long? readCap = null;
+
                 long offset = 0;
                 while (readLength > 0)
                 {
-                    var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
-
+                    // determine how much we are going to try to read in this portion
+                    var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);
+
                     await this.BlobManager.PerformWithRetriesAsync(
                         BlobManager.AsynchronousStorageReadMaxConcurrency,
                         true,
@@ -516,36 +560,94 @@ await this.BlobManager.PerformWithRetriesAsync(
                         true,
                         async (numAttempts) =>
                         {
-                            if (numAttempts > 0)
-                            {
-                                stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
-                            }
+                            stream.Seek(offset, SeekOrigin.Begin); // go (back) to the correct position; this also resets it when retrying
 
-                            if (length > 0)
-                            {
-                                var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ?
                                    blob.Default : blob.Aggressive;
+                            long requestedLength = length;
 
-                            var response = await client.DownloadStreamingAsync(
-                                range: new Azure.HttpRange(sourceAddress + offset, length),
-                                conditions: null,
-                                rangeGetContentHash: false,
-                                cancellationToken: this.PartitionErrorHandler.Token)
-                                .ConfigureAwait(false);
+                            if (readCap.HasValue && sourceAddress + offset + requestedLength > readCap.Value)
+                            {
+                                requestedLength = readCap.Value - (sourceAddress + offset);
 
-                            using (var streamingResult = response.Value)
+                                if (requestedLength <= 0)
                                 {
-                                await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
+                                    requestedLength = 0;
                                 }
                             }
 
-                            if (stream.Position != offset + length)
+                            if (requestedLength > 0)
                             {
-                                throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");
+                                var client = (numAttempts > 1 || requestedLength == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
+
+                                try
+                                {
+                                    var response = await client.DownloadStreamingAsync(
+                                        range: new Azure.HttpRange(sourceAddress + offset, requestedLength),
+                                        conditions: null,
+                                        rangeGetContentHash: false,
+                                        cancellationToken: this.PartitionErrorHandler.Token)
+                                        .ConfigureAwait(false);
+
+                                    using (var streamingResult = response.Value)
+                                    {
+                                        await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
+                                    }
+
+                                    // We have observed that we may get 206 (Partial Content) responses where the actual length is less than the requested length.
+                                    // The Azure storage client SDK handles the http status codes transparently, but we may still observe that fewer bytes than
+                                    // requested were returned by the streamingResult.
+                                    long actualLength = (stream.Position - offset);
+
+                                    if (actualLength < requestedLength)
+                                    {
+                                        this.BlobManager.StorageTracer?.FasterStorageProgress($"PageBlob.DownloadStreamingAsync id={id} returned partial response range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");
+
+                                        if (actualLength == 0)
+                                        {
+                                            throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned empty response, range={response.Value.Details.ContentRange} requestedLength={requestedLength}");
+                                        }
+                                        else if (actualLength % 512 != 0)
+                                        {
+                                            throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned unaligned response, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");
+                                        }
+                                        else
+                                        {
+                                            length = (uint)actualLength; // adjust length to what was actually read, so the next read starts where this read ended
+                                        }
+                                    }
+                                    else if (actualLength > requestedLength)
+                                    {
+                                        throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned too much data, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");
+                                    }
+                                }
+                                catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidRange")
+                                {
+                                    // from the perspective of FASTER, this storage device is infinite, so it may read past the end of the blob.
+                                    // but even though it requests more data than what it wrote, it will only actually use what it wrote before.
+                                    // so we can deal with this situation by just copying fewer bytes from the blob into the buffer.
+
+                                    // first, determine the current page blob size.
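+                                    // (note: this storage call happens only on this failure path, so successful reads incur no extra round trips)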
+                                    var properties = await client.GetPropertiesAsync().ConfigureAwait(false);
+                                    readCap = properties.Value.ContentLength;
+
+                                    if (sourceAddress + offset + requestedLength <= readCap.Value)
+                                    {
+                                        // the page blob is big enough, so this exception was thrown for some other reason
+                                        throw;
+                                    }
+                                    else
+                                    {
+                                        // the page blob was indeed too small; now that we have set a read cap,
+                                        // force a retry so that we can read an adjusted portion
+                                        throw new BlobUtils.ForceRetryException($"reads now capped at {readCap}", e);
+                                    }
+                                }
+                            }
 
                             return length;
                         });
 
+                    // adjust how much remains to be read, and where to read from, in the next iteration,
+                    // based on how much was actually read in this iteration
                     readLength -= length;
                     offset += length;
                 }
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
index e26089e3..24876385 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/AzureBlobs/BlobManager.cs
@@ -58,6 +58,9 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
         public IDevice HybridLogDevice { get; private set; }
         public IDevice ObjectLogDevice { get; private set; }
 
+        internal long StartingPageBlobSize { get; private set; }
+        internal long MaxPageBlobSize { get; private set; }
+
         public DateTime IncarnationTimestamp { get; private set; }
 
         public string ContainerName { get; }
@@ -90,6 +93,8 @@ public class FasterTuningParameters
             public double? StoreLogMutableFraction;
             public int? EstimatedAverageObjectSize;
             public int? NumPagesToPreload;
+            public int? StartingPageBlobSizeBits;
+            public int? MaxPageBlobSizeBits;
         }
 
         public FasterLogSettings GetEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
@@ -333,6 +338,13 @@ public BlobManager(
             this.CheckpointInfo = new CheckpointInfo();
             this.CheckpointInfoETag = default;
 
+            // some page blobs have no predetermined size; for example, those used for the object log and for checkpoints.
+            // by default, we start those at 512 GB - empty pages incur no storage cost, so there is no harm in starting large.
+            // if that turns out to not be enough at some point, we enlarge them automatically,
+            // up to a maximum of 2 TB (by default); the maximum page blob size supported by Azure Storage is 8 TB.
+            this.StartingPageBlobSize = 1L << (this.settings.FasterTuningParameters?.StartingPageBlobSizeBits ?? 39 /* 512 GB */);
+            this.MaxPageBlobSize = 1L << (this.settings.FasterTuningParameters?.MaxPageBlobSizeBits ?? 41 /* 2 TB */);
+
             if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
             {
                 this.UseLocalFiles = true;
diff --git a/src/DurableTask.Netherite/Util/BlobUtils.cs b/src/DurableTask.Netherite/Util/BlobUtils.cs
index 6cc94e72..7048b0c7 100644
--- a/src/DurableTask.Netherite/Util/BlobUtils.cs
+++ b/src/DurableTask.Netherite/Util/BlobUtils.cs
@@ -76,6 +76,11 @@ public static bool IsTransientStorageError(Exception exception)
                 return true;
             }
 
+            if (exception is ForceRetryException)
+            {
+                return true;
+            }
+
             // Empirically observed: timeouts on synchronous calls
             if (exception.InnerException is TimeoutException)
             {
@@ -126,6 +131,27 @@ static bool httpStatusIndicatesTransientError(int? statusCode) =>
                || statusCode == 504); //504 Gateway Timeout
 
 
+        /// <summary>
+        /// A custom exception class that we use to explicitly force a retry after a transient error.
+        /// By using an exception, we ensure that we stay under the total retry count and generate the proper tracing.
+        /// </summary>
+        public class ForceRetryException : Exception
+        {
+            public ForceRetryException()
+            {
+            }
+
+            public ForceRetryException(string message)
+                : base(message)
+            {
+            }
+
+            public ForceRetryException(string message, Exception inner)
+                : base(message, inner)
+            {
+            }
+        }
+
         // Lease error codes are documented at https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob
 
         public static bool LeaseConflictOrExpired(StorageException e)
diff --git a/test/PerformanceTests/host.json b/test/PerformanceTests/host.json
index dc2cb594..b7cd08fb 100644
--- a/test/PerformanceTests/host.json
+++ b/test/PerformanceTests/host.json
@@ -94,6 +94,10 @@
         // or to "ClientOnly" to run only the client
         "PartitionManagement": "EventProcessorHost",
 
+        // the following settings can be used to test the auto-expansion of page blobs
+        //"StartingPageBlobSizeBits": 13, // 8 kB
+        //"MaxPageBlobSizeBits": 32, // 4 GB
+
         // set this to "Local" to disable the global activity distribution algorithm
         // options: "Local", "Static", "Locavore"
         "ActivityScheduler": "Locavore",
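
For reference, the following is a minimal standalone sketch of the sizing arithmetic introduced by this diff; it is illustrative only and not part of the change. The class and method names are hypothetical, but the bit-shift sizing mirrors the StartingPageBlobSize/MaxPageBlobSize computation in BlobManager, and the doubling loop mirrors the InvalidPageRange handler in AzureStorageDevice.

    using System;

    class PageBlobSizingSketch
    {
        // sizes are configured as bit counts, e.g. 39 -> 512 GB, 41 -> 2 TB
        static long SizeFromBits(int bits) => 1L << bits;

        // computes the size to request when a write whose exclusive end offset is writeEnd
        // does not fit into a blob of currentSize: keep doubling until the write fits
        static long ComputeSizeToRequest(long currentSize, long writeEnd)
        {
            long sizeToAccommodate = writeEnd + 1;
            long sizeToRequest = currentSize;
            while (sizeToAccommodate > sizeToRequest)
            {
                sizeToRequest <<= 1;
            }
            return sizeToRequest;
        }

        static void Main()
        {
            long starting = SizeFromBits(13); // 8 kB, as in the host.json test settings above
            long max = SizeFromBits(32);      // 4 GB

            // a 4 kB write ending at offset 24 kB in an 8 kB blob needs two doublings: 8 kB -> 16 kB -> 32 kB
            long requested = ComputeSizeToRequest(starting, 24 * 1024);
            Console.WriteLine($"requested size: {requested} bytes"); // prints 32768

            // the write path refuses to grow past the configured maximum
            if (requested > max)
            {
                throw new InvalidOperationException("cannot expand page blob beyond maximum size");
            }
        }
    }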