Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
534 changes: 332 additions & 202 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,3 @@ func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

// func (br *ByzantineReactor) QueueUnprocessedEnvelope(e p2p.UnprocessedEnvelope) {
// br.reactor.QueueUnprocessedEnvelope(e)
// }
5 changes: 1 addition & 4 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ const (

blocksToContributeToBecomeGoodPeer = 10000
votesToContributeToBecomeGoodPeer = 10000

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 1000
)

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -71,7 +68,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
conR.BaseReactor = *p2p.NewBaseReactor(
"Consensus",
conR,
p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
)

for _, option := range options {
option(conR)
Expand Down
4 changes: 1 addition & 3 deletions evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
broadcastEvidenceIntervalS = 10
// If a message fails wait this much before sending it again
peerRetryMessageIntervalMS = 100
// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 1
)

// Reactor handles evpool evidence broadcasting amongst peers.
Expand All @@ -41,7 +39,7 @@ func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}

Expand Down
11 changes: 4 additions & 7 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
cfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/libs/clist"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/trace"
"github.com/cometbft/cometbft/p2p"
protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
"github.com/cometbft/cometbft/types"
Expand All @@ -31,16 +30,14 @@ type Reactor struct {
// connections for different groups of peers.
activePersistentPeersSemaphore *semaphore.Weighted
activeNonPersistentPeersSemaphore *semaphore.Weighted
traceClient trace.Tracer
}

// NewReactor returns a new Reactor with the given config and mempool.
func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool, traceClient trace.Tracer) *Reactor {
func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
memR := &Reactor{
config: config,
mempool: mempool,
ids: newMempoolIDs(),
traceClient: traceClient,
config: config,
mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
Expand Down
3 changes: 1 addition & 2 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
cfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/trace"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/mock"
memproto "github.com/cometbft/cometbft/proto/tendermint/mempool"
Expand Down Expand Up @@ -330,7 +329,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) ([]*Reactor, []*p2p.Switc
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

reactors[i] = NewReactor(config.Mempool, mempool, trace.NoOpTracer()) // so we dont start the consensus states
reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
reactors[i].SetLogger(logger.With("validator", i))
}

Expand Down
1 change: 0 additions & 1 deletion node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func createMempoolAndMempoolReactor(
reactor := mempl.NewReactor(
config.Mempool,
mp,
traceClient,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
Expand Down
131 changes: 0 additions & 131 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,82 +46,27 @@ type Reactor interface {
// Receive is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor
Receive(Envelope)

// QueueUnprocessedEnvelope is called by the switch when an unprocessed
// envelope is received. Unprocessed envelopes are immediately buffered in a
// queue to avoid blocking. Incoming messages are then passed to a
// processing function. The default processing function unmarshals the
// messages in the order the sender sent them and then calls Receive on the
// reactor. The queue size and the processing function can be changed via
// passing options to the base reactor.
// QueueUnprocessedEnvelope(e UnprocessedEnvelope)
}

//--------------------------------------

type BaseReactor struct {
service.BaseService // Provides Start, Stop, .Quit
Switch *Switch

// incoming chan UnprocessedEnvelope

// ctx context.Context
// cancel context.CancelFunc
// // processor is called with the incoming channel and is responsible for
// // unmarshalling the messages and calling Receive on the reactor.
// processor ProcessorFunc
}

type ReactorOptions func(*BaseReactor)

func NewBaseReactor(name string, impl Reactor, opts ...ReactorOptions) *BaseReactor {
// ctx := context.Background()
// ctx, cancel := context.WithCancel(ctx)

br := &BaseReactor{
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,

// ctx: ctx,
// cancel: cancel,
// incoming: make(chan UnprocessedEnvelope, 100),
// processor: DefaultProcessor(impl),
}

// for _, opt := range opts {
// opt(br)
// }

// go func() {
// err := br.processor(ctx, br.incoming)
// if err != nil {
// err = br.Stop()
// if err != nil {
// panic(err)
// }
// }
// }()

return br
}

// // WithProcessor sets the processor function for the reactor. The processor
// // function is called with the incoming channel and is responsible for
// // unmarshalling the messages and calling Receive on the reactor.
// func WithProcessor(processor ProcessorFunc) ReactorOptions {
// return func(br *BaseReactor) {
// br.processor = processor
// }
// }

// WithIncomingQueueSize sets the size of the incoming message queue for a
// reactor.
func WithIncomingQueueSize(size int) ReactorOptions {
return func(br *BaseReactor) {
// br.incoming = make(chan UnprocessedEnvelope, size)
}
}

func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
Expand All @@ -130,79 +75,3 @@ func (*BaseReactor) AddPeer(Peer) {}
func (*BaseReactor) RemovePeer(Peer, interface{}) {}
func (*BaseReactor) Receive(Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

// // QueueUnprocessedEnvelope is called by the switch when an unprocessed
// // envelope is received. Unprocessed envelopes are immediately buffered in a
// // queue to avoid blocking. The size of the queue can be changed by passing
// // options to the base reactor.
// func (br *BaseReactor) QueueUnprocessedEnvelope(e UnprocessedEnvelope) {
// select {
// // if the context is done, do nothing.
// case <-br.ctx.Done():
// // if not, add the item to the channel.
// case br.incoming <- e:
// }
// }

// func (br *BaseReactor) OnStop() {
// br.cancel()
// close(br.incoming)
// }

// // DefaultProcessor unmarshalls the message and calls Receive on the reactor.
// // This preserves the sender's original order for all messages.
// func DefaultProcessor(impl Reactor) func(context.Context, <-chan UnprocessedEnvelope) error {
// implChannels := impl.GetChannels()

// chIDs := make(map[byte]proto.Message, len(implChannels))
// for _, chDesc := range implChannels {
// chIDs[chDesc.ID] = chDesc.MessageType
// }
// return func(ctx context.Context, incoming <-chan UnprocessedEnvelope) error {
// for {
// select {
// case <-ctx.Done():
// return nil
// case ue, ok := <-incoming:
// if !ok {
// // this means the channel was closed.
// return nil
// }
// mt := chIDs[ue.ChannelID]

// if mt == nil {
// return fmt.Errorf("no message type registered for channel %d", ue.ChannelID)
// }

// msg := proto.Clone(mt)

// err := proto.Unmarshal(ue.Message, msg)
// if err != nil {
// return fmt.Errorf("unmarshaling message: %v into type: %s resulted in error %w", msg, reflect.TypeOf(mt), err)
// }

// if w, ok := msg.(Unwrapper); ok {
// msg, err = w.Unwrap()
// if err != nil {
// return fmt.Errorf("unwrapping message: %v", err)
// }
// }

// labels := []string{
// "peer_id", string(ue.Src.ID()),
// "chID", fmt.Sprintf("%#x", ue.ChannelID),
// }

// ue.Src.Metrics().PeerReceiveBytesTotal.With(labels...).Add(float64(len(ue.Message)))
// ue.Src.Metrics().MessageReceiveBytesTotal.With(append(labels, "message_type", ue.Src.ValueToMetricLabel(msg))...).Add(float64(len(ue.Message)))
// schema.WriteReceivedBytes(ue.Src.TraceClient(), string(ue.Src.ID()), ue.ChannelID, len(ue.Message))

// impl.Receive(Envelope{
// ChannelID: ue.ChannelID,
// Src: ue.Src,
// Message: msg,
// })
// }
// }
// }
// }
123 changes: 0 additions & 123 deletions p2p/base_reactor_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion p2p/mock/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Reactor struct {

func NewReactor() *Reactor {
r := &Reactor{}
r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r, p2p.WithIncomingQueueSize(1))
r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r)
r.SetLogger(log.TestingLogger())
return r
}
Expand Down
Loading
Loading