Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions tools/preconf-rpc/explorer-submitter/submitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package explorersubmitter

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)

type explorerSubmitter struct {
endpoint string
apiKey string
appCode string
client *http.Client
reqChan chan submitRequest
logger *slog.Logger
}

type submitRequest struct {
tx *types.Transaction
from common.Address
}

func New(endpoint, apiKey, appCode string, logger *slog.Logger) *explorerSubmitter {
return &explorerSubmitter{
endpoint: endpoint,
apiKey: apiKey,
appCode: appCode,
client: &http.Client{
Timeout: 15 * time.Second,
},
reqChan: make(chan submitRequest, 100),
logger: logger,
}
}

func (e *explorerSubmitter) Start(ctx context.Context) <-chan struct{} {
done := make(chan struct{})
eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
for {
select {
case <-egCtx.Done():
return egCtx.Err()
case req := <-e.reqChan:
if err := e.doSubmit(egCtx, req); err != nil {
e.logger.Error("failed to submit to explorer", "error", err)
}
}
}
})

go func() {
defer close(done)
if err := eg.Wait(); err != nil {
if errors.Is(err, context.Canceled) {
e.logger.Info("Explorer submitter stopped")
} else {
e.logger.Error("Explorer submitter exited with error", "error", err)
}
}
}()

return done
}

func (e *explorerSubmitter) Submit(ctx context.Context, tx *types.Transaction, from common.Address) error {
select {
case e.reqChan <- submitRequest{tx: tx, from: from}:
return nil
case <-ctx.Done():
return ctx.Err()
default:
// Drop request if channel is full to prevent blocking
e.logger.Warn("request channel full, dropping request", "txHash", tx.Hash().Hex())
return nil
}
}

type TxData struct {
ChainID string `json:"df_chainid"`
AppCode string `json:"df_appcode"`
TxHash string `json:"df_txhash"`
ExpireTs int64 `json:"df_expire_ts"`
TxInfo TxInfo `json:"df_txinfo"`
}

type TxInfo struct {
From string `json:"from"`
To string `json:"to"`
}

func (e *explorerSubmitter) doSubmit(ctx context.Context, req submitRequest) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in doSubmit: %v", r)
}
}()

if e.endpoint == "" {
return nil
}

chainID := "1"
expireTs := time.Now().Add(15 * time.Minute).Unix()

txHash := req.tx.Hash().Hex()
from := req.from.Hex()
to := ""
if req.tx.To() != nil {
to = req.tx.To().Hex()
}

txData := TxData{
ChainID: chainID,
AppCode: e.appCode,
TxHash: txHash,
ExpireTs: expireTs,
TxInfo: TxInfo{
From: from,
To: to,
},
}

jsonData, err := json.Marshal(txData)
if err != nil {
return fmt.Errorf("failed to marshal json data: %w", err)
}

params := url.Values{}
params.Add("apikey", e.apiKey)
params.Add("action", "submitTxPending")
params.Add("JsonData", string(jsonData))

reqURL, err := url.Parse(e.endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint url: %w", err)
}

reqURL.RawQuery = params.Encode()

httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL.String(), nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

resp, err := e.client.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to send request to explorer endpoint: %v", err.(*url.Error).Err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("received non-ok status code: %d, body: %s", resp.StatusCode, string(bodyBytes))
}

e.logger.Debug("Successfully submitted tx to explorer", "hash", txHash)
return nil
}
44 changes: 44 additions & 0 deletions tools/preconf-rpc/explorer-submitter/submitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package explorersubmitter

import (
"context"
"fmt"
"math/big"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/primev/mev-commit/x/util"
"github.com/stretchr/testify/require"
)

func TestSubmit(t *testing.T) {
endpoint := os.Getenv("EXPLORER_API_ENDPOINT")
apiKey := os.Getenv("EXPLORER_API_KEY")
appCode := os.Getenv("EXPLORER_APPCODE")

if endpoint == "" || apiKey == "" || appCode == "" {
t.Skip("skipping integration test, flags not provided")
}

logger := util.NewTestLogger(os.Stdout)
submitter := New(endpoint, apiKey, appCode, logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := submitter.Start(ctx)

tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
fmt.Printf("Submitting mock transaction with hash: %s\n", tx.Hash().Hex())
err := submitter.Submit(ctx, tx, common.Address{})
require.NoError(t, err)

// allow processing
time.Sleep(2 * time.Second)

cancel()
<-done
}
24 changes: 24 additions & 0 deletions tools/preconf-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,24 @@ var (
return nil
},
}

optionExplorerEndpoint = &cli.StringFlag{
Name: "explorer-endpoint",
Usage: "Explorer API endpoint for submitting transactions",
EnvVars: []string{"EXPLORER_API_ENDPOINT"},
}

optionExplorerApiKey = &cli.StringFlag{
Name: "explorer-apikey",
Usage: "Explorer API Key",
EnvVars: []string{"EXPLORER_API_KEY"},
}

optionExplorerAppCode = &cli.StringFlag{
Name: "explorer-appcode",
Usage: "Explorer App Code",
EnvVars: []string{"EXPLORER_APPCODE"},
}
)

func main() {
Expand Down Expand Up @@ -337,6 +355,9 @@ func main() {
optionBackrunnerAPIURL,
optionBackrunnerRPCURL,
optionBackrunnerAPIKey,
optionExplorerEndpoint,
optionExplorerApiKey,
optionExplorerAppCode,
optionPointsAPIURL,
optionPointsAPIKey,
},
Expand Down Expand Up @@ -429,6 +450,9 @@ func main() {
BackrunnerAPIURL: c.String(optionBackrunnerAPIURL.Name),
BackrunnerRPC: c.String(optionBackrunnerRPCURL.Name),
BackrunnerAPIKey: c.String(optionBackrunnerAPIKey.Name),
ExplorerEndpoint: c.String(optionExplorerEndpoint.Name),
ExplorerApiKey: c.String(optionExplorerApiKey.Name),
ExplorerAppCode: c.String(optionExplorerAppCode.Name),
PointsAPIURL: c.String(optionPointsAPIURL.Name),
PointsAPIKey: c.String(optionPointsAPIKey.Name),
}
Expand Down
15 changes: 15 additions & 0 deletions tools/preconf-rpc/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ type Backrunner interface {
Backrun(ctx context.Context, rawTx string, commitments []*bidderapiv1.Commitment) error
}

type ExplorerSubmitter interface {
Submit(ctx context.Context, tx *types.Transaction, from common.Address) error
}

type TxSender struct {
logger *slog.Logger
store Store
Expand Down Expand Up @@ -191,6 +195,7 @@ type TxSender struct {
receiptSignal map[common.Hash][]chan struct{}
receiptMtx sync.Mutex
metrics *metrics
explorerSubmitter ExplorerSubmitter
}

func noOpFastTrack(_ []*bidderapiv1.Commitment, _ bool) bool {
Expand All @@ -207,6 +212,7 @@ func NewTxSender(
simulator Simulator,
backrunner Backrunner,
settlementChainId *big.Int,
explorerSubmitter ExplorerSubmitter,
logger *slog.Logger,
) (*TxSender, error) {
txnAttemptHistory, err := lru.New[common.Hash, *txnAttempt](1000)
Expand Down Expand Up @@ -242,6 +248,7 @@ func NewTxSender(
bidTimeout: bidTimeout,
receiptSignal: make(map[common.Hash][]chan struct{}),
metrics: newMetrics(),
explorerSubmitter: explorerSubmitter,
}, nil
}

Expand Down Expand Up @@ -333,6 +340,14 @@ func (t *TxSender) Enqueue(ctx context.Context, tx *Transaction) error {

t.triggerSender()

if err := t.explorerSubmitter.Submit(
context.Background(),
tx.Transaction,
tx.Sender,
); err != nil {
t.logger.Error("Failed to submit tx to explorer", "error", err)
}

return nil
}

Expand Down
9 changes: 9 additions & 0 deletions tools/preconf-rpc/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (m *mockBackrunner) Backrun(ctx context.Context, rawTx string, commitments
return nil
}

type MockExplorerSubmitter struct{}

func (m *MockExplorerSubmitter) Submit(ctx context.Context, tx *types.Transaction, from common.Address) error {
return nil
}

func TestSender(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -330,6 +336,7 @@ func TestSender(t *testing.T) {
&mockSimulator{},
&mockBackrunner{},
big.NewInt(1), // Settlement chain ID
&MockExplorerSubmitter{},
util.NewTestLogger(os.Stdout),
)
if err != nil {
Expand Down Expand Up @@ -584,6 +591,7 @@ func TestCancelTransaction(t *testing.T) {
&mockSimulator{},
&mockBackrunner{},
big.NewInt(1), // Settlement chain ID
&MockExplorerSubmitter{},
util.NewTestLogger(os.Stdout),
)
if err != nil {
Expand Down Expand Up @@ -672,6 +680,7 @@ func TestIgnoreProvidersOnRetry(t *testing.T) {
&mockSimulator{},
&mockBackrunner{},
big.NewInt(1), // Settlement chain ID
&MockExplorerSubmitter{},
util.NewTestLogger(io.Discard),
)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions tools/preconf-rpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/primev/mev-commit/tools/preconf-rpc/backrunner"
bidder "github.com/primev/mev-commit/tools/preconf-rpc/bidder"
"github.com/primev/mev-commit/tools/preconf-rpc/blocktracker"
explorersubmitter "github.com/primev/mev-commit/tools/preconf-rpc/explorer-submitter"
"github.com/primev/mev-commit/tools/preconf-rpc/handlers"
"github.com/primev/mev-commit/tools/preconf-rpc/notifier"
"github.com/primev/mev-commit/tools/preconf-rpc/points"
Expand Down Expand Up @@ -79,6 +80,9 @@ type Config struct {
BackrunnerRPC string
BackrunnerAPIURL string
BackrunnerAPIKey string
ExplorerEndpoint string
ExplorerApiKey string
ExplorerAppCode string
PointsAPIURL string
PointsAPIKey string
}
Expand Down Expand Up @@ -288,6 +292,16 @@ func New(config *Config) (*Service, error) {
s.closers = append(s.closers, channelCloser(backrunnerDone))
metricsRegistry.MustRegister(brunner.Metrics()...)

expSubmitter := explorersubmitter.New(
config.ExplorerEndpoint,
config.ExplorerApiKey,
config.ExplorerAppCode,
config.Logger.With("module", "explorersubmitter"),
)
expSubmitterDone := expSubmitter.Start(ctx)
healthChecker.Register(health.CloseChannelHealthCheck("ExplorerSubmitter", expSubmitterDone))
s.closers = append(s.closers, channelCloser(expSubmitterDone))

sndr, err := sender.NewTxSender(
rpcstore,
bidderClient,
Expand All @@ -298,6 +312,7 @@ func New(config *Config) (*Service, error) {
simulator,
brunner,
settlementChainID,
expSubmitter,
config.Logger.With("module", "txsender"),
)
if err != nil {
Expand Down
Loading