From 9d2dd436fd9c644a79596a5e15ec4046bd61cc98 Mon Sep 17 00:00:00 2001 From: folbrich Date: Sat, 14 Mar 2026 14:44:54 +0100 Subject: [PATCH 1/4] Refactor assembly pipeline into plan-based architecture Introduce AssemblePlan that separates planning from execution in file assembly. The plan pre-computes all chunk placements (self-seed, file seeds, store fetches, skip-in-place) into a DAG of steps with explicit dependencies, replacing the interleaved sequencer approach. This lays the groundwork for #312 (destination-as-seed) by making assembly sources composable and the planning phase extensible. Key changes: - New AssemblePlan with functional options and step-based execution - Split assembly sources into separate files (fileseed, selfseed, store, skip) - Self-seed matching now uses longestMatchFrom for longer sequences - Plan validation detects stale file seeds before execution - Comprehensive tests for plan generation and in-place detection - Remove sequencer.go, selfseed.go in favor of new plan types Closes #312 --- assemble-fileseed.go | 24 +++ assemble-plan.go | 352 +++++++++++++++++++++++++++++++++++++ assemble-plan_test.go | 344 ++++++++++++++++++++++++++++++++++++ assemble-selfseed.go | 174 ++++++++++++++++++ assemble-skip.go | 20 +++ assemble-step.go | 62 +++++++ assemble-store.go | 31 ++++ assemble.go | 326 ++++++++++++++++------------------ assemble_test.go | 230 ++++++++---------------- cmd/desync/extract_test.go | 5 +- errors.go | 14 ++ extractstats.go | 4 +- fileseed.go | 24 +-- go.mod | 2 +- nullseed.go | 9 - seed.go | 29 --- selfseed.go | 93 ---------- selfseed_test.go | 136 -------------- sequencer.go | 176 ------------------- 19 files changed, 1251 insertions(+), 804 deletions(-) create mode 100644 assemble-fileseed.go create mode 100644 assemble-plan.go create mode 100644 assemble-plan_test.go create mode 100644 assemble-selfseed.go create mode 100644 assemble-skip.go create mode 100644 assemble-step.go create mode 100644 assemble-store.go delete 
mode 100644 selfseed.go delete mode 100644 selfseed_test.go delete mode 100644 sequencer.go diff --git a/assemble-fileseed.go b/assemble-fileseed.go new file mode 100644 index 0000000..6c193d8 --- /dev/null +++ b/assemble-fileseed.go @@ -0,0 +1,24 @@ +package desync + +import ( + "fmt" + "os" +) + +type fileSeedSource struct { + segment SeedSegment + seed Seed + srcFile string + offset uint64 + length uint64 + isBlank bool +} + +func (s *fileSeedSource) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + blocksize := blocksizeOfFile(f.Name()) + return s.segment.WriteInto(f, s.offset, s.length, blocksize, s.isBlank) +} + +func (s *fileSeedSource) String() string { + return fmt.Sprintf("FileSeed(%s): Copy to [%d:%d]", s.srcFile, s.offset, s.offset+s.length) +} diff --git a/assemble-plan.go b/assemble-plan.go new file mode 100644 index 0000000..1679980 --- /dev/null +++ b/assemble-plan.go @@ -0,0 +1,352 @@ +package desync + +import ( + "errors" + "fmt" + "os" + + "golang.org/x/sync/errgroup" +) + +type PlanOption func(*AssemblePlan) + +func PlanWithConcurrency(n int) PlanOption { + return func(p *AssemblePlan) { + p.concurrency = n + } +} + +func PlanWithSeeds(seeds []Seed) PlanOption { + return func(p *AssemblePlan) { + p.seeds = seeds + } +} + +func PlanWithTargetIsBlank(isBlank bool) PlanOption { + return func(p *AssemblePlan) { + p.targetIsBlank = isBlank + } +} + +// AssemblePlan holds a directed acyclic graph of steps. +type AssemblePlan struct { + idx Index + concurrency int + target string + store Store + seeds []Seed + targetIsBlank bool + + // Placements is an intermediate representation of the target index, + // capturing what source is used to populate each chunk. It mirrors the + // length of the index but a single step can span multiple chunks. 
+ placements []*placement + + selfSeed *selfSeed +} + +type assembleSource interface { + fmt.Stringer + Execute(f *os.File) (copied uint64, cloned uint64, err error) +} + +type placement struct { + source assembleSource + dependsOnStart int // index of another placement this one depends on + dependsOnSize int // number of sequential placements (from dependsOnStart) this depends on +} + +// NewPlan creates a fully populated AssemblePlan. +func NewPlan(name string, idx Index, s Store, opts ...PlanOption) (*AssemblePlan, error) { + p := &AssemblePlan{ + idx: idx, + concurrency: 1, + target: name, + store: s, + targetIsBlank: true, + placements: make([]*placement, len(idx.Chunks)), + } + for _, opt := range opts { + opt(p) + } + + ss, err := newSelfSeed(p.target, p.idx, p.concurrency) + if err != nil { + return nil, err + } + p.selfSeed = ss + + if err := p.generate(); err != nil { + p.Close() + return nil, err + } + return p, nil +} + +// Close releases resources held by the plan. +func (p *AssemblePlan) Close() { + if p.selfSeed != nil { + p.selfSeed.Close() + } +} + +// Validate checks that all file seed placements still match their underlying +// data. Returns a SeedInvalid error if a seed file was modified after its +// index was created. +// TODO: run the verification steps in parallel. 
+func (p *AssemblePlan) Validate() error { + seen := make(map[*placement]struct{}) + fileMap := make(map[string]*os.File) + defer func() { + for _, f := range fileMap { + f.Close() + } + }() + + invalidSeeds := make(map[Seed]error) + failedFiles := make(map[string]struct{}) + + for _, pl := range p.placements { + if _, ok := seen[pl]; ok { + continue + } + seen[pl] = struct{}{} + + fs, ok := pl.source.(*fileSeedSource) + if !ok || fs.srcFile == "" { + continue + } + + // Skip seeds and files already known to be invalid + if _, ok := invalidSeeds[fs.seed]; ok { + continue + } + if _, ok := failedFiles[fs.srcFile]; ok { + invalidSeeds[fs.seed] = fmt.Errorf("seed file %s could not be opened", fs.srcFile) + continue + } + + if _, ok := fileMap[fs.srcFile]; !ok { + f, err := os.Open(fs.srcFile) + if err != nil { + failedFiles[fs.srcFile] = struct{}{} + invalidSeeds[fs.seed] = err + continue + } + fileMap[fs.srcFile] = f + } + + if err := fs.segment.Validate(fileMap[fs.srcFile]); err != nil { + invalidSeeds[fs.seed] = err + } + } + + if len(invalidSeeds) > 0 { + seeds := make([]Seed, 0, len(invalidSeeds)) + errs := make([]error, 0, len(invalidSeeds)) + for seed, err := range invalidSeeds { + seeds = append(seeds, seed) + errs = append(errs, err) + } + return SeedInvalid{Seeds: seeds, Err: errors.Join(errs...)} + } + return nil +} + +func (p *AssemblePlan) generate() error { + // Mark chunks that are already correct in the target file so they can + // be skipped during assembly. 
+ if !p.targetIsBlank { + f, err := os.Open(p.target) + if err == nil { + var g errgroup.Group + g.SetLimit(p.concurrency) + for i, chunk := range p.idx.Chunks { + g.Go(func() error { + b := make([]byte, chunk.Size) + if _, err := f.ReadAt(b, int64(chunk.Start)); err != nil { + return nil + } + if Digest.Sum(b) == chunk.ID { + p.placements[i] = &placement{source: &skipInPlace{ + start: chunk.Start, + end: chunk.Start + chunk.Size, + }} + } + return nil + }) + } + g.Wait() + f.Close() + + // Merge consecutive in-place chunks into a single placement + // so that Steps() produces one step per run instead of one + // per chunk. This works because Steps() deduplicates by + // pointer identity. + var run *placement + for i, pl := range p.placements { + if pl == nil { + run = nil + continue + } + if _, ok := pl.source.(*skipInPlace); !ok { + run = nil + continue + } + if run == nil { + run = pl + continue + } + // Extend the existing run and share the pointer + run.source.(*skipInPlace).end = p.idx.Chunks[i].Start + p.idx.Chunks[i].Size + p.placements[i] = run + } + } + } + + // Find all matches in file itself. As it's populated, sections can be + // copied to other chunks. This involves depending on earlier steps + // before chunks can be copied within the file. + for i := 0; i < len(p.idx.Chunks); i++ { + if p.placements[i] != nil { + continue // Already filled + } + + start, n := p.selfSeed.longestMatchFrom(p.idx.Chunks, i) + if n < 1 { + continue + } + + // Repeat the same placement for all chunks in the sequence. + // We dedup sequences later. + pl := &placement{} + + // We can use up to n chunks from the seed, find out how much + // we can actually use without overwriting any existing placements + // in the list. 
+ var ( + to = i + size int + ) + for range n { + if p.placements[i] != nil { + break + } + + p.placements[i] = pl + i++ + size++ + } + i-- // compensate for the outer loop's i++ + + // Update the step with the potentially adjusted length + pl.source = p.selfSeed.getSegment(start, to, size) + pl.dependsOnStart = start + pl.dependsOnSize = size + } + + // Check file seeds for matches in unfilled positions. + for _, seed := range p.seeds { + for i := 0; i < len(p.idx.Chunks); { + if p.placements[i] != nil { + i++ + continue + } + + // Count consecutive unfilled positions to bound the match. + available := 0 + for j := i; j < len(p.idx.Chunks) && p.placements[j] == nil; j++ { + available++ + } + + n, segment := seed.LongestMatchWith(p.idx.Chunks[i : i+available]) + if n < 1 { + i++ + continue + } + + offset := p.idx.Chunks[i].Start + last := p.idx.Chunks[i+n-1] + length := last.Start + last.Size - offset + + pl := &placement{ + source: &fileSeedSource{ + segment: segment, + seed: seed, + srcFile: segment.FileName(), + offset: offset, + length: length, + isBlank: p.targetIsBlank, + }, + } + + for j := i; j < i+n; j++ { + p.placements[j] = pl + } + i += n + } + } + + // Fill any gaps in the file by copying from the store. + for i := range p.placements { + if p.placements[i] != nil { + continue + } + p.placements[i] = &placement{ + source: &copyFromStore{ + store: p.store, + chunk: p.idx.Chunks[i], + }, + } + } + + // We now have a fully populated list of placements. Some are + // duplicates, spanning multiple chunks. Dependencies are only defined + // forward, like chunk-A needs chunk-B to be written first, etc. + + return nil +} + +func (p *AssemblePlan) Steps() []*PlanStep { + // Create a step for every unique placement, counting how many + // index chunks each step covers. 
+ stepsPerPlacement := make(map[*placement]*PlanStep) + for _, pl := range p.placements { + step, ok := stepsPerPlacement[pl] + if !ok { + step = &PlanStep{ + source: pl.source, + } + stepsPerPlacement[pl] = step + } + step.numChunks++ + } + + // Link the steps together. Use a seen set to avoid redundant work + // when the same placement pointer spans multiple chunks. + linked := make(map[*placement]struct{}, len(stepsPerPlacement)) + for _, pl := range p.placements { + if _, ok := linked[pl]; ok { + continue + } + linked[pl] = struct{}{} + + for i := pl.dependsOnStart; i < pl.dependsOnStart+pl.dependsOnSize; i++ { + stepsPerPlacement[pl].addDependency(stepsPerPlacement[p.placements[i]]) + stepsPerPlacement[p.placements[i]].addDependent(stepsPerPlacement[pl]) + } + } + + // Make a slice of steps, preserving the order + steps := make([]*PlanStep, 0, len(stepsPerPlacement)) + for _, pl := range p.placements { + s, ok := stepsPerPlacement[pl] + if !ok { + continue + } + steps = append(steps, s) + delete(stepsPerPlacement, pl) + } + + return steps +} diff --git a/assemble-plan_test.go b/assemble-plan_test.go new file mode 100644 index 0000000..238f532 --- /dev/null +++ b/assemble-plan_test.go @@ -0,0 +1,344 @@ +package desync + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSelfSeedPlanSteps(t *testing.T) { + tests := map[string]struct { + index Index + expected []string + }{ + "all unique chunks": { + index: indexSequence(0x01, 0x02, 0x03), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [100:200]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + "single chunk": { + index: indexSequence(0x01), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to 
[0:100]", + }, + }, + "repeated pair at end": { + // Sequence: 01 02 03 01 02 01 02 + // Positions 0,1 copy from 5,6; positions 3,4 copy from 5,6; + // positions 2,5,6 come from store. + index: indexSequence(0x01, 0x02, 0x03, 0x01, 0x02, 0x01, 0x02), + expected: []string{ + "SelfSeed: Copy [500:700] to [0:200]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [200:300]", + "SelfSeed: Copy [500:700] to [300:500]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [500:600]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [600:700]", + }, + }, + "full duplicate sequence": { + // Sequence: 01 02 03 01 02 03 + // Positions 0-2 copy from 3-5; positions 3-5 come from store. + index: indexSequence(0x01, 0x02, 0x03, 0x01, 0x02, 0x03), + expected: []string{ + "SelfSeed: Copy [300:600] to [0:300]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [300:400]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [400:500]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [500:600]", + }, + }, + "same chunk repeated": { + // Sequence: 01 01 01 + // Position 2 comes from store, then positions 0 and 1 each + // self-seed from position 2. + index: indexSequence(0x01, 0x01, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "SelfSeed: Copy [200:300] to [100:200]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + "single repeated chunk": { + // Sequence: 01 02 01 + // Position 0 copies from 2; positions 1,2 come from store. 
+ index: indexSequence(0x01, 0x02, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [100:200]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + plan, err := NewPlan("test", test.index, nil) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + require.Equal(t, test.expected, got) + }) + } +} + +func TestInPlaceChunkDetection(t *testing.T) { + // Create chunk data and compute their IDs + data1 := make([]byte, 100) + data1[0] = 0x01 + id1 := Digest.Sum(data1) + + data2 := make([]byte, 100) + data2[0] = 0x02 + id2 := Digest.Sum(data2) + + data3 := make([]byte, 100) + data3[0] = 0x03 + id3 := Digest.Sum(data3) + + idx := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + {ID: id3, Start: 200, Size: 100}, + }, + } + + // Create a target file where chunks 0 and 2 match but chunk 1 does not + target := filepath.Join(t.TempDir(), "target") + f, err := os.Create(target) + require.NoError(t, err) + + _, err = f.Write(data1) // chunk 0: correct + require.NoError(t, err) + _, err = f.Write(make([]byte, 100)) // chunk 1: wrong data + require.NoError(t, err) + _, err = f.Write(data3) // chunk 2: correct + require.NoError(t, err) + f.Close() + + plan, err := NewPlan(target, idx, nil, PlanWithTargetIsBlank(false)) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + + cid2 := ChunkID(id2) + expected := []string{ + "InPlace: Skip [0:100]", + fmt.Sprintf("Store: Copy %s to [100:200]", &cid2), + "InPlace: Skip [200:300]", + } + require.Equal(t, expected, got) + + // 
Subtest: all chunks match in-place — consecutive ones should merge + t.Run("consecutive merge", func(t *testing.T) { + target2 := filepath.Join(t.TempDir(), "target2") + f2, err := os.Create(target2) + require.NoError(t, err) + _, err = f2.Write(data1) + require.NoError(t, err) + _, err = f2.Write(data2) + require.NoError(t, err) + _, err = f2.Write(data3) + require.NoError(t, err) + f2.Close() + + plan2, err := NewPlan(target2, idx, nil, PlanWithTargetIsBlank(false)) + require.NoError(t, err) + defer plan2.Close() + + steps2 := plan2.Steps() + got2 := make([]string, len(steps2)) + for i, s := range steps2 { + got2[i] = s.source.String() + } + + expected2 := []string{ + "InPlace: Skip [0:300]", + } + require.Equal(t, expected2, got2) + }) +} + +func TestFileSeedPlanSteps(t *testing.T) { + tests := map[string]struct { + target Index + seed Index + expected []string + }{ + "basic matching": { + // Target: 01, 02, 03, 04 + // Seed: 02, 03 + // Chunks 1-2 from seed, 0 and 3 from store. + target: indexSequence(0x01, 0x02, 0x03, 0x04), + seed: indexSequence(0x02, 0x03), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "FileSeed(seed): Copy to [100:300]", + "Store: Copy 0400000000000000000000000000000000000000000000000000000000000000 to [300:400]", + }, + }, + "all from seed": { + // Target: 01, 02, 03 + // Seed: 01, 02, 03 + // One seed step covering all. + target: indexSequence(0x01, 0x02, 0x03), + seed: indexSequence(0x01, 0x02, 0x03), + expected: []string{ + "FileSeed(seed): Copy to [0:300]", + }, + }, + "no match": { + // Target: 01, 02 + // Seed: 05, 06 + // Both from store. 
+ target: indexSequence(0x01, 0x02), + seed: indexSequence(0x05, 0x06), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [100:200]", + }, + }, + "self-seed priority": { + // Target: 01, 02, 01 + // Seed: 01, 02, 01 + // Self-seed fills position 0 (copy from position 2), + // seed fills positions 1-2 (matching seed chunks 02, 01). + target: indexSequence(0x01, 0x02, 0x01), + seed: indexSequence(0x01, 0x02, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "FileSeed(seed): Copy to [100:300]", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + seed, err := NewIndexSeed("test", "seed", test.seed) + require.NoError(t, err) + + plan, err := NewPlan("test", test.target, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + require.Equal(t, test.expected, got) + }) + } +} + +func TestFileSeedValidation(t *testing.T) { + // Create two chunks with known data and compute their IDs + data1 := make([]byte, 100) + data1[0] = 0xAA + id1 := Digest.Sum(data1) + + data2 := make([]byte, 100) + data2[0] = 0xBB + id2 := Digest.Sum(data2) + + seedIndex := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + }, + } + + // Target index matches the seed exactly + targetIndex := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + }, + } + + t.Run("valid seed", func(t *testing.T) { + seedFile := filepath.Join(t.TempDir(), "seed") + f, err := os.Create(seedFile) + require.NoError(t, err) + _, err = f.Write(data1) + require.NoError(t, err) + _, err = f.Write(data2) + require.NoError(t, err) + f.Close() + + seed, err := NewIndexSeed("target", 
seedFile, seedIndex) + require.NoError(t, err) + + plan, err := NewPlan("target", targetIndex, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + require.NoError(t, plan.Validate()) + }) + + t.Run("invalid seed", func(t *testing.T) { + seedFile := filepath.Join(t.TempDir(), "seed") + f, err := os.Create(seedFile) + require.NoError(t, err) + _, err = f.Write(data1) + require.NoError(t, err) + _, err = f.Write(data2) + require.NoError(t, err) + f.Close() + + seed, err := NewIndexSeed("target", seedFile, seedIndex) + require.NoError(t, err) + + plan, err := NewPlan("target", targetIndex, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + // Corrupt the seed file after the plan was created + err = os.WriteFile(seedFile, make([]byte, 200), 0644) + require.NoError(t, err) + + err = plan.Validate() + require.Error(t, err) + + var seedErr SeedInvalid + require.ErrorAs(t, err, &seedErr) + require.Equal(t, []Seed{seed}, seedErr.Seeds) + }) + + t.Run("null seed skipped", func(t *testing.T) { + // Create a null chunk index — data is all zeros + nullData := make([]byte, 100) + nullID := Digest.Sum(nullData) + + nullTargetIndex := Index{ + Chunks: []IndexChunk{ + {ID: nullID, Start: 0, Size: 100}, + }, + } + + // Use a null seed (FileName() returns "", so Validate skips it) + ns := &nullChunkSeed{id: nullID} + defer ns.close() + + plan, err := NewPlan("target", nullTargetIndex, nil, PlanWithSeeds([]Seed{ns})) + require.NoError(t, err) + defer plan.Close() + + require.NoError(t, plan.Validate()) + }) +} diff --git a/assemble-selfseed.go b/assemble-selfseed.go new file mode 100644 index 0000000..d86ce2d --- /dev/null +++ b/assemble-selfseed.go @@ -0,0 +1,174 @@ +package desync + +import ( + "fmt" + "io" + "os" +) + +type selfSeed struct { + file string + index Index + pos map[ChunkID][]int + canReflink bool + readers chan *os.File +} + +func newSelfSeed(file string, index Index, n int) (*selfSeed, error) { + s := 
&selfSeed{ + file: file, + pos: make(map[ChunkID][]int), + index: index, + canReflink: CanClone(file, file), + readers: make(chan *os.File, n), + } + for i, c := range s.index.Chunks { + s.pos[c.ID] = append(s.pos[c.ID], i) + } + // Only open read handles if the file exists. If it doesn't, self-seed + // segments won't be created since there's nothing to match. + if _, err := os.Stat(file); err == nil { + for range n { + f, err := os.Open(file) + if err != nil { + s.Close() + return nil, err + } + s.readers <- f + } + } + return s, nil +} + +func (s *selfSeed) Close() { + if s.readers == nil { + return + } + close(s.readers) + for f := range s.readers { + f.Close() + } + s.readers = nil +} + +// longestMatchFrom returns the longest sequence of matching chunks after a +// given starting position. +func (s *selfSeed) longestMatchFrom(chunks []IndexChunk, startPos int) (int, int) { + if len(chunks) <= startPos || len(s.index.Chunks) == 0 { + return 0, 0 + } + pos, ok := s.pos[chunks[startPos].ID] + if !ok { + return 0, 0 + } + // From every position of chunks[startPos] in the source, find a slice of + // matching chunks. Then return the longest of those slices. + var ( + maxStart int + maxLen int + limit int + ) + if !s.canReflink { + // Limit the maximum number of chunks, in a single sequence, to + // avoid having jobs that are too unbalanced. However, if + // reflinks are supported, we don't limit it to make it faster + // and take less space. 
+ limit = 100 + } + for _, p := range pos { + if p <= startPos { + continue + } + start, n := s.maxMatchFrom(chunks[startPos:], p, limit) + // Clamp to prevent source [p, p+n) overlapping destination [startPos, startPos+n) + if max := p - startPos; n > max { + n = max + } + if n >= maxLen { // Using >= here to get the last (longest) match + maxStart = start + maxLen = n + } + if limit != 0 && limit == maxLen { + break + } + } + return maxStart, maxLen +} + +func (s *selfSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) (int, int) { + if len(chunks) == 0 { + return 0, 0 + } + var ( + sp int + dp = p + ) + for { + if limit != 0 && sp == limit { + break + } + if dp >= len(s.index.Chunks) || sp >= len(chunks) { + break + } + if chunks[sp].ID != s.index.Chunks[dp].ID { + break + } + dp++ + sp++ + } + return p, dp - p +} + +func (s *selfSeed) getSegment(from, to, length int) *selfSeedSegment { + return &selfSeedSegment{ + seed: s, + from: from, + to: to, + length: length, + } +} + +type selfSeedSegment struct { + seed *selfSeed + from int // Index of the first chunk to copy from + to int // Index of the first chunk to copy to + length int // Number of chunks to copy +} + +func (s *selfSeedSegment) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + srcStart := s.seed.index.Chunks[s.from].Start + dstStart := s.seed.index.Chunks[s.to].Start + lastFrom := s.from + s.length - 1 + length := s.seed.index.Chunks[lastFrom].Start + s.seed.index.Chunks[lastFrom].Size - srcStart + + blocksize := blocksizeOfFile(f.Name()) + + // Use reflinks if supported and blocks are aligned + if s.seed.canReflink && srcStart%blocksize == dstStart%blocksize { + return 0, length, CloneRange(f, f, srcStart, length, dstStart) + } + + // Borrow a read handle from the pool + src := <-s.seed.readers + defer func() { s.seed.readers <- src }() + + if _, err := src.Seek(int64(srcStart), io.SeekStart); err != nil { + return 0, 0, err + } + if _, err := f.Seek(int64(dstStart), 
io.SeekStart); err != nil { + return 0, 0, err + } + _, err = io.CopyBuffer(f, io.LimitReader(src, int64(length)), make([]byte, 64*1024)) + return length, 0, err +} + +func (s *selfSeedSegment) String() string { + fromStart := s.seed.index.Chunks[s.from].Start + toStart := s.seed.index.Chunks[s.to].Start + lastFromChunkIndex := s.from + s.length - 1 + lastToChunkIndex := s.to + s.length - 1 + fromEnd := s.seed.index.Chunks[lastFromChunkIndex].Start + s.seed.index.Chunks[lastFromChunkIndex].Size + toEnd := s.seed.index.Chunks[lastToChunkIndex].Start + s.seed.index.Chunks[lastToChunkIndex].Size + + return fmt.Sprintf("SelfSeed: Copy [%d:%d] to [%d:%d]", fromStart, fromEnd, toStart, toEnd) +} diff --git a/assemble-skip.go b/assemble-skip.go new file mode 100644 index 0000000..52cdb0a --- /dev/null +++ b/assemble-skip.go @@ -0,0 +1,20 @@ +package desync + +import ( + "fmt" + "os" +) + +// skipInPlace skips data chunks that are already in place. +type skipInPlace struct { + start uint64 + end uint64 +} + +func (s *skipInPlace) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + return 0, 0, nil +} + +func (s *skipInPlace) String() string { + return fmt.Sprintf("InPlace: Skip [%d:%d]", s.start, s.end) +} diff --git a/assemble-step.go b/assemble-step.go new file mode 100644 index 0000000..9701b62 --- /dev/null +++ b/assemble-step.go @@ -0,0 +1,62 @@ +package desync + +import ( + "iter" + "maps" +) + +type PlanStep struct { + source assembleSource + + // numChunks is the number of index chunks this step covers. + numChunks int + + // Steps that depend on this one. + dependents stepSet + + // Steps that this one depends on. + dependencies stepSet +} + +// addDependent adds a step that depends on this one. +func (n *PlanStep) addDependent(other *PlanStep) { + if n.dependents == nil { + n.dependents = newStepSet() + } + n.dependents.add(other) +} + +// addDependency adds a step that this one depends on. 
+func (n *PlanStep) addDependency(other *PlanStep) { + if n.dependencies == nil { + n.dependencies = newStepSet() + } + n.dependencies.add(other) +} + +// ready returns true when all dependencies have been resolved. +func (n *PlanStep) ready() bool { + return n.dependencies.len() == 0 +} + +type stepSet map[*PlanStep]struct{} + +func newStepSet() stepSet { + return make(stepSet) +} + +func (s stepSet) add(n *PlanStep) { + s[n] = struct{}{} +} + +func (s stepSet) remove(n *PlanStep) { + delete(s, n) +} + +func (s stepSet) Each() iter.Seq[*PlanStep] { + return maps.Keys(s) +} + +func (s stepSet) len() int { + return len(s) +} diff --git a/assemble-store.go b/assemble-store.go new file mode 100644 index 0000000..2e9ae64 --- /dev/null +++ b/assemble-store.go @@ -0,0 +1,31 @@ +package desync + +import ( + "fmt" + "os" +) + +type copyFromStore struct { + store Store + chunk IndexChunk +} + +func (s *copyFromStore) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + chunk, err := s.store.GetChunk(s.chunk.ID) + if err != nil { + return 0, 0, err + } + b, err := chunk.Data() + if err != nil { + return 0, 0, err + } + if s.chunk.Size != uint64(len(b)) { + return 0, 0, fmt.Errorf("unexpected size for chunk %s", s.chunk.ID) + } + _, err = f.WriteAt(b, int64(s.chunk.Start)) + return 0, 0, err +} + +func (s *copyFromStore) String() string { + return fmt.Sprintf("Store: Copy %v to [%d:%d]", s.chunk.ID.String(), s.chunk.Start, s.chunk.Start+s.chunk.Size) +} diff --git a/assemble.go b/assemble.go index ca13980..48e1843 100644 --- a/assemble.go +++ b/assemble.go @@ -2,9 +2,13 @@ package desync import ( "context" + "errors" "fmt" - "golang.org/x/sync/errgroup" "os" + "slices" + "sync" + + "golang.org/x/sync/errgroup" ) // InvalidSeedAction represents the action that we will take if a seed @@ -25,61 +29,6 @@ type AssembleOptions struct { InvalidSeedAction InvalidSeedAction } -// writeChunk tries to write a chunk by looking at the self seed, if it is already existing in 
the -// destination file or by taking it from the store. The in-place check runs first to avoid unnecessary -// writes. If the target already has the correct data, no write is performed. -func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Store, stats *ExtractStats, isBlank bool) error { - // If we operate on an existing file there's a good chance we already - // have the data written for this chunk. Let's read it from disk and - // compare to what is expected. This is checked first to avoid rewriting - // data that is already correct, even for chunks available in the selfSeed. - if !isBlank { - b := make([]byte, c.Size) - if _, err := f.ReadAt(b, int64(c.Start)); err != nil { - return err - } - sum := Digest.Sum(b) - if sum == c.ID { - // Record we kept this chunk in the file (when using in-place extract) - stats.incChunksInPlace() - return nil - } - } - - // If we already took this chunk from the store we can reuse it by looking - // into the selfSeed. - if segment := ss.getChunk(c.ID); segment != nil { - copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank) - if err != nil { - return err - } - stats.addBytesCopied(copied) - stats.addBytesCloned(cloned) - return nil - } - - // Record this chunk having been pulled from the store - stats.incChunksFromStore() - // Pull the (compressed) chunk from the store - chunk, err := s.GetChunk(c.ID) - if err != nil { - return err - } - b, err := chunk.Data() - if err != nil { - return err - } - // Might as well verify the chunk size while we're at it - if c.Size != uint64(len(b)) { - return fmt.Errorf("unexpected size for chunk %s", c.ID.String()) - } - // Write the decompressed chunk into the file at the right position - if _, err = f.WriteAt(b, int64(c.Start)); err != nil { - return err - } - return nil -} - // AssembleFile re-assembles a file based on a list of index chunks. 
It runs n // goroutines, creating one filehandle for the file "name" per goroutine // and writes to the file simultaneously. If progress is provided, it'll be @@ -89,18 +38,11 @@ func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Stor // differ from the expected content. This can be used to complete partly // written files. func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, options AssembleOptions) (*ExtractStats, error) { - type Job struct { - segment IndexSegment - source SeedSegment - } var ( - attempt = 1 - in = make(chan Job) isBlank bool isBlkDevice bool - pb ProgressBar + attempt = 1 ) - g, ctx := errgroup.WithContext(ctx) // Initialize stats to be gathered during extraction stats := &ExtractStats{ @@ -147,138 +89,178 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [] defer ns.close() seeds = append([]Seed{ns}, seeds...) - // Start a self-seed which will become usable once chunks are written contiguously - // beginning at position 0. There is no need to add this to the seeds list because - // when we create a plan it will be empty. - ss, err := newSelfSeed(name, idx) - if err != nil { - return stats, err - } - // Record the total number of seeds and blocksize in the stats stats.Seeds = len(seeds) stats.Blocksize = blocksize - // Start the workers, each having its own filehandle to write concurrently - for i := 0; i < options.N; i++ { - f, err := os.OpenFile(name, os.O_RDWR, 0666) - if err != nil { - return stats, fmt.Errorf("unable to open file %s, %s", name, err) - } - defer f.Close() - g.Go(func() error { - for job := range in { - pb.Add(job.segment.lengthChunks()) - if job.source != nil { - // If we have a seedSegment we expect 1 or more chunks between - // the start and the end of this segment. 
- stats.addChunksFromSeed(uint64(job.segment.lengthChunks())) - offset := job.segment.start() - length := job.segment.lengthBytes() - copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank) - if err != nil { - return err - } - - // Validate that the written chunks are exactly what we were expecting. - // Because the seed might point to a RW location, if the data changed - // while we were extracting an index, we might end up writing to the - // destination some unexpected values. - for _, c := range job.segment.chunks() { - b := make([]byte, c.Size) - if _, err := f.ReadAt(b, int64(c.Start)); err != nil { - return err - } - sum := Digest.Sum(b) - if sum != c.ID { - if options.InvalidSeedAction == InvalidSeedActionRegenerate { - // Try harder before giving up and aborting - Log.WithField("ID", c.ID.String()).Info("The seed may have changed during processing, trying to take the chunk from the self seed or the store") - if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil { - return err - } - } else { - return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name) - } - } - } - - stats.addBytesCopied(copied) - stats.addBytesCloned(cloned) - // Record this segment's been written in the self-seed to make it - // available going forward - ss.add(job.segment) - continue - } + // Create the plan +retry: + plan, err := NewPlan(name, idx, s, + PlanWithConcurrency(options.N), + PlanWithSeeds(seeds), + PlanWithTargetIsBlank(isBlank), + ) + if err != nil { + return stats, err + } - // If we don't have a seedSegment we expect an IndexSegment with just - // a single chunk, that we can take from either the selfSeed, from the - // destination file, or from the store. 
- if len(job.segment.chunks()) != 1 { - panic("Received an unexpected segment that doesn't contain just a single chunk") - } - c := job.segment.chunks()[0] + // Validate the seed indexes provided and potentially regenerate them + if err := plan.Validate(); err != nil { + // Close the invalid plan + plan.Close() - if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil { - return err - } + var seedError SeedInvalid + if errors.As(err, &seedError) { - // Record this chunk's been written in the self-seed. - // Even if we already confirmed that this chunk is present in the - // self-seed, we still need to record it as being written, otherwise - // the self-seed position pointer doesn't advance as we expect. - ss.add(job.segment) - } - return nil - }) - } - - // Let the sequencer break up the index into segments, create and validate a plan, - // feed the workers, and stop if there are any errors - seq := NewSeedSequencer(idx, seeds...) - plan := seq.Plan() - for { - validatingPrefix := fmt.Sprintf("Attempt %d: Validating ", attempt) - if err := plan.Validate(ctx, options.N, NewProgressBar(validatingPrefix)); err != nil { - // This plan has at least one invalid seed switch options.InvalidSeedAction { case InvalidSeedActionBailOut: return stats, err case InvalidSeedActionRegenerate: - Log.WithError(err).Info("Unable to use one of the chosen seeds, regenerating it") - if err := seq.RegenerateInvalidSeeds(ctx, options.N, attempt); err != nil { - return stats, err + Log.WithError(err).Info("Unable to use one or more seeds, regenerating them") + for i, s := range seedError.Seeds { + if err := s.RegenerateIndex(ctx, options.N, attempt, i+1); err != nil { + return stats, err + } } + attempt++ + goto retry case InvalidSeedActionSkip: - // Recreate the plan. 
This time the seed marked as invalid will be skipped - Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it") + Log.WithError(err).Infof("Unable to use one or more seeds, skipping them") + seeds = slices.DeleteFunc(seeds, func(s Seed) bool { + return slices.Contains(seedError.Seeds, s) + }) + goto retry default: panic("Unhandled InvalidSeedAction") } + } + return stats, err + } + defer plan.Close() - attempt += 1 - seq.Rewind() - plan = seq.Plan() - continue + // Generate the plan steps necessary to build the target + steps := plan.Steps() + if len(steps) == 0 { + return stats, nil + } + + // Split the steps into those that are independent and those that + // require other steps to complete first. + var ( + ready []*PlanStep + delayed = make(stepSet) + ) + for _, step := range steps { + if step.ready() { + ready = append(ready, step) + } else { + delayed.add(step) } - // Found a valid plan - break } - pb = NewProgressBar(fmt.Sprintf("Attempt %d: Assembling ", attempt)) + // Set up progress bar + pb := NewProgressBar(fmt.Sprintf("Attempt %d: Assembling ", attempt)) pb.SetTotal(len(idx.Chunks)) pb.Start() defer pb.Finish() -loop: - for _, segment := range plan { - select { - case <-ctx.Done(): - break loop - case in <- Job{segment.indexSegment, segment.source}: - } + // Create two channels, one for steps that can run and one for those + // that are complete. 
+ var ( + inProgress = make(chan *PlanStep, len(steps)) + completed = make(chan *PlanStep, options.N) + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(options.N) + + // Bring up the workers + for range options.N { + g.Go(func() error { + f, err := os.OpenFile(name, os.O_RDWR, 0666) + if err != nil { + return fmt.Errorf("unable to open file %s, %s", name, err) + } + defer f.Close() + for { + select { + case step, ok := <-inProgress: + if !ok { + return nil + } + copied, cloned, err := step.source.Execute(f) + if err != nil { + return err + } + // Update byte-level stats + stats.addBytesCopied(copied) + stats.addBytesCloned(cloned) + // Update chunk-level stats based on source type + switch step.source.(type) { + case *copyFromStore: + stats.incChunksFromStore() + case *skipInPlace: + stats.addChunksInPlace(uint64(step.numChunks)) + case *fileSeedSource, *selfSeedSegment: + stats.addChunksFromSeed(uint64(step.numChunks)) + } + select { + case completed <- step: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + // Populate all steps that are ready to be executed + for _, step := range ready { + inProgress <- step } - close(in) - return stats, g.Wait() + // Start the dispatch goroutine which runs the plan. This should be + // outside the errgroup as it'll only be stopped once the workers are + // done. + var wg sync.WaitGroup + wg.Go(func() { + for step := range completed { + pb.Add(step.numChunks) + + // Go through all the steps that are blocked by this + // one and remove the dependency. If all deps have been + // removed, send them for processing and remove them + // from the ready list. + for b := range step.dependents.Each() { + b.dependencies.remove(step) + if b.ready() { + delayed.remove(b) + inProgress <- b + } + } + + // If there are no more delayed steps, close the work queue. 
+ if delayed.len() == 0 { + close(inProgress) + break + } + } + + // Drain the completed queue, updating the progress bar for any + // steps that finished after the work queue was closed. + for step := range completed { + pb.Add(step.numChunks) + } + }) + + // Wait for the workers to complete + err = g.Wait() + + // Stop the dispatch goroutine + close(completed) + wg.Wait() + + return stats, err } diff --git a/assemble_test.go b/assemble_test.go index 88e2cb7..b28b3fa 100644 --- a/assemble_test.go +++ b/assemble_test.go @@ -10,27 +10,37 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// Build an index with a pre-determined set of (potentially repeated) chunks +func indexSequence(ids ...uint8) Index { + var ( + chunks = make([]IndexChunk, len(ids)) + start uint64 = 0 + size uint64 = 100 + ) + for i, id := range ids { + chunks[i] = IndexChunk{Start: start, Size: size, ID: ChunkID{id}} + start += size + } + return Index{Chunks: chunks} +} + func TestExtract(t *testing.T) { // Make a test file that's guaranteed to have duplicate chunks. b, err := os.ReadFile("testdata/chunker.input") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for range 4 { // Replicate it a few times to make sure we get dupes b = append(b, b...) } b = append(b, make([]byte, 2*ChunkSizeMaxDefault)...) 
// want to have at least one null-chunk in the input in, err := os.CreateTemp("", "in") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer os.RemoveAll(in.Name()) - if _, err := io.Copy(in, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(in, bytes.NewReader(b)) + require.NoError(t, err) in.Close() // Record the checksum of the input file, used to compare to the output later @@ -44,76 +54,50 @@ func TestExtract(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Chop up the input file into a (temporary) local store store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } - if err := ChopFile(context.Background(), in.Name(), index.Chunks, s, 10, NewProgressBar("")); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + err = ChopFile(context.Background(), in.Name(), index.Chunks, s, 10, NewProgressBar("")) + require.NoError(t, err) // Make a blank store - used to test a case where no chunk *should* be requested blankstore := t.TempDir() bs, err := NewLocalStore(blankstore, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Prepare output files for each test - first a non-existing one - out1, err := os.CreateTemp("", "out1") - if err != nil { - t.Fatal(err) - } - os.Remove(out1.Name()) + outdir := t.TempDir() + out1 := filepath.Join(outdir, "out1") // This one is a complete file matching what we expect at the end out2, err := os.CreateTemp("", "out2") - if err != nil { - t.Fatal(err) - } - if _, err := io.Copy(out2, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = io.Copy(out2, bytes.NewReader(b)) + require.NoError(t, err) out2.Close() defer os.Remove(out2.Name()) // Incomplete or damaged file that has most but not all data out3, err := os.CreateTemp("", "out3") - if err != nil { - 
t.Fatal(err) - } + require.NoError(t, err) b[0] ^= 0xff // flip some bits b[len(b)-1] ^= 0xff b = append(b, 0) // make it longer - if _, err := io.Copy(out3, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(out3, bytes.NewReader(b)) + require.NoError(t, err) out3.Close() defer os.Remove(out3.Name()) - // At this point we have the data needed for the test setup - // in - Temp file that represents the original input file - // inSub - MD5 of the input file - // index - Index file for the input file - // s - Local store containing the chunks needed to rebuild the input file - // bs - A blank local store, all GetChunk fail on it - // out1 - Just a non-existing file that gets assembled - // out2 - The output file already fully complete, no GetChunk should be needed - // out3 - Partial/damaged file with most, but not all data correct - // seedIndex + seedFile - Seed file to help assemble the input tests := map[string]struct { outfile string store Store - seed []Seed }{ - "extract to new file": {outfile: out1.Name(), store: s}, + "extract to new file": {outfile: out1, store: s}, "extract to complete file": {outfile: out2.Name(), store: bs}, "extract to incomplete file": {outfile: out3.Name(), store: s}, } @@ -121,19 +105,16 @@ func TestExtract(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { defer os.Remove(test.outfile) - if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, + _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, AssembleOptions{10, InvalidSeedActionBailOut}, - ); err != nil { - t.Fatal(err) - } - b, err := os.ReadFile(test.outfile) - if err != nil { - t.Fatal(err) - } - outSum := md5.Sum(b) - if inSum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } + ) + require.NoError(t, err) + + outBytes, err := os.ReadFile(test.outfile) + require.NoError(t, err) + + outSum := md5.Sum(outBytes) + assert.Equal(t, inSum, 
outSum, "checksum of extracted file doesn't match expected") }) } } @@ -142,9 +123,7 @@ func TestSeed(t *testing.T) { // Prepare different types of data slices that'll be used to assemble target // and seed files with varying amount of duplication data1, err := os.ReadFile("testdata/chunker.input") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) null := make([]byte, 4*ChunkSizeMaxDefault) rand1 := make([]byte, 4*ChunkSizeMaxDefault) rand.Read(rand1) @@ -155,9 +134,7 @@ func TestSeed(t *testing.T) { store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Define tests with files with different content, by building files out // of sets of byte slices to create duplication or not between the target and @@ -201,13 +178,10 @@ func TestSeed(t *testing.T) { t.Run(name, func(t *testing.T) { // Build the destination file so we can chunk it dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dstBytes := join(test.target...) 
- if _, err := io.Copy(dst, bytes.NewReader(dstBytes)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(dst, bytes.NewReader(dstBytes)) + require.NoError(t, err) dst.Close() defer os.Remove(dst.Name()) @@ -222,25 +196,19 @@ func TestSeed(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Chop up the input file into the store - if err := ChopFile(context.Background(), dst.Name(), dstIndex.Chunks, s, 10, NewProgressBar("")); err != nil { - t.Fatal(err) - } + err = ChopFile(context.Background(), dst.Name(), dstIndex.Chunks, s, 10, NewProgressBar("")) + require.NoError(t, err) // Build the seed files and indexes then populate the array of seeds var seeds []Seed for _, f := range test.seeds { seedFile, err := os.CreateTemp("", "seed") - if err != nil { - t.Fatal(err) - } - if _, err := io.Copy(seedFile, bytes.NewReader(join(f...))); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = io.Copy(seedFile, bytes.NewReader(join(f...))) + require.NoError(t, err) seedFile.Close() defer os.Remove(seedFile.Name()) seedIndex, _, err := IndexFromFile( @@ -250,29 +218,20 @@ func TestSeed(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) seed, err := NewIndexSeed(dst.Name(), seedFile.Name(), seedIndex) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) seeds = append(seeds, seed) } - if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, + _, err = AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, AssembleOptions{10, InvalidSeedActionBailOut}, - ); err != nil { - t.Fatal(err) - } + ) + require.NoError(t, err) b, err := os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) outSum := md5.Sum(b) - if dstSum != outSum { - t.Fatal("checksum of extracted file 
doesn't match expected") - } + assert.Equal(t, dstSum, outSum, "checksum of extracted file doesn't match expected") }) } @@ -286,9 +245,7 @@ func TestSelfSeedInPlace(t *testing.T) { store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Build a number of fake chunks that can then be used in the test in any order type rawChunk struct { @@ -303,9 +260,7 @@ func TestSelfSeedInPlace(t *testing.T) { b := make([]byte, size) rand.Read(b) chunk := NewChunk(b) - if err = s.StoreChunk(chunk); err != nil { - t.Fatal(err) - } + require.NoError(t, s.StoreChunk(chunk)) chunks[i] = rawChunk{chunk.ID(), b} } @@ -354,40 +309,28 @@ func TestSelfSeedInPlace(t *testing.T) { // Build a temp target file pre-populated with the correct content dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer os.Remove(dst.Name()) _, err = dst.Write(b) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dst.Close() // Extract the file stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, AssembleOptions{1, InvalidSeedActionBailOut}, ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Compare the checksums to that of the input data b, err = os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) outSum := md5.Sum(b) - if sum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } + assert.Equal(t, sum, outSum, "checksum of extracted file doesn't match expected") // All chunks must be in-place. The in-place check in writeChunk // runs before the self-seed lookup, so repeated chunks are not // re-written from the self-seed. 
- if stats.ChunksInPlace != uint64(len(test.index)) { - t.Fatalf("expected all %d chunks in-place, got %d", len(test.index), stats.ChunksInPlace) - } + assert.Equal(t, uint64(len(test.index)), stats.ChunksInPlace, "expected all chunks in-place") }) } @@ -410,38 +353,3 @@ func readCaibxFile(t *testing.T, indexLocation string) (idx Index) { require.NoError(t, err) return idx } - -func TestExtractWithNonStaticSeeds(t *testing.T) { - n := 10 - outDir := t.TempDir() - out := filepath.Join(outDir, "out") - - // Test a seed that is initially valid, but becomes corrupted halfway through - // the extraction operation - MockValidate = true - - store, err := NewLocalStore("testdata/blob2.store", StoreOptions{}) - require.NoError(t, err) - defer store.Close() - - index := readCaibxFile(t, "testdata/blob2.caibx") - - var seeds []Seed - srcIndex := readCaibxFile(t, "testdata/blob2_corrupted.caibx") - seed, err := NewIndexSeed(out, "testdata/blob2_corrupted", srcIndex) - seeds = append(seeds, seed) - - // Test that the MockValidate works as expected - seq := NewSeedSequencer(index, seeds...) - plan := seq.Plan() - err = plan.Validate(context.Background(), n, NullProgressBar{}) - require.NoError(t, err) - - options := AssembleOptions{n, InvalidSeedActionRegenerate} - _, err = AssembleFile(context.Background(), out, index, store, seeds, options) - require.NoError(t, err) - - //Test the output - err = VerifyIndex(context.Background(), out, index, n, NullProgressBar{}) - require.NoError(t, err) -} diff --git a/cmd/desync/extract_test.go b/cmd/desync/extract_test.go index d857ac4..105ee1d 100644 --- a/cmd/desync/extract_test.go +++ b/cmd/desync/extract_test.go @@ -68,9 +68,10 @@ func TestExtractCommand(t *testing.T) { // Explicitly set blob1 seed because seed-dir skips a seed if it's the same index file we gave in input. 
{"extract with seed directory without skipping invalid seeds", []string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, - // Same as above, no need for `--skip-invalid-seeds` + // The plan generator processes seeds in order, so the corrupted seed + // may get placements that fail validation. Use --skip-invalid-seeds. {"extract with multiple corrupted seeds", - []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1}, {"extract with single seed that has all the expected chunks", []string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, // blob2_corrupted is a corrupted blob that doesn't match its seed index. We regenerate the seed index to match diff --git a/errors.go b/errors.go index 45425c6..5d0fa12 100644 --- a/errors.go +++ b/errors.go @@ -44,3 +44,17 @@ func (e InvalidFormat) Error() string { type Interrupted struct{} func (e Interrupted) Error() string { return "interrupted" } + +// SeedInvalid is returned when a seed's data doesn't match its index. 
+type SeedInvalid struct { + Seeds []Seed + Err error +} + +func (e SeedInvalid) Error() string { + return fmt.Sprintf("invalid seed: %s", e.Err) +} + +func (e SeedInvalid) Unwrap() error { + return e.Err +} diff --git a/extractstats.go b/extractstats.go index 9deefcf..229f7b9 100644 --- a/extractstats.go +++ b/extractstats.go @@ -22,8 +22,8 @@ func (s *ExtractStats) incChunksFromStore() { atomic.AddUint64(&s.ChunksFromStore, 1) } -func (s *ExtractStats) incChunksInPlace() { - atomic.AddUint64(&s.ChunksInPlace, 1) +func (s *ExtractStats) addChunksInPlace(n uint64) { + atomic.AddUint64(&s.ChunksInPlace, n) } func (s *ExtractStats) addChunksFromSeed(n uint64) { diff --git a/fileseed.go b/fileseed.go index be1c8a6..10791ab 100644 --- a/fileseed.go +++ b/fileseed.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "sync" ) // FileSeed is used to copy or clone blocks from an existing index+blob during @@ -15,8 +14,6 @@ type FileSeed struct { index Index pos map[ChunkID][]int canReflink bool - isInvalid bool - mu sync.RWMutex } // NewIndexSeed initializes a new seed that uses an existing index and its blob @@ -26,7 +23,6 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error pos: make(map[ChunkID][]int), index: index, canReflink: CanClone(dstFile, srcFile), - isInvalid: false, } for i, c := range s.index.Chunks { s.pos[c.ID] = append(s.pos[c.ID], i) @@ -39,12 +35,7 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error // if reflinks are not supported. If there is no match, it returns a length of zero // and a nil SeedSegment. func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { - s.mu.RLock() - isInvalid := s.isInvalid - s.mu.RUnlock() - - // isInvalid can be concurrently read or written. 
Use a mutex to avoid a race - if len(chunks) == 0 || len(s.index.Chunks) == 0 || isInvalid { + if len(chunks) == 0 || len(s.index.Chunks) == 0 { return 0, nil } pos, ok := s.pos[chunks[0].ID] @@ -87,7 +78,6 @@ func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seed } s.index = index - s.SetInvalid(false) s.pos = make(map[ChunkID][]int, len(s.index.Chunks)) for i, c := range s.index.Chunks { s.pos[c.ID] = append(s.pos[c.ID], i) @@ -96,18 +86,6 @@ func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seed return nil } -func (s *FileSeed) SetInvalid(value bool) { - s.mu.Lock() - defer s.mu.Unlock() - s.isInvalid = value -} - -func (s *FileSeed) IsInvalid() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.isInvalid -} - // Returns a slice of chunks from the seed. Compares chunks from position 0 // with seed chunks starting at p. A "limit" value of zero means that there is no limit. func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk { diff --git a/go.mod b/go.mod index 50d0a7e..81d97b5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/folbricht/desync -go 1.24.0 +go 1.25.0 require ( cloud.google.com/go/storage v1.30.1 diff --git a/nullseed.go b/nullseed.go index 0cc45a9..92d363b 100644 --- a/nullseed.go +++ b/nullseed.go @@ -77,15 +77,6 @@ func (s *nullChunkSeed) RegenerateIndex(ctx context.Context, n int, attempt int, panic("A nullseed can't be regenerated") } -func (s *nullChunkSeed) SetInvalid(value bool) { - panic("A nullseed is never expected to be invalid") -} - -func (s *nullChunkSeed) IsInvalid() bool { - // A nullseed is never expected to be invalid - return false -} - type nullChunkSection struct { from, to uint64 blockfile *os.File diff --git a/seed.go b/seed.go index ffc49f3..b5fa88c 100644 --- a/seed.go +++ b/seed.go @@ -14,8 +14,6 @@ const DefaultBlockSize = 4096 type Seed interface { LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) 
RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error - SetInvalid(value bool) - IsInvalid() bool } // SeedSegment represents a matching range between a Seed and a file being @@ -27,30 +25,3 @@ type SeedSegment interface { Validate(file *os.File) error WriteInto(dst *os.File, offset, end, blocksize uint64, isBlank bool) (copied uint64, cloned uint64, err error) } - -// IndexSegment represents a contiguous section of an index which is used when -// assembling a file from seeds. first/last are positions in the index. -type IndexSegment struct { - index Index - first, last int -} - -func (s IndexSegment) lengthChunks() int { - return s.last - s.first + 1 -} - -func (s IndexSegment) lengthBytes() uint64 { - return s.end() - s.start() -} - -func (s IndexSegment) start() uint64 { - return s.index.Chunks[s.first].Start -} - -func (s IndexSegment) end() uint64 { - return s.index.Chunks[s.last].Start + s.index.Chunks[s.last].Size -} - -func (s IndexSegment) chunks() []IndexChunk { - return s.index.Chunks[s.first : s.last+1] -} diff --git a/selfseed.go b/selfseed.go deleted file mode 100644 index 86a5220..0000000 --- a/selfseed.go +++ /dev/null @@ -1,93 +0,0 @@ -package desync - -import ( - "context" - "sync" -) - -// FileSeed is used to populate a contiguous seed during extraction in order -// to copy/clone ranges that were written to the output file earlier. This is -// to potentially dedup/reflink duplicate chunks or ranges of chunks within the -// same file. 
-type selfSeed struct { - file string - index Index - pos map[ChunkID][]int - canReflink bool - written int - mu sync.RWMutex - cache map[int]int -} - -// newSelfSeed initializes a new seed based on the file being extracted -func newSelfSeed(file string, index Index) (*selfSeed, error) { - s := selfSeed{ - file: file, - pos: make(map[ChunkID][]int), - index: index, - canReflink: CanClone(file, file), - cache: make(map[int]int), - } - return &s, nil -} - -// add records a new segment that's been written to the file. Since only contiguous -// ranges of chunks are considered and writing happens concurrently, the segment -// written here will not be usable until all earlier chunks have been written as -// well. -func (s *selfSeed) add(segment IndexSegment) { - s.mu.Lock() - defer s.mu.Unlock() - - // Make a record of this segment in the cache since those could come in - // out-of-order - s.cache[segment.first] = segment.last + 1 - - // Advance pos until we find a chunk we don't yet have recorded while recording - // the chunk positions we do have in the position map used to find seed matches. - // Since it's guaranteed that the numbers are only increasing, we drop old numbers - // from the cache map to keep its size to a minimum and only store out-of-sequence - // numbers - for { - // See if we can advance the write pointer in the self-seed which requires - // consecutive chunks. If we don't have the next segment yet, just keep it - // in the cache until we do. - next, ok := s.cache[s.written] - if !ok { - break - } - // Record all chunks in this segment as written by adding them to the position map - for i := s.written; i < next; i++ { - chunk := s.index.Chunks[i] - s.pos[chunk.ID] = append(s.pos[chunk.ID], i) - } - delete(s.cache, s.written) - s.written = next - } -} - -// getChunk returns a segment with the requested chunk ID. If selfSeed doesn't -// have the requested chunk, nil will be returned. 
-func (s *selfSeed) getChunk(id ChunkID) SeedSegment { - s.mu.RLock() - pos, ok := s.pos[id] - s.mu.RUnlock() - if !ok { - return nil - } - first := pos[0] - return newFileSeedSegment(s.file, s.index.Chunks[first:first+1], s.canReflink) -} - -func (s *selfSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { - panic("A selfSeed can't be regenerated") -} - -func (s *selfSeed) SetInvalid(value bool) { - panic("A selfSeed is never expected to be invalid") -} - -func (s *selfSeed) IsInvalid() bool { - // A selfSeed is never expected to be invalid - return false -} diff --git a/selfseed_test.go b/selfseed_test.go deleted file mode 100644 index 93c38e8..0000000 --- a/selfseed_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package desync - -import ( - "context" - "crypto/md5" - "crypto/rand" - "os" - "testing" -) - -func TestSelfSeed(t *testing.T) { - // Setup a temporary store - store := t.TempDir() - - s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } - - // Build a number of fake chunks that can then be used in the test in any order - type rawChunk struct { - id ChunkID - data []byte - } - size := 1024 - numChunks := 10 - chunks := make([]rawChunk, numChunks) - - for i := range numChunks { - b := make([]byte, size) - rand.Read(b) - chunk := NewChunk(b) - if err = s.StoreChunk(chunk); err != nil { - t.Fatal(err) - } - chunks[i] = rawChunk{chunk.ID(), b} - } - - // Define tests with files with different content, by building files out - // of sets of byte slices to create duplication or not between the target and - // its seeds. Also define a min/max of bytes that should be cloned (from the - // self-seed). That number can vary since even with 1 worker goroutine there - // another feeder goroutine which can influence timings/results a little. 
- tests := map[string]struct { - index []int - minCloned int - maxCloned int - }{ - "single chunk": { - index: []int{0}, - minCloned: 0, - maxCloned: 0, - }, - "repeating single chunk": { - index: []int{0, 0, 0, 0, 0}, - minCloned: 3 * size, - maxCloned: 4 * size, - }, - "repeating chunk sequence": { - index: []int{0, 1, 2, 0, 1, 2, 2}, - minCloned: 4 * size, - maxCloned: 4 * size, - }, - "repeating chunk sequence mid file": { - index: []int{1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3}, - minCloned: 7 * size, - maxCloned: 7 * size, - }, - "repeating chunk sequence reversed": { - index: []int{0, 1, 2, 2, 1, 0}, - minCloned: 2 * size, - maxCloned: 3 * size, - }, - "non-repeating chunks": { - index: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - minCloned: 0, - maxCloned: 0, - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - // Build an index from the target chunks - var idx Index - var b []byte - for i, p := range test.index { - chunk := IndexChunk{ - ID: chunks[p].id, - Start: uint64(i * size), - Size: uint64(size), - } - b = append(b, chunks[p].data...) 
- idx.Chunks = append(idx.Chunks, chunk) - } - - // Calculate the expected checksum - sum := md5.Sum(b) - - // Build a temp target file to extract into - dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } - defer os.Remove(dst.Name()) - defer dst.Close() - - // Extract the file - stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, - AssembleOptions{1, InvalidSeedActionBailOut}, - ) - if err != nil { - t.Fatal(err) - } - - // Compare the checksums to that of the input data - b, err = os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } - outSum := md5.Sum(b) - if sum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } - - // Compare to the expected number of bytes copied or cloned from the seed - fromSeed := int(stats.BytesCopied + stats.BytesCloned) - if fromSeed < test.minCloned { - t.Fatalf("expected min %d bytes copied/cloned from self-seed, got %d", test.minCloned, fromSeed) - } - if fromSeed > test.maxCloned { - t.Fatalf("expected max %d bytes copied/cloned from self-seed, got %d", test.maxCloned, fromSeed) - } - }) - } - -} diff --git a/sequencer.go b/sequencer.go deleted file mode 100644 index 1248a06..0000000 --- a/sequencer.go +++ /dev/null @@ -1,176 +0,0 @@ -package desync - -import ( - "context" - "golang.org/x/sync/errgroup" - "os" -) - -// SeedSequencer is used to find sequences of chunks from seed files when assembling -// a file from an index. Using seeds reduces the need to download and decompress chunks -// from chunk stores. It also enables the use of reflinking/cloning of sections of -// files from a seed file where supported to reduce disk usage. 
-type SeedSequencer struct { - seeds []Seed - index Index - current int -} - -// SeedSegmentCandidate represent a single segment that we expect to use -// in a Plan -type SeedSegmentCandidate struct { - seed Seed - source SeedSegment - indexSegment IndexSegment -} - -type Plan []SeedSegmentCandidate - -var MockValidate = false - -// NewSeedSequencer initializes a new sequencer from a number of seeds. -func NewSeedSequencer(idx Index, src ...Seed) *SeedSequencer { - return &SeedSequencer{ - seeds: src, - index: idx, - } -} - -// Plan returns a new possible plan, representing an ordered list of -// segments that can be used to re-assemble the requested file -func (r *SeedSequencer) Plan() (plan Plan) { - for { - seed, segment, source, done := r.Next() - plan = append(plan, SeedSegmentCandidate{seed, source, segment}) - if done { - break - } - } - return plan -} - -// Next returns a sequence of index chunks (from the target index) and the -// longest matching segment from one of the seeds. If source is nil, no -// match was found in the seeds and the chunk needs to be retrieved from a -// store. If done is true, the sequencer is complete. -func (r *SeedSequencer) Next() (seed Seed, segment IndexSegment, source SeedSegment, done bool) { - var ( - max uint64 - advance = 1 - ) - for _, s := range r.seeds { - n, m := s.LongestMatchWith(r.index.Chunks[r.current:]) - if n > 0 && m.Size() > max { - seed = s - source = m - advance = n - max = m.Size() - } - } - - segment = IndexSegment{index: r.index, first: r.current, last: r.current + advance - 1} - r.current += advance - return seed, segment, source, r.current >= len(r.index.Chunks) -} - -// Rewind resets the current target index to the beginning. 
-func (r *SeedSequencer) Rewind() { - r.current = 0 -} - -// isFileSeed returns true if this segment is pointing to a fileSeed -func (s SeedSegmentCandidate) isFileSeed() bool { - // We expect an empty filename when using nullSeeds - return s.source != nil && s.source.FileName() != "" -} - -// RegenerateInvalidSeeds regenerates the index to match the unexpected seed content -func (r *SeedSequencer) RegenerateInvalidSeeds(ctx context.Context, n int, attempt int) error { - seedNumber := 1 - for _, s := range r.seeds { - if s.IsInvalid() { - if err := s.RegenerateIndex(ctx, n, attempt, seedNumber); err != nil { - return err - } - seedNumber += 1 - } - } - return nil -} - -// Validate validates a proposed plan by checking if all the chosen chunks -// are correctly provided from the seeds. In case a seed has invalid chunks, the -// entire seed is marked as invalid and an error is returned. -func (p Plan) Validate(ctx context.Context, n int, pb ProgressBar) (err error) { - type Job struct { - candidate SeedSegmentCandidate - file *os.File - } - var ( - in = make(chan Job) - fileMap = make(map[string]*os.File) - ) - if MockValidate { - // This is used in the automated tests to mock a plan that is valid - return nil - } - length := 0 - for _, s := range p { - if !s.isFileSeed() { - continue - } - length += s.indexSegment.lengthChunks() - } - pb.SetTotal(length) - pb.Start() - defer pb.Finish() - // Share a single file descriptor per seed for all the goroutines - for _, s := range p { - if !s.isFileSeed() { - continue - } - name := s.source.FileName() - if _, present := fileMap[name]; present { - continue - } else { - file, err := os.Open(name) - if err != nil { - // We were not able to open the seed. 
Mark it as invalid and return - s.seed.SetInvalid(true) - return err - } - fileMap[name] = file - defer file.Close() - } - } - g, ctx := errgroup.WithContext(ctx) - // Concurrently validate all the chunks in this plan - for range n { - g.Go(func() error { - for job := range in { - if err := job.candidate.source.Validate(job.file); err != nil { - job.candidate.seed.SetInvalid(true) - return err - } - pb.Add(job.candidate.indexSegment.lengthChunks()) - } - return nil - }) - } - -loop: - for _, s := range p { - if !s.isFileSeed() { - // This is not a fileSeed, we have nothing to validate - continue - } - select { - case <-ctx.Done(): - break loop - case in <- Job{s, fileMap[s.source.FileName()]}: - } - } - close(in) - - return g.Wait() -} From 757ae03508b352921ad0d015cc325d26e72a18a0 Mon Sep 17 00:00:00 2001 From: folbrich Date: Sat, 14 Mar 2026 15:33:38 +0100 Subject: [PATCH 2/4] Run validation concurrently --- assemble-plan.go | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/assemble-plan.go b/assemble-plan.go index 1679980..469cdab 100644 --- a/assemble-plan.go +++ b/assemble-plan.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "os" + "sync" "golang.org/x/sync/errgroup" ) @@ -93,8 +94,14 @@ func (p *AssemblePlan) Close() { // Validate checks that all file seed placements still match their underlying // data. Returns a SeedInvalid error if a seed file was modified after its // index was created. -// TODO: run the verification steps in parallel. func (p *AssemblePlan) Validate() error { + // Phase 1 — Sequential: collect unique fileSeedSource placements, open + // their backing files, and build a list of items to validate. 
+ type validateItem struct { + fs *fileSeedSource + file *os.File + } + seen := make(map[*placement]struct{}) fileMap := make(map[string]*os.File) defer func() { @@ -106,6 +113,7 @@ func (p *AssemblePlan) Validate() error { invalidSeeds := make(map[Seed]error) failedFiles := make(map[string]struct{}) + var items []validateItem for _, pl := range p.placements { if _, ok := seen[pl]; ok { continue @@ -136,11 +144,26 @@ func (p *AssemblePlan) Validate() error { fileMap[fs.srcFile] = f } - if err := fs.segment.Validate(fileMap[fs.srcFile]); err != nil { - invalidSeeds[fs.seed] = err - } + items = append(items, validateItem{fs: fs, file: fileMap[fs.srcFile]}) + } + + // Phase 2 — Concurrent: validate each segment in parallel. + var mu sync.Mutex + var g errgroup.Group + g.SetLimit(p.concurrency) + for _, item := range items { + g.Go(func() error { + if err := item.fs.segment.Validate(item.file); err != nil { + mu.Lock() + invalidSeeds[item.fs.seed] = err + mu.Unlock() + } + return nil + }) } + g.Wait() + // Phase 3 — Sequential: build the error result. 
if len(invalidSeeds) > 0 { seeds := make([]Seed, 0, len(invalidSeeds)) errs := make([]error, 0, len(invalidSeeds)) From dc2f0303798375a696d6da5aecca80aa7dd0e89f Mon Sep 17 00:00:00 2001 From: folbrich Date: Wed, 18 Mar 2026 13:18:59 +0100 Subject: [PATCH 3/4] Simplify seed logic and improve plan performance --- assemble-plan.go | 69 +++++++++++++++++++++++++------------------ assemble-selfseed.go | 50 +++++++++++++------------------ fileseed.go | 70 ++++++++++++++++++++++++++++---------------- nullseed.go | 24 +++++++-------- seed.go | 3 +- 5 files changed, 120 insertions(+), 96 deletions(-) diff --git a/assemble-plan.go b/assemble-plan.go index 469cdab..f4db61d 100644 --- a/assemble-plan.go +++ b/assemble-plan.go @@ -235,7 +235,7 @@ func (p *AssemblePlan) generate() error { continue // Already filled } - start, n := p.selfSeed.longestMatchFrom(p.idx.Chunks, i) + start, n := p.selfSeed.LongestMatchFrom(p.idx.Chunks, i) if n < 1 { continue } @@ -263,50 +263,63 @@ func (p *AssemblePlan) generate() error { i-- // compensate for the outer loop's i++ // Update the step with the potentially adjusted length - pl.source = p.selfSeed.getSegment(start, to, size) + seedOffset := p.idx.Chunks[start].Start + last := p.idx.Chunks[start+size-1] + length := last.Start + last.Size - seedOffset + offset := p.idx.Chunks[to].Start + + pl.source = p.selfSeed.GetSegment(seedOffset, offset, length) pl.dependsOnStart = start pl.dependsOnSize = size } // Check file seeds for matches in unfilled positions. for _, seed := range p.seeds { - for i := 0; i < len(p.idx.Chunks); { + for i := 0; i < len(p.idx.Chunks); i++ { if p.placements[i] != nil { - i++ continue } - // Count consecutive unfilled positions to bound the match. 
- available := 0 - for j := i; j < len(p.idx.Chunks) && p.placements[j] == nil; j++ { - available++ - } - - n, segment := seed.LongestMatchWith(p.idx.Chunks[i : i+available]) + seedOffset, n := seed.LongestMatchFrom(p.idx.Chunks, i) if n < 1 { - i++ continue } - offset := p.idx.Chunks[i].Start - last := p.idx.Chunks[i+n-1] - length := last.Start + last.Size - offset - - pl := &placement{ - source: &fileSeedSource{ - segment: segment, - seed: seed, - srcFile: segment.FileName(), - offset: offset, - length: length, - isBlank: p.targetIsBlank, - }, + // Repeat the same placement for all chunks in the sequence. + // We dedup sequences later. + pl := &placement{} + + // We can use up to n chunks from the seed, find out how much + // we can actually use without overwriting any existing placements + // in the list. + var ( + to = i + size int + ) + for range n { + if p.placements[i] != nil { + break + } + p.placements[i] = pl + i++ + size++ } + i-- // compensate for the outer loop's i++ - for j := i; j < i+n; j++ { - p.placements[j] = pl + // Update the step with the potentially adjusted length + offset := p.idx.Chunks[to].Start + last := p.idx.Chunks[to+size-1] + length := last.Start + last.Size - offset + segment := seed.GetSegment(seedOffset, length) + + pl.source = &fileSeedSource{ + segment: segment, + seed: seed, + srcFile: segment.FileName(), + offset: offset, + length: length, + isBlank: p.targetIsBlank, } - i += n } } diff --git a/assemble-selfseed.go b/assemble-selfseed.go index d86ce2d..7005b68 100644 --- a/assemble-selfseed.go +++ b/assemble-selfseed.go @@ -51,9 +51,9 @@ func (s *selfSeed) Close() { s.readers = nil } -// longestMatchFrom returns the longest sequence of matching chunks after a +// LongestMatchFrom returns the longest sequence of matching chunks after a // given starting position. 
-func (s *selfSeed) longestMatchFrom(chunks []IndexChunk, startPos int) (int, int) { +func (s *selfSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (int, int) { if len(chunks) <= startPos || len(s.index.Chunks) == 0 { return 0, 0 } @@ -119,56 +119,46 @@ func (s *selfSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) (int, int return p, dp - p } -func (s *selfSeed) getSegment(from, to, length int) *selfSeedSegment { +func (s *selfSeed) GetSegment(srcOffset, dstOffset, size uint64) *selfSeedSegment { return &selfSeedSegment{ - seed: s, - from: from, - to: to, - length: length, + seed: s, + srcOffset: srcOffset, + dstOffset: dstOffset, + size: size, } } type selfSeedSegment struct { - seed *selfSeed - from int // Index of the first chunk to copy from - to int // Index of the first chunk to copy to - length int // Number of chunks to copy + seed *selfSeed + srcOffset uint64 + dstOffset uint64 + size uint64 } func (s *selfSeedSegment) Execute(f *os.File) (copied uint64, cloned uint64, err error) { - srcStart := s.seed.index.Chunks[s.from].Start - dstStart := s.seed.index.Chunks[s.to].Start - lastFrom := s.from + s.length - 1 - length := s.seed.index.Chunks[lastFrom].Start + s.seed.index.Chunks[lastFrom].Size - srcStart - blocksize := blocksizeOfFile(f.Name()) // Use reflinks if supported and blocks are aligned - if s.seed.canReflink && srcStart%blocksize == dstStart%blocksize { - return 0, length, CloneRange(f, f, srcStart, length, dstStart) + if s.seed.canReflink && s.srcOffset%blocksize == s.dstOffset%blocksize { + return 0, s.size, CloneRange(f, f, s.srcOffset, s.size, s.dstOffset) } // Borrow a read handle from the pool src := <-s.seed.readers defer func() { s.seed.readers <- src }() - if _, err := src.Seek(int64(srcStart), io.SeekStart); err != nil { + if _, err := src.Seek(int64(s.srcOffset), io.SeekStart); err != nil { return 0, 0, err } - if _, err := f.Seek(int64(dstStart), io.SeekStart); err != nil { + if _, err := f.Seek(int64(s.dstOffset), 
io.SeekStart); err != nil { return 0, 0, err } - _, err = io.CopyBuffer(f, io.LimitReader(src, int64(length)), make([]byte, 64*1024)) - return length, 0, err + _, err = io.CopyBuffer(f, io.LimitReader(src, int64(s.size)), make([]byte, 64*1024)) + return s.size, 0, err } func (s *selfSeedSegment) String() string { - fromStart := s.seed.index.Chunks[s.from].Start - toStart := s.seed.index.Chunks[s.to].Start - lastFromChunkIndex := s.from + s.length - 1 - lastToChunkIndex := s.to + s.length - 1 - fromEnd := s.seed.index.Chunks[lastFromChunkIndex].Start + s.seed.index.Chunks[lastFromChunkIndex].Size - toEnd := s.seed.index.Chunks[lastToChunkIndex].Start + s.seed.index.Chunks[lastToChunkIndex].Size - - return fmt.Sprintf("SelfSeed: Copy [%d:%d] to [%d:%d]", fromStart, fromEnd, toStart, toEnd) + return fmt.Sprintf("SelfSeed: Copy [%d:%d] to [%d:%d]", + s.srcOffset, s.srcOffset+s.size, + s.dstOffset, s.dstOffset+s.size) } diff --git a/fileseed.go b/fileseed.go index 10791ab..bb2408f 100644 --- a/fileseed.go +++ b/fileseed.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "sort" ) // FileSeed is used to copy or clone blocks from an existing index+blob during @@ -30,24 +31,24 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error return &s, nil } -// LongestMatchWith returns the longest sequence of chunks anywhere in Source -// that match `chunks` starting at chunks[0], limiting the maximum number of chunks -// if reflinks are not supported. If there is no match, it returns a length of zero -// and a nil SeedSegment. -func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { - if len(chunks) == 0 || len(s.index.Chunks) == 0 { - return 0, nil +// LongestMatchFrom returns the longest sequence of chunks anywhere in the seed +// that match chunks starting at chunks[startPos]. It returns the byte offset +// of the match in the seed and the number of matching chunks. Returns (0, 0) +// if there is no match. 
+func (s *FileSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, int) { + if startPos >= len(chunks) || len(s.index.Chunks) == 0 { + return 0, 0 } - pos, ok := s.pos[chunks[0].ID] + pos, ok := s.pos[chunks[startPos].ID] if !ok { - return 0, nil + return 0, 0 } - // From every position of chunks[0] in the source, find a slice of - // matching chunks. Then return the longest of those slices. + // From every position of chunks[startPos] in the source, find a run of + // matching chunks. Then return the longest of those runs. var ( - match []IndexChunk - max int - limit int + bestSeedPos int + maxLen int + limit int ) if !s.canReflink { // Limit the maximum number of chunks, in a single sequence, to avoid @@ -57,16 +58,34 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { limit = 100 } for _, p := range pos { - m := s.maxMatchFrom(chunks, p, limit) - if len(m) > max { - match = m - max = len(m) + seedPos, n := s.maxMatchFrom(chunks[startPos:], p, limit) + if n > maxLen { + bestSeedPos = seedPos + maxLen = n } - if limit != 0 && limit == max { + if limit != 0 && limit == maxLen { break } } - return max, newFileSeedSegment(s.srcFile, match, s.canReflink) + if maxLen == 0 { + return 0, 0 + } + return s.index.Chunks[bestSeedPos].Start, maxLen +} + +// GetSegment constructs a SeedSegment for a matched range identified by its +// byte offset and size in the seed. 
+func (s *FileSeed) GetSegment(offset, size uint64) SeedSegment { + i := sort.Search(len(s.index.Chunks), func(j int) bool { + return s.index.Chunks[j].Start >= offset + }) + var covered uint64 + end := i + for end < len(s.index.Chunks) && covered < size { + covered += s.index.Chunks[end].Size + end++ + } + return newFileSeedSegment(s.srcFile, s.index.Chunks[i:end], s.canReflink) } func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { @@ -86,11 +105,12 @@ func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seed return nil } -// Returns a slice of chunks from the seed. Compares chunks from position 0 -// with seed chunks starting at p. A "limit" value of zero means that there is no limit. -func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk { +// maxMatchFrom compares chunks from position 0 with seed chunks starting at p. +// Returns (p, count) where p is the seed start and count is the number of +// matching chunks. A "limit" value of zero means that there is no limit. 
+func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) (int, int) { if len(chunks) == 0 { - return nil + return 0, 0 } var ( sp int @@ -109,7 +129,7 @@ func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexCh dp++ sp++ } - return s.index.Chunks[p:dp] + return p, dp - p } type fileSeedSegment struct { diff --git a/nullseed.go b/nullseed.go index 92d363b..464b19d 100644 --- a/nullseed.go +++ b/nullseed.go @@ -42,9 +42,9 @@ func (s *nullChunkSeed) close() error { return nil } -func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { - if len(chunks) == 0 { - return 0, nil +func (s *nullChunkSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, int) { + if startPos >= len(chunks) { + return 0, 0 } var ( n int @@ -53,7 +53,7 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) if !s.canReflink { limit = 100 } - for _, c := range chunks { + for _, c := range chunks[startPos:] { if limit != 0 && limit == n { break } @@ -62,12 +62,12 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) } n++ } - if n == 0 { - return 0, nil - } - return n, &nullChunkSection{ - from: chunks[0].Start, - to: chunks[n-1].Start + chunks[n-1].Size, + return 0, n +} + +func (s *nullChunkSeed) GetSegment(offset, size uint64) SeedSegment { + return &nullChunkSection{ + size: size, blockfile: s.blockfile, canReflink: s.canReflink, } @@ -78,7 +78,7 @@ func (s *nullChunkSeed) RegenerateIndex(ctx context.Context, n int, attempt int, } type nullChunkSection struct { - from, to uint64 + size uint64 blockfile *os.File canReflink bool } @@ -92,7 +92,7 @@ func (s *nullChunkSection) FileName() string { return "" } -func (s *nullChunkSection) Size() uint64 { return s.to - s.from } +func (s *nullChunkSection) Size() uint64 { return s.size } func (s *nullChunkSection) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { if 
length != s.Size() { diff --git a/seed.go b/seed.go index b5fa88c..2692e87 100644 --- a/seed.go +++ b/seed.go @@ -12,7 +12,8 @@ const DefaultBlockSize = 4096 // another index+blob that present on disk already and is used to copy or clone // existing chunks or blocks into the target from. type Seed interface { - LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) + LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, int) + GetSegment(offset, size uint64) SeedSegment RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error } From f2ddded52d2a90f016f7c7de7976911a22989f9f Mon Sep 17 00:00:00 2001 From: folbrich Date: Wed, 18 Mar 2026 13:34:14 +0100 Subject: [PATCH 4/4] reorg --- assemble-fileseed.go | 233 +++++++++++++++++++++++++++++++++++++++++ fileseed.go | 239 ------------------------------------------- 2 files changed, 233 insertions(+), 239 deletions(-) delete mode 100644 fileseed.go diff --git a/assemble-fileseed.go b/assemble-fileseed.go index 6c193d8..149cd5d 100644 --- a/assemble-fileseed.go +++ b/assemble-fileseed.go @@ -1,10 +1,243 @@ package desync import ( + "context" "fmt" + "io" "os" + "sort" ) +// FileSeed is used to copy or clone blocks from an existing index+blob during +// file extraction. +type FileSeed struct { + srcFile string + index Index + pos map[ChunkID][]int + canReflink bool +} + +// NewIndexSeed initializes a new seed that uses an existing index and its blob +func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error) { + s := FileSeed{ + srcFile: srcFile, + pos: make(map[ChunkID][]int), + index: index, + canReflink: CanClone(dstFile, srcFile), + } + for i, c := range s.index.Chunks { + s.pos[c.ID] = append(s.pos[c.ID], i) + } + return &s, nil +} + +// LongestMatchFrom returns the longest sequence of chunks anywhere in the seed +// that match chunks starting at chunks[startPos]. It returns the byte offset +// of the match in the seed and the number of matching chunks. 
Returns (0, 0) +// if there is no match. +func (s *FileSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, int) { + if startPos >= len(chunks) || len(s.index.Chunks) == 0 { + return 0, 0 + } + pos, ok := s.pos[chunks[startPos].ID] + if !ok { + return 0, 0 + } + // From every position of chunks[startPos] in the source, find a run of + // matching chunks. Then return the longest of those runs. + var ( + bestSeedPos int + maxLen int + limit int + ) + if !s.canReflink { + // Limit the maximum number of chunks, in a single sequence, to avoid + // having jobs that are too unbalanced. + // However, if reflinks are supported, we don't limit it to make it faster and + // take less space. + limit = 100 + } + for _, p := range pos { + seedPos, n := s.maxMatchFrom(chunks[startPos:], p, limit) + if n > maxLen { + bestSeedPos = seedPos + maxLen = n + } + if limit != 0 && limit == maxLen { + break + } + } + if maxLen == 0 { + return 0, 0 + } + return s.index.Chunks[bestSeedPos].Start, maxLen +} + +// GetSegment constructs a SeedSegment for a matched range identified by its +// byte offset and size in the seed. 
+func (s *FileSeed) GetSegment(offset, size uint64) SeedSegment { + i := sort.Search(len(s.index.Chunks), func(j int) bool { + return s.index.Chunks[j].Start >= offset + }) + var covered uint64 + end := i + for end < len(s.index.Chunks) && covered < size { + covered += s.index.Chunks[end].Size + end++ + } + return newFileSeedSegment(s.srcFile, s.index.Chunks[i:end], s.canReflink) +} + +func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { + chunkingPrefix := fmt.Sprintf("Attempt %d: Chunking Seed %d ", attempt, seedNumber) + index, _, err := IndexFromFile(ctx, s.srcFile, n, s.index.Index.ChunkSizeMin, s.index.Index.ChunkSizeAvg, + s.index.Index.ChunkSizeMax, NewProgressBar(chunkingPrefix)) + if err != nil { + return err + } + + s.index = index + s.pos = make(map[ChunkID][]int, len(s.index.Chunks)) + for i, c := range s.index.Chunks { + s.pos[c.ID] = append(s.pos[c.ID], i) + } + + return nil +} + +// maxMatchFrom compares chunks from position 0 with seed chunks starting at p. +// Returns (p, count) where p is the seed start and count is the number of +// matching chunks. A "limit" value of zero means that there is no limit. 
+func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) (int, int) { + if len(chunks) == 0 { + return 0, 0 + } + var ( + sp int + dp = p + ) + for { + if limit != 0 && sp == limit { + break + } + if dp >= len(s.index.Chunks) || sp >= len(chunks) { + break + } + if chunks[sp].ID != s.index.Chunks[dp].ID { + break + } + dp++ + sp++ + } + return p, dp - p +} + +type fileSeedSegment struct { + file string + chunks []IndexChunk + canReflink bool + needValidation bool +} + +func newFileSeedSegment(file string, chunks []IndexChunk, canReflink bool) *fileSeedSegment { + return &fileSeedSegment{ + canReflink: canReflink, + file: file, + chunks: chunks, + } +} + +func (s *fileSeedSegment) FileName() string { + return s.file +} + +func (s *fileSeedSegment) Size() uint64 { + if len(s.chunks) == 0 { + return 0 + } + last := s.chunks[len(s.chunks)-1] + return last.Start + last.Size - s.chunks[0].Start +} + +func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { + if length != s.Size() { + return 0, 0, fmt.Errorf("unable to copy %d bytes from %s to %s : wrong size", length, s.file, dst.Name()) + } + src, err := os.Open(s.file) + if err != nil { + return 0, 0, err + } + defer src.Close() + + // Do a straight copy if reflinks are not supported or blocks aren't aligned + if !s.canReflink || s.chunks[0].Start%blocksize != offset%blocksize { + return s.copy(dst, src, s.chunks[0].Start, length, offset) + } + return s.clone(dst, src, s.chunks[0].Start, length, offset, blocksize) +} + +// Validate compares all chunks in this slice of the seed index to the underlying data +// and fails if they don't match. 
+func (s *fileSeedSegment) Validate(file *os.File) error { + for _, c := range s.chunks { + b := make([]byte, c.Size) + if _, err := file.ReadAt(b, int64(c.Start)); err != nil { + return err + } + sum := Digest.Sum(b) + if sum != c.ID { + return fmt.Errorf("seed index for %s doesn't match its data", s.file) + } + } + return nil +} + +// Performs a plain copy of everything in the seed to the target, not cloning +// of blocks. +func (s *fileSeedSegment) copy(dst, src *os.File, srcOffset, length, dstOffset uint64) (uint64, uint64, error) { + if _, err := dst.Seek(int64(dstOffset), os.SEEK_SET); err != nil { + return 0, 0, err + } + if _, err := src.Seek(int64(srcOffset), os.SEEK_SET); err != nil { + return 0, 0, err + } + + // Copy using a fixed buffer. Using io.Copy() with a LimitReader will make it + // create a buffer matching N of the LimitReader which can be too large + copied, err := io.CopyBuffer(dst, io.LimitReader(src, int64(length)), make([]byte, 64*1024)) + return uint64(copied), 0, err +} + +// Reflink the overlapping blocks in the two ranges and copy the bit before and +// after the blocks. 
+func (s *fileSeedSegment) clone(dst, src *os.File, srcOffset, srcLength, dstOffset, blocksize uint64) (uint64, uint64, error) { + if srcOffset%blocksize != dstOffset%blocksize { + return 0, 0, fmt.Errorf("reflink ranges not aligned between %s and %s", src.Name(), dst.Name()) + } + + srcAlignStart := (srcOffset/blocksize + 1) * blocksize + srcAlignEnd := (srcOffset + srcLength) / blocksize * blocksize + dstAlignStart := (dstOffset/blocksize + 1) * blocksize + alignLength := srcAlignEnd - srcAlignStart + dstAlignEnd := dstAlignStart + alignLength + + // fill the area before the first aligned block + var copied uint64 + c1, _, err := s.copy(dst, src, srcOffset, srcAlignStart-srcOffset, dstOffset) + if err != nil { + return c1, 0, err + } + copied += c1 + // fill the area after the last aligned block + c2, _, err := s.copy(dst, src, srcAlignEnd, srcOffset+srcLength-srcAlignEnd, dstAlignEnd) + if err != nil { + return copied + c2, 0, err + } + copied += c2 + // close the aligned blocks + return copied, alignLength, CloneRange(dst, src, srcAlignStart, alignLength, dstAlignStart) +} + type fileSeedSource struct { segment SeedSegment seed Seed diff --git a/fileseed.go b/fileseed.go deleted file mode 100644 index bb2408f..0000000 --- a/fileseed.go +++ /dev/null @@ -1,239 +0,0 @@ -package desync - -import ( - "context" - "fmt" - "io" - "os" - "sort" -) - -// FileSeed is used to copy or clone blocks from an existing index+blob during -// file extraction. 
-type FileSeed struct { - srcFile string - index Index - pos map[ChunkID][]int - canReflink bool -} - -// NewIndexSeed initializes a new seed that uses an existing index and its blob -func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error) { - s := FileSeed{ - srcFile: srcFile, - pos: make(map[ChunkID][]int), - index: index, - canReflink: CanClone(dstFile, srcFile), - } - for i, c := range s.index.Chunks { - s.pos[c.ID] = append(s.pos[c.ID], i) - } - return &s, nil -} - -// LongestMatchFrom returns the longest sequence of chunks anywhere in the seed -// that match chunks starting at chunks[startPos]. It returns the byte offset -// of the match in the seed and the number of matching chunks. Returns (0, 0) -// if there is no match. -func (s *FileSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, int) { - if startPos >= len(chunks) || len(s.index.Chunks) == 0 { - return 0, 0 - } - pos, ok := s.pos[chunks[startPos].ID] - if !ok { - return 0, 0 - } - // From every position of chunks[startPos] in the source, find a run of - // matching chunks. Then return the longest of those runs. - var ( - bestSeedPos int - maxLen int - limit int - ) - if !s.canReflink { - // Limit the maximum number of chunks, in a single sequence, to avoid - // having jobs that are too unbalanced. - // However, if reflinks are supported, we don't limit it to make it faster and - // take less space. - limit = 100 - } - for _, p := range pos { - seedPos, n := s.maxMatchFrom(chunks[startPos:], p, limit) - if n > maxLen { - bestSeedPos = seedPos - maxLen = n - } - if limit != 0 && limit == maxLen { - break - } - } - if maxLen == 0 { - return 0, 0 - } - return s.index.Chunks[bestSeedPos].Start, maxLen -} - -// GetSegment constructs a SeedSegment for a matched range identified by its -// byte offset and size in the seed. 
-func (s *FileSeed) GetSegment(offset, size uint64) SeedSegment { - i := sort.Search(len(s.index.Chunks), func(j int) bool { - return s.index.Chunks[j].Start >= offset - }) - var covered uint64 - end := i - for end < len(s.index.Chunks) && covered < size { - covered += s.index.Chunks[end].Size - end++ - } - return newFileSeedSegment(s.srcFile, s.index.Chunks[i:end], s.canReflink) -} - -func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { - chunkingPrefix := fmt.Sprintf("Attempt %d: Chunking Seed %d ", attempt, seedNumber) - index, _, err := IndexFromFile(ctx, s.srcFile, n, s.index.Index.ChunkSizeMin, s.index.Index.ChunkSizeAvg, - s.index.Index.ChunkSizeMax, NewProgressBar(chunkingPrefix)) - if err != nil { - return err - } - - s.index = index - s.pos = make(map[ChunkID][]int, len(s.index.Chunks)) - for i, c := range s.index.Chunks { - s.pos[c.ID] = append(s.pos[c.ID], i) - } - - return nil -} - -// maxMatchFrom compares chunks from position 0 with seed chunks starting at p. -// Returns (p, count) where p is the seed start and count is the number of -// matching chunks. A "limit" value of zero means that there is no limit. 
-func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) (int, int) { - if len(chunks) == 0 { - return 0, 0 - } - var ( - sp int - dp = p - ) - for { - if limit != 0 && sp == limit { - break - } - if dp >= len(s.index.Chunks) || sp >= len(chunks) { - break - } - if chunks[sp].ID != s.index.Chunks[dp].ID { - break - } - dp++ - sp++ - } - return p, dp - p -} - -type fileSeedSegment struct { - file string - chunks []IndexChunk - canReflink bool - needValidation bool -} - -func newFileSeedSegment(file string, chunks []IndexChunk, canReflink bool) *fileSeedSegment { - return &fileSeedSegment{ - canReflink: canReflink, - file: file, - chunks: chunks, - } -} - -func (s *fileSeedSegment) FileName() string { - return s.file -} - -func (s *fileSeedSegment) Size() uint64 { - if len(s.chunks) == 0 { - return 0 - } - last := s.chunks[len(s.chunks)-1] - return last.Start + last.Size - s.chunks[0].Start -} - -func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { - if length != s.Size() { - return 0, 0, fmt.Errorf("unable to copy %d bytes from %s to %s : wrong size", length, s.file, dst.Name()) - } - src, err := os.Open(s.file) - if err != nil { - return 0, 0, err - } - defer src.Close() - - // Do a straight copy if reflinks are not supported or blocks aren't aligned - if !s.canReflink || s.chunks[0].Start%blocksize != offset%blocksize { - return s.copy(dst, src, s.chunks[0].Start, length, offset) - } - return s.clone(dst, src, s.chunks[0].Start, length, offset, blocksize) -} - -// Validate compares all chunks in this slice of the seed index to the underlying data -// and fails if they don't match. 
-func (s *fileSeedSegment) Validate(file *os.File) error { - for _, c := range s.chunks { - b := make([]byte, c.Size) - if _, err := file.ReadAt(b, int64(c.Start)); err != nil { - return err - } - sum := Digest.Sum(b) - if sum != c.ID { - return fmt.Errorf("seed index for %s doesn't match its data", s.file) - } - } - return nil -} - -// Performs a plain copy of everything in the seed to the target, not cloning -// of blocks. -func (s *fileSeedSegment) copy(dst, src *os.File, srcOffset, length, dstOffset uint64) (uint64, uint64, error) { - if _, err := dst.Seek(int64(dstOffset), os.SEEK_SET); err != nil { - return 0, 0, err - } - if _, err := src.Seek(int64(srcOffset), os.SEEK_SET); err != nil { - return 0, 0, err - } - - // Copy using a fixed buffer. Using io.Copy() with a LimitReader will make it - // create a buffer matching N of the LimitReader which can be too large - copied, err := io.CopyBuffer(dst, io.LimitReader(src, int64(length)), make([]byte, 64*1024)) - return uint64(copied), 0, err -} - -// Reflink the overlapping blocks in the two ranges and copy the bit before and -// after the blocks. 
-func (s *fileSeedSegment) clone(dst, src *os.File, srcOffset, srcLength, dstOffset, blocksize uint64) (uint64, uint64, error) { - if srcOffset%blocksize != dstOffset%blocksize { - return 0, 0, fmt.Errorf("reflink ranges not aligned between %s and %s", src.Name(), dst.Name()) - } - - srcAlignStart := (srcOffset/blocksize + 1) * blocksize - srcAlignEnd := (srcOffset + srcLength) / blocksize * blocksize - dstAlignStart := (dstOffset/blocksize + 1) * blocksize - alignLength := srcAlignEnd - srcAlignStart - dstAlignEnd := dstAlignStart + alignLength - - // fill the area before the first aligned block - var copied uint64 - c1, _, err := s.copy(dst, src, srcOffset, srcAlignStart-srcOffset, dstOffset) - if err != nil { - return c1, 0, err - } - copied += c1 - // fill the area after the last aligned block - c2, _, err := s.copy(dst, src, srcAlignEnd, srcOffset+srcLength-srcAlignEnd, dstAlignEnd) - if err != nil { - return copied + c2, 0, err - } - copied += c2 - // close the aligned blocks - return copied, alignLength, CloneRange(dst, src, srcAlignStart, alignLength, dstAlignStart) -}