From 517937f6b583201023814fa1ac7492ed17525889 Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Mon, 2 Sep 2024 10:46:52 +0200 Subject: [PATCH 1/6] Open psql port locally --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 0572d98..f4317fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,8 @@ services: db: image: postgres:14.12 restart: unless-stopped + ports: + - 5432:5432 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=${DB_PASSWORD} From 91ae241d48e41b637400dca806d3881e555ab3cc Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Thu, 12 Sep 2024 23:12:49 +0200 Subject: [PATCH 2/6] Check db before deleting old blocks from cache --- internal/data/metrics.sql.go | 12 ++++++++++++ internal/data/sql/queries/metrics.sql | 4 ++++ internal/metrics/tx_mapper_db.go | 17 +++++++++++++++++ internal/metrics/types.go | 1 + internal/watcher/decryption_keys.go | 7 +++++++ 5 files changed, 41 insertions(+) diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index 77b461a..85a8935 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -348,6 +348,18 @@ func (q *Queries) CreateValidatorStatus(ctx context.Context, arg CreateValidator return err } +const queryBlock = `-- name: QueryBlock :one +SELECT COUNT(*) FROM block +WHERE block_number = $1 +` + +func (q *Queries) QueryBlock(ctx context.Context, blockNumber int64) (int64, error) { + row := q.db.QueryRow(ctx, queryBlock, blockNumber) + var count int64 + err := row.Scan(&count) + return count, err +} + const queryBlockFromSlot = `-- name: QueryBlockFromSlot :one SELECT block_hash, block_number, block_timestamp, created_at, updated_at, slot FROM block WHERE slot = $1 FOR UPDATE diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index 0b82da8..b09b1c9 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql 
@@ -81,6 +81,10 @@ ON CONFLICT DO NOTHING; SELECT * FROM block WHERE slot = $1 FOR UPDATE; +-- name: QueryBlock :one +SELECT COUNT(*) FROM block +WHERE block_number = $1; + -- name: CreateDecryptedTX :exec INSERT into decrypted_tx( slot, diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index 233429c..54ee349 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -189,6 +189,23 @@ func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeySh return nil } +func (tm *TxMapperDB) BlockExists(ctx context.Context, blockNumber int64) (bool, error) { + tx, err := tm.db.Begin(ctx) + if err != nil { + return false, err + } + qtx := tm.dbQuery.WithTx(tx) + count, err := qtx.QueryBlock(ctx, blockNumber) + if err != nil { + return false, err + } + if count == 1 { + return true, nil + } + return false, nil + +} + func (tm *TxMapperDB) AddBlock( ctx context.Context, b *data.Block, diff --git a/internal/metrics/types.go b/internal/metrics/types.go index ed78606..6fba4fd 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -59,6 +59,7 @@ type TxMapper interface { ctx context.Context, b *data.Block, ) error + BlockExists(ctx context.Context, blockNumber int64) (bool, error) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) AddValidatorRegistryEvent(ctx context.Context, vr *validatorRegistryBindings.ValidatorregistryUpdated) error UpdateValidatorStatus(ctx context.Context) error diff --git a/internal/watcher/decryption_keys.go b/internal/watcher/decryption_keys.go index e348281..cd709a3 100644 --- a/internal/watcher/decryption_keys.go +++ b/internal/watcher/decryption_keys.go @@ -101,6 +101,13 @@ func (pmw *P2PMsgsWatcher) clearOldBlocks(latestEv *BlockReceivedEvent) { } } for _, block := range tooOld { + exists, err := pmw.txMapper.BlockExists(context.Background(), int64(block)) + if err != nil { + log.Err(err).Uint64("block", 
block).Msg("could not query if block exists") + } + if !exists { + log.Err(fmt.Errorf("want to delete block that does not exist in db")).Uint64("block", block) + } delete(pmw.recentBlocks, block) } } From fd9235f49b84153312d316755b202c173c8297a7 Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Wed, 18 Sep 2024 11:13:54 +0200 Subject: [PATCH 3/6] Enable pprof --- cmd/cli/cli.go | 9 +++++++++ docker-compose.yml | 1 + main.go | 6 ++++++ 3 files changed, 16 insertions(+) diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index d45514e..71b68a4 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -3,10 +3,14 @@ package cli import ( "context" "fmt" + "net/http" "os" + "runtime" "strconv" "strings" + _ "net/http/pprof" + "github.com/rs/zerolog/log" "github.com/mitchellh/mapstructure" @@ -112,6 +116,11 @@ func Cmd() *cobra.Command { } func Start() error { + log.Info().Msg("go run pprof") + go func() { + log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:7070", nil)).Msg("started pprof") + }() + runtime.SetBlockProfileRate(1) log.Info().Msg("Starting Observer") // start services here ctx := context.Background() diff --git a/docker-compose.yml b/docker-compose.yml index f4317fb..3680f87 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ services: - P2P_DISCOVERY_NAMESPACE=${P2P_DISCOVERY_NAMESPACE} ports: - "23003:23003" + - "7070:6060" command: ["start", "--rpc-url", "${RPC_URL}", "--beacon-api-url", "${BEACON_API_URL}", "--sequencer-contract-address", "${SEQUENCER_CONTRACT_ADDRESS}", "--validator-registry-contract-address", "${VALIDATOR_REGISTRY_CONTRACT_ADDRESS}", "--p2pkey", "${P2P_KEY}"] db: diff --git a/main.go b/main.go index e5c5e85..ac01fc1 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,19 @@ package main import ( + "net/http" "os" "github.com/rs/zerolog/log" "github.com/shutter-network/gnosh-metrics/cmd/cli" + + _ "net/http/pprof" ) func main() { + go func() { + log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:6060", nil)).Msg("started 
pprof") + }() status := 0 if err := cli.Cmd().Execute(); err != nil { log.Info().Err(err).Msg("failed running server") From bad41ae24e4c9b95ecc3662df0fad7050021ae6e Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Mon, 30 Sep 2024 12:45:07 +0200 Subject: [PATCH 4/6] Don't log raw bytes --- internal/watcher/watcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 284d3bc..612c7f1 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -140,7 +140,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error { return err } log.Info(). - Bytes("encrypted transaction", txEvent.EncryptedTransaction). + Hex("encrypted transaction (hex)", txEvent.EncryptedTransaction). Msg("new encrypted transaction") case dd := <-decryptionDataChannel: keys, identites := getDecryptionKeysAndIdentities(dd.Keys) @@ -178,7 +178,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error { return err } log.Info(). - Bytes("key shares", share.Share). + Hex("key shares (hex)", share.Share). Int64("slot", ks.Slot). 
Msg("new key shares") } From a6de95548846c4cc0c29d7c43c7095eac403f99a Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Mon, 30 Sep 2024 14:24:39 +0200 Subject: [PATCH 5/6] Add more verbose logging to find deadlock --- internal/watcher/blocks.go | 4 ++++ internal/watcher/decryption_keys.go | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/internal/watcher/blocks.go b/internal/watcher/blocks.go index 51c68bf..f10bda4 100644 --- a/internal/watcher/blocks.go +++ b/internal/watcher/blocks.go @@ -34,13 +34,16 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error newHeads := make(chan *types.Header) sub, err := bw.ethClient.SubscribeNewHead(ctx, newHeads) if err != nil { + log.Info().Err(err).Msg("error on subscribe, no new head") return err } runner.Defer(sub.Unsubscribe) runner.Go(func() error { for { + log.Info().Msg("looking for new head") select { case <-ctx.Done(): + log.Info().Msg("context cancelled, no new head") return ctx.Err() case head := <-newHeads: log.Info(). @@ -53,6 +56,7 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error } bw.blocksChannel <- ev case err := <-sub.Err(): + log.Info().Err(err).Msg("got an err, no new head") return err } } diff --git a/internal/watcher/decryption_keys.go b/internal/watcher/decryption_keys.go index cd709a3..ae07047 100644 --- a/internal/watcher/decryption_keys.go +++ b/internal/watcher/decryption_keys.go @@ -54,24 +54,41 @@ func (pmw *P2PMsgsWatcher) handleDecryptionKeyMsg(msg *p2pmsg.DecryptionKeys) ([ func (pmw *P2PMsgsWatcher) insertBlocks(ctx context.Context) error { for { + log.Info().Msg("polling insertBlocks, for new head channel") select { case <-ctx.Done(): + log.Info().Msg("context cancelled for insertBlocks, no new head") return ctx.Err() case ev, ok := <-pmw.blocksChannel: if !ok { + log.Info().Msg("returning NIL from insertBlocks, no new head") return nil } + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). 
+ Int64("block-number", ev.Header.Number.Int64()). + Msg("planning to insertBlock, with new head") err := pmw.insertBlock(ctx, ev) if err != nil { + log.Info().Err(err).Msg("error in insertBlock, no new head") return err } + log.Info().Msg("calling ClearOldBlocks, before new head") pmw.clearOldBlocks(ev) } } } func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEvent) error { + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). + Int64("block-number", ev.Header.Number.Int64()). + Msg("trying to obtain lock for insertBlock new head") pmw.recentBlocksMux.Lock() + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). + Int64("block-number", ev.Header.Number.Int64()). + Msg("obtained lock for insertBlock new head") defer pmw.recentBlocksMux.Unlock() pmw.recentBlocks[ev.Header.Number.Uint64()] = ev if ev.Header.Number.Uint64() > pmw.mostRecentBlock { @@ -91,7 +108,15 @@ func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEve } func (pmw *P2PMsgsWatcher) clearOldBlocks(latestEv *BlockReceivedEvent) { + log.Info(). + Hex("block-hash", latestEv.Header.Hash().Bytes()). + Int64("block-number", latestEv.Header.Number.Int64()). + Msg("trying to obtain lock for clearOldBlocks new head") pmw.recentBlocksMux.Lock() + log.Info(). + Hex("block-hash", latestEv.Header.Hash().Bytes()). + Int64("block-number", latestEv.Header.Number.Int64()). + Msg("obtained lock for clearOldBlocks new head") defer pmw.recentBlocksMux.Unlock() tooOld := []uint64{} From dd0c02cef976d1a0fc8a784ecdf9ee2fec84567b Mon Sep 17 00:00:00 2001 From: Konrad Feldmeier Date: Mon, 30 Sep 2024 14:26:17 +0200 Subject: [PATCH 6/6] Revert "Check db before deleting old blocks from cache" This reverts commit 91ae241d48e41b637400dca806d3881e555ab3cc. 
--- internal/data/metrics.sql.go | 12 ------------ internal/data/sql/queries/metrics.sql | 4 ---- internal/metrics/tx_mapper_db.go | 17 ----------------- internal/metrics/types.go | 1 - internal/watcher/decryption_keys.go | 7 ------- 5 files changed, 41 deletions(-) diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index 85a8935..77b461a 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -348,18 +348,6 @@ func (q *Queries) CreateValidatorStatus(ctx context.Context, arg CreateValidator return err } -const queryBlock = `-- name: QueryBlock :one -SELECT COUNT(*) FROM block -WHERE block_number = $1 -` - -func (q *Queries) QueryBlock(ctx context.Context, blockNumber int64) (int64, error) { - row := q.db.QueryRow(ctx, queryBlock, blockNumber) - var count int64 - err := row.Scan(&count) - return count, err -} - const queryBlockFromSlot = `-- name: QueryBlockFromSlot :one SELECT block_hash, block_number, block_timestamp, created_at, updated_at, slot FROM block WHERE slot = $1 FOR UPDATE diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index b09b1c9..0b82da8 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql @@ -81,10 +81,6 @@ ON CONFLICT DO NOTHING; SELECT * FROM block WHERE slot = $1 FOR UPDATE; --- name: QueryBlock :one -SELECT COUNT(*) FROM block -WHERE block_number = $1; - -- name: CreateDecryptedTX :exec INSERT into decrypted_tx( slot, diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index 54ee349..233429c 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -189,23 +189,6 @@ func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeySh return nil } -func (tm *TxMapperDB) BlockExists(ctx context.Context, blockNumber int64) (bool, error) { - tx, err := tm.db.Begin(ctx) - if err != nil { - return false, err - } - qtx := tm.dbQuery.WithTx(tx) - count, 
err := qtx.QueryBlock(ctx, blockNumber) - if err != nil { - return false, err - } - if count == 1 { - return true, nil - } - return false, nil - -} - func (tm *TxMapperDB) AddBlock( ctx context.Context, b *data.Block, diff --git a/internal/metrics/types.go b/internal/metrics/types.go index 6fba4fd..ed78606 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -59,7 +59,6 @@ type TxMapper interface { ctx context.Context, b *data.Block, ) error - BlockExists(ctx context.Context, blockNumber int64) (bool, error) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) AddValidatorRegistryEvent(ctx context.Context, vr *validatorRegistryBindings.ValidatorregistryUpdated) error UpdateValidatorStatus(ctx context.Context) error diff --git a/internal/watcher/decryption_keys.go b/internal/watcher/decryption_keys.go index ae07047..d9018a2 100644 --- a/internal/watcher/decryption_keys.go +++ b/internal/watcher/decryption_keys.go @@ -126,13 +126,6 @@ func (pmw *P2PMsgsWatcher) clearOldBlocks(latestEv *BlockReceivedEvent) { } } for _, block := range tooOld { - exists, err := pmw.txMapper.BlockExists(context.Background(), int64(block)) - if err != nil { - log.Err(err).Uint64("block", block).Msg("could not query if block exists") - } - if !exists { - log.Err(fmt.Errorf("want to delete block that does not exist in db")).Uint64("block", block) - } delete(pmw.recentBlocks, block) } }