diff --git a/tools/preconf-rpc/explorer-submitter/submitter.go b/tools/preconf-rpc/explorer-submitter/submitter.go new file mode 100644 index 000000000..f915c7b5f --- /dev/null +++ b/tools/preconf-rpc/explorer-submitter/submitter.go @@ -0,0 +1,172 @@ +package explorersubmitter + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "golang.org/x/sync/errgroup" +) + +type explorerSubmitter struct { + endpoint string + apiKey string + appCode string + client *http.Client + reqChan chan submitRequest + logger *slog.Logger +} + +type submitRequest struct { + tx *types.Transaction + from common.Address +} + +func New(endpoint, apiKey, appCode string, logger *slog.Logger) *explorerSubmitter { + return &explorerSubmitter{ + endpoint: endpoint, + apiKey: apiKey, + appCode: appCode, + client: &http.Client{ + Timeout: 15 * time.Second, + }, + reqChan: make(chan submitRequest, 100), + logger: logger, + } +} + +func (e *explorerSubmitter) Start(ctx context.Context) <-chan struct{} { + done := make(chan struct{}) + eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case req := <-e.reqChan: + if err := e.doSubmit(egCtx, req); err != nil { + e.logger.Error("failed to submit to explorer", "error", err) + } + } + } + }) + + go func() { + defer close(done) + if err := eg.Wait(); err != nil { + if errors.Is(err, context.Canceled) { + e.logger.Info("Explorer submitter stopped") + } else { + e.logger.Error("Explorer submitter exited with error", "error", err) + } + } + }() + + return done +} + +func (e *explorerSubmitter) Submit(ctx context.Context, tx *types.Transaction, from common.Address) error { + select { + case e.reqChan <- submitRequest{tx: tx, from: from}: + return nil + case <-ctx.Done(): + return ctx.Err() + default: + // Drop request if channel is full to prevent blocking + e.logger.Warn("request channel full, dropping request", "txHash", tx.Hash().Hex()) + return nil + } +} + +type TxData struct { + ChainID string `json:"df_chainid"` + AppCode string `json:"df_appcode"` + TxHash string `json:"df_txhash"` + ExpireTs int64 `json:"df_expire_ts"` + TxInfo TxInfo `json:"df_txinfo"` +} + +type TxInfo struct { + From string `json:"from"` + To string `json:"to"` +} + +func (e *explorerSubmitter) doSubmit(ctx context.Context, req submitRequest) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic in doSubmit: %v", r) + } + }() + + if e.endpoint == "" { + return nil + } + + chainID := "1" + expireTs := time.Now().Add(15 * time.Minute).Unix() + + txHash := req.tx.Hash().Hex() + from := req.from.Hex() + to := "" + if req.tx.To() != nil { + to = req.tx.To().Hex() + } + + txData := TxData{ + ChainID: chainID, + AppCode: e.appCode, + TxHash: txHash, + ExpireTs: expireTs, + TxInfo: TxInfo{ + From: from, + To: to, + }, + } + + jsonData, err := json.Marshal(txData) + if err != nil { + return fmt.Errorf("failed to marshal json data: %w", err) + } + + params := url.Values{} + params.Add("apikey", e.apiKey) + params.Add("action", "submitTxPending") + params.Add("JsonData", string(jsonData)) + + reqURL, err := url.Parse(e.endpoint) + if err != nil { + return fmt.Errorf("failed to parse endpoint url: %w", err) + } + + reqURL.RawQuery = params.Encode() + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL.String(), nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + resp, err := e.client.Do(httpReq) + if err != nil { + return fmt.Errorf("failed to send request to explorer endpoint: %v", err.(*url.Error).Err) + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("received non-ok status code: %d, body: %s", resp.StatusCode, string(bodyBytes)) + } + + e.logger.Debug("Successfully submitted tx to explorer", "hash", txHash) + return nil +} diff --git a/tools/preconf-rpc/explorer-submitter/submitter_test.go b/tools/preconf-rpc/explorer-submitter/submitter_test.go new file mode 100644 index 000000000..736e80ccf --- /dev/null +++ b/tools/preconf-rpc/explorer-submitter/submitter_test.go @@ -0,0 +1,85 @@ +package explorersubmitter + +import ( + "context" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/primev/mev-commit/x/util" + "github.com/stretchr/testify/require" +) + +func TestSubmit(t *testing.T) { + reqChan := make(chan *http.Request, 1) + + srv := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + if q.Get("apikey") == "" { + http.Error(w, "missing apikey", http.StatusBadRequest) + return + } + if q.Get("action") != "submitTxPending" { + http.Error(w, "wrong action", http.StatusBadRequest) + return + } + + jsonData := q.Get("JsonData") + if jsonData == "" { + http.Error(w, "missing JsonData", http.StatusBadRequest) + return + } + + var txData TxData + if err := json.Unmarshal([]byte(jsonData), &txData); err != nil { + http.Error(w, "invalid json data", http.StatusBadRequest) + return + } + + if txData.ChainID != "1" { + http.Error(w, "wrong chain id", http.StatusBadRequest) + return + } + + reqChan <- r + w.WriteHeader(http.StatusOK) + }), + ) + defer srv.Close() + + endpoint := srv.URL + apiKey := "test-api-key" + appCode := "test-app-code" + + logger := util.NewTestLogger(os.Stdout) + submitter := New(endpoint, apiKey, appCode, logger) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := submitter.Start(ctx) + + tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) + err := submitter.Submit(ctx, tx, common.Address{}) + require.NoError(t, err) + + select { + case <-reqChan: + // Request received by server. + // Give a moment for the server to reply and the client to process the 200 OK + // preventing "context canceled" error in logs. + time.Sleep(50 * time.Millisecond) + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for request") + } + + cancel() + <-done +} diff --git a/tools/preconf-rpc/main.go b/tools/preconf-rpc/main.go index 6e07b7a81..789573b5c 100644 --- a/tools/preconf-rpc/main.go +++ b/tools/preconf-rpc/main.go @@ -296,6 +296,24 @@ var ( return nil }, } + + optionExplorerEndpoint = &cli.StringFlag{ + Name: "explorer-endpoint", + Usage: "Explorer API endpoint for submitting transactions", + EnvVars: []string{"PRECONF_RPC_EXPLORER_API_ENDPOINT"}, + } + + optionExplorerApiKey = &cli.StringFlag{ + Name: "explorer-apikey", + Usage: "Explorer API Key", + EnvVars: []string{"PRECONF_RPC_EXPLORER_API_KEY"}, + } + + optionExplorerAppCode = &cli.StringFlag{ + Name: "explorer-appcode", + Usage: "Explorer App Code", + EnvVars: []string{"PRECONF_RPC_EXPLORER_APPCODE"}, + } ) func main() { @@ -337,6 +355,9 @@ func main() { optionBackrunnerAPIURL, optionBackrunnerRPCURL, optionBackrunnerAPIKey, + optionExplorerEndpoint, + optionExplorerApiKey, + optionExplorerAppCode, optionPointsAPIURL, optionPointsAPIKey, }, @@ -429,6 +450,9 @@ func main() { BackrunnerAPIURL: c.String(optionBackrunnerAPIURL.Name), BackrunnerRPC: c.String(optionBackrunnerRPCURL.Name), BackrunnerAPIKey: c.String(optionBackrunnerAPIKey.Name), + ExplorerEndpoint: c.String(optionExplorerEndpoint.Name), + ExplorerApiKey: c.String(optionExplorerApiKey.Name), + ExplorerAppCode: c.String(optionExplorerAppCode.Name), PointsAPIURL: c.String(optionPointsAPIURL.Name), PointsAPIKey: c.String(optionPointsAPIKey.Name), } diff --git a/tools/preconf-rpc/sender/sender.go b/tools/preconf-rpc/sender/sender.go index 5d3eac0f8..36f0e60e5 100644 --- a/tools/preconf-rpc/sender/sender.go +++ b/tools/preconf-rpc/sender/sender.go @@ -164,6 +164,10 @@ type Backrunner interface { Backrun(ctx context.Context, rawTx string, commitments []*bidderapiv1.Commitment) error } +type ExplorerSubmitter interface { + Submit(ctx context.Context, tx *types.Transaction, from common.Address) error +} + type TxSender struct { logger *slog.Logger store Store @@ -191,6 +195,7 @@ type TxSender struct { receiptSignal map[common.Hash][]chan struct{} receiptMtx sync.Mutex metrics *metrics + explorerSubmitter ExplorerSubmitter } func noOpFastTrack(_ []*bidderapiv1.Commitment, _ bool) bool { @@ -207,6 +212,7 @@ func NewTxSender( simulator Simulator, backrunner Backrunner, settlementChainId *big.Int, + explorerSubmitter ExplorerSubmitter, logger *slog.Logger, ) (*TxSender, error) { txnAttemptHistory, err := lru.New[common.Hash, *txnAttempt](1000) @@ -242,6 +248,7 @@ func NewTxSender( bidTimeout: bidTimeout, receiptSignal: make(map[common.Hash][]chan struct{}), metrics: newMetrics(), + explorerSubmitter: explorerSubmitter, }, nil } @@ -333,6 +340,14 @@ func (t *TxSender) Enqueue(ctx context.Context, tx *Transaction) error { t.triggerSender() + if err := t.explorerSubmitter.Submit( + context.Background(), + tx.Transaction, + tx.Sender, + ); err != nil { + t.logger.Error("Failed to submit tx to explorer", "error", err) + } + return nil } diff --git a/tools/preconf-rpc/sender/sender_test.go b/tools/preconf-rpc/sender/sender_test.go index faeb47179..627bbfa2f 100644 --- a/tools/preconf-rpc/sender/sender_test.go +++ b/tools/preconf-rpc/sender/sender_test.go @@ -300,6 +300,12 @@ func (m *mockBackrunner) Backrun(ctx context.Context, rawTx string, commitments return nil } +type MockExplorerSubmitter struct{} + +func (m *MockExplorerSubmitter) Submit(ctx context.Context, tx *types.Transaction, from common.Address) error { + return nil +} + func TestSender(t *testing.T) { t.Parallel() @@ -330,6 +336,7 @@ func TestSender(t *testing.T) { &mockSimulator{}, &mockBackrunner{}, big.NewInt(1), // Settlement chain ID + &MockExplorerSubmitter{}, util.NewTestLogger(os.Stdout), ) if err != nil { @@ -584,6 +591,7 @@ func TestCancelTransaction(t *testing.T) { &mockSimulator{}, &mockBackrunner{}, big.NewInt(1), // Settlement chain ID + &MockExplorerSubmitter{}, util.NewTestLogger(os.Stdout), ) if err != nil { @@ -672,6 +680,7 @@ func TestIgnoreProvidersOnRetry(t *testing.T) { &mockSimulator{}, &mockBackrunner{}, big.NewInt(1), // Settlement chain ID + &MockExplorerSubmitter{}, util.NewTestLogger(io.Discard), ) if err != nil { diff --git a/tools/preconf-rpc/service/service.go b/tools/preconf-rpc/service/service.go index 7ade9b74a..de4c344ab 100644 --- a/tools/preconf-rpc/service/service.go +++ b/tools/preconf-rpc/service/service.go @@ -26,6 +26,7 @@ import ( "github.com/primev/mev-commit/tools/preconf-rpc/backrunner" bidder "github.com/primev/mev-commit/tools/preconf-rpc/bidder" "github.com/primev/mev-commit/tools/preconf-rpc/blocktracker" + explorersubmitter "github.com/primev/mev-commit/tools/preconf-rpc/explorer-submitter" "github.com/primev/mev-commit/tools/preconf-rpc/handlers" "github.com/primev/mev-commit/tools/preconf-rpc/notifier" "github.com/primev/mev-commit/tools/preconf-rpc/points" @@ -79,6 +80,9 @@ type Config struct { BackrunnerRPC string BackrunnerAPIURL string BackrunnerAPIKey string + ExplorerEndpoint string + ExplorerApiKey string + ExplorerAppCode string PointsAPIURL string PointsAPIKey string } @@ -288,6 +292,16 @@ func New(config *Config) (*Service, error) { s.closers = append(s.closers, channelCloser(backrunnerDone)) metricsRegistry.MustRegister(brunner.Metrics()...) + expSubmitter := explorersubmitter.New( + config.ExplorerEndpoint, + config.ExplorerApiKey, + config.ExplorerAppCode, + config.Logger.With("module", "explorersubmitter"), + ) + expSubmitterDone := expSubmitter.Start(ctx) + healthChecker.Register(health.CloseChannelHealthCheck("ExplorerSubmitter", expSubmitterDone)) + s.closers = append(s.closers, channelCloser(expSubmitterDone)) + sndr, err := sender.NewTxSender( rpcstore, bidderClient, @@ -298,6 +312,7 @@ func New(config *Config) (*Service, error) { simulator, brunner, settlementChainID, + expSubmitter, config.Logger.With("module", "txsender"), ) if err != nil {