diff --git a/README.md b/README.md
index f259a3390..4b1172938 100644
--- a/README.md
+++ b/README.md
@@ -483,6 +483,17 @@
 src > dst   | src = dst    | ❌
 src <= dst  | src != dst   | ✅
 src <= dst  | src == dst   | ❌
+###### Hash only
+With the `--hash-only` flag, `s5cmd` uses a strategy that compares only file sizes and hashes. The source is treated as the **source of truth**: any difference in size or hash causes `s5cmd` to copy the source object to the destination. Objects uploaded via multipart upload are always synced, since their ETags are not plain MD5 digests and will never match.
+
+The hash may come from the remote object's ETag or be calculated locally. Calculating the hash of a local file is I/O-intensive, so `sync` parallelizes it using the `numworkers` flag: as many hashing goroutines are created as `numworkers` specifies.
+
+hash        | size         | should sync
+------------|--------------|-------------
+src != dst  | src == dst   | ✅
+src != dst  | src != dst   | ✅
+src == dst  | src == dst   | ❌
+
 ### Dry run
 
 `--dry-run` flag will output what operations will be performed without actually carrying out those operations.
@@ -674,6 +685,8 @@ For example, if you are uploading 100 files to an S3 bucket and the `--numworkers
 ```
 s5cmd --numworkers 10 cp '/Users/foo/bar/*' s3://mybucket/foo/bar/
 ```
+Additionally, this flag controls how many goroutines calculate hashes when the `sync` operation is used with the `--hash-only` flag.
+
 ### concurrency
 
 `concurrency` is a `cp` command option. It sets the number of parts that will be uploaded or downloaded in parallel for a single file.
diff --git a/command/sync.go b/command/sync.go
index 6dede5a66..a9e9b1895 100644
--- a/command/sync.go
+++ b/command/sync.go
@@ -52,22 +52,25 @@ Examples:
 	05. Sync S3 bucket to local folder but use size as only comparison criteria.
 		 > s5cmd {{.HelpName}} --size-only "s3://bucket/*" folder/
 
-	06. Sync a file to S3 bucket
+	06. Sync S3 bucket to local folder but use size and hash as comparison criteria.
+		 > s5cmd {{.HelpName}} --hash-only "s3://bucket/*" folder/
+
+	07. Sync a file to S3 bucket
 		 > s5cmd {{.HelpName}} myfile.gz s3://bucket/
 
-	07. Sync matching S3 objects to another bucket
+	08. Sync matching S3 objects to another bucket
 		 > s5cmd {{.HelpName}} "s3://bucket/*.gz" s3://target-bucket/prefix/
 
-	08. Perform KMS Server Side Encryption of the object(s) at the destination
+	09. Perform KMS Server Side Encryption of the object(s) at the destination
 		 > s5cmd {{.HelpName}} --sse aws:kms s3://bucket/object s3://target-bucket/prefix/object
 
-	09. Perform KMS-SSE of the object(s) at the destination using customer managed Customer Master Key (CMK) key id
+	10. Perform KMS-SSE of the object(s) at the destination using customer managed Customer Master Key (CMK) key id
 		 > s5cmd {{.HelpName}} --sse aws:kms --sse-kms-key-id s3://bucket/object s3://target-bucket/prefix/object
 
-	10. Sync all files to S3 bucket but exclude the ones with txt and gz extension
+	11. Sync all files to S3 bucket but exclude the ones with txt and gz extension
 		 > s5cmd {{.HelpName}} --exclude "*.txt" --exclude "*.gz" dir/ s3://bucket
 
-	11. Sync all files to S3 bucket but include the only ones with txt and gz extension
+	12. Sync all files to S3 bucket but include only the ones with txt and gz extension
 		 > s5cmd {{.HelpName}} --include "*.txt" --include "*.gz" dir/ s3://bucket
 `
 
@@ -81,6 +84,10 @@ func NewSyncCommandFlags() []cli.Flag {
 			Name:  "size-only",
 			Usage: "make size of object only criteria to decide whether an object should be synced",
 		},
+		&cli.BoolFlag{
+			Name:  "hash-only",
+			Usage: "make hash and size of object only criteria to decide whether an object should be synced",
+		},
 		&cli.BoolFlag{
 			Name:  "exit-on-error",
 			Usage: "stops the sync process if an error is received",
@@ -130,6 +137,7 @@ type Sync struct {
 	// flags
 	delete      bool
 	sizeOnly    bool
+	hashOnly    bool
 	exitOnError bool
 
 	// s3 options
@@ -138,6 +146,7 @@ type Sync struct {
 	followSymlinks bool
 	storageClass   storage.StorageClass
 	raw            bool
+	numWorkers     int
 
 	srcRegion string
 	dstRegion string
@@ -154,12 +163,14 @@ func NewSync(c *cli.Context) Sync {
 		// flags
 		delete:      c.Bool("delete"),
 		sizeOnly:    c.Bool("size-only"),
+		hashOnly:    c.Bool("hash-only"),
 		exitOnError: c.Bool("exit-on-error"),
 
 		// flags
 		followSymlinks: !c.Bool("no-follow-symlinks"),
 		storageClass:   storage.StorageClass(c.String("storage-class")),
 		raw:            c.Bool("raw"),
+		numWorkers:     c.Int("numworkers"),
 		// region settings
 		srcRegion: c.String("source-region"),
 		dstRegion: c.String("destination-region"),
@@ -228,11 +239,11 @@ func (s Sync) Run(c *cli.Context) error {
 		}
 	}()
 
-	strategy := NewStrategy(s.sizeOnly) // create comparison strategy.
-	pipeReader, pipeWriter := io.Pipe() // create a reader, writer pipe to pass commands to run
+	strategy := NewStrategy(s.sizeOnly, s.hashOnly) // create comparison strategy.
+	pipeReader, pipeWriter := io.Pipe()             // create a reader, writer pipe to pass commands to run
 
 	// Create commands in background.
-	go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch)
+	go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch, s.numWorkers)
 
 	err = NewRun(c, pipeReader).Run(ctx)
 	return multierror.Append(err, merrorWaiter).ErrorOrNil()
@@ -444,6 +455,7 @@ func (s Sync) planRun(
 	strategy SyncStrategy,
 	w io.WriteCloser,
 	isBatch bool,
+	numWorkers int,
 ) {
 	defer w.Close()
 
@@ -474,26 +486,29 @@ func (s Sync) planRun(
 	}()
 
 	// both in source and destination
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for commonObject := range common {
-			sourceObject, destObject := commonObject.src, commonObject.dst
-			curSourceURL, curDestURL := sourceObject.URL, destObject.URL
-			err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
-			if err != nil {
-				printDebug(s.op, err, curSourceURL, curDestURL)
-				continue
-			}
+	// several goroutines are needed because HashStrategy may read many files from the file system
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for commonObject := range common {
+				sourceObject, destObject := commonObject.src, commonObject.dst
+				curSourceURL, curDestURL := sourceObject.URL, destObject.URL
+				err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
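+				// a non-nil error means the strategy considers the objects already
+				// in sync; report it in debug mode and skip the object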
+				if err != nil {
+					printDebug(s.op, err, curSourceURL, curDestURL)
+					continue
+				}
 
-			command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
-			if err != nil {
-				printDebug(s.op, err, curSourceURL, curDestURL)
-				continue
+				command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
+				if err != nil {
+					printDebug(s.op, err, curSourceURL, curDestURL)
+					continue
+				}
+				fmt.Fprintln(w, command)
 			}
-			fmt.Fprintln(w, command)
-		}
-	}()
+		}()
+	}
 
 	// only in destination
 	wg.Add(1)
diff --git a/command/sync_strategy.go b/command/sync_strategy.go
index fe6e948e9..62473fe85 100644
--- a/command/sync_strategy.go
+++ b/command/sync_strategy.go
@@ -1,6 +1,11 @@
 package command
 
 import (
+	"crypto/md5"
+	"encoding/hex"
+	"io"
+	"os"
+
 	errorpkg "github.com/peak/s5cmd/v2/error"
 	"github.com/peak/s5cmd/v2/storage"
 )
@@ -11,9 +16,11 @@ type SyncStrategy interface {
 	ShouldSync(srcObject, dstObject *storage.Object) error
 }
 
-func NewStrategy(sizeOnly bool) SyncStrategy {
+func NewStrategy(sizeOnly bool, hashOnly bool) SyncStrategy {
 	if sizeOnly {
 		return &SizeOnlyStrategy{}
+	} else if hashOnly {
+		return &HashStrategy{}
 	} else {
 		return &SizeAndModificationStrategy{}
 	}
@@ -50,3 +57,55 @@ func (sm *SizeAndModificationStrategy) ShouldSync(srcObj, dstObj *storage.Object
 
 	return errorpkg.ErrObjectIsNewerAndSizesMatch
 }
+
+// HashStrategy decides whether to sync based on the objects' hashes and sizes.
+// It treats the source object as the source of truth; the source can be a
+// local file or a remote (s3) object.
+//
+// md5 hash: src != dst            should sync: yes
+// md5 hash: src == dst            should sync: no
+// md5 hash: src multipart upload  should sync: yes (always)
+// md5 hash: can't open src        should sync: yes (but cp won't be able to open the file either)
+type HashStrategy struct{}
+
+func (s *HashStrategy) ShouldSync(srcObj, dstObj *storage.Object) error {
+	// compare sizes first; if they differ, there is no need to compare hashes
+	if srcObj.Size != dstObj.Size {
+		return nil
+	}
+
+	srcHash := getHash(srcObj)
+	dstHash := getHash(dstObj)
+
+	if srcHash == dstHash {
+		return errorpkg.ErrObjectEtagsMatch
+	}
+
+	return nil
+}
+
+func getHash(obj *storage.Object) string {
+	// a remote (s3) object should have an ETag; a local object that already
+	// carries an ETag can use it directly
+	if obj.URL.IsRemote() || obj.Etag != "" {
+		return obj.Etag
+	} else {
+		// cp.go opens the file again later. It may be possible to avoid this
+		// second open by calculating the md5 hash in fs.go's Stat, which
+		// already loads the file metadata, but that is not necessary for now.
+		file, err := os.Open(obj.URL.String())
+		// if the source file cannot be opened, treat it as changed and sync it:
+		// returning "" forces a hash mismatch, and cp will surface the real
+		// error when it fails to open the file itself
+		if err != nil {
+			return ""
+		}
+		defer file.Close()
+
+		// io.Copy streams the file through the hash with a small internal
+		// buffer, so large files are not read into memory at once
+		md5Obj := md5.New()
+		if _, err := io.Copy(md5Obj, file); err != nil {
+			return ""
+		}
+
+		return hex.EncodeToString(md5Obj.Sum(nil))
+	}
+}
diff --git a/e2e/sync_test.go b/e2e/sync_test.go
index d592e6d52..0696eaf00 100644
--- a/e2e/sync_test.go
+++ b/e2e/sync_test.go
@@ -1717,7 +1717,7 @@ func TestSyncS3BucketToS3BucketSizeOnly(t *testing.T) {
 	sourceS3Content := map[string]string{
 		"main.py":                 "S: this is an updated python file",
 		"testfile.txt":            "S: this is a test file",
-		"readme.md":               "S: this is a readve file",
+		"readme.md":               "S: this is a readme file",
 		"a/another_test_file.txt": "S: yet another txt file",
 	}
 
@@ -2749,3 +2749,145 @@ func TestSyncS3ObjectsIntoAnotherBucketWithIncludeFilters(t *testing.T) {
 		assertError(t, err, errS3NoSuchKey)
 	}
 }
+
+// sync --hash-only folder/ s3://bucket/
+func TestSyncLocalFolderToS3BucketSameObjectsHashOnly(t *testing.T) {
+	t.Parallel()
+
+	s3client, s5cmd := setup(t)
+
+	bucket := s3BucketFromTestName(t)
+	createBucket(t, s3client, bucket)
+
+	folderLayout := []fs.PathOp{
+		fs.WithFile("test.py", "S: this is a python file"),     // will be uploaded: different content (hash), same size
+		fs.WithFile("testfile.txt", "SD: this is a test file"), // will not be uploaded: same content (hash) and size
+		fs.WithDir("a",
+			fs.WithFile("another_test_file.txt", "S: yet another txt file"), // will be uploaded: different content (hash), same size
+		),
+		fs.WithDir("abc",
+			fs.WithDir("def",
+				fs.WithFile("main.py", "S: python file"), // will be uploaded: remote does not have it
+			),
+		),
+	}
+
+	workdir := fs.NewDir(t, "somedir", folderLayout...)
+	defer workdir.Remove()
+
+	s3Content := map[string]string{
+		"test.py":                 "D: this is a python file",
+		"testfile.txt":            "SD: this is a test file",
+		"a/another_test_file.txt": "D: yet another txt file",
+	}
+
+	for filename, content := range s3Content {
+		putFile(t, s3client, bucket, filename, content)
+	}
+
+	src := fmt.Sprintf("%v/", workdir.Path())
+	src = filepath.ToSlash(src)
+	dst := fmt.Sprintf("s3://%s/", bucket)
+
+	// log debug
+	cmd := s5cmd("--log", "debug", "sync", "--hash-only", src, dst)
+	result := icmd.RunCmd(cmd)
+
+	result.Assert(t, icmd.Success)
+
+	assertLines(t, result.Stdout(), map[int]compareFunc{
+		0: equals(`DEBUG "sync %vtestfile.txt %vtestfile.txt": object ETag matches`, src, dst),
+		1: equals(`cp %va/another_test_file.txt %va/another_test_file.txt`, src, dst),
+		2: equals(`cp %vabc/def/main.py %vabc/def/main.py`, src, dst),
+		3: equals(`cp %vtest.py %vtest.py`, src, dst),
+	}, sortInput(true))
+
+	// expected folder structure without the timestamp.
+	expected := fs.Expected(t, folderLayout...)
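+	// the local source tree must be left untouched by the sync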
+	assert.Assert(t, fs.Equal(workdir.Path(), expected))
+
+	expectedS3Content := map[string]string{
+		"test.py":                 "S: this is a python file",
+		"testfile.txt":            "SD: this is a test file",
+		"a/another_test_file.txt": "S: yet another txt file",
+		"abc/def/main.py":         "S: python file",
+	}
+
+	// assert s3
+	for key, content := range expectedS3Content {
+		assert.Assert(t, ensureS3Object(s3client, bucket, key, content))
+	}
+}
+
+// sync --hash-only s3://bucket/* s3://destbucket/
+func TestSyncS3BucketToS3BucketHashOnly(t *testing.T) {
+	t.Parallel()
+
+	now := time.Now()
+	timeSource := newFixedTimeSource(now)
+	s3client, s5cmd := setup(t, withTimeSource(timeSource))
+
+	bucket := s3BucketFromTestName(t)
+	dstbucket := s3BucketFromTestNameWithPrefix(t, "dst")
+	createBucket(t, s3client, bucket)
+	createBucket(t, s3client, dstbucket)
+
+	sourceS3Content := map[string]string{
+		"main.py":                 "S: this is an updated python file", // will be uploaded: destination does not have it
+		"testfile.txt":            "SD: this is a test file",           // will not be uploaded: destination has the file with the same hash/size
+		"readme.md":               "S: this is a readme file",          // will be uploaded: destination has a file with a different hash but the same size
+		"a/another_test_file.txt": "S: yet another txt file",           // will be uploaded: destination has a file with a different hash but the same size
+	}
+
+	destS3Content := map[string]string{
+		"testfile.txt":            "SD: this is a test file",
+		"readme.md":               "D: this is a readme file",
+		"a/another_test_file.txt": "D: yet another txt file",
+	}
+
+	// make destination files older than the source files;
+	// timestamps should be ignored with the --hash-only flag
+	timeSource.Advance(-time.Minute)
+	for filename, content := range destS3Content {
+		putFile(t, s3client, dstbucket, filename, content)
+	}
+	timeSource.Advance(time.Minute)
+
+	for filename, content := range sourceS3Content {
+		putFile(t, s3client, bucket, filename, content)
+	}
+
+	bucketPath := fmt.Sprintf("s3://%v", bucket)
+	src := fmt.Sprintf("%s/*", bucketPath)
+	dst := fmt.Sprintf("s3://%v/", dstbucket)
+
+	// log debug
+	cmd := s5cmd("--log", "debug", "sync", "--hash-only", src, dst)
+	result := icmd.RunCmd(cmd)
+
+	result.Assert(t, icmd.Success)
+
+	assertLines(t, result.Stdout(), map[int]compareFunc{
+		0: equals(`DEBUG "sync %v/testfile.txt %vtestfile.txt": object ETag matches`, bucketPath, dst),
+		1: equals(`cp %v/a/another_test_file.txt %va/another_test_file.txt`, bucketPath, dst),
+		2: equals(`cp %v/main.py %vmain.py`, bucketPath, dst),
+		3: equals(`cp %v/readme.md %vreadme.md`, bucketPath, dst),
+	}, sortInput(true))
+
+	// assert s3 objects in source
+	for key, content := range sourceS3Content {
+		assert.Assert(t, ensureS3Object(s3client, bucket, key, content))
+	}
+
+	expectedDestS3Content := map[string]string{
+		"main.py":                 "S: this is an updated python file",
+		"testfile.txt":            "SD: this is a test file", // same as source
+		"readme.md":               "S: this is a readme file",
+		"a/another_test_file.txt": "S: yet another txt file",
+	}
+
+	// assert s3 objects in destination
+	for key, content := range expectedDestS3Content {
+		assert.Assert(t, ensureS3Object(s3client, dstbucket, key, content))
+	}
+}
diff --git a/error/error.go b/error/error.go
index fa7925ba4..68e428f95 100644
--- a/error/error.go
+++ b/error/error.go
@@ -77,6 +77,9 @@ var (
 	// ErrObjectSizesMatch indicates the sizes of objects match.
 	ErrObjectSizesMatch = fmt.Errorf("object size matches")
 
+	// ErrObjectEtagsMatch indicates the ETags of objects match.
+	ErrObjectEtagsMatch = fmt.Errorf("object ETag matches")
+
 	// ErrObjectIsNewerAndSizesMatch indicates the specified object is newer or same age and sizes of objects match.
 	ErrObjectIsNewerAndSizesMatch = fmt.Errorf("%v and %v", ErrObjectIsNewer, ErrObjectSizesMatch)
 
@@ -88,7 +91,7 @@ var (
 // ErrObjectIsNewer or ErrObjectSizesMatch.
 func IsWarning(err error) bool {
 	switch err {
-	case ErrObjectExists, ErrObjectIsNewer, ErrObjectSizesMatch, ErrObjectIsNewerAndSizesMatch, ErrorObjectIsGlacier:
+	case ErrObjectExists, ErrObjectIsNewer, ErrObjectSizesMatch, ErrObjectEtagsMatch, ErrObjectIsNewerAndSizesMatch, ErrorObjectIsGlacier:
 		return true
 	}
 
diff --git a/storage/storage.go b/storage/storage.go
index 1a6171263..e5d1f5e40 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -242,6 +242,9 @@ func (o Object) ToBytes() []byte {
 	enc.Encode(o.ModTime.Format(time.RFC3339Nano))
 	enc.Encode(o.Type.mode)
 	enc.Encode(o.Size)
+	// the ETag is encoded so that source and destination objects can be compared by hash;
+	// a "NeedETag" flag could be added later to encode it only when needed
+	enc.Encode(o.Etag)
 
 	return buf.Bytes()
 }
@@ -260,6 +263,11 @@ func FromBytes(data []byte) extsort.SortType {
 	o.ModTime = &tmp
 	dec.Decode(&o.Type.mode)
 	dec.Decode(&o.Size)
+	// the ETag must be decoded so that source and destination objects can be compared by hash;
+	// note that fs.go's Filesystem Stat stores an empty ("") ETag, so for local objects this
+	// decodes an empty string; a "NeedETag" flag could be added later to decode it only when needed
+	dec.Decode(&o.Etag)
+
 	return o
 }