diff --git a/common/utils/crypto.go b/common/utils/crypto.go new file mode 100644 index 0000000..2c831e4 --- /dev/null +++ b/common/utils/crypto.go @@ -0,0 +1,37 @@ +package utils + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/pkg/errors" + "github.com/shutter-network/shutter/shlib/shcrypto" +) + +func ComputeIdentity(prefix []byte, sender common.Address) []byte { + imageBytes := append(prefix, sender.Bytes()...) + return imageBytes +} + +func DecryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { + decryptionKey := new(shcrypto.EpochSecretKey) + err := decryptionKey.Unmarshal(key) + if err != nil { + return nil, errors.Wrapf(err, "invalid decryption key") + } + encryptedMsg := new(shcrypto.EncryptedMessage) + err = encryptedMsg.Unmarshal(encrypted) + if err != nil { + return nil, errors.Wrapf(err, "invalid encrypted msg") + } + decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to decrypt message") + } + + tx := new(types.Transaction) + err = tx.UnmarshalBinary(decryptedMsg) + if err != nil { + return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") + } + return tx, nil +} diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index 646591c..5712c01 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -58,7 +58,7 @@ type CreateDecryptedTXParams struct { TxHash []byte TxStatus TxStatusVal DecryptionKeyID int64 - TransactionSubmittedEventID int64 + TransactionSubmittedEventID pgtype.Int8 } func (q *Queries) CreateDecryptedTX(ctx context.Context, arg CreateDecryptedTXParams) error { @@ -222,7 +222,7 @@ func (q *Queries) CreateProposerDuties(ctx context.Context, arg CreateProposerDu return err } -const createTransactionSubmittedEvent = `-- name: CreateTransactionSubmittedEvent :exec +const createTransactionSubmittedEvent = `-- name: CreateTransactionSubmittedEvent :one INSERT into transaction_submitted_event ( event_block_hash, event_block_number, @@ -237,6 +237,7 @@ INSERT into transaction_submitted_event ( ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING +RETURNING id ` type CreateTransactionSubmittedEventParams struct { @@ -252,8 +253,8 @@ type CreateTransactionSubmittedEventParams struct { EventTxHash []byte } -func (q *Queries) CreateTransactionSubmittedEvent(ctx context.Context, arg CreateTransactionSubmittedEventParams) error { - _, err := q.db.Exec(ctx, createTransactionSubmittedEvent, +func (q *Queries) CreateTransactionSubmittedEvent(ctx context.Context, arg CreateTransactionSubmittedEventParams) (int64, error) { + row := q.db.QueryRow(ctx, createTransactionSubmittedEvent, arg.EventBlockHash, arg.EventBlockNumber, arg.EventTxIndex, @@ -265,7 +266,9 @@ func (q *Queries) CreateTransactionSubmittedEvent(ctx context.Context, arg Creat arg.EncryptedTransaction, arg.EventTxHash, ) - return err + var id int64 + err := row.Scan(&id) + return id, err } const createTransactionSubmittedEventsSyncedUntil = `-- name: CreateTransactionSubmittedEventsSyncedUntil :exec @@ -369,6 +372,24 @@ func (q *Queries) CreateValidatorStatus(ctx context.Context, arg CreateValidator return err } +const deleteTransactionSubmittedEventFromBlockNumber = `-- name: DeleteTransactionSubmittedEventFromBlockNumber :exec +DELETE FROM transaction_submitted_event WHERE event_block_number >= $1 +` + +func (q *Queries) DeleteTransactionSubmittedEventFromBlockNumber(ctx context.Context, eventBlockNumber int64) error { + _, err := q.db.Exec(ctx, deleteTransactionSubmittedEventFromBlockNumber, eventBlockNumber) + return err +} + +const deleteValidatorRegistrationMessageFromBlockNumber = `-- name: DeleteValidatorRegistrationMessageFromBlockNumber :exec +DELETE FROM validator_registration_message WHERE event_block_number >= $1 +` + +func (q *Queries) DeleteValidatorRegistrationMessageFromBlockNumber(ctx context.Context, eventBlockNumber int64) error { + _, err := q.db.Exec(ctx, deleteValidatorRegistrationMessageFromBlockNumber, eventBlockNumber) + return err +} + const queryBlockFromSlot = `-- name: QueryBlockFromSlot :one SELECT block_hash, block_number, block_timestamp, created_at, updated_at, slot FROM block WHERE slot = $1 FOR UPDATE @@ -388,6 +409,78 @@ func (q *Queries) QueryBlockFromSlot(ctx context.Context, slot int64) (Block, er return i, err } +const queryDecryptedTX = `-- name: QueryDecryptedTX :one +SELECT id, slot, tx_index, tx_hash, tx_status, decryption_key_id, transaction_submitted_event_id, created_at, updated_at, block_number FROM decrypted_tx WHERE decryption_key_id = $1 AND tx_hash = $2 +` + +type QueryDecryptedTXParams struct { + DecryptionKeyID int64 + TxHash []byte +} + +func (q *Queries) QueryDecryptedTX(ctx context.Context, arg QueryDecryptedTXParams) (DecryptedTx, error) { + row := q.db.QueryRow(ctx, queryDecryptedTX, arg.DecryptionKeyID, arg.TxHash) + var i DecryptedTx + err := row.Scan( + &i.ID, + &i.Slot, + &i.TxIndex, + &i.TxHash, + &i.TxStatus, + &i.DecryptionKeyID, + &i.TransactionSubmittedEventID, + &i.CreatedAt, + &i.UpdatedAt, + &i.BlockNumber, + ) + return i, err +} + +const queryDecryptionKeyAndMessage = `-- name: QueryDecryptionKeyAndMessage :many +SELECT + dk.id, + dk.key, + dkm.slot +FROM + decryption_key dk +JOIN + decryption_keys_message_decryption_key dkd ON dk.id = dkd.decryption_key_id +JOIN + decryption_keys_message dkm ON dkm.slot = dkd.decryption_keys_message_slot +WHERE dk.eon = $1 AND dk.identity_preimage = $2 +` + +type QueryDecryptionKeyAndMessageParams struct { + Eon pgtype.Int8 + IdentityPreimage []byte +} + +type QueryDecryptionKeyAndMessageRow struct { + ID int64 + Key []byte + Slot int64 +} + +func (q *Queries) QueryDecryptionKeyAndMessage(ctx context.Context, arg QueryDecryptionKeyAndMessageParams) ([]QueryDecryptionKeyAndMessageRow, error) { + rows, err := q.db.Query(ctx, queryDecryptionKeyAndMessage, arg.Eon, arg.IdentityPreimage) + if err != nil { + return nil, err + } + defer rows.Close() + var items []QueryDecryptionKeyAndMessageRow + for rows.Next() { + var i QueryDecryptionKeyAndMessageRow + if err := rows.Scan(&i.ID, &i.Key, &i.Slot); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryDecryptionKeyShare = `-- name: QueryDecryptionKeyShare :many SELECT eon, identity_preimage, keyper_index, decryption_key_share, slot, created_at, updated_at FROM decryption_key_share WHERE eon = $1 AND identity_preimage = $2 AND keyper_index = $3 @@ -476,6 +569,30 @@ func (q *Queries) QueryDecryptionKeysAndMessage(ctx context.Context, slot int64) return items, nil } +const queryTranasctionSubmittedEventIDsUsingBlock = `-- name: QueryTranasctionSubmittedEventIDsUsingBlock :many +SELECT id FROM transaction_submitted_event WHERE event_block_number >= $1 +` + +func (q *Queries) QueryTranasctionSubmittedEventIDsUsingBlock(ctx context.Context, eventBlockNumber int64) ([]int64, error) { + rows, err := q.db.Query(ctx, queryTranasctionSubmittedEventIDsUsingBlock, eventBlockNumber) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int64 + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryTransactionSubmittedEvent = `-- name: QueryTransactionSubmittedEvent :many SELECT id, event_block_hash, event_block_number, event_tx_index, event_log_index, eon, tx_index, identity_prefix, sender, encrypted_transaction, created_at, updated_at, event_tx_hash FROM transaction_submitted_event WHERE eon = $1 AND tx_index >= $2 AND tx_index < $2 + $3 ORDER BY tx_index ASC @@ -611,6 +728,53 @@ func (q *Queries) QueryValidatorStatuses(ctx context.Context, arg QueryValidator return items, nil } +const setTransactionSubmittedEventIDsNullForDecryptedTX = `-- name: SetTransactionSubmittedEventIDsNullForDecryptedTX :exec +UPDATE decrypted_tx +SET transaction_submitted_event_id = NULL +WHERE transaction_submitted_event_id = ANY($1::bigint[]) +` + +func (q *Queries) SetTransactionSubmittedEventIDsNullForDecryptedTX(ctx context.Context, dollar_1 []int64) error { + _, err := q.db.Exec(ctx, setTransactionSubmittedEventIDsNullForDecryptedTX, dollar_1) + return err +} + +const updateDecryptedTx = `-- name: UpdateDecryptedTx :exec +UPDATE decrypted_tx +SET + slot = $2, + tx_index = $3, + tx_hash = $4, + tx_status = $5, + decryption_key_id = $6, + transaction_submitted_event_id = $7, + updated_at = NOW() +WHERE id = $1 +` + +type UpdateDecryptedTxParams struct { + ID int64 + Slot int64 + TxIndex int64 + TxHash []byte + TxStatus TxStatusVal + DecryptionKeyID int64 + TransactionSubmittedEventID pgtype.Int8 +} + +func (q *Queries) UpdateDecryptedTx(ctx context.Context, arg UpdateDecryptedTxParams) error { + _, err := q.db.Exec(ctx, updateDecryptedTx, + arg.ID, + arg.Slot, + arg.TxIndex, + arg.TxHash, + arg.TxStatus, + arg.DecryptionKeyID, + arg.TransactionSubmittedEventID, + ) + return err +} + const upsertTX = `-- name: UpsertTX :exec INSERT INTO decrypted_tx ( slot, @@ -635,7 +799,7 @@ type UpsertTXParams struct { TxHash []byte TxStatus TxStatusVal DecryptionKeyID int64 - TransactionSubmittedEventID int64 + TransactionSubmittedEventID pgtype.Int8 BlockNumber pgtype.Int8 } diff --git a/internal/data/models.sqlc.gen.go b/internal/data/models.sqlc.gen.go index 1361757..182859c 100644 --- a/internal/data/models.sqlc.gen.go +++ b/internal/data/models.sqlc.gen.go @@ -117,7 +117,7 @@ type DecryptedTx struct { TxHash []byte TxStatus TxStatusVal DecryptionKeyID int64 - TransactionSubmittedEventID int64 + TransactionSubmittedEventID pgtype.Int8 CreatedAt pgtype.Timestamptz UpdatedAt pgtype.Timestamptz BlockNumber pgtype.Int8 diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index d4b09f9..0d2429c 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql @@ -1,4 +1,4 @@ --- name: CreateTransactionSubmittedEvent :exec +-- name: CreateTransactionSubmittedEvent :one INSERT into transaction_submitted_event ( event_block_hash, event_block_number, @@ -12,7 +12,8 @@ INSERT into transaction_submitted_event ( event_tx_hash ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) -ON CONFLICT DO NOTHING; +ON CONFLICT DO NOTHING +RETURNING id; -- name: CreateDecryptionKeyMessages :exec WITH data (slot, instance_id, eon, tx_pointer) AS ( @@ -183,3 +184,46 @@ SET block_hash = $1, block_number = $2; -- name: QueryTransactionSubmittedEventsSyncedUntil :one SELECT block_hash, block_number FROM transaction_submitted_events_synced_until LIMIT 1; + +-- name: QueryTranasctionSubmittedEventIDsUsingBlock :many +SELECT id FROM transaction_submitted_event WHERE event_block_number >= $1; ; + +-- name: SetTransactionSubmittedEventIDsNullForDecryptedTX :exec +UPDATE decrypted_tx +SET transaction_submitted_event_id = NULL +WHERE transaction_submitted_event_id = ANY($1::bigint[]); + + +-- name: DeleteTransactionSubmittedEventFromBlockNumber :exec +DELETE FROM transaction_submitted_event WHERE event_block_number >= $1; + +-- name: DeleteValidatorRegistrationMessageFromBlockNumber :exec +DELETE FROM validator_registration_message WHERE event_block_number >= $1; + +-- name: QueryDecryptionKeyAndMessage :many +SELECT + dk.id, + dk.key, + dkm.slot +FROM + decryption_key dk +JOIN + decryption_keys_message_decryption_key dkd ON dk.id = dkd.decryption_key_id +JOIN + decryption_keys_message dkm ON dkm.slot = dkd.decryption_keys_message_slot +WHERE dk.eon = $1 AND dk.identity_preimage = $2; + +-- name: QueryDecryptedTX :one +SELECT * FROM decrypted_tx WHERE decryption_key_id = $1 AND tx_hash = $2; + +-- name: UpdateDecryptedTx :exec +UPDATE decrypted_tx +SET + slot = $2, + tx_index = $3, + tx_hash = $4, + tx_status = $5, + decryption_key_id = $6, + transaction_submitted_event_id = $7, + updated_at = NOW() +WHERE id = $1; diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index 892890c..f3dbfd6 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -20,12 +20,12 @@ import ( sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" metricsCommon "github.com/shutter-network/observer/common" + "github.com/shutter-network/observer/common/database" dbTypes "github.com/shutter-network/observer/common/database" "github.com/shutter-network/observer/common/utils" "github.com/shutter-network/observer/internal/data" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" - "github.com/shutter-network/shutter/shlib/shcrypto" blst "github.com/supranational/blst/bindings/go" ) @@ -69,13 +69,13 @@ func NewTxMapperDB( } } -func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error { +func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) (int64, error) { q := tm.dbQuery if tx != nil { // Use transaction if available q = tm.dbQuery.WithTx(tx) } - err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ + id, err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ EventBlockHash: st.Raw.BlockHash.Bytes(), EventBlockNumber: int64(st.Raw.BlockNumber), EventTxIndex: int64(st.Raw.TxIndex), @@ -88,10 +88,10 @@ func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.T EventTxHash: st.Raw.TxHash.Bytes(), }) if err != nil { - return err + return 0, err } metricsEncTxReceived.Inc() - return nil + return id, nil } func (tm *TxMapperDB) AddDecryptionKeysAndMessages( @@ -411,7 +411,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: common.Hash{}.Bytes(), TxStatus: data.TxStatusValNotdecrypted, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, + TransactionSubmittedEventID: dbTypes.Int64ToPgTypeInt8(txSubEvent.ID), }) if err != nil { log.Err(err).Msg("failed to create decrypted tx") @@ -451,7 +451,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: decryptedTx.Hash().Bytes(), TxStatus: data.TxStatusValPending, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, + TransactionSubmittedEventID: dbTypes.Int64ToPgTypeInt8(txSubEvent.ID), }) if err != nil { txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) @@ -464,7 +464,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: decryptedTx.Hash().Bytes(), TxStatus: data.TxStatusValInvalid, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, + TransactionSubmittedEventID: dbTypes.Int64ToPgTypeInt8(txSubEvent.ID), }) if err != nil { log.Err(err).Msg("failed to create decrypted tx") @@ -480,7 +480,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: decryptedTx.Hash().Bytes(), TxStatus: data.TxStatusValPending, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, + TransactionSubmittedEventID: dbTypes.Int64ToPgTypeInt8(txSubEvent.ID), }) if err != nil { txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) @@ -505,7 +505,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: txHash[:], TxStatus: data.TxStatusValNotincluded, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, + TransactionSubmittedEventID: dbTypes.Int64ToPgTypeInt8(txSubEvent.ID), }) if err != nil { log.Err(err).Msg("failed to upsert decrypted tx") @@ -550,7 +550,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxHash: receipt.TxHash.Bytes(), TxStatus: txStatus, DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, + TransactionSubmittedEventID: database.Int64ToPgTypeInt8(txSubEvent.ID), BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, }) if err != nil { @@ -691,21 +691,16 @@ func getDecryptionMessageInfos(dkam *DecKeysAndMessages) ([]int64, []int64, []in return eons, slots, instanceIDs, txPointers, keyIndexes } -func computeIdentity(prefix []byte, sender common.Address) []byte { - imageBytes := append(prefix, sender.Bytes()...) - return imageBytes -} - func getDecryptedTX( txSubEvent data.TransactionSubmittedEvent, identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, ) (*types.Transaction, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + identityPreimage := utils.ComputeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] if !ok { return nil, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) } - tx, err := decryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) + tx, err := utils.DecryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) if err != nil { return nil, err } @@ -716,7 +711,7 @@ func getDecryptionKeyID( txSubEvent data.TransactionSubmittedEvent, identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, ) (int64, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + identityPreimage := utils.ComputeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] if !ok { return 0, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) @@ -724,30 +719,6 @@ func getDecryptionKeyID( return dkam.DecryptionKeyID, nil } -func decryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { - decryptionKey := new(shcrypto.EpochSecretKey) - err := decryptionKey.Unmarshal(key) - if err != nil { - return nil, errors.Wrapf(err, "invalid decryption key") - } - encryptedMsg := new(shcrypto.EncryptedMessage) - err = encryptedMsg.Unmarshal(encrypted) - if err != nil { - return nil, errors.Wrapf(err, "invalid encrypted msg") - } - decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) - if err != nil { - return nil, errors.Wrapf(err, "failed to decrypt message") - } - - tx := new(types.Transaction) - err = tx.UnmarshalBinary(decryptedMsg) - if err != nil { - return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") - } - return tx, nil -} - // waitForReceiptWithTimeout waits for a transaction receipt with a provided timeout. func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash common.Hash, receiptWaitTimeout time.Duration, txErrorSignalCh chan error) (*types.Receipt, error) { ctx, cancel := context.WithTimeout(ctx, receiptWaitTimeout) diff --git a/internal/metrics/types.go b/internal/metrics/types.go index 42b228a..0955734 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -51,7 +51,7 @@ type TxExecution struct { } type TxMapper interface { - AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error + AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) (int64, error) AddDecryptionKeysAndMessages( ctx context.Context, dkam *DecKeysAndMessages, diff --git a/internal/syncer/transaction_submitted_syncer.go b/internal/syncer/transaction_submitted_syncer.go index 878978d..7acc81c 100644 --- a/internal/syncer/transaction_submitted_syncer.go +++ b/internal/syncer/transaction_submitted_syncer.go @@ -1,7 +1,9 @@ package syncer import ( + "bytes" "context" + "errors" "fmt" "math/big" @@ -12,6 +14,8 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog/log" sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" + "github.com/shutter-network/observer/common/database" + "github.com/shutter-network/observer/common/utils" "github.com/shutter-network/observer/internal/data" "github.com/shutter-network/observer/internal/metrics" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley" @@ -29,6 +33,8 @@ type TransactionSubmittedSyncer struct { ethClient *ethclient.Client txMapper metrics.TxMapper syncStartBlockNumber uint64 + genesisTimestamp uint64 + slotDuration uint64 } func NewTransactionSubmittedSyncer( @@ -37,6 +43,8 @@ func NewTransactionSubmittedSyncer( ethClient *ethclient.Client, txMapper metrics.TxMapper, syncStartBlockNumber uint64, + genesisTimestamp uint64, + slotDuration uint64, ) *TransactionSubmittedSyncer { return &TransactionSubmittedSyncer{ contract: contract, @@ -45,11 +53,113 @@ func NewTransactionSubmittedSyncer( ethClient: ethClient, txMapper: txMapper, syncStartBlockNumber: syncStartBlockNumber, + genesisTimestamp: genesisTimestamp, + slotDuration: slotDuration, } } +func getNumReorgedBlocksForTransactionSubmitted(syncedUntil *data.QueryTransactionSubmittedEventsSyncedUntilRow, header *types.Header) int { + shouldBeParent := header.Number.Int64() == syncedUntil.BlockNumber+1 + isParent := bytes.Equal(header.ParentHash.Bytes(), syncedUntil.BlockHash) + isReorg := shouldBeParent && !isParent + if !isReorg { + return 0 + } + // We don't know how deep the reorg is, so we make a conservative guess. Assuming higher depths + // is safer because it means we resync a little bit more. + depth := AssumedReorgDepth + if syncedUntil.BlockNumber < int64(depth) { + return int(syncedUntil.BlockNumber) + } + return depth +} + +// resetSyncStatus clears the db from its recent history after a reorg of given depth. +func (ets *TransactionSubmittedSyncer) resetSyncStatus(ctx context.Context, numReorgedBlocks int) error { + if numReorgedBlocks == 0 { + return nil + } + + tx, err := ets.db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + qtx := ets.dbQuery.WithTx(tx) + + syncStatus, err := qtx.QueryTransactionSubmittedEventsSyncedUntil(ctx) + if err != nil { + return fmt.Errorf("failed to query sync status from db in order to reset it, %w", err) + } + if syncStatus.BlockNumber < int64(numReorgedBlocks) { + return fmt.Errorf("detected reorg deeper (%d) than blocks synced (%d)", syncStatus.BlockNumber, numReorgedBlocks) + } + + deleteFromInclusive := syncStatus.BlockNumber - int64(numReorgedBlocks) + 1 + + ids, err := qtx.QueryTranasctionSubmittedEventIDsUsingBlock(ctx, deleteFromInclusive) + if err != nil { + return fmt.Errorf("failed to query transaction submitted event ids, %w", err) + } + + err = qtx.SetTransactionSubmittedEventIDsNullForDecryptedTX(ctx, ids) + if err != nil { + return fmt.Errorf("unable to set ids to null, %w", err) + } + + err = qtx.DeleteTransactionSubmittedEventFromBlockNumber(ctx, deleteFromInclusive) + if err != nil { + return fmt.Errorf("failed to delete transaction submitted events from db, %w", err) + } + + // Currently, we don't have enough information in the db to populate block hash and slot. + // However, using default values here is fine since the syncer is expected to resync + // immediately after this function call which will set the correct values. When we do proper + // reorg handling, we should store the full block data of the previous blocks so that we can + // avoid this. + newSyncedUntilBlockNumber := deleteFromInclusive - 1 + err = qtx.CreateTransactionSubmittedEventsSyncedUntil(ctx, data.CreateTransactionSubmittedEventsSyncedUntilParams{ + BlockHash: []byte{}, + BlockNumber: newSyncedUntilBlockNumber, + }) + if err != nil { + return fmt.Errorf("failed to reset transaction submitted event sync status in db, %w", err) + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to commit db transaction, %w", err) + } + + log.Info(). + Int("depth", numReorgedBlocks). + Int64("previous-synced-until", syncStatus.BlockNumber). + Int64("new-synced-until", newSyncedUntilBlockNumber). + Msg("sync status reset due to reorg") + return nil +} + +func (ets *TransactionSubmittedSyncer) HandlePotentialReorg(ctx context.Context, header *types.Header) error { + syncedUntil, err := ets.dbQuery.QueryTransactionSubmittedEventsSyncedUntil(ctx) + if err == pgx.ErrNoRows { + return nil + } + if err != nil { + return fmt.Errorf("failed to query transaction submitted events sync status, %w", err) + } + + numReorgedBlocks := getNumReorgedBlocksForTransactionSubmitted(&syncedUntil, header) + if numReorgedBlocks > 0 { + return ets.resetSyncStatus(ctx, numReorgedBlocks) + } + return nil +} + func (ets *TransactionSubmittedSyncer) Sync(ctx context.Context, header *types.Header) error { - // TODO: handle reorgs + if err := ets.HandlePotentialReorg(ctx, header); err != nil { + return err + } + syncedUntil, err := ets.dbQuery.QueryTransactionSubmittedEventsSyncedUntil(ctx) if err != nil && err != pgx.ErrNoRows { return fmt.Errorf("failed to query transaction submitted events sync status, %v", err) @@ -94,16 +204,157 @@ func (ets *TransactionSubmittedSyncer) syncRange( } defer tx.Rollback(ctx) qtx := ets.dbQuery.WithTx(tx) - for _, event := range events { - err := ets.txMapper.AddTransactionSubmittedEvent(ctx, tx, event) + + currentBlockNumber := 0 + txIndexInBlock := 0 + if len(events) > 0 { + currentBlockNumber = int(events[0].Raw.BlockNumber) + } + for i, event := range events { + txSubmittedEventID, err := ets.txMapper.AddTransactionSubmittedEvent(ctx, tx, event) if err != nil { log.Err(err).Msg("err adding transaction submitted event") return err } + log.Info(). Uint64("block", event.Raw.BlockNumber). Hex("encrypted transaction (hex)", event.EncryptedTransaction). Msg("new encrypted transaction") + + // check if we are processing the event in the same block + if currentBlockNumber == int(event.Raw.BlockNumber) { + // just increment in event in same block so we can compare it with + // correct transaction index inside the block + if i > 0 { + txIndexInBlock += 1 + } + } else { + // realistically this condition will hit when currentBlockNumber is less then event.Raw.Blocknumber + // reset the index and currentBlock since we are encountering a + // new event which will be compared from the beginning + txIndexInBlock = 0 + currentBlockNumber = int(event.Raw.BlockNumber) + } + + // Try to find decryption keys for this event + dk, err := ets.dbQuery.QueryDecryptionKeyAndMessage(ctx, data.QueryDecryptionKeyAndMessageParams{ + Eon: database.Uint64ToPgTypeInt8(event.Eon), + IdentityPreimage: utils.ComputeIdentity(event.IdentityPrefix[:], event.Sender), + }) + if err != nil { + // keys not released yet + if errors.Is(err, pgx.ErrNoRows) { + continue + } else { + log.Err(err).Msg("err querying decryption keys") + return err + } + } + + // If we have decryption keys, try to decrypt and process the transaction + if len(dk) > 0 { + decryptionKey := dk[0].Key + slot := dk[0].Slot + decryptionKeyID := dk[0].ID + + // Try to decrypt the transaction + decryptedTx, err := utils.DecryptTransaction(decryptionKey, event.EncryptedTransaction) + if err != nil { + // If decryption fails, for some unusual reason throw an error + return err + } + + decryptedTxData, err := ets.dbQuery.QueryDecryptedTX(ctx, data.QueryDecryptedTXParams{ + DecryptionKeyID: decryptionKeyID, + TxHash: decryptedTx.Hash().Bytes(), + }) + if err != nil { + log.Err(err).Msg("err querying decrypted tx from db") + return err + } + // Try to get transaction receipt + receipt, err := ets.ethClient.TransactionReceipt(ctx, decryptedTx.Hash()) + if err != nil { + // If receipt not found, check if transaction is pending + _, isPending, err := ets.ethClient.TransactionByHash(ctx, decryptedTx.Hash()) + if err != nil { + err = ets.dbQuery.UpdateDecryptedTx(ctx, data.UpdateDecryptedTxParams{ + ID: decryptedTxData.ID, + Slot: decryptedTxData.Slot, + TxIndex: int64(event.TxIndex), + TxHash: decryptedTxData.TxHash, + TxStatus: decryptedTxData.TxStatus, + DecryptionKeyID: decryptedTxData.DecryptionKeyID, + TransactionSubmittedEventID: database.Int64ToPgTypeInt8(txSubmittedEventID), + }) + if err != nil { + log.Err(err).Msg("failed to update decrypted tx") + return err + } + continue + } + + if isPending { + // Transaction is pending + err = ets.dbQuery.UpdateDecryptedTx(ctx, data.UpdateDecryptedTxParams{ + ID: decryptedTxData.ID, + Slot: decryptedTxData.Slot, + TxIndex: int64(event.TxIndex), + TxHash: decryptedTxData.TxHash, + TxStatus: data.TxStatusValPending, + DecryptionKeyID: decryptedTxData.DecryptionKeyID, + TransactionSubmittedEventID: database.Int64ToPgTypeInt8(txSubmittedEventID), + }) + if err != nil { + log.Err(err).Msg("failed to update decrypted tx") + return err + } + } + } else if receipt != nil { + // Transaction is included and has a receipt + block, err := ets.ethClient.BlockByNumber(ctx, receipt.BlockNumber) + if err != nil { + log.Err(err).Uint64("block-number", receipt.BlockNumber.Uint64()).Msg("failed to retrieve block") + return err + } + inclusionSlot := utils.GetSlotForBlock(block.Header().Time, ets.genesisTimestamp, ets.slotDuration) + + txStatus := data.TxStatusValShieldedinclusion + log.Info().Uint("tx-index", receipt.TransactionIndex). + Uint64("inclusion-slot", uint64(slot)). + Msg("receipt data") + + log.Info().Int("index", txIndexInBlock). + Int64("inclusion-slot", slot). + Msg("local data") + + // Check if transaction indices match + if receipt.TransactionIndex != uint(txIndexInBlock) { + log.Info().Uint("tx-index", receipt.TransactionIndex).Msg("transaction index mismatch") + txStatus = data.TxStatusValUnshieldedinclusion + } + // Check if slots match + if inclusionSlot != uint64(slot) { + log.Info().Int64("slot", slot).Msg("transaction slot mismatch") + txStatus = data.TxStatusValUnshieldedinclusion + } + + err = ets.dbQuery.UpdateDecryptedTx(ctx, data.UpdateDecryptedTxParams{ + ID: decryptedTxData.ID, + Slot: decryptedTxData.Slot, + TxIndex: int64(event.TxIndex), + TxHash: decryptedTxData.TxHash, + TxStatus: txStatus, + DecryptionKeyID: decryptedTxData.DecryptionKeyID, + TransactionSubmittedEventID: database.Int64ToPgTypeInt8(txSubmittedEventID), + }) + if err != nil { + log.Err(err).Msg("failed to update decrypted tx") + return err + } + } + } } err = qtx.CreateTransactionSubmittedEventsSyncedUntil(ctx, data.CreateTransactionSubmittedEventsSyncedUntilParams{ BlockNumber: int64(end), @@ -150,3 +401,16 @@ func (s *TransactionSubmittedSyncer) fetchEvents( } return events, nil } + +/* + slot 9 => block 9 + + slot 10 => block 10 => 2 seq 2 dec keys + + slot 11 => block 11 => 8 seq 8 keys yet + + ... + + slot 10 => block 10 5 decrypted txs 15 normal txs = 20 txs + for 5 dec txs => +*/ diff --git a/internal/syncer/validator_registry.go b/internal/syncer/validator_registry.go index 496c6c8..43098a6 100644 --- a/internal/syncer/validator_registry.go +++ b/internal/syncer/validator_registry.go @@ -1,6 +1,7 @@ package syncer import ( + "bytes" "context" "fmt" "math/big" @@ -43,8 +44,97 @@ func NewValidatorRegistrySyncer( } } +func getNumReorgedBlocksForValidatorRegistrations(syncedUntil *data.QueryValidatorRegistryEventsSyncedUntilRow, header *types.Header) int { + shouldBeParent := header.Number.Int64() == syncedUntil.BlockNumber+1 + isParent := bytes.Equal(header.ParentHash.Bytes(), syncedUntil.BlockHash) + isReorg := shouldBeParent && !isParent + if !isReorg { + return 0 + } + // We don't know how deep the reorg is, so we make a conservative guess. Assuming higher depths + // is safer because it means we resync a little bit more. + depth := AssumedReorgDepth + if syncedUntil.BlockNumber < int64(depth) { + return int(syncedUntil.BlockNumber) + } + return depth +} + +// resetSyncStatus clears the db from its recent history after a reorg of given depth. +func (vts *ValidatorRegistrySyncer) resetSyncStatus(ctx context.Context, numReorgedBlocks int) error { + if numReorgedBlocks == 0 { + return nil + } + + tx, err := vts.db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + qtx := vts.dbQuery.WithTx(tx) + + syncStatus, err := qtx.QueryValidatorRegistryEventsSyncedUntil(ctx) + if err != nil { + return fmt.Errorf("failed to query sync status from db in order to reset it, %w", err) + } + if syncStatus.BlockNumber < int64(numReorgedBlocks) { + return fmt.Errorf("detected reorg deeper (%d) than blocks synced (%d)", syncStatus.BlockNumber, numReorgedBlocks) + } + + deleteFromInclusive := syncStatus.BlockNumber - int64(numReorgedBlocks) + 1 + + err = qtx.DeleteValidatorRegistrationMessageFromBlockNumber(ctx, deleteFromInclusive) + if err != nil { + return fmt.Errorf("failed to delete validator registration event from db, %w", err) + } + + // Currently, we don't have enough information in the db to populate block hash and slot. + // However, using default values here is fine since the syncer is expected to resync + // immediately after this function call which will set the correct values. When we do proper + // reorg handling, we should store the full block data of the previous blocks so that we can + // avoid this. + newSyncedUntilBlockNumber := deleteFromInclusive - 1 + err = qtx.CreateValidatorRegistryEventsSyncedUntil(ctx, data.CreateValidatorRegistryEventsSyncedUntilParams{ + BlockHash: []byte{}, + BlockNumber: newSyncedUntilBlockNumber, + }) + if err != nil { + return fmt.Errorf("failed to reset validator registration event sync status in db, %w", err) + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to commit db transaction, %w", err) + } + log.Info(). + Int("depth", numReorgedBlocks). + Int64("previous-synced-until", syncStatus.BlockNumber). + Int64("new-synced-until", newSyncedUntilBlockNumber). + Msg("sync status reset due to reorg") + return nil +} + +func (vts *ValidatorRegistrySyncer) HandlePotentialReorg(ctx context.Context, header *types.Header) error { + syncedUntil, err := vts.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) + if err == pgx.ErrNoRows { + return nil + } + if err != nil { + return fmt.Errorf("failed to query validator registry events sync status, %w", err) + } + + numReorgedBlocks := getNumReorgedBlocksForValidatorRegistrations(&syncedUntil, header) + if numReorgedBlocks > 0 { + return vts.resetSyncStatus(ctx, numReorgedBlocks) + } + return nil +} + func (vts *ValidatorRegistrySyncer) Sync(ctx context.Context, header *types.Header) error { - // TODO: handle reorgs + if err := vts.HandlePotentialReorg(ctx, header); err != nil { + return err + } + syncedUntil, err := vts.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) if err != nil && err != pgx.ErrNoRows { return fmt.Errorf("failed to query validator registry sync status, %v", err) diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 9a14b67..5a2d473 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -117,7 +117,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error { return err } - transactionSubmittedSyncer := syncer.NewTransactionSubmittedSyncer(sequencerContract, w.db, ethClient, txMapper, blockNumber) + transactionSubmittedSyncer := syncer.NewTransactionSubmittedSyncer(sequencerContract, w.db, ethClient, txMapper, blockNumber, GenesisTimestamp, SlotDuration) validatorRegistrySyncer := syncer.NewValidatorRegistrySyncer(validatorRegistryContract, w.db, ethClient, txMapper, ValidatorRegistryDeploymentBlockNumber) blocksWatcher := NewBlocksWatcher(w.config, ethClient, txMapper, transactionSubmittedSyncer, validatorRegistrySyncer) diff --git a/migrations/20250411160831_drop_null_foreign_key_constraint.sql b/migrations/20250411160831_drop_null_foreign_key_constraint.sql new file mode 100644 index 0000000..85e6567 --- /dev/null +++ b/migrations/20250411160831_drop_null_foreign_key_constraint.sql @@ -0,0 +1,10 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE decrypted_tx +ALTER COLUMN transaction_submitted_event_id DROP NOT NULL; + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +-- +goose StatementEnd diff --git a/tests/init_test.go b/tests/init_test.go index 63454d7..e8da013 100644 --- a/tests/init_test.go +++ b/tests/init_test.go @@ -11,6 +11,7 @@ import ( "github.com/shutter-network/observer/common" "github.com/shutter-network/observer/internal/data" "github.com/shutter-network/observer/internal/metrics" + "github.com/shutter-network/observer/internal/syncer" "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" "github.com/stretchr/testify/suite" ) @@ -22,8 +23,11 @@ type TestMetricsSuite struct { testDB *common.TestDatabase - txMapperDB metrics.TxMapper - dbQuery *data.Queries + txMapperDB metrics.TxMapper + txSubmittedSyncer *syncer.TransactionSubmittedSyncer + validatorRegistrySyncer *syncer.ValidatorRegistrySyncer + + dbQuery *data.Queries } func TestMain(t *testing.T) { @@ -43,5 +47,7 @@ func (s *TestMetricsSuite) SetupSuite() { s.testDB = common.SetupTestDatabase(migrationsPath) s.txMapperDB = metrics.NewTxMapperDB(ctx, s.testDB.DbInstance, &common.Config{}, ðclient.Client{}, &beaconapiclient.Client{}, 1, rand.Uint64(), rand.Uint64()) + s.txSubmittedSyncer = syncer.NewTransactionSubmittedSyncer(nil, s.testDB.DbInstance, nil, s.txMapperDB, 0, rand.Uint64(), rand.Uint64()) + s.validatorRegistrySyncer = syncer.NewValidatorRegistrySyncer(nil, s.testDB.DbInstance, nil, s.txMapperDB, 0) s.dbQuery = data.New(s.testDB.DbInstance) } diff --git a/tests/reorg_test.go b/tests/reorg_test.go new file mode 100644 index 0000000..97e454a --- /dev/null +++ b/tests/reorg_test.go @@ -0,0 +1,93 @@ +package tests + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/shutter-network/observer/internal/data" +) + +func (s *TestMetricsSuite) TestHandlePotentialReorgTXSubmittedEvent() { + ctx := context.Background() + // Setup initial sync status + blockHash := [32]byte{1, 2, 3} + err := s.dbQuery.CreateTransactionSubmittedEventsSyncedUntil(ctx, data.CreateTransactionSubmittedEventsSyncedUntilParams{ + BlockHash: blockHash[:], + BlockNumber: 99, + }) + s.Require().NoError(err) + + // Create header with matching parent hash + header := &types.Header{ + Number: big.NewInt(100), + ParentHash: blockHash, + } + + // no reorg + err = s.txSubmittedSyncer.HandlePotentialReorg(ctx, header) + s.Require().NoError(err) + + blockHash = [32]byte{1, 2, 3} + err = s.dbQuery.CreateTransactionSubmittedEventsSyncedUntil(ctx, data.CreateTransactionSubmittedEventsSyncedUntilParams{ + BlockHash: blockHash[:], + BlockNumber: 99, + }) + s.Require().NoError(err) + + header = &types.Header{ + Number: big.NewInt(100), + ParentHash: [32]byte{4, 5, 6}, + } + + // reorg + err = s.txSubmittedSyncer.HandlePotentialReorg(ctx, header) + s.Require().NoError(err) + + syncStatus, err := s.dbQuery.QueryTransactionSubmittedEventsSyncedUntil(ctx) + s.Require().NoError(err) + s.Require().Equal(int64(89), syncStatus.BlockNumber) + s.Require().Empty(syncStatus.BlockHash) +} + +func (s *TestMetricsSuite) TestHandlePotentialReorgValidatorRegistry() { + ctx := context.Background() + // Setup initial sync status + blockHash := [32]byte{1, 2, 3} + err := s.dbQuery.CreateValidatorRegistryEventsSyncedUntil(ctx, data.CreateValidatorRegistryEventsSyncedUntilParams{ + BlockHash: blockHash[:], + BlockNumber: 99, + }) + s.Require().NoError(err) + + // Create header with matching parent hash + header := &types.Header{ + Number: big.NewInt(100), + ParentHash: blockHash, + } + + // no reorg + err = s.validatorRegistrySyncer.HandlePotentialReorg(ctx, header) + s.Require().NoError(err) + + blockHash = [32]byte{1, 2, 3} + err = s.dbQuery.CreateValidatorRegistryEventsSyncedUntil(ctx, data.CreateValidatorRegistryEventsSyncedUntilParams{ + BlockHash: blockHash[:], + BlockNumber: 99, + }) + s.Require().NoError(err) + + header = &types.Header{ + Number: big.NewInt(100), + ParentHash: [32]byte{4, 5, 6}, + } + + // reorg + err = s.validatorRegistrySyncer.HandlePotentialReorg(ctx, header) + s.Require().NoError(err) + + syncStatus, err := s.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) + s.Require().NoError(err) + s.Require().Equal(int64(89), syncStatus.BlockNumber) + s.Require().Empty(syncStatus.BlockHash) +} diff --git a/tests/transaction_test.go b/tests/transaction_test.go index c7bfb94..0586437 100644 --- a/tests/transaction_test.go +++ b/tests/transaction_test.go @@ -32,7 +32,7 @@ func (s *TestMetricsSuite) TestEncryptedTransaction() { eventTxHash, err := generateRandomBytes(32) s.Require().NoError(err) - err = s.dbQuery.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ + _, err = s.dbQuery.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ TxIndex: txIndex, Eon: eon, EventBlockHash: eventBlockHash, diff --git a/tests/tx_mapper_test.go b/tests/tx_mapper_test.go index 4cf7741..f01cc7c 100644 --- a/tests/tx_mapper_test.go +++ b/tests/tx_mapper_test.go @@ -102,7 +102,7 @@ func (s *TestMetricsSuite) TestAddTransactionSubmittedEventAndDecryptionData() { eventTxHash, err := generateRandomBytes(32) s.Require().NoError(err) - err = s.txMapperDB.AddTransactionSubmittedEvent(ctx, nil, &sequencer.SequencerTransactionSubmitted{ + _, err = s.txMapperDB.AddTransactionSubmittedEvent(ctx, nil, &sequencer.SequencerTransactionSubmitted{ Eon: uint64(eon), TxIndex: uint64(txIndex), IdentityPrefix: [32]byte(identityPrefix),