Support for stream processing tools such as Kafka and NSQ #72
Conversation
```go
case message := <-c.broadcastChan:
	if !c.isConnected {
		continue
	}

	_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))

	c.conn.Broker()
	_, err := c.conn.WriteMessages(
		kafka.Message{Value: message},
	)
```
We receive a lot of certs every second (3/second is not a bad guess). Would it perhaps be more efficient and less resource-intensive to bulk-write messages to Kafka?
Example:

```go
_, err = conn.WriteMessages(
	kafka.Message{Value: message1},
	kafka.Message{Value: message2},
	kafka.Message{Value: message3},
)
```
I thought about bulk submissions before. We could buffer the certs for, say, a second and then send out all received certs. With the CT logs from the Google log list, Certstream itself processes around 250-300 certs per second in total. I have not tried this, but there might be problems with this approach, as submitting the ~300 or more certificates could take longer than our "buffering time" of 1s.
Another approach could be to always wait until we have collected, say, 50 certificates before sending them over to Kafka. The issue with that approach is the added delay in case a monitored CA does not release that many certificates. I'm interested in your thoughts, though.
- 300/sec means that we should strongly consider buffering certs before pushing them to Kafka.
- I'm in favour of the second approach: we could buffer 50 certs, but we need not wait until we have all 50. A timer/ticker could fire at certain intervals and push all available certs in the buffer to Kafka (basically a combination of the two approaches you outlined).
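A minimal sketch of that combined approach, assuming the fields from the diff above (`c.conn`, `c.broadcastChan`, `c.stopChan`, `writeWait`) and the segmentio/kafka-go import; `flushSize`, `flushInterval`, and the `writeLoop` name are illustrative, not part of this PR:

```go
// Sketch: buffer certs and flush either when the buffer reaches flushSize
// or when the ticker fires, whichever happens first.
const (
	flushSize     = 50
	flushInterval = time.Second
)

func (c *KafkaClient) writeLoop() {
	buf := make([]kafka.Message, 0, flushSize)
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	flush := func() {
		if len(buf) == 0 {
			return
		}
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		if _, err := c.conn.WriteMessages(buf...); err != nil {
			log.Printf("kafka write failed: %v", err)
		}
		buf = buf[:0]
	}

	for {
		select {
		case message := <-c.broadcastChan:
			buf = append(buf, kafka.Message{Value: message})
			if len(buf) >= flushSize {
				flush()
			}
		case <-ticker.C:
			flush()
		case <-c.stopChan:
			flush()
			return
		}
	}
}
```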
In any case (with or without buffering), do you think we might end up skipping a lot of certs due to the channel being full? (This might depend on how large certBufferSize is.)
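For reference, the kind of skip being discussed typically comes from a non-blocking send into the buffered channel. A generic illustration, not necessarily the exact BaseClient code (`certJSON` and `skippedCerts` are hypothetical names):

```go
// If broadcastChan (capacity certBufferSize) is full, the default branch is
// taken and the cert is dropped instead of blocking the dispatcher.
select {
case c.broadcastChan <- certJSON:
	// queued for the client's write loop
default:
	c.skippedCerts++ // hypothetical counter; the cert is skipped
}
```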
```go
kc := &KafkaClient{
	conn:  conn,
	addr:  addr,
	topic: topic,
	BaseClient: BaseClient{
		broadcastChan: make(chan []byte, certBufferSize),
		stopChan:      make(chan struct{}),
		name:          name,
		subType:       subType,
	},
}
```
We send a lot of JSON data to Kafka, which would consume a non-trivial amount of bandwidth. Should we perhaps consider turning on compression for Kafka messages (could be an optional setting)?
> should we perhaps consider turning on compression for kafka messages (could be a optional setting).
Sounds like a good idea. Should be easily achievable, too: https://pkg.go.dev/github.com/segmentio/kafka-go#readme-compression
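A minimal sketch of what that could look like with a `kafka.Writer`, as described in the linked kafka-go docs. The broker address and topic are placeholders, and the PR snippet above uses a `kafka.Conn`, so adopting this would mean switching to the `Writer` API or an equivalent compression option:

```go
// Sketch: kafka.Writer with batch compression enabled (Snappy here;
// Gzip, Lz4 and Zstd are also available in kafka-go).
w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"), // placeholder broker address
	Topic:       "certstream",                // placeholder topic
	Balancer:    &kafka.LeastBytes{},
	Compression: kafka.Snappy,
}
defer w.Close()

err := w.WriteMessages(context.Background(),
	kafka.Message{Value: message},
)
```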
Refers to #23 and #35. I needed to change the config to an array in order to have proper support for multiple kinds of queues and stream processors. Currently, NSQ and Kafka are supported. Future releases could support Amazon SNS/SQS by creating a new client in the broadcast package and by extending the config.
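As a rough illustration of what such a config array could map to on the Go side (field and type names here are assumptions for illustration, not the PR's actual config schema):

```go
// Hypothetical config model: one entry per queue/stream-processor target,
// so additional backends (e.g. SNS/SQS) can be added as new entries.
type StreamTarget struct {
	Type      string   `yaml:"type"`      // e.g. "kafka" or "nsq"
	Addresses []string `yaml:"addresses"` // broker / nsqd addresses
	Topic     string   `yaml:"topic"`
	Enabled   bool     `yaml:"enabled"`
}

type broadcastConfig struct {
	Targets []StreamTarget `yaml:"targets"` // an array instead of a single entry
}
```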
@d-Rickyy-b We couldn't find any modification to the CHANGELOG.md file. If your changes are not suitable for the changelog, that's fine. Otherwise please add them to the changelog!
I rebased the commits onto master to have all the latest in-progress features in the kafka branch as well.
This pull request introduces architectural improvements and enhancements to the broadcast system, adds support for external stream processing (Kafka and NSQ), updates configuration and dependencies, and refines linter and build settings. The main focus is on refactoring the certificate broadcast mechanism to be more modular and extensible, enabling integration with various client types and external systems.
Broadcast System Refactor and Extensibility
- Introduced a `Dispatcher` (formerly `BroadcastManager`) in `internal/broadcast/broadcastmanager.go`, which now manages clients via a `CertProcessor` interface, allowing for different client implementations (e.g., WebSocket, Kafka, NSQ). The dispatcher handles registration, unregistration, message dispatching, and client statistics.
- Added a `BaseClient` struct to encapsulate common client logic (buffering, skipping, naming, etc.), which can be embedded in different client types.

Configuration and External Integration
- Updated `config.yaml` to support external stream processing tools (Kafka and NSQ), added options for real client IP handling, and improved buffer size configuration (renamed the `broadcastmanager` buffer to `dispatcher`).

Dependency and Build Updates

Linter and Code Quality Enhancements
- Updated `.golangci.yml` to use the new v2 format, refined enabled linters, moved formatter settings, and improved exclusion rules for test files.

Bug Fixes and Minor Improvements
- Fixed an issue in `cmd/certpicker/main.go` by properly canceling the context after use.
- Migrated `.golangci.yml` to v2, refined enabled linters, and improved formatter and exclusion settings.
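To make the described structure concrete, here is a rough sketch of how the pieces could fit together. The interface methods and field names are assumptions for illustration; the actual definitions live in the `internal/broadcast` package:

```go
// CertProcessor is the interface the Dispatcher uses to treat WebSocket,
// Kafka and NSQ clients uniformly (method names are assumed).
type CertProcessor interface {
	Name() string
	HandleCert(cert []byte)
	Stop()
}

// BaseClient bundles the common client logic (buffering, skipping, naming)
// and is embedded by the concrete client types.
type BaseClient struct {
	broadcastChan chan []byte
	stopChan      chan struct{}
	name          string
	subType       string
}

// A concrete client embeds BaseClient and adds backend-specific state,
// e.g. the KafkaClient from the diff above or an NSQClient.
type NSQClient struct {
	BaseClient
	// producer, topic, ... backend-specific fields
}
```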