From 01273b124a05d84fc7a9f851373db5c2c87eb556 Mon Sep 17 00:00:00 2001 From: Nate Date: Tue, 3 Feb 2026 11:26:03 -0800 Subject: [PATCH 1/3] change pruning --- ...ntegrators_rather_than_a_passive_option.md | 7 ++++ chain/db_test.go | 37 +++++++++++++++++++ chain/manager.go | 33 +++++++++++++---- chain/options.go | 7 ---- internal/cmd/calcswaps/main.go | 2 +- internal/cmd/sync/main.go | 13 ++++--- 6 files changed, 78 insertions(+), 21 deletions(-) create mode 100644 .changeset/change_block_pruning_to_an_active_decision_for_integrators_rather_than_a_passive_option.md diff --git a/.changeset/change_block_pruning_to_an_active_decision_for_integrators_rather_than_a_passive_option.md b/.changeset/change_block_pruning_to_an_active_decision_for_integrators_rather_than_a_passive_option.md new file mode 100644 index 00000000..16c52f22 --- /dev/null +++ b/.changeset/change_block_pruning_to_an_active_decision_for_integrators_rather_than_a_passive_option.md @@ -0,0 +1,7 @@ +--- +default: major +--- + +# Changed block pruning to an active decision for integrators rather than a passive option + +This fixes a race condition on some nodes when chain subscribers are slow where blocks would be removed from the store before they could be indexed. diff --git a/chain/db_test.go b/chain/db_test.go index 59650d2a..6cb4d3ca 100644 --- a/chain/db_test.go +++ b/chain/db_test.go @@ -338,3 +338,40 @@ func TestReorgExpiringFileContractOrder(t *testing.T) { t.Fatal("expected the chain states to be equal after reorg with new manager") } } + +func TestPruneBlocks(t *testing.T) { + n, genesisBlock := testutil.V2Network() + + store, tipState, err := chain.NewDBStore(chain.NewMemDB(), n, genesisBlock, nil) + if err != nil { + t.Fatal(err) + } + cm := chain.NewManager(store, tipState) + + // mine a bunch of blocks + testutil.MineBlocks(t, cm, types.VoidAddress, 100) + + // prune up to height 50 + cm.PruneBlocks(50) + + // ensure blocks < 50 are pruned + for height := range uint64(50) { + if index, ok := cm.BestIndex(height); !ok { + t.Fatalf("expected header at height %d to exist", height) + } else if _, exists := cm.Block(index.ID); exists { + t.Fatalf("expected block at height %d to exist", height) + } + } + + // ensure blocks >= 50 exist + for height := uint64(50); height <= 100; height++ { + index, ok := cm.BestIndex(height) + if !ok { + t.Fatalf("expected block at height %d to exist", height) + } else if block, exists := cm.Block(index.ID); !exists { + t.Fatalf("expected block at height %d to exist", height) + } else if block.ID() != index.ID { + t.Fatalf("block ID mismatch at height %d: expected %s, got %s", height, index.ID, block.ID()) + } + } +} diff --git a/chain/manager.go b/chain/manager.go index ae9a70b0..2d0ffe35 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -85,8 +85,7 @@ type Manager struct { expiringFileContractOrder map[types.BlockID][]types.FileContractID // configuration options - log *zap.Logger - pruneTarget uint64 + log *zap.Logger txpool struct { txns []types.Transaction @@ -430,12 +429,6 @@ func (m *Manager) applyTip(index types.ChainIndex) error { m.store.ApplyBlock(cs, cau) m.applyPoolUpdate(cau, cs) m.tipState = cs - - if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget { - if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok { - m.store.PruneBlock(index.ID) - } - } return nil } @@ -527,6 +520,30 @@ func (m *Manager) reorgTo(index types.ChainIndex) error { return nil } +// PruneBlocks prunes any blocks below the specified height +// from the store. This should only be called after all +// subscribers have processed blocks up to the specified height. +// +// Once the blocks are removed, they cannot be re-added without +// resyncing from genesis. +// +// This can take a while depending on the number of blocks +// it is recommended to call this frequently to avoid +// a large backlog. +func (m *Manager) PruneBlocks(height uint64) { + m.mu.Lock() + defer m.mu.Unlock() + + for h := height; h > 0; h-- { + println(h - 1) + index, ok := m.store.BestIndex(h - 1) + if !ok { + break // block does not exist + } + m.store.PruneBlock(index.ID) + } +} + // UpdatesSince returns at most max updates on the path between index and the // Manager's current tip. func (m *Manager) UpdatesSince(index types.ChainIndex, maxBlocks int) (rus []RevertUpdate, aus []ApplyUpdate, err error) { diff --git a/chain/options.go b/chain/options.go index 4ba72bce..6f495cd8 100644 --- a/chain/options.go +++ b/chain/options.go @@ -15,13 +15,6 @@ func WithLog(l *zap.Logger) ManagerOption { } } -// WithPruneTarget sets the target number of blocks to store. -func WithPruneTarget(n uint64) ManagerOption { - return func(m *Manager) { - m.pruneTarget = n - } -} - // WithExpiringContractOrder sets the order of file contracts that are expiring // at a given height. This is used to work around a bug in the chain db // where the order of expiring file contracts is not preserved across diff --git a/internal/cmd/calcswaps/main.go b/internal/cmd/calcswaps/main.go index 080fa421..9531c15f 100644 --- a/internal/cmd/calcswaps/main.go +++ b/internal/cmd/calcswaps/main.go @@ -98,7 +98,7 @@ func main() { if err != nil { log.Panic("failed to create clean db", zap.Error(err)) } - cm := chain.NewManager(cleanStore, cs, chain.WithLog(log.Named("chain")), chain.WithPruneTarget(144)) + cm := chain.NewManager(cleanStore, cs, chain.WithLog(log.Named("chain"))) tip := min(tipState.Index.Height, maxCheckHeight) overwriteIDs := make(map[types.BlockID][]types.FileContractID) diff --git a/internal/cmd/sync/main.go b/internal/cmd/sync/main.go index e21c0851..527185cf 100644 --- a/internal/cmd/sync/main.go +++ b/internal/cmd/sync/main.go @@ -151,7 +151,7 @@ func main() { if err != nil { log.Panic("failed to create store", zap.Error(err)) } - cm := chain.NewManager(store, tipState, chain.WithLog(log.Named("chain")), chain.WithPruneTarget(pruneTarget)) + cm := chain.NewManager(store, tipState, chain.WithLog(log.Named("chain"))) log = log.With(zap.Stringer("start", cm.Tip())) l, err := net.Listen("tcp", ":0") @@ -186,7 +186,7 @@ func main() { log.Info("starting sync", zap.String("network", network), zap.String("dir", dir)) - reorgCh := make(chan struct{}, 1) + reorgCh := make(chan types.ChainIndex, 1) var lastLog time.Time cm.OnReorg(func(ci types.ChainIndex) { if time.Since(lastLog) > 5*time.Second { @@ -194,13 +194,16 @@ func main() { log.Info("synced to", zap.Stringer("tip", ci)) lastLog = time.Now() } - reorgCh <- struct{}{} + reorgCh <- ci }) for { select { - case <-reorgCh: - // still syncing + case tip := <-reorgCh: + // still syncing, prune blocks + if tip.Height > 144 { + cm.PruneBlocks(tip.Height - 144) + } continue case <-ctx.Done(): log.Info("shutting down") From c427e24afd6e28690cd87f12bbc47ec238a518b0 Mon Sep 17 00:00:00 2001 From: Nate Date: Tue, 3 Feb 2026 12:40:59 -0800 Subject: [PATCH 2/3] Use block separately --- chain/manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/chain/manager.go b/chain/manager.go index 2d0ffe35..0218c36e 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -535,10 +535,11 @@ func (m *Manager) PruneBlocks(height uint64) { defer m.mu.Unlock() for h := height; h > 0; h-- { - println(h - 1) index, ok := m.store.BestIndex(h - 1) if !ok { break // block does not exist + } else if _, _, ok := m.store.Block(index.ID); !ok { + break // block does not exist } m.store.PruneBlock(index.ID) } From 6e6627581e02a9b65a897e0f3145864165ec5833 Mon Sep 17 00:00:00 2001 From: Nate Date: Tue, 3 Feb 2026 12:41:52 -0800 Subject: [PATCH 3/3] fix comment --- chain/db_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/db_test.go b/chain/db_test.go index 6cb4d3ca..d13e1ffa 100644 --- a/chain/db_test.go +++ b/chain/db_test.go @@ -359,7 +359,7 @@ func TestPruneBlocks(t *testing.T) { if index, ok := cm.BestIndex(height); !ok { t.Fatalf("expected header at height %d to exist", height) } else if _, exists := cm.Block(index.ID); exists { - t.Fatalf("expected block at height %d to exist", height) + t.Fatalf("expected block at height %d to not exist", height) } }