From 0ea9d3b69e1b59aa053abf2c9014e8358906f0c2 Mon Sep 17 00:00:00 2001 From: Joshua Gilman Date: Thu, 22 Jan 2026 17:36:45 -0800 Subject: [PATCH] feat(core): return CopyStats from copy operations Add CopyStats return value to CopyTo, CopyToWithOptions, CopyDir, and CopyFile methods. The stats include FileCount (files successfully copied), TotalBytes (sum of uncompressed sizes), and Skipped (files skipped due to existing without overwrite). This is a breaking API change - callers must now handle the additional return value. Acceptable for a pre-1.0 library. Changes: - Add internal ProcessStats type for batch processor - Update batch.Process() to return (ProcessStats, error) - Add public CopyStats type in core/blob_opts.go - Update all copy methods to return (CopyStats, error) - Re-export CopyStats in types.go - Update all tests, benchmarks, and examples - Update documentation with new signatures Co-Authored-By: Claude Opus 4.5 --- benchmarks/estargz/benchmark_compare_test.go | 4 +- core/bench_cache_test.go | 4 +- core/bench_test.go | 4 +- core/benchmark_core_test.go | 10 +- core/blob.go | 51 +++++++---- core/blob_opts.go | 12 +++ core/blob_test.go | 96 +++++++++++++++++--- core/internal/batch/batch.go | 93 ++++++++++++------- core/internal/batch/batch_bench_test.go | 2 +- core/internal/batch/batch_test.go | 13 ++- core/internal/batch/stats.go | 20 ++++ docs/docs/guides/extraction.md | 38 ++++---- docs/docs/guides/performance-tuning.md | 16 ++-- docs/docs/reference/api.md | 24 +++-- docs/src/pages/index.tsx | 3 +- examples/provenance/pull.go | 4 +- integration/archive_test.go | 18 ++-- integration/error_test.go | 2 +- types.go | 3 + 19 files changed, 294 insertions(+), 123 deletions(-) create mode 100644 core/internal/batch/stats.go diff --git a/benchmarks/estargz/benchmark_compare_test.go b/benchmarks/estargz/benchmark_compare_test.go index 36d843f..0e91727 100644 --- a/benchmarks/estargz/benchmark_compare_test.go +++ b/benchmarks/estargz/benchmark_compare_test.go @@ -432,7 +432,7 @@ func BenchmarkCompareCopyDir(b *testing.B) { } b.StartTimer() - if err := bb.CopyDir(destDir, prefix, opts...); err != nil { + if _, err := bb.CopyDir(destDir, prefix, opts...); err != nil { b.Fatal(err) } @@ -554,7 +554,7 @@ func BenchmarkCompareCopyAll(b *testing.B) { } b.StartTimer() - if err := bb.CopyDir(destDir, prefix, opts...); err != nil { + if _, err := bb.CopyDir(destDir, prefix, opts...); err != nil { b.Fatal(err) } diff --git a/core/bench_cache_test.go b/core/bench_cache_test.go index cfbaa1d..c81342a 100644 --- a/core/bench_cache_test.go +++ b/core/bench_cache_test.go @@ -392,7 +392,7 @@ func BenchmarkBlobCopyDirBlockCache(b *testing.B) { b.Fatal(err) } - if err := cached.CopyDir(destDir, ".", CopyWithOverwrite(true)); err != nil { + if _, err := cached.CopyDir(destDir, ".", CopyWithOverwrite(true)); err != nil { b.Fatal(err) } @@ -400,7 +400,7 @@ func BenchmarkBlobCopyDirBlockCache(b *testing.B) { b.ReportAllocs() b.ResetTimer() for b.Loop() { - if err := cached.CopyDir(destDir, ".", CopyWithOverwrite(true)); err != nil { + if _, err := cached.CopyDir(destDir, ".", CopyWithOverwrite(true)); err != nil { b.Fatal(err) } } diff --git a/core/bench_test.go b/core/bench_test.go index e288d27..4a4f04c 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -458,7 +458,7 @@ func BenchmarkBlobCopyDirHTTPMatrix(b *testing.B) { } b.StartTimer() - if err := blob.CopyDir(destDir, prefix, opts...); err != nil { + if _, err := blob.CopyDir(destDir, prefix, opts...); err != nil { b.Fatal(err) } @@ -821,7 
+821,7 @@ func benchmarkBlobCopyDir(b *testing.B, label string, workers int, cleanDest boo opts = append(opts, CopyWithCleanDest(true)) } - if err := blob.CopyDir(destDir, prefix, opts...); err != nil { + if _, err := blob.CopyDir(destDir, prefix, opts...); err != nil { b.Fatal(err) } diff --git a/core/benchmark_core_test.go b/core/benchmark_core_test.go index 144628f..a45b35d 100644 --- a/core/benchmark_core_test.go +++ b/core/benchmark_core_test.go @@ -816,7 +816,7 @@ func BenchmarkSequentialVsRandom(b *testing.B) { b.ReportAllocs() b.ResetTimer() for b.Loop() { - if err := processor.Process(entries, discardSink{}); err != nil { + if _, err := processor.Process(entries, discardSink{}); err != nil { b.Fatal(err) } } @@ -860,7 +860,8 @@ func BenchmarkDirectoryTransfer(b *testing.B) { fn: func(blob *Blob, _ benchByteSource) error { entries := blob.collectPrefixEntries(prefix) processor := batch.NewProcessor(blob.reader.Source(), blob.reader.Pool(), blob.maxFileSize, batch.WithReadConcurrency(1)) - return processor.Process(entries, discardSink{}) + _, err := processor.Process(entries, discardSink{}) + return err }, }, { @@ -1133,7 +1134,7 @@ func BenchmarkCopyDirContiguous(b *testing.B) { b.ReportAllocs() b.ResetTimer() for b.Loop() { - if err := processor.Process(entries, discardSink{}); err != nil { + if _, err := processor.Process(entries, discardSink{}); err != nil { b.Fatal(err) } } @@ -1182,7 +1183,8 @@ func BenchmarkCopyDirVsIndividual(b *testing.B) { fn: func(blob *Blob) error { entries := blob.collectPrefixEntries(prefix) processor := batch.NewProcessor(blob.reader.Source(), blob.reader.Pool(), blob.maxFileSize, batch.WithReadConcurrency(1)) - return processor.Process(entries, discardSink{}) + _, err := processor.Process(entries, discardSink{}) + return err }, }, { diff --git a/core/blob.go b/core/blob.go index 834d593..ec495e3 100644 --- a/core/blob.go +++ b/core/blob.go @@ -424,9 +424,9 @@ func (b *Blob) Len() int { // - Existing files are skipped (use CopyWithOverwrite to overwrite) // - File modes and times are not preserved (use CopyWithPreserveMode/Times) // - Range reads are pipelined (when beneficial) with concurrency 4 (use CopyWithReadConcurrency to change) -func (b *Blob) CopyTo(destDir string, paths ...string) error { +func (b *Blob) CopyTo(destDir string, paths ...string) (CopyStats, error) { if len(paths) == 0 { - return nil + return CopyStats{}, nil } cfg := copyConfig{} @@ -434,9 +434,9 @@ func (b *Blob) CopyTo(destDir string, paths ...string) error { } // CopyToWithOptions extracts specific files with options. 
-func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOption) error { +func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOption) (CopyStats, error) { if len(paths) == 0 { - return nil + return CopyStats{}, nil } cfg := copyConfig{} @@ -444,7 +444,7 @@ func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOpt opt(&cfg) } if cfg.cleanDest { - return errors.New("CopyWithCleanDest is only supported by CopyDir") + return CopyStats{}, errors.New("CopyWithCleanDest is only supported by CopyDir") } return b.copyEntries(destDir, b.collectPathEntries(paths), &cfg) } @@ -463,7 +463,7 @@ func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOpt // - Existing files are skipped (use CopyWithOverwrite to overwrite) // - File modes and times are not preserved (use CopyWithPreserveMode/Times) // - Range reads are pipelined (when beneficial) with concurrency 4 (use CopyWithReadConcurrency to change) -func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) error { +func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) (CopyStats, error) { cfg := copyConfig{} for _, opt := range opts { opt(&cfg) @@ -471,10 +471,10 @@ func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) error { if cfg.cleanDest { target, err := cleanCopyDest(destDir, prefix) if err != nil { - return err + return CopyStats{}, err } if err := os.RemoveAll(target); err != nil { - return fmt.Errorf("clean destination %s: %w", target, err) + return CopyStats{}, fmt.Errorf("clean destination %s: %w", target, err) } cfg.overwrite = true } @@ -496,46 +496,52 @@ func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) error { // overwrite is disabled), CopyFile returns fs.ErrExist. This explicit error // behavior is more appropriate for single-file operations where the caller // likely wants to know if the copy didn't happen. 
-func (b *Blob) CopyFile(srcPath, destPath string, opts ...CopyOption) error { +func (b *Blob) CopyFile(srcPath, destPath string, opts ...CopyOption) (CopyStats, error) { cfg := copyConfig{} for _, opt := range opts { opt(&cfg) } if cfg.cleanDest { - return errors.New("CopyWithCleanDest is only supported by CopyDir") + return CopyStats{}, errors.New("CopyWithCleanDest is only supported by CopyDir") } // Normalize and validate source path srcPath = NormalizePath(srcPath) if !fs.ValidPath(srcPath) { - return &fs.PathError{Op: "copyfile", Path: srcPath, Err: fs.ErrInvalid} + return CopyStats{}, &fs.PathError{Op: "copyfile", Path: srcPath, Err: fs.ErrInvalid} } // Look up entry, verify it's a file view, ok := b.idx.LookupView(srcPath) if !ok { - return &fs.PathError{Op: "copyfile", Path: srcPath, Err: fs.ErrNotExist} + return CopyStats{}, &fs.PathError{Op: "copyfile", Path: srcPath, Err: fs.ErrNotExist} } entry := blobtype.EntryFromViewWithPath(view, srcPath) if entry.Mode.IsDir() { - return &fs.PathError{Op: "copyfile", Path: srcPath, Err: errors.New("cannot copy directory")} + return CopyStats{}, &fs.PathError{Op: "copyfile", Path: srcPath, Err: errors.New("cannot copy directory")} } // Check destination (unless overwrite) if !cfg.overwrite { if _, err := os.Stat(destPath); err == nil { - return &fs.PathError{Op: "copyfile", Path: destPath, Err: fs.ErrExist} + return CopyStats{}, &fs.PathError{Op: "copyfile", Path: destPath, Err: fs.ErrExist} } } // Open source file (handles decompression + verification) src, err := b.Open(srcPath) if err != nil { - return err + return CopyStats{}, err } defer src.Close() - return copyFileAtomic(src, destPath, &entry, &cfg) + if err := copyFileAtomic(src, destPath, &entry, &cfg); err != nil { + return CopyStats{}, err + } + return CopyStats{ + FileCount: 1, + TotalBytes: entry.OriginalSize, + }, nil } // copyFileAtomic writes content from src to destPath atomically using a temp file. @@ -638,13 +644,13 @@ func (b *Blob) collectPrefixEntries(prefix string) []*batch.Entry { } // copyEntries uses the batch processor to copy entries to destDir. -func (b *Blob) copyEntries(destDir string, entries []*batch.Entry, cfg *copyConfig) error { +func (b *Blob) copyEntries(destDir string, entries []*batch.Entry, cfg *copyConfig) (CopyStats, error) { if len(entries) == 0 { - return nil + return CopyStats{}, nil } for _, entry := range entries { if !fs.ValidPath(entry.Path) { - return &fs.PathError{Op: "copy", Path: entry.Path, Err: fs.ErrInvalid} + return CopyStats{}, &fs.PathError{Op: "copy", Path: entry.Path, Err: fs.ErrInvalid} } } @@ -682,7 +688,12 @@ func (b *Blob) copyEntries(destDir string, entries []*batch.Entry, cfg *copyConf } proc := batch.NewProcessor(b.reader.Source(), b.reader.Pool(), b.maxFileSize, procOpts...) - return proc.Process(entries, sink) + procStats, err := proc.Process(entries, sink) + return CopyStats{ + FileCount: procStats.Processed, + TotalBytes: procStats.TotalBytes, + Skipped: procStats.Skipped, + }, err } func cleanCopyDest(destDir, prefix string) (string, error) { diff --git a/core/blob_opts.go b/core/blob_opts.go index b018742..5796d19 100644 --- a/core/blob_opts.go +++ b/core/blob_opts.go @@ -163,3 +163,15 @@ func CopyWithProgress(fn ProgressFunc) CopyOption { c.progress = fn } } + +// CopyStats contains statistics about a copy operation. +type CopyStats struct { + // FileCount is the number of files successfully copied. + FileCount int + + // TotalBytes is the sum of original (uncompressed) file sizes. 
+ TotalBytes uint64 + + // Skipped is the number of files skipped (e.g., already exist without overwrite). + Skipped int +} diff --git a/core/blob_test.go b/core/blob_test.go index 54bd054..55b170e 100644 --- a/core/blob_test.go +++ b/core/blob_test.go @@ -40,7 +40,7 @@ func TestCopyDirRejectsTraversalPaths(t *testing.T) { require.NoError(t, err) destDir := t.TempDir() - err = archive.CopyDir(destDir, "") + _, err = archive.CopyDir(destDir, "") var pathErr *fs.PathError require.ErrorAs(t, err, &pathErr) require.ErrorIs(t, pathErr.Err, fs.ErrInvalid) @@ -48,6 +48,67 @@ func TestCopyDirRejectsTraversalPaths(t *testing.T) { require.Error(t, statErr) } +func TestCopyDir_ReturnsStats(t *testing.T) { + t.Parallel() + + files := map[string][]byte{ + "a.txt": bytes.Repeat([]byte("a"), 100), + "b.txt": bytes.Repeat([]byte("b"), 200), + "dir/c.txt": bytes.Repeat([]byte("c"), 300), + } + b := createTestArchive(t, files, CompressionNone) + + destDir := t.TempDir() + stats, err := b.CopyDir(destDir, "") + require.NoError(t, err) + + assert.Equal(t, 3, stats.FileCount) + assert.Equal(t, uint64(600), stats.TotalBytes) + assert.Equal(t, 0, stats.Skipped) +} + +func TestCopyDir_SkippedStats(t *testing.T) { + t.Parallel() + + files := map[string][]byte{ + "a.txt": []byte("aaa"), + "b.txt": []byte("bbb"), + } + b := createTestArchive(t, files, CompressionNone) + + destDir := t.TempDir() + + // Pre-create one file + require.NoError(t, os.WriteFile(filepath.Join(destDir, "a.txt"), []byte("existing"), 0o644)) + + // Without overwrite, a.txt should be skipped + stats, err := b.CopyDir(destDir, "") + require.NoError(t, err) + + assert.Equal(t, 1, stats.FileCount) // Only b.txt copied + assert.Equal(t, uint64(3), stats.TotalBytes) // Only b.txt size + assert.Equal(t, 1, stats.Skipped) // a.txt skipped +} + +func TestCopyTo_ReturnsStats(t *testing.T) { + t.Parallel() + + files := map[string][]byte{ + "a.txt": bytes.Repeat([]byte("a"), 100), + "b.txt": bytes.Repeat([]byte("b"), 200), + "c.txt": bytes.Repeat([]byte("c"), 300), + } + b := createTestArchive(t, files, CompressionNone) + + destDir := t.TempDir() + stats, err := b.CopyTo(destDir, "a.txt", "b.txt") + require.NoError(t, err) + + assert.Equal(t, 2, stats.FileCount) + assert.Equal(t, uint64(300), stats.TotalBytes) + assert.Equal(t, 0, stats.Skipped) +} + // createTestArchive creates a Blob for testing with the given files and compression. // Files are specified as a map of path to content. 
func createTestArchive(t *testing.T, files map[string][]byte, compression Compression) *Blob { @@ -484,9 +545,14 @@ func TestBlob_CopyFile(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "renamed.json") - err := b.CopyFile("config/app.json", dest) + stats, err := b.CopyFile("config/app.json", dest) require.NoError(t, err) + // Verify stats + assert.Equal(t, 1, stats.FileCount) + assert.Equal(t, uint64(len(content)), stats.TotalBytes) + assert.Equal(t, 0, stats.Skipped) + got, err := os.ReadFile(dest) require.NoError(t, err) assert.Equal(t, content, got) @@ -497,7 +563,7 @@ func TestBlob_CopyFile(t *testing.T) { dest := filepath.Join(t.TempDir(), "existing.json") require.NoError(t, os.WriteFile(dest, []byte("existing"), 0o644)) - err := b.CopyFile("config/app.json", dest) + _, err := b.CopyFile("config/app.json", dest) assert.ErrorIs(t, err, fs.ErrExist) }) @@ -506,7 +572,7 @@ func TestBlob_CopyFile(t *testing.T) { dest := filepath.Join(t.TempDir(), "overwrite.json") require.NoError(t, os.WriteFile(dest, []byte("old"), 0o644)) - err := b.CopyFile("config/app.json", dest, CopyWithOverwrite(true)) + _, err := b.CopyFile("config/app.json", dest, CopyWithOverwrite(true)) require.NoError(t, err) got, err := os.ReadFile(dest) @@ -520,7 +586,7 @@ func TestBlob_CopyFile(t *testing.T) { // "config" is a synthetic directory (has config/app.json under it). // Since directories aren't stored as entries, this returns ErrNotExist. - err := b.CopyFile("config", dest) + _, err := b.CopyFile("config", dest) assert.ErrorIs(t, err, fs.ErrNotExist) }) @@ -528,7 +594,7 @@ func TestBlob_CopyFile(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "out.txt") - err := b.CopyFile("nonexistent.txt", dest) + _, err := b.CopyFile("nonexistent.txt", dest) assert.ErrorIs(t, err, fs.ErrNotExist) }) @@ -536,7 +602,7 @@ func TestBlob_CopyFile(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "missing", "parent", "file.txt") - err := b.CopyFile("config/app.json", dest) + _, err := b.CopyFile("config/app.json", dest) assert.Error(t, err) }) @@ -545,7 +611,7 @@ func TestBlob_CopyFile(t *testing.T) { dest := filepath.Join(t.TempDir(), "normalized.json") // Leading slash should be stripped - err := b.CopyFile("/config/app.json", dest) + _, err := b.CopyFile("/config/app.json", dest) require.NoError(t, err) got, err := os.ReadFile(dest) @@ -557,7 +623,7 @@ func TestBlob_CopyFile(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "test.json") - err := b.CopyFile("config/app.json", dest, CopyWithCleanDest(true)) + _, err := b.CopyFile("config/app.json", dest, CopyWithCleanDest(true)) assert.Error(t, err) assert.Contains(t, err.Error(), "CopyWithCleanDest") }) @@ -569,7 +635,7 @@ func TestBlob_CopyFile(t *testing.T) { // Create a directory at the destination require.NoError(t, os.Mkdir(dest, 0o755)) - err := b.CopyFile("config/app.json", dest, CopyWithOverwrite(true)) + _, err := b.CopyFile("config/app.json", dest, CopyWithOverwrite(true)) var pathErr *fs.PathError require.ErrorAs(t, err, &pathErr) assert.Equal(t, "copyfile", pathErr.Op) @@ -588,7 +654,7 @@ func TestBlob_CopyFile_Compressed(t *testing.T) { b := createTestArchive(t, files, CompressionZstd) dest := filepath.Join(t.TempDir(), "extracted.bin") - err := b.CopyFile("data.bin", dest) + _, err := b.CopyFile("data.bin", dest) require.NoError(t, err) got, err := os.ReadFile(dest) @@ -622,7 +688,7 @@ func TestBlob_CopyFile_PreserveMode(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "script.sh") - err := 
b.CopyFile("script.sh", dest) + _, err := b.CopyFile("script.sh", dest) require.NoError(t, err) info, err := os.Stat(dest) @@ -636,7 +702,7 @@ func TestBlob_CopyFile_PreserveMode(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "script.sh") - err := b.CopyFile("script.sh", dest, CopyWithPreserveMode(true)) + _, err := b.CopyFile("script.sh", dest, CopyWithPreserveMode(true)) require.NoError(t, err) info, err := os.Stat(dest) @@ -670,7 +736,7 @@ func TestBlob_CopyFile_PreserveTimes(t *testing.T) { // Use a 2-second buffer to handle coarse filesystem timestamp resolution beforeCopy := time.Now().Add(-2 * time.Second) - err := b.CopyFile("file.txt", dest) + _, err := b.CopyFile("file.txt", dest) require.NoError(t, err) info, err := os.Stat(dest) @@ -683,7 +749,7 @@ func TestBlob_CopyFile_PreserveTimes(t *testing.T) { t.Parallel() dest := filepath.Join(t.TempDir(), "file.txt") - err := b.CopyFile("file.txt", dest, CopyWithPreserveTimes(true)) + _, err := b.CopyFile("file.txt", dest, CopyWithPreserveTimes(true)) require.NoError(t, err) info, err := os.Stat(dest) diff --git a/core/internal/batch/batch.go b/core/internal/batch/batch.go index 8c7285d..732057d 100644 --- a/core/internal/batch/batch.go +++ b/core/internal/batch/batch.go @@ -153,10 +153,13 @@ func NewProcessor(source file.ByteSource, pool *file.DecompressPool, maxFileSize // entry, the content is decompressed (if needed), hash-verified, and // written to the sink. // -// Processing stops on the first error encountered. -func (p *Processor) Process(entries []*Entry, sink Sink) error { +// Processing stops on the first error encountered. The returned ProcessStats +// contains counts for processed and skipped entries, and total bytes written. +// On error, partial stats are returned reflecting work completed before the error. +func (p *Processor) Process(entries []*Entry, sink Sink) (ProcessStats, error) { + var stats ProcessStats if len(entries) == 0 { - return nil + return stats, nil } // Filter entries that should be processed @@ -164,10 +167,12 @@ func (p *Processor) Process(entries []*Entry, sink Sink) error { for _, entry := range entries { if sink.ShouldProcess(entry) { toProcess = append(toProcess, entry) + } else { + stats.Skipped++ } } if len(toProcess) == 0 { - return nil + return stats, nil } // Initialize progress tracking @@ -178,7 +183,7 @@ func (p *Processor) Process(entries []*Entry, sink Sink) error { sourceSize := p.source.Size() for _, entry := range toProcess { if err := file.ValidateAll(entry, sourceSize, p.maxFileSize); err != nil { - return fmt.Errorf("batch: %s: %w", entry.Path, err) + return stats, fmt.Errorf("batch: %s: %w", entry.Path, err) } } @@ -197,10 +202,15 @@ func (p *Processor) Process(entries []*Entry, sink Sink) error { groups := groupAdjacentEntries(toProcess) p.log().Debug("batch processing", "entries", len(toProcess), "groups", len(groups)) + var procStats ProcessStats + var err error if len(groups) > 1 && (p.readConcurrency > 1 || p.readAheadEnabled) { - return p.processGroupsPipelined(groups, sink) + procStats, err = p.processGroupsPipelined(groups, sink) + } else { + procStats, err = p.processGroupsSequential(groups, sink) } - return p.processGroupsSequential(groups, sink) + stats.add(procStats) + return stats, err } // groupTask represents a pending group read operation for the pipeline. @@ -219,19 +229,22 @@ type groupResult struct { } // processGroupsSequential processes groups one at a time without pipelining. 
-func (p *Processor) processGroupsSequential(groups []rangeGroup, sink Sink) error { +func (p *Processor) processGroupsSequential(groups []rangeGroup, sink Sink) (ProcessStats, error) { + var stats ProcessStats for _, group := range groups { - if err := p.processGroup(group, sink); err != nil { - return err + groupStats, err := p.processGroup(group, sink) + stats.add(groupStats) + if err != nil { + return stats, err } } - return nil + return stats, nil } //nolint:gocognit,gocyclo // complex pipeline logic requires coordination between producers/consumers -func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) error { +func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) (ProcessStats, error) { if len(groups) == 0 { - return nil + return ProcessStats{}, nil } readWorkers := p.readConcurrency @@ -243,7 +256,7 @@ func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) error if p.readAheadEnabled { limit, err := sizing.ToInt64(p.readAheadBytes, blobtype.ErrSizeOverflow) if err != nil { - return fmt.Errorf("batch: %w", err) + return ProcessStats{}, fmt.Errorf("batch: %w", err) } budget = semaphore.NewWeighted(limit) } @@ -315,6 +328,10 @@ func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) error close(readyCh) }() + // Track stats from the consumer goroutine + var stats ProcessStats + var statsMu sync.Mutex + eg.Go(func() error { next := 0 pending := make(map[int]groupResult, readWorkers) @@ -334,15 +351,16 @@ func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) error break } delete(pending, next) - if err := p.processGroupWithData(res.group, res.data, sink); err != nil { - if budget != nil { - budget.Release(res.size) - } - return err - } + groupStats, err := p.processGroupWithData(res.group, res.data, sink) + statsMu.Lock() + stats.add(groupStats) + statsMu.Unlock() if budget != nil { budget.Release(res.size) } + if err != nil { + return err + } next++ } case <-ctx.Done(): @@ -352,22 +370,23 @@ func (p *Processor) processGroupsPipelined(groups []rangeGroup, sink Sink) error return nil }) - return eg.Wait() + err := eg.Wait() + return stats, err } // processGroup reads a contiguous range and processes each entry. -func (p *Processor) processGroup(group rangeGroup, sink Sink) error { +func (p *Processor) processGroup(group rangeGroup, sink Sink) (ProcessStats, error) { data, err := p.readGroupData(group) if err != nil { - return err + return ProcessStats{}, err } return p.processGroupWithData(group, data, sink) } // processGroupWithData processes all entries in a group using pre-fetched data. -func (p *Processor) processGroupWithData(group rangeGroup, data []byte, sink Sink) error { +func (p *Processor) processGroupWithData(group rangeGroup, data []byte, sink Sink) (ProcessStats, error) { if len(group.entries) == 0 { - return nil + return ProcessStats{}, nil } workers := p.workerCount(group.entries) if workers < 2 { @@ -401,19 +420,24 @@ func groupSize(group rangeGroup) (int64, error) { } // processEntriesSerial processes entries one at a time. 
-func (p *Processor) processEntriesSerial(entries []*Entry, data []byte, groupStart uint64, sink Sink) error { +func (p *Processor) processEntriesSerial(entries []*Entry, data []byte, groupStart uint64, sink Sink) (ProcessStats, error) { + var stats ProcessStats for _, entry := range entries { if err := p.processEntry(entry, data, groupStart, sink); err != nil { - return err + return stats, err } + stats.Processed++ + stats.TotalBytes += entry.OriginalSize p.reportEntryProgress(entry) } - return nil + return stats, nil } // processEntriesParallel processes entries concurrently. -func (p *Processor) processEntriesParallel(entries []*Entry, data []byte, groupStart uint64, sink Sink, workers int) error { +func (p *Processor) processEntriesParallel(entries []*Entry, data []byte, groupStart uint64, sink Sink, workers int) (ProcessStats, error) { var stop atomic.Bool + var processed atomic.Int64 + var totalBytes atomic.Uint64 errCh := make(chan error, 1) var wg sync.WaitGroup @@ -432,17 +456,24 @@ func (p *Processor) processEntriesParallel(entries []*Entry, data []byte, groupS } return } + processed.Add(1) + totalBytes.Add(entry.OriginalSize) p.reportEntryProgress(entry) } }(w) } wg.Wait() + stats := ProcessStats{ + Processed: int(processed.Load()), + TotalBytes: totalBytes.Load(), + } + select { case err := <-errCh: - return err + return stats, err default: - return nil + return stats, nil } } diff --git a/core/internal/batch/batch_bench_test.go b/core/internal/batch/batch_bench_test.go index f59afd5..ed5e7b1 100644 --- a/core/internal/batch/batch_bench_test.go +++ b/core/internal/batch/batch_bench_test.go @@ -100,7 +100,7 @@ func BenchmarkProcessorPipelined(b *testing.B) { b.ResetTimer() for b.Loop() { - if err := proc.Process(entries, sink); err != nil { + if _, err := proc.Process(entries, sink); err != nil { b.Fatal(err) } } diff --git a/core/internal/batch/batch_test.go b/core/internal/batch/batch_test.go index 4becccd..63c89f8 100644 --- a/core/internal/batch/batch_test.go +++ b/core/internal/batch/batch_test.go @@ -186,13 +186,18 @@ func TestProcessor_ShouldProcess(t *testing.T) { source := &mockByteSource{data: []byte("helloworld")} proc := NewProcessor(source, nil, 0) - err := proc.Process(entries, sink) + stats, err := proc.Process(entries, sink) require.NoError(t, err) // Only a.txt should be written assert.Contains(t, sink.written, "a.txt") assert.NotContains(t, sink.written, "skip.txt") assert.Equal(t, []byte("hello"), sink.written["a.txt"]) + + // Verify stats + assert.Equal(t, 1, stats.Processed) + assert.Equal(t, 1, stats.Skipped) + assert.Equal(t, uint64(5), stats.TotalBytes) } func TestProcessor_EmptyEntries(t *testing.T) { @@ -201,11 +206,13 @@ func TestProcessor_EmptyEntries(t *testing.T) { sink := newMockSink() proc := NewProcessor(&mockByteSource{data: make([]byte, 100)}, nil, 0) - err := proc.Process(nil, sink) + stats, err := proc.Process(nil, sink) assert.NoError(t, err) + assert.Equal(t, ProcessStats{}, stats) - err = proc.Process([]*Entry{}, sink) + stats, err = proc.Process([]*Entry{}, sink) assert.NoError(t, err) + assert.Equal(t, ProcessStats{}, stats) } // sha256Hash returns a valid SHA256 hash for the given content. diff --git a/core/internal/batch/stats.go b/core/internal/batch/stats.go new file mode 100644 index 0000000..f1da4dc --- /dev/null +++ b/core/internal/batch/stats.go @@ -0,0 +1,20 @@ +package batch + +// ProcessStats contains statistics from a batch processing operation. 
+type ProcessStats struct { + // Processed is the number of entries successfully written to the sink. + Processed int + + // Skipped is the number of entries skipped (ShouldProcess returned false). + Skipped int + + // TotalBytes is the sum of OriginalSize for all processed entries. + TotalBytes uint64 +} + +// add accumulates stats from another ProcessStats into this one. +func (s *ProcessStats) add(other ProcessStats) { + s.Processed += other.Processed + s.Skipped += other.Skipped + s.TotalBytes += other.TotalBytes +} diff --git a/docs/docs/guides/extraction.md b/docs/docs/guides/extraction.md index 4a685d4..d328561 100644 --- a/docs/docs/guides/extraction.md +++ b/docs/docs/guides/extraction.md @@ -26,7 +26,8 @@ func extractFromRegistry(ref, destDir string) error { } // Extract all files - return archive.CopyDir(destDir, ".") + _, err = archive.CopyDir(destDir, ".") + return err } ``` @@ -46,7 +47,7 @@ if err != nil { defer blobFile.Close() // All Blob methods work on BlobFile -err = blobFile.CopyDir("/dest", ".") +_, err = blobFile.CopyDir("/dest", ".") ``` The examples below use `*Blob` but apply equally to `*BlobFile`. @@ -56,7 +57,7 @@ The examples below use `*Blob` but apply equally to `*BlobFile`. To extract specific files by path, use `CopyTo`: ```go -err := archive.CopyTo("/dest/dir", "config.json", "lib/utils.go", "main.go") +_, err := archive.CopyTo("/dest/dir", "config.json", "lib/utils.go", "main.go") ``` The files are extracted to the destination directory, preserving their relative paths: @@ -69,7 +70,7 @@ To pass options, use `CopyToWithOptions`: ```go paths := []string{"config.json", "lib/utils.go"} -err := archive.CopyToWithOptions("/dest/dir", paths, +_, err := archive.CopyToWithOptions("/dest/dir", paths, blob.CopyWithOverwrite(true), blob.CopyWithPreserveMode(true), ) @@ -81,10 +82,14 @@ To extract all files under a directory prefix, use `CopyDir`: ```go // Extract everything under src/ -err := archive.CopyDir("/dest/dir", "src") +stats, err := archive.CopyDir("/dest/dir", "src") // Extract the entire archive -err = archive.CopyDir("/dest/dir", ".") +stats, err = archive.CopyDir("/dest/dir", ".") + +// stats.FileCount contains the number of files extracted +// stats.TotalBytes contains the total uncompressed size +// stats.Skipped contains the number of files skipped (already exist) ``` Files matching the prefix are extracted to the destination directory with their full archive paths: @@ -96,7 +101,7 @@ Files matching the prefix are extracted to the destination directory with their By default, existing files are skipped. To overwrite: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithOverwrite(true), ) ``` @@ -110,7 +115,7 @@ This is useful when you want to ensure files match the archive contents, even if To preserve file permission modes from the archive: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithPreserveMode(true), ) ``` @@ -122,7 +127,7 @@ Without this option, extracted files use the current umask defaults. 
To preserve file modification times: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithPreserveTimes(true), ) ``` @@ -130,7 +135,7 @@ err := archive.CopyDir("/dest/dir", ".", ### Both Mode and Times ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithPreserveMode(true), blob.CopyWithPreserveTimes(true), ) @@ -141,7 +146,7 @@ err := archive.CopyDir("/dest/dir", ".", For faster extraction when starting fresh, use `CopyWithCleanDest`: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithCleanDest(true), ) ``` @@ -169,7 +174,7 @@ The clean destination option includes safety checks: To control the number of parallel file writers: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithWorkers(8), // Use 8 parallel workers ) ``` @@ -184,7 +189,7 @@ Values: For remote archives, control the number of concurrent range reads: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithReadConcurrency(8), // 8 concurrent HTTP requests ) ``` @@ -196,7 +201,7 @@ The default is 4 concurrent reads. Higher values reduce latency but increase mem To limit memory usage during parallel reads: ```go -err := archive.CopyDir("/dest/dir", ".", +_, err := archive.CopyDir("/dest/dir", ".", blob.CopyWithReadAheadBytes(64 * 1024 * 1024), // 64MB budget ) ``` @@ -208,7 +213,7 @@ This caps the total size of buffered read-ahead data. Use this when extracting l Extraction errors include the file path and underlying cause: ```go -err := archive.CopyDir("/dest/dir", ".") +_, err := archive.CopyDir("/dest/dir", ".") if err != nil { // Errors are wrapped with path context log.Printf("extraction failed: %v", err) @@ -255,7 +260,8 @@ func extractArchive(archive *blob.Blob, destDir string, opts ExtractOptions) err prefix = "." } - return archive.CopyDir(destDir, prefix, copyOpts...) + _, err = archive.CopyDir(destDir, prefix, copyOpts...) + return err } type ExtractOptions struct { diff --git a/docs/docs/guides/performance-tuning.md b/docs/docs/guides/performance-tuning.md index 6786b54..7c8eb2e 100644 --- a/docs/docs/guides/performance-tuning.md +++ b/docs/docs/guides/performance-tuning.md @@ -100,7 +100,7 @@ These options control parallel extraction via `CopyTo` and `CopyDir`. Controls parallel file writers: ```go -err := archive.CopyDir("/dest", ".", +_, err := archive.CopyDir("/dest", ".", blob.CopyWithWorkers(8), ) ``` @@ -118,7 +118,7 @@ Use 0 for automatic heuristics or negative values for serial processing. Controls concurrent range requests for remote archives: ```go -err := archive.CopyDir("/dest", ".", +_, err := archive.CopyDir("/dest", ".", blob.CopyWithReadConcurrency(8), ) ``` @@ -135,7 +135,7 @@ For high-latency connections (> 100ms), try 8-16 concurrent reads. 
Limits memory used by buffered parallel reads: ```go -err := archive.CopyDir("/dest", ".", +_, err := archive.CopyDir("/dest", ".", blob.CopyWithReadAheadBytes(32 << 20), // 32 MB ) ``` @@ -218,7 +218,7 @@ archive, err := c.Pull(ctx, ref, blob.PullWithDecoderConcurrency(1), // Single-threaded ) -err = archive.CopyDir("/dest", ".", +_, err = archive.CopyDir("/dest", ".", blob.CopyWithWorkers(2), // Few parallel writers blob.CopyWithReadConcurrency(2), // Few parallel reads blob.CopyWithReadAheadBytes(16 << 20), // 16 MB read budget @@ -247,7 +247,7 @@ archive, err := c.Pull(ctx, ref, blob.PullWithDecoderConcurrency(0), // Max decoder parallelism ) -err = archive.CopyDir("/dest", ".", +_, err = archive.CopyDir("/dest", ".", blob.CopyWithWorkers(4), // Parallel file writing blob.CopyWithReadConcurrency(16), // Many concurrent requests ) @@ -262,7 +262,7 @@ archive, err := c.Pull(ctx, ref, blob.PullWithDecoderConcurrency(0), // Use all cores ) -err = archive.CopyDir("/dest", ".", +_, err = archive.CopyDir("/dest", ".", blob.CopyWithWorkers(0), // Auto-detect workers blob.CopyWithReadConcurrency(8), // Parallel reads blob.CopyWithCleanDest(true), // Skip temp files @@ -294,7 +294,7 @@ archive, err := c.Pull(ctx, ref, blob.PullWithVerifyOnClose(true), // Always verify (default) ) -err = archive.CopyDir("/dest", ".", +_, err = archive.CopyDir("/dest", ".", blob.CopyWithPreserveMode(true), blob.CopyWithPreserveTimes(true), ) @@ -314,7 +314,7 @@ Example timing: ```go start := time.Now() -err := archive.CopyDir("/dest", ".") +_, err := archive.CopyDir("/dest", ".") log.Printf("extraction took %v", time.Since(start)) ``` diff --git a/docs/docs/reference/api.md b/docs/docs/reference/api.md index 6730d74..9a0234f 100644 --- a/docs/docs/reference/api.md +++ b/docs/docs/reference/api.md @@ -659,26 +659,38 @@ ReadDir implements `fs.ReadDirFS`. Returns directory entries sorted by name. #### CopyTo ```go -func (b *Blob) CopyTo(destDir string, paths ...string) error +func (b *Blob) CopyTo(destDir string, paths ...string) (CopyStats, error) ``` -CopyTo extracts specific files to a destination directory. +CopyTo extracts specific files to a destination directory. Returns statistics about the copy operation. #### CopyToWithOptions ```go -func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOption) error +func (b *Blob) CopyToWithOptions(destDir string, paths []string, opts ...CopyOption) (CopyStats, error) ``` -CopyToWithOptions extracts specific files with options. +CopyToWithOptions extracts specific files with options. Returns statistics about the copy operation. #### CopyDir ```go -func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) error +func (b *Blob) CopyDir(destDir, prefix string, opts ...CopyOption) (CopyStats, error) ``` -CopyDir extracts all files under a directory prefix. Use prefix "." for all files. +CopyDir extracts all files under a directory prefix. Use prefix "." for all files. Returns statistics about the copy operation. + +#### CopyStats + +```go +type CopyStats struct { + FileCount int // Number of files successfully copied + TotalBytes uint64 // Sum of original (uncompressed) file sizes + Skipped int // Number of files skipped (already exist without overwrite) +} +``` + +CopyStats contains statistics about a copy operation, returned by CopyTo, CopyToWithOptions, CopyDir, and CopyFile. 
#### Entry diff --git a/docs/src/pages/index.tsx b/docs/src/pages/index.tsx index 968080b..eb46498 100644 --- a/docs/src/pages/index.tsx +++ b/docs/src/pages/index.tsx @@ -502,7 +502,8 @@ c, _ := blob.NewClient( // Lazy load - only downloads what you read archive, _ := c.Pull(ctx, "ghcr.io/org/configs:v1") -archive.CopyDir("./output", "configs/")`} +stats, _ := archive.CopyDir("./output", "configs/") +fmt.Printf("Extracted %d files\n", stats.FileCount)`} diff --git a/examples/provenance/pull.go b/examples/provenance/pull.go index f101dcf..3b9c2b8 100644 --- a/examples/provenance/pull.go +++ b/examples/provenance/pull.go @@ -196,7 +196,7 @@ func extractArchive(archive *blob.Archive, output string) error { fmt.Printf("Extracting to %s...\n", output) - err := archive.CopyDir(output, "", + stats, err := archive.CopyDir(output, "", blob.CopyWithPreserveMode(true), blob.CopyWithPreserveTimes(true), blob.CopyWithOverwrite(true), @@ -205,7 +205,7 @@ func extractArchive(archive *blob.Archive, output string) error { return fmt.Errorf("extract: %w", err) } - fmt.Printf("Extracted %d files to %s\n", entryCount, output) + fmt.Printf("Extracted %d files to %s\n", stats.FileCount, output) fmt.Println("Verification successful!") return nil diff --git a/integration/archive_test.go b/integration/archive_test.go index bca8f0d..fe6bd48 100644 --- a/integration/archive_test.go +++ b/integration/archive_test.go @@ -407,7 +407,7 @@ func TestCopyTo_SpecificFiles(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = archive.CopyTo(destDir, "root.txt", "dir1/a.txt") + _, err = archive.CopyTo(destDir, "root.txt", "dir1/a.txt") require.NoError(t, err, "CopyTo") // Verify extracted files @@ -448,7 +448,7 @@ func TestCopyTo_WithOverwrite(t *testing.T) { require.NoError(t, os.WriteFile(existing, []byte("original"), 0o644)) // Without overwrite - file should not be overwritten - err = archive.CopyTo(destDir, "hello.txt") + _, err = archive.CopyTo(destDir, "hello.txt") require.NoError(t, err, "CopyTo without overwrite") content, err := os.ReadFile(existing) @@ -456,7 +456,7 @@ func TestCopyTo_WithOverwrite(t *testing.T) { assert.Equal(t, []byte("original"), content) // With overwrite - file should be overwritten - err = archive.CopyToWithOptions(destDir, []string{"hello.txt"}, blob.CopyWithOverwrite(true)) + _, err = archive.CopyToWithOptions(destDir, []string{"hello.txt"}, blob.CopyWithOverwrite(true)) require.NoError(t, err, "CopyTo with overwrite") content, err = os.ReadFile(existing) @@ -490,7 +490,7 @@ func TestCopyTo_WithPreserveMode(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = archive.CopyToWithOptions(destDir, []string{"script.sh"}, blob.CopyWithPreserveMode(true)) + _, err = archive.CopyToWithOptions(destDir, []string{"script.sh"}, blob.CopyWithPreserveMode(true)) require.NoError(t, err, "CopyTo with preserve mode") info, err := os.Stat(filepath.Join(destDir, "script.sh")) @@ -517,7 +517,7 @@ func TestCopyTo_WithPreserveTimes(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = archive.CopyToWithOptions(destDir, []string{"hello.txt"}, blob.CopyWithPreserveTimes(true)) + _, err = archive.CopyToWithOptions(destDir, []string{"hello.txt"}, blob.CopyWithPreserveTimes(true)) require.NoError(t, err, "CopyTo with preserve times") // File should exist (detailed time verification would require known mtime) @@ -543,7 +543,7 @@ func TestCopyDir_Prefix(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = 
archive.CopyDir(destDir, "dir1") + _, err = archive.CopyDir(destDir, "dir1") require.NoError(t, err, "CopyDir") // dir1 files should exist @@ -576,7 +576,7 @@ func TestCopyDir_All(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = archive.CopyDir(destDir, "") + _, err = archive.CopyDir(destDir, "") require.NoError(t, err, "CopyDir all") assertDirContents(t, destDir, nestedArchive) @@ -606,7 +606,7 @@ func TestCopyDir_WithCleanDest(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Dir(oldFile), 0o755)) require.NoError(t, os.WriteFile(oldFile, []byte("old"), 0o644)) - err = archive.CopyDir(destDir, "dir1", blob.CopyWithCleanDest(true)) + _, err = archive.CopyDir(destDir, "dir1", blob.CopyWithCleanDest(true)) require.NoError(t, err, "CopyDir with clean") // Old file should be gone @@ -645,7 +645,7 @@ func TestCopyDir_WithWorkers(t *testing.T) { require.NoError(t, err, "Pull") destDir := t.TempDir() - err = archive.CopyDir(destDir, "data", blob.CopyWithWorkers(4)) + _, err = archive.CopyDir(destDir, "data", blob.CopyWithWorkers(4)) require.NoError(t, err, "CopyDir with workers") // Verify all files were copied diff --git a/integration/error_test.go b/integration/error_test.go index 31af5b9..8812db4 100644 --- a/integration/error_test.go +++ b/integration/error_test.go @@ -279,7 +279,7 @@ func TestError_CopyTo_InvalidPath(t *testing.T) { destDir := t.TempDir() // CopyTo with invalid paths should silently skip them - err = archive.CopyTo(destDir, "../escape", "hello.txt") + _, err = archive.CopyTo(destDir, "../escape", "hello.txt") require.NoError(t, err, "CopyTo should not error on invalid paths") // But valid file should still be copied diff --git a/types.go b/types.go index 840e67d..7bf8c42 100644 --- a/types.go +++ b/types.go @@ -25,6 +25,9 @@ type SkipCompressionFunc = blobcore.SkipCompressionFunc // CopyOption configures CopyTo and CopyDir operations. type CopyOption = blobcore.CopyOption +// CopyStats contains statistics about a copy operation. +type CopyStats = blobcore.CopyStats + // ByteSource provides random access to the data blob. type ByteSource = blobcore.ByteSource