diff --git a/fileseed.go b/assemble-fileseed.go similarity index 67% rename from fileseed.go rename to assemble-fileseed.go index be1c8a6..8ed0f57 100644 --- a/fileseed.go +++ b/assemble-fileseed.go @@ -5,7 +5,7 @@ import ( "fmt" "io" "os" - "sync" + "sort" ) // FileSeed is used to copy or clone blocks from an existing index+blob during @@ -15,18 +15,15 @@ type FileSeed struct { index Index pos map[ChunkID][]int canReflink bool - isInvalid bool - mu sync.RWMutex } // NewIndexSeed initializes a new seed that uses an existing index and its blob -func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error) { +func NewFileSeed(dstFile string, srcFile string, index Index) (*FileSeed, error) { s := FileSeed{ srcFile: srcFile, pos: make(map[ChunkID][]int), index: index, canReflink: CanClone(dstFile, srcFile), - isInvalid: false, } for i, c := range s.index.Chunks { s.pos[c.ID] = append(s.pos[c.ID], i) @@ -34,29 +31,24 @@ func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error return &s, nil } -// LongestMatchWith returns the longest sequence of chunks anywhere in Source -// that match `chunks` starting at chunks[0], limiting the maximum number of chunks -// if reflinks are not supported. If there is no match, it returns a length of zero -// and a nil SeedSegment. -func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { - s.mu.RLock() - isInvalid := s.isInvalid - s.mu.RUnlock() - - // isInvalid can be concurrently read or written. Use a mutex to avoid a race - if len(chunks) == 0 || len(s.index.Chunks) == 0 || isInvalid { - return 0, nil +// LongestMatchFrom returns the longest sequence of chunks anywhere in the seed +// that match chunks starting at chunks[startPos]. It returns the byte offset +// and byte length of the match in the seed, plus the chunk offset and chunk +// length. Returns (0, 0, 0, 0) if there is no match. 
+func (s *FileSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, uint64, int, int) { + if startPos >= len(chunks) || len(s.index.Chunks) == 0 { + return 0, 0, 0, 0 } - pos, ok := s.pos[chunks[0].ID] + pos, ok := s.pos[chunks[startPos].ID] if !ok { - return 0, nil + return 0, 0, 0, 0 } - // From every position of chunks[0] in the source, find a slice of - // matching chunks. Then return the longest of those slices. + // From every position of chunks[startPos] in the source, find a run of + // matching chunks. Then return the longest of those runs. var ( - match []IndexChunk - max int - limit int + bestSeedPos int + maxLen int + limit int ) if !s.canReflink { // Limit the maximum number of chunks, in a single sequence, to avoid @@ -66,16 +58,37 @@ func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { limit = 100 } for _, p := range pos { - m := s.maxMatchFrom(chunks, p, limit) - if len(m) > max { - match = m - max = len(m) + seedPos, n := maxMatchFrom(chunks[startPos:], s.index.Chunks, p, limit) + if n > maxLen { + bestSeedPos = seedPos + maxLen = n } - if limit != 0 && limit == max { + if limit != 0 && limit == maxLen { break } } - return max, newFileSeedSegment(s.srcFile, match, s.canReflink) + if maxLen == 0 { + return 0, 0, 0, 0 + } + byteOffset := s.index.Chunks[bestSeedPos].Start + last := s.index.Chunks[bestSeedPos+maxLen-1] + byteLength := last.Start + last.Size - byteOffset + return byteOffset, byteLength, bestSeedPos, maxLen +} + +// GetSegment constructs a SeedSegment for a matched range identified by its +// byte offset and size in the seed. 
+func (s *FileSeed) GetSegment(offset, size uint64) SeedSegment { + i := sort.Search(len(s.index.Chunks), func(j int) bool { + return s.index.Chunks[j].Start >= offset + }) + var covered uint64 + end := i + for end < len(s.index.Chunks) && covered < size { + covered += s.index.Chunks[end].Size + end++ + } + return newFileSeedSegment(s.srcFile, s.index.Chunks[i:end], s.canReflink) } func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { @@ -87,7 +100,6 @@ func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seed } s.index = index - s.SetInvalid(false) s.pos = make(map[ChunkID][]int, len(s.index.Chunks)) for i, c := range s.index.Chunks { s.pos[c.ID] = append(s.pos[c.ID], i) @@ -96,44 +108,6 @@ func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seed return nil } -func (s *FileSeed) SetInvalid(value bool) { - s.mu.Lock() - defer s.mu.Unlock() - s.isInvalid = value -} - -func (s *FileSeed) IsInvalid() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.isInvalid -} - -// Returns a slice of chunks from the seed. Compares chunks from position 0 -// with seed chunks starting at p. A "limit" value of zero means that there is no limit. 
-func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk { - if len(chunks) == 0 { - return nil - } - var ( - sp int - dp = p - ) - for { - if limit != 0 && sp == limit { - break - } - if dp >= len(s.index.Chunks) || sp >= len(chunks) { - break - } - if chunks[sp].ID != s.index.Chunks[dp].ID { - break - } - dp++ - sp++ - } - return s.index.Chunks[p:dp] -} - type fileSeedSegment struct { file string chunks []IndexChunk @@ -239,3 +213,27 @@ func (s *fileSeedSegment) clone(dst, src *os.File, srcOffset, srcLength, dstOffs // close the aligned blocks return copied, alignLength, CloneRange(dst, src, srcAlignStart, alignLength, dstAlignStart) } + +type fileSeedSource struct { + segment SeedSegment + seed Seed + offset uint64 + length uint64 + isBlank bool +} + +func (s *fileSeedSource) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + blocksize := blocksizeOfFile(f.Name()) + return s.segment.WriteInto(f, s.offset, s.length, blocksize, s.isBlank) +} + +func (s *fileSeedSource) Seed() Seed { return s.seed } +func (s *fileSeedSource) File() string { return s.segment.FileName() } + +func (s *fileSeedSource) Validate(file *os.File) error { + return s.segment.Validate(file) +} + +func (s *fileSeedSource) String() string { + return fmt.Sprintf("FileSeed(%s): Copy to [%d:%d]", s.segment.FileName(), s.offset, s.offset+s.length) +} diff --git a/assemble-inplacecopy.go b/assemble-inplacecopy.go new file mode 100644 index 0000000..b436a59 --- /dev/null +++ b/assemble-inplacecopy.go @@ -0,0 +1,57 @@ +package desync + +import ( + "fmt" + "os" +) + +// inPlaceCopy copies a chunk from one position to another within the same file. +// It uses ReadAt/WriteAt (pread/pwrite) which are position-independent and safe +// for concurrent use on the same file handle. 
+type inPlaceCopy struct { + srcOffset uint64 + srcSize uint64 + dstOffset uint64 + dstSize uint64 + + // Cycle-breaking: the first mover in a cycle pre-reads the buffered + // operation's source before executing its own copy. + preBuffers []*inPlaceCopy // targets whose writeBuf to populate before own copy + writeBuf []byte // non-nil → write from this buffer, skip file read +} + +func (s *inPlaceCopy) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + // Step 1: Pre-read sources for cycle-broken chunks before our own copy + // overwrites their data. + for _, pb := range s.preBuffers { + pb.writeBuf = make([]byte, pb.srcSize) + if _, err := f.ReadAt(pb.writeBuf, int64(pb.srcOffset)); err != nil { + return 0, 0, fmt.Errorf("inPlaceCopy pre-buffer read at %d: %w", pb.srcOffset, err) + } + } + + // Step 2: If this chunk was cycle-broken, write from the pre-read buffer. + if s.writeBuf != nil { + if _, err := f.WriteAt(s.writeBuf, int64(s.dstOffset)); err != nil { + return 0, 0, fmt.Errorf("inPlaceCopy buffer write at %d: %w", s.dstOffset, err) + } + return s.dstSize, 0, nil + } + + // Step 3: Normal copy — read source into a temp buffer, then write to dest. + // Always buffer first to handle overlapping ranges safely. 
+ buf := make([]byte, s.srcSize) + if _, err := f.ReadAt(buf, int64(s.srcOffset)); err != nil { + return 0, 0, fmt.Errorf("inPlaceCopy read at %d: %w", s.srcOffset, err) + } + if _, err := f.WriteAt(buf, int64(s.dstOffset)); err != nil { + return 0, 0, fmt.Errorf("inPlaceCopy write at %d: %w", s.dstOffset, err) + } + return s.dstSize, 0, nil +} + +func (s *inPlaceCopy) String() string { + return fmt.Sprintf("InPlace: Copy [%d:%d] to [%d:%d]", + s.srcOffset, s.srcOffset+s.srcSize, + s.dstOffset, s.dstOffset+s.dstSize) +} diff --git a/assemble-inplaceseed.go b/assemble-inplaceseed.go new file mode 100644 index 0000000..6763c50 --- /dev/null +++ b/assemble-inplaceseed.go @@ -0,0 +1,18 @@ +package desync + +// InPlaceSeed is a FileSeed where the source and destination are the same file. +// This makes the relationship explicit when desync extract is used with seeds +// that resolve to the same path as the extraction target. +type InPlaceSeed struct { + *FileSeed +} + +// NewInPlaceSeed initializes a seed where the source and destination are the +// same file. It passes the file path as both src and dst to NewFileSeed. 
+func NewInPlaceSeed(file string, index Index) (*InPlaceSeed, error) { + fs, err := NewFileSeed(file, file, index) + if err != nil { + return nil, err + } + return &InPlaceSeed{FileSeed: fs}, nil +} diff --git a/assemble-plan.go b/assemble-plan.go new file mode 100644 index 0000000..16a9bf9 --- /dev/null +++ b/assemble-plan.go @@ -0,0 +1,829 @@ +package desync + +import ( + "errors" + "fmt" + "os" + "slices" + "sort" + "sync" + + "golang.org/x/sync/errgroup" +) + +type PlanOption func(*AssemblePlan) + +func PlanWithConcurrency(n int) PlanOption { + return func(p *AssemblePlan) { + p.concurrency = n + } +} + +func PlanWithSeeds(seeds []Seed) PlanOption { + return func(p *AssemblePlan) { + p.seeds = seeds + } +} + +func PlanWithTargetIsBlank(isBlank bool) PlanOption { + return func(p *AssemblePlan) { + p.targetIsBlank = isBlank + } +} + +// AssemblePlan holds a directed acyclic graph of steps. +type AssemblePlan struct { + idx Index + concurrency int + target string + store Store + seeds []Seed + targetIsBlank bool + + // Placements is an intermediate representation of the target index, + // capturing what source is used to populate each chunk. It mirrors the + // length of the index but a single step can span multiple chunks. + placements []*placement + + // InPlaceReads is a list of placements with sources that read sections + // from the target file. This needs to happen before any steps that + // overwrite the in-place source data. This is sparsely populated and + // used to express a dependency in the form "don't write to this chunk + // until these chunks are read from the in-place target". + inPlaceReads []*placement + + // inPlaceDeps records ordering constraints between in-place copy + // placements produced by Tarjan's SCC linearization. Each entry + // says placement[from] must complete before placement[to] starts. 
+ inPlaceDeps []inPlaceDep + + // inPlaceOrder lists placements from generateInPlace in their + // desired step output order: skips first, then copies in + // linearized cycle order. Steps() iterates this before + // p.placements so in-place operations precede other sources. + inPlaceOrder []*placement + + selfSeed *selfSeed +} + +type inPlaceDep struct{ from, to int } + +type assembleSource interface { + fmt.Stringer + Execute(f *os.File) (copied uint64, cloned uint64, err error) +} + +type assembleSeedSource interface { + assembleSource + Seed() Seed + File() string + Validate(file *os.File) error +} + +type placement struct { + source assembleSource + dependsOnStart int // index of another placement this one depends on + dependsOnSize int // number of sequential placements (from dependsOnStart) this depends on +} + +// NewPlan creates a fully populated AssemblePlan. +func NewPlan(name string, idx Index, s Store, opts ...PlanOption) (*AssemblePlan, error) { + p := &AssemblePlan{ + idx: idx, + concurrency: 1, + target: name, + store: s, + targetIsBlank: true, + placements: make([]*placement, len(idx.Chunks)), + inPlaceReads: make([]*placement, len(idx.Chunks)), + } + for _, opt := range opts { + opt(p) + } + + ss, err := newSelfSeed(p.target, p.idx, p.concurrency) + if err != nil { + return nil, err + } + p.selfSeed = ss + + if err := p.generate(); err != nil { + p.Close() + return nil, err + } + return p, nil +} + +// Close releases resources held by the plan. +func (p *AssemblePlan) Close() { + if p.selfSeed != nil { + p.selfSeed.Close() + } +} + +// Validate checks that all file seed placements still match their underlying +// data. Returns a SeedInvalid error if a seed file was modified after its +// index was created. +func (p *AssemblePlan) Validate() error { + // Phase 1 — Sequential: collect unique fileSeedSource placements, open + // their backing files, and build a list of items to validate. 
+ type validateItem struct { + fs assembleSeedSource + file *os.File + } + + seen := make(map[*placement]struct{}) + fileMap := make(map[string]*os.File) + defer func() { + for _, f := range fileMap { + f.Close() + } + }() + + invalidSeeds := make(map[Seed]error) + failedFiles := make(map[string]struct{}) + + var items []validateItem + for _, pl := range p.placements { + if _, ok := seen[pl]; ok { + continue + } + seen[pl] = struct{}{} + + fs, ok := pl.source.(assembleSeedSource) + if !ok || fs.File() == "" { + continue + } + + // Skip seeds and files already known to be invalid + if _, ok := invalidSeeds[fs.Seed()]; ok { + continue + } + if _, ok := failedFiles[fs.File()]; ok { + invalidSeeds[fs.Seed()] = fmt.Errorf("seed file %s could not be opened", fs.File()) + continue + } + + if _, ok := fileMap[fs.File()]; !ok { + f, err := os.Open(fs.File()) + if err != nil { + failedFiles[fs.File()] = struct{}{} + invalidSeeds[fs.Seed()] = err + continue + } + fileMap[fs.File()] = f + } + + items = append(items, validateItem{fs: fs, file: fileMap[fs.File()]}) + } + + // Phase 2 — Concurrent: validate each segment in parallel. + var mu sync.Mutex + var g errgroup.Group + g.SetLimit(p.concurrency) + for _, item := range items { + g.Go(func() error { + if err := item.fs.Validate(item.file); err != nil { + mu.Lock() + invalidSeeds[item.fs.Seed()] = err + mu.Unlock() + } + return nil + }) + } + g.Wait() + + // Phase 3 — Sequential: build the error result. + if len(invalidSeeds) > 0 { + seeds := make([]Seed, 0, len(invalidSeeds)) + errs := make([]error, 0, len(invalidSeeds)) + for seed, err := range invalidSeeds { + seeds = append(seeds, seed) + errs = append(errs, err) + } + return SeedInvalid{Seeds: seeds, Err: errors.Join(errs...)} + } + return nil +} + +func (p *AssemblePlan) generate() error { + // Find the in-place seed, if any. There can only be one. 
+ var inPlaceSeed *InPlaceSeed + for _, seed := range p.seeds { + if ips, ok := seed.(*InPlaceSeed); ok { + inPlaceSeed = ips + break + } + } + + // When the target file already exists, mark chunks that are already + // correct so they can be skipped during assembly. If we have an + // in-place seed, its index tells us what's already in place without + // any file I/O. Otherwise fall back to reading and hashing each chunk. + if !p.targetIsBlank { + if inPlaceSeed != nil { + p.generateInPlace(inPlaceSeed) + } else { + p.generateSkips() + } + } + + // Find all matches in file itself as they're written. As it's + // populated, sections can be copied to other chunks. This involves + // depending on earlier steps before chunks can be copied within the + // file. + for i := 0; i < len(p.idx.Chunks); i++ { + if p.placements[i] != nil { + continue // Already filled + } + + _, _, start, n := p.selfSeed.LongestMatchFrom(p.idx.Chunks, i) + if n < 1 { + continue + } + + // Repeat the same placement for all chunks in the sequence. + // We dedup sequences later. + pl := &placement{} + + // We can use up to n chunks from the seed, find out how much + // we can actually use without overwriting any existing placements + // in the list. + var ( + to = i + size int + ) + for range n { + if p.placements[i] != nil { + break + } + + p.placements[i] = pl + i++ + size++ + } + i-- // compensate for the outer loop's i++ + + // Update the step with the potentially adjusted length + seedOffset := p.idx.Chunks[start].Start + last := p.idx.Chunks[start+size-1] + length := last.Start + last.Size - seedOffset + offset := p.idx.Chunks[to].Start + + pl.source = p.selfSeed.GetSegment(seedOffset, offset, length) + pl.dependsOnStart = start + pl.dependsOnSize = size + } + + // Check file seeds for matches in unfilled positions. 
+ for _, seed := range p.seeds { + if _, ok := seed.(*InPlaceSeed); ok { // Skip the in-place seed, it's already handled + continue + } + + for i := 0; i < len(p.idx.Chunks); i++ { + if p.placements[i] != nil { + continue + } + + seedOffset, _, _, n := seed.LongestMatchFrom(p.idx.Chunks, i) + if n < 1 { + continue + } + + // Repeat the same placement for all chunks in the sequence. + // We dedup sequences later. + pl := &placement{} + + // We can use up to n chunks from the seed, find out how much + // we can actually use without overwriting any existing placements + // in the list. + var ( + to = i + size int + ) + for range n { + if p.placements[i] != nil { + break + } + p.placements[i] = pl + i++ + size++ + } + i-- // compensate for the outer loop's i++ + + // Update the step with the potentially adjusted length + offset := p.idx.Chunks[to].Start + last := p.idx.Chunks[to+size-1] + length := last.Start + last.Size - offset + segment := seed.GetSegment(seedOffset, length) + + pl.source = &fileSeedSource{ + segment: segment, + seed: seed, + offset: offset, + length: length, + isBlank: p.targetIsBlank, + } + } + } + + // Fill any gaps in the file by copying from the store. + for i := range p.placements { + if p.placements[i] != nil { + continue + } + p.placements[i] = &placement{ + source: &copyFromStore{ + store: p.store, + chunk: p.idx.Chunks[i], + }, + } + } + + // We now have a fully populated list of placements. Some are + // duplicates, spanning multiple chunks. Dependencies are only defined + // forward, like chunk-A needs chunk-B to be written first, etc. + + return nil +} + +func (p *AssemblePlan) Steps() []*PlanStep { + // Create a step for every unique placement, counting how many + // index chunks each step covers. 
+ stepsPerPlacement := make(map[*placement]*PlanStep) + for _, pl := range p.placements { + step, ok := stepsPerPlacement[pl] + if !ok { + step = &PlanStep{ + source: pl.source, + } + stepsPerPlacement[pl] = step + } + step.numChunks++ + } + + // Link the steps together. Use a seen set to avoid redundant work + // when the same placement pointer spans multiple chunks. + linked := make(map[*placement]struct{}, len(stepsPerPlacement)) + for _, pl := range p.placements { + if _, ok := linked[pl]; ok { + continue + } + linked[pl] = struct{}{} + + for i := pl.dependsOnStart; i < pl.dependsOnStart+pl.dependsOnSize; i++ { + stepsPerPlacement[pl].addDependency(stepsPerPlacement[p.placements[i]]) + stepsPerPlacement[p.placements[i]].addDependent(stepsPerPlacement[pl]) + } + } + + // Link in-place read dependencies: if a subsequent step (store + // copy, file seed) writes to a byte range that an in-place + // copy needs to read, the in-place copy must execute first. + for i, inPlaceRead := range p.inPlaceReads { + if inPlaceRead == nil { + continue + } + target := p.placements[i] + if target == inPlaceRead { + continue + } + ipStep := stepsPerPlacement[inPlaceRead] + step := stepsPerPlacement[target] + if step != ipStep { + step.addDependency(ipStep) + ipStep.addDependent(step) + } + } + + // Link in-place inter-operation dependencies from Tarjan + // linearization. These ensure cycle members and cross-SCC + // operations execute in the correct order. + for _, dep := range p.inPlaceDeps { + from := stepsPerPlacement[p.placements[dep.from]] + to := stepsPerPlacement[p.placements[dep.to]] + if from != to { + to.addDependency(from) + from.addDependent(to) + } + } + + // Make a slice of steps, preserving the order. Iterate + // inPlaceOrder first so in-place seed placements (skips + copies) + // precede other sources. Then iterate p.placements for everything + // else. Deduplication by pointer identity ensures each step + // appears exactly once. 
+ steps := make([]*PlanStep, 0, len(stepsPerPlacement)) + for _, pl := range p.inPlaceOrder { + s, ok := stepsPerPlacement[pl] + if !ok { + continue + } + steps = append(steps, s) + delete(stepsPerPlacement, pl) + } + for _, pl := range p.placements { + s, ok := stepsPerPlacement[pl] + if !ok { + continue + } + steps = append(steps, s) + delete(stepsPerPlacement, pl) + } + + return steps +} + +// generateSkips reads the target file and marks chunks that are already in +// the correct position so they can be skipped during assembly. Consecutive +// matching chunks are merged into a single placement for efficiency. +func (p *AssemblePlan) generateSkips() { + f, err := os.Open(p.target) + if err != nil { + return + } + + var g errgroup.Group + g.SetLimit(p.concurrency) + for i, chunk := range p.idx.Chunks { + g.Go(func() error { + b := make([]byte, chunk.Size) + if _, err := f.ReadAt(b, int64(chunk.Start)); err != nil { + return nil + } + if Digest.Sum(b) == chunk.ID { + p.placements[i] = &placement{source: &skipInPlace{ + start: chunk.Start, + end: chunk.Start + chunk.Size, + }} + } + return nil + }) + } + g.Wait() + f.Close() + + // Merge consecutive in-place chunks into a single placement + // so that Steps() produces one step per run instead of one + // per chunk. This works because Steps() deduplicates by + // pointer identity. + var run *placement + for i, pl := range p.placements { + if pl == nil { + run = nil + continue + } + if _, ok := pl.source.(*skipInPlace); !ok { + run = nil + continue + } + if run == nil { + run = pl + continue + } + // Extend the existing run and share the pointer + run.source.(*skipInPlace).end = p.idx.Chunks[i].Start + p.idx.Chunks[i].Size + p.placements[i] = run + } +} + +// generateInPlace processes an in-place seed to classify each target chunk. +// Chunks already at the correct position get skipInPlace placements (detected +// by comparing the seed and target indexes, with no file I/O). 
Chunks that +// exist at different offsets get inPlaceCopy placements, with dependency +// cycles resolved using Tarjan's SCC algorithm. +func (p *AssemblePlan) generateInPlace(seed *InPlaceSeed) { + // Stage 1: Source mapping — index every chunk in the seed by its + // ChunkID, recording all byte ranges where it appears. + type byteRange struct{ start, size uint64 } + srcOf := make(map[ChunkID][]byteRange) + for _, c := range seed.index.Chunks { + srcOf[c.ID] = append(srcOf[c.ID], byteRange{c.Start, c.Size}) + } + + // Stage 2: Operation list — walk target index and classify each chunk. + type moveOp struct { + targetIdx int // index into p.idx.Chunks + srcStart uint64 // byte offset in old file + srcSize uint64 + dstStart uint64 // byte offset in target file + dstSize uint64 + } + var moves []moveOp + + // Create inPlaceSeedSkip placements for chunks that are already at + // the correct position. A chunk is "in place" when its ChunkID + // appears in the seed index at the same byte offset and size as in + // the target index. This is a pure index comparison — no file I/O. + // Unlike skipInPlace (created by generateSkips after hashing), + // these carry validation info so Validate() can verify the data. + for i, c := range p.idx.Chunks { + sources, ok := srcOf[c.ID] + if !ok { + continue + } + for _, src := range sources { + if src.start == c.Start && src.size == c.Size { + pl := &placement{source: &inPlaceSeedSkip{ + chunk: c, + seed: seed, + file: p.target, + }} + p.placements[i] = pl + p.inPlaceOrder = append(p.inPlaceOrder, pl) + break + } + } + } + + for i, c := range p.idx.Chunks { + if p.placements[i] != nil { + continue // Already placed as an inPlaceSeedSkip by the loop above + } + + sources := srcOf[c.ID] + if len(sources) == 0 { + continue // Not in seed; will be filled by store or file seed later + } + + // Use the first available copy as the move source. 
+ src := sources[0] + moves = append(moves, moveOp{ + targetIdx: i, + srcStart: src.start, + srcSize: src.size, + dstStart: c.Start, + dstSize: c.Size, + }) + } + + if len(moves) == 0 { + return + } + + // Stage 3: Dependency graph — edge from i to j when move i's source + // overlaps move j's destination (i must read before j writes). + n := len(moves) + succ := make([][]int, n) + + // Build a sorted index of moves by destination start for O(n log n) overlap search. + sortedByDst := make([]int, n) + for i := range sortedByDst { + sortedByDst[i] = i + } + sort.Slice(sortedByDst, func(a, b int) bool { + return moves[sortedByDst[a]].dstStart < moves[sortedByDst[b]].dstStart + }) + + for i := range moves { + srcEnd := moves[i].srcStart + moves[i].srcSize + // First move whose dstStart+dstSize > srcStart + lo := sort.Search(len(sortedByDst), func(k int) bool { + m := moves[sortedByDst[k]] + return m.dstStart+m.dstSize > moves[i].srcStart + }) + for k := lo; k < len(sortedByDst); k++ { + j := sortedByDst[k] + if moves[j].dstStart >= srcEnd { + break + } + if i != j { + succ[i] = append(succ[i], j) + } + } + } + + // Stage 4: Tarjan's SCC + linearization + sccs := tarjanSCC(n, succ) + slices.Reverse(sccs) // topological order + + // Pre-compute minimum target index per SCC for deterministic sorting. + sccMin := make([]int, len(sccs)) + for si, scc := range sccs { + m := moves[scc[0]].targetIdx + for _, i := range scc[1:] { + if moves[i].targetIdx < m { + m = moves[i].targetIdx + } + } + sccMin[si] = m + } + + // Stable-sort independent SCCs by minimum target index so the + // output order is deterministic and follows the target layout. 
+ indices := make([]int, len(sccs)) + for i := range indices { + indices[i] = i + } + slices.SortStableFunc(indices, func(a, b int) int { + return sccMin[a] - sccMin[b] + }) + sorted := make([][]int, len(sccs)) + for i, idx := range indices { + sorted[i] = sccs[idx] + } + sccs = sorted + + for _, scc := range sccs { + if len(scc) == 1 { + // Non-cyclic: single placement. + m := moves[scc[0]] + pl := &placement{source: &inPlaceCopy{ + srcOffset: m.srcStart, + srcSize: m.srcSize, + dstOffset: m.dstStart, + dstSize: m.dstSize, + }} + p.placements[m.targetIdx] = pl + p.inPlaceOrder = append(p.inPlaceOrder, pl) + continue + } + + // Cycle: pick the member with smallest srcSize as buffer-break. + bufIdx := scc[0] + for _, i := range scc[1:] { + if moves[i].srcSize < moves[bufIdx].srcSize { + bufIdx = i + } + } + + // Remove bufIdx's outgoing edges and topologically sort the + // remaining cycle members. + localSucc := make(map[int][]int, len(scc)) + localInDeg := make(map[int]int, len(scc)) + for _, i := range scc { + if i == bufIdx { + continue // exclude buffer-break from topo sort + } + localInDeg[i] = 0 + } + for _, i := range scc { + if i == bufIdx { + continue + } + for _, j := range succ[i] { + // Only consider edges within this SCC (excluding bufIdx). + if _, ok := localInDeg[j]; ok { + localSucc[i] = append(localSucc[i], j) + localInDeg[j]++ + } + } + } + + // Kahn's algorithm for topological sort within the cycle. + var queue []int + for _, i := range scc { + if i == bufIdx { + continue + } + if localInDeg[i] == 0 { + queue = append(queue, i) + } + } + var order []int + for len(queue) > 0 { + cur := queue[0] + queue = queue[1:] + order = append(order, cur) + for _, j := range localSucc[cur] { + localInDeg[j]-- + if localInDeg[j] == 0 { + queue = append(queue, j) + } + } + } + + // Build the inPlaceCopy sources. The first element in order + // gets preBuffers pointing to the buffer-break target. The + // buffer-break target writes from writeBuf. 
+ bufMove := moves[bufIdx] + bufCopy := &inPlaceCopy{ + srcOffset: bufMove.srcStart, + srcSize: bufMove.srcSize, + dstOffset: bufMove.dstStart, + dstSize: bufMove.dstSize, + } + + // Create placements in order, with the first one pre-buffering + // the cycle-break target. + var prevIdx int + for k, i := range order { + m := moves[i] + ipc := &inPlaceCopy{ + srcOffset: m.srcStart, + srcSize: m.srcSize, + dstOffset: m.dstStart, + dstSize: m.dstSize, + } + if k == 0 { + ipc.preBuffers = []*inPlaceCopy{bufCopy} + } + pl := &placement{source: ipc} + p.placements[m.targetIdx] = pl + p.inPlaceOrder = append(p.inPlaceOrder, pl) + + // Record ordering dependencies between consecutive cycle members. + if k > 0 { + p.inPlaceDeps = append(p.inPlaceDeps, inPlaceDep{ + from: moves[prevIdx].targetIdx, + to: m.targetIdx, + }) + } + prevIdx = i + } + + // The buffer-break target is placed last and depends on the + // last member in order. + bufPl := &placement{source: bufCopy} + p.placements[bufMove.targetIdx] = bufPl + p.inPlaceOrder = append(p.inPlaceOrder, bufPl) + if len(order) > 0 { + p.inPlaceDeps = append(p.inPlaceDeps, inPlaceDep{ + from: moves[order[len(order)-1]].targetIdx, + to: bufMove.targetIdx, + }) + } + } + + // Stage 5: Populate inPlaceReads — for each move, find all target + // chunks whose byte range overlaps the move's source range and + // record the dependency so subsequent writes wait for the read. + // Only record dependencies for positions not yet placed (nil). + // Non-nil positions at this point are all in-place sources whose + // ordering is already handled by inPlaceDeps above. + for _, m := range moves { + srcEnd := m.srcStart + m.srcSize + pl := p.placements[m.targetIdx] + // Binary search for first chunk where Start+Size > srcStart. 
+ lo := sort.Search(len(p.idx.Chunks), func(j int) bool { + return p.idx.Chunks[j].Start+p.idx.Chunks[j].Size > m.srcStart + }) + for j := lo; j < len(p.idx.Chunks); j++ { + if p.idx.Chunks[j].Start >= srcEnd { + break + } + if p.placements[j] != nil { + continue + } + p.inPlaceReads[j] = pl + } + } +} + +// tarjanSCC finds all strongly connected components of a directed graph. +// adj[v] lists the successors of node v. Returns SCCs in reverse +// topological order (sinks first). +func tarjanSCC(n int, adj [][]int) [][]int { + index := make([]int, n) + lowlink := make([]int, n) + onStack := make([]bool, n) + for i := range index { + index[i] = -1 + } + + var ( + stack []int + sccs [][]int + idx int + ) + + var visit func(v int) + visit = func(v int) { + index[v] = idx + lowlink[v] = idx + idx++ + stack = append(stack, v) + onStack[v] = true + + for _, w := range adj[v] { + if index[w] == -1 { + visit(w) + lowlink[v] = min(lowlink[v], lowlink[w]) + } else if onStack[w] { + lowlink[v] = min(lowlink[v], index[w]) + } + } + + if lowlink[v] == index[v] { + var scc []int + for { + w := stack[len(stack)-1] + stack = stack[:len(stack)-1] + onStack[w] = false + scc = append(scc, w) + if w == v { + break + } + } + sccs = append(sccs, scc) + } + } + + for v := range n { + if index[v] == -1 { + visit(v) + } + } + return sccs +} diff --git a/assemble-plan_test.go b/assemble-plan_test.go new file mode 100644 index 0000000..6107951 --- /dev/null +++ b/assemble-plan_test.go @@ -0,0 +1,534 @@ +package desync + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSelfSeedPlanSteps(t *testing.T) { + tests := map[string]struct { + index Index + expected []string + }{ + "all unique chunks": { + index: indexSequence(0x01, 0x02, 0x03), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to 
[100:200]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + "single chunk": { + index: indexSequence(0x01), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + }, + }, + "repeated pair at end": { + // Sequence: 01 02 03 01 02 01 02 + // Positions 0,1 copy from 5,6; positions 3,4 copy from 5,6; + // positions 2,5,6 come from store. + index: indexSequence(0x01, 0x02, 0x03, 0x01, 0x02, 0x01, 0x02), + expected: []string{ + "SelfSeed: Copy [500:700] to [0:200]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [200:300]", + "SelfSeed: Copy [500:700] to [300:500]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [500:600]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [600:700]", + }, + }, + "full duplicate sequence": { + // Sequence: 01 02 03 01 02 03 + // Positions 0-2 copy from 3-5; positions 3-5 come from store. + index: indexSequence(0x01, 0x02, 0x03, 0x01, 0x02, 0x03), + expected: []string{ + "SelfSeed: Copy [300:600] to [0:300]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [300:400]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [400:500]", + "Store: Copy 0300000000000000000000000000000000000000000000000000000000000000 to [500:600]", + }, + }, + "same chunk repeated": { + // Sequence: 01 01 01 + // Position 2 comes from store, then positions 0 and 1 each + // self-seed from position 2. + index: indexSequence(0x01, 0x01, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "SelfSeed: Copy [200:300] to [100:200]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + "single repeated chunk": { + // Sequence: 01 02 01 + // Position 0 copies from 2; positions 1,2 come from store. 
+ index: indexSequence(0x01, 0x02, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [100:200]", + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [200:300]", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + plan, err := NewPlan("test", test.index, nil) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + require.Equal(t, test.expected, got) + }) + } +} + +func TestInPlaceChunkDetection(t *testing.T) { + // Create chunk data and compute their IDs + data1 := make([]byte, 100) + data1[0] = 0x01 + id1 := Digest.Sum(data1) + + data2 := make([]byte, 100) + data2[0] = 0x02 + id2 := Digest.Sum(data2) + + data3 := make([]byte, 100) + data3[0] = 0x03 + id3 := Digest.Sum(data3) + + idx := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + {ID: id3, Start: 200, Size: 100}, + }, + } + + // Create a target file where chunks 0 and 2 match but chunk 1 does not + target := filepath.Join(t.TempDir(), "target") + f, err := os.Create(target) + require.NoError(t, err) + + _, err = f.Write(data1) // chunk 0: correct + require.NoError(t, err) + _, err = f.Write(make([]byte, 100)) // chunk 1: wrong data + require.NoError(t, err) + _, err = f.Write(data3) // chunk 2: correct + require.NoError(t, err) + f.Close() + + plan, err := NewPlan(target, idx, nil, PlanWithTargetIsBlank(false)) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + + cid2 := ChunkID(id2) + expected := []string{ + "InPlace: Skip [0:100]", + fmt.Sprintf("Store: Copy %s to [100:200]", &cid2), + "InPlace: Skip [200:300]", + } + require.Equal(t, expected, got) + + // 
Subtest: all chunks match in-place — consecutive ones should merge + t.Run("consecutive merge", func(t *testing.T) { + target2 := filepath.Join(t.TempDir(), "target2") + f2, err := os.Create(target2) + require.NoError(t, err) + _, err = f2.Write(data1) + require.NoError(t, err) + _, err = f2.Write(data2) + require.NoError(t, err) + _, err = f2.Write(data3) + require.NoError(t, err) + f2.Close() + + plan2, err := NewPlan(target2, idx, nil, PlanWithTargetIsBlank(false)) + require.NoError(t, err) + defer plan2.Close() + + steps2 := plan2.Steps() + got2 := make([]string, len(steps2)) + for i, s := range steps2 { + got2[i] = s.source.String() + } + + expected2 := []string{ + "InPlace: Skip [0:300]", + } + require.Equal(t, expected2, got2) + }) +} + +func TestFileSeedPlanSteps(t *testing.T) { + tests := map[string]struct { + target Index + seed Index + expected []string + }{ + "basic matching": { + // Target: 01, 02, 03, 04 + // Seed: 02, 03 + // Chunks 1-2 from seed, 0 and 3 from store. + target: indexSequence(0x01, 0x02, 0x03, 0x04), + seed: indexSequence(0x02, 0x03), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "FileSeed(seed): Copy to [100:300]", + "Store: Copy 0400000000000000000000000000000000000000000000000000000000000000 to [300:400]", + }, + }, + "all from seed": { + // Target: 01, 02, 03 + // Seed: 01, 02, 03 + // One seed step covering all. + target: indexSequence(0x01, 0x02, 0x03), + seed: indexSequence(0x01, 0x02, 0x03), + expected: []string{ + "FileSeed(seed): Copy to [0:300]", + }, + }, + "no match": { + // Target: 01, 02 + // Seed: 05, 06 + // Both from store. 
+ target: indexSequence(0x01, 0x02), + seed: indexSequence(0x05, 0x06), + expected: []string{ + "Store: Copy 0100000000000000000000000000000000000000000000000000000000000000 to [0:100]", + "Store: Copy 0200000000000000000000000000000000000000000000000000000000000000 to [100:200]", + }, + }, + "self-seed priority": { + // Target: 01, 02, 01 + // Seed: 01, 02, 01 + // Self-seed fills position 0 (copy from position 2), + // seed fills positions 1-2 (matching seed chunks 02, 01). + target: indexSequence(0x01, 0x02, 0x01), + seed: indexSequence(0x01, 0x02, 0x01), + expected: []string{ + "SelfSeed: Copy [200:300] to [0:100]", + "FileSeed(seed): Copy to [100:300]", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + seed, err := NewFileSeed("test", "seed", test.seed) + require.NoError(t, err) + + plan, err := NewPlan("test", test.target, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + require.Equal(t, test.expected, got) + }) + } +} + +func TestInPlaceSeedPlanSteps(t *testing.T) { + // Create variable-size chunks with known data and compute real ChunkIDs. + // Each chunk is filled with a distinct byte so the SHA512/256 hash is unique. + type chunk struct { + id ChunkID + data []byte + size uint64 + } + newChunk := func(size int, fill byte) chunk { + data := make([]byte, size) + for i := range data { + data[i] = fill + } + return chunk{id: ChunkID(Digest.Sum(data)), data: data, size: uint64(size)} + } + + A := newChunk(200, 0xAA) + B := newChunk(150, 0xBB) + C := newChunk(100, 0xCC) + D := newChunk(50, 0xDD) + E := newChunk(180, 0xEE) // only appears in store + F := newChunk(120, 0xFF) // only appears in file seed + + // buildIndex creates an Index with contiguous chunk offsets. 
+ buildIndex := func(chunks ...chunk) Index { + ic := make([]IndexChunk, len(chunks)) + var start uint64 + for i, c := range chunks { + ic[i] = IndexChunk{ID: c.id, Start: start, Size: c.size} + start += c.size + } + return Index{Chunks: ic} + } + + // writeFile writes concatenated chunk data to a temp file and returns its path. + writeFile := func(t *testing.T, chunks ...chunk) string { + t.Helper() + f := filepath.Join(t.TempDir(), "target") + var content []byte + for _, c := range chunks { + content = append(content, c.data...) + } + require.NoError(t, os.WriteFile(f, content, 0644)) + return f + } + + // planSteps is a helper that creates a plan and returns its step strings. + planSteps := func(t *testing.T, name string, target Index, opts ...PlanOption) []string { + t.Helper() + plan, err := NewPlan(name, target, nil, opts...) + require.NoError(t, err) + t.Cleanup(func() { plan.Close() }) + steps := plan.Steps() + got := make([]string, len(steps)) + for i, s := range steps { + got[i] = s.source.String() + } + return got + } + + // storeStep formats a store-sourced step string. + storeStep := func(c chunk, start uint64) string { + id := c.id + return fmt.Sprintf("Store: Copy %s to [%d:%d]", &id, start, start+c.size) + } + + t.Run("swap two chunks", func(t *testing.T) { + // In-place: [A:200][B:150] + // Target: [B:150][A:200] + // One cycle: A↔B. 
+ f := writeFile(t, A, B) + inPlace, err := NewInPlaceSeed(f, buildIndex(A, B)) + require.NoError(t, err) + + got := planSteps(t, f, buildIndex(B, A), + PlanWithSeeds([]Seed{inPlace}), PlanWithTargetIsBlank(false)) + expected := []string{ + "InPlace: Copy [0:200] to [150:350]", + "InPlace: Copy [200:350] to [0:150]", + } + require.Equal(t, expected, got) + }) + + t.Run("two independent cycles", func(t *testing.T) { + // In-place: [A:200][B:150][C:100][D:50] + // Target: [B:150][A:200][D:50][C:100] + // Cycle 1: A↔B in byte range [0,350) + // Cycle 2: C↔D in byte range [350,500) + f := writeFile(t, A, B, C, D) + inPlace, err := NewInPlaceSeed(f, buildIndex(A, B, C, D)) + require.NoError(t, err) + + got := planSteps(t, f, buildIndex(B, A, D, C), + PlanWithSeeds([]Seed{inPlace}), PlanWithTargetIsBlank(false)) + expected := []string{ + "InPlace: Copy [0:200] to [150:350]", + "InPlace: Copy [200:350] to [0:150]", + "InPlace: Copy [350:450] to [400:500]", + "InPlace: Copy [450:500] to [350:400]", + } + require.Equal(t, expected, got) + }) + + t.Run("rearrange with store chunks", func(t *testing.T) { + // In-place: [A:200][B:150] + // Target: [B:150][A:200][E:180] + // A↔B cycle, E from store (not in seed). + f := writeFile(t, A, B) + inPlace, err := NewInPlaceSeed(f, buildIndex(A, B)) + require.NoError(t, err) + + got := planSteps(t, f, buildIndex(B, A, E), + PlanWithSeeds([]Seed{inPlace}), PlanWithTargetIsBlank(false)) + expected := []string{ + "InPlace: Copy [0:200] to [150:350]", + "InPlace: Copy [200:350] to [0:150]", + storeStep(E, 350), + } + require.Equal(t, expected, got) + }) + + t.Run("partial rearrangement with skip", func(t *testing.T) { + // In-place: [A:200][B:150][C:100] + // Target: [A:200][C:100][B:150] + // A already at [0:200] in both indexes → skip. + // B↔C cycle: B [200:350]→[300:450], C [350:450]→[200:300]. 
+ f := writeFile(t, A, B, C) + inPlace, err := NewInPlaceSeed(f, buildIndex(A, B, C)) + require.NoError(t, err) + + got := planSteps(t, f, buildIndex(A, C, B), + PlanWithSeeds([]Seed{inPlace}), PlanWithTargetIsBlank(false)) + expected := []string{ + "InPlace: Skip [0:200]", + "InPlace: Copy [200:350] to [300:450]", + "InPlace: Copy [350:450] to [200:300]", + } + require.Equal(t, expected, got) + }) + + t.Run("mixed in-place and file seed", func(t *testing.T) { + // In-place: [A:200][B:150] + // File seed "seedfile": [F:120] + // Target: [A:200][F:120][B:150] + // A at same offset → skip. + // B moves [200:350]→[320:470] (B must read before F writes to [200:320]). + // F from file seed at [200:320]. + f := writeFile(t, A, B) + inPlaceSeed, err := NewInPlaceSeed(f, buildIndex(A, B)) + require.NoError(t, err) + fileSeed, err := NewFileSeed(f, "seedfile", buildIndex(F)) + require.NoError(t, err) + + got := planSteps(t, f, buildIndex(A, F, B), + PlanWithSeeds([]Seed{inPlaceSeed, fileSeed}), PlanWithTargetIsBlank(false)) + expected := []string{ + "InPlace: Skip [0:200]", + "InPlace: Copy [200:350] to [320:470]", + "FileSeed(seedfile): Copy to [200:320]", + } + require.Equal(t, expected, got) + }) +} + +func TestFileSeedValidation(t *testing.T) { + // Create two chunks with known data and compute their IDs + data1 := make([]byte, 100) + data1[0] = 0xAA + id1 := Digest.Sum(data1) + + data2 := make([]byte, 100) + data2[0] = 0xBB + id2 := Digest.Sum(data2) + + seedIndex := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + }, + } + + // Target index matches the seed exactly + targetIndex := Index{ + Chunks: []IndexChunk{ + {ID: id1, Start: 0, Size: 100}, + {ID: id2, Start: 100, Size: 100}, + }, + } + + t.Run("valid seed", func(t *testing.T) { + seedFile := filepath.Join(t.TempDir(), "seed") + f, err := os.Create(seedFile) + require.NoError(t, err) + _, err = f.Write(data1) + require.NoError(t, err) + _, err = f.Write(data2) 
+ require.NoError(t, err) + f.Close() + + seed, err := NewFileSeed("target", seedFile, seedIndex) + require.NoError(t, err) + + plan, err := NewPlan("target", targetIndex, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + require.NoError(t, plan.Validate()) + }) + + t.Run("invalid seed", func(t *testing.T) { + seedFile := filepath.Join(t.TempDir(), "seed") + f, err := os.Create(seedFile) + require.NoError(t, err) + _, err = f.Write(data1) + require.NoError(t, err) + _, err = f.Write(data2) + require.NoError(t, err) + f.Close() + + seed, err := NewFileSeed("target", seedFile, seedIndex) + require.NoError(t, err) + + plan, err := NewPlan("target", targetIndex, nil, PlanWithSeeds([]Seed{seed})) + require.NoError(t, err) + defer plan.Close() + + // Corrupt the seed file after the plan was created + err = os.WriteFile(seedFile, make([]byte, 200), 0644) + require.NoError(t, err) + + err = plan.Validate() + require.Error(t, err) + + var seedErr SeedInvalid + require.ErrorAs(t, err, &seedErr) + require.Equal(t, []Seed{seed}, seedErr.Seeds) + }) + + t.Run("invalid in-place seed skip", func(t *testing.T) { + // Create a file with two chunks where the first is already at the + // correct position (will be an inPlaceSeedSkip). After plan + // creation, corrupt the file and verify Validate catches it. 
+ f := filepath.Join(t.TempDir(), "target") + err := os.WriteFile(f, append(data1, data2...), 0644) + require.NoError(t, err) + + inPlace, err := NewInPlaceSeed(f, seedIndex) + require.NoError(t, err) + + plan, err := NewPlan(f, targetIndex, nil, + PlanWithSeeds([]Seed{inPlace}), PlanWithTargetIsBlank(false)) + require.NoError(t, err) + defer plan.Close() + + // Corrupt the target file after the plan was created + require.NoError(t, os.WriteFile(f, make([]byte, 200), 0644)) + + err = plan.Validate() + require.Error(t, err) + + var seedErr SeedInvalid + require.ErrorAs(t, err, &seedErr) + }) + + t.Run("null seed skipped", func(t *testing.T) { + // Create a null chunk index — data is all zeros + nullData := make([]byte, 100) + nullID := Digest.Sum(nullData) + + nullTargetIndex := Index{ + Chunks: []IndexChunk{ + {ID: nullID, Start: 0, Size: 100}, + }, + } + + // Use a null seed (FileName() returns "", so Validate skips it) + ns := &nullChunkSeed{id: nullID} + defer ns.close() + + plan, err := NewPlan("target", nullTargetIndex, nil, PlanWithSeeds([]Seed{ns})) + require.NoError(t, err) + defer plan.Close() + + require.NoError(t, plan.Validate()) + }) +} diff --git a/assemble-selfseed.go b/assemble-selfseed.go new file mode 100644 index 0000000..ccb3e92 --- /dev/null +++ b/assemble-selfseed.go @@ -0,0 +1,146 @@ +package desync + +import ( + "fmt" + "io" + "os" +) + +type selfSeed struct { + file string + index Index + pos map[ChunkID][]int + canReflink bool + readers chan *os.File +} + +func newSelfSeed(file string, index Index, n int) (*selfSeed, error) { + s := &selfSeed{ + file: file, + pos: make(map[ChunkID][]int), + index: index, + canReflink: CanClone(file, file), + readers: make(chan *os.File, n), + } + for i, c := range s.index.Chunks { + s.pos[c.ID] = append(s.pos[c.ID], i) + } + // Only open read handles if the file exists. If it doesn't, self-seed + // segments won't be created since there's nothing to match. 
+ if _, err := os.Stat(file); err == nil { + for range n { + f, err := os.Open(file) + if err != nil { + s.Close() + return nil, err + } + s.readers <- f + } + } + return s, nil +} + +func (s *selfSeed) Close() { + if s.readers == nil { + return + } + close(s.readers) + for f := range s.readers { + f.Close() + } + s.readers = nil +} + +// LongestMatchFrom returns the longest sequence of matching chunks after a +// given starting position. +func (s *selfSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, uint64, int, int) { + if len(chunks) <= startPos || len(s.index.Chunks) == 0 { + return 0, 0, 0, 0 + } + pos, ok := s.pos[chunks[startPos].ID] + if !ok { + return 0, 0, 0, 0 + } + // From every position of chunks[startPos] in the source, find a slice of + // matching chunks. Then return the longest of those slices. + var ( + maxStart int + maxLen int + limit int + ) + if !s.canReflink { + // Limit the maximum number of chunks, in a single sequence, to + // avoid having jobs that are too unbalanced. However, if + // reflinks are supported, we don't limit it to make it faster + // and take less space. 
+ limit = 100 + } + for _, p := range pos { + if p <= startPos { + continue + } + start, n := maxMatchFrom(chunks[startPos:], s.index.Chunks, p, limit) + // Clamp to prevent source [p, p+n) overlapping destination [startPos, startPos+n) + if max := p - startPos; n > max { + n = max + } + if n >= maxLen { // Using >= here to get the last (longest) match + maxStart = start + maxLen = n + } + if limit != 0 && limit == maxLen { + break + } + } + if maxLen == 0 { + return 0, 0, 0, 0 + } + byteOffset := s.index.Chunks[maxStart].Start + last := s.index.Chunks[maxStart+maxLen-1] + byteLength := last.Start + last.Size - byteOffset + return byteOffset, byteLength, maxStart, maxLen +} + +func (s *selfSeed) GetSegment(srcOffset, dstOffset, size uint64) *selfSeedSegment { + return &selfSeedSegment{ + seed: s, + srcOffset: srcOffset, + dstOffset: dstOffset, + size: size, + } +} + +type selfSeedSegment struct { + seed *selfSeed + srcOffset uint64 + dstOffset uint64 + size uint64 +} + +func (s *selfSeedSegment) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + blocksize := blocksizeOfFile(f.Name()) + + // Use reflinks if supported and blocks are aligned + if s.seed.canReflink && s.srcOffset%blocksize == s.dstOffset%blocksize { + return 0, s.size, CloneRange(f, f, s.srcOffset, s.size, s.dstOffset) + } + + // Borrow a read handle from the pool + src := <-s.seed.readers + defer func() { s.seed.readers <- src }() + + if _, err := src.Seek(int64(s.srcOffset), io.SeekStart); err != nil { + return 0, 0, err + } + if _, err := f.Seek(int64(s.dstOffset), io.SeekStart); err != nil { + return 0, 0, err + } + _, err = io.CopyBuffer(f, io.LimitReader(src, int64(s.size)), make([]byte, 64*1024)) + return s.size, 0, err +} + +func (s *selfSeedSegment) String() string { + return fmt.Sprintf("SelfSeed: Copy [%d:%d] to [%d:%d]", + s.srcOffset, s.srcOffset+s.size, + s.dstOffset, s.dstOffset+s.size) +} diff --git a/assemble-skip.go b/assemble-skip.go new file mode 100644 index 
0000000..5e9e394 --- /dev/null +++ b/assemble-skip.go @@ -0,0 +1,52 @@ +package desync + +import ( + "fmt" + "os" +) + +// skipInPlace skips data chunks that are already in place. +type skipInPlace struct { + start uint64 + end uint64 +} + +func (s *skipInPlace) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + return 0, 0, nil +} + +func (s *skipInPlace) String() string { + return fmt.Sprintf("InPlace: Skip [%d:%d]", s.start, s.end) +} + +// inPlaceSeedSkip skips a chunk that is already in the correct position +// according to an in-place seed's index. Unlike skipInPlace (which is +// created after hashing the data), this is based on index comparison +// and carries validation info so Validate() can verify the data. +type inPlaceSeedSkip struct { + chunk IndexChunk + seed Seed + file string +} + +func (s *inPlaceSeedSkip) Execute(f *os.File) (uint64, uint64, error) { + return 0, 0, nil +} + +func (s *inPlaceSeedSkip) String() string { + return fmt.Sprintf("InPlace: Skip [%d:%d]", s.chunk.Start, s.chunk.Start+s.chunk.Size) +} + +func (s *inPlaceSeedSkip) Seed() Seed { return s.seed } +func (s *inPlaceSeedSkip) File() string { return s.file } + +func (s *inPlaceSeedSkip) Validate(file *os.File) error { + b := make([]byte, s.chunk.Size) + if _, err := file.ReadAt(b, int64(s.chunk.Start)); err != nil { + return err + } + if Digest.Sum(b) != s.chunk.ID { + return fmt.Errorf("in-place seed index for %s doesn't match its data", s.file) + } + return nil +} diff --git a/assemble-step.go b/assemble-step.go new file mode 100644 index 0000000..9701b62 --- /dev/null +++ b/assemble-step.go @@ -0,0 +1,62 @@ +package desync + +import ( + "iter" + "maps" +) + +type PlanStep struct { + source assembleSource + + // numChunks is the number of index chunks this step covers. + numChunks int + + // Steps that depend on this one. + dependents stepSet + + // Steps that this one depends on. + dependencies stepSet +} + +// addDependent adds a step that depends on this one. 
+func (n *PlanStep) addDependent(other *PlanStep) { + if n.dependents == nil { + n.dependents = newStepSet() + } + n.dependents.add(other) +} + +// addDependency adds a step that this one depends on. +func (n *PlanStep) addDependency(other *PlanStep) { + if n.dependencies == nil { + n.dependencies = newStepSet() + } + n.dependencies.add(other) +} + +// ready returns true when all dependencies have been resolved. +func (n *PlanStep) ready() bool { + return n.dependencies.len() == 0 +} + +type stepSet map[*PlanStep]struct{} + +func newStepSet() stepSet { + return make(stepSet) +} + +func (s stepSet) add(n *PlanStep) { + s[n] = struct{}{} +} + +func (s stepSet) remove(n *PlanStep) { + delete(s, n) +} + +func (s stepSet) Each() iter.Seq[*PlanStep] { + return maps.Keys(s) +} + +func (s stepSet) len() int { + return len(s) +} diff --git a/assemble-store.go b/assemble-store.go new file mode 100644 index 0000000..2e9ae64 --- /dev/null +++ b/assemble-store.go @@ -0,0 +1,31 @@ +package desync + +import ( + "fmt" + "os" +) + +type copyFromStore struct { + store Store + chunk IndexChunk +} + +func (s *copyFromStore) Execute(f *os.File) (copied uint64, cloned uint64, err error) { + chunk, err := s.store.GetChunk(s.chunk.ID) + if err != nil { + return 0, 0, err + } + b, err := chunk.Data() + if err != nil { + return 0, 0, err + } + if s.chunk.Size != uint64(len(b)) { + return 0, 0, fmt.Errorf("unexpected size for chunk %s", s.chunk.ID) + } + _, err = f.WriteAt(b, int64(s.chunk.Start)) + return 0, 0, err +} + +func (s *copyFromStore) String() string { + return fmt.Sprintf("Store: Copy %v to [%d:%d]", s.chunk.ID.String(), s.chunk.Start, s.chunk.Start+s.chunk.Size) +} diff --git a/assemble.go b/assemble.go index ca13980..d053783 100644 --- a/assemble.go +++ b/assemble.go @@ -2,9 +2,13 @@ package desync import ( "context" + "errors" "fmt" - "golang.org/x/sync/errgroup" "os" + "slices" + "sync" + + "golang.org/x/sync/errgroup" ) // InvalidSeedAction represents the action that we will 
take if a seed @@ -25,61 +29,6 @@ type AssembleOptions struct { InvalidSeedAction InvalidSeedAction } -// writeChunk tries to write a chunk by looking at the self seed, if it is already existing in the -// destination file or by taking it from the store. The in-place check runs first to avoid unnecessary -// writes. If the target already has the correct data, no write is performed. -func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Store, stats *ExtractStats, isBlank bool) error { - // If we operate on an existing file there's a good chance we already - // have the data written for this chunk. Let's read it from disk and - // compare to what is expected. This is checked first to avoid rewriting - // data that is already correct, even for chunks available in the selfSeed. - if !isBlank { - b := make([]byte, c.Size) - if _, err := f.ReadAt(b, int64(c.Start)); err != nil { - return err - } - sum := Digest.Sum(b) - if sum == c.ID { - // Record we kept this chunk in the file (when using in-place extract) - stats.incChunksInPlace() - return nil - } - } - - // If we already took this chunk from the store we can reuse it by looking - // into the selfSeed. 
- if segment := ss.getChunk(c.ID); segment != nil { - copied, cloned, err := segment.WriteInto(f, c.Start, c.Size, blocksize, isBlank) - if err != nil { - return err - } - stats.addBytesCopied(copied) - stats.addBytesCloned(cloned) - return nil - } - - // Record this chunk having been pulled from the store - stats.incChunksFromStore() - // Pull the (compressed) chunk from the store - chunk, err := s.GetChunk(c.ID) - if err != nil { - return err - } - b, err := chunk.Data() - if err != nil { - return err - } - // Might as well verify the chunk size while we're at it - if c.Size != uint64(len(b)) { - return fmt.Errorf("unexpected size for chunk %s", c.ID.String()) - } - // Write the decompressed chunk into the file at the right position - if _, err = f.WriteAt(b, int64(c.Start)); err != nil { - return err - } - return nil -} - // AssembleFile re-assembles a file based on a list of index chunks. It runs n // goroutines, creating one filehandle for the file "name" per goroutine // and writes to the file simultaneously. If progress is provided, it'll be @@ -89,18 +38,11 @@ func writeChunk(c IndexChunk, ss *selfSeed, f *os.File, blocksize uint64, s Stor // differ from the expected content. This can be used to complete partly // written files. func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []Seed, options AssembleOptions) (*ExtractStats, error) { - type Job struct { - segment IndexSegment - source SeedSegment - } var ( - attempt = 1 - in = make(chan Job) isBlank bool isBlkDevice bool - pb ProgressBar + attempt = 1 ) - g, ctx := errgroup.WithContext(ctx) // Initialize stats to be gathered during extraction stats := &ExtractStats{ @@ -126,12 +68,25 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [] isBlank = true } + // Find the in-place seed size (if any) to decide truncation strategy. 
+ var inPlaceSeedSize int64 + for _, seed := range seeds { + if ips, ok := seed.(*InPlaceSeed); ok { + inPlaceSeedSize = ips.index.Length() + break + } + } + // Truncate the output file to the full expected size. Not only does this // confirm there's enough disk space, but it allows for an optimization - // when dealing with the Null Chunk + // when dealing with the Null Chunk. If the in-place seed is larger than + // the target, defer truncation until after assembly so in-place reads + // can access the tail data. if !isBlkDevice { - if err := os.Truncate(name, idx.Length()); err != nil { - return stats, err + if inPlaceSeedSize <= idx.Length() { + if err := os.Truncate(name, idx.Length()); err != nil { + return stats, err + } } } @@ -147,138 +102,186 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [] defer ns.close() seeds = append([]Seed{ns}, seeds...) - // Start a self-seed which will become usable once chunks are written contiguously - // beginning at position 0. There is no need to add this to the seeds list because - // when we create a plan it will be empty. - ss, err := newSelfSeed(name, idx) - if err != nil { - return stats, err - } - // Record the total number of seeds and blocksize in the stats stats.Seeds = len(seeds) stats.Blocksize = blocksize - // Start the workers, each having its own filehandle to write concurrently - for i := 0; i < options.N; i++ { - f, err := os.OpenFile(name, os.O_RDWR, 0666) - if err != nil { - return stats, fmt.Errorf("unable to open file %s, %s", name, err) - } - defer f.Close() - g.Go(func() error { - for job := range in { - pb.Add(job.segment.lengthChunks()) - if job.source != nil { - // If we have a seedSegment we expect 1 or more chunks between - // the start and the end of this segment. 
- stats.addChunksFromSeed(uint64(job.segment.lengthChunks())) - offset := job.segment.start() - length := job.segment.lengthBytes() - copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank) - if err != nil { - return err - } - - // Validate that the written chunks are exactly what we were expecting. - // Because the seed might point to a RW location, if the data changed - // while we were extracting an index, we might end up writing to the - // destination some unexpected values. - for _, c := range job.segment.chunks() { - b := make([]byte, c.Size) - if _, err := f.ReadAt(b, int64(c.Start)); err != nil { - return err - } - sum := Digest.Sum(b) - if sum != c.ID { - if options.InvalidSeedAction == InvalidSeedActionRegenerate { - // Try harder before giving up and aborting - Log.WithField("ID", c.ID.String()).Info("The seed may have changed during processing, trying to take the chunk from the self seed or the store") - if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil { - return err - } - } else { - return fmt.Errorf("written data in %s doesn't match its expected hash value, seed may have changed during processing", name) - } - } - } - - stats.addBytesCopied(copied) - stats.addBytesCloned(cloned) - // Record this segment's been written in the self-seed to make it - // available going forward - ss.add(job.segment) - continue - } - - // If we don't have a seedSegment we expect an IndexSegment with just - // a single chunk, that we can take from either the selfSeed, from the - // destination file, or from the store. 
- if len(job.segment.chunks()) != 1 { - panic("Received an unexpected segment that doesn't contain just a single chunk") - } - c := job.segment.chunks()[0] + // Create the plan +retry: + plan, err := NewPlan(name, idx, s, + PlanWithConcurrency(options.N), + PlanWithSeeds(seeds), + PlanWithTargetIsBlank(isBlank), + ) + if err != nil { + return stats, err + } - if err := writeChunk(c, ss, f, blocksize, s, stats, isBlank); err != nil { - return err - } + // Validate the seed indexes provided and potentially regenerate them + if err := plan.Validate(); err != nil { + // Close the invalid plan + plan.Close() - // Record this chunk's been written in the self-seed. - // Even if we already confirmed that this chunk is present in the - // self-seed, we still need to record it as being written, otherwise - // the self-seed position pointer doesn't advance as we expect. - ss.add(job.segment) - } - return nil - }) - } + var seedError SeedInvalid + if errors.As(err, &seedError) { - // Let the sequencer break up the index into segments, create and validate a plan, - // feed the workers, and stop if there are any errors - seq := NewSeedSequencer(idx, seeds...) - plan := seq.Plan() - for { - validatingPrefix := fmt.Sprintf("Attempt %d: Validating ", attempt) - if err := plan.Validate(ctx, options.N, NewProgressBar(validatingPrefix)); err != nil { - // This plan has at least one invalid seed switch options.InvalidSeedAction { case InvalidSeedActionBailOut: return stats, err case InvalidSeedActionRegenerate: - Log.WithError(err).Info("Unable to use one of the chosen seeds, regenerating it") - if err := seq.RegenerateInvalidSeeds(ctx, options.N, attempt); err != nil { - return stats, err + Log.WithError(err).Info("Unable to use one or more seeds, regenerating them") + for i, s := range seedError.Seeds { + if err := s.RegenerateIndex(ctx, options.N, attempt, i+1); err != nil { + return stats, err + } } + attempt++ + goto retry case InvalidSeedActionSkip: - // Recreate the plan. 
This time the seed marked as invalid will be skipped - Log.WithError(err).Info("Unable to use one of the chosen seeds, skipping it") + Log.WithError(err).Infof("Unable to use one or more seeds, skipping them") + seeds = slices.DeleteFunc(seeds, func(s Seed) bool { + return slices.Contains(seedError.Seeds, s) + }) + goto retry default: panic("Unhandled InvalidSeedAction") } + } + return stats, err + } + defer plan.Close() - attempt += 1 - seq.Rewind() - plan = seq.Plan() - continue + // Generate the plan steps necessary to build the target + steps := plan.Steps() + if len(steps) == 0 { + return stats, nil + } + + // Split the steps into those that are independent and those that + // require other steps to complete first. + var ( + ready []*PlanStep + delayed = make(stepSet) + ) + for _, step := range steps { + if step.ready() { + ready = append(ready, step) + } else { + delayed.add(step) } - // Found a valid plan - break } - pb = NewProgressBar(fmt.Sprintf("Attempt %d: Assembling ", attempt)) + // Set up progress bar + pb := NewProgressBar(fmt.Sprintf("Attempt %d: Assembling ", attempt)) pb.SetTotal(len(idx.Chunks)) pb.Start() defer pb.Finish() -loop: - for _, segment := range plan { - select { - case <-ctx.Done(): - break loop - case in <- Job{segment.indexSegment, segment.source}: + // Create two channels, one for steps that can run and one for those + // that are complete. 
+ var ( + inProgress = make(chan *PlanStep, len(steps)) + completed = make(chan *PlanStep, options.N) + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(options.N) + + // Bring up the workers + for range options.N { + g.Go(func() error { + f, err := os.OpenFile(name, os.O_RDWR, 0666) + if err != nil { + return fmt.Errorf("unable to open file %s, %s", name, err) + } + defer f.Close() + for { + select { + case step, ok := <-inProgress: + if !ok { + return nil + } + copied, cloned, err := step.source.Execute(f) + if err != nil { + return err + } + // Update byte-level stats + stats.addBytesCopied(copied) + stats.addBytesCloned(cloned) + // Update chunk-level stats based on source type + switch step.source.(type) { + case *copyFromStore: + stats.incChunksFromStore() + case *skipInPlace, *inPlaceSeedSkip, *inPlaceCopy: + stats.addChunksInPlace(uint64(step.numChunks)) + case *fileSeedSource, *selfSeedSegment: + stats.addChunksFromSeed(uint64(step.numChunks)) + } + select { + case completed <- step: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + // Populate all steps that are ready to be executed + for _, step := range ready { + inProgress <- step + } + + // Start the dispatch goroutine which runs the plan. This should be + // outside the errgroup as it'll only be stopped once the workers are + // done. + var wg sync.WaitGroup + wg.Go(func() { + for step := range completed { + pb.Add(step.numChunks) + + // Go through all the steps that are blocked by this + // one and remove the dependency. If all deps have been + // removed, send them for processing and remove them + // from the ready list. + for b := range step.dependents.Each() { + b.dependencies.remove(step) + if b.ready() { + delayed.remove(b) + inProgress <- b + } + } + + // If there are no more delayed steps, close the work queue. 
+ if delayed.len() == 0 { + close(inProgress) + break + } + } + + // Drain the completed queue, updating the progress bar for any + // steps that finished after the work queue was closed. + for step := range completed { + pb.Add(step.numChunks) + } + }) + + // Wait for the workers to complete + err = g.Wait() + + // Stop the dispatch goroutine + close(completed) + wg.Wait() + + // If the in-place seed was larger than the target, truncate now that + // all in-place reads are complete. + if err == nil && inPlaceSeedSize > idx.Length() && !isBlkDevice { + if err := os.Truncate(name, idx.Length()); err != nil { + return stats, err } } - close(in) - return stats, g.Wait() + return stats, err } diff --git a/assemble_test.go b/assemble_test.go index 88e2cb7..b56bb27 100644 --- a/assemble_test.go +++ b/assemble_test.go @@ -10,27 +10,37 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// Build an index with a pre-determined set of (potentially repeated) chunks +func indexSequence(ids ...uint8) Index { + var ( + chunks = make([]IndexChunk, len(ids)) + start uint64 = 0 + size uint64 = 100 + ) + for i, id := range ids { + chunks[i] = IndexChunk{Start: start, Size: size, ID: ChunkID{id}} + start += size + } + return Index{Chunks: chunks} +} + func TestExtract(t *testing.T) { // Make a test file that's guaranteed to have duplicate chunks. b, err := os.ReadFile("testdata/chunker.input") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for range 4 { // Replicate it a few times to make sure we get dupes b = append(b, b...) } b = append(b, make([]byte, 2*ChunkSizeMaxDefault)...) 
// want to have at least one null-chunk in the input in, err := os.CreateTemp("", "in") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer os.RemoveAll(in.Name()) - if _, err := io.Copy(in, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(in, bytes.NewReader(b)) + require.NoError(t, err) in.Close() // Record the checksum of the input file, used to compare to the output later @@ -44,76 +54,50 @@ func TestExtract(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Chop up the input file into a (temporary) local store store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } - if err := ChopFile(context.Background(), in.Name(), index.Chunks, s, 10, NewProgressBar("")); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + + err = ChopFile(context.Background(), in.Name(), index.Chunks, s, 10, NewProgressBar("")) + require.NoError(t, err) // Make a blank store - used to test a case where no chunk *should* be requested blankstore := t.TempDir() bs, err := NewLocalStore(blankstore, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Prepare output files for each test - first a non-existing one - out1, err := os.CreateTemp("", "out1") - if err != nil { - t.Fatal(err) - } - os.Remove(out1.Name()) + outdir := t.TempDir() + out1 := filepath.Join(outdir, "out1") // This one is a complete file matching what we expect at the end out2, err := os.CreateTemp("", "out2") - if err != nil { - t.Fatal(err) - } - if _, err := io.Copy(out2, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = io.Copy(out2, bytes.NewReader(b)) + require.NoError(t, err) out2.Close() defer os.Remove(out2.Name()) // Incomplete or damaged file that has most but not all data out3, err := os.CreateTemp("", "out3") - if err != nil { - 
t.Fatal(err) - } + require.NoError(t, err) b[0] ^= 0xff // flip some bits b[len(b)-1] ^= 0xff b = append(b, 0) // make it longer - if _, err := io.Copy(out3, bytes.NewReader(b)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(out3, bytes.NewReader(b)) + require.NoError(t, err) out3.Close() defer os.Remove(out3.Name()) - // At this point we have the data needed for the test setup - // in - Temp file that represents the original input file - // inSub - MD5 of the input file - // index - Index file for the input file - // s - Local store containing the chunks needed to rebuild the input file - // bs - A blank local store, all GetChunk fail on it - // out1 - Just a non-existing file that gets assembled - // out2 - The output file already fully complete, no GetChunk should be needed - // out3 - Partial/damaged file with most, but not all data correct - // seedIndex + seedFile - Seed file to help assemble the input tests := map[string]struct { outfile string store Store - seed []Seed }{ - "extract to new file": {outfile: out1.Name(), store: s}, + "extract to new file": {outfile: out1, store: s}, "extract to complete file": {outfile: out2.Name(), store: bs}, "extract to incomplete file": {outfile: out3.Name(), store: s}, } @@ -121,19 +105,16 @@ func TestExtract(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { defer os.Remove(test.outfile) - if _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, + _, err := AssembleFile(context.Background(), test.outfile, index, test.store, nil, AssembleOptions{10, InvalidSeedActionBailOut}, - ); err != nil { - t.Fatal(err) - } - b, err := os.ReadFile(test.outfile) - if err != nil { - t.Fatal(err) - } - outSum := md5.Sum(b) - if inSum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } + ) + require.NoError(t, err) + + outBytes, err := os.ReadFile(test.outfile) + require.NoError(t, err) + + outSum := md5.Sum(outBytes) + assert.Equal(t, inSum, 
outSum, "checksum of extracted file doesn't match expected") }) } } @@ -142,9 +123,7 @@ func TestSeed(t *testing.T) { // Prepare different types of data slices that'll be used to assemble target // and seed files with varying amount of duplication data1, err := os.ReadFile("testdata/chunker.input") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) null := make([]byte, 4*ChunkSizeMaxDefault) rand1 := make([]byte, 4*ChunkSizeMaxDefault) rand.Read(rand1) @@ -155,9 +134,7 @@ func TestSeed(t *testing.T) { store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Define tests with files with different content, by building files out // of sets of byte slices to create duplication or not between the target and @@ -201,13 +178,10 @@ func TestSeed(t *testing.T) { t.Run(name, func(t *testing.T) { // Build the destination file so we can chunk it dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dstBytes := join(test.target...) 
- if _, err := io.Copy(dst, bytes.NewReader(dstBytes)); err != nil { - t.Fatal(err) - } + _, err = io.Copy(dst, bytes.NewReader(dstBytes)) + require.NoError(t, err) dst.Close() defer os.Remove(dst.Name()) @@ -222,25 +196,19 @@ func TestSeed(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Chop up the input file into the store - if err := ChopFile(context.Background(), dst.Name(), dstIndex.Chunks, s, 10, NewProgressBar("")); err != nil { - t.Fatal(err) - } + err = ChopFile(context.Background(), dst.Name(), dstIndex.Chunks, s, 10, NewProgressBar("")) + require.NoError(t, err) // Build the seed files and indexes then populate the array of seeds var seeds []Seed for _, f := range test.seeds { seedFile, err := os.CreateTemp("", "seed") - if err != nil { - t.Fatal(err) - } - if _, err := io.Copy(seedFile, bytes.NewReader(join(f...))); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = io.Copy(seedFile, bytes.NewReader(join(f...))) + require.NoError(t, err) seedFile.Close() defer os.Remove(seedFile.Name()) seedIndex, _, err := IndexFromFile( @@ -250,29 +218,20 @@ func TestSeed(t *testing.T) { ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault, NewProgressBar(""), ) - if err != nil { - t.Fatal(err) - } - seed, err := NewIndexSeed(dst.Name(), seedFile.Name(), seedIndex) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + seed, err := NewFileSeed(dst.Name(), seedFile.Name(), seedIndex) + require.NoError(t, err) seeds = append(seeds, seed) } - if _, err := AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, + _, err = AssembleFile(context.Background(), dst.Name(), dstIndex, s, seeds, AssembleOptions{10, InvalidSeedActionBailOut}, - ); err != nil { - t.Fatal(err) - } + ) + require.NoError(t, err) b, err := os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) outSum := 
md5.Sum(b) - if dstSum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } + assert.Equal(t, dstSum, outSum, "checksum of extracted file doesn't match expected") }) } @@ -286,9 +245,7 @@ func TestSelfSeedInPlace(t *testing.T) { store := t.TempDir() s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Build a number of fake chunks that can then be used in the test in any order type rawChunk struct { @@ -303,9 +260,7 @@ func TestSelfSeedInPlace(t *testing.T) { b := make([]byte, size) rand.Read(b) chunk := NewChunk(b) - if err = s.StoreChunk(chunk); err != nil { - t.Fatal(err) - } + require.NoError(t, s.StoreChunk(chunk)) chunks[i] = rawChunk{chunk.ID(), b} } @@ -354,40 +309,28 @@ func TestSelfSeedInPlace(t *testing.T) { // Build a temp target file pre-populated with the correct content dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer os.Remove(dst.Name()) _, err = dst.Write(b) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dst.Close() // Extract the file stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, AssembleOptions{1, InvalidSeedActionBailOut}, ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Compare the checksums to that of the input data b, err = os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) outSum := md5.Sum(b) - if sum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } + assert.Equal(t, sum, outSum, "checksum of extracted file doesn't match expected") // All chunks must be in-place. The in-place check in writeChunk // runs before the self-seed lookup, so repeated chunks are not // re-written from the self-seed. 
- if stats.ChunksInPlace != uint64(len(test.index)) { - t.Fatalf("expected all %d chunks in-place, got %d", len(test.index), stats.ChunksInPlace) - } + assert.Equal(t, uint64(len(test.index)), stats.ChunksInPlace, "expected all chunks in-place") }) } @@ -401,47 +344,252 @@ func join(slices ...[]byte) []byte { return out } -func readCaibxFile(t *testing.T, indexLocation string) (idx Index) { - is, err := NewLocalIndexStore(filepath.Dir(indexLocation)) - require.NoError(t, err) - defer is.Close() - indexName := filepath.Base(indexLocation) - idx, err = is.GetIndex(indexName) - require.NoError(t, err) - return idx -} +// TestAssembleIntegration exercises the full assembly pipeline end-to-end, +// combining all source types in a single reconstruction: in-place skips, +// in-place copies (including cycle detection with buffer-break), self-seed, +// file seeds, and store fetches. It uses variable-size chunks so that +// byte-offset calculations, overlap detection, and buffer sizing are tested +// with non-uniform boundaries. +// +// Each scenario writes an "old" file (the in-place seed), then calls +// AssembleFile to reconstruct a different target layout. The test verifies +// both the output content (md5 checksum + file size) and the per-source +// chunk statistics reported by ExtractStats. +func TestAssembleIntegration(t *testing.T) { + // Create 10 chunks of different sizes filled with random data. + // Variable sizes ensure offset math in overlaps(), inPlaceCopy.Execute() + // (buffer sizing), and Tarjan cycle detection are exercised with + // non-trivial byte boundaries. + type rawChunk struct { + id ChunkID + data []byte + } + chunkSizes := []int{1024, 768, 512, 896, 640, 1152, 384, 1280, 576, 704} + chunks := make([]rawChunk, len(chunkSizes)) + for i, size := range chunkSizes { + b := make([]byte, size) + rand.Read(b) + id := Digest.Sum(b) + chunks[i] = rawChunk{id: id, data: b} + } + + // Named constants for chunk indices to make scenario definitions readable. 
+ const ( + A = 0 // 1024 bytes + B = 1 // 768 bytes + C = 2 // 512 bytes + D = 3 // 896 bytes + E = 4 // 640 bytes + F = 5 // 1152 bytes + G = 6 // 384 bytes + H = 7 // 1280 bytes + X = 8 // 576 bytes + Y = 9 // 704 bytes + ) + + // buildIndex constructs an Index from chunk references, laying them out + // contiguously. It also sets ChunkSizeMax to the largest chunk in the + // index, which is required by newNullChunkSeed inside AssembleFile. + buildIndex := func(indices ...int) Index { + ic := make([]IndexChunk, len(indices)) + var offset uint64 + var maxSize uint64 + for i, idx := range indices { + size := uint64(len(chunks[idx].data)) + ic[i] = IndexChunk{ID: chunks[idx].id, Start: offset, Size: size} + offset += size + if size > maxSize { + maxSize = size + } + } + return Index{ + Index: FormatIndex{ChunkSizeMax: maxSize}, + Chunks: ic, + } + } -func TestExtractWithNonStaticSeeds(t *testing.T) { - n := 10 - outDir := t.TempDir() - out := filepath.Join(outDir, "out") + // buildContent returns the raw bytes for a sequence of chunks, + // used both as file content and as the expected output for verification. + buildContent := func(indices ...int) []byte { + var out []byte + for _, idx := range indices { + out = append(out, chunks[idx].data...) + } + return out + } - // Test a seed that is initially valid, but becomes corrupted halfway through - // the extraction operation - MockValidate = true + // buildStore creates a TestStore containing only the specified chunks. + // Limiting the store to the minimum required set means that if the + // planner incorrectly routes a chunk to the store (instead of a seed + // or in-place source), the test fails with ChunkMissing rather than + // silently succeeding. 
+ buildStore := func(indices ...int) *TestStore { + s := &TestStore{Chunks: make(map[ChunkID][]byte)} + for _, idx := range indices { + s.Chunks[chunks[idx].id] = chunks[idx].data + } + return s + } - store, err := NewLocalStore("testdata/blob2.store", StoreOptions{}) - require.NoError(t, err) - defer store.Close() + type scenario struct { + name string + inPlaceIndices []int // Chunks written to target file before assembly (the "old" content) + targetIndices []int // Desired output layout + fileSeedIndices []int // External file seed content (nil = no file seed) + storeIndices []int // Chunks available in the store + wantInPlace uint64 + wantFromSeeds uint64 + wantFromStore uint64 + } - index := readCaibxFile(t, "testdata/blob2.caibx") + scenarios := []scenario{ + // Scenario 1: exercises every source type in one assembly. + // + // In-place seed (old file): [A][B][C][D][E] = 3840 bytes + // Target: [B][A][C][F][G][G][D][H] = 6400 bytes + // File seed: [F][X][X] + // Store: G, H + // + // After truncation to 6400 bytes the file is: + // [A:1024][B:768][C:512][D:896][E:640][zeros:2560] + // + // Source analysis per target position: + // Pos 0 (B): in-place copy — B exists at seed offset 1024, target offset 0. + // Part of A↔B cycle (asymmetric sizes: 1024 vs 768). + // Pos 1 (A): in-place copy — A exists at seed offset 0, target offset 768. + // Part of A↔B cycle. Buffer-break picks B (smaller src). + // Pos 2 (C): skip in-place — C is at offset 1792 in both seed and target. + // Pos 3 (F): file seed — F is not in the in-place seed, found in file seed. + // D's in-place read [2304:3200] overlaps F's write [2304:3456], + // so D's read must complete first (enforced by inPlaceReads). + // Pos 4 (G): self-seed — G appears at both pos 4 and 5. Self-seed copies + // from pos 5 (requires source position > target position). + // Pos 5 (G): store — self-seed can't source from itself (p <= startPos). + // Pos 6 (D): in-place copy — D at seed offset 2304, target offset 4224. 
+ // Independent move, no cycle. + // Pos 7 (H): store — H is not in any seed. + { + name: "all source types combined", + inPlaceIndices: []int{A, B, C, D, E}, + targetIndices: []int{B, A, C, F, G, G, D, H}, + fileSeedIndices: []int{F, X, X}, + storeIndices: []int{G, H}, + wantInPlace: 4, // B (cycle), A (cycle), C (skip), D (independent move) + wantFromSeeds: 2, // F (file seed), G at pos 4 (self-seed) + wantFromStore: 2, // G at pos 5, H + }, - var seeds []Seed - srcIndex := readCaibxFile(t, "testdata/blob2_corrupted.caibx") - seed, err := NewIndexSeed(out, "testdata/blob2_corrupted", srcIndex) - seeds = append(seeds, seed) + // Scenario 2: in-place seed is larger than the target. + // + // In-place seed: [A][B][C][D] = 3200 bytes + // Target: [B][A] = 1792 bytes + // + // Since the seed (3200) is larger than the target (1792), truncation + // is deferred until after assembly so that in-place reads can access + // the full seed data. A↔B form a swap cycle. After assembly, the + // file is truncated to 1792 bytes. + { + name: "in-place seed larger than target", + inPlaceIndices: []int{A, B, C, D}, + targetIndices: []int{B, A}, + storeIndices: nil, + wantInPlace: 2, // A↔B swap cycle + wantFromSeeds: 0, + wantFromStore: 0, + }, - // Test that the MockValidate works as expected - seq := NewSeedSequencer(index, seeds...) - plan := seq.Plan() - err = plan.Validate(context.Background(), n, NullProgressBar{}) - require.NoError(t, err) + // Scenario 3: in-place seed is smaller than the target. + // + // In-place seed: [A][B] = 1792 bytes + // Target: [A][B][C][D] = 3200 bytes + // + // The file is extended (truncated up) to 3200 bytes. A and B are + // already at the correct offsets and detected by the initial scan. + // C and D are beyond the seed data and must come from the store. 
+ { + name: "in-place seed smaller than target", + inPlaceIndices: []int{A, B}, + targetIndices: []int{A, B, C, D}, + storeIndices: []int{C, D}, + wantInPlace: 2, // A, B detected in-place by initial scan + wantFromSeeds: 0, + wantFromStore: 2, // C, D fetched from store + }, + } - options := AssembleOptions{n, InvalidSeedActionRegenerate} - _, err = AssembleFile(context.Background(), out, index, store, seeds, options) - require.NoError(t, err) + for _, sc := range scenarios { + t.Run(sc.name, func(t *testing.T) { + dir := t.TempDir() + targetPath := filepath.Join(dir, "target") + + // Write the "old" file content — this is what the in-place seed + // describes. AssembleFile will detect it as non-empty, run the + // initial scan, and use the in-place seed to rearrange chunks. + inPlaceContent := buildContent(sc.inPlaceIndices...) + require.NoError(t, os.WriteFile(targetPath, inPlaceContent, 0644)) + + // Create the in-place seed. This wraps a FileSeed where source + // and destination are the same file. + inPlaceIdx := buildIndex(sc.inPlaceIndices...) + inPlaceSeed, err := NewInPlaceSeed(targetPath, inPlaceIdx) + require.NoError(t, err) + seeds := []Seed{inPlaceSeed} + + // If the scenario includes a file seed, write it to a separate + // file and create a FileSeed that maps its chunks by ID. + if sc.fileSeedIndices != nil { + seedPath := filepath.Join(dir, "fileseed") + seedContent := buildContent(sc.fileSeedIndices...) + require.NoError(t, os.WriteFile(seedPath, seedContent, 0644)) + seedIdx := buildIndex(sc.fileSeedIndices...) + fs, err := NewFileSeed(targetPath, seedPath, seedIdx) + require.NoError(t, err) + seeds = append(seeds, fs) + } - //Test the output - err = VerifyIndex(context.Background(), out, index, n, NullProgressBar{}) + // Build the target index (desired output layout) and compute + // the expected content for verification. + targetIdx := buildIndex(sc.targetIndices...) + expected := buildContent(sc.targetIndices...) 
+ expectedSum := md5.Sum(expected) + + // Build the store with only the chunks that should be fetched + // from it. Any chunk incorrectly routed here will succeed; + // any chunk missing from here will fail with ChunkMissing. + store := buildStore(sc.storeIndices...) + + // Run the full assembly pipeline with 4 concurrent workers. + stats, err := AssembleFile( + context.Background(), targetPath, targetIdx, store, seeds, + AssembleOptions{N: 4, InvalidSeedAction: InvalidSeedActionBailOut}, + ) + require.NoError(t, err) + + // Verify the output file matches the expected content. + output, err := os.ReadFile(targetPath) + require.NoError(t, err) + assert.Equal(t, int64(len(expected)), int64(len(output)), "output file size mismatch") + outSum := md5.Sum(output) + assert.Equal(t, expectedSum, outSum, "output checksum mismatch") + + // Verify that chunks were sourced from the expected places. + // This catches planner bugs where the output is correct but + // chunks were fetched from the wrong source (e.g. store + // instead of in-place copy). + assert.Equal(t, len(sc.targetIndices), stats.ChunksTotal, "ChunksTotal") + assert.Equal(t, sc.wantInPlace, stats.ChunksInPlace, "ChunksInPlace") + assert.Equal(t, sc.wantFromSeeds, stats.ChunksFromSeeds, "ChunksFromSeeds") + assert.Equal(t, sc.wantFromStore, stats.ChunksFromStore, "ChunksFromStore") + }) + } +} + +func readCaibxFile(t *testing.T, indexLocation string) (idx Index) { + is, err := NewLocalIndexStore(filepath.Dir(indexLocation)) + require.NoError(t, err) + defer is.Close() + indexName := filepath.Base(indexLocation) + idx, err = is.GetIndex(indexName) require.NoError(t, err) + return idx } diff --git a/cmd/desync/extract.go b/cmd/desync/extract.go index e7e17b4..ae0fff0 100644 --- a/cmd/desync/extract.go +++ b/cmd/desync/extract.go @@ -45,9 +45,10 @@ the index from STDIN. If a seed is invalid, by default the extract operation wil aborted. 
With --skip-invalid-seeds, the invalid seeds will be discarded and the extraction will continue without them. Otherwise with --regenerate-invalid-seeds, any invalid seed indexes will be regenerated, in memory, by using the -available data, and neither data nor indexes will be changed on disk. Also, if the seed changes -while processing, its invalid chunks will be taken from the self seed, or the store, instead -of aborting.`, +available data, and neither data nor indexes will be changed on disk. +Seeds are validated once before extraction begins. If a seed file is modified while +the extraction is running, it could result in a corrupted target file. In such cases, +use the verify-index command to check the integrity of the target afterwards.`, Example: ` desync extract -s http://192.168.1.1/ -c /path/to/local file.caibx largefile.bin desync extract -s /mnt/store -s /tmp/other/store file.tar.caibx file.tar desync extract -s /mnt/store --seed /mnt/v1.caibx v2.caibx v2.vmdk @@ -170,6 +171,10 @@ func writeInplace(ctx context.Context, name string, idx desync.Index, s desync.S func readSeeds(dstFile string, seedsInfo []string, opts cmdStoreOptions) ([]desync.Seed, error) { var seeds []desync.Seed + absDst, err := filepath.Abs(dstFile) + if err != nil { + return nil, err + } for _, seedInfo := range seedsInfo { var ( srcIndexFile string @@ -196,7 +201,17 @@ func readSeeds(dstFile string, seedsInfo []string, opts cmdStoreOptions) ([]desy return nil, err } - seed, err := desync.NewIndexSeed(dstFile, srcFile, srcIndex) + absSrc, err := filepath.Abs(srcFile) + if err != nil { + return nil, err + } + + var seed desync.Seed + if absSrc == absDst { + seed, err = desync.NewInPlaceSeed(srcFile, srcIndex) + } else { + seed, err = desync.NewFileSeed(dstFile, srcFile, srcIndex) + } if err != nil { return nil, err } @@ -211,6 +226,10 @@ func readSeedDirs(dstFile, dstIdxFile string, dirs []string, opts cmdStoreOption if err != nil { return nil, err } + absDst, err := 
filepath.Abs(dstFile) + if err != nil { + return nil, err + } for _, dir := range dirs { err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { @@ -240,7 +259,16 @@ func readSeedDirs(dstFile, dstIdxFile string, dirs []string, opts cmdStoreOption if err != nil { return err } - seed, err := desync.NewIndexSeed(dstFile, srcFile, srcIndex) + absSrc, err := filepath.Abs(srcFile) + if err != nil { + return err + } + var seed desync.Seed + if absSrc == absDst { + seed, err = desync.NewInPlaceSeed(srcFile, srcIndex) + } else { + seed, err = desync.NewFileSeed(dstFile, srcFile, srcIndex) + } if err != nil { return err } diff --git a/cmd/desync/extract_test.go b/cmd/desync/extract_test.go index d857ac4..105ee1d 100644 --- a/cmd/desync/extract_test.go +++ b/cmd/desync/extract_test.go @@ -68,9 +68,10 @@ func TestExtractCommand(t *testing.T) { // Explicitly set blob1 seed because seed-dir skips a seed if it's the same index file we gave in input. {"extract with seed directory without skipping invalid seeds", []string{"-s", "testdata/blob1.store", "--seed-dir", "testdata", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, - // Same as above, no need for `--skip-invalid-seeds` + // The plan generator processes seeds in order, so the corrupted seed + // may get placements that fail validation. Use --skip-invalid-seeds. 
{"extract with multiple corrupted seeds", - []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, + []string{"--store", "testdata/empty.store", "--seed", "testdata/blob2_corrupted.caibx", "--seed", "testdata/blob1.caibx", "--skip-invalid-seeds", "testdata/blob1.caibx"}, out1}, {"extract with single seed that has all the expected chunks", []string{"--store", "testdata/empty.store", "--seed", "testdata/blob1.caibx", "testdata/blob1.caibx"}, out1}, // blob2_corrupted is a corrupted blob that doesn't match its seed index. We regenerate the seed index to match diff --git a/errors.go b/errors.go index 45425c6..5d0fa12 100644 --- a/errors.go +++ b/errors.go @@ -44,3 +44,17 @@ func (e InvalidFormat) Error() string { type Interrupted struct{} func (e Interrupted) Error() string { return "interrupted" } + +// SeedInvalid is returned when a seed's data doesn't match its index. +type SeedInvalid struct { + Seeds []Seed + Err error +} + +func (e SeedInvalid) Error() string { + return fmt.Sprintf("invalid seed: %s", e.Err) +} + +func (e SeedInvalid) Unwrap() error { + return e.Err +} diff --git a/extractstats.go b/extractstats.go index 9deefcf..229f7b9 100644 --- a/extractstats.go +++ b/extractstats.go @@ -22,8 +22,8 @@ func (s *ExtractStats) incChunksFromStore() { atomic.AddUint64(&s.ChunksFromStore, 1) } -func (s *ExtractStats) incChunksInPlace() { - atomic.AddUint64(&s.ChunksInPlace, 1) +func (s *ExtractStats) addChunksInPlace(n uint64) { + atomic.AddUint64(&s.ChunksInPlace, n) } func (s *ExtractStats) addChunksFromSeed(n uint64) { diff --git a/go.mod b/go.mod index 50d0a7e..81d97b5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/folbricht/desync -go 1.24.0 +go 1.25.0 require ( cloud.google.com/go/storage v1.30.1 diff --git a/nullseed.go b/nullseed.go index 0cc45a9..a784542 100644 --- a/nullseed.go +++ b/nullseed.go @@ -42,9 +42,9 @@ func (s *nullChunkSeed) 
close() error { return nil } -func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) { - if len(chunks) == 0 { - return 0, nil +func (s *nullChunkSeed) LongestMatchFrom(chunks []IndexChunk, startPos int) (uint64, uint64, int, int) { + if startPos >= len(chunks) { + return 0, 0, 0, 0 } var ( n int @@ -53,7 +53,7 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) if !s.canReflink { limit = 100 } - for _, c := range chunks { + for _, c := range chunks[startPos:] { if limit != 0 && limit == n { break } @@ -63,11 +63,16 @@ func (s *nullChunkSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) n++ } if n == 0 { - return 0, nil + return 0, 0, 0, 0 } - return n, &nullChunkSection{ - from: chunks[0].Start, - to: chunks[n-1].Start + chunks[n-1].Size, + last := chunks[startPos+n-1] + byteLength := last.Start + last.Size - chunks[startPos].Start + return 0, byteLength, 0, n +} + +func (s *nullChunkSeed) GetSegment(offset, size uint64) SeedSegment { + return &nullChunkSection{ + size: size, blockfile: s.blockfile, canReflink: s.canReflink, } @@ -77,17 +82,8 @@ func (s *nullChunkSeed) RegenerateIndex(ctx context.Context, n int, attempt int, panic("A nullseed can't be regenerated") } -func (s *nullChunkSeed) SetInvalid(value bool) { - panic("A nullseed is never expected to be invalid") -} - -func (s *nullChunkSeed) IsInvalid() bool { - // A nullseed is never expected to be invalid - return false -} - type nullChunkSection struct { - from, to uint64 + size uint64 blockfile *os.File canReflink bool } @@ -101,7 +97,7 @@ func (s *nullChunkSection) FileName() string { return "" } -func (s *nullChunkSection) Size() uint64 { return s.to - s.from } +func (s *nullChunkSection) Size() uint64 { return s.size } func (s *nullChunkSection) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) { if length != s.Size() { diff --git a/seed.go b/seed.go index ffc49f3..1e8f08a 100644 --- 
a/seed.go +++ b/seed.go @@ -12,10 +12,40 @@ const DefaultBlockSize = 4096 // another index+blob that present on disk already and is used to copy or clone // existing chunks or blocks into the target from. type Seed interface { - LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) + // LongestMatchFrom returns the longest sequence of chunks anywhere in the seed + // that match chunks starting at chunks[startPos]. It returns the byte offset + // and byte length of the match in the seed, plus the chunk offset and chunk + // length. Returns (0, 0, 0, 0) if there is no match. + LongestMatchFrom(chunks []IndexChunk, startPos int) (byteOffset uint64, byteLength uint64, chunkOffset int, chunkLength int) + GetSegment(offset, size uint64) SeedSegment RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error - SetInvalid(value bool) - IsInvalid() bool +} + +// maxMatchFrom compares chunks starting at position 0 with seedChunks starting +// at position p. Returns (p, count) where count is the number of consecutive +// matching chunks. A limit of zero means no limit. +func maxMatchFrom(chunks, seedChunks []IndexChunk, p, limit int) (int, int) { + if len(chunks) == 0 { + return 0, 0 + } + var ( + sp int + dp = p + ) + for { + if limit != 0 && sp == limit { + break + } + if dp >= len(seedChunks) || sp >= len(chunks) { + break + } + if chunks[sp].ID != seedChunks[dp].ID { + break + } + dp++ + sp++ + } + return p, dp - p } // SeedSegment represents a matching range between a Seed and a file being @@ -27,30 +57,3 @@ type SeedSegment interface { Validate(file *os.File) error WriteInto(dst *os.File, offset, end, blocksize uint64, isBlank bool) (copied uint64, cloned uint64, err error) } - -// IndexSegment represents a contiguous section of an index which is used when -// assembling a file from seeds. first/last are positions in the index. 
-type IndexSegment struct { - index Index - first, last int -} - -func (s IndexSegment) lengthChunks() int { - return s.last - s.first + 1 -} - -func (s IndexSegment) lengthBytes() uint64 { - return s.end() - s.start() -} - -func (s IndexSegment) start() uint64 { - return s.index.Chunks[s.first].Start -} - -func (s IndexSegment) end() uint64 { - return s.index.Chunks[s.last].Start + s.index.Chunks[s.last].Size -} - -func (s IndexSegment) chunks() []IndexChunk { - return s.index.Chunks[s.first : s.last+1] -} diff --git a/selfseed.go b/selfseed.go deleted file mode 100644 index 86a5220..0000000 --- a/selfseed.go +++ /dev/null @@ -1,93 +0,0 @@ -package desync - -import ( - "context" - "sync" -) - -// FileSeed is used to populate a contiguous seed during extraction in order -// to copy/clone ranges that were written to the output file earlier. This is -// to potentially dedup/reflink duplicate chunks or ranges of chunks within the -// same file. -type selfSeed struct { - file string - index Index - pos map[ChunkID][]int - canReflink bool - written int - mu sync.RWMutex - cache map[int]int -} - -// newSelfSeed initializes a new seed based on the file being extracted -func newSelfSeed(file string, index Index) (*selfSeed, error) { - s := selfSeed{ - file: file, - pos: make(map[ChunkID][]int), - index: index, - canReflink: CanClone(file, file), - cache: make(map[int]int), - } - return &s, nil -} - -// add records a new segment that's been written to the file. Since only contiguous -// ranges of chunks are considered and writing happens concurrently, the segment -// written here will not be usable until all earlier chunks have been written as -// well. 
-func (s *selfSeed) add(segment IndexSegment) { - s.mu.Lock() - defer s.mu.Unlock() - - // Make a record of this segment in the cache since those could come in - // out-of-order - s.cache[segment.first] = segment.last + 1 - - // Advance pos until we find a chunk we don't yet have recorded while recording - // the chunk positions we do have in the position map used to find seed matches. - // Since it's guaranteed that the numbers are only increasing, we drop old numbers - // from the cache map to keep its size to a minimum and only store out-of-sequence - // numbers - for { - // See if we can advance the write pointer in the self-seed which requires - // consecutive chunks. If we don't have the next segment yet, just keep it - // in the cache until we do. - next, ok := s.cache[s.written] - if !ok { - break - } - // Record all chunks in this segment as written by adding them to the position map - for i := s.written; i < next; i++ { - chunk := s.index.Chunks[i] - s.pos[chunk.ID] = append(s.pos[chunk.ID], i) - } - delete(s.cache, s.written) - s.written = next - } -} - -// getChunk returns a segment with the requested chunk ID. If selfSeed doesn't -// have the requested chunk, nil will be returned. 
-func (s *selfSeed) getChunk(id ChunkID) SeedSegment { - s.mu.RLock() - pos, ok := s.pos[id] - s.mu.RUnlock() - if !ok { - return nil - } - first := pos[0] - return newFileSeedSegment(s.file, s.index.Chunks[first:first+1], s.canReflink) -} - -func (s *selfSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error { - panic("A selfSeed can't be regenerated") -} - -func (s *selfSeed) SetInvalid(value bool) { - panic("A selfSeed is never expected to be invalid") -} - -func (s *selfSeed) IsInvalid() bool { - // A selfSeed is never expected to be invalid - return false -} diff --git a/selfseed_test.go b/selfseed_test.go deleted file mode 100644 index 93c38e8..0000000 --- a/selfseed_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package desync - -import ( - "context" - "crypto/md5" - "crypto/rand" - "os" - "testing" -) - -func TestSelfSeed(t *testing.T) { - // Setup a temporary store - store := t.TempDir() - - s, err := NewLocalStore(store, StoreOptions{}) - if err != nil { - t.Fatal(err) - } - - // Build a number of fake chunks that can then be used in the test in any order - type rawChunk struct { - id ChunkID - data []byte - } - size := 1024 - numChunks := 10 - chunks := make([]rawChunk, numChunks) - - for i := range numChunks { - b := make([]byte, size) - rand.Read(b) - chunk := NewChunk(b) - if err = s.StoreChunk(chunk); err != nil { - t.Fatal(err) - } - chunks[i] = rawChunk{chunk.ID(), b} - } - - // Define tests with files with different content, by building files out - // of sets of byte slices to create duplication or not between the target and - // its seeds. Also define a min/max of bytes that should be cloned (from the - // self-seed). That number can vary since even with 1 worker goroutine there - // another feeder goroutine which can influence timings/results a little. 
- tests := map[string]struct { - index []int - minCloned int - maxCloned int - }{ - "single chunk": { - index: []int{0}, - minCloned: 0, - maxCloned: 0, - }, - "repeating single chunk": { - index: []int{0, 0, 0, 0, 0}, - minCloned: 3 * size, - maxCloned: 4 * size, - }, - "repeating chunk sequence": { - index: []int{0, 1, 2, 0, 1, 2, 2}, - minCloned: 4 * size, - maxCloned: 4 * size, - }, - "repeating chunk sequence mid file": { - index: []int{1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3}, - minCloned: 7 * size, - maxCloned: 7 * size, - }, - "repeating chunk sequence reversed": { - index: []int{0, 1, 2, 2, 1, 0}, - minCloned: 2 * size, - maxCloned: 3 * size, - }, - "non-repeating chunks": { - index: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - minCloned: 0, - maxCloned: 0, - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - // Build an index from the target chunks - var idx Index - var b []byte - for i, p := range test.index { - chunk := IndexChunk{ - ID: chunks[p].id, - Start: uint64(i * size), - Size: uint64(size), - } - b = append(b, chunks[p].data...) 
- idx.Chunks = append(idx.Chunks, chunk) - } - - // Calculate the expected checksum - sum := md5.Sum(b) - - // Build a temp target file to extract into - dst, err := os.CreateTemp("", "dst") - if err != nil { - t.Fatal(err) - } - defer os.Remove(dst.Name()) - defer dst.Close() - - // Extract the file - stats, err := AssembleFile(context.Background(), dst.Name(), idx, s, nil, - AssembleOptions{1, InvalidSeedActionBailOut}, - ) - if err != nil { - t.Fatal(err) - } - - // Compare the checksums to that of the input data - b, err = os.ReadFile(dst.Name()) - if err != nil { - t.Fatal(err) - } - outSum := md5.Sum(b) - if sum != outSum { - t.Fatal("checksum of extracted file doesn't match expected") - } - - // Compare to the expected number of bytes copied or cloned from the seed - fromSeed := int(stats.BytesCopied + stats.BytesCloned) - if fromSeed < test.minCloned { - t.Fatalf("expected min %d bytes copied/cloned from self-seed, got %d", test.minCloned, fromSeed) - } - if fromSeed > test.maxCloned { - t.Fatalf("expected max %d bytes copied/cloned from self-seed, got %d", test.maxCloned, fromSeed) - } - }) - } - -} diff --git a/sequencer.go b/sequencer.go deleted file mode 100644 index 1248a06..0000000 --- a/sequencer.go +++ /dev/null @@ -1,176 +0,0 @@ -package desync - -import ( - "context" - "golang.org/x/sync/errgroup" - "os" -) - -// SeedSequencer is used to find sequences of chunks from seed files when assembling -// a file from an index. Using seeds reduces the need to download and decompress chunks -// from chunk stores. It also enables the use of reflinking/cloning of sections of -// files from a seed file where supported to reduce disk usage. 
-type SeedSequencer struct { - seeds []Seed - index Index - current int -} - -// SeedSegmentCandidate represent a single segment that we expect to use -// in a Plan -type SeedSegmentCandidate struct { - seed Seed - source SeedSegment - indexSegment IndexSegment -} - -type Plan []SeedSegmentCandidate - -var MockValidate = false - -// NewSeedSequencer initializes a new sequencer from a number of seeds. -func NewSeedSequencer(idx Index, src ...Seed) *SeedSequencer { - return &SeedSequencer{ - seeds: src, - index: idx, - } -} - -// Plan returns a new possible plan, representing an ordered list of -// segments that can be used to re-assemble the requested file -func (r *SeedSequencer) Plan() (plan Plan) { - for { - seed, segment, source, done := r.Next() - plan = append(plan, SeedSegmentCandidate{seed, source, segment}) - if done { - break - } - } - return plan -} - -// Next returns a sequence of index chunks (from the target index) and the -// longest matching segment from one of the seeds. If source is nil, no -// match was found in the seeds and the chunk needs to be retrieved from a -// store. If done is true, the sequencer is complete. -func (r *SeedSequencer) Next() (seed Seed, segment IndexSegment, source SeedSegment, done bool) { - var ( - max uint64 - advance = 1 - ) - for _, s := range r.seeds { - n, m := s.LongestMatchWith(r.index.Chunks[r.current:]) - if n > 0 && m.Size() > max { - seed = s - source = m - advance = n - max = m.Size() - } - } - - segment = IndexSegment{index: r.index, first: r.current, last: r.current + advance - 1} - r.current += advance - return seed, segment, source, r.current >= len(r.index.Chunks) -} - -// Rewind resets the current target index to the beginning. 
-func (r *SeedSequencer) Rewind() { - r.current = 0 -} - -// isFileSeed returns true if this segment is pointing to a fileSeed -func (s SeedSegmentCandidate) isFileSeed() bool { - // We expect an empty filename when using nullSeeds - return s.source != nil && s.source.FileName() != "" -} - -// RegenerateInvalidSeeds regenerates the index to match the unexpected seed content -func (r *SeedSequencer) RegenerateInvalidSeeds(ctx context.Context, n int, attempt int) error { - seedNumber := 1 - for _, s := range r.seeds { - if s.IsInvalid() { - if err := s.RegenerateIndex(ctx, n, attempt, seedNumber); err != nil { - return err - } - seedNumber += 1 - } - } - return nil -} - -// Validate validates a proposed plan by checking if all the chosen chunks -// are correctly provided from the seeds. In case a seed has invalid chunks, the -// entire seed is marked as invalid and an error is returned. -func (p Plan) Validate(ctx context.Context, n int, pb ProgressBar) (err error) { - type Job struct { - candidate SeedSegmentCandidate - file *os.File - } - var ( - in = make(chan Job) - fileMap = make(map[string]*os.File) - ) - if MockValidate { - // This is used in the automated tests to mock a plan that is valid - return nil - } - length := 0 - for _, s := range p { - if !s.isFileSeed() { - continue - } - length += s.indexSegment.lengthChunks() - } - pb.SetTotal(length) - pb.Start() - defer pb.Finish() - // Share a single file descriptor per seed for all the goroutines - for _, s := range p { - if !s.isFileSeed() { - continue - } - name := s.source.FileName() - if _, present := fileMap[name]; present { - continue - } else { - file, err := os.Open(name) - if err != nil { - // We were not able to open the seed. 
Mark it as invalid and return - s.seed.SetInvalid(true) - return err - } - fileMap[name] = file - defer file.Close() - } - } - g, ctx := errgroup.WithContext(ctx) - // Concurrently validate all the chunks in this plan - for range n { - g.Go(func() error { - for job := range in { - if err := job.candidate.source.Validate(job.file); err != nil { - job.candidate.seed.SetInvalid(true) - return err - } - pb.Add(job.candidate.indexSegment.lengthChunks()) - } - return nil - }) - } - -loop: - for _, s := range p { - if !s.isFileSeed() { - // This is not a fileSeed, we have nothing to validate - continue - } - select { - case <-ctx.Done(): - break loop - case in <- Job{s, fileMap[s.source.FileName()]}: - } - } - close(in) - - return g.Wait() -}