From c172bcae4d4b99d683693776eb7d568059455f55 Mon Sep 17 00:00:00 2001 From: gowrisankar Date: Tue, 16 Dec 2025 11:07:44 +0100 Subject: [PATCH 1/6] feat: make S3 transfer concurrency and part size configurable --- README.md | 7 ++++- client/aws_s3_blobstore.go | 38 +++++++++++++++++++++++- config/config.go | 15 ++++++++++ config/config_test.go | 59 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 21a903d9..5a8d3e17 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,12 @@ Given a JSON config file (`config.json`)... "signature_version": " (optional)", "server_side_encryption": " (optional)", "sse_kms_key_id": " (optional)", - "multipart_upload": (optional - default: true) + "multipart_upload": (optional - default: true), + + "download_concurrency": " (optional)", + "download_part_size": " (optional)", + "upload_concurrency": " (optional)", + "upload_part_size": " (optional)" } ``` diff --git a/client/aws_s3_blobstore.go b/client/aws_s3_blobstore.go index 72a1c276..c9bf402b 100644 --- a/client/aws_s3_blobstore.go +++ b/client/aws_s3_blobstore.go @@ -22,6 +22,14 @@ import ( var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ") var oneTB = int64(1000 * 1024 * 1024 * 1024) +// Default settings for transfer concurrency and part size. +// These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads. +// See: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/s3/manager#Downloader +const ( + defaultTransferConcurrency = 5 + defaultTransferPartSize = int64(5 * 1024 * 1024) // 5 MB +) + // awsS3Client encapsulates AWS S3 blobstore interactions type awsS3Client struct { s3Client *s3.Client @@ -30,7 +38,21 @@ type awsS3Client struct { // Get fetches a blob, destination will be overwritten if exists func (b *awsS3Client) Get(src string, dest io.WriterAt) error { - downloader := manager.NewDownloader(b.s3Client) + cfg := b.s3cliConfig + + downloader := manager.NewDownloader(b.s3Client, func(d *manager.Downloader) { + if cfg.DownloadConcurrency > 0 { + d.Concurrency = cfg.DownloadConcurrency + } else { + d.Concurrency = defaultTransferConcurrency + } + + if cfg.DownloadPartSize > 0 { + d.PartSize = cfg.DownloadPartSize + } else { + d.PartSize = defaultTransferPartSize + } + }) _, err := downloader.Download(context.TODO(), dest, &s3.GetObjectInput{ Bucket: aws.String(b.s3cliConfig.BucketName), @@ -54,9 +76,23 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error { uploader := manager.NewUploader(b.s3Client, func(u *manager.Uploader) { u.LeavePartsOnError = false + if cfg.UploadConcurrency > 0 { + u.Concurrency = cfg.UploadConcurrency + } else { + u.Concurrency = defaultTransferConcurrency + } + + // PartSize: if multipart uploads disabled, force a very large part to avoid multipart. + // Otherwise, use configured upload part size if present, otherwise default. if !cfg.MultipartUpload { // disable multipart uploads by way of large PartSize configuration u.PartSize = oneTB + } else { + if cfg.UploadPartSize > 0 { + u.PartSize = cfg.UploadPartSize + } else { + u.PartSize = defaultTransferPartSize + } } }) uploadInput := &s3.PutObjectInput{ diff --git a/config/config.go b/config/config.go index 73eaad1d..313949f9 100644 --- a/config/config.go +++ b/config/config.go @@ -27,6 +27,15 @@ type S3Cli struct { HostStyle bool `json:"host_style"` SwiftAuthAccount string `json:"swift_auth_account"` SwiftTempURLKey string `json:"swift_temp_url_key"` + + // Optional knobs to tune transfer performance. + // If zero, the client will apply sensible defaults (handled by the S3 client layer). + // Part size values are provided in bytes. + DownloadConcurrency int `json:"download_concurrency"` + DownloadPartSize int64 `json:"download_part_size"` + + UploadConcurrency int `json:"upload_concurrency"` + UploadPartSize int64 `json:"upload_part_size"` } // EmptyRegion is required to allow us to use the AWS SDK against S3 compatible blobstores which do not have @@ -81,10 +90,16 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { return S3Cli{}, err } + // Validate bucket presence if c.BucketName == "" { return S3Cli{}, errors.New("bucket_name must be set") } + // Validate numeric fields: disallow negative values (zero means "use defaults") + if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 { + return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative") + } + switch c.CredentialsSource { case StaticCredentialsSource: if c.AccessKeyID == "" || c.SecretAccessKey == "" { diff --git a/config/config_test.go b/config/config_test.go index adf25e08..d7b5a2b6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -269,6 +269,65 @@ var _ = Describe("BlobstoreClient configuration", func() { }) }) + Describe("transfer tuning fields (concurrency / part size)", func() { + It("accepts zero as 'use defaults' and preserves zeros when not set", func() { + dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.DownloadConcurrency).To(Equal(0)) + Expect(c.UploadConcurrency).To(Equal(0)) + Expect(c.DownloadPartSize).To(Equal(int64(0))) + Expect(c.UploadPartSize).To(Equal(int64(0))) + }) + + It("preserves positive tuning values from config", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "download_concurrency": 10, + "download_part_size": 10485760, + "upload_concurrency": 8, + "upload_part_size": 5242880 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + c, err := config.NewFromReader(dummyJSONReader) + Expect(err).ToNot(HaveOccurred()) + Expect(c.DownloadConcurrency).To(Equal(10)) + Expect(c.DownloadPartSize).To(Equal(int64(10485760))) + Expect(c.UploadConcurrency).To(Equal(8)) + Expect(c.UploadPartSize).To(Equal(int64(5242880))) + }) + + It("rejects negative tuning values", func() { + dummyJSONBytes := []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "download_concurrency": -1 + }`) + dummyJSONReader := bytes.NewReader(dummyJSONBytes) + + _, err := config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative")) + + // negative part size + dummyJSONBytes = []byte(`{ + "access_key_id":"id", + "secret_access_key":"key", + "bucket_name":"some-bucket", + "upload_part_size": -1024 + }`) + dummyJSONReader = bytes.NewReader(dummyJSONBytes) + + _, err = config.NewFromReader(dummyJSONReader) + Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative")) + }) + }) + Describe("returning the S3 endpoint", func() { Context("when port is provided", func() { It("returns a URI in the form `host:port`", func() { From 95e3d1d773176e92b0232d9384b9e20edd06bc52 Mon Sep 17 00:00:00 2001 From: Gowrisankar Date: Wed, 28 Jan 2026 08:47:59 +0100 Subject: [PATCH 2/6] Update README with optional download/upload parameters Added optional parameters for download and upload settings in the README. --- README.md | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 24440d64..05cebd93 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,10 @@ Given a JSON config file (`config.json`)... "server_side_encryption": " (optional)", "sse_kms_key_id": " (optional)", "multipart_upload": " (optional - default: true)", + "download_concurrency": " (optional - default: '5')", + "download_part_size": " (optional - default: '5242880')", # 5 MB + "upload_concurrency": " (optional - default: '5')", + "upload_part_size": " (optional - default: '5242880')" # 5 MB } ``` @@ -110,13 +114,13 @@ export bucket_name=s3cli-pipeline ### Setup for GCP 1. Create a bucket in GCP -2. Create access keys - 1. Navigate to **IAM & Admin > Service Accounts**. - 2. Select your service account or create a new one if needed. - 3. Ensure your service account has necessary permissions (like `Storage Object Creator`, `Storage Object Viewer`, `Storage Admin`) depending on what access you want. - 4. Go to **Cloud Storage** and select **Settings**. - 5. In the **Interoperability** section, create an HMAC key for your service account. This generates an "access key ID" and a "secret access key". -3. Export the following variables into your environment: +2. Create access keys +3. Navigate to **IAM & Admin > Service Accounts**. +4. Select your service account or create a new one if needed. +5. Ensure your service account has necessary permissions (like `Storage Object Creator`, `Storage Object Viewer`, `Storage Admin`) depending on what access you want. +6. Go to **Cloud Storage** and select **Settings**. +7. In the **Interoperability** section, create an HMAC key for your service account. This generates an "access key ID" and a "secret access key". +8. Export the following variables into your environment: ``` export access_key_id= export secret_access_key= From 2f102ddbfc6d44f74b990e6fd4318e2729151f85 Mon Sep 17 00:00:00 2001 From: gowrisankar Date: Wed, 28 Jan 2026 11:56:29 +0100 Subject: [PATCH 3/6] add disable checksum option + fix the Pascal Case --- README.md | 3 ++- client/sdk.go | 3 ++- config/config.go | 23 ++++++++++++----------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 05cebd93..2c23b903 100644 --- a/README.md +++ b/README.md @@ -39,9 +39,10 @@ Given a JSON config file (`config.json`)... "download_part_size": " (optional - default: '5242880')", # 5 MB "upload_concurrency": " (optional - default: '5')", "upload_part_size": " (optional - default: '5242880')" # 5 MB - + "disable_checksums": " (optional - default: false)" } ``` + > Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host' ``` bash diff --git a/client/sdk.go b/client/sdk.go index 3e6dd9d4..f9d83a88 100644 --- a/client/sdk.go +++ b/client/sdk.go @@ -67,8 +67,9 @@ func NewAwsS3ClientWithApiOptions( awsConfig.Credentials = aws.NewCredentialsCache(provider) } - if c.ShouldDisableRequestChecksumCalculation() { + if c.DisableChecksums || c.ShouldDisableRequestChecksumCalculation() { awsConfig.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired + awsConfig.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired } s3Client := s3.NewFromConfig(awsConfig, func(o *s3.Options) { diff --git a/config/config.go b/config/config.go index 0b30dd91..d4ab4050 100644 --- a/config/config.go +++ b/config/config.go @@ -27,15 +27,16 @@ type S3Cli struct { HostStyle bool `json:"host_style"` SwiftAuthAccount string `json:"swift_auth_account"` SwiftTempURLKey string `json:"swift_temp_url_key"` - requestChecksumCalculationEnabled bool - uploaderRequestChecksumCalculationEnabled bool + RequestChecksumCalculationEnabled bool + UploaderRequestChecksumCalculationEnabled bool // Optional knobs to tune transfer performance. // If zero, the client will apply sensible defaults (handled by the S3 client layer). // Part size values are provided in bytes. DownloadConcurrency int `json:"download_concurrency"` DownloadPartSize int64 `json:"download_part_size"` - UploadConcurrency int `json:"upload_concurrency"` - UploadPartSize int64 `json:"upload_part_size"` + UploadConcurrency int `json:"upload_concurrency"` + UploadPartSize int64 `json:"upload_part_size"` + DisableChecksums bool `json:"disable_checksums"` } const defaultAWSRegion = "us-east-1" //nolint:unused @@ -77,8 +78,8 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { SSLVerifyPeer: true, UseSSL: true, MultipartUpload: true, - requestChecksumCalculationEnabled: true, - uploaderRequestChecksumCalculationEnabled: true, + RequestChecksumCalculationEnabled: true, + UploaderRequestChecksumCalculationEnabled: true, } err = json.Unmarshal(bytes, &c) @@ -168,13 +169,13 @@ func (c *S3Cli) configureAlicloud() { if c.Region == "" { c.Region = AlicloudHostToRegion(c.Host) } - c.requestChecksumCalculationEnabled = false - c.uploaderRequestChecksumCalculationEnabled = false + c.RequestChecksumCalculationEnabled = false + c.UploaderRequestChecksumCalculationEnabled = false } func (c *S3Cli) configureGoogle() { c.MultipartUpload = false - c.requestChecksumCalculationEnabled = false + c.RequestChecksumCalculationEnabled = false } func (c *S3Cli) configureDefault() { @@ -203,9 +204,9 @@ func (c *S3Cli) IsGoogle() bool { } func (c *S3Cli) ShouldDisableRequestChecksumCalculation() bool { - return !c.requestChecksumCalculationEnabled + return !c.RequestChecksumCalculationEnabled || c.DisableChecksums } func (c *S3Cli) ShouldDisableUploaderRequestChecksumCalculation() bool { - return !c.uploaderRequestChecksumCalculationEnabled + return !c.UploaderRequestChecksumCalculationEnabled || c.DisableChecksums } From cedfe3765b32692a03bd9832e12f7ee27351c430 Mon Sep 17 00:00:00 2001 From: gowrisankar Date: Thu, 29 Jan 2026 09:45:34 +0100 Subject: [PATCH 4/6] remove disable-checksum generic with separated --- client/sdk.go | 5 ++++- config/config.go | 27 ++++++++++++++++++++------- config/endpoints.go | 1 + 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/client/sdk.go b/client/sdk.go index f9d83a88..426bd50d 100644 --- a/client/sdk.go +++ b/client/sdk.go @@ -67,8 +67,11 @@ func NewAwsS3ClientWithApiOptions( awsConfig.Credentials = aws.NewCredentialsCache(provider) } - if c.DisableChecksums || c.ShouldDisableRequestChecksumCalculation() { + if c.ShouldDisableRequestChecksumCalculation() { awsConfig.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired + } + + if c.ShouldDisableResponseChecksumCalculation() { awsConfig.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired } diff --git a/config/config.go b/config/config.go index d4ab4050..2241e02d 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,7 @@ type S3Cli struct { SwiftAuthAccount string `json:"swift_auth_account"` SwiftTempURLKey string `json:"swift_temp_url_key"` RequestChecksumCalculationEnabled bool + ResponseChecksumCalculationEnabled bool UploaderRequestChecksumCalculationEnabled bool // Optional knobs to tune transfer performance. // If zero, the client will apply sensible defaults (handled by the S3 client layer). @@ -36,7 +37,6 @@ type S3Cli struct { DownloadPartSize int64 `json:"download_part_size"` UploadConcurrency int `json:"upload_concurrency"` UploadPartSize int64 `json:"upload_part_size"` - DisableChecksums bool `json:"disable_checksums"` } const defaultAWSRegion = "us-east-1" //nolint:unused @@ -75,10 +75,11 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { } c := S3Cli{ - SSLVerifyPeer: true, - UseSSL: true, - MultipartUpload: true, - RequestChecksumCalculationEnabled: true, + SSLVerifyPeer: true, + UseSSL: true, + MultipartUpload: true, + RequestChecksumCalculationEnabled: true, + ResponseChecksumCalculationEnabled: true, UploaderRequestChecksumCalculationEnabled: true, } @@ -130,6 +131,8 @@ func NewFromReader(reader io.Reader) (S3Cli, error) { c.configureAlicloud() case "google": c.configureGoogle() + case "gdch": + c.configureGDCH() default: c.configureDefault() } @@ -178,6 +181,12 @@ func (c *S3Cli) configureGoogle() { c.RequestChecksumCalculationEnabled = false } +func (c *S3Cli) configureGDCH() { + c.RequestChecksumCalculationEnabled = false + c.ResponseChecksumCalculationEnabled = false + c.UploaderRequestChecksumCalculationEnabled = false +} + func (c *S3Cli) configureDefault() { // No specific configuration needed for default/unknown providers } @@ -204,9 +213,13 @@ func (c *S3Cli) IsGoogle() bool { } func (c *S3Cli) ShouldDisableRequestChecksumCalculation() bool { - return !c.RequestChecksumCalculationEnabled || c.DisableChecksums + return !c.RequestChecksumCalculationEnabled +} + +func (c *S3Cli) ShouldDisableResponseChecksumCalculation() bool { + return !c.ResponseChecksumCalculationEnabled } func (c *S3Cli) ShouldDisableUploaderRequestChecksumCalculation() bool { - return !c.UploaderRequestChecksumCalculationEnabled || c.DisableChecksums + return !c.UploaderRequestChecksumCalculationEnabled } diff --git a/config/endpoints.go b/config/endpoints.go index 2d5aad3b..bc41faeb 100644 --- a/config/endpoints.go +++ b/config/endpoints.go @@ -9,6 +9,7 @@ var ( "aws": regexp.MustCompile(`(^$|s3[-.]?(.*)\.amazonaws\.com(\.cn)?$)`), "alicloud": regexp.MustCompile(`^oss-([a-z]+-[a-z]+(-[1-9])?)(-internal)?\.aliyuncs\.com$`), "google": regexp.MustCompile(`^storage\.googleapis\.com$`), + "gdch": regexp.MustCompile(`objectstorage\..*`), } ) From 4a7fecddd6398d0a12d87c77d3633f90bad8b4a2 Mon Sep 17 00:00:00 2001 From: gowrisankar Date: Fri, 30 Jan 2026 07:31:20 +0100 Subject: [PATCH 5/6] fix readme --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 2c23b903..df0e55d4 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,6 @@ Given a JSON config file (`config.json`)... "download_part_size": " (optional - default: '5242880')", # 5 MB "upload_concurrency": " (optional - default: '5')", "upload_part_size": " (optional - default: '5242880')" # 5 MB - "disable_checksums": " (optional - default: false)" } ``` From d3007ff72e8b81edbc85e28d847ab5ebbe8ed419 Mon Sep 17 00:00:00 2001 From: gowrisankar Date: Wed, 4 Feb 2026 09:54:56 +0100 Subject: [PATCH 6/6] fix readme and improve --- README.md | 8 ++++---- client/aws_s3_blobstore.go | 9 +++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index df0e55d4..7c88b350 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,10 @@ Given a JSON config file (`config.json`)... "server_side_encryption": " (optional)", "sse_kms_key_id": " (optional)", "multipart_upload": " (optional - default: true)", - "download_concurrency": " (optional - default: '5')", - "download_part_size": " (optional - default: '5242880')", # 5 MB - "upload_concurrency": " (optional - default: '5')", - "upload_part_size": " (optional - default: '5242880')" # 5 MB + "download_concurrency": (optional - default: 5), + "download_part_size": (optional - default: 5242880), # 5 MB + "upload_concurrency": (optional - default: 5), + "upload_part_size": (optional - default: 5242880) # 5 MB } ``` diff --git a/client/aws_s3_blobstore.go b/client/aws_s3_blobstore.go index bbb8ca03..d61e017f 100644 --- a/client/aws_s3_blobstore.go +++ b/client/aws_s3_blobstore.go @@ -41,16 +41,14 @@ func (b *awsS3Client) Get(src string, dest io.WriterAt) error { cfg := b.s3cliConfig downloader := manager.NewDownloader(b.s3Client, func(d *manager.Downloader) { + d.Concurrency = defaultTransferConcurrency if cfg.DownloadConcurrency > 0 { d.Concurrency = cfg.DownloadConcurrency - } else { - d.Concurrency = defaultTransferConcurrency } + d.PartSize = defaultTransferPartSize if cfg.DownloadPartSize > 0 { d.PartSize = cfg.DownloadPartSize - } else { - d.PartSize = defaultTransferPartSize } }) @@ -76,10 +74,9 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error { uploader := manager.NewUploader(b.s3Client, func(u *manager.Uploader) { u.LeavePartsOnError = false + u.Concurrency = defaultTransferConcurrency if cfg.UploadConcurrency > 0 { u.Concurrency = cfg.UploadConcurrency - } else { - u.Concurrency = defaultTransferConcurrency } // PartSize: if multipart uploads disabled, force a very large part to avoid multipart.