Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
default: major
---

# Changed block pruning to an active decision for integrators rather than a passive option

This fixes a race condition on some nodes when chain subscribers are slow where blocks would be removed from the store before they could be indexed.
37 changes: 37 additions & 0 deletions chain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,40 @@ func TestReorgExpiringFileContractOrder(t *testing.T) {
t.Fatal("expected the chain states to be equal after reorg with new manager")
}
}

func TestPruneBlocks(t *testing.T) {
n, genesisBlock := testutil.V2Network()

store, tipState, err := chain.NewDBStore(chain.NewMemDB(), n, genesisBlock, nil)
if err != nil {
t.Fatal(err)
}
cm := chain.NewManager(store, tipState)

// mine a bunch of blocks
testutil.MineBlocks(t, cm, types.VoidAddress, 100)

// prune up to height 50
cm.PruneBlocks(50)

// ensure blocks < 50 are pruned
for height := range uint64(50) {
if index, ok := cm.BestIndex(height); !ok {
t.Fatalf("expected header at height %d to exist", height)
} else if _, exists := cm.Block(index.ID); exists {
t.Fatalf("expected block at height %d to not exist", height)
}
}

// ensure blocks >= 50 exist
for height := uint64(50); height <= 100; height++ {
index, ok := cm.BestIndex(height)
if !ok {
t.Fatalf("expected block at height %d to exist", height)
} else if block, exists := cm.Block(index.ID); !exists {
t.Fatalf("expected block at height %d to exist", height)
} else if block.ID() != index.ID {
t.Fatalf("block ID mismatch at height %d: expected %s, got %s", height, index.ID, block.ID())
}
}
}
34 changes: 26 additions & 8 deletions chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ type Manager struct {
expiringFileContractOrder map[types.BlockID][]types.FileContractID

// configuration options
log *zap.Logger
pruneTarget uint64
log *zap.Logger

txpool struct {
txns []types.Transaction
Expand Down Expand Up @@ -430,12 +429,6 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs

if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget {
if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok {
m.store.PruneBlock(index.ID)
}
}
return nil
}

Expand Down Expand Up @@ -527,6 +520,31 @@ func (m *Manager) reorgTo(index types.ChainIndex) error {
return nil
}

// PruneBlocks prunes any blocks below the specified height
// from the store. This should only be called after all
// subscribers have processed blocks up to the specified height.
//
// Once the blocks are removed, they cannot be re-added without
// resyncing from genesis.
//
// This can take a while depending on the number of blocks
// it is recommended to call this frequently to avoid
// a large backlog.
func (m *Manager) PruneBlocks(height uint64) {
m.mu.Lock()
defer m.mu.Unlock()

for h := height; h > 0; h-- {
index, ok := m.store.BestIndex(h - 1)
if !ok {
break // block does not exist
} else if _, _, ok := m.store.Block(index.ID); !ok {
break // block does not exist
}
m.store.PruneBlock(index.ID)
}
}

// UpdatesSince returns at most max updates on the path between index and the
// Manager's current tip.
func (m *Manager) UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []RevertUpdate, aus []ApplyUpdate, err error) {
Expand Down
7 changes: 0 additions & 7 deletions chain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ func WithLog(l *zap.Logger) ManagerOption {
}
}

// WithPruneTarget sets the target number of blocks to store.
func WithPruneTarget(n uint64) ManagerOption {
return func(m *Manager) {
m.pruneTarget = n
}
}

// WithExpiringContractOrder sets the order of file contracts that are expiring
// at a given height. This is used to work around a bug in the chain db
// where the order of expiring file contracts is not preserved across
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/calcswaps/main.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions internal/cmd/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func main() {
if err != nil {
log.Panic("failed to create store", zap.Error(err))
}
cm := chain.NewManager(store, tipState, chain.WithLog(log.Named("chain")), chain.WithPruneTarget(pruneTarget))
cm := chain.NewManager(store, tipState, chain.WithLog(log.Named("chain")))
log = log.With(zap.Stringer("start", cm.Tip()))

l, err := net.Listen("tcp", ":0")
Expand Down Expand Up @@ -186,21 +186,24 @@ func main() {

log.Info("starting sync", zap.String("network", network), zap.String("dir", dir))

reorgCh := make(chan struct{}, 1)
reorgCh := make(chan types.ChainIndex, 1)
var lastLog time.Time
cm.OnReorg(func(ci types.ChainIndex) {
if time.Since(lastLog) > 5*time.Second {
// debounce
log.Info("synced to", zap.Stringer("tip", ci))
lastLog = time.Now()
}
reorgCh <- struct{}{}
reorgCh <- ci
})

for {
select {
case <-reorgCh:
// still syncing
case tip := <-reorgCh:
// still syncing, prune blocks
if tip.Height > 144 {
cm.PruneBlocks(tip.Height - 144)
}
continue
case <-ctx.Done():
log.Info("shutting down")
Expand Down