Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions tools/preconf-rpc/rpcserver/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package rpcserver

import "github.com/prometheus/client_golang/prometheus"

type metrics struct {
methodSuccessCounts *prometheus.CounterVec
methodSuccessDurations *prometheus.HistogramVec
methodFailureCounts *prometheus.CounterVec
methodFailureDurations *prometheus.HistogramVec
proxyMethodSuccessCounts *prometheus.CounterVec
proxyMethodSuccessDurations *prometheus.HistogramVec
proxyMethodFailureCounts *prometheus.CounterVec
proxyMethodFailureDurations *prometheus.HistogramVec
}

func newMetrics() *metrics {
return &metrics{
methodSuccessCounts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "method_success_counts",
Help: "Count of successful RPC method calls",
},
[]string{"method"},
),
methodSuccessDurations: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "method_success_durations_ms",
Help: "Duration of successful RPC method calls",
Buckets: prometheus.ExponentialBuckets(5, 2, 12),
},
[]string{"method"},
),
methodFailureCounts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "method_failure_counts",
Help: "Count of failed RPC method calls",
},
[]string{"method"},
),
methodFailureDurations: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "method_failure_durations_ms",
Help: "Duration of failed RPC method calls",
Buckets: prometheus.ExponentialBuckets(5, 2, 12),
},
[]string{"method"},
),
proxyMethodSuccessCounts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "rpc_proxy_method_success_counts",
Help: "Count of successful proxied RPC method calls",
},
[]string{"method"},
),
proxyMethodSuccessDurations: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "proxy_method_success_durations_ms",
Help: "Duration of successful proxied RPC method calls",
Buckets: prometheus.ExponentialBuckets(5, 2, 12),
},
[]string{"method"},
),
proxyMethodFailureCounts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "proxy_method_failure_counts",
Help: "Count of failed proxied RPC method calls",
},
[]string{"method"},
),
proxyMethodFailureDurations: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "rpc",
Subsystem: "server",
Name: "proxy_method_failure_durations_ms",
Help: "Duration of failed proxied RPC method calls",
Buckets: prometheus.ExponentialBuckets(5, 2, 12),
},
[]string{"method"},
),
}
}
46 changes: 39 additions & 7 deletions tools/preconf-rpc/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -91,6 +92,7 @@ type JSONRPCServer struct {
proxyURL string
httpClient *http.Client
cache *lru.Cache[string, cacheEntry]
metrics *metrics
logger *slog.Logger
}

Expand All @@ -117,11 +119,25 @@ func NewJSONRPCServer(proxyURL string, logger *slog.Logger) (*JSONRPCServer, err
},
Timeout: 15 * time.Second,
},
cache: cache,
logger: logger,
cache: cache,
metrics: newMetrics(),
logger: logger,
}, nil
}

func (s *JSONRPCServer) Metrics() []prometheus.Collector {
return []prometheus.Collector{
s.metrics.methodSuccessCounts,
s.metrics.methodFailureCounts,
s.metrics.methodSuccessDurations,
s.metrics.methodFailureDurations,
s.metrics.proxyMethodSuccessCounts,
s.metrics.proxyMethodFailureCounts,
s.metrics.proxyMethodSuccessDurations,
s.metrics.proxyMethodFailureDurations,
}
}

func (s *JSONRPCServer) RegisterHandler(method string, handler methodHandler) {
s.rwLock.Lock()
s.methods[method] = handler
Expand All @@ -134,8 +150,6 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

s.logger.Debug("Received JSON-RPC request", "method", r.Method)

if r.Header.Get("Content-Type") != "application/json" {
http.Error(w, "Invalid content type", http.StatusUnsupportedMediaType)
return
Expand All @@ -155,6 +169,7 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req jsonRPCRequest
err = json.Unmarshal(body, &req)
if err != nil {
s.logger.Error("Failed to parse JSON-RPC request", "error", err, "body", string(body))
s.writeError(w, nil, CodeParseError, "Failed to parse request")
return
}
Expand All @@ -165,19 +180,20 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

start := time.Now()
defer func() {
s.logger.Debug("Request processing time", "method", req.Method, "id", req.ID, "duration", time.Since(start).String())
}()

if cacheMethods[req.Method] {
if stubbed, resp := maybeStubERC20Meta(req.Method, req.Params); stubbed {
s.writeResponse(w, req.ID, &resp)
s.metrics.methodSuccessCounts.WithLabelValues(req.Method).Inc()
s.metrics.methodSuccessDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}
key := cacheKey(req.Method, req.Params)
if entry, ok := s.cache.Get(key); ok && time.Now().Before(entry.until) {
s.logger.Debug("Cache hit", "method", req.Method, "id", req.ID)
s.writeResponse(w, req.ID, &entry.data)
s.metrics.methodSuccessCounts.WithLabelValues(req.Method).Inc()
s.metrics.methodSuccessDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}
}
Expand All @@ -186,15 +202,21 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
out, statusCode, err := s.proxyRequest(r.Context(), body)
if err != nil {
http.Error(w, err.Error(), statusCode)
s.metrics.proxyMethodFailureCounts.WithLabelValues(req.Method).Inc()
s.metrics.proxyMethodFailureDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}
var resp jsonRPCResponse
if err := json.Unmarshal(out, &resp); err != nil {
http.Error(w, "Failed to parse proxy response", http.StatusInternalServerError)
s.metrics.proxyMethodFailureCounts.WithLabelValues(req.Method).Inc()
s.metrics.proxyMethodFailureDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}
if resp.Error != nil {
s.writeError(w, req.ID, resp.Error.Code, resp.Error.Message)
s.metrics.proxyMethodFailureCounts.WithLabelValues(req.Method).Inc()
s.metrics.proxyMethodFailureDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}
if cacheMethods[req.Method] && resp.Result != nil {
Expand All @@ -206,6 +228,8 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.logger.Debug("Cache store", "method", req.Method, "id", req.ID)
}
s.writeResponse(w, req.ID, resp.Result)
s.metrics.proxyMethodSuccessCounts.WithLabelValues(req.Method).Inc()
s.metrics.proxyMethodSuccessDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
}

s.rwLock.RLock()
Expand All @@ -219,6 +243,10 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp, proxy, err := handler(r.Context(), req.Params...)
switch {
case err != nil:
defer func() {
s.metrics.methodFailureCounts.WithLabelValues(req.Method).Inc()
s.metrics.methodFailureDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
}()
var jsonErr *JSONErr
if ok := errors.As(err, &jsonErr); ok {
// If the error is a JSONErr, we can use it directly.
Expand All @@ -232,10 +260,14 @@ func (s *JSONRPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
case resp == nil:
s.writeError(w, req.ID, CodeCustomError, "No response")
s.metrics.methodFailureCounts.WithLabelValues(req.Method).Inc()
s.metrics.methodFailureDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
return
}

s.writeResponse(w, req.ID, &resp)
s.metrics.methodSuccessCounts.WithLabelValues(req.Method).Inc()
s.metrics.methodSuccessDurations.WithLabelValues(req.Method).Observe(float64(time.Since(start).Milliseconds()))
}

func (s *JSONRPCServer) writeResponse(w http.ResponseWriter, id any, result *json.RawMessage) {
Expand Down
5 changes: 3 additions & 2 deletions tools/preconf-rpc/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,16 @@ func New(config *Config) (*Service, error) {
healthChecker.Register(health.CloseChannelHealthCheck("BidderService", bidderDone))
s.closers = append(s.closers, channelCloser(bidderDone))

metricsRegistry := prometheus.NewRegistry()

rpcServer, err := rpcserver.NewJSONRPCServer(
config.L1RPCHTTPUrl,
config.Logger.With("module", "rpcserver"),
)
if err != nil {
return nil, fmt.Errorf("failed to create RPC server: %w", err)
}

metricsRegistry := prometheus.NewRegistry()
metricsRegistry.MustRegister(rpcServer.Metrics()...)

bidpricer, err := pricer.NewPricer(config.PricerAPIKey, config.Logger.With("module", "bidpricer"))
if err != nil {
Expand Down
Loading