diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 9324d58012f..98a6323be31 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -72,7 +72,6 @@ const ( optionNameStaticNodes = "static-nodes" optionNameAllowPrivateCIDRs = "allow-private-cidrs" optionNameSleepAfter = "sleep-after" - optionNameUsePostageSnapshot = "use-postage-snapshot" optionNameStorageIncentivesEnable = "storage-incentives-enable" optionNameStateStoreCacheCapacity = "statestore-cache-capacity" optionNameTargetNeighborhood = "target-neighborhood" @@ -288,7 +287,6 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Bool(optionNamePProfMutex, false, "enable pprof mutex profile") cmd.Flags().StringSlice(optionNameStaticNodes, []string{}, "protect nodes from getting kicked out on bootnode") cmd.Flags().Bool(optionNameAllowPrivateCIDRs, false, "allow to advertise private CIDRs to the public network") - cmd.Flags().Bool(optionNameUsePostageSnapshot, false, "bootstrap node using postage snapshot from the network") cmd.Flags().Bool(optionNameStorageIncentivesEnable, true, "enable storage incentives feature") cmd.Flags().Uint64(optionNameStateStoreCacheCapacity, 100_000, "lru memory caching capacity in number of statestore entries") cmd.Flags().String(optionNameTargetNeighborhood, "", "neighborhood to target in binary format (ex: 111111001) for mining the initial overlay") diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 9633b9bd6f1..b58978bbe00 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -334,7 +334,6 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo TracingEndpoint: tracingEndpoint, TracingServiceName: c.config.GetString(optionNameTracingServiceName), TrxDebugMode: c.config.GetBool(optionNameTransactionDebugMode), - UsePostageSnapshot: c.config.GetBool(optionNameUsePostageSnapshot), WarmupTime: c.config.GetDuration(optionWarmUpTime), WelcomeMessage: c.config.GetString(optionWelcomeMessage), WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress), diff --git a/packaging/bee.yaml b/packaging/bee.yaml index 41a6b2a2568..f5354f8e4d4 100644 --- a/packaging/bee.yaml +++ b/packaging/bee.yaml @@ -110,8 +110,6 @@ password-file: "/var/lib/bee/password" # tracing-service-name: bee ## skips the gas estimate step for contract transactions # transaction-debug-mode: false -## bootstrap node using postage snapshot from the network -# use-postage-snapshot: false ## log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace # verbosity: info ## maximum node warmup duration; proceeds when stable or after this time diff --git a/packaging/homebrew-amd64/bee.yaml b/packaging/homebrew-amd64/bee.yaml index 4e93b8e3bab..811ffe62d60 100644 --- a/packaging/homebrew-amd64/bee.yaml +++ b/packaging/homebrew-amd64/bee.yaml @@ -110,8 +110,6 @@ password-file: "/usr/local/var/lib/swarm-bee/password" # tracing-service-name: bee ## skips the gas estimate step for contract transactions # transaction-debug-mode: false -## bootstrap node using postage snapshot from the network -# use-postage-snapshot: false ## log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace # verbosity: info ## maximum node warmup duration; proceeds when stable or after this time diff --git a/packaging/homebrew-arm64/bee.yaml b/packaging/homebrew-arm64/bee.yaml index 291288c3eed..382405cb149 100644 --- a/packaging/homebrew-arm64/bee.yaml +++ b/packaging/homebrew-arm64/bee.yaml @@ -110,8 +110,6 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" # tracing-service-name: bee ## skips the gas estimate step for contract transactions # transaction-debug-mode: false -## bootstrap node using postage snapshot from the network -# use-postage-snapshot: false ## log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace # verbosity: info ## maximum node warmup duration; proceeds when stable or after this time diff --git a/packaging/scoop/bee.yaml b/packaging/scoop/bee.yaml index d73c9f4feb1..58b48f70b52 100644 --- a/packaging/scoop/bee.yaml +++ b/packaging/scoop/bee.yaml @@ -110,8 +110,6 @@ password-file: "./password" # tracing-service-name: bee ## skips the gas estimate step for contract transactions # transaction-debug-mode: false -## bootstrap node using postage snapshot from the network -# use-postage-snapshot: false ## log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace # verbosity: info ## maximum node warmup duration; proceeds when stable or after this time diff --git a/pkg/node/bootstrap.go b/pkg/node/bootstrap.go deleted file mode 100644 index eab32166ec9..00000000000 --- a/pkg/node/bootstrap.go +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright 2022 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package node - -import ( - "context" - "crypto/ecdsa" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "io" - "math/big" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethersphere/bee/v2/pkg/accounting" - "github.com/ethersphere/bee/v2/pkg/addressbook" - "github.com/ethersphere/bee/v2/pkg/crypto" - "github.com/ethersphere/bee/v2/pkg/feeds" - "github.com/ethersphere/bee/v2/pkg/feeds/factory" - "github.com/ethersphere/bee/v2/pkg/file" - "github.com/ethersphere/bee/v2/pkg/file/joiner" - "github.com/ethersphere/bee/v2/pkg/file/loadsave" - "github.com/ethersphere/bee/v2/pkg/file/redundancy" - "github.com/ethersphere/bee/v2/pkg/hive" - "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/manifest" - "github.com/ethersphere/bee/v2/pkg/p2p/libp2p" - "github.com/ethersphere/bee/v2/pkg/postage" - "github.com/ethersphere/bee/v2/pkg/pricer" - "github.com/ethersphere/bee/v2/pkg/pricing" - "github.com/ethersphere/bee/v2/pkg/retrieval" - "github.com/ethersphere/bee/v2/pkg/settlement/pseudosettle" - "github.com/ethersphere/bee/v2/pkg/spinlock" - "github.com/ethersphere/bee/v2/pkg/stabilization" - "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storer" - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/ethersphere/bee/v2/pkg/topology" - "github.com/ethersphere/bee/v2/pkg/topology/kademlia" - "github.com/ethersphere/bee/v2/pkg/topology/lightnode" - "github.com/ethersphere/bee/v2/pkg/tracing" - "github.com/hashicorp/go-multierror" - ma "github.com/multiformats/go-multiaddr" -) - -var ( - // zeroed out while waiting to be replacement for the new snapshot feed address - // must be different to avoid stale reads on the old contract - snapshotFeed = swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") - errDataMismatch = errors.New("data length mismatch") -) - -const ( - getSnapshotRetries = 3 - retryWait = time.Second * 5 - timeout = time.Minute * 2 -) - -func bootstrapNode( - ctx context.Context, - addr string, - swarmAddress swarm.Address, - nonce []byte, - addressbook addressbook.Interface, - bootnodes []ma.Multiaddr, - lightNodes *lightnode.Container, - stateStore storage.StateStorer, - signer crypto.Signer, - networkID uint64, - logger log.Logger, - libp2pPrivateKey *ecdsa.PrivateKey, - detector *stabilization.Detector, - o *Options, -) (snapshot *postage.ChainSnapshot, retErr error) { - tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{ - Enabled: o.TracingEnabled, - Endpoint: o.TracingEndpoint, - ServiceName: o.TracingServiceName, - }) - if err != nil { - return nil, fmt.Errorf("tracer: %w", err) - } - - p2pCtx, p2pCancel := context.WithCancel(ctx) - - b := &Bee{ - ctxCancel: p2pCancel, - tracerCloser: tracerCloser, - } - - defer func() { - retErr = multierror.Append(new(multierror.Error), retErr, b.Shutdown()).ErrorOrNil() - }() - - p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{ - PrivateKey: libp2pPrivateKey, - NATAddr: o.NATAddr, - NATWSSAddr: o.NATWSSAddr, - EnableWS: o.EnableWS, - EnableWSS: o.EnableWSS, - WSSAddr: o.WSSAddr, - AutoTLSStorageDir: o.AutoTLSStorageDir, - AutoTLSDomain: o.AutoTLSDomain, - AutoTLSRegistrationEndpoint: o.AutoTLSRegistrationEndpoint, - AutoTLSCAEndpoint: o.AutoTLSCAEndpoint, - WelcomeMessage: o.WelcomeMessage, - FullNode: false, - Nonce: nonce, - }) - if err != nil { - return nil, fmt.Errorf("p2p service: %w", err) - } - b.p2pService = p2ps - b.p2pHalter = p2ps - - hive := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, o.AllowPrivateCIDRs, swarmAddress, logger) - - if err = p2ps.AddProtocol(hive.Protocol()); err != nil { - return nil, fmt.Errorf("hive service: %w", err) - } - b.hiveCloser = hive - - kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, detector, logger, - kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode, StaticNodes: o.StaticNodes, DataDir: o.DataDir}) - if err != nil { - return nil, fmt.Errorf("unable to create kademlia: %w", err) - } - b.topologyCloser = kad - b.topologyHalter = kad - hive.SetAddPeersHandler(kad.AddPeers) - p2ps.SetPickyNotifier(kad) - - paymentThreshold, _ := new(big.Int).SetString(o.PaymentThreshold, 10) - lightPaymentThreshold := new(big.Int).Div(paymentThreshold, big.NewInt(lightFactor)) - - pricer := pricer.NewFixedPricer(swarmAddress, basePrice) - - pricing := pricing.New(p2ps, logger, paymentThreshold, lightPaymentThreshold, big.NewInt(minPaymentThreshold)) - if err = p2ps.AddProtocol(pricing.Protocol()); err != nil { - return nil, fmt.Errorf("pricing service: %w", err) - } - - acc, err := accounting.NewAccounting( - paymentThreshold, - o.PaymentTolerance, - o.PaymentEarly, - logger, - stateStore, - pricing, - big.NewInt(refreshRate), - lightFactor, - p2ps, - ) - if err != nil { - return nil, fmt.Errorf("accounting: %w", err) - } - b.accountingCloser = acc - - // bootstrapper mode uses the light node refresh rate - enforcedRefreshRate := big.NewInt(lightRefreshRate) - - pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, enforcedRefreshRate, enforcedRefreshRate, p2ps) - if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil { - return nil, fmt.Errorf("pseudosettle service: %w", err) - } - - acc.SetRefreshFunc(pseudosettleService.Pay) - - pricing.SetPaymentThresholdObserver(acc) - - localStore, err := storer.New(ctx, "", &storer.Options{ - CacheCapacity: 1_000_000, - }) - if err != nil { - return nil, fmt.Errorf("local store creation: %w", err) - } - b.localstoreCloser = localStore - - radiusF := func() (uint8, error) { return swarm.MaxBins, nil } - - retrieve := retrieval.New(swarmAddress, radiusF, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching) - if err = p2ps.AddProtocol(retrieve.Protocol()); err != nil { - return nil, fmt.Errorf("retrieval service: %w", err) - } - b.retrievalCloser = retrieve - - localStore.SetRetrievalService(retrieve) - - if err := kad.Start(p2pCtx); err != nil { - return nil, err - } - - if err := p2ps.Ready(); err != nil { - return nil, err - } - - if err := waitPeers(kad); err != nil { - return nil, errors.New("timed out waiting for kademlia peers") - } - - logger.Info("bootstrap: trying to fetch stamps snapshot") - - var ( - snapshotRootCh swarm.Chunk - reader file.Joiner - l int64 - eventsJSON []byte - ) - - for range getSnapshotRetries { - if err != nil { - time.Sleep(retryWait) - } - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - snapshotRootCh, err = getLatestSnapshot(ctx, localStore.Download(true), localStore.Cache(), snapshotFeed) - if err != nil { - logger.Warning("bootstrap: fetching snapshot failed", "error", err) - continue - } - break - } - if err != nil { - return nil, err - } - - for range getSnapshotRetries { - if err != nil { - time.Sleep(retryWait) - } - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - reader, l, err = joiner.NewJoiner(ctx, localStore.Download(true), localStore.Cache(), snapshotRootCh.Address(), snapshotRootCh) - if err != nil { - logger.Warning("bootstrap: file joiner failed", "error", err) - continue - } - - eventsJSON, err = io.ReadAll(reader) - if err != nil { - logger.Warning("bootstrap: reading failed", "error", err) - continue - } - - if len(eventsJSON) != int(l) { - err = errDataMismatch - logger.Warning("bootstrap: count mismatch", "error", err) - continue - } - break - } - if err != nil { - return nil, err - } - - events := postage.ChainSnapshot{} - err = json.Unmarshal(eventsJSON, &events) - if err != nil { - return nil, err - } - - return &events, nil -} - -// wait till some peers are connected. returns true if all is ok -func waitPeers(kad *kademlia.Kad) error { - const minPeersCount = 25 - return spinlock.WaitWithInterval(time.Minute, time.Second, func() bool { - count := 0 - _ = kad.EachConnectedPeer(func(_ swarm.Address, _ uint8) (bool, bool, error) { - count++ - return false, false, nil - }, topology.Select{}) - return count >= minPeersCount - }) -} - -func getLatestSnapshot( - ctx context.Context, - st storage.Getter, - putter storage.Putter, - address swarm.Address, -) (swarm.Chunk, error) { - ls := loadsave.NewReadonly(st, putter, redundancy.DefaultLevel) - feedFactory := factory.New(st) - - m, err := manifest.NewDefaultManifestReference( - address, - ls, - ) - if err != nil { - return nil, fmt.Errorf("not a manifest: %w", err) - } - - e, err := m.Lookup(ctx, "/") - if err != nil { - return nil, fmt.Errorf("node lookup: %w", err) - } - - var ( - owner, topic []byte - t = new(feeds.Type) - ) - meta := e.Metadata() - if e := meta["swarm-feed-owner"]; e != "" { - owner, err = hex.DecodeString(e) - if err != nil { - return nil, err - } - } - if e := meta["swarm-feed-topic"]; e != "" { - topic, err = hex.DecodeString(e) - if err != nil { - return nil, err - } - } - if e := meta["swarm-feed-type"]; e != "" { - err := t.FromString(e) - if err != nil { - return nil, err - } - } - if len(owner) == 0 || len(topic) == 0 { - return nil, fmt.Errorf("node lookup: %s", "feed metadata absent") - } - f := feeds.New(topic, common.BytesToAddress(owner)) - - l, err := feedFactory.NewLookup(*t, f) - if err != nil { - return nil, fmt.Errorf("feed lookup failed: %w", err) - } - - u, _, _, err := l.At(ctx, time.Now().Unix(), 0) - if err != nil { - return nil, err - } - - return feeds.GetWrappedChunk(ctx, st, u, false) -} - -func batchStoreExists(s storage.StateStorer) (bool, error) { - hasOne := false - err := s.Iterate("batchstore_", func(key, value []byte) (stop bool, err error) { - hasOne = true - return true, err - }) - - return hasOne, err -} diff --git a/pkg/node/node.go b/pkg/node/node.go index 56cdbc4572f..7564d894f58 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -63,6 +63,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/stabilization" "github.com/ethersphere/bee/v2/pkg/status" "github.com/ethersphere/bee/v2/pkg/steward" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storageincentives" "github.com/ethersphere/bee/v2/pkg/storageincentives/redistribution" "github.com/ethersphere/bee/v2/pkg/storageincentives/staking" @@ -117,7 +118,6 @@ type Bee struct { saludCloser io.Closer storageIncetivesCloser io.Closer pushSyncCloser io.Closer - retrievalCloser io.Closer shutdownInProgress bool shutdownMutex sync.Mutex syncingStopped *syncutil.Signaler @@ -182,7 +182,6 @@ type Options struct { TracingEndpoint string TracingServiceName string TrxDebugMode bool - UsePostageSnapshot bool WarmupTime time.Duration WelcomeMessage string WhitelistedWithdrawalAddress []string @@ -629,34 +628,6 @@ func NewBee( logger.Debug("node warmup check: period complete.", "periodEndTime", t, "eventsInPeriod", periodCount, "rateStdDev", stDev) } - var initBatchState *postage.ChainSnapshot - // Bootstrap node with postage snapshot only if it is running on mainnet, is a fresh - // install or explicitly asked by user to resync - if networkID == mainnetNetworkID && o.UsePostageSnapshot && (!batchStoreExists || o.Resync) { - start := time.Now() - logger.Info("cold postage start detected. fetching postage stamp snapshot from swarm") - initBatchState, err = bootstrapNode( - ctx, - addr, - swarmAddress, - nonce, - addressbook, - bootnodes, - lightNodes, - stateStore, - signer, - networkID, - log.Noop, - libp2pPrivateKey, - detector, - o, - ) - logger.Info("bootstrapper created", "elapsed", time.Since(start)) - if err != nil { - logger.Error(err, "bootstrapper failed to fetch batch state") - } - } - var registry *prometheus.Registry if apiService != nil { @@ -845,7 +816,7 @@ func NewBee( if err != nil { logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") } else { - err = snapshotBatchSvc.Start(ctx, postageSyncStart, initBatchState) + err = snapshotBatchSvc.Start(ctx, postageSyncStart) syncStatus.Store(true) if err != nil { syncErr.Store(err) @@ -873,7 +844,7 @@ func NewBee( } if o.FullNodeMode { - err = batchSvc.Start(ctx, postageSyncStart, initBatchState) + err = batchSvc.Start(ctx, postageSyncStart) syncStatus.Store(true) if err != nil { syncErr.Store(err) @@ -882,7 +853,7 @@ func NewBee( } else { go func() { logger.Info("started postage contract data sync in the background...") - err := batchSvc.Start(ctx, postageSyncStart, initBatchState) + err := batchSvc.Start(ctx, postageSyncStart) syncStatus.Store(true) if err != nil { syncErr.Store(err) @@ -1529,3 +1500,13 @@ func validatePublicAddress(addr string) error { return nil } + +func batchStoreExists(s storage.StateStorer) (bool, error) { + hasOne := false + err := s.Iterate("batchstore_", func(key, value []byte) (stop bool, err error) { + hasOne = true + return true, err + }) + + return hasOne, err +} diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 6ad76b0bed4..6268b681d8b 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -239,14 +239,14 @@ func (svc *batchService) TransactionEnd() error { var ErrInterruped = errors.New("postage sync interrupted") -func (svc *batchService) Start(ctx context.Context, startBlock uint64, initState *postage.ChainSnapshot) (err error) { +func (svc *batchService) Start(ctx context.Context, startBlock uint64) (err error) { dirty := false err = svc.stateStore.Get(dirtyDBKey, &dirty) if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } - if dirty || svc.resync || initState != nil { + if dirty || svc.resync { if dirty { svc.logger.Warning("batch service: dirty shutdown detected, resetting batch store") @@ -268,11 +268,7 @@ func (svc *batchService) Start(ctx context.Context, startBlock uint64, initState startBlock = cs.Block } - if initState != nil && initState.LastBlockNumber > startBlock { - startBlock = initState.LastBlockNumber - } - - syncedChan := svc.listener.Listen(ctx, startBlock+1, svc, initState) + syncedChan := svc.listener.Listen(ctx, startBlock+1, svc) return <-syncedChan } diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index 9500acaba28..2a9ef622c10 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -32,7 +32,7 @@ var ( type mockListener struct { } -func (*mockListener) Listen(ctx context.Context, from uint64, updater postage.EventUpdater, _ *postage.ChainSnapshot) <-chan error { +func (*mockListener) Listen(ctx context.Context, from uint64, updater postage.EventUpdater) <-chan error { c := make(chan error, 1) c <- nil return c @@ -514,7 +514,7 @@ func TestTransactionOk(t *testing.T) { t.Parallel() svc, store, s := newTestStoreAndService(t) - if err := svc.Start(context.Background(), 10, nil); err != nil { + if err := svc.Start(context.Background(), 10); err != nil { t.Fatal(err) } @@ -530,7 +530,7 @@ func TestTransactionOk(t *testing.T) { if err != nil { t.Fatal(err) } - if err := svc2.Start(context.Background(), 10, nil); err != nil { + if err := svc2.Start(context.Background(), 10); err != nil { t.Fatal(err) } @@ -543,7 +543,7 @@ func TestTransactionError(t *testing.T) { t.Parallel() svc, store, s := newTestStoreAndService(t) - if err := svc.Start(context.Background(), 10, nil); err != nil { + if err := svc.Start(context.Background(), 10); err != nil { t.Fatal(err) } @@ -555,7 +555,7 @@ func TestTransactionError(t *testing.T) { if err != nil { t.Fatal(err) } - if err := svc2.Start(context.Background(), 10, nil); err != nil { + if err := svc2.Start(context.Background(), 10); err != nil { t.Fatal(err) } diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 2288fb9fcbf..5b7999658b1 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -21,7 +21,7 @@ type EventUpdater interface { UpdateDepth(id []byte, depth uint8, normalisedBalance *big.Int, txHash common.Hash) error UpdatePrice(price *big.Int, txHash common.Hash) error UpdateBlockNumber(blockNumber uint64) error - Start(ctx context.Context, startBlock uint64, initState *ChainSnapshot) error + Start(ctx context.Context, startBlock uint64) error TransactionStart() error TransactionEnd() error @@ -90,7 +90,7 @@ type ChainStateGetter interface { // Listener provides a blockchain event iterator. type Listener interface { io.Closer - Listen(ctx context.Context, from uint64, updater EventUpdater, initState *ChainSnapshot) <-chan error + Listen(ctx context.Context, from uint64, updater EventUpdater) <-chan error } type BatchEventListener interface { diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index d9d52b2c5a1..5bdfa9a249e 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -186,7 +186,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error } } -func (l *listener) Listen(ctx context.Context, from uint64, updater postage.EventUpdater, initState *postage.ChainSnapshot) <-chan error { +func (l *listener) Listen(ctx context.Context, from uint64, updater postage.EventUpdater) <-chan error { ctx, cancel := context.WithCancel(ctx) go func() { <-l.quit @@ -226,13 +226,6 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even return nil } - if initState != nil { - err := processEvents(initState.Events, initState.LastBlockNumber+1) - if err != nil { - l.logger.Error(err, "failed bootstrapping from initial state") - } - } - batchFactor, err := strconv.ParseUint(batchFactorOverridePublic, 10, 64) if err != nil { l.logger.Warning("batch factor conversation failed", "batch_factor", batchFactor, "error", err) diff --git a/pkg/postage/listener/listener_test.go b/pkg/postage/listener/listener_test.go index bd15c2659ab..bb34086bb6a 100644 --- a/pkg/postage/listener/listener_test.go +++ b/pkg/postage/listener/listener_test.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" chaincfg "github.com/ethersphere/bee/v2/pkg/config" "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/listener" "github.com/ethersphere/bee/v2/pkg/util/abiutil" "github.com/ethersphere/bee/v2/pkg/util/syncutil" @@ -81,7 +80,7 @@ func TestListener(t *testing.T) { backoffTime, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -122,7 +121,7 @@ func TestListener(t *testing.T) { backoffTime, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -164,7 +163,7 @@ func TestListener(t *testing.T) { ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -203,7 +202,7 @@ func TestListener(t *testing.T) { backoffTime, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -266,7 +265,7 @@ func TestListener(t *testing.T) { backoffTime, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -346,7 +345,7 @@ func TestListener(t *testing.T) { 0, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -374,7 +373,7 @@ func TestListener(t *testing.T) { 0, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case <-c.C: @@ -401,7 +400,7 @@ func TestListener(t *testing.T) { backoffTime, ) testutil.CleanupCloser(t, l) - <-l.Listen(context.Background(), 0, ev, nil) + <-l.Listen(context.Background(), 0, ev) select { case e := <-ev.eventC: @@ -418,122 +417,6 @@ func TestListener(t *testing.T) { }) } -func TestListenerBatchState(t *testing.T) { - t.Parallel() - - ev := newEventUpdaterMock() - mf := newMockFilterer() - - create := createArgs{ - id: hash[:], - owner: addr[:], - amount: big.NewInt(42), - normalisedAmount: big.NewInt(43), - depth: 100, - } - - topup := topupArgs{ - id: hash[:], - amount: big.NewInt(0), - normalisedBalance: big.NewInt(1), - } - - depthIncrease := depthArgs{ - id: hash[:], - depth: 200, - normalisedBalance: big.NewInt(2), - } - - priceUpdate := priceArgs{ - price: big.NewInt(500), - } - - snapshot := &postage.ChainSnapshot{ - Events: []types.Log{ - create.toLog(496), - topup.toLog(497), - depthIncrease.toLog(498), - priceUpdate.toLog(499), - }, - FirstBlockNumber: 496, - LastBlockNumber: 499, - Timestamp: time.Now().Unix(), - } - - stop := make(chan struct{}) - done := make(chan struct{}) - errs := make(chan error) - noOfEvents := 0 - - go func() { - for { - select { - case <-stop: - return - case e := <-ev.eventC: - noOfEvents++ - switch ev := e.(type) { - case blockNumberCall: - if ev.blockNumber < 497 && ev.blockNumber > 500 { - errs <- fmt.Errorf("invalid blocknumber call %d", ev.blockNumber) - return - } - if ev.blockNumber == 500 { - close(done) - return - } - case createArgs: - if err := ev.compare(create); err != nil { - errs <- err - return - } - case topupArgs: - if err := ev.compare(topup); err != nil { - errs <- err - return - } - case depthArgs: - if err := ev.compare(depthIncrease); err != nil { - errs <- err - return - } - case priceArgs: - if err := ev.compare(priceUpdate); err != nil { - errs <- err - return - } - } - } - } - }() - - l := listener.New( - nil, - log.Noop, - mf, - postageStampContractAddress, - postageStampContractABI, - 1, - stallingTimeout, - backoffTime, - ) - testutil.CleanupCloser(t, l) - l.Listen(context.Background(), snapshot.LastBlockNumber+1, ev, snapshot) - - defer close(stop) - - select { - case <-time.After(5 * time.Second): - t.Fatal("timedout waiting for events to be processed", noOfEvents) - case err := <-errs: - t.Fatal(err) - case <-done: - if noOfEvents != 9 { - t.Fatal("invalid count of events on completion", noOfEvents) - } - } -} - func newEventUpdaterMock() *updater { return &updater{ eventC: make(chan any, 1), @@ -595,7 +478,7 @@ func (u *updater) UpdateBlockNumber(blockNumber uint64) error { return u.blockNumberUpdateError } -func (u *updater) Start(ctx context.Context, bno uint64, cs *postage.ChainSnapshot) error { +func (u *updater) Start(ctx context.Context, bno uint64) error { return nil }