diff --git a/docs/cannon.md b/docs/cannon.md index c6452b8db..0df2049d0 100644 --- a/docs/cannon.md +++ b/docs/cannon.md @@ -118,6 +118,11 @@ Output configuration to send sentry events to a kafka server. | outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | | outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | | outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | +| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. | +| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. | +| outputs[].config.sasl.user | string | | | SASL username. | +| outputs[].config.sasl.password | string | | | SASL password. | +| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). | ### Simple example @@ -188,6 +193,27 @@ outputs: brokers: localhost:19092 topic: events ``` + +### kafka server output with SCRAM-SHA-512 authentication example + +```yaml +name: example-instance-005 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink-scram + type: kafka + config: + brokers: kafka.example.com:9094 + topic: events + tls: true + sasl: + mechanism: SCRAM-SHA-512 + user: xatu-client + password: "${KAFKA_PASSWORD}" +``` ### Complex example with multiple outputs example ```yaml diff --git a/docs/mimicry.md b/docs/mimicry.md index 1d0b699f3..e5d81b560 100644 --- a/docs/mimicry.md +++ b/docs/mimicry.md @@ -121,6 +121,11 @@ Output configuration to send sentry events to a kafka server. | outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | | outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | | outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | +| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. | +| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. | +| outputs[].config.sasl.user | string | | | SASL username. | +| outputs[].config.sasl.password | string | | | SASL password. | +| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). | ### Simple example @@ -213,6 +218,27 @@ outputs: topic: events ``` +### kafka server output with SCRAM-SHA-512 authentication example + +```yaml +name: example-instance-005 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink-scram + type: kafka + config: + brokers: kafka.example.com:9094 + topic: events + tls: true + sasl: + mechanism: SCRAM-SHA-512 + user: xatu-client + password: "${KAFKA_PASSWORD}" +``` + ### Complex example with multiple outputs example ```yaml diff --git a/docs/sentry.md b/docs/sentry.md index 30fab8db2..aa6416c02 100644 --- a/docs/sentry.md +++ b/docs/sentry.md @@ -96,6 +96,11 @@ Output configuration to send sentry events to a kafka server. | outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | | outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | | outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | +| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. | +| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. | +| outputs[].config.sasl.user | string | | | SASL username. | +| outputs[].config.sasl.password | string | | | SASL password. | +| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). | ### Simple example @@ -158,6 +163,27 @@ outputs: topic: events ``` +### kafka server output with SCRAM-SHA-512 authentication example + +```yaml +name: example-instance-005 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink-scram + type: kafka + config: + brokers: kafka.example.com:9094 + topic: events + tls: true + sasl: + mechanism: SCRAM-SHA-512 + user: xatu-client + password: "${KAFKA_PASSWORD}" +``` + ### Complex example with multiple outputs example ```yaml diff --git a/go.mod b/go.mod index f3b77de27..17de79446 100644 --- a/go.mod +++ b/go.mod @@ -238,6 +238,9 @@ require ( github.com/tklauser/numcpus v0.10.0 // indirect github.com/urfave/cli/v2 v2.27.7 // indirect github.com/wlynxg/anet v0.0.5 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.2.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect diff --git a/go.sum b/go.sum index 91a8e3512..90f27df78 100644 --- a/go.sum +++ b/go.sum @@ -765,6 +765,12 @@ github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs= +github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= @@ -947,6 +953,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go index 83d5692bb..53d1a52cc 100644 --- a/pkg/output/kafka/client.go +++ b/pkg/output/kafka/client.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/IBM/sarama" + "github.com/xdg-go/scram" ) type CompressionStrategy string @@ -45,6 +46,30 @@ var ( SASLTypeGSSAPI SASLMechanism = "GSSAPI" ) +// scramClient implements sarama.SCRAMClient for SCRAM authentication +type scramClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (sc *scramClient) Begin(userName, password, authzID string) (err error) { + sc.Client, err = sc.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + sc.ClientConversation = sc.Client.NewConversation() + return nil +} + +func (sc *scramClient) Step(challenge string) (response string, err error) { + return sc.ClientConversation.Step(challenge) +} + +func (sc *scramClient) Done() bool { + return sc.ClientConversation.Done() +} + func NewSyncProducer(config *Config) (sarama.SyncProducer, error) { producerConfig, err := Init(config) if err != nil { @@ -116,8 +141,14 @@ func Init(config *Config) (*sarama.Config, error) { c.Net.SASL.Mechanism = sarama.SASLTypeOAuth case SASLTypeSCRAMSHA256: c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + c.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: scram.SHA256} + } case SASLTypeSCRAMSHA512: c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + c.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &scramClient{HashGeneratorFcn: scram.SHA512} + } case SASLTypeGSSAPI: c.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI default: