From 543c61a6b0a12c326a8bdead59f02b61b94fef2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Schottst=C3=A4dt?= Date: Mon, 5 May 2025 19:03:29 +0200 Subject: [PATCH] feat: backup restore streaming --- .env.example | 4 + .gitignore | 4 +- .vscode/launch.json | 5 +- internal/core/services/snapshot/progress.go | 49 +--- .../core/services/snapshot/progress_reader.go | 57 +++++ .../core/services/snapshot/restore_service.go | 210 +++++++----------- internal/utils/misc.go | 23 ++ 7 files changed, 183 insertions(+), 169 deletions(-) create mode 100644 .env.example create mode 100644 internal/core/services/snapshot/progress_reader.go diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6002147 --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +DRUID_S3_ACCESS_KEY= +DRUID_S3_SECRET_KEY= +DRUID_S3_BUCKET=druid-testing +DRUID_S3_ENDPOINT=fsn1 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7610f8f..ddcecc5 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ debug** druid-cli-test -!.docker/** \ No newline at end of file +!.docker/** + +.env \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index a0d35ab..d39948b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,9 +11,10 @@ "args": [ "backup", "--cwd", - "${workspaceFolder}/examples/backup-test", - "https://gist.github.com/jlmaccal/ca6b08a307800f80986661075c82c432/raw/dbd5e7a38d0660b7047476de33da142bd79c22ef/test.tgz" + "${workspaceFolder}/examples/minecraft", + "testtest.tgz" ], + "envFile": "${workspaceFolder}/.env", }, { "name": "Debug Daemon restore", diff --git a/internal/core/services/snapshot/progress.go b/internal/core/services/snapshot/progress.go index 81425a1..4b6666a 100644 --- a/internal/core/services/snapshot/progress.go +++ b/internal/core/services/snapshot/progress.go @@ -2,52 +2,27 @@ package services import ( "fmt" - "io" "github.com/highcard-dev/daemon/internal/utils/logger" "go.uber.org/zap" ) -type ProgressTracker struct { - reader io.ReadCloser - read int64 - fileSize int64 - lastPercent float64 +type GeneralProgressTracker struct { + total int64 + read int64 } -func (pr *ProgressTracker) Read(p []byte) (int, error) { - n, err := pr.reader.Read(p) - pr.read += int64(n) - - // Calculate current percentage of upload progress - currentPercent := (float64(pr.read) * 100) / float64(pr.fileSize) - - // Update progress if we've moved at least 0.1% or it's been more than the update frequency since the last update - if currentPercent > pr.lastPercent+0.1 { - logger.Log().Info("Snapshot operation progress", zap.String("percentage", fmt.Sprintf("%.1f%%", currentPercent)), zap.String("read", fmt.Sprintf("%d/%d", pr.read, pr.fileSize))) - pr.lastPercent = currentPercent - } - - // If the upload is finished - if pr.read == pr.fileSize { - logger.Log().Info("Snapshot operation complete") +func NewGeneralProgressTracker(total int64) *GeneralProgressTracker { + return &GeneralProgressTracker{ + total: total, + read: 0, } - - return n, err -} - -func (pr *ProgressTracker) Close() error { - pr.lastPercent = 100 - return pr.reader.Close() } -func (pt *ProgressTracker) TrackProgress(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser { - pt.reader = stream - pt.fileSize = totalSize - logger.Log().Info("Snapshot operation started", zap.String("source", src), zap.String("size", fmt.Sprintf("%d bytes", totalSize))) - return pt +func (pt *GeneralProgressTracker) GetPercent() float64 { + return (float64(pt.read) / float64(pt.total)) * 100 } - -func (pt *ProgressTracker) GetPercent() float64 { - return pt.lastPercent +func (pt *GeneralProgressTracker) TrackProgress() { + pt.read++ + logger.Log().Info("Progress", zap.Int64("total", pt.total), zap.Int64("read", pt.read), zap.String("percentage", fmt.Sprintf("%.1f%%", pt.GetPercent()))) } diff --git a/internal/core/services/snapshot/progress_reader.go b/internal/core/services/snapshot/progress_reader.go new file mode 100644 index 0000000..8294bb1 --- /dev/null +++ b/internal/core/services/snapshot/progress_reader.go @@ -0,0 +1,57 @@ +package services + +import ( + "fmt" + "io" + + "github.com/highcard-dev/daemon/internal/utils" + "github.com/highcard-dev/daemon/internal/utils/logger" + "go.uber.org/zap" +) + +type ProgressTracker struct { + reader io.ReadCloser + read int64 + fileSize int64 + lastPercent float64 +} + +func (pr *ProgressTracker) Read(p []byte) (int, error) { + n, err := pr.reader.Read(p) + pr.read += int64(n) + + // Calculate current percentage of upload progress + currentPercent := (float64(pr.read) * 100) / float64(pr.fileSize) + + sizeHuman := utils.HumanizeBytes(pr.fileSize) + readHuman := utils.HumanizeBytes(pr.read) + + // Update progress if we've moved at least 0.1% or it's been more than the update frequency since the last update + if currentPercent > pr.lastPercent+0.1 { + logger.Log().Info("Snapshot operation progress", zap.String("percentage", fmt.Sprintf("%.1f%%", currentPercent)), zap.String("read", fmt.Sprintf("%s/%s", readHuman, sizeHuman))) + pr.lastPercent = currentPercent + } + + // If the upload is finished + if pr.read == pr.fileSize { + logger.Log().Info("Snapshot operation complete") + } + + return n, err +} + +func (pr *ProgressTracker) Close() error { + pr.lastPercent = 100 + return pr.reader.Close() +} + +func (pt *ProgressTracker) TrackProgress(src string, currentSize, totalSize int64, stream io.ReadCloser) io.ReadCloser { + pt.reader = stream + pt.fileSize = totalSize + logger.Log().Info("Snapshot operation started", zap.String("source", src), zap.String("size", fmt.Sprintf("%d bytes", totalSize))) + return pt +} + +func (pt *ProgressTracker) GetPercent() float64 { + return pt.lastPercent +} diff --git a/internal/core/services/snapshot/restore_service.go b/internal/core/services/snapshot/restore_service.go index e3e1e9e..aae0f1f 100644 --- a/internal/core/services/snapshot/restore_service.go +++ b/internal/core/services/snapshot/restore_service.go @@ -5,14 +5,12 @@ import ( "compress/gzip" "context" "crypto/tls" - "errors" "fmt" "io" "net/http" "os" "path" "path/filepath" - "strings" "github.com/hashicorp/go-getter" "github.com/highcard-dev/daemon/internal/core/ports" @@ -34,7 +32,7 @@ func NewSnapshotService() *SnapshotService { } } -func (rc *SnapshotService) setActivity(mode ports.SnapshotMode, progressTracker *ProgressTracker) { +func (rc *SnapshotService) setActivity(mode ports.SnapshotMode, progressTracker ports.ProgressTracker) { rc.currentMode = mode rc.currentProgressTracker = progressTracker } @@ -46,98 +44,37 @@ func (rc *SnapshotService) GetCurrentProgressTracker() *ports.ProgressTracker { return &rc.currentProgressTracker } -func (rc *SnapshotService) Snapshot(dir string, destination string, options ports.SnapshotOptions) error { - - var target string - if options.TempDir == "" { - target = filepath.Join(dir, "snapshot.tgz") - } else { - target = filepath.Join(options.TempDir, "snapshot.tgz") - } - - logger.Log().Info("Creating snapshot", zap.String("source", dir), zap.String("destination", target)) - // Define the source URL and destination directory - err := rc.createTarGz(dir, target) - if err != nil { - return err - } - logger.Log().Info("Snapshot created", zap.String("source", dir), zap.String("destination", target)) - - //TODO: upload - if strings.HasPrefix(destination, "http") { - logger.Log().Info("Uploading snapshot", zap.String("source", target), zap.String("destination", destination)) - err = rc.uploadFileUsingPresignedURL(destination, target) +func (rc *SnapshotService) countFilesRec(dir string) (int64, error) { + var fileCount int64 + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { - logger.Log().Error("Error occured while uploading snapshot", zap.Error(err)) return err } - logger.Log().Info("Snapshot uploaded", zap.String("source", target), zap.String("destination", destination)) - } else if options.S3Destination != nil { - logger.Log().Info("Uploading snapshot", zap.String("source", target), zap.String("destination", destination)) - err = rc.uploadFileUsingS3(destination, target, options.S3Destination) - if err != nil { - logger.Log().Error("Error occured while uploading snapshot", zap.Error(err)) - return err + if info.Mode().IsRegular() { + fileCount++ } - logger.Log().Info("Snapshot uploaded", zap.String("source", target), zap.String("destination", destination)) - } else { - return errors.New("destination must be a presigned S3 URL") - } - - return os.Remove(target) -} - -func (rc *SnapshotService) uploadFileUsingS3(objectKey, filePath string, s3Destination *ports.S3Destination) error { - - ctx := context.TODO() - - endpoint := s3Destination.Endpoint - region := s3Destination.Region - if region == "" { - region = "us-east-1" - } - accessKey := s3Destination.AccessKey - secretKey := s3Destination.SecretKey - bucketName := s3Destination.Bucket - - // Load AWS config with custom S3-compatible settings - minioClient, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(accessKey, secretKey, ""), - Secure: true, + return nil }) if err != nil { - return fmt.Errorf("Failed to create S3 client: %v", err) + return 0, err } + return fileCount, nil +} - // Open the file - file, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - // Get the file size - fileInfo, err := file.Stat() - if err != nil { - return fmt.Errorf("failed to get file info: %w", err) - } - fileSize := fileInfo.Size() +func (rc *SnapshotService) Snapshot(dir string, destination string, options ports.SnapshotOptions) error { - // Wrap the file reader in the ProgressReader with an update frequency of 1 second - progressReader := &ProgressTracker{ - reader: file, - fileSize: fileSize, - } - rc.setActivity(ports.SnapshotModeSnapshot, progressReader) - defer rc.setActivity(ports.SnapshotModeNoop, nil) + totalFiles := int64(0) + totalFiles, _ = rc.countFilesRec(dir) + progessTracker := NewGeneralProgressTracker(totalFiles) - contentType := "application/octet-stream" - _, err = minioClient.PutObject(ctx, bucketName, objectKey, progressReader, fileSize, minio.PutObjectOptions{ContentType: contentType}) - if err != nil { - return fmt.Errorf("failed to upload file: %v", err) + rc.setActivity(ports.SnapshotModeSnapshot, progessTracker) + defer rc.setActivity(ports.SnapshotModeNoop, progessTracker) + //check if rootPath exists + if _, err := os.Stat(dir); os.IsNotExist(err) { + return fmt.Errorf("source path does not exist: %s", dir) } - return nil + return rc.uploadS3(dir, destination, options.S3Destination, progessTracker) } func (rc *SnapshotService) RestoreSnapshot(dir string, source string, options ports.RestoreSnapshotOptions) error { @@ -192,71 +129,86 @@ func (rc *SnapshotService) RestoreSnapshot(dir string, source string, options po return nil } -func (rc *SnapshotService) createTarGz(rootPath, target string) error { - // Create the target .tgz file - tgzFile, err := os.Create(target) - if err != nil { - return err - } - defer tgzFile.Close() +func (rc *SnapshotService) uploadS3(rootPath, objectKey string, s3Destination *ports.S3Destination, progessTracker *GeneralProgressTracker) error { - // Create a gzip writer - gzipWriter := gzip.NewWriter(tgzFile) - defer gzipWriter.Close() + pipeReader, pipeWriter := io.Pipe() - // Create a tar writer - tarWriter := tar.NewWriter(gzipWriter) - defer tarWriter.Close() + go func() { + defer pipeWriter.Close() - // Walk through the source directory - return filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } + // Create a gzip writer + gzipWriter := gzip.NewWriter(pipeWriter) + defer gzipWriter.Close() - // Skip the target file - if absTarget, err := filepath.Abs(target); err != nil { - return err - } else if absFile, err := filepath.Abs(path); err != nil { - return err - } else if absFile == absTarget { - return nil - } + // Create a tar writer + tarWriter := tar.NewWriter(gzipWriter) + defer tarWriter.Close() - linkName := "" - if info.Mode()&os.ModeSymlink == os.ModeSymlink { - linkName, err = os.Readlink(path) + // Walk through the source directory + filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } - } - hdr, err := tar.FileInfoHeader(info, linkName) - if err != nil { - return err - } + linkName := "" + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + linkName, err = os.Readlink(path) + if err != nil { + return err + } + } - hdr.Name, _ = filepath.Rel(rootPath, path) + hdr, err := tar.FileInfoHeader(info, linkName) + if err != nil { + return err + } - if err := tarWriter.WriteHeader(hdr); err != nil { - return err - } + hdr.Name, _ = filepath.Rel(rootPath, path) - if info.Mode().IsRegular() { - file, err := os.Open(path) - if err != nil { + if err := tarWriter.WriteHeader(hdr); err != nil { return err } - defer file.Close() - _, err = io.Copy(tarWriter, file) - return err - } + if info.Mode().IsRegular() { + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() - return nil + _, err = io.Copy(tarWriter, file) + progessTracker.TrackProgress() + return err + } + + return nil + }) + }() + + endpoint := s3Destination.Endpoint + region := s3Destination.Region + if region == "" { + region = "us-east-1" + } + accessKey := s3Destination.AccessKey + secretKey := s3Destination.SecretKey + bucketName := s3Destination.Bucket + + // Load AWS config with custom S3-compatible settings + minioClient, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKey, secretKey, ""), + Secure: true, }) + if err != nil { + return fmt.Errorf("failed to create S3 client: %v", err) + } + + contentType := "application/octet-stream" + _, err = minioClient.PutObject(context.TODO(), bucketName, objectKey, pipeReader, -1, minio.PutObjectOptions{ContentType: contentType}) + return err } +// Todo: refactor this to do streaming upload func (rc *SnapshotService) uploadFileUsingPresignedURL(presignedURL, filePath string) error { // Open the file file, err := os.Open(filePath) diff --git a/internal/utils/misc.go b/internal/utils/misc.go index 5e95deb..0630dc3 100644 --- a/internal/utils/misc.go +++ b/internal/utils/misc.go @@ -2,6 +2,7 @@ package utils import ( "errors" + "fmt" "strings" ) @@ -40,3 +41,25 @@ func InterfaceToStringSlice(data interface{}) ([]string, error) { } return instructions, nil } + +func HumanizeBytes(bytes int64) string { + const ( + KB = 1024 + MB = KB * 1024 + GB = MB * 1024 + TB = GB * 1024 + ) + + switch { + case bytes < KB: + return fmt.Sprintf("%dB", bytes) + case bytes < MB: + return fmt.Sprintf("%.2fKB", float64(bytes)/KB) + case bytes < GB: + return fmt.Sprintf("%.2fMB", float64(bytes)/MB) + case bytes < TB: + return fmt.Sprintf("%.2fGB", float64(bytes)/GB) + default: + return fmt.Sprintf("%.2fTB", float64(bytes)/TB) + } +}