Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/govulncheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/setup-go@v5
with:
go-version: "1.22"
go-version: "1.23"
check-latest: true
- uses: actions/checkout@v4
- uses: technote-space/get-diff-action@v6
Expand Down
7 changes: 7 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ linters-settings:
- github.com/stretchr/testify/require
- github.com/syndtr/goleveldb
- github.com/decred/dcrd/dcrec/secp256k1/v4
- github.com/aws/aws-sdk-go-v2/aws
- github.com/aws/aws-sdk-go-v2/config
- github.com/aws/aws-sdk-go-v2/credentials
- github.com/grafana/pyroscope-go
- github.com/aws/aws-sdk-go-v2/service/s3
- github.com/grafana/otel-profiling-go
- github.com/celestiaorg/nmt
test:
files:
- "$test"
Expand Down
616 changes: 415 additions & 201 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion blocksync/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {

//nolint:lll // ignore line length in tests
func TestBlocksyncMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
block := types.MakeBlock(int64(3), types.MakeData([]types.Tx{types.Tx("Hello World")}), nil, nil)
block.Version.Block = 11 // overwrite updated protocol version

bpb, err := block.ToProto()
Expand Down
21 changes: 20 additions & 1 deletion blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 10
)

type consensusReactor interface {
Expand Down Expand Up @@ -500,6 +503,22 @@ FOR_LOOP:
// validate the block before we persist it
err = bcR.blockExec.ValidateBlock(state, first)
}

if err == nil {
var stateMachineValid bool
// Block sync doesn't check that the `Data` in a block is valid.
// Since celestia-core can't determine if the `Data` in a block
// is valid, the next line asks celestia-app to check if the
// block is valid via ProcessProposal. If this step wasn't
// performed, a malicious node could fabricate an alternative
// set of transactions that would cause a different app hash and
// thus cause this node to panic.
stateMachineValid, err = bcR.blockExec.ProcessProposal(first, state)
if !stateMachineValid {
err = fmt.Errorf("application has rejected syncing block (%X) at height %d", first.Hash(), first.Height)
}
}

presentExtCommit := extCommit != nil
extensionsEnabled := state.ConsensusParams.ABCI.VoteExtensionsEnabled(first.Height)
if presentExtCommit != extensionsEnabled {
Expand Down Expand Up @@ -546,7 +565,7 @@ FOR_LOOP:

// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
state, err = bcR.blockExec.ApplyVerifiedBlock(state, firstID, first)
state, err = bcR.blockExec.ApplyVerifiedBlock(state, firstID, first, second.LastCommit)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
Expand Down
5 changes: 3 additions & 2 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func newReactor(

lastExtCommit := seenExtCommit.Clone()

thisBlock := state.MakeBlock(blockHeight, nil, lastExtCommit.ToCommit(), nil, state.Validators.Proposer.Address)
thisBlock := state.MakeBlock(blockHeight, types.MakeData([]types.Tx{}), lastExtCommit.ToCommit(), nil, state.Validators.Proposer.Address)

thisParts, err := thisBlock.MakePartSet(types.BlockPartSizeBytes)
require.NoError(t, err)
Expand All @@ -165,7 +165,7 @@ func newReactor(
ExtendedSignatures: []types.ExtendedCommitSig{vote.ExtendedCommitSig()},
}

state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
state, err = blockExec.ApplyBlock(state, blockID, thisBlock, lastExtCommit.ToCommit())
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
Expand Down Expand Up @@ -523,6 +523,7 @@ func (bcR *ByzantineReactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Pe
// Receive implements Reactor by handling 4 types of messages (look below).
// Copied unchanged from reactor.go so the correct respondToPeer is called.
func (bcR *ByzantineReactor) Receive(e p2p.Envelope) { //nolint: dupl
fmt.Println("Receive", e.Message)
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
Expand Down
25 changes: 25 additions & 0 deletions cmd/cometbft/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cfg "github.com/cometbft/cometbft/config"
cmtos "github.com/cometbft/cometbft/libs/os"
"github.com/cometbft/cometbft/libs/trace"
nm "github.com/cometbft/cometbft/node"
)

Expand Down Expand Up @@ -90,6 +91,30 @@ func AddNodeFlags(cmd *cobra.Command) {
"db_dir",
config.DBPath,
"database directory")

cmd.PersistentFlags().String(
trace.FlagTracePushConfig,
config.Instrumentation.TracePushConfig,
trace.FlagTracePushConfigDescription,
)

cmd.PersistentFlags().String(
trace.FlagTracePullAddress,
config.Instrumentation.TracePullAddress,
trace.FlagTracePullAddressDescription,
)

cmd.PersistentFlags().String(
trace.FlagPyroscopeURL,
config.Instrumentation.PyroscopeURL,
trace.FlagPyroscopeURLDescription,
)

cmd.PersistentFlags().Bool(
trace.FlagPyroscopeTrace,
config.Instrumentation.PyroscopeTrace,
trace.FlagPyroscopeTraceDescription,
)
}

// NewRunNodeCmd returns the command that allows the CLI to start a node.
Expand Down
116 changes: 114 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (

MempoolTypeFlood = "flood"
MempoolTypeNop = "nop"
MempoolTypeCAT = "cat"
)

// NOTE: Most of the structs & relevant comments + the
Expand All @@ -64,6 +65,12 @@ var (

// taken from https://semver.org/
semverRegexp = regexp.MustCompile(`^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`)

// DefaultTracingTables is a list of tables that are used for storing traces.
// This global var is filled by an init function in the schema package. This
// allows for the schema package to contain all the relevant logic while
// avoiding import cycles.
DefaultTracingTables = ""
)

// Config defines the top level configuration for a CometBFT node
Expand Down Expand Up @@ -781,6 +788,12 @@ type MempoolConfig struct {
// performance results using the default P2P configuration.
ExperimentalMaxGossipConnectionsToPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_non_persistent_peers"`

// MaxGossipDelay is the maximum allotted time that the reactor expects a transaction to
// arrive before issuing a new request to a different peer
// Only applicable to the v2 / CAT mempool
// Default is 200ms
MaxGossipDelay time.Duration `mapstructure:"max-gossip-delay"`
}

// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
Expand Down Expand Up @@ -978,8 +991,13 @@ func (cfg *BlockSyncConfig) ValidateBasic() error {
// including timeouts and details about the WAL and the block structure.
type ConsensusConfig struct {
RootDir string `mapstructure:"home"`
WalPath string `mapstructure:"wal_file"`
walFile string // overrides WalPath if set
// If set to true, only internal messages will be written
// to the WAL. External messages like votes, proposals
// block parts, will not be written
// Default: true
OnlyInternalWal bool `mapstructure:"only_internal_wal"`
WalPath string `mapstructure:"wal_file"`
walFile string // overrides WalPath if set

// How long we wait for a proposal block before prevoting nil
TimeoutPropose time.Duration `mapstructure:"timeout_propose"`
Expand Down Expand Up @@ -1016,6 +1034,7 @@ type ConsensusConfig struct {
// DefaultConsensusConfig returns a default configuration for the consensus service
func DefaultConsensusConfig() *ConsensusConfig {
return &ConsensusConfig{
OnlyInternalWal: true,
WalPath: filepath.Join(DefaultDataDir, "cs.wal", "wal"),
TimeoutPropose: 3000 * time.Millisecond,
TimeoutProposeDelta: 500 * time.Millisecond,
Expand All @@ -1036,6 +1055,7 @@ func DefaultConsensusConfig() *ConsensusConfig {
// TestConsensusConfig returns a configuration for testing the consensus service
func TestConsensusConfig() *ConsensusConfig {
cfg := DefaultConsensusConfig()
cfg.OnlyInternalWal = false
cfg.TimeoutPropose = 40 * time.Millisecond
cfg.TimeoutProposeDelta = 1 * time.Millisecond
cfg.TimeoutPrevote = 10 * time.Millisecond
Expand Down Expand Up @@ -1083,6 +1103,32 @@ func (cfg *ConsensusConfig) Commit(t time.Time) time.Time {
return t.Add(cfg.TimeoutCommit)
}

// ProposeWithCustomTimeout is identical to Propose. However,
// it calculates the amount of time to wait for a proposal using the supplied
// customTimeout.
// If customTimeout is 0, the TimeoutPropose from cfg is used.
func (cfg *ConsensusConfig) ProposeWithCustomTimeout(round int32, customTimeout time.Duration) time.Duration {
// this is to capture any unforeseen cases where the customTimeout is 0
var timeoutPropose = customTimeout
if timeoutPropose == 0 {
// falling back to default timeout
timeoutPropose = cfg.TimeoutPropose
}
return time.Duration(timeoutPropose.Nanoseconds()+cfg.TimeoutProposeDelta.Nanoseconds()*int64(round)) * time.Nanosecond
}

// CommitWithCustomTimeout is identical to Commit. However, it calculates the time for commit using the supplied customTimeout.
// If customTimeout is 0, the TimeoutCommit from cfg is used.
func (cfg *ConsensusConfig) CommitWithCustomTimeout(t time.Time, customTimeout time.Duration) time.Time {
// this is to capture any unforeseen cases where the customTimeout is 0
var timeoutCommit = customTimeout
if timeoutCommit == 0 {
// falling back to default timeout
timeoutCommit = cfg.TimeoutCommit
}
return t.Add(timeoutCommit)
}

// WalFile returns the full path to the write-ahead log file
func (cfg *ConsensusConfig) WalFile() string {
if cfg.walFile != "" {
Expand Down Expand Up @@ -1224,6 +1270,38 @@ type InstrumentationConfig struct {

// Instrumentation namespace.
Namespace string `mapstructure:"namespace"`

// TracePushConfig is the relative path of the push config. This second
// config contains credentials for where and how often to.
TracePushConfig string `mapstructure:"trace_push_config"`

// TracePullAddress is the address that the trace server will listen on for
// pulling data.
TracePullAddress string `mapstructure:"trace_pull_address"`

// TraceType is the type of tracer used. Options are "local" and "noop".
TraceType string `mapstructure:"trace_type"`

// TraceBufferSize is the number of traces to write in a single batch.
TraceBufferSize int `mapstructure:"trace_push_batch_size"`

// TracingTables is the list of tables that will be traced. See the
// pkg/trace/schema for a complete list of tables. It is represented as a
// comma separate string. For example: "consensus_round_state,mempool_tx".
TracingTables string `mapstructure:"tracing_tables"`

// PyroscopeURL is the pyroscope url used to establish a connection with a
// pyroscope continuous profiling server.
PyroscopeURL string `mapstructure:"pyroscope_url"`

// PyroscopeProfile is a flag that enables tracing with pyroscope.
PyroscopeTrace bool `mapstructure:"pyroscope_trace"`

// PyroscopeProfileTypes is a list of profile types to be traced with
// pyroscope. Available profile types are: cpu, alloc_objects, alloc_space,
// inuse_objects, inuse_space, goroutines, mutex_count, mutex_duration,
// block_count, block_duration.
PyroscopeProfileTypes []string `mapstructure:"pyroscope_profile_types"`
}

// DefaultInstrumentationConfig returns a default configuration for metrics
Expand All @@ -1234,6 +1312,23 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
PrometheusListenAddr: ":26660",
MaxOpenConnections: 3,
Namespace: "cometbft",
TracePushConfig: "",
TracePullAddress: "",
TraceType: "noop",
TraceBufferSize: 1000,
TracingTables: DefaultTracingTables,
PyroscopeURL: "",
PyroscopeTrace: false,
PyroscopeProfileTypes: []string{
"cpu",
"alloc_objects",
"inuse_objects",
"goroutines",
"mutex_count",
"mutex_duration",
"block_count",
"block_duration",
},
}
}

Expand All @@ -1249,6 +1344,23 @@ func (cfg *InstrumentationConfig) ValidateBasic() error {
if cfg.MaxOpenConnections < 0 {
return errors.New("max_open_connections can't be negative")
}
if cfg.PyroscopeTrace && cfg.PyroscopeURL == "" {
return errors.New("pyroscope_trace can't be enabled if profiling is disabled")
}
// if there is not TracePushConfig configured, then we do not need to validate the rest
// of the config because we are not connecting.
if cfg.TracePushConfig == "" {
return nil
}
if cfg.TracePullAddress == "" {
return errors.New("token is required")
}
if cfg.TraceType == "" {
return errors.New("org is required")
}
if cfg.TraceBufferSize <= 0 {
return errors.New("batch size must be greater than 0")
}
return nil
}

Expand Down
28 changes: 28 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,31 @@ func TestInstrumentationConfigValidateBasic(t *testing.T) {
cfg.MaxOpenConnections = -1
assert.Error(t, cfg.ValidateBasic())
}

func TestProposeWithCustomTimeout(t *testing.T) {
cfg := config.DefaultConsensusConfig()

// customTimeout is 0, should fallback to default timeout
round := int32(1)
expectedTimeout := time.Duration(cfg.TimeoutPropose.Nanoseconds()+cfg.TimeoutProposeDelta.Nanoseconds()*int64(round)) * time.Nanosecond
assert.Equal(t, expectedTimeout, cfg.ProposeWithCustomTimeout(round, time.Duration(0)))

// customTimeout is not 0
customTimeout := 2 * time.Second
expectedTimeout = time.Duration(customTimeout.Nanoseconds()+cfg.TimeoutProposeDelta.Nanoseconds()*int64(round)) * time.Nanosecond
assert.Equal(t, expectedTimeout, cfg.ProposeWithCustomTimeout(round, customTimeout))
}

func TestCommitWithCustomTimeout(t *testing.T) {
cfg := config.DefaultConsensusConfig()

// customTimeout is 0, should fallback to default timeout
inputTime := time.Now()
expectedTime := inputTime.Add(cfg.TimeoutCommit)
assert.Equal(t, expectedTime, cfg.CommitWithCustomTimeout(inputTime, time.Duration(0)))

// customTimeout is not 0
customTimeout := 2 * time.Second
expectedTime = inputTime.Add(customTimeout)
assert.Equal(t, expectedTime, cfg.CommitWithCustomTimeout(inputTime, customTimeout))
}
Loading
Loading