19 changes: 12 additions & 7 deletions README.md
@@ -35,8 +35,13 @@ Given a JSON config file (`config.json`)...
"server_side_encryption": "<string> (optional)",
"sse_kms_key_id": "<string> (optional)",
"multipart_upload": "<bool> (optional - default: true)",
"download_concurrency": <int> (optional - default: 5),
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
"upload_concurrency": <int> (optional - default: 5),
"upload_part_size": <int64> (optional - default: 5242880) # 5 MB
}
```

> Note: **multipart_upload** is not supported by Google; it is automatically set to `false` when the provided `host` is recognized as a Google endpoint.
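
For illustration, a minimal sketch of how the new tuning fields flow through the config parser. The import path and values are assumptions for the example, not part of this change:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/cloudfoundry/bosh-s3cli/config" // assumed import path
)

func main() {
	// Illustrative config: tune download fan-out and upload part size.
	// Omitted (zero) values fall back to the defaults: 5 workers, 5 MB parts.
	raw := `{
		"access_key_id": "id",
		"secret_access_key": "key",
		"bucket_name": "some-bucket",
		"download_concurrency": 10,
		"upload_part_size": 10485760
	}`

	c, err := config.NewFromReader(strings.NewReader(raw))
	if err != nil {
		panic(err)
	}
	fmt.Println(c.DownloadConcurrency, c.UploadPartSize) // 10 10485760
}
```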

@@ -109,13 +114,13 @@ export bucket_name=s3cli-pipeline

### Setup for GCP
1. Create a bucket in GCP
2. Create access keys
3. Navigate to **IAM & Admin > Service Accounts**.
4. Select your service account or create a new one if needed.
5. Ensure your service account has the necessary permissions (such as `Storage Object Creator`, `Storage Object Viewer`, or `Storage Admin`), depending on the access you need.
6. Go to **Cloud Storage** and select **Settings**.
7. In the **Interoperability** section, create an HMAC key for your service account. This generates an "access key ID" and a "secret access key".
8. Export the following variables into your environment:
```
export access_key_id=<YOUR_ACCESS_KEY>
export secret_access_key=<YOUR_SECRET_ACCESS_KEY>
```
35 changes: 34 additions & 1 deletion client/aws_s3_blobstore.go
@@ -22,6 +22,14 @@ import (
var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ")
var oneTB = int64(1000 * 1024 * 1024 * 1024)

// Default settings for transfer concurrency and part size.
// These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads.
// See: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/s3/manager#Downloader
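// With both defaults, a single transfer keeps roughly Concurrency x PartSize
// (about 25 MB) of part buffers in flight, so raising either value trades
// memory for throughput.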
const (
defaultTransferConcurrency = 5
defaultTransferPartSize = int64(5 * 1024 * 1024) // 5 MB
)

// awsS3Client encapsulates AWS S3 blobstore interactions
type awsS3Client struct {
s3Client *s3.Client
@@ -30,7 +38,19 @@ type awsS3Client struct {

// Get fetches a blob, destination will be overwritten if exists
func (b *awsS3Client) Get(src string, dest io.WriterAt) error {
cfg := b.s3cliConfig

downloader := manager.NewDownloader(b.s3Client, func(d *manager.Downloader) {
d.Concurrency = defaultTransferConcurrency
if cfg.DownloadConcurrency > 0 {
d.Concurrency = cfg.DownloadConcurrency
}

d.PartSize = defaultTransferPartSize
if cfg.DownloadPartSize > 0 {
d.PartSize = cfg.DownloadPartSize
}
})

_, err := downloader.Download(context.TODO(), dest, &s3.GetObjectInput{
Bucket: aws.String(b.s3cliConfig.BucketName),
@@ -54,9 +74,22 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
uploader := manager.NewUploader(b.s3Client, func(u *manager.Uploader) {
u.LeavePartsOnError = false

u.Concurrency = defaultTransferConcurrency
if cfg.UploadConcurrency > 0 {
u.Concurrency = cfg.UploadConcurrency
}

// PartSize: if multipart uploads are disabled, force a part size large enough that the upload is never split into parts.
// Otherwise, use the configured upload part size when present, falling back to the default.
if !cfg.MultipartUpload {
// disable multipart uploads by way of large PartSize configuration
u.PartSize = oneTB
} else {
if cfg.UploadPartSize > 0 {
u.PartSize = cfg.UploadPartSize
} else {
u.PartSize = defaultTransferPartSize
}
}

if cfg.ShouldDisableUploaderRequestChecksumCalculation() {
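The `PartSize` branch above carries the multipart behavior: a part size larger than any blob the CLI will handle forces the SDK into a single-request upload. Condensed into a hypothetical helper for illustration (the PR itself inlines this logic):

```go
// effectivePartSize mirrors the part-size selection inlined in Put above.
// Hypothetical helper, for illustration only.
func effectivePartSize(multipartEnabled bool, configured int64) int64 {
	const oneTB = int64(1000 * 1024 * 1024 * 1024) // mirrors the package-level oneTB
	const defaultPartSize = int64(5 * 1024 * 1024) // 5 MB default

	if !multipartEnabled {
		// A part larger than any expected blob means the SDK never
		// splits the upload, effectively disabling multipart.
		return oneTB
	}
	if configured > 0 {
		return configured
	}
	return defaultPartSize
}
```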
4 changes: 4 additions & 0 deletions client/sdk.go
@@ -71,6 +71,10 @@ func NewAwsS3ClientWithApiOptions(
awsConfig.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
}

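// Note: the v2 SDK validates response checksums "when supported" by default;
// "when required" limits validation to operations that mandate it, which
// some S3-compatible endpoints need.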
if c.ShouldDisableResponseChecksumCalculation() {
awsConfig.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
}

s3Client := s3.NewFromConfig(awsConfig, func(o *s3.Options) {
o.UsePathStyle = !c.HostStyle
if endpoint := c.S3Endpoint(); endpoint != "" {
51 changes: 39 additions & 12 deletions config/config.go
@@ -27,8 +27,16 @@ type S3Cli struct {
HostStyle bool `json:"host_style"`
SwiftAuthAccount string `json:"swift_auth_account"`
SwiftTempURLKey string `json:"swift_temp_url_key"`
RequestChecksumCalculationEnabled bool
ResponseChecksumCalculationEnabled bool
UploaderRequestChecksumCalculationEnabled bool
// Optional knobs to tune transfer performance.
// If zero, the client will apply sensible defaults (handled by the S3 client layer).
// Part size values are provided in bytes.
DownloadConcurrency int `json:"download_concurrency"`
DownloadPartSize int64 `json:"download_part_size"`
UploadConcurrency int `json:"upload_concurrency"`
UploadPartSize int64 `json:"upload_part_size"`
}

const defaultAWSRegion = "us-east-1" //nolint:unused
@@ -67,22 +75,29 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
}

c := S3Cli{
SSLVerifyPeer: true,
UseSSL: true,
MultipartUpload: true,
RequestChecksumCalculationEnabled: true,
ResponseChecksumCalculationEnabled: true,
UploaderRequestChecksumCalculationEnabled: true,
}

err = json.Unmarshal(bytes, &c)
if err != nil {
return S3Cli{}, err
}

// Validate bucket presence
if c.BucketName == "" {
return S3Cli{}, errors.New("bucket_name must be set")
}

// Validate numeric fields: disallow negative values (zero means "use defaults")
if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 {
return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
}

switch c.CredentialsSource {
case StaticCredentialsSource:
if c.AccessKeyID == "" || c.SecretAccessKey == "" {
@@ -116,6 +131,8 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
c.configureAlicloud()
case "google":
c.configureGoogle()
case "gdch":
c.configureGDCH()
default:
c.configureDefault()
}
@@ -155,13 +172,19 @@ func (c *S3Cli) configureAlicloud() {
if c.Region == "" {
c.Region = AlicloudHostToRegion(c.Host)
}
c.RequestChecksumCalculationEnabled = false
c.UploaderRequestChecksumCalculationEnabled = false
}

func (c *S3Cli) configureGoogle() {
c.MultipartUpload = false
c.requestChecksumCalculationEnabled = false
c.RequestChecksumCalculationEnabled = false
}

func (c *S3Cli) configureGDCH() {
c.RequestChecksumCalculationEnabled = false
c.ResponseChecksumCalculationEnabled = false
c.UploaderRequestChecksumCalculationEnabled = false
}

func (c *S3Cli) configureDefault() {
@@ -190,9 +213,13 @@ func (c *S3Cli) IsGoogle() bool {
}

func (c *S3Cli) ShouldDisableRequestChecksumCalculation() bool {
return !c.RequestChecksumCalculationEnabled
}

func (c *S3Cli) ShouldDisableResponseChecksumCalculation() bool {
return !c.ResponseChecksumCalculationEnabled
}

func (c *S3Cli) ShouldDisableUploaderRequestChecksumCalculation() bool {
return !c.UploaderRequestChecksumCalculationEnabled
}
59 changes: 59 additions & 0 deletions config/config_test.go
@@ -270,6 +270,65 @@ var _ = Describe("BlobstoreClient configuration", func() {
})
})

Describe("transfer tuning fields (concurrency / part size)", func() {
It("accepts zero as 'use defaults' and preserves zeros when not set", func() {
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.DownloadConcurrency).To(Equal(0))
Expect(c.UploadConcurrency).To(Equal(0))
Expect(c.DownloadPartSize).To(Equal(int64(0)))
Expect(c.UploadPartSize).To(Equal(int64(0)))
})

It("preserves positive tuning values from config", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"download_concurrency": 10,
"download_part_size": 10485760,
"upload_concurrency": 8,
"upload_part_size": 5242880
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

c, err := config.NewFromReader(dummyJSONReader)
Expect(err).ToNot(HaveOccurred())
Expect(c.DownloadConcurrency).To(Equal(10))
Expect(c.DownloadPartSize).To(Equal(int64(10485760)))
Expect(c.UploadConcurrency).To(Equal(8))
Expect(c.UploadPartSize).To(Equal(int64(5242880)))
})

It("rejects negative tuning values", func() {
dummyJSONBytes := []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"download_concurrency": -1
}`)
dummyJSONReader := bytes.NewReader(dummyJSONBytes)

_, err := config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))

// negative part size
dummyJSONBytes = []byte(`{
"access_key_id":"id",
"secret_access_key":"key",
"bucket_name":"some-bucket",
"upload_part_size": -1024
}`)
dummyJSONReader = bytes.NewReader(dummyJSONBytes)

_, err = config.NewFromReader(dummyJSONReader)
Expect(err).To(MatchError("download/upload concurrency and part sizes must be non-negative"))
})
})

Describe("returning the S3 endpoint", func() {
Context("when port is provided", func() {
It("returns a URI in the form `host:port`", func() {
1 change: 1 addition & 0 deletions config/endpoints.go
@@ -9,6 +9,7 @@ var (
"aws": regexp.MustCompile(`(^$|s3[-.]?(.*)\.amazonaws\.com(\.cn)?$)`),
"alicloud": regexp.MustCompile(`^oss-([a-z]+-[a-z]+(-[1-9])?)(-internal)?\.aliyuncs\.com$`),
"google": regexp.MustCompile(`^storage\.googleapis\.com$`),
"gdch": regexp.MustCompile(`objectstorage\..*`),
}
)

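End-to-end, the new `gdch` entry wires host detection to `configureGDCH`. A hedged sketch, assuming the import path, the JSON `host` key, and that the provider switch in `NewFromReader` keys off these host patterns; the host value is illustrative:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/cloudfoundry/bosh-s3cli/config" // assumed import path
)

func main() {
	// A host matching `objectstorage\..*` should route through configureGDCH,
	// which relaxes all three checksum toggles.
	raw := `{
		"access_key_id": "id",
		"secret_access_key": "key",
		"bucket_name": "some-bucket",
		"host": "objectstorage.zone1.example.com"
	}`

	c, err := config.NewFromReader(strings.NewReader(raw))
	if err != nil {
		panic(err)
	}
	fmt.Println(c.ShouldDisableRequestChecksumCalculation())         // expected: true
	fmt.Println(c.ShouldDisableResponseChecksumCalculation())        // expected: true
	fmt.Println(c.ShouldDisableUploaderRequestChecksumCalculation()) // expected: true
}
```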