Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 61 additions & 48 deletions pkg/clmimicry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"slices"

"github.com/ethpandaops/xatu/pkg/proto/libp2p"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand Down Expand Up @@ -57,42 +56,82 @@ const (
// - "HANDLE_STATUS": Processing of status requests

// HandleHermesEvent processes a Hermes trace event and routes it to the appropriate handler
func (p *Processor) HandleHermesEvent(ctx context.Context, event *TraceEvent) error {
func (p *Processor) HandleHermesEvent(ctx context.Context, event TraceEvent) error {
if event == nil {
return errors.New("event is nil")
}

p.log.WithField("type", event.Type).Trace("Received Hermes event")
p.log.WithField("type", fmt.Sprintf("%T", event)).Trace("Received Hermes event")

traceMeta := &libp2p.TraceEventMetadata{
PeerId: wrapperspb.String(event.PeerID.String()),
PeerId: wrapperspb.String(event.GetPeerID().String()),
}

clientMeta, err := p.metaProvider.GetClientMeta(ctx)
if err != nil {
return fmt.Errorf("failed to get client meta: %w", err)
}

// Route the event to the appropriate handler based on its category.
switch {
// GossipSub protocol events.
case isGossipSubEvent(event):
return p.handleHermesGossipSubEvent(ctx, event, clientMeta, traceMeta)

// libp2p pubsub protocol level events.
case isLibp2pEvent(event):
return p.handleHermesLibp2pEvent(ctx, event, clientMeta, traceMeta)

// libp2p core networking events.
case isLibp2pCoreEvent(event):
return p.handleHermesLibp2pCoreEvent(ctx, event, clientMeta, traceMeta)

// Request/Response (RPC) protocol events.
case isRpcEvent(event):
return p.handleHermesRPCEvent(ctx, event, clientMeta, traceMeta)
// Route the event to the appropriate handler based on its type.
switch e := event.(type) {
// GossipSub events
case *BeaconBlockEvent:
return p.handleBeaconBlockEvent(ctx, e, clientMeta, traceMeta)
case *AttestationEvent:
return p.handleAttestationEvent(ctx, e, clientMeta, traceMeta)
case *AggregateAndProofEvent:
return p.handleAggregateAndProofEvent(ctx, e, clientMeta, traceMeta)
case *BlobSidecarEvent:
return p.handleBlobSidecarEvent(ctx, e, clientMeta, traceMeta)
case *DataColumnSidecarEvent:
return p.handleDataColumnSidecarEvent(ctx, e, clientMeta, traceMeta)

// libp2p trace events
case *AddPeerEvent:
return p.handleAddPeerEvent(ctx, e, clientMeta, traceMeta)
case *RemovePeerEvent:
return p.handleRemovePeerEvent(ctx, e, clientMeta, traceMeta)
case *JoinEvent:
return p.handleJoinEvent(ctx, e, clientMeta, traceMeta)
case *LeaveEvent:
return p.handleLeaveEvent(ctx, e, clientMeta, traceMeta)
case *GraftEvent:
return p.handleGraftEvent(ctx, e, clientMeta, traceMeta)
case *PruneEvent:
return p.handlePruneEvent(ctx, e, clientMeta, traceMeta)
case *PublishMessageEvent:
return p.handlePublishMessageEvent(ctx, e, clientMeta, traceMeta)
case *RejectMessageEvent:
return p.handleRejectMessageEvent(ctx, e, clientMeta, traceMeta)
case *DuplicateMessageEvent:
return p.handleDuplicateMessageEvent(ctx, e, clientMeta, traceMeta)
case *DeliverMessageEvent:
return p.handleDeliverMessageEvent(ctx, e, clientMeta, traceMeta)
case *RecvRPCEvent:
return p.handleRecvRPCEvent(ctx, e, clientMeta, traceMeta)
case *SendRPCEvent:
return p.handleSendRPCEvent(ctx, e, clientMeta, traceMeta)
case *DropRPCEvent:
return p.handleDropRPCEvent(ctx, e, clientMeta, traceMeta)

// libp2p core events
case *ConnectedEvent:
return p.handleConnectedEvent(ctx, e, clientMeta, traceMeta)
case *DisconnectedEvent:
return p.handleDisconnectedEvent(ctx, e, clientMeta, traceMeta)
case *SyntheticHeartbeatEvent:
return p.handleSyntheticHeartbeatEvent(ctx, e, clientMeta, traceMeta)

// RPC events
case *HandleMetadataEvent:
return p.handleMetadataEvent(ctx, e, clientMeta, traceMeta)
case *HandleStatusEvent:
return p.handleStatusEvent(ctx, e, clientMeta, traceMeta)
case *CustodyProbeEvent:
return p.handleCustodyProbeEvent(ctx, e, clientMeta, traceMeta)

default:
p.log.WithField("type", event.Type).Debug("unsupported Hermes event")
p.log.WithField("type", fmt.Sprintf("%T", event)).Debug("unsupported event type")

return nil
}
Expand All @@ -111,29 +150,3 @@ func getNetworkID(clientMeta *xatu.ClientMeta) string {

return networkStr
}

// isRpcEvent checks if the event is a RPC event.
func isRpcEvent(event *TraceEvent) bool {
_, exists := rpcToXatuEventMap[event.Type]

return exists
}

// isLibp2pCoreEvent checks if the event is a libp2p core event.
func isLibp2pCoreEvent(event *TraceEvent) bool {
_, exists := libp2pCoreToXatuEventMap[event.Type]

return exists
}

// isLibp2pEvent checks if the event is a libp2p event.
func isLibp2pEvent(event *TraceEvent) bool {
_, exists := libp2pToXatuEventMap[event.Type]

return exists
}

// isGossipSubEvent checks if the event is a gossipsub event.
func isGossipSubEvent(event *TraceEvent) bool {
return slices.Contains(gossipsubEventTypes, event.Type)
}
Loading
Loading