From 1ce13a364696f7b4ee9366cac3d0764ea6743fcd Mon Sep 17 00:00:00 2001
From: gowrisankar
Date: Fri, 30 Jan 2026 07:10:29 +0100
Subject: [PATCH 1/4] feat: make S3 transfer concurrency, part size, and
 checksum configurable

---
 s3/client/aws_s3_blobstore.go | 38 +++++++++++++++++++++-
 s3/client/sdk.go              |  4 +++
 s3/config/config.go           | 51 +++++++++++++++++++++++-------
 s3/config/config_test.go      | 59 +++++++++++++++++++++++++++++++++++
 s3/config/endpoints.go        |  1 +
 5 files changed, 140 insertions(+), 13 deletions(-)

diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go
index 854b778..614e24f 100644
--- a/s3/client/aws_s3_blobstore.go
+++ b/s3/client/aws_s3_blobstore.go
@@ -22,6 +22,14 @@ import (
 var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ")
 var oneTB = int64(1000 * 1024 * 1024 * 1024)
 
+// Default settings for transfer concurrency and part size.
+// These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads.
+// See: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/s3/manager#Downloader
+const (
+    defaultTransferConcurrency = 5
+    defaultTransferPartSize    = int64(5 * 1024 * 1024) // 5 MB
+)
+
 // awsS3Client encapsulates AWS S3 blobstore interactions
 type awsS3Client struct {
     s3Client *s3.Client
@@ -30,7 +38,21 @@ type awsS3Client struct {
 
 // Get fetches a blob, destination will be overwritten if exists
 func (b *awsS3Client) Get(src string, dest io.WriterAt) error {
-    downloader := manager.NewDownloader(b.s3Client)
+    cfg := b.s3cliConfig
+
+    downloader := manager.NewDownloader(b.s3Client, func(d *manager.Downloader) {
+        if cfg.DownloadConcurrency > 0 {
+            d.Concurrency = cfg.DownloadConcurrency
+        } else {
+            d.Concurrency = defaultTransferConcurrency
+        }
+
+        if cfg.DownloadPartSize > 0 {
+            d.PartSize = cfg.DownloadPartSize
+        } else {
+            d.PartSize = defaultTransferPartSize
+        }
+    })
 
     _, err := downloader.Download(context.TODO(), dest, &s3.GetObjectInput{
         Bucket: aws.String(b.s3cliConfig.BucketName),
@@ -54,9 +76,23 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
     uploader := manager.NewUploader(b.s3Client, func(u *manager.Uploader) {
         u.LeavePartsOnError = false
 
+        if cfg.UploadConcurrency > 0 {
+            u.Concurrency = cfg.UploadConcurrency
+        } else {
+            u.Concurrency = defaultTransferConcurrency
+        }
+
+        // PartSize: if multipart uploads are disabled, force a single very large part to avoid multipart.
+        // Otherwise use the configured upload part size if present, falling back to the default.
         if !cfg.MultipartUpload {
             // disable multipart uploads by way of large PartSize configuration
             u.PartSize = oneTB
+        } else {
+            if cfg.UploadPartSize > 0 {
+                u.PartSize = cfg.UploadPartSize
+            } else {
+                u.PartSize = defaultTransferPartSize
+            }
         }
 
         if cfg.ShouldDisableUploaderRequestChecksumCalculation() {
diff --git a/s3/client/sdk.go b/s3/client/sdk.go
index e61ba69..c67956f 100644
--- a/s3/client/sdk.go
+++ b/s3/client/sdk.go
@@ -71,6 +71,10 @@ func NewAwsS3ClientWithApiOptions(
         awsConfig.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
     }
 
+    if c.ShouldDisableResponseChecksumCalculation() {
+        awsConfig.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
+    }
+
     s3Client := s3.NewFromConfig(awsConfig, func(o *s3.Options) {
         o.UsePathStyle = !c.HostStyle
         if endpoint := c.S3Endpoint(); endpoint != "" {
diff --git a/s3/config/config.go b/s3/config/config.go
index 143cc1b..2241e02 100644
--- a/s3/config/config.go
+++ b/s3/config/config.go
@@ -27,8 +27,16 @@ type S3Cli struct {
     HostStyle        bool   `json:"host_style"`
     SwiftAuthAccount string `json:"swift_auth_account"`
     SwiftTempURLKey  string `json:"swift_temp_url_key"`
-    requestChecksumCalculationEnabled         bool
-    uploaderRequestChecksumCalculationEnabled bool
+    RequestChecksumCalculationEnabled         bool
+    ResponseChecksumCalculationEnabled        bool
+    UploaderRequestChecksumCalculationEnabled bool
+    // Optional knobs to tune transfer performance.
+    // If zero, the client will apply sensible defaults (handled by the S3 client layer).
+    // Part size values are provided in bytes.
+    DownloadConcurrency int   `json:"download_concurrency"`
+    DownloadPartSize    int64 `json:"download_part_size"`
+    UploadConcurrency   int   `json:"upload_concurrency"`
+    UploadPartSize      int64 `json:"upload_part_size"`
 }
 
 const defaultAWSRegion = "us-east-1" //nolint:unused
@@ -67,11 +75,12 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
     }
 
     c := S3Cli{
-        SSLVerifyPeer:   true,
-        UseSSL:          true,
-        MultipartUpload: true,
-        requestChecksumCalculationEnabled:         true,
-        uploaderRequestChecksumCalculationEnabled: true,
+        SSLVerifyPeer:   true,
+        UseSSL:          true,
+        MultipartUpload: true,
+        RequestChecksumCalculationEnabled:         true,
+        ResponseChecksumCalculationEnabled:        true,
+        UploaderRequestChecksumCalculationEnabled: true,
     }
 
     err = json.Unmarshal(bytes, &c)
@@ -79,10 +88,16 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
         return S3Cli{}, err
     }
 
+    // Validate bucket presence
     if c.BucketName == "" {
         return S3Cli{}, errors.New("bucket_name must be set")
     }
 
+    // Validate numeric fields: disallow negative values (zero means "use defaults")
+    if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 {
+        return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
+    }
+
     switch c.CredentialsSource {
     case StaticCredentialsSource:
         if c.AccessKeyID == "" || c.SecretAccessKey == "" {
@@ -116,6 +131,8 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
         c.configureAlicloud()
     case "google":
         c.configureGoogle()
+    case "gdch":
+        c.configureGDCH()
     default:
         c.configureDefault()
     }
@@ -155,13 +172,19 @@ func (c *S3Cli) configureAlicloud() {
     if c.Region == "" {
         c.Region = AlicloudHostToRegion(c.Host)
     }
-    c.requestChecksumCalculationEnabled = false
-    c.uploaderRequestChecksumCalculationEnabled = false
+    c.RequestChecksumCalculationEnabled = false
+    c.UploaderRequestChecksumCalculationEnabled = false
 }
 
 func (c *S3Cli) configureGoogle() {
     c.MultipartUpload = false
-    c.requestChecksumCalculationEnabled = false
+    c.RequestChecksumCalculationEnabled = false
+}
+
+func (c *S3Cli) configureGDCH() {
+    c.RequestChecksumCalculationEnabled = false
+    c.ResponseChecksumCalculationEnabled = false
+    c.UploaderRequestChecksumCalculationEnabled = false
 }
 
 func (c *S3Cli) configureDefault() {
@@ -190,9 +213,13 @@ func (c *S3Cli) IsGoogle() bool {
 }
 
 func (c *S3Cli) ShouldDisableRequestChecksumCalculation() bool {
-    return !c.requestChecksumCalculationEnabled
+    return !c.RequestChecksumCalculationEnabled
+}
+
+func (c *S3Cli) ShouldDisableResponseChecksumCalculation() bool {
+    return !c.ResponseChecksumCalculationEnabled
 }
 
 func (c *S3Cli) ShouldDisableUploaderRequestChecksumCalculation() bool {
-    return !c.uploaderRequestChecksumCalculationEnabled
+    return !c.UploaderRequestChecksumCalculationEnabled
 }
diff --git a/s3/config/config_test.go b/s3/config/config_test.go
index 8431a63..75b2c8c 100644
--- a/s3/config/config_test.go
+++ b/s3/config/config_test.go
@@ -270,6 +270,65 @@ var _ = Describe("BlobstoreClient configuration", func() {
         })
     })
 
+    Describe("transfer tuning fields (concurrency / part size)", func() {
+        It("accepts zero as 'use defaults' and preserves zeros when not set", func() {
+            dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`)
+            dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+            c, err := config.NewFromReader(dummyJSONReader)
+            Expect(err).ToNot(HaveOccurred())
+            Expect(c.DownloadConcurrency).To(Equal(0))
+            Expect(c.UploadConcurrency).To(Equal(0))
+            Expect(c.DownloadPartSize).To(Equal(int64(0)))
+            Expect(c.UploadPartSize).To(Equal(int64(0)))
+        })
+
+        It("preserves positive tuning values from config", func() {
+            dummyJSONBytes := []byte(`{
+                "access_key_id":"id",
+                "secret_access_key":"key",
+                "bucket_name":"some-bucket",
+                "download_concurrency": 10,
+                "download_part_size": 10485760,
+                "upload_concurrency": 8,
+                "upload_part_size": 5242880
+            }`)
+            dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+            c, err := config.NewFromReader(dummyJSONReader)
+            Expect(err).ToNot(HaveOccurred())
+            Expect(c.DownloadConcurrency).To(Equal(10))
+            Expect(c.DownloadPartSize).To(Equal(int64(10485760)))
+            Expect(c.UploadConcurrency).To(Equal(8))
+            Expect(c.UploadPartSize).To(Equal(int64(5242880)))
+        })
+
+        It("rejects negative tuning values", func() {
+            dummyJSONBytes := []byte(`{
+                "access_key_id":"id",
+                "secret_access_key":"key",
+                "bucket_name":"some-bucket",
+                "download_concurrency": -1
+            }`)
+            dummyJSONReader := bytes.NewReader(dummyJSONBytes)
+
+            _, err := config.NewFromReader(dummyJSONReader)
+            Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))
+
+            // negative part size
+            dummyJSONBytes = []byte(`{
+                "access_key_id":"id",
+                "secret_access_key":"key",
+                "bucket_name":"some-bucket",
+                "upload_part_size": -1024
+            }`)
+            dummyJSONReader = bytes.NewReader(dummyJSONBytes)
+
+            _, err = config.NewFromReader(dummyJSONReader)
+            Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))
+        })
+    })
+
     Describe("returning the S3 endpoint", func() {
         Context("when port is provided", func() {
             It("returns a URI in the form `host:port`", func() {
diff --git a/s3/config/endpoints.go b/s3/config/endpoints.go
index 2d5aad3..bc41fae 100644
--- a/s3/config/endpoints.go
+++ b/s3/config/endpoints.go
@@ -9,6 +9,7 @@ var (
         "aws":      regexp.MustCompile(`(^$|s3[-.]?(.*)\.amazonaws\.com(\.cn)?$)`),
         "alicloud": regexp.MustCompile(`^oss-([a-z]+-[a-z]+(-[1-9])?)(-internal)?\.aliyuncs\.com$`),
         "google":   regexp.MustCompile(`^storage\.googleapis\.com$`),
+        "gdch":     regexp.MustCompile(`objectstorage\..*`),
     }
 )
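To see PATCH 1/4 end to end, here is a minimal, self-contained sketch of loading a config that sets only some of the new tuning fields. The import path is a placeholder for this repository's real module path, and the sample values are illustrative; field and function names match the patch above:

```go
package main

import (
	"fmt"
	"strings"

	// Placeholder import path; substitute this repository's actual module path.
	"example.invalid/storage-cli/s3/config"
)

func main() {
	// Only download_concurrency and upload_part_size are set; the other two
	// tuning fields stay at zero, which the client layer treats as
	// "fall back to defaultTransferConcurrency / defaultTransferPartSize".
	raw := `{
		"access_key_id": "id",
		"secret_access_key": "key",
		"bucket_name": "some-bucket",
		"download_concurrency": 10,
		"upload_part_size": 10485760
	}`

	c, err := config.NewFromReader(strings.NewReader(raw))
	if err != nil {
		panic(err) // a negative value would fail here with the new validation error
	}

	fmt.Println(c.DownloadConcurrency) // 10
	fmt.Println(c.DownloadPartSize)    // 0 -> downloader uses the 5 MB default
	fmt.Println(c.UploadConcurrency)   // 0 -> uploader uses the default of 5
	fmt.Println(c.UploadPartSize)      // 10485760
}
```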
From 2b8d34becf3cb5c4804b303a7000cf26487a7b8f Mon Sep 17 00:00:00 2001
From: gowrisankar
Date: Fri, 30 Jan 2026 07:26:00 +0100
Subject: [PATCH 2/4] feat: make S3 transfer concurrency, part size, and
 checksum configurable

---
 s3/README.md | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/s3/README.md b/s3/README.md
index aff05bb..33ee2ac 100644
--- a/s3/README.md
+++ b/s3/README.md
@@ -27,7 +27,11 @@ The S3 client requires a JSON configuration file with the following structure:
   "signature_version": "<string> (optional)",
   "server_side_encryption": "<string> (optional)",
   "sse_kms_key_id": "<string> (optional)",
-  "multipart_upload": <bool> (optional - default: true)
+  "multipart_upload": <bool> (optional - default: true),
+  "download_concurrency": "<int> (optional - default: '5')",
+  "download_part_size": "<int> (optional - default: '5242880')", # 5 MB
+  "upload_concurrency": "<int> (optional - default: '5')",
+  "upload_part_size": "<int> (optional - default: '5242880')" # 5 MB
 }
 ```
 > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host'
@@ -79,12 +83,12 @@ Run `./.github/scripts/s3/setup-aws-infrastructure.sh` and `./.github/scripts/s3
 #### Setup for GCP
 1. Create a bucket in GCP
 2. Create access keys
-   1. Navigate to **IAM & Admin > Service Accounts**.
-   2. Select your service account or create a new one if needed.
-   3. Ensure your service account has necessary permissions (like `Storage Object Creator`, `Storage Object Viewer`, `Storage Admin`) depending on what access you want.
-   4. Go to **Cloud Storage** and select **Settings**.
-   5. In the **Interoperability** section, create an HMAC key for your service account. This generates an "access key ID" and a "secret access key".
-3. Export the following variables into your environment:
+3. Navigate to **IAM & Admin > Service Accounts**.
+4. Select your service account or create a new one if needed.
+5. Ensure your service account has necessary permissions (like `Storage Object Creator`, `Storage Object Viewer`, `Storage Admin`) depending on what access you want.
+6. Go to **Cloud Storage** and select **Settings**.
+7. In the **Interoperability** section, create an HMAC key for your service account. This generates an "access key ID" and a "secret access key".
+8. Export the following variables into your environment:
 ```
 export access_key_id=
 export secret_access_key=
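For reference, a complete configuration exercising the fields documented in PATCH 2/4 might look as follows. All values here are illustrative, not recommendations:

```json
{
  "credentials_source": "static",
  "access_key_id": "AKIA...",
  "secret_access_key": "...",
  "bucket_name": "my-blobstore",
  "region": "us-east-1",
  "multipart_upload": true,
  "download_concurrency": 10,
  "download_part_size": 10485760,
  "upload_concurrency": 8,
  "upload_part_size": 10485760
}
```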
From cd3430ebbe0d16b38e2129a5bf75eae7f60b8d2c Mon Sep 17 00:00:00 2001
From: gowrisankar
Date: Fri, 30 Jan 2026 16:32:39 +0100
Subject: [PATCH 3/4] address review comments

---
 s3/README.md                  | 3 ---
 s3/client/aws_s3_blobstore.go | 9 +++------
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/s3/README.md b/s3/README.md
index 33ee2ac..a567220 100644
--- a/s3/README.md
+++ b/s3/README.md
@@ -13,15 +13,12 @@ The S3 client requires a JSON configuration file with the following structure:
 ``` json
 {
   "bucket_name": "<string> (required)",
-
   "credentials_source": "<string> [static|env_or_profile|none]",
   "access_key_id": "<string> (required if credentials_source = 'static')",
   "secret_access_key": "<string> (required if credentials_source = 'static')",
-
   "region": "<string> (optional - default: 'us-east-1')",
   "host": "<string> (optional)",
   "port": <int> (optional),
-
   "ssl_verify_peer": <bool> (optional - default: true),
   "use_ssl": <bool> (optional - default: true),
   "signature_version": "<string> (optional)",
diff --git a/s3/client/aws_s3_blobstore.go b/s3/client/aws_s3_blobstore.go
index 614e24f..ce72b26 100644
--- a/s3/client/aws_s3_blobstore.go
+++ b/s3/client/aws_s3_blobstore.go
@@ -41,16 +41,14 @@ func (b *awsS3Client) Get(src string, dest io.WriterAt) error {
     cfg := b.s3cliConfig
 
     downloader := manager.NewDownloader(b.s3Client, func(d *manager.Downloader) {
+        d.Concurrency = defaultTransferConcurrency
         if cfg.DownloadConcurrency > 0 {
             d.Concurrency = cfg.DownloadConcurrency
-        } else {
-            d.Concurrency = defaultTransferConcurrency
         }
 
+        d.PartSize = defaultTransferPartSize
         if cfg.DownloadPartSize > 0 {
             d.PartSize = cfg.DownloadPartSize
-        } else {
-            d.PartSize = defaultTransferPartSize
         }
     })
 
@@ -76,10 +74,9 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
     uploader := manager.NewUploader(b.s3Client, func(u *manager.Uploader) {
         u.LeavePartsOnError = false
 
+        u.Concurrency = defaultTransferConcurrency
         if cfg.UploadConcurrency > 0 {
             u.Concurrency = cfg.UploadConcurrency
-        } else {
-            u.Concurrency = defaultTransferConcurrency
         }
 
         // PartSize: if multipart uploads are disabled, force a single very large part to avoid multipart.

From 8644449517506b60f7593f6d1dcbaaf292516088 Mon Sep 17 00:00:00 2001
From: gowrisankar
Date: Wed, 4 Feb 2026 06:55:29 +0100
Subject: [PATCH 4/4] fix README

---
 s3/README.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/s3/README.md b/s3/README.md
index a567220..f3a30c4 100644
--- a/s3/README.md
+++ b/s3/README.md
@@ -25,10 +25,10 @@ The S3 client requires a JSON configuration file with the following structure:
   "server_side_encryption": "<string> (optional)",
   "sse_kms_key_id": "<string> (optional)",
   "multipart_upload": <bool> (optional - default: true),
-  "download_concurrency": "<int> (optional - default: '5')",
-  "download_part_size": "<int> (optional - default: '5242880')", # 5 MB
-  "upload_concurrency": "<int> (optional - default: '5')",
-  "upload_part_size": "<int> (optional - default: '5242880')" # 5 MB
+  "download_concurrency": <int> (optional - default: 5),
+  "download_part_size": <int> (optional - default: 5242880), # 5 MB
+  "upload_concurrency": <int> (optional - default: 5),
+  "upload_part_size": <int> (optional - default: 5242880) # 5 MB
 }
 ```
 > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host'
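After the refactor in PATCH 3/4, both transfer callbacks follow the same "set the default, then let a positive config value override it" shape, with multipart uploads disabled by forcing a single oversized part. Below is a standalone sketch of the resulting part-size resolution; the constants mirror aws_s3_blobstore.go, but the helper function itself is illustrative, not code from the patch:

```go
package main

import "fmt"

const (
	defaultTransferConcurrency = 5
	defaultTransferPartSize    = int64(5 * 1024 * 1024)           // 5 MB, as in aws_s3_blobstore.go
	oneTB                      = int64(1000 * 1024 * 1024 * 1024) // "oneTB" as defined there (1000 GiB)
)

// resolveUploadPartSize reproduces the uploader's decision order:
// multipart disabled wins, then an explicit positive config value,
// then the default.
func resolveUploadPartSize(multipartUpload bool, configuredPartSize int64) int64 {
	if !multipartUpload {
		return oneTB // one part large enough to cover any blob, so no multipart
	}
	partSize := defaultTransferPartSize
	if configuredPartSize > 0 {
		partSize = configuredPartSize
	}
	return partSize
}

func main() {
	fmt.Println(resolveUploadPartSize(false, 10485760)) // 1073741824000: multipart disabled overrides config
	fmt.Println(resolveUploadPartSize(true, 10485760))  // 10485760: explicit config value
	fmt.Println(resolveUploadPartSize(true, 0))         // 5242880: zero falls back to the default
}
```

Concurrency resolves the same way on both paths, with defaultTransferConcurrency as the fallback when the configured value is zero.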