Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
with:
repository: 0xPolygon/kurtosis-cdk
path: kurtosis-cdk
ref: arpit/bridge-service
ref: v0.3.3

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
with:
repository: 0xPolygon/kurtosis-cdk
path: kurtosis-cdk
ref: arpit/bridge-service
ref: v0.2.31

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk
Expand Down
91 changes: 50 additions & 41 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ func (a *Aggregator) getVerifiedBatchAccInputHash(ctx context.Context, batchNumb

func (a *Aggregator) getAndLockBatchToProve(
ctx context.Context, prover ProverInterface,
) (*state.Batch, []byte, *state.Proof, error) {
) (*state.Batch, *state.Proof, error) {
proverID := prover.ID()
proverName := prover.Name()

Expand All @@ -1076,7 +1076,7 @@ func (a *Aggregator) getAndLockBatchToProve(
// Get last virtual batch number from L1
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

proofExists := true
Expand All @@ -1089,7 +1089,7 @@ func (a *Aggregator) getAndLockBatchToProve(
if err != nil {
tmpLogger.Infof("Error checking proof exists for batch %d", batchNumberToVerify)

return nil, nil, nil, err
return nil, nil, err
}

if proofExists {
Expand All @@ -1101,7 +1101,7 @@ func (a *Aggregator) getAndLockBatchToProve(
err := a.storage.CleanupGeneratedProofs(ctx, math.MaxInt, nil)
if err != nil {
tmpLogger.Infof("Error cleaning up generated proofs for batch %d", batchNumberToVerify)
return nil, nil, nil, err
return nil, nil, err
}
batchNumberToVerify--
break
Expand All @@ -1112,38 +1112,45 @@ func (a *Aggregator) getAndLockBatchToProve(
// Check if the batch has been sequenced
sequence, err := a.l1Syncr.GetSequenceByBatchNumber(ctx, batchNumberToVerify)
if err != nil && !errors.Is(err, entities.ErrNotFound) {
return nil, nil, nil, err
return nil, nil, err
}

// Not found, so it it not possible to verify the batch yet
if sequence == nil || errors.Is(err, entities.ErrNotFound) {
tmpLogger.Infof("Sequencing event for batch %d has not been synced yet, "+
"so it is not possible to verify it yet. Waiting ...", batchNumberToVerify)

return nil, nil, nil, state.ErrNotFound
return nil, nil, state.ErrNotFound
}

stateSequence := state.Sequence{
FromBatchNumber: sequence.FromBatchNumber,
ToBatchNumber: sequence.ToBatchNumber,
}

// Store the sequence in aggregator DB
err = a.storage.AddSequence(ctx, stateSequence, nil)
if err != nil {
tmpLogger.Infof("Error storing sequence for batch %d", batchNumberToVerify)
return nil, nil, err
}

// Get Batch from L1 Syncer
virtualBatch, err := a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, batchNumberToVerify)
if err != nil && !errors.Is(err, entities.ErrNotFound) {
a.logger.Errorf("Error getting virtual batch: %v", err)
return nil, nil, nil, err
return nil, nil, err
} else if errors.Is(err, entities.ErrNotFound) {
a.logger.Infof("Virtual batch %d has not been synced yet, "+
"so it is not possible to verify it yet. Waiting ...", batchNumberToVerify)
return nil, nil, nil, state.ErrNotFound
return nil, nil, state.ErrNotFound
}

// Get Batch from RPC
rpcBatch, err := a.rpcClient.GetBatch(batchNumberToVerify)
if err != nil {
a.logger.Errorf("error getting batch %d from RPC: %v.", batchNumberToVerify, err)
return nil, nil, nil, err
return nil, nil, err
}

// Compare BatchL2Data from virtual batch and rpcBatch (skipping injected batch (1))
Expand All @@ -1164,7 +1171,7 @@ func (a *Aggregator) getAndLockBatchToProve(
oldAccInputHash := a.getAccInputHash(batchNumberToVerify - 1)
if oldAccInputHash == (common.Hash{}) && batchNumberToVerify > 1 {
tmpLogger.Warnf("AccInputHash for previous batch (%d) is not in memory. Waiting ...", batchNumberToVerify-1)
return nil, nil, nil, state.ErrNotFound
return nil, nil, state.ErrNotFound
}

forcedBlockHashL1 := rpcBatch.ForcedBlockHashL1()
Expand All @@ -1174,7 +1181,7 @@ func (a *Aggregator) getAndLockBatchToProve(
l1Block, err := a.l1Syncr.GetL1BlockByNumber(ctx, virtualBatch.BlockNumber)
if err != nil {
a.logger.Errorf("Error getting l1 block: %v", err)
return nil, nil, nil, err
return nil, nil, err
}

forcedBlockHashL1 = l1Block.ParentHash
Expand Down Expand Up @@ -1220,34 +1227,9 @@ func (a *Aggregator) getAndLockBatchToProve(
ForkID: a.cfg.ForkId,
}

// Request the witness from the server, if it is busy just keep looping until it is available
start := time.Now()
witness, err := a.rpcClient.GetWitness(batchNumberToVerify, a.cfg.UseFullWitness)
for err != nil {
if errors.Is(err, rpc.ErrBusy) {
a.logger.Debugf(
"Witness server is busy, retrying get witness for batch %d in %v",
batchNumberToVerify, a.cfg.RetryTime.Duration,
)
} else {
a.logger.Errorf("Failed to get witness for batch %d, err: %v", batchNumberToVerify, err)
}
time.Sleep(a.cfg.RetryTime.Duration)
witness, err = a.rpcClient.GetWitness(batchNumberToVerify, a.cfg.UseFullWitness)
}
end := time.Now()
a.logger.Debugf("Time to get witness for batch %d: %v", batchNumberToVerify, end.Sub(start))

// Store the sequence in aggregator DB
err = a.storage.AddSequence(ctx, stateSequence, nil)
if err != nil {
tmpLogger.Infof("Error storing sequence for batch %d", batchNumberToVerify)

return nil, nil, nil, err
}

// All the data required to generate a proof is ready
tmpLogger.Infof("All information to generate proof for batch %d is ready", virtualBatch.BatchNumber)
tmpLogger.Infof("All information to generate proof for batch %d is ready. "+
"Witness will be requested.", virtualBatch.BatchNumber)
tmpLogger = tmpLogger.WithFields("batch", virtualBatch.BatchNumber)

now := time.Now().Round(time.Microsecond)
Expand All @@ -1264,10 +1246,10 @@ func (a *Aggregator) getAndLockBatchToProve(
if err != nil {
tmpLogger.Errorf("Failed to add batch proof to DB for batch %d, err: %v", virtualBatch.BatchNumber, err)

return nil, nil, nil, err
return nil, nil, err
}

return stateBatch, witness, proof, nil
return stateBatch, proof, nil
}

func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover ProverInterface) (bool, error) {
Expand All @@ -1278,7 +1260,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover ProverInt
)
tmpLogger.Debug("tryGenerateBatchProof start")

batchToProve, witness, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
batchToProve, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
if errors.Is(err0, state.ErrNotFound) || errors.Is(err0, entities.ErrNotFound) {
// nothing to proof, swallow the error
tmpLogger.Debug("Nothing to generate proof")
Expand All @@ -1288,6 +1270,11 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover ProverInt
return false, err0
}

// Request Witness
tmpLogger.Infof("Requesting witness for batch %d", batchToProve.BatchNumber)
witness := a.getWitness(batchToProve.BatchNumber)
tmpLogger.Infof("Witness received for batch %d", batchToProve.BatchNumber)

tmpLogger = tmpLogger.WithFields("batch", batchToProve.BatchNumber)

var (
Expand Down Expand Up @@ -1369,6 +1356,28 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover ProverInt
return true, nil
}

func (a *Aggregator) getWitness(batchNumber uint64) []byte {
// Request the witness from the server, if it is busy just keep looping until it is available
start := time.Now()
witness, err := a.rpcClient.GetWitness(batchNumber, a.cfg.UseFullWitness)
for err != nil {
if errors.Is(err, rpc.ErrBusy) {
a.logger.Debugf(
"Witness server is busy, retrying get witness for batch %d in %v",
batchNumber, a.cfg.RetryTime.Duration,
)
} else {
a.logger.Errorf("Failed to get witness for batch %d, err: %v", batchNumber, err)
}
time.Sleep(a.cfg.RetryTime.Duration)
witness, err = a.rpcClient.GetWitness(batchNumber, a.cfg.UseFullWitness)
}
end := time.Now()
a.logger.Debugf("Time to get witness for batch %d: %v", batchNumber, end.Sub(start))

return witness
}

func (a *Aggregator) performSanityChecks(tmpLogger *log.Logger, stateRoot, accInputHash common.Hash,
batchToProve *state.Batch) {
// Sanity Check: state root from the proof must match the one from the batch
Expand Down
62 changes: 62 additions & 0 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
mocks "github.com/0xPolygon/cdk/aggregator/mocks"
"github.com/0xPolygon/cdk/aggregator/prover"
"github.com/0xPolygon/cdk/config/types"
"github.com/0xPolygon/cdk/rpc"
rpctypes "github.com/0xPolygon/cdk/rpc/types"
"github.com/0xPolygon/cdk/state"
"github.com/0xPolygonHermez/zkevm-synchronizer-l1/synchronizer"
Expand Down Expand Up @@ -1551,6 +1552,51 @@ func Test_tryGenerateBatchProof(t *testing.T) {
assert.ErrorIs(err, errTest)
},
},
{
name: "getAndLockBatchToProve CheckProofExistsForBatch fails",
setup: func(m mox, a *Aggregator) {
m.proverMock.On("Name").Return(proverName).Twice()
m.proverMock.On("ID").Return(proverID).Twice()
m.proverMock.On("Addr").Return("addr")
m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once()
m.storageMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(true, errTest)
},
asserts: func(result bool, a *Aggregator, err error) {
assert.False(result)
assert.ErrorIs(err, errTest)
},
},
{
name: "getAndLockBatchToProve CleanupGeneratedProofs fails",
setup: func(m mox, a *Aggregator) {
m.proverMock.On("Name").Return(proverName).Twice()
m.proverMock.On("ID").Return(proverID).Twice()
m.proverMock.On("Addr").Return("addr")
m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once()
m.storageMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(true, nil)
m.storageMock.On("CleanupGeneratedProofs", mock.Anything, mock.Anything, mock.Anything).Return(errTest).Once()
},
asserts: func(result bool, a *Aggregator, err error) {
assert.False(result)
assert.ErrorIs(err, errTest)
},
},
{
name: "getAndLockBatchToProve GetSequenceByBatchNumber fails",
setup: func(m mox, a *Aggregator) {
m.proverMock.On("Name").Return(proverName).Twice()
m.proverMock.On("ID").Return(proverID).Twice()
m.proverMock.On("Addr").Return("addr")
m.etherman.On("GetLatestVerifiedBatchNum").Return(lastVerifiedBatchNum, nil).Once()
m.storageMock.On("CheckProofExistsForBatch", mock.MatchedBy(matchProverCtxFn), mock.Anything, nil).Return(true, nil)
m.storageMock.On("CleanupGeneratedProofs", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
m.synchronizerMock.On("GetSequenceByBatchNumber", mock.Anything, mock.Anything).Return(nil, errTest).Once()
},
asserts: func(result bool, a *Aggregator, err error) {
assert.False(result)
assert.ErrorIs(err, errTest)
},
},
{
name: "getAndLockBatchToProve returns ErrNotFound",
setup: func(m mox, a *Aggregator) {
Expand Down Expand Up @@ -1919,3 +1965,19 @@ func Test_sanityChecks(t *testing.T) {
return
}()
}

func Test_getWitness(t *testing.T) {
mockRPC := mocks.NewRPCInterfaceMock(t)
sut := &Aggregator{
rpcClient: mockRPC,
logger: log.WithFields("module", "unittest"),
cfg: Config{
RetryTime: types.Duration{Duration: time.Microsecond * 1},
},
}
mockRPC.EXPECT().GetWitness(mock.Anything, mock.Anything).Return([]byte("witness"), errors.New("test error")).Once()
mockRPC.EXPECT().GetWitness(mock.Anything, mock.Anything).Return([]byte("witness"), rpc.ErrBusy).Once()
mockRPC.EXPECT().GetWitness(mock.Anything, mock.Anything).Return([]byte("witness"), nil).Once()
data := sut.getWitness(1234)
require.NotNil(t, data)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/0xPolygon/cdk-contracts-tooling v0.0.2-0.20241225094934-1d381f5703ef
github.com/0xPolygon/cdk-data-availability v0.0.13
github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6
github.com/0xPolygon/zkevm-ethtx-manager v0.2.5
github.com/0xPolygon/zkevm-ethtx-manager v0.2.7
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.7
github.com/agglayer/aggkit v0.1.0-beta3
github.com/ethereum/go-ethereum v1.15.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/0xPolygon/cdk-data-availability v0.0.13 h1:CoZoSGOtg+wCF3s9XfLf+dc9H5
github.com/0xPolygon/cdk-data-availability v0.0.13/go.mod h1:u/vHga1Wdx4qeglcvh3VCGxeU7h/EouZnuYeFBibOZU=
github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6 h1:FXL/rcO7/GtZ3kRFw+C7J6vmGnl8gcazg+Gh/NVmnas=
github.com/0xPolygon/cdk-rpc v0.0.0-20241004114257-6c3cb6eebfb6/go.mod h1:2scWqMMufrQXu7TikDgQ3BsyaKoX8qP26D6E262vSOg=
github.com/0xPolygon/zkevm-ethtx-manager v0.2.5 h1:t6vLS597OjPcoZzvpGXooIy+ANQEX6LoxYBenRmu15I=
github.com/0xPolygon/zkevm-ethtx-manager v0.2.5/go.mod h1:xuLx/9KDv5wO8mnkiiX5pZHyTWQZXyHjmBRBGEzNGXo=
github.com/0xPolygon/zkevm-ethtx-manager v0.2.7 h1:eS2ewz4z+S16ZWQRyci6Y3U9YSR+sx1opB0/NxhmKlQ=
github.com/0xPolygon/zkevm-ethtx-manager v0.2.7/go.mod h1:xuLx/9KDv5wO8mnkiiX5pZHyTWQZXyHjmBRBGEzNGXo=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.7 h1:KJM1QlNZdZjNRS+ajPauD4uG+uaYgItaL+96Om3f8aI=
github.com/0xPolygonHermez/zkevm-synchronizer-l1 v1.0.7/go.mod h1:exl+KHnTN6Y8HG4nSUXni4qKbAug0HjJqpebMSgl72k=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
24 changes: 23 additions & 1 deletion sequencesender/ethtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ func (s *SequenceSender) syncAllEthTxResults(ctx context.Context) (time.Time, er
numResults := len(results)
s.mutexEthTx.Lock()
for _, result := range results {
for txHash := range result.Txs {
log.Debugf("syncAllEthTxResults: id: %s tx:%s", result.ID.String(), txHash.String())
}
txSequence, exists := s.ethTransactions[result.ID]
if !exists {
log.Debugf("transaction %v missing in memory structure. Adding it", result.ID)
Expand Down Expand Up @@ -306,7 +309,7 @@ func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash com

txResult, err := s.ethTxManager.Result(ctx, txHash)
switch {
case errors.Is(err, ethtxmanager.ErrNotFound):
case isEthTxManagerErrNotFound(err):
s.logger.Infof("transaction %v does not exist in ethtxmanager. Marking it", txHash)
txData.OnMonitor = false
// Resend tx
Expand All @@ -326,6 +329,25 @@ func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash com
return nil
}

// this function is use instead of
// errors.Is(err, ethtxmanager.ErrNotFound)
func isEthTxManagerErrNotFound(err error) bool {
if err == nil {
return false
}
if errors.Is(err, ethtxmanager.ErrNotFound) {
return true
}
// Check all wrapped errors looking for the same message
for err != nil {
if err.Error() == ethtxmanager.ErrNotFound.Error() {
return true
}
err = errors.Unwrap(err)
}
return false
}

// loadSentSequencesTransactions loads the file into the memory structure
func (s *SequenceSender) loadSentSequencesTransactions() error {
// Check if file exists
Expand Down
9 changes: 9 additions & 0 deletions sequencesender/ethtx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
"testing"
Expand All @@ -18,6 +19,14 @@ import (
"github.com/stretchr/testify/require"
)

func TestIsEthTxManagerErrNotFound(t *testing.T) {
require.False(t, isEthTxManagerErrNotFound(nil))
require.True(t, isEthTxManagerErrNotFound(ethtxmanager.ErrNotFound))
require.True(t, isEthTxManagerErrNotFound(fmt.Errorf("not found")))
require.True(t, isEthTxManagerErrNotFound(fmt.Errorf("wrapper: %w", ethtxmanager.ErrNotFound)))
require.False(t, isEthTxManagerErrNotFound(errors.New("another error")))
}

func Test_sendTx(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading