Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/cannon.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ Output configuration to send sentry events to a kafka server.
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |
| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. |
| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. |
| outputs[].config.sasl.user | string | | | SASL username. |
| outputs[].config.sasl.password | string | | | SASL password. |
| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). |

### Simple example

Expand Down Expand Up @@ -188,6 +193,27 @@ outputs:
brokers: localhost:19092
topic: events
```

### kafka server output with SCRAM-SHA-512 authentication example

```yaml
name: example-instance-005

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink-scram
type: kafka
config:
brokers: kafka.example.com:9094
topic: events
tls: true
sasl:
mechanism: SCRAM-SHA-512
user: xatu-client
password: "${KAFKA_PASSWORD}"
```
### Complex example with multiple outputs example

```yaml
Expand Down
26 changes: 26 additions & 0 deletions docs/mimicry.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ Output configuration to send sentry events to a kafka server.
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |
| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. |
| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. |
| outputs[].config.sasl.user | string | | | SASL username. |
| outputs[].config.sasl.password | string | | | SASL password. |
| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). |

### Simple example

Expand Down Expand Up @@ -213,6 +218,27 @@ outputs:
topic: events
```

### kafka server output with SCRAM-SHA-512 authentication example

```yaml
name: example-instance-005

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink-scram
type: kafka
config:
brokers: kafka.example.com:9094
topic: events
tls: true
sasl:
mechanism: SCRAM-SHA-512
user: xatu-client
password: "${KAFKA_PASSWORD}"
```

### Complex example with multiple outputs example

```yaml
Expand Down
26 changes: 26 additions & 0 deletions docs/sentry.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Output configuration to send sentry events to a kafka server.
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |
| outputs[].config.tls | bool | `false` | | Enable TLS for Kafka connection. |
| outputs[].config.sasl.mechanism | string | `PLAIN` | `PLAIN` `SCRAM-SHA-256` `SCRAM-SHA-512` `OAUTHBEARER` `GSSAPI` | SASL mechanism to use for authentication. |
| outputs[].config.sasl.user | string | | | SASL username. |
| outputs[].config.sasl.password | string | | | SASL password. |
| outputs[].config.sasl.passwordFile | string | | | Path to file containing SASL password (alternative to password). |

### Simple example

Expand Down Expand Up @@ -158,6 +163,27 @@ outputs:
topic: events
```

### kafka server output with SCRAM-SHA-512 authentication example

```yaml
name: example-instance-005

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink-scram
type: kafka
config:
brokers: kafka.example.com:9094
topic: events
tls: true
sasl:
mechanism: SCRAM-SHA-512
user: xatu-client
password: "${KAFKA_PASSWORD}"
```

### Complex example with multiple outputs example

```yaml
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ require (
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/urfave/cli/v2 v2.27.7 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.2.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU=
github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs=
github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos=
github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg=
Expand Down Expand Up @@ -947,6 +953,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
Expand Down
31 changes: 31 additions & 0 deletions pkg/output/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/IBM/sarama"
"github.com/xdg-go/scram"
)

type CompressionStrategy string
Expand Down Expand Up @@ -45,6 +46,30 @@ var (
SASLTypeGSSAPI SASLMechanism = "GSSAPI"
)

// scramClient implements sarama.SCRAMClient for SCRAM authentication
type scramClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (sc *scramClient) Begin(userName, password, authzID string) (err error) {
sc.Client, err = sc.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
sc.ClientConversation = sc.Client.NewConversation()
return nil
}

func (sc *scramClient) Step(challenge string) (response string, err error) {
return sc.ClientConversation.Step(challenge)
}

func (sc *scramClient) Done() bool {
return sc.ClientConversation.Done()
}

func NewSyncProducer(config *Config) (sarama.SyncProducer, error) {
producerConfig, err := Init(config)
if err != nil {
Expand Down Expand Up @@ -116,8 +141,14 @@ func Init(config *Config) (*sarama.Config, error) {
c.Net.SASL.Mechanism = sarama.SASLTypeOAuth
case SASLTypeSCRAMSHA256:
c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
c.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: scram.SHA256}
}
case SASLTypeSCRAMSHA512:
c.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
c.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &scramClient{HashGeneratorFcn: scram.SHA512}
}
case SASLTypeGSSAPI:
c.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
default:
Expand Down