From c1a19fd9d973ea65763dab432b25cc2f1fe63454 Mon Sep 17 00:00:00 2001 From: alok Date: Wed, 13 Sep 2023 03:52:24 +0530 Subject: [PATCH 1/4] chore: refactor builder workflows --- cmd/builder/main.go | 210 +++++++++++++ go.mod | 18 +- go.sum | 32 +- pkg/apiserver/api.go | 135 +++++++++ pkg/apiserver/middleware.go | 36 +++ pkg/builder/api/api.go | 277 ++++++++++++++++++ pkg/builder/api/middleware.go | 36 +++ pkg/builder/node.go | 34 +++ pkg/builder/preconf/bid_worker.go | 89 ++++++ pkg/builder/preconf/bid_worker_test.go | 56 ++++ pkg/builder/searcherclient/searcher_client.go | 217 ++++++++++++++ pkg/builder/searcherclient/searcher_conn.go | 259 ++++++++++++++++ 12 files changed, 1374 insertions(+), 25 deletions(-) create mode 100644 cmd/builder/main.go create mode 100644 pkg/apiserver/api.go create mode 100644 pkg/apiserver/middleware.go create mode 100644 pkg/builder/api/api.go create mode 100644 pkg/builder/api/middleware.go create mode 100644 pkg/builder/node.go create mode 100644 pkg/builder/preconf/bid_worker.go create mode 100644 pkg/builder/preconf/bid_worker_test.go create mode 100644 pkg/builder/searcherclient/searcher_client.go create mode 100644 pkg/builder/searcherclient/searcher_conn.go diff --git a/cmd/builder/main.go b/cmd/builder/main.go new file mode 100644 index 0000000..74096c7 --- /dev/null +++ b/cmd/builder/main.go @@ -0,0 +1,210 @@ +package main + +import ( + "fmt" + "io" + "os" + "strconv" + "time" + + "github.com/primev/builder-boost/pkg/builder" + "github.com/urfave/cli/v2" + "golang.org/x/exp/slog" +) + +var ( + // version is the current version of the application. It is set during build. + version string = "dev" + // commit is the current commit of the application. It is set during build. + commit string = "dirty" + // commitTime is the time the application was built. It is set during build. + commitTime string +) + +var flags = []cli.Flag{ + &cli.StringFlag{ + Name: "loglvl", + Usage: "logging level: trace, debug, info, warn, error or fatal", + Value: "info", + EnvVars: []string{"LOGLVL"}, + }, + &cli.StringFlag{ + Name: "logfmt", + Usage: "format logs as text, json or none", + Value: "text", + EnvVars: []string{"LOGFMT"}, + }, + &cli.StringFlag{ + Name: "addr", + Usage: "server listen address", + Value: ":18550", + EnvVars: []string{"BOOST_ADDR"}, + }, + &cli.StringFlag{ + Name: "env", + Usage: "service environment (development, production, etc.)", + Value: "development", + EnvVars: []string{"ENV"}, + }, + &cli.StringFlag{ + Name: "agentaddr", + Usage: "datadog agent address", + Value: "", + EnvVars: []string{"AGENT_ADDR"}, + }, + &cli.StringFlag{ + Name: "rollupkey", + Usage: "Private key to interact with rollup", + Value: "", + EnvVars: []string{"ROLLUP_KEY"}, + Required: true, + }, + &cli.StringFlag{ + Name: "rollupaddr", + Usage: "Rollup RPC address", + Value: "https://ethereum-sepolia.blockpi.network/v1/rpc/public", + EnvVars: []string{"ROLLUP_ADDR"}, + }, + &cli.StringFlag{ + Name: "rollupcontract", + Usage: "Rollup contract address", + Value: "0x6219a236EFFa91567d5ba4a0A5134297a35b0b2A", + EnvVars: []string{"ROLLUP_CONTRACT"}, + }, + &cli.StringFlag{ + Name: "buildertoken", + Usage: "Token used to authenticate request as originating from builder", + Value: "", + EnvVars: []string{"BUILDER_AUTH_TOKEN"}, + Required: true, + }, + &cli.BoolFlag{ + Name: "metrics", + Usage: "enables metrics tracking for boost", + Value: false, + EnvVars: []string{"METRICS"}, + }, + &cli.StringFlag{ + Name: "dacontract", + Usage: "DA contract address", + Value: "0xac27A2cbdBA8768D49e359ebA326fC1F27832ED4", + EnvVars: []string{"ROLLUP_CONTRACT"}, + }, + &cli.StringFlag{ + Name: "daaddr", + Usage: "DA RPC address", + Value: "http://54.200.76.18:8545", + EnvVars: []string{"ROLLUP_ADDR"}, + }, + &cli.BoolFlag{ + Name: "inclusionlist", + Usage: "enables inclusion list for boost", + Value: false, + EnvVars: []string{"INCLUSION_LIST"}, + }, +} + +func main() { + + // Parse commit time + commitTimeInt, err := strconv.ParseInt(commitTime, 10, 64) + if err != nil { + commitTimeInt = time.Now().Unix() + } + + app := &cli.App{ + Name: "builder boost", + Version: fmt.Sprintf("%s-%s", version, commit), + Compiled: time.Unix(commitTimeInt, 0), + Usage: "entry point to primev protocol", + Flags: flags, + Action: run(), + } + + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(app.Writer, "exited with error: %v\n", err) + } +} + +func run() cli.ActionFunc { + return func(c *cli.Context) error { + logger, err := newLogger(c.String("loglvl"), c.String("logfmt"), c.App.Writer) + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) + } + + nd, err := builder.NewNode( + c.Context, + builder.Options{ + Logger: logger, + ServerAddr: c.String("addr"), + TracingEndpoint: c.String("agentaddr"), + RollupKey: c.String("rollupkey"), + RollupAddr: c.String("rollupaddr"), + RollupContract: c.String("rollupcontract"), + BuilderToken: c.String("buildertoken"), + MetricsEnabled: c.Bool("metrics"), + DAContract: c.String("dacontract"), + DARPCAddr: c.String("daaddr"), + InclusionListEnabled: c.Bool("inclusionlist"), + }, + ) + if err != nil { + return fmt.Errorf("failed to create node: %w", err) + } + + <-c.Done() + fmt.Fprintf(c.App.Writer, "shutting down...\n") + closed := make(chan struct{}) + + go func() { + defer close(closed) + + err := nd.Close() + if err != nil { + logger.Error("failed to close node", "error", err) + } + }() + + select { + case <-closed: + case <-time.After(5 * time.Second): + logger.Error("failed to close node in time") + } + + return nil + } +} + +func newLogger(lvl, logFmt string, sink io.Writer) (*slog.Logger, error) { + var ( + level = new(slog.LevelVar) // Info by default + handler slog.Handler + ) + + switch lvl { + case "debug": + level.Set(slog.LevelDebug) + case "info": + level.Set(slog.LevelInfo) + case "warn": + level.Set(slog.LevelWarn) + case "error": + level.Set(slog.LevelError) + default: + return nil, fmt.Errorf("invalid log level: %s", lvl) + } + + switch logFmt { + case "text": + handler = slog.NewTextHandler(sink, &slog.HandlerOptions{Level: level}) + case "none": + fallthrough + case "json": + handler = slog.NewJSONHandler(sink, &slog.HandlerOptions{Level: level}) + default: + return nil, fmt.Errorf("invalid log format: %s", logFmt) + } + + return slog.New(handler), nil +} diff --git a/go.mod b/go.mod index b7c7c70..65d3180 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/primev/builder-boost -go 1.18 +go 1.20 require ( github.com/alecthomas/assert v1.0.0 @@ -16,7 +16,8 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.2 github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa - golang.org/x/sync v0.1.0 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 + golang.org/x/sync v0.3.0 gopkg.in/DataDog/dd-trace-go.v1 v1.50.1 ) @@ -70,7 +71,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect - golang.org/x/crypto v0.7.0 + golang.org/x/crypto v0.13.0 golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1 // indirect @@ -168,12 +169,11 @@ require ( go.uber.org/fx v1.19.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect - golang.org/x/mod v0.10.0 // indirect - golang.org/x/net v0.8.0 // indirect - golang.org/x/sys v0.7.0 // indirect - golang.org/x/text v0.8.0 // indirect - golang.org/x/tools v0.7.0 // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/net v0.15.0 // indirect + golang.org/x/sys v0.12.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/tools v0.13.0 // indirect gonum.org/v1/gonum v0.11.0 // indirect lukechampine.com/blake3 v1.1.7 // indirect nhooyr.io/websocket v1.8.7 // indirect diff --git a/go.sum b/go.sum index accb3ed..2cff937 100644 --- a/go.sum +++ b/go.sum @@ -622,11 +622,11 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -639,8 +639,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= -golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -666,8 +666,8 @@ golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -681,8 +681,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -726,8 +726,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -737,8 +737,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -765,8 +765,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= -golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/apiserver/api.go b/pkg/apiserver/api.go new file mode 100644 index 0000000..b47b474 --- /dev/null +++ b/pkg/apiserver/api.go @@ -0,0 +1,135 @@ +package apiserver + +import ( + "bytes" + "encoding/json" + "errors" + "expvar" + "fmt" + "net/http" + "net/http/pprof" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/exp/slog" +) + +const ( + defaultNamespace = "primev" +) + +type searcherKey struct{} + +type API struct { + *http.Server + + metricsRegistry *prometheus.Registry + router *http.ServeMux + logger *slog.Logger +} + +func NewAPI() *API { + return &API{} +} + +func (a *API) registerDebugEndpoints() { + // register metrics handler + a.router.Handle("/metrics", promhttp.HandlerFor(a.MetricsRegistry(), promhttp.HandlerOpts{})) + + // register pprof handlers + a.router.Handle( + "/debug/pprof", + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + u := r.URL + u.Path += "/" + http.Redirect(w, r, u.String(), http.StatusPermanentRedirect) + }), + ) + a.router.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + a.router.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + a.router.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + a.router.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + a.router.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + a.router.Handle("/debug/pprof/{profile}", http.HandlerFunc(pprof.Index)) + a.router.Handle("/debug/vars", expvar.Handler()) +} + +func newMetrics(version string) (r *prometheus.Registry) { + r = prometheus.NewRegistry() + + // register standard metrics + r.MustRegister( + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{ + Namespace: defaultNamespace, + }), + collectors.NewGoCollector(), + prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: defaultNamespace, + Name: "info", + Help: "builder-boost information.", + ConstLabels: prometheus.Labels{ + "version": version, + }, + }), + ) + + return r +} + +func (a *API) MetricsRegistry() *prometheus.Registry { + return a.metricsRegistry +} + +func (a *API) Router() *http.ServeMux { + return a.router +} + +type statusResponse struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func WriteResponse(w http.ResponseWriter, code int, message any) error { + var b bytes.Buffer + switch message.(type) { + case string: + err := json.NewEncoder(&b).Encode(statusResponse{Code: code, Message: message.(string)}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to encode status response: %w", err) + } + default: + err := json.NewEncoder(&b).Encode(message) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to encode response: %w", err) + } + } + + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, b.String()) + return nil +} + +func MethodHandler(method string, handler http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != method { + WriteResponse(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + handler(w, r) + } +} + +func BindJSON[T any](w http.ResponseWriter, r *http.Request) (T, error) { + var body T + + if r.Body == nil { + return body, errors.New("no body") + } + defer r.Body.Close() + + return body, json.NewDecoder(r.Body).Decode(&body) +} diff --git a/pkg/apiserver/middleware.go b/pkg/apiserver/middleware.go new file mode 100644 index 0000000..f191b33 --- /dev/null +++ b/pkg/apiserver/middleware.go @@ -0,0 +1,36 @@ +package apiserver + +import ( + "net/http" + "time" + + "golang.org/x/exp/slog" +) + +type responseStatusRecorder struct { + http.ResponseWriter + status int + size int +} + +func (r *responseStatusRecorder) WriteHeader(status int) { + r.status = status + r.ResponseWriter.WriteHeader(status) +} + +func newAccessLogHandler(log *slog.Logger) func(http.Handler) http.Handler { + return func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + recorder := &responseStatusRecorder{ResponseWriter: w} + + start := time.Now() + h.ServeHTTP(recorder, req) + log.Info("api access", + "status", recorder.status, + "method", req.Method, + "path", req.URL.Path, + "duration", time.Since(start), + ) + }) + } +} diff --git a/pkg/builder/api/api.go b/pkg/builder/api/api.go new file mode 100644 index 0000000..0eda519 --- /dev/null +++ b/pkg/builder/api/api.go @@ -0,0 +1,277 @@ +package api + +import ( + "errors" + "net/http" + "time" + + "github.com/attestantio/go-builder-client/api/capella" + "github.com/ethereum/go-ethereum/common" + "github.com/gorilla/websocket" + "github.com/primev/builder-boost/pkg/apiserver" + "github.com/primev/builder-boost/pkg/builder/searcherclient" + "github.com/primev/builder-boost/pkg/rollup" + "github.com/primev/builder-boost/pkg/utils" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/slog" +) + +const ( + defaultNamespace = "primev" +) + +type searcherKey struct{} + +type API struct { + metricsRegistry *prometheus.Registry + router *http.ServeMux + rollUp rollup.Rollup + logger *slog.Logger + sclient searcherclient.SearcherClient +} + +func NewAPI() *API { + return &API{} +} + +// IDResponse is a simple struct for returning an ID +type IDResponse struct { + ID string `json:"id"` +} + +// handleBuilderID returns the builder ID as an IDResponse +func (a *API) handleBuilderID(w http.ResponseWriter, r *http.Request) { + logger := a.logger.With("method", "handleBuilderID") + resp := IDResponse{ID: a.rollUp.GetBuilderAddress().Hex()} + + err := apiserver.WriteResponse(w, http.StatusOK, resp) + if err != nil { + logger.Error("error writing response", "err", err) + } +} + +// CommitmentResponse is a simple struct for returning a commitment +type CommitmentResponse struct { + Commitment string `json:"commitment"` +} + +// handleSearcherCommitment returns the searcher commitment as a CommitmentResponse +func (a *API) handleSearcherCommitment(w http.ResponseWriter, r *http.Request) { + logger := a.logger.With("method", "handleSearcherCommitment") + + searcherAddress, ok := r.Context().Value(searcherKey{}).(common.Address) + if !ok { + logger.Error("error getting searcher address from context") + err := apiserver.WriteResponse(w, http.StatusBadRequest, "searcher address not found") + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + commitment := CommitmentResponse{Commitment: a.rollUp.GetCommitment(searcherAddress).Hex()} + err := apiserver.WriteResponse(w, http.StatusOK, commitment) + if err != nil { + logger.Error("error writing response", "err", err) + } +} + +// connectSearcher is the handler to connect a searcher to the builder for the websocket execution hints +// TODO(@ckartik): Move the handling of searcher connection to service layer +// +// GET /ws?token=abcd where "abcd" is the authentication token of the searcher +// The handler authenticates based on the following criteria: +// 1. The token is valid +// 2. The searcher behind the token has active subscription +// 3. The searcher behind the token is not already connected +func (a *API) connectSearcher(w http.ResponseWriter, r *http.Request) { + logger := a.logger.With("method", "connectSearcher") + + // Use verification scheme on token + token := r.URL.Query().Get("token") + if token == "" { + logger.Error("token is not provided") + err := apiserver.WriteResponse(w, http.StatusBadRequest, "token is not provided") + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + builderAddress := a.rollUp.GetBuilderAddress() + + searcherAddress, ok := utils.VerifyAuthenticationToken(token, builderAddress.Hex()) + if !ok { + logger.Error("token is not valid", "token", token) + err := apiserver.WriteResponse(w, http.StatusForbidden, "token is not valid") + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + _, err := a.rollUp.GetMinimalStake(builderAddress) + if err != nil { + if errors.Is(rollup.ErrNoMinimalStakeSet, err) { + logger.Error( + "no minimal stake in the rollup contract", + "builder_address", builderAddress, + ) + err := apiserver.WriteResponse( + w, + http.StatusForbidden, + "no minimal stake in the rollup contract", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + logger.Error("failed to get minimal stake", "err", err) + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "failed to get minimal stake", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + commitment := a.rollUp.GetCommitment(searcherAddress) + blockNumber, err := a.rollUp.GetBlockNumber() + if err != nil { + logger.Error("failed to get block number", "err", err) + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "failed to get block number", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + subscriptionEnd, err := a.rollUp.GetSubscriptionEnd(commitment) + if err != nil { + logger.Error("failed to get subscription end", "err", err) + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "failed to get subscription end", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + // Check is subscription is expired + if subscriptionEnd.Cmp(blockNumber) < 0 { + logger.Error("subscription is expired", "searcher", searcherAddress.Hex()) + err := apiserver.WriteResponse( + w, + http.StatusForbidden, + "subscription is expired", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + if a.sclient.IsConnected(searcherAddress.Hex()) { + logger.Error("searcher is already connected, closing old connection", + "searcher", searcherAddress.Hex(), + ) + a.sclient.RemoveSearcher(searcherAddress.Hex()) + } + + ws := websocket.Upgrader{ + ReadBufferSize: 1028, + WriteBufferSize: 1028, + CheckOrigin: func(r *http.Request) bool { return true }, + } + + conn, err := ws.Upgrade(w, r, nil) + if err != nil { + logger.Error("failed to upgrade connection", "err", err) + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "failed to upgrade connection", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + searcher := searcherclient.NewSearcher( + searcherAddress.Hex(), + conn, + a.rollUp, + a.logger.With("searcher", searcherAddress.Hex()), + subscriptionEnd.Int64(), + func(reason string) { a.sclient.Disconnected(searcherAddress.Hex(), reason) }, + ) + a.sclient.AddSearcher(searcher) + + logger.Info("searcher attempting connection", + "searcher", searcherAddress.Hex(), + "block_number", blockNumber, + "subscription_end", subscriptionEnd, + ) +} + +// builder related handlers +func (a *API) submitBlock(w http.ResponseWriter, r *http.Request) { + logger := a.logger.With("method", "submitBlock") + + br, err := apiserver.BindJSON[capella.SubmitBlockRequest](w, r) + if err != nil { + logger.Error("failed to decode submit block request", "err", err) + return + } + + if err := a.sclient.SubmitBlock(r.Context(), &br); err != nil { + logger.Error("failed to submit block", "err", err) + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "failed to submit block", + ) + if err != nil { + logger.Error("error writing response", "err", err) + } + return + } + + // if a.MetricsEnabled { + // a.metrics.Duration.WithLabelValues("algo_processing", "N/A").Observe(time.Since(now).Seconds()) + // a.metrics.PayloadsRecieved.Inc() + // } + err = apiserver.WriteResponse(w, http.StatusOK, "block submitted") + if err != nil { + logger.Error("error writing response", "err", err) + } +} + +type healthCheck struct { + Searchers []string `json:"connected_searchers"` + WorkerHeartBeat time.Time `json:"worker_heartbeat"` +} + +// healthCheck detremines if the service is healthy +// how many connections are open +func (a *API) handleHealthCheck(w http.ResponseWriter, r *http.Request) { + logger := a.logger.With("method", "handleHealthCheck") + sInfo := a.sclient.GetSeacherInfo() + + err := apiserver.WriteResponse(w, http.StatusOK, sInfo) + if err != nil { + logger.Error("error writing response", "err", err) + } +} diff --git a/pkg/builder/api/middleware.go b/pkg/builder/api/middleware.go new file mode 100644 index 0000000..4992225 --- /dev/null +++ b/pkg/builder/api/middleware.go @@ -0,0 +1,36 @@ +package api + +import ( + "net/http" + "time" + + "golang.org/x/exp/slog" +) + +type responseStatusRecorder struct { + http.ResponseWriter + status int + size int +} + +func (r *responseStatusRecorder) WriteHeader(status int) { + r.status = status + r.ResponseWriter.WriteHeader(status) +} + +func newAccessLogHandler(log *slog.Logger) func(http.Handler) http.Handler { + return func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + recorder := &responseStatusRecorder{ResponseWriter: w} + + start := time.Now() + h.ServeHTTP(recorder, req) + log.Info("api access", + "status", recorder.status, + "method", req.Method, + "path", req.URL.Path, + "duration", time.Since(start), + ) + }) + } +} diff --git a/pkg/builder/node.go b/pkg/builder/node.go new file mode 100644 index 0000000..99e6206 --- /dev/null +++ b/pkg/builder/node.go @@ -0,0 +1,34 @@ +package builder + +import ( + "context" + "io" + + "golang.org/x/exp/slog" +) + +// Options are the options for the builder node +type Options struct { + Logger *slog.Logger + ServerAddr string + TracingEndpoint string + RollupKey string + RollupAddr string + RollupContract string + BuilderToken string + MetricsEnabled bool + DAContract string + DARPCAddr string + InclusionListEnabled bool +} + +type Node struct { + io.Closer +} + +func NewNode( + ctx context.Context, + opts Options, +) (*Node, error) { + return &Node{}, nil +} diff --git a/pkg/builder/preconf/bid_worker.go b/pkg/builder/preconf/bid_worker.go new file mode 100644 index 0000000..e8a5e13 --- /dev/null +++ b/pkg/builder/preconf/bid_worker.go @@ -0,0 +1,89 @@ +package builder + +import ( + "crypto/ecdsa" + "io" + + "github.com/primev/builder-boost/pkg/preconf" + "golang.org/x/exp/slog" +) + +type BidReader interface { + BidReader() <-chan preconf.PreConfBid +} + +type CommitmentWriter interface { + WriteCommitment(commitment preconf.PreconfCommitment) error +} + +type PreconfWorker struct { + io.Closer + + reader BidReader + writer CommitmentWriter + logger *slog.Logger + builderKey *ecdsa.PrivateKey + quit chan struct{} +} + +func NewPreconfWorker( + reader BidReader, + writer CommitmentWriter, + logger *slog.Logger, + builderKey *ecdsa.PrivateKey, +) *PreconfWorker { + p := &PreconfWorker{ + reader: reader, + writer: writer, + logger: logger, + builderKey: builderKey, + quit: make(chan struct{}), + } + + go p.run() + return p +} + +func (w *PreconfWorker) run() { + for { + select { + case <-w.quit: + w.logger.Info("preconf worker quit") + return + case bid, more := <-w.reader.BidReader(): + if !more { + w.logger.Info("preconf bid reader closed, closing preconf worker") + return + } + address, err := bid.VerifySearcherSignature() + if err != nil { + w.logger.Error("failed to verify searcher signature", "err", err) + continue + } + + w.logger.Info("preconf bid verified", + "address", address.Hex(), + "bid_tnx", bid.TxnHash, + "bid_amt", bid.GetBidAmt(), + ) + + commitment, err := bid.ConstructCommitment(w.builderKey) + if err != nil { + w.logger.Error("failed to construct commitment", "err", err) + continue + } + + w.logger.Info("commitment constructed", "commitment", commitment) + + err = w.writer.WriteCommitment(commitment) + if err != nil { + w.logger.Error("failed to write commitment", "err", err) + } + } + } +} + +func (w *PreconfWorker) Close() error { + close(w.quit) + return nil +} diff --git a/pkg/builder/preconf/bid_worker_test.go b/pkg/builder/preconf/bid_worker_test.go new file mode 100644 index 0000000..024fff8 --- /dev/null +++ b/pkg/builder/preconf/bid_worker_test.go @@ -0,0 +1,56 @@ +package builder_test + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "os" + "testing" + + "github.com/primev/builder-boost/pkg/builder" + "github.com/primev/builder-boost/pkg/preconf" + "golang.org/x/exp/slog" +) + +type testBidReader struct { +} + +func (r *testBidReader) BidReader() <-chan preconf.PreConfBid { + return nil +} + +type testCommitmentWriter struct { +} + +func (w *testCommitmentWriter) WriteCommitment(commitment preconf.PreconfCommitment) error { + return nil +} + +func newTestLogger() *slog.Logger { + testLogger := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }) + return slog.New(testLogger) +} + +func TestPreconfWorker(t *testing.T) { + t.Parallel() + + privateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatal(err) + } + + reader := &testBidReader{} + writer := &testCommitmentWriter{} + + worker := builder.NewPreconfWorker(reader, writer, newTestLogger(), privateKey) + + t.Cleanup(func() { + err := worker.Close() + if err != nil { + t.Fatal(err) + } + }) + +} diff --git a/pkg/builder/searcherclient/searcher_client.go b/pkg/builder/searcherclient/searcher_client.go new file mode 100644 index 0000000..8e597a8 --- /dev/null +++ b/pkg/builder/searcherclient/searcher_client.go @@ -0,0 +1,217 @@ +package searcherclient + +import ( + "context" + "encoding/binary" + "errors" + "io" + "math/big" + "sync" + "time" + + "github.com/attestantio/go-builder-client/api/capella" + "github.com/ethereum/go-ethereum/core/types" + "golang.org/x/exp/slog" +) + +// SearcherInfo is a struct containing information about a searcher +type SearcherInfo struct { + ID string `json:"id"` + Latency time.Duration `json:"latency"` + Validity time.Time `json:"validity"` + Heartbeat time.Time `json:"heartbeat"` +} + +// SearcherClient is an interface for managing searcher connections +type SearcherClient interface { + // AddSearcher adds a searcher to the client + AddSearcher(s Searcher) + // RemoveSearcher removes a searcher from the client + RemoveSearcher(id string) + // Disconnected is called when a searcher disconnects + Disconnected(id, reason string) + // IsConnected returns true if the searcher is connected + IsConnected(id string) bool + // Send sends a payload to all relevant searchers + SubmitBlock(ctx context.Context, payload *capella.SubmitBlockRequest) error + // GetSearcherInfo returns a list of searcher info + GetSeacherInfo() []SearcherInfo + + io.Closer +} + +type Transaction struct { + Count int64 `json:"count"` + MinPriorityFee int64 `json:"MinPriorityFee"` + MaxPriorityFee int64 `json:"MaxPriorityFee"` +} + +type Metadata struct { + Builder string `json:"builder"` + Number int64 `json:"number"` + BlockHash string `json:"blockHash"` + Timestamp string `json:"timestamp"` + BaseFee uint32 `json:"baseFee"` + Transactions Transaction `json:"standard_transactions"` + ClientTransactions []string `json:"personal_transactions,omitempty"` + SentTimestamp time.Time `json:"sent_timestamp"` // Timestamp of block sent to the searcher + RecTimestamp time.Time `json:"rec_timestamp"` // Timestamp of block received by the builder instance +} + +type SuperPayload struct { + InternalMetadata Metadata + SearcherTxns map[string][]string +} + +type searcherClient struct { + mu sync.Mutex + searchers map[string]Searcher + disconnected map[string]string + inclusionProofActive bool + logger *slog.Logger +} + +func NewSearcherClient(logger *slog.Logger, inclusionProof bool) SearcherClient { + return &searcherClient{ + searchers: make(map[string]Searcher), + logger: logger, + inclusionProofActive: inclusionProof, + } +} + +func (s *searcherClient) AddSearcher(searcher Searcher) { + s.mu.Lock() + defer s.mu.Unlock() + s.searchers[searcher.ID()] = searcher + delete(s.disconnected, searcher.ID()) +} + +func (s *searcherClient) RemoveSearcher(id string) { + s.mu.Lock() + defer s.mu.Unlock() + searcher, found := s.searchers[id] + if !found { + return + } + err := searcher.Close() + if err != nil { + s.logger.Error("failed to close searcher", "err", err) + } + delete(s.searchers, searcher.ID()) +} + +func (s *searcherClient) Disconnected(id, reason string) { + s.mu.Lock() + defer s.mu.Unlock() + s.disconnected[id] = reason + delete(s.searchers, id) +} + +func (s *searcherClient) SubmitBlock( + ctx context.Context, + payload *capella.SubmitBlockRequest, +) error { + if payload == nil || len(payload.ExecutionPayload.Transactions) == 0 { + return nil + } + + minTipTxn, maxTipTxn := big.NewInt(0), big.NewInt(0) + ts := time.Unix(int64(payload.ExecutionPayload.Timestamp), 0).Format(time.RFC1123) + baseFee := binary.LittleEndian.Uint32(payload.ExecutionPayload.BaseFeePerGas[:]) + + blockMetadata := SuperPayload{ + InternalMetadata: Metadata{ + RecTimestamp: time.Now(), + Builder: payload.Message.BuilderPubkey.String(), + Number: int64(payload.ExecutionPayload.BlockNumber), + BlockHash: payload.Message.BlockHash.String(), + Timestamp: ts, + BaseFee: baseFee, + Transactions: Transaction{ + Count: int64(len(payload.ExecutionPayload.Transactions)), + }, + }, + } + if s.inclusionProofActive { + blockMetadata.SearcherTxns = make(map[string][]string) + } + + for _, btxn := range payload.ExecutionPayload.Transactions { + var txn types.Transaction + err := txn.UnmarshalBinary(btxn) + if err != nil { + s.logger.Error("failed to decode transaction", "err", err) + continue + } + // Extract Min/Max + if txn.GasTipCap().Cmp(minTipTxn) < 0 { + minTipTxn = txn.GasTipCap() + } + if txn.GasTipCap().Cmp(maxTipTxn) > 0 { + maxTipTxn = txn.GasTipCap() + } + + from, err := types.Sender(types.LatestSignerForChainID(txn.ChainId()), &txn) + if err != nil { + s.logger.Error("failed to decode sender of transaction", + "err", err, + "txn", txn.Hash().String(), + ) + continue + } + clientID := from.Hex() + if s.inclusionProofActive { + blockMetadata.SearcherTxns[clientID] = append( + blockMetadata.SearcherTxns[clientID], + txn.Hash().String(), + ) + } + } + + blockMetadata.InternalMetadata.Transactions.MinPriorityFee = minTipTxn.Int64() + blockMetadata.InternalMetadata.Transactions.MaxPriorityFee = maxTipTxn.Int64() + + s.logger.Info("block metadata processed", "blockMetadata", blockMetadata) + + s.mu.Lock() + defer s.mu.Unlock() + + for _, searcher := range s.searchers { + if _, ok := blockMetadata.SearcherTxns[searcher.ID()]; ok { + searcher.Send(ctx, blockMetadata) + } + } + + return nil +} + +func (s *searcherClient) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + var err error + for _, searcher := range s.searchers { + err = errors.Join(searcher.Close(), err) + delete(s.searchers, searcher.ID()) + } + + return err +} + +func (s *searcherClient) IsConnected(id string) bool { + s.mu.Lock() + defer s.mu.Unlock() + _, exists := s.searchers[id] + return exists +} + +func (s *searcherClient) GetSeacherInfo() []SearcherInfo { + s.mu.Lock() + defer s.mu.Unlock() + + searcherInfo := make([]SearcherInfo, 0, len(s.searchers)) + for _, searcher := range s.searchers { + searcherInfo = append(searcherInfo, searcher.GetInfo()) + } + return searcherInfo +} diff --git a/pkg/builder/searcherclient/searcher_conn.go b/pkg/builder/searcherclient/searcher_conn.go new file mode 100644 index 0000000..bfeb498 --- /dev/null +++ b/pkg/builder/searcherclient/searcher_conn.go @@ -0,0 +1,259 @@ +package searcherclient + +import ( + "context" + "encoding/json" + "errors" + "io" + "math/big" + "sync" + "time" + + "github.com/gorilla/websocket" + "golang.org/x/exp/slog" + "golang.org/x/sync/errgroup" +) + +// Searcher is the interface that wraps the lifecycle of a searcher connection. +type Searcher interface { + // ID returns the ID of the searcher. + ID() string + // Send sends a payload to the searcher. + Send(ctx context.Context, payload SuperPayload) error + // GetInfo returns information about the searcher. + GetInfo() SearcherInfo + + io.Closer +} + +// RollUp is the interface that wraps the block number of a rollup contract. +type RollUp interface { + GetBlockNumber() (*big.Int, error) +} + +// DisconnectHandler is a function that is called when a searcher disconnects. +type DisconnectHandler func(reason string) + +var ( + // ErrSearcherDisconnected is returned when a searcher is disconnected. + ErrSearcherDisconnected = errors.New("searcher disconnected") + // ErrSearcherSubscriptionEnded is returned when a searcher's subscription has ended. + ErrSearcherSubscriptionEnded = errors.New("searcher subscription ended") + // ErrQuit is returned when user issues a close. + ErrQuit = errors.New("server quit") + // ErrIO is returned when there is an IO error. + ErrIO = errors.New("IO error") +) + +// NewSearcher creates a new searcher connection. +func NewSearcher( + id string, + conn *websocket.Conn, + rollup RollUp, + logger *slog.Logger, + subscriptionEnd int64, + disconnectCB DisconnectHandler, +) Searcher { + s := &searcherConn{ + wsConn: conn, + searcherID: id, + quit: make(chan struct{}), + inbox: make(chan wsMsg), + outbox: make(chan wsMsg), + logger: logger, + rollup: rollup, + subscriptionEnd: subscriptionEnd, + heartbeat: time.Now().Unix(), + disconnectCB: disconnectCB, + } + + go s.startWorker() + return s +} + +type wsMsg struct { + msgType int + msg []byte +} + +type searcherConn struct { + wsConn *websocket.Conn + searcherID string + quit chan struct{} + mu sync.Mutex + disconnected bool + subscriptionEnd int64 + logger *slog.Logger + heartbeat int64 + inbox chan wsMsg + outbox chan wsMsg + rollup RollUp + disconnectCB DisconnectHandler +} + +func (s *searcherConn) isDisconnected() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.disconnected +} + +func (s *searcherConn) setDisconnected() { + s.mu.Lock() + defer s.mu.Unlock() + s.disconnected = true +} + +func (s *searcherConn) isSubscriptionEnded() bool { + blkNo, _ := s.rollup.GetBlockNumber() + return blkNo.Int64() > s.subscriptionEnd +} + +func (s *searcherConn) setHeartbeat() { + s.mu.Lock() + defer s.mu.Unlock() + s.heartbeat = time.Now().Unix() +} + +func (s *searcherConn) startWorker() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + eg, egCtx := errgroup.WithContext(ctx) + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + // Start the reader + eg.Go(func() error { + for { + select { + case <-s.quit: + return ErrQuit + case <-egCtx.Done(): + return egCtx.Err() + default: + } + msgType, msg, err := s.wsConn.ReadMessage() + if err != nil { + return errors.Join(ErrIO, err) + } + select { + case s.inbox <- wsMsg{msgType: msgType, msg: msg}: + case <-s.quit: + return ErrQuit + } + } + }) + + eg.Go(func() error { + for { + select { + case <-s.quit: + return ErrQuit + case msg := <-s.inbox: + switch msg.msgType { + case websocket.CloseMessage: + return ErrSearcherDisconnected + case websocket.PingMessage: + err := s.wsConn.WriteMessage(websocket.PongMessage, []byte("pong")) + if err != nil { + s.logger.Error("failed to send pong", "err", err) + return errors.Join(ErrIO, err) + } + case websocket.PongMessage: + default: + s.logger.Error("unknown message type", "msgType", msg.msgType) + // non-fatal error + } + s.setHeartbeat() + case payload := <-s.outbox: + err := s.wsConn.WriteMessage(payload.msgType, payload.msg) + if err != nil { + s.logger.Error("failed to send payload", "err", err) + return errors.Join(ErrIO, err) + } + s.setHeartbeat() + case <-ticker.C: + if s.isSubscriptionEnded() { + err := s.wsConn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage( + websocket.ClosePolicyViolation, + "subscription ended", + ), + ) + return errors.Join(ErrSearcherSubscriptionEnded, err) + } + + if time.Now().Unix()-s.heartbeat > 10 { + err := s.wsConn.WriteMessage(websocket.PingMessage, []byte("ping")) + if err != nil { + s.logger.Error("failed to send ping", "err", err) + return errors.Join(ErrIO, err) + } + } + } + } + }) + + switch err := eg.Wait(); { + case err == nil: + // this should never happen + s.logger.Error("worker exited with no error") + case errors.Is(err, ErrQuit): + s.logger.Info("worker stopped", "err", err) + case errors.Is(err, ErrSearcherDisconnected): + s.logger.Error("searcher disconnected", "err", err) + s.disconnectCB(err.Error()) + case errors.Is(err, ErrSearcherSubscriptionEnded): + s.logger.Error("searcher subscription ended", "err", err) + s.disconnectCB(err.Error()) + default: + s.logger.Error("could not reach searcher", "err", err) + s.disconnectCB(err.Error()) + } +} + +func (s *searcherConn) ID() string { + return s.searcherID +} + +func (s *searcherConn) GetInfo() SearcherInfo { + return SearcherInfo{ + ID: s.searcherID, + Heartbeat: time.Unix(s.heartbeat, 0), + } +} + +func (s *searcherConn) Close() error { + close(s.quit) + return s.wsConn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "closing connection"), + ) +} + +func (s *searcherConn) Send(ctx context.Context, payload SuperPayload) error { + jsonPayload, err := json.Marshal(payload) + if err != nil { + s.logger.Error("failed to marshal payload", "err", err) + return err + } + + select { + case <-s.quit: + return ErrQuit + default: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case s.outbox <- wsMsg{ + msgType: websocket.TextMessage, + msg: jsonPayload, + }: + } + + return nil +} From d24f98a0206d21dbd2e6d3c955815e2a5bdfd7da Mon Sep 17 00:00:00 2001 From: alok Date: Thu, 14 Sep 2023 23:57:33 +0530 Subject: [PATCH 2/4] chore: refactor builder workflows --- pkg/apiserver/api.go | 76 +--- pkg/apiserver/helpers.go | 64 ++++ pkg/builder/api/api.go | 120 +++++- pkg/builder/api/middleware.go | 36 -- pkg/builder/node.go | 1 + pkg/builder/preconf/bid_worker_test.go | 2 +- pkg/builder/searcherclient/searcher_client.go | 35 +- .../searcherclient/searcher_client_test.go | 213 +++++++++++ pkg/builder/searcherclient/searcher_conn.go | 177 +++------ .../searcherclient/searcher_conn_test.go | 346 ++++++++++++++++++ 10 files changed, 815 insertions(+), 255 deletions(-) create mode 100644 pkg/apiserver/helpers.go delete mode 100644 pkg/builder/api/middleware.go create mode 100644 pkg/builder/searcherclient/searcher_client_test.go create mode 100644 pkg/builder/searcherclient/searcher_conn_test.go diff --git a/pkg/apiserver/api.go b/pkg/apiserver/api.go index b47b474..92cbd1f 100644 --- a/pkg/apiserver/api.go +++ b/pkg/apiserver/api.go @@ -1,11 +1,7 @@ package apiserver import ( - "bytes" - "encoding/json" - "errors" "expvar" - "fmt" "net/http" "net/http/pprof" @@ -21,7 +17,7 @@ const ( type searcherKey struct{} -type API struct { +type Service struct { *http.Server metricsRegistry *prometheus.Registry @@ -29,13 +25,13 @@ type API struct { logger *slog.Logger } -func NewAPI() *API { - return &API{} +func New() *Service { + return &Service{} } -func (a *API) registerDebugEndpoints() { +func (a *Service) registerDebugEndpoints() { // register metrics handler - a.router.Handle("/metrics", promhttp.HandlerFor(a.MetricsRegistry(), promhttp.HandlerOpts{})) + a.router.Handle("/metrics", promhttp.HandlerFor(a.metricsRegistry, promhttp.HandlerOpts{})) // register pprof handlers a.router.Handle( @@ -77,59 +73,19 @@ func newMetrics(version string) (r *prometheus.Registry) { return r } -func (a *API) MetricsRegistry() *prometheus.Registry { - return a.metricsRegistry -} - -func (a *API) Router() *http.ServeMux { - return a.router -} - -type statusResponse struct { - Code int `json:"code"` - Message string `json:"message"` -} - -func WriteResponse(w http.ResponseWriter, code int, message any) error { - var b bytes.Buffer - switch message.(type) { - case string: - err := json.NewEncoder(&b).Encode(statusResponse{Code: code, Message: message.(string)}) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return fmt.Errorf("failed to encode status response: %w", err) - } - default: - err := json.NewEncoder(&b).Encode(message) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return fmt.Errorf("failed to encode response: %w", err) - } +func (a *Service) ChainHandlers( + path string, + handler http.Handler, + mws ...func(http.Handler) http.Handler, +) { + h := handler + for i := len(mws) - 1; i > 0; i-- { + h = mws[i](h) } - - w.WriteHeader(code) - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, b.String()) - return nil + a.router.Handle(path, h) } -func MethodHandler(method string, handler http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.Method != method { - WriteResponse(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - handler(w, r) - } +func (a *Service) RegisterMetricsCollectors(cs ...prometheus.Collector) { + a.metricsRegistry.MustRegister(cs...) } -func BindJSON[T any](w http.ResponseWriter, r *http.Request) (T, error) { - var body T - - if r.Body == nil { - return body, errors.New("no body") - } - defer r.Body.Close() - - return body, json.NewDecoder(r.Body).Decode(&body) -} diff --git a/pkg/apiserver/helpers.go b/pkg/apiserver/helpers.go new file mode 100644 index 0000000..610881e --- /dev/null +++ b/pkg/apiserver/helpers.go @@ -0,0 +1,64 @@ +package apiserver + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" +) + +type statusResponse struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// WriteResponse helper is used to write a response to the client with the given code +// and message. If the message is a string, it will be wrapped in a statusResponse +// struct. Otherwise, the message will be encoded as JSON. +func WriteResponse(w http.ResponseWriter, code int, message any) error { + var b bytes.Buffer + switch message.(type) { + case string: + err := json.NewEncoder(&b).Encode(statusResponse{Code: code, Message: message.(string)}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to encode status response: %w", err) + } + default: + err := json.NewEncoder(&b).Encode(message) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to encode response: %w", err) + } + } + + w.WriteHeader(code) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, b.String()) + return nil +} + +// MethodHandler helper is used to wrap a handler and ensure that the request method +// matches the given method. If the method does not match, a 405 is returned. +func MethodHandler(method string, handler http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != method { + WriteResponse(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + handler(w, r) + } +} + +// BindJSON helper is used to bind the request body to the given type. +func BindJSON[T any](w http.ResponseWriter, r *http.Request) (T, error) { + var body T + + if r.Body == nil { + return body, errors.New("no body") + } + defer r.Body.Close() + + return body, json.NewDecoder(r.Body).Decode(&body) +} diff --git a/pkg/builder/api/api.go b/pkg/builder/api/api.go index 0eda519..08de4ae 100644 --- a/pkg/builder/api/api.go +++ b/pkg/builder/api/api.go @@ -1,6 +1,7 @@ package api import ( + "context" "errors" "net/http" "time" @@ -12,7 +13,6 @@ import ( "github.com/primev/builder-boost/pkg/builder/searcherclient" "github.com/primev/builder-boost/pkg/rollup" "github.com/primev/builder-boost/pkg/utils" - "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slog" ) @@ -22,16 +22,61 @@ const ( type searcherKey struct{} -type API struct { - metricsRegistry *prometheus.Registry - router *http.ServeMux - rollUp rollup.Rollup - logger *slog.Logger - sclient searcherclient.SearcherClient +type api struct { + builderToken string + rollUp rollup.Rollup + logger *slog.Logger + sclient searcherclient.SearcherClient } -func NewAPI() *API { - return &API{} +type APIServer interface { + ChainHandlers(string, http.Handler, ...func(http.Handler) http.Handler) +} + +// RegisterAPI registers the API handlers with the provided server. It doesnt +// return anything as it is assumed that the server will be started after +// registration. Lifecycle of arguments is the responsibility of the caller. +func RegisterAPI( + token string, + server APIServer, + rollUp rollup.Rollup, + logger *slog.Logger, + sclient searcherclient.SearcherClient, +) { + + a := &api{ + builderToken: token, + rollUp: rollUp, + logger: logger, + sclient: sclient, + } + server.ChainHandlers( + "/health", + apiserver.MethodHandler(http.MethodGet, a.handleHealthCheck), + a.authSearcher, + ) + + server.ChainHandlers( + "/builder", + apiserver.MethodHandler(http.MethodGet, a.handleBuilderID), + a.authenticateBuilder, + ) + + server.ChainHandlers( + "/commitment", + apiserver.MethodHandler(http.MethodGet, a.handleSearcherCommitment), + a.authSearcher, + ) + + server.ChainHandlers( + "/primev/v1/builder/blocks", + apiserver.MethodHandler(http.MethodPost, a.submitBlock), + ) + + server.ChainHandlers( + "/ws", + apiserver.MethodHandler(http.MethodGet, a.connectSearcher), + ) } // IDResponse is a simple struct for returning an ID @@ -39,8 +84,24 @@ type IDResponse struct { ID string `json:"id"` } +func (a *api) authenticateBuilder(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authToken := r.Header.Get("X-Builder-Token") + if authToken != a.builderToken { + a.logger.Error("failed to authenticate builder request") + err := apiserver.WriteResponse(w, http.StatusUnauthorized, "token invalid") + if err != nil { + a.logger.Error("error writing response", "err", err) + } + return + } + + next.ServeHTTP(w, r) + }) +} + // handleBuilderID returns the builder ID as an IDResponse -func (a *API) handleBuilderID(w http.ResponseWriter, r *http.Request) { +func (a *api) handleBuilderID(w http.ResponseWriter, r *http.Request) { logger := a.logger.With("method", "handleBuilderID") resp := IDResponse{ID: a.rollUp.GetBuilderAddress().Hex()} @@ -55,14 +116,43 @@ type CommitmentResponse struct { Commitment string `json:"commitment"` } +func (a *api) authSearcher(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + authToken := r.Header.Get("X-Primev-Signature") + builderAddress := a.rollUp.GetBuilderAddress() + searcherAddress, ok := utils.VerifyAuthenticationToken(authToken, builderAddress.Hex()) + if !ok { + a.logger.Error( + "error verifying authentication token", + "token", authToken, + "builderAddress", builderAddress.Hex(), + ) + err := apiserver.WriteResponse(w, http.StatusUnauthorized, "token is not valid") + if err != nil { + a.logger.Error("error writing response", "err", err) + } + return + } + + reqClone := r.Clone(context.WithValue(r.Context(), searcherKey{}, searcherAddress)) + + next.ServeHTTP(w, reqClone) + }) +} + // handleSearcherCommitment returns the searcher commitment as a CommitmentResponse -func (a *API) handleSearcherCommitment(w http.ResponseWriter, r *http.Request) { +func (a *api) handleSearcherCommitment(w http.ResponseWriter, r *http.Request) { logger := a.logger.With("method", "handleSearcherCommitment") searcherAddress, ok := r.Context().Value(searcherKey{}).(common.Address) if !ok { logger.Error("error getting searcher address from context") - err := apiserver.WriteResponse(w, http.StatusBadRequest, "searcher address not found") + // This should never happen + err := apiserver.WriteResponse( + w, + http.StatusInternalServerError, + "searcher address not found", + ) if err != nil { logger.Error("error writing response", "err", err) } @@ -84,7 +174,7 @@ func (a *API) handleSearcherCommitment(w http.ResponseWriter, r *http.Request) { // 1. The token is valid // 2. The searcher behind the token has active subscription // 3. The searcher behind the token is not already connected -func (a *API) connectSearcher(w http.ResponseWriter, r *http.Request) { +func (a *api) connectSearcher(w http.ResponseWriter, r *http.Request) { logger := a.logger.With("method", "connectSearcher") // Use verification scheme on token @@ -227,7 +317,7 @@ func (a *API) connectSearcher(w http.ResponseWriter, r *http.Request) { } // builder related handlers -func (a *API) submitBlock(w http.ResponseWriter, r *http.Request) { +func (a *api) submitBlock(w http.ResponseWriter, r *http.Request) { logger := a.logger.With("method", "submitBlock") br, err := apiserver.BindJSON[capella.SubmitBlockRequest](w, r) @@ -266,7 +356,7 @@ type healthCheck struct { // healthCheck detremines if the service is healthy // how many connections are open -func (a *API) handleHealthCheck(w http.ResponseWriter, r *http.Request) { +func (a *api) handleHealthCheck(w http.ResponseWriter, r *http.Request) { logger := a.logger.With("method", "handleHealthCheck") sInfo := a.sclient.GetSeacherInfo() diff --git a/pkg/builder/api/middleware.go b/pkg/builder/api/middleware.go deleted file mode 100644 index 4992225..0000000 --- a/pkg/builder/api/middleware.go +++ /dev/null @@ -1,36 +0,0 @@ -package api - -import ( - "net/http" - "time" - - "golang.org/x/exp/slog" -) - -type responseStatusRecorder struct { - http.ResponseWriter - status int - size int -} - -func (r *responseStatusRecorder) WriteHeader(status int) { - r.status = status - r.ResponseWriter.WriteHeader(status) -} - -func newAccessLogHandler(log *slog.Logger) func(http.Handler) http.Handler { - return func(h http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - recorder := &responseStatusRecorder{ResponseWriter: w} - - start := time.Now() - h.ServeHTTP(recorder, req) - log.Info("api access", - "status", recorder.status, - "method", req.Method, - "path", req.URL.Path, - "duration", time.Since(start), - ) - }) - } -} diff --git a/pkg/builder/node.go b/pkg/builder/node.go index 99e6206..c3bc0ff 100644 --- a/pkg/builder/node.go +++ b/pkg/builder/node.go @@ -30,5 +30,6 @@ func NewNode( ctx context.Context, opts Options, ) (*Node, error) { + return &Node{}, nil } diff --git a/pkg/builder/preconf/bid_worker_test.go b/pkg/builder/preconf/bid_worker_test.go index 024fff8..01115c8 100644 --- a/pkg/builder/preconf/bid_worker_test.go +++ b/pkg/builder/preconf/bid_worker_test.go @@ -7,7 +7,7 @@ import ( "os" "testing" - "github.com/primev/builder-boost/pkg/builder" + builder "github.com/primev/builder-boost/pkg/builder/preconf" "github.com/primev/builder-boost/pkg/preconf" "golang.org/x/exp/slog" ) diff --git a/pkg/builder/searcherclient/searcher_client.go b/pkg/builder/searcherclient/searcher_client.go index 8e597a8..9edf83f 100644 --- a/pkg/builder/searcherclient/searcher_client.go +++ b/pkg/builder/searcherclient/searcher_client.go @@ -16,10 +16,9 @@ import ( // SearcherInfo is a struct containing information about a searcher type SearcherInfo struct { - ID string `json:"id"` - Latency time.Duration `json:"latency"` - Validity time.Time `json:"validity"` - Heartbeat time.Time `json:"heartbeat"` + ID string `json:"id"` + Validity int64 `json:"validity"` + Heartbeat time.Time `json:"heartbeat"` } // SearcherClient is an interface for managing searcher connections @@ -136,7 +135,7 @@ func (s *searcherClient) SubmitBlock( blockMetadata.SearcherTxns = make(map[string][]string) } - for _, btxn := range payload.ExecutionPayload.Transactions { + for idx, btxn := range payload.ExecutionPayload.Transactions { var txn types.Transaction err := txn.UnmarshalBinary(btxn) if err != nil { @@ -144,10 +143,10 @@ func (s *searcherClient) SubmitBlock( continue } // Extract Min/Max - if txn.GasTipCap().Cmp(minTipTxn) < 0 { + if txn.GasTipCap().Cmp(minTipTxn) < 0 || idx == 0 { minTipTxn = txn.GasTipCap() } - if txn.GasTipCap().Cmp(maxTipTxn) > 0 { + if txn.GasTipCap().Cmp(maxTipTxn) > 0 || idx == 0 { maxTipTxn = txn.GasTipCap() } @@ -173,12 +172,22 @@ func (s *searcherClient) SubmitBlock( s.logger.Info("block metadata processed", "blockMetadata", blockMetadata) - s.mu.Lock() - defer s.mu.Unlock() - - for _, searcher := range s.searchers { - if _, ok := blockMetadata.SearcherTxns[searcher.ID()]; ok { - searcher.Send(ctx, blockMetadata) + if s.inclusionProofActive { + s.mu.Lock() + defer s.mu.Unlock() + + for _, searcher := range s.searchers { + if _, ok := blockMetadata.SearcherTxns[searcher.ID()]; ok { + err := searcher.Send(ctx, blockMetadata) + if err != nil { + s.logger.Error("failed to send block metadata to searcher", + "err", err, + "searcher", searcher.ID(), + ) + } + // this could only happen if the callback for disconnection is + // not called yet + } } } diff --git a/pkg/builder/searcherclient/searcher_client_test.go b/pkg/builder/searcherclient/searcher_client_test.go new file mode 100644 index 0000000..c6d47cd --- /dev/null +++ b/pkg/builder/searcherclient/searcher_client_test.go @@ -0,0 +1,213 @@ +package searcherclient_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/attestantio/go-builder-client/api/capella" + "github.com/primev/builder-boost/pkg/builder/searcherclient" +) + +type testSearcherC struct { + id string + payloads []searcherclient.SuperPayload + heartbeat time.Time + validity int64 + closed bool +} + +func (t *testSearcherC) ID() string { + return t.id +} + +func (t *testSearcherC) Send(_ context.Context, payload searcherclient.SuperPayload) error { + t.payloads = append(t.payloads, payload) + return nil +} + +func (t *testSearcherC) GetInfo() searcherclient.SearcherInfo { + return searcherclient.SearcherInfo{ + ID: t.id, + Heartbeat: t.heartbeat, + Validity: t.validity, + } +} + +func (t *testSearcherC) Close() error { + t.closed = true + return nil +} + +func TestSearcherClient(t *testing.T) { + t.Parallel() + + t.Run("new and close", func(t *testing.T) { + client := searcherclient.NewSearcherClient(newTestLogger(), false) + + err := client.Close() + if err != nil { + t.Fatal(err) + } + }) + + t.Run("add and remove searcher", func(t *testing.T) { + client := searcherclient.NewSearcherClient(newTestLogger(), false) + t.Cleanup(func() { + err := client.Close() + if err != nil { + t.Fatal(err) + } + }) + + srch := &testSearcherC{ + id: "test", + } + + client.AddSearcher(srch) + client.RemoveSearcher("test") + + if !srch.closed { + t.Fatal("expected searcher to be closed") + } + + info := client.GetSeacherInfo() + if len(info) != 0 { + t.Fatalf("expected info to be empty, got %v", info) + } + }) + + t.Run("get searcher info", func(t *testing.T) { + client := searcherclient.NewSearcherClient(newTestLogger(), false) + t.Cleanup(func() { + err := client.Close() + if err != nil { + t.Fatal(err) + } + }) + + srch := &testSearcherC{ + id: "test", + heartbeat: time.Now(), + validity: 100, + } + + client.AddSearcher(srch) + + info := client.GetSeacherInfo() + if len(info) != 1 { + t.Fatalf("expected info to have 1 entry, got %v", info) + } + + if info[0].ID != srch.id { + t.Fatalf("expected id to be %v, got %v", srch.id, info[0].ID) + } + + if info[0].Heartbeat != srch.heartbeat { + t.Fatalf("expected heartbeat to be %v, got %v", srch.heartbeat, info[0].Heartbeat) + } + + if info[0].Validity != srch.validity { + t.Fatalf("expected validity to be %v, got %v", srch.validity, info[0].Validity) + } + + if !client.IsConnected(srch.id) { + t.Fatalf("expected searcher to be connected") + } + }) + + t.Run("send payload", func(t *testing.T) { + client := searcherclient.NewSearcherClient(newTestLogger(), true) + t.Cleanup(func() { + err := client.Close() + if err != nil { + t.Fatal(err) + } + }) + + srch := &testSearcherC{ + id: "0xeBc71Ae61372b940c940cF51510dEAB056E1f24C", + } + + client.AddSearcher(srch) + + var payload capella.SubmitBlockRequest + + err := json.Unmarshal([]byte(jsonTxn), &payload) + if err != nil { + t.Fatal(err) + } + + err = client.SubmitBlock(context.Background(), &payload) + if err != nil { + t.Fatal(err) + } + + if len(srch.payloads) != 1 { + t.Fatalf("expected payloads to have 1 entry, got %v", srch.payloads) + } + + if len(srch.payloads[0].SearcherTxns) != 1 { + t.Fatalf( + "expected searcher txns to have 1 entry, got %v", + srch.payloads[0].SearcherTxns, + ) + } + + if srch.payloads[0].InternalMetadata.Transactions.Count != 1 { + t.Fatalf( + "expected internal metadata to have 1 entry, got %v", + srch.payloads[0].InternalMetadata.Transactions.Count, + ) + } + + if srch.payloads[0].InternalMetadata.Transactions.MaxPriorityFee != 25000000000 { + t.Fatalf( + "expected internal metadata to have max priority fee of 25000000000, got %v", + srch.payloads[0].InternalMetadata.Transactions.MaxPriorityFee, + ) + } + + if srch.payloads[0].InternalMetadata.Transactions.MinPriorityFee != 25000000000 { + t.Fatalf( + "expected internal metadata to have min priority fee of 25000000000, got %v", + srch.payloads[0].InternalMetadata.Transactions.MaxPriorityFee, + ) + } + }) +} + +var jsonTxn = `{ + "message": { + "slot": "2246842", + "parent_hash": "0x04509e89bb0a974344d0855af9c6e41f26f3198c9edb1aac21e5c11e04745ef4", + "block_hash": "0x2a4f3ec7cd9572a9b28aa7fb0a2c7799b90681b67f04cd64bd5e01023b81bc3b", + "builder_pubkey": "0xaa1488eae4b06a1fff840a2b6db167afc520758dc2c8af0dfb57037954df3431b747e2f900fe8805f05d635e9a29717b", + "proposer_pubkey": "0xa66d5b1cf24a38a598a45d16818d04e1c1331f8535591e7b9d3d13e390bfb466a0180098b4656131e087b72bf10be172", + "proposer_fee_recipient": "0xf24a01ae29dec4629dfb4170647c4ed4efc392cd", + "gas_limit": "30000000", + "gas_used": "1884333", + "value": "4429853700147000" + }, + "execution_payload": { + "parent_hash": "0x04509e89bb0a974344d0855af9c6e41f26f3198c9edb1aac21e5c11e04745ef4", + "fee_recipient": "0x812fc9524961d0566b3207fee1a567fef23e5e38", + "state_root": "0x1c70df2d36ef350d3cdacb781e0153d00c1f50d77c08ff4740b2185b741483c7", + "receipts_root": "0xafd17a85f9de5eefa2f7e38b44a2fc560ede4dbf81433d3dbbf7da6560e49bcc", + "logs_bloom": "0x0000020000100000000002040802000001012408000024000000000000000008000a00000004040000800000148011000008100000001000080000108020000000002008c00020000080000e600010004080100440802000000000000001000000698000060018020084200205100940000002900900000100000810000000004100040000000000000040084004800001000000003100000004000000000000020800000000000100014100400802002040000000000400000008000000400400000022200000004400000000004000200000480200000000000100000020001034000000002000002000002000000080000000080800001204000000106000", + "prev_randao": "0xa868ab4cfafc9107183622c8d955b293558cfbaa661fd1ae41403db3f3b35618", + "block_number": "3379258", + "gas_limit": "30000000", + "gas_used": "1884333", + "timestamp": "1682695704", + "extra_data": "0xd883010b05846765746888676f312e32302e33856c696e7578", + "base_fee_per_gas": "7", + "block_hash": "0x2a4f3ec7cd9572a9b28aa7fb0a2c7799b90681b67f04cd64bd5e01023b81bc3b", + "transactions": [ + "0x02f901b883aa36a782538c8505d21dba00850ba43b74008316e360944a32aa121683ba185e2a55a52a9881135ab91e9a80b90144979986110000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000400000000000000000000000092c55b159f45648957f32c8a017ac7d62b16e1f744cd496a1dc36bcd4a73510cb07ac0a0c9989c3dbdfa6f56b5d54307286956a900000000000000000000000092c55b159f45648957f32c8a017ac7d62b16e1f7ac8b9b632aec60ff00f2e8877aeb2f3fc22842ea5f11ccffdac7db0d60d6655500000000000000000000000092c55b159f45648957f32c8a017ac7d62b16e1f76109664ef022c6a16e05d84e1f07416863fa669258886f750beb9d930e3be22f00000000000000000000000092c55b159f45648957f32c8a017ac7d62b16e1f78ddb09b000ddbb3cc50d7b7ebd90b69eb87a340b4138f9ffcc2e5e1831bfa955c080a03024de7fe25e0c43fd0bbd649a5616de42734ac627fd1e67c9424f66a99aa809a004ecfed7c69b0c629e1e7b54c0892aa44589a7ee6f19ae58949ab48b11a1b9c7" + ], + "withdrawals": [] + }, + "signature": "0x890edd3019111f248105b150d41e31b2a526c01ba6d8695943394796e5825c34b693efdcb05e89b0ec544d08bae6463c098e5e3855b03b1a707398cccb2c7168c86233816f6a4a026fc0d0ee660121f0951d8a76dc3c9b3dcdd7a832e0f5c44d" + }` diff --git a/pkg/builder/searcherclient/searcher_conn.go b/pkg/builder/searcherclient/searcher_conn.go index bfeb498..c8aabef 100644 --- a/pkg/builder/searcherclient/searcher_conn.go +++ b/pkg/builder/searcherclient/searcher_conn.go @@ -6,14 +6,15 @@ import ( "errors" "io" "math/big" - "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" "golang.org/x/exp/slog" - "golang.org/x/sync/errgroup" ) +var wsTimeout = 10 * time.Second + // Searcher is the interface that wraps the lifecycle of a searcher connection. type Searcher interface { // ID returns the ID of the searcher. @@ -35,14 +36,8 @@ type RollUp interface { type DisconnectHandler func(reason string) var ( - // ErrSearcherDisconnected is returned when a searcher is disconnected. - ErrSearcherDisconnected = errors.New("searcher disconnected") - // ErrSearcherSubscriptionEnded is returned when a searcher's subscription has ended. - ErrSearcherSubscriptionEnded = errors.New("searcher subscription ended") // ErrQuit is returned when user issues a close. ErrQuit = errors.New("server quit") - // ErrIO is returned when there is an IO error. - ErrIO = errors.New("IO error") ) // NewSearcher creates a new searcher connection. @@ -58,15 +53,15 @@ func NewSearcher( wsConn: conn, searcherID: id, quit: make(chan struct{}), - inbox: make(chan wsMsg), outbox: make(chan wsMsg), logger: logger, rollup: rollup, subscriptionEnd: subscriptionEnd, - heartbeat: time.Now().Unix(), disconnectCB: disconnectCB, } + s.heartbeat.Store(time.Now().Unix()) + go s.startWorker() return s } @@ -80,137 +75,61 @@ type searcherConn struct { wsConn *websocket.Conn searcherID string quit chan struct{} - mu sync.Mutex - disconnected bool subscriptionEnd int64 logger *slog.Logger - heartbeat int64 - inbox chan wsMsg + heartbeat atomic.Int64 outbox chan wsMsg rollup RollUp disconnectCB DisconnectHandler } -func (s *searcherConn) isDisconnected() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.disconnected -} - -func (s *searcherConn) setDisconnected() { - s.mu.Lock() - defer s.mu.Unlock() - s.disconnected = true -} - func (s *searcherConn) isSubscriptionEnded() bool { - blkNo, _ := s.rollup.GetBlockNumber() + blkNo, err := s.rollup.GetBlockNumber() + if err != nil { + // if we can't get the block number, assume the server is not in a good state + // so we should disconnect + s.logger.Error("failed to get block number", "err", err) + return true + } return blkNo.Int64() > s.subscriptionEnd } -func (s *searcherConn) setHeartbeat() { - s.mu.Lock() - defer s.mu.Unlock() - s.heartbeat = time.Now().Unix() -} - func (s *searcherConn) startWorker() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - eg, egCtx := errgroup.WithContext(ctx) - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - // Start the reader - eg.Go(func() error { - for { - select { - case <-s.quit: - return ErrQuit - case <-egCtx.Done(): - return egCtx.Err() - default: + for { + select { + case <-s.quit: + s.disconnectCB("server closing connection") + _ = s.wsConn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage( + websocket.CloseNormalClosure, + "closing connection", + ), + ) + s.logger.Info("closing connection") + return + case payload := <-s.outbox: + if s.isSubscriptionEnded() { + s.disconnectCB("subscription ended") + _ = s.wsConn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage( + websocket.ClosePolicyViolation, + "subscription ended", + ), + ) + s.logger.Info("subscription ended") + return } - msgType, msg, err := s.wsConn.ReadMessage() + err := s.wsConn.WriteMessage(payload.msgType, payload.msg) if err != nil { - return errors.Join(ErrIO, err) - } - select { - case s.inbox <- wsMsg{msgType: msgType, msg: msg}: - case <-s.quit: - return ErrQuit - } - } - }) - - eg.Go(func() error { - for { - select { - case <-s.quit: - return ErrQuit - case msg := <-s.inbox: - switch msg.msgType { - case websocket.CloseMessage: - return ErrSearcherDisconnected - case websocket.PingMessage: - err := s.wsConn.WriteMessage(websocket.PongMessage, []byte("pong")) - if err != nil { - s.logger.Error("failed to send pong", "err", err) - return errors.Join(ErrIO, err) - } - case websocket.PongMessage: - default: - s.logger.Error("unknown message type", "msgType", msg.msgType) - // non-fatal error - } - s.setHeartbeat() - case payload := <-s.outbox: - err := s.wsConn.WriteMessage(payload.msgType, payload.msg) - if err != nil { - s.logger.Error("failed to send payload", "err", err) - return errors.Join(ErrIO, err) - } - s.setHeartbeat() - case <-ticker.C: - if s.isSubscriptionEnded() { - err := s.wsConn.WriteMessage( - websocket.CloseMessage, - websocket.FormatCloseMessage( - websocket.ClosePolicyViolation, - "subscription ended", - ), - ) - return errors.Join(ErrSearcherSubscriptionEnded, err) - } - - if time.Now().Unix()-s.heartbeat > 10 { - err := s.wsConn.WriteMessage(websocket.PingMessage, []byte("ping")) - if err != nil { - s.logger.Error("failed to send ping", "err", err) - return errors.Join(ErrIO, err) - } - } + s.logger.Error("failed to send payload", "err", err) + s.disconnectCB("failed to send payload") + return } + s.logger.Info("sent payload") + s.heartbeat.Store(time.Now().Unix()) } - }) - - switch err := eg.Wait(); { - case err == nil: - // this should never happen - s.logger.Error("worker exited with no error") - case errors.Is(err, ErrQuit): - s.logger.Info("worker stopped", "err", err) - case errors.Is(err, ErrSearcherDisconnected): - s.logger.Error("searcher disconnected", "err", err) - s.disconnectCB(err.Error()) - case errors.Is(err, ErrSearcherSubscriptionEnded): - s.logger.Error("searcher subscription ended", "err", err) - s.disconnectCB(err.Error()) - default: - s.logger.Error("could not reach searcher", "err", err) - s.disconnectCB(err.Error()) } } @@ -221,16 +140,14 @@ func (s *searcherConn) ID() string { func (s *searcherConn) GetInfo() SearcherInfo { return SearcherInfo{ ID: s.searcherID, - Heartbeat: time.Unix(s.heartbeat, 0), + Heartbeat: time.Unix(s.heartbeat.Load(), 0), + Validity: s.subscriptionEnd, } } func (s *searcherConn) Close() error { close(s.quit) - return s.wsConn.WriteMessage( - websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, "closing connection"), - ) + return nil } func (s *searcherConn) Send(ctx context.Context, payload SuperPayload) error { diff --git a/pkg/builder/searcherclient/searcher_conn_test.go b/pkg/builder/searcherclient/searcher_conn_test.go new file mode 100644 index 0000000..51b5a3f --- /dev/null +++ b/pkg/builder/searcherclient/searcher_conn_test.go @@ -0,0 +1,346 @@ +package searcherclient_test + +import ( + "context" + "encoding/json" + "io" + "math/big" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/primev/builder-boost/pkg/builder/searcherclient" + "golang.org/x/exp/slog" +) + +var upgrader = websocket.Upgrader{} + +type wsMsg struct { + msgType int + msg []byte +} + +type testSearcher struct { + inbox chan wsMsg +} + +func (t *testSearcher) handler(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer c.Close() + + c.SetCloseHandler(func(code int, text string) error { + t.inbox <- wsMsg{msgType: websocket.CloseMessage, msg: []byte(text)} + return nil + }) + + for { + mt, message, err := c.ReadMessage() + if err != nil { + break + } + t.inbox <- wsMsg{msgType: mt, msg: message} + } +} + +func newTestServer(t *testing.T) (*testSearcher, *websocket.Conn) { + s := &testSearcher{ + inbox: make(chan wsMsg), + } + + srv := httptest.NewServer(http.HandlerFunc(s.handler)) + + t.Cleanup(func() { + srv.Close() + }) + + // Convert http://127.0.0.1 to ws://127.0.0.1 + u := "ws" + strings.TrimPrefix(srv.URL, "http") + + // Connect to the server + ws, _, err := websocket.DefaultDialer.Dial(u, nil) + if err != nil { + t.Fatalf("%v", err) + } + t.Cleanup(func() { + _ = ws.Close() + }) + + return s, ws +} + +type testRollup struct { + blockno int64 +} + +func (r *testRollup) GetBlockNumber() (*big.Int, error) { + return big.NewInt(r.blockno), nil +} + +func newTestLogger() *slog.Logger { + testLogger := slog.NewTextHandler(io.Discard, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }) + return slog.New(testLogger) +} + +func TestSearcherConn(t *testing.T) { + t.Parallel() + + t.Run("new and close", func(t *testing.T) { + s, ws := newTestServer(t) + rollUp := &testRollup{blockno: 0} + + var disconnectReason string + + // Create a new searcher + srch := searcherclient.NewSearcher( + "test", + ws, + rollUp, + newTestLogger(), + 100, + func(reason string) { + disconnectReason = reason + }, + ) + + // Close the searcher + err := srch.Close() + if err != nil { + t.Fatal(err) + } + + msg := <-s.inbox + if msg.msgType != websocket.CloseMessage { + t.Fatalf("expected close message, got %v", msg.msgType) + } + if disconnectReason != "server closing connection" { + t.Fatalf( + "expected disconnect reason to be 'server closing connection', got %v", + disconnectReason, + ) + } + }) + + t.Run("get searcher info after init", func(t *testing.T) { + _, ws := newTestServer(t) + rollUp := &testRollup{blockno: 0} + + // Create a new searcher + srch := searcherclient.NewSearcher( + "test", + ws, + rollUp, + newTestLogger(), + 100, + func(reason string) {}, + ) + + t.Cleanup(func() { + err := srch.Close() + if err != nil { + t.Errorf("failed to close searcher: %v", err) + } + }) + + info := srch.GetInfo() + if srch.ID() != info.ID { + t.Fatalf("expected id to be %v, got %v", srch.ID(), info.ID) + } + if info.Heartbeat.Unix() == 0 { + t.Fatalf("expected heartbeat to be non-zero") + } + if info.Validity != 100 { + t.Fatalf("expected validity to be 100, got %v", info.Validity) + } + }) + + t.Run("send payload", func(t *testing.T) { + s, ws := newTestServer(t) + rollUp := &testRollup{blockno: 0} + + // Create a new searcher + srch := searcherclient.NewSearcher( + "test", + ws, + rollUp, + newTestLogger(), + 100, + func(reason string) {}, + ) + + t.Cleanup(func() { + err := srch.Close() + if err != nil { + t.Errorf("failed to close searcher: %v", err) + } + }) + + info := srch.GetInfo() + time.Sleep(1 * time.Second) + + err := srch.Send(context.Background(), searcherclient.SuperPayload{ + InternalMetadata: searcherclient.Metadata{ + Builder: "test", + Number: 0, + BlockHash: "test", + }, + SearcherTxns: map[string][]string{ + "test": {"test"}, + }, + }) + if err != nil { + t.Fatalf("failed to send payload: %v", err) + } + + msg := <-s.inbox + if msg.msgType != websocket.TextMessage { + t.Fatalf("expected text message, got %v", msg.msgType) + } + + var payload searcherclient.SuperPayload + err = json.Unmarshal(msg.msg, &payload) + if err != nil { + t.Fatalf("failed to unmarshal payload: %v", err) + } + + if payload.InternalMetadata.Builder != "test" { + t.Fatalf("expected builder to be 'test', got %v", payload.InternalMetadata.Builder) + } + + if payload.InternalMetadata.Number != 0 { + t.Fatalf("expected number to be 0, got %v", payload.InternalMetadata.Number) + } + + if payload.InternalMetadata.BlockHash != "test" { + t.Fatalf("expected block hash to be 'test', got %v", payload.InternalMetadata.BlockHash) + } + + if len(payload.SearcherTxns) != 1 { + t.Fatalf("expected 1 searcher txns, got %v", len(payload.SearcherTxns)) + } + + if len(payload.SearcherTxns["test"]) != 1 { + t.Fatalf( + "expected 1 searcher txns for 'test', got %v", + len(payload.SearcherTxns["test"]), + ) + } + + if payload.SearcherTxns["test"][0] != "test" { + t.Fatalf("expected searcher txn to be 'test', got %v", payload.SearcherTxns["test"][0]) + } + + newInfo := srch.GetInfo() + if !newInfo.Heartbeat.After(info.Heartbeat) { + t.Fatalf("expected id to be %v, got %v", info, newInfo) + } + }) + + t.Run("subscription ended", func(t *testing.T) { + s, ws := newTestServer(t) + rollUp := &testRollup{blockno: 101} + disconnectReason := "" + + // Create a new searcher + srch := searcherclient.NewSearcher( + "test", + ws, + rollUp, + newTestLogger(), + 100, + func(reason string) { disconnectReason = reason }, + ) + + t.Cleanup(func() { + err := srch.Close() + if err != nil { + t.Errorf("failed to close searcher: %v", err) + } + }) + + err := srch.Send(context.Background(), searcherclient.SuperPayload{ + InternalMetadata: searcherclient.Metadata{ + Builder: "test", + Number: 0, + BlockHash: "test", + }, + SearcherTxns: map[string][]string{ + "test": {"test"}, + }, + }) + if err != nil { + t.Fatalf("failed to send payload: %v", err) + } + + msg := <-s.inbox + if msg.msgType != websocket.CloseMessage { + t.Fatalf("expected Close message, got %v", msg.msgType) + } + + if disconnectReason != "subscription ended" { + t.Fatalf( + "expected disconnect reason to be 'subscription ended', got %v", + disconnectReason, + ) + } + }) + + t.Run("client closed connection", func(t *testing.T) { + _, ws := newTestServer(t) + rollUp := &testRollup{blockno: 0} + disconnectReason := "" + closed := make(chan struct{}) + + // Create a new searcher + srch := searcherclient.NewSearcher( + "test", + ws, + rollUp, + newTestLogger(), + 100, + func(reason string) { disconnectReason = reason; close(closed) }, + ) + + t.Cleanup(func() { + err := srch.Close() + if err != nil { + t.Errorf("failed to close searcher: %v", err) + } + }) + + err := ws.Close() + if err != nil { + t.Fatalf("failed to close websocket: %v", err) + } + + err = srch.Send(context.Background(), searcherclient.SuperPayload{ + InternalMetadata: searcherclient.Metadata{ + Builder: "test", + Number: 0, + BlockHash: "test", + }, + SearcherTxns: map[string][]string{ + "test": {"test"}, + }, + }) + if err != nil { + t.Fatalf("failed to send payload: %v", err) + } + + <-closed + + if disconnectReason != "failed to send payload" { + t.Fatalf( + "expected disconnect reason to be 'failed to send', got %v", + disconnectReason, + ) + } + }) +} From bf1ab4d68b657de5f2c7b6afbb17c2994fcfbd92 Mon Sep 17 00:00:00 2001 From: alok Date: Thu, 14 Sep 2023 23:58:35 +0530 Subject: [PATCH 3/4] chore: refactor builder workflows --- pkg/builder/preconf/bid_worker.go | 2 +- pkg/builder/preconf/bid_worker_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/builder/preconf/bid_worker.go b/pkg/builder/preconf/bid_worker.go index e8a5e13..6e35665 100644 --- a/pkg/builder/preconf/bid_worker.go +++ b/pkg/builder/preconf/bid_worker.go @@ -1,4 +1,4 @@ -package builder +package preconf import ( "crypto/ecdsa" diff --git a/pkg/builder/preconf/bid_worker_test.go b/pkg/builder/preconf/bid_worker_test.go index 01115c8..eea30df 100644 --- a/pkg/builder/preconf/bid_worker_test.go +++ b/pkg/builder/preconf/bid_worker_test.go @@ -1,4 +1,4 @@ -package builder_test +package preconf_test import ( "crypto/ecdsa" From 2572d707289ae46fb0b6da0e7f147463f164ac6e Mon Sep 17 00:00:00 2001 From: alok Date: Fri, 15 Sep 2023 20:26:36 +0530 Subject: [PATCH 4/4] chore: refactor builder workflows --- pkg/apiserver/api.go | 29 ++++-- pkg/apiserver/api_test.go | 141 +++++++++++++++++++++++++++++ pkg/apiserver/helpers.go | 7 +- pkg/apiserver/helpers_test.go | 164 ++++++++++++++++++++++++++++++++++ pkg/builder/api/api.go | 4 - pkg/builder/node.go | 49 +++++++++- 6 files changed, 380 insertions(+), 14 deletions(-) create mode 100644 pkg/apiserver/api_test.go create mode 100644 pkg/apiserver/helpers_test.go diff --git a/pkg/apiserver/api.go b/pkg/apiserver/api.go index 92cbd1f..9e39de7 100644 --- a/pkg/apiserver/api.go +++ b/pkg/apiserver/api.go @@ -17,16 +17,27 @@ const ( type searcherKey struct{} +// Service wraps http.Server with additional functionality for metrics and +// other common middlewares. type Service struct { - *http.Server - metricsRegistry *prometheus.Registry router *http.ServeMux logger *slog.Logger } -func New() *Service { - return &Service{} +// New creates a new Service. +func New( + version string, + logger *slog.Logger, +) *Service { + srv := &Service{ + router: http.NewServeMux(), + logger: logger, + metricsRegistry: newMetrics(version), + } + + srv.registerDebugEndpoints() + return srv } func (a *Service) registerDebugEndpoints() { @@ -73,19 +84,25 @@ func newMetrics(version string) (r *prometheus.Registry) { return r } +// Router returns the router. +func (a *Service) Router() http.Handler { + return newAccessLogHandler(a.logger)(a.router) +} + +// ChainHandlers chains middlewares and handler. func (a *Service) ChainHandlers( path string, handler http.Handler, mws ...func(http.Handler) http.Handler, ) { h := handler - for i := len(mws) - 1; i > 0; i-- { + for i := len(mws) - 1; i >= 0; i-- { h = mws[i](h) } a.router.Handle(path, h) } +// RegisterMetricsCollectors registers prometheus collectors. func (a *Service) RegisterMetricsCollectors(cs ...prometheus.Collector) { a.metricsRegistry.MustRegister(cs...) } - diff --git a/pkg/apiserver/api_test.go b/pkg/apiserver/api_test.go new file mode 100644 index 0000000..1ccf946 --- /dev/null +++ b/pkg/apiserver/api_test.go @@ -0,0 +1,141 @@ +package apiserver_test + +import ( + "bytes" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/primev/builder-boost/pkg/apiserver" + "golang.org/x/exp/slog" +) + +func newTestLogger(w io.Writer) *slog.Logger { + testLogger := slog.NewTextHandler(w, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }) + return slog.New(testLogger) +} + +func TestAPIServer(t *testing.T) { + t.Parallel() + + t.Run("new and close", func(t *testing.T) { + var logBuf bytes.Buffer + s := apiserver.New( + "test", + newTestLogger(&logBuf), + ) + + srv := httptest.NewServer(s.Router()) + t.Cleanup(func() { + srv.Close() + }) + + r, err := http.NewRequest("GET", srv.URL+"/metrics", nil) + if err != nil { + t.Fatal(err) + } + + resp, err := http.DefaultClient.Do(r) + if err != nil { + t.Fatal(err) + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + var b bytes.Buffer + n, err := b.ReadFrom(resp.Body) + if err != nil { + t.Fatal(err) + } + + if n == 0 { + t.Fatal("expected non-zero body") + } + + if !strings.Contains(b.String(), "test") { + t.Fatalf("expected body to contain 'test', got %q", b.String()) + } + + if !strings.Contains(b.String(), "go_info") { + t.Fatalf("expected body to contain 'go_info', got %q", b.String()) + } + + if !strings.Contains(b.String(), "go_memstats") { + t.Fatalf("expected body to contain 'go_memstats', got %q", b.String()) + } + + if !strings.Contains(b.String(), "go_gc_duration_seconds") { + t.Fatalf("expected body to contain 'go_gc_duration_seconds', got %q", b.String()) + } + + if !strings.Contains(logBuf.String(), "api access") { + t.Fatalf("expected log to contain 'api access', got %q", logBuf.String()) + } + }) + + t.Run("chain handlers", func(t *testing.T) { + var logBuf bytes.Buffer + s := apiserver.New( + "test", + newTestLogger(&logBuf), + ) + + srv := httptest.NewServer(s.Router()) + t.Cleanup(func() { + srv.Close() + }) + + var orderedHandlerActions []int + s.ChainHandlers( + "/chain", + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + orderedHandlerActions = append(orderedHandlerActions, 3) + w.WriteHeader(http.StatusOK) + }), + func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + orderedHandlerActions = append(orderedHandlerActions, 1) + next.ServeHTTP(w, r) + }) + }, + func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + orderedHandlerActions = append(orderedHandlerActions, 2) + next.ServeHTTP(w, r) + }) + }, + ) + + r, err := http.NewRequest("GET", srv.URL+"/chain", nil) + if err != nil { + t.Fatal(err) + } + + resp, err := http.DefaultClient.Do(r) + if err != nil { + t.Fatal(err) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + if len(orderedHandlerActions) != 3 { + t.Fatalf("expected 3 handler actions, got %d", len(orderedHandlerActions)) + } + + for i, v := range []int{1, 2, 3} { + if orderedHandlerActions[i] != v { + t.Fatalf("expected handler action %d, got %d", v, orderedHandlerActions[i]) + } + } + }) +} diff --git a/pkg/apiserver/helpers.go b/pkg/apiserver/helpers.go index 610881e..748d802 100644 --- a/pkg/apiserver/helpers.go +++ b/pkg/apiserver/helpers.go @@ -8,19 +8,20 @@ import ( "net/http" ) -type statusResponse struct { +// StatusResponse is a helper struct used to wrap a string message with a status code. +type StatusResponse struct { Code int `json:"code"` Message string `json:"message"` } // WriteResponse helper is used to write a response to the client with the given code -// and message. If the message is a string, it will be wrapped in a statusResponse +// and message. If the message is a string, it will be wrapped in a StatusResponse // struct. Otherwise, the message will be encoded as JSON. func WriteResponse(w http.ResponseWriter, code int, message any) error { var b bytes.Buffer switch message.(type) { case string: - err := json.NewEncoder(&b).Encode(statusResponse{Code: code, Message: message.(string)}) + err := json.NewEncoder(&b).Encode(StatusResponse{Code: code, Message: message.(string)}) if err != nil { w.WriteHeader(http.StatusInternalServerError) return fmt.Errorf("failed to encode status response: %w", err) diff --git a/pkg/apiserver/helpers_test.go b/pkg/apiserver/helpers_test.go new file mode 100644 index 0000000..1869ea6 --- /dev/null +++ b/pkg/apiserver/helpers_test.go @@ -0,0 +1,164 @@ +package apiserver_test + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/primev/builder-boost/pkg/apiserver" +) + +type testHandler struct { + called bool +} + +func (h *testHandler) Handle(_ http.ResponseWriter, _ *http.Request) { + h.called = true +} + +func TestMethodHandler(t *testing.T) { + t.Parallel() + + t.Run("method not allowed", func(t *testing.T) { + h := &testHandler{} + mh := apiserver.MethodHandler("GET", http.HandlerFunc(h.Handle)) + + r, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + + w := httptest.NewRecorder() + mh.ServeHTTP(w, r) + + if w.Code != http.StatusMethodNotAllowed { + t.Fatalf("expected status code %d, got %d", http.StatusMethodNotAllowed, w.Code) + } + + if h.called { + t.Fatal("handler should not have been called") + } + }) + + t.Run("method allowed", func(t *testing.T) { + h := &testHandler{} + mh := apiserver.MethodHandler("GET", http.HandlerFunc(h.Handle)) + + r, err := http.NewRequest("GET", "/", nil) + if err != nil { + t.Fatal(err) + } + + w := httptest.NewRecorder() + mh.ServeHTTP(w, r) + + if !h.called { + t.Fatal("handler should have been called") + } + }) +} + +func TestBindJSON(t *testing.T) { + t.Parallel() + + t.Run("bad request", func(t *testing.T) { + type v struct { + Foo string `json:"foo"` + } + + r, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + w := httptest.NewRecorder() + + if _, err := apiserver.BindJSON[v](w, r); err == nil { + t.Fatal("expected error") + } + }) + + t.Run("ok", func(t *testing.T) { + type v struct { + Foo string `json:"foo"` + } + + b := bytes.NewBuffer([]byte(`{"foo":"bar"}`)) + + r, err := http.NewRequest("POST", "/", b) + if err != nil { + t.Fatal(err) + } + + w := httptest.NewRecorder() + + vv, err := apiserver.BindJSON[v](w, r) + if err != nil { + t.Fatal(err) + } + + if vv.Foo != "bar" { + t.Fatalf("expected foo to be %q, got %q", "bar", vv.Foo) + } + }) +} + +func TestWriteResponse(t *testing.T) { + t.Parallel() + + t.Run("string", func(t *testing.T) { + w := httptest.NewRecorder() + + if err := apiserver.WriteResponse(w, http.StatusOK, "foo"); err != nil { + t.Fatal(err) + } + + if w.Code != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, w.Code) + } + + resp := apiserver.StatusResponse{ + Code: http.StatusOK, + Message: "foo", + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(resp); err != nil { + t.Fatal(err) + } + buf.WriteByte('\n') + + if bytes.Compare(w.Body.Bytes(), buf.Bytes()) != 0 { + t.Fatalf("expected body %q, got %q", buf.String(), w.Body.String()) + } + }) + + t.Run("struct", func(t *testing.T) { + type v struct { + Foo string `json:"foo"` + } + + rq := v{Foo: "bar"} + + w := httptest.NewRecorder() + + if err := apiserver.WriteResponse(w, http.StatusOK, rq); err != nil { + t.Fatal(err) + } + + if w.Code != http.StatusOK { + t.Fatalf("expected status code %d, got %d", http.StatusOK, w.Code) + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(rq); err != nil { + t.Fatal(err) + } + buf.WriteByte('\n') + + if bytes.Compare(w.Body.Bytes(), buf.Bytes()) != 0 { + t.Fatalf("expected body %q, got %q", buf.String(), w.Body.String()) + } + }) +} diff --git a/pkg/builder/api/api.go b/pkg/builder/api/api.go index 08de4ae..b4629dd 100644 --- a/pkg/builder/api/api.go +++ b/pkg/builder/api/api.go @@ -339,10 +339,6 @@ func (a *api) submitBlock(w http.ResponseWriter, r *http.Request) { return } - // if a.MetricsEnabled { - // a.metrics.Duration.WithLabelValues("algo_processing", "N/A").Observe(time.Since(now).Seconds()) - // a.metrics.PayloadsRecieved.Inc() - // } err = apiserver.WriteResponse(w, http.StatusOK, "block submitted") if err != nil { logger.Error("error writing response", "err", err) diff --git a/pkg/builder/node.go b/pkg/builder/node.go index c3bc0ff..b66d2cd 100644 --- a/pkg/builder/node.go +++ b/pkg/builder/node.go @@ -2,13 +2,23 @@ package builder import ( "context" + "errors" "io" + "net/http" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/primev/builder-boost/pkg/apiserver" + "github.com/primev/builder-boost/pkg/builder/api" + "github.com/primev/builder-boost/pkg/builder/searcherclient" + "github.com/primev/builder-boost/pkg/rollup" "golang.org/x/exp/slog" ) // Options are the options for the builder node type Options struct { + Version string Logger *slog.Logger ServerAddr string TracingEndpoint string @@ -23,6 +33,7 @@ type Options struct { } type Node struct { + server *http.Server io.Closer } @@ -30,6 +41,42 @@ func NewNode( ctx context.Context, opts Options, ) (*Node, error) { + rollupKeyBytes := common.FromHex(opts.RollupKey) + rollupKey := crypto.ToECDSAUnsafe(rollupKeyBytes) - return &Node{}, nil + rollupClient, err := ethclient.Dial(opts.RollupAddr) + if err != nil { + return nil, err + } + + rollupContract := common.HexToAddress(opts.RollupContract) + + ru, err := rollup.New(rollupClient, rollupContract, rollupKey, nil) + if err != nil { + return nil, err + } + + sclient := searcherclient.NewSearcherClient(opts.Logger, opts.InclusionListEnabled) + + srv := apiserver.New(opts.Version, opts.Logger) + api.RegisterAPI(opts.BuilderToken, srv, ru, opts.Logger, sclient) + + server := &http.Server{ + Addr: opts.ServerAddr, + Handler: srv.Router(), + } + + go func() { + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + opts.Logger.Error("failed to start server", "err", err) + } + }() + + return &Node{ + server: server, + }, nil +} + +func (n *Node) Close() error { + return n.server.Close() }