From dc2855afc4e8867994b983b9606c058ec460f716 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Wed, 3 Sep 2025 20:58:12 +0200 Subject: [PATCH 01/14] Initial commit --- .gitignore | 4 ++-- api.go | 11 +++++++++++ bench_test.go | 1 + config.go | 1 + errors.go | 1 + example_test.go | 1 + go.mod | 3 +++ internal/bus.go | 1 + internal/fanout.go | 0 internal/metrics.go | 0 internal/publish.go | 0 internal/state.go | 0 internal/subscribe.go | 0 stats.go | 1 + thebus_test.go | 1 + 15 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 api.go create mode 100644 bench_test.go create mode 100644 config.go create mode 100644 errors.go create mode 100644 example_test.go create mode 100644 go.mod create mode 100644 internal/bus.go create mode 100644 internal/fanout.go create mode 100644 internal/metrics.go create mode 100644 internal/publish.go create mode 100644 internal/state.go create mode 100644 internal/subscribe.go create mode 100644 stats.go create mode 100644 thebus_test.go diff --git a/.gitignore b/.gitignore index aaadf73..ed20272 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,5 @@ go.work.sum .env # Editor/IDE -# .idea/ -# .vscode/ +.idea/ +.vscode/ diff --git a/api.go b/api.go new file mode 100644 index 0000000..877230a --- /dev/null +++ b/api.go @@ -0,0 +1,11 @@ +package thebus + +import "context" + +type Bus interface { + Publish(topic string, data []byte) (PublishAck, error) + Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) + Unsubscribe(topic string, subscriberID string) error + Close() error + Stats() (StatsResults, error) +} diff --git a/bench_test.go b/bench_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/bench_test.go @@ -0,0 +1 @@ +package thebus diff --git a/config.go b/config.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/config.go @@ -0,0 +1 @@ +package thebus diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/errors.go @@ -0,0 +1 @@ +package thebus diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/example_test.go @@ -0,0 +1 @@ +package thebus diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9462ab1 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/sebundefined/thebus + +go 1.24 diff --git a/internal/bus.go b/internal/bus.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/bus.go @@ -0,0 +1 @@ +package internal diff --git a/internal/fanout.go b/internal/fanout.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/metrics.go b/internal/metrics.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/publish.go b/internal/publish.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/state.go b/internal/state.go new file mode 100644 index 0000000..e69de29 diff --git a/internal/subscribe.go b/internal/subscribe.go new file mode 100644 index 0000000..e69de29 diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/stats.go @@ -0,0 +1 @@ +package thebus diff --git a/thebus_test.go b/thebus_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/thebus_test.go @@ -0,0 +1 @@ +package thebus From 9c804b57a43e0f217102d39677fb9cf7764d9ad3 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Wed, 3 Sep 2025 21:15:19 +0200 Subject: [PATCH 02/14] Adding README.md structure. --- README.md | 32 ++++++++++++++++++++++++++++++++ internal/fanout.go | 1 + internal/metrics.go | 1 + internal/publish.go | 1 + internal/state.go | 1 + internal/subscribe.go | 1 + slog/logger.go | 3 +++ 7 files changed, 40 insertions(+) create mode 100644 slog/logger.go diff --git a/README.md b/README.md index 68a9527..1f2568f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,34 @@ +[![Go Reference](https://pkg.go.dev/badge/github.com/sebundefined/thebus/v1.svg)](https://pkg.go.dev/github.com/sebundefined/thebus/v1) +[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/ci.yml) + # thebus thebus is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. + +## Getting started + +Just install **thebus** in your project by using the following command. + +```shell +go get -u github.com/sebundefined/thebus +``` +## Example Usage + +```go +package main + + +``` + + +## Features + + +## Why would you choose a thebus for your app ? + +## Testing + +See [CONTRIBUTING.md](./CONTRIBUTING.md) for instructions. + +## Versioning + + diff --git a/internal/fanout.go b/internal/fanout.go index e69de29..5bf0569 100644 --- a/internal/fanout.go +++ b/internal/fanout.go @@ -0,0 +1 @@ +package internal diff --git a/internal/metrics.go b/internal/metrics.go index e69de29..5bf0569 100644 --- a/internal/metrics.go +++ b/internal/metrics.go @@ -0,0 +1 @@ +package internal diff --git a/internal/publish.go b/internal/publish.go index e69de29..5bf0569 100644 --- a/internal/publish.go +++ b/internal/publish.go @@ -0,0 +1 @@ +package internal diff --git a/internal/state.go b/internal/state.go index e69de29..5bf0569 100644 --- a/internal/state.go +++ b/internal/state.go @@ -0,0 +1 @@ +package internal diff --git a/internal/subscribe.go b/internal/subscribe.go index e69de29..5bf0569 100644 --- a/internal/subscribe.go +++ b/internal/subscribe.go @@ -0,0 +1 @@ +package internal diff --git a/slog/logger.go b/slog/logger.go new file mode 100644 index 0000000..9d4a564 --- /dev/null +++ b/slog/logger.go @@ -0,0 +1,3 @@ +package slog + +type Logger interface{} From 800a036f85fb7b05e9b2ce08c2ecabcc23897b78 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Wed, 3 Sep 2025 21:20:08 +0200 Subject: [PATCH 03/14] gitkeep for directory structure saved on GH --- prom/.gitkeep | 0 testkit/.gitkeep | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 prom/.gitkeep create mode 100644 testkit/.gitkeep diff --git a/prom/.gitkeep b/prom/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/testkit/.gitkeep b/testkit/.gitkeep new file mode 100644 index 0000000..e69de29 From 81722b02738e9b06a0a80c2fe82451b7df6feb71 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Thu, 4 Sep 2025 07:55:10 +0200 Subject: [PATCH 04/14] adding basic configuration for thebus --- api.go | 13 ++++ config.go | 181 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+) diff --git a/api.go b/api.go index 877230a..09da978 100644 --- a/api.go +++ b/api.go @@ -9,3 +9,16 @@ type Bus interface { Close() error Stats() (StatsResults, error) } + +type Logger interface { + Info(msg string, kv ...any) + Error(msg string, kv ...any) + Debug(msg string, kv ...any) +} + +type MetricsHooks interface { + IncPublished(topic string) + IncDelivered(topic string) + IncDropped(topic string) + IncFailed(topic string) +} diff --git a/config.go b/config.go index a32a718..035a9df 100644 --- a/config.go +++ b/config.go @@ -1 +1,182 @@ package thebus + +import ( + "encoding/json" + "fmt" + "slices" + "strings" + "time" +) + +// ############################################################################## +// ################################## ENUM ################################## +// ############################################################################## + +// SubscriptionStrategy define if the payload must be +// shared by the subscribers (SubscriptionStrategyPayloadShared, the default) +// or copied (SubscriptionStrategyPayloadClonedPerSubscriber) +type SubscriptionStrategy string + +const ( + SubscriptionStrategyUnknown SubscriptionStrategy = "UNKNOWN" + SubscriptionStrategyPayloadShared SubscriptionStrategy = "PAYLOAD_SHARED" + SubscriptionStrategyPayloadClonedPerSubscriber SubscriptionStrategy = "PAYLOAD_CLONED_PER_SUBSCRIBER" +) + +func (enum SubscriptionStrategy) String() string { + if len(strings.TrimSpace(string(enum))) == 0 { + return string(SubscriptionStrategyUnknown) + } + return string(enum) +} + +func SubscriptionStrategyValues() []SubscriptionStrategy { + return []SubscriptionStrategy{ + SubscriptionStrategyPayloadShared, + SubscriptionStrategyPayloadClonedPerSubscriber, + } +} + +func (enum SubscriptionStrategy) IsValid() bool { + if slices.Contains(SubscriptionStrategyValues(), enum) { + return true + } + return false +} + +func (enum SubscriptionStrategy) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, enum)), nil +} + +func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error { + var tmp string + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + fs := SubscriptionStrategy(tmp) + if !fs.IsValid() { + fs = SubscriptionStrategyUnknown + } + *enum = fs + return nil +} + +// ############################################################################## +// ################################ CONFIG ################################## +// ############################################################################## + +// Config is the main configuration for thebus +type Config struct { + // Topics / queues + TopicQueueSize int // default: 1024 + AutoDeleteEmptyTopics bool // default: true + TopicIdleTTL time.Duration // if Janitor enabled (0 = off) + JanitorInterval time.Duration // 0 = off + + // Default for subscribers (peuvent être override par Subscribe options) + DefaultSubBufferSize int // default: 128 + DefaultSendTimeout time.Duration // default: 200ms + DefaultDropIfFull bool // default: true + DefaultStrategy SubscriptionStrategy // default: SubscriptionStrategyPayloadShared + + // Limits (0 = unlimited) + MaxTopics int // default : 0 (unlimited) + MaxSubscribersPerTopic int // default : 0 (unlimited) + + // Observability + Logger Logger // default: nopLogger + Metrics MetricsHooks // default: nopMetrics + PanicHandler func(topic string, v any) // default: nil (no recover) +} + +func DefaultConfig() *Config { + return &Config{ + TopicQueueSize: 1024, + AutoDeleteEmptyTopics: true, + TopicIdleTTL: 0 * time.Second, + JanitorInterval: 0 * time.Second, + DefaultSubBufferSize: 128, + DefaultSendTimeout: 200 * time.Millisecond, + DefaultDropIfFull: true, + DefaultStrategy: SubscriptionStrategyPayloadShared, + } +} + +// ############################################################################## +// ############################### OPTIONS ################################## +// ############################################################################## + +type Option func(*Config) + +func WithTopicQueueSize(size int) Option { + return func(cfg *Config) { + cfg.TopicQueueSize = size + } +} + +func WithAutoDeleteEmptyTopics() Option { + return func(cfg *Config) { + cfg.AutoDeleteEmptyTopics = true + } +} + +func WithTopicIdleTTL(ttl time.Duration) Option { + return func(cfg *Config) { + cfg.TopicIdleTTL = ttl + } +} + +func WithJanitorInterval(interval time.Duration) Option { + return func(cfg *Config) { + cfg.JanitorInterval = interval + } +} + +func WithDefaultSubBufferSize(size int) Option { + return func(cfg *Config) { + cfg.DefaultSubBufferSize = size + } +} + +func WithDefaultSendTimeout(timeout time.Duration) Option { + return func(cfg *Config) { + cfg.DefaultSendTimeout = timeout + } +} + +func WithDefaultDropIfFull() Option { + return func(cfg *Config) { + cfg.DefaultDropIfFull = true + } +} + +func WithDefaultStrategy(strategy SubscriptionStrategy) Option { + return func(cfg *Config) { + cfg.DefaultStrategy = strategy + } +} +func WithLogger(logger Logger) Option { + return func(cfg *Config) { + cfg.Logger = logger + } +} + +func WithMetrics(metrics MetricsHooks) Option { + return func(cfg *Config) { + cfg.Metrics = metrics + } +} + +func WithPanicHandler(handler func(topic string, v any)) Option { + return func(cfg *Config) { + cfg.PanicHandler = handler + } +} + +func BuildConfig(opts ...Option) *Config { + cfg := DefaultConfig() + for _, opt := range opts { + opt(cfg) + } + return cfg +} From cc0244584d91ca83424455a0a340571f8a31843f Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Thu, 4 Sep 2025 08:17:15 +0200 Subject: [PATCH 05/14] adding override config per subscription --- api.go | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- config.go | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/api.go b/api.go index 09da978..3a9395b 100644 --- a/api.go +++ b/api.go @@ -1,6 +1,9 @@ package thebus -import "context" +import ( + "context" + "time" +) type Bus interface { Publish(topic string, data []byte) (PublishAck, error) @@ -10,6 +13,64 @@ type Bus interface { Stats() (StatsResults, error) } +// ############################################################################## +// ############################### PUB/SUB ################################## +// ############################################################################## + +// PublishAck is returned when your client Publish a message on a topic. +// It returns the Topic, if the message is Enqueued or not and the number of +// subscribers. Note that Subscribers is a snapshot when published. +// it may not reflect the real subscribers count +type PublishAck struct { + Topic string + Enqueued bool + Subscribers int +} + +type Subscription interface { + GetID() string + GetTopic() string + Read() <-chan Message + Unsubscribe() error +} + +type Message struct { + Topic string + Timestamp time.Time + Payload []byte + Seq uint64 +} + +// ############################################################################## +// ################################# STATS ################################## +// ############################################################################## + +type Counters struct { + Published uint64 + Delivered uint64 + Failed uint64 + Dropped uint64 +} + +type StatsResults struct { + StartedAt time.Time + Open bool + Topics int + Subscribers int + Totals Counters + PerTopic map[string]TopicStats +} + +type TopicStats struct { + Subscribers int + Buffered int + Counters +} + +// ############################################################################## +// ############################# THIRD PARTY ################################ +// ############################################################################## + type Logger interface { Info(msg string, kv ...any) Error(msg string, kv ...any) diff --git a/config.go b/config.go index 035a9df..d108b70 100644 --- a/config.go +++ b/config.go @@ -102,6 +102,22 @@ func DefaultConfig() *Config { } } +type SubscriptionConfig struct { + Strategy SubscriptionStrategy + BufferSize int + SendTimeout time.Duration + DropIfFull bool +} + +func DefaultSubscriptionConfig() SubscriptionConfig { + return SubscriptionConfig{ + BufferSize: 128, + SendTimeout: 200 * time.Millisecond, + DropIfFull: true, + Strategy: SubscriptionStrategyPayloadShared, + } +} + // ############################################################################## // ############################### OPTIONS ################################## // ############################################################################## @@ -180,3 +196,32 @@ func BuildConfig(opts ...Option) *Config { } return cfg } + +// SubscribeOption - +type SubscribeOption func(subCfg *SubscriptionConfig) + +func WithStrategy(strategy SubscriptionStrategy) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.Strategy = strategy + } +} + +func WithBufferSize(bufferSize int) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + if bufferSize < 1 { + return + } + subCfg.BufferSize = bufferSize + } +} +func WithSendTimeout(timeout time.Duration) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.SendTimeout = timeout + } +} + +func WithDropIfFull(dropIfFull bool) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.DropIfFull = dropIfFull + } +} From 3f8adb0078a77278ecbcfa18e7ca9053028e6890 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Thu, 4 Sep 2025 08:49:48 +0200 Subject: [PATCH 06/14] Moving implementation to root dir... Cyclic import predicted... :( --- bus.go | 1 + fanout.go | 1 + internal/bus.go | 1 - internal/fanout.go | 1 - internal/metrics.go | 1 - internal/publish.go | 1 - internal/state.go | 1 - internal/subscribe.go | 1 - metrics.go | 1 + publish.go | 1 + state.go | 1 + subscribe.go | 1 + 12 files changed, 6 insertions(+), 6 deletions(-) create mode 100644 bus.go create mode 100644 fanout.go delete mode 100644 internal/bus.go delete mode 100644 internal/fanout.go delete mode 100644 internal/metrics.go delete mode 100644 internal/publish.go delete mode 100644 internal/state.go delete mode 100644 internal/subscribe.go create mode 100644 metrics.go create mode 100644 publish.go create mode 100644 state.go create mode 100644 subscribe.go diff --git a/bus.go b/bus.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/bus.go @@ -0,0 +1 @@ +package thebus diff --git a/fanout.go b/fanout.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/fanout.go @@ -0,0 +1 @@ +package thebus diff --git a/internal/bus.go b/internal/bus.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/bus.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/internal/fanout.go b/internal/fanout.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/fanout.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/internal/metrics.go b/internal/metrics.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/metrics.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/internal/publish.go b/internal/publish.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/publish.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/internal/state.go b/internal/state.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/state.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/internal/subscribe.go b/internal/subscribe.go deleted file mode 100644 index 5bf0569..0000000 --- a/internal/subscribe.go +++ /dev/null @@ -1 +0,0 @@ -package internal diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/metrics.go @@ -0,0 +1 @@ +package thebus diff --git a/publish.go b/publish.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/publish.go @@ -0,0 +1 @@ +package thebus diff --git a/state.go b/state.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/state.go @@ -0,0 +1 @@ +package thebus diff --git a/subscribe.go b/subscribe.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/subscribe.go @@ -0,0 +1 @@ +package thebus From 8f0830564801b9a3529d5ca8f5c6fb3305b0b099 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Thu, 4 Sep 2025 09:04:21 +0200 Subject: [PATCH 07/14] TODO find a way to avoid dependecy to google uuid. Try to avoid any dependecy in this lib... --- api.go | 72 ---------------------------------------------------- logger.go | 7 +++++ message.go | 10 ++++++++ metrics.go | 7 +++++ publish.go | 10 ++++++++ stats.go | 34 +++++++++++++++++++++++++ subscribe.go | 34 +++++++++++++++++++++++++ 7 files changed, 102 insertions(+), 72 deletions(-) create mode 100644 logger.go create mode 100644 message.go diff --git a/api.go b/api.go index 3a9395b..60bc999 100644 --- a/api.go +++ b/api.go @@ -2,7 +2,6 @@ package thebus import ( "context" - "time" ) type Bus interface { @@ -12,74 +11,3 @@ type Bus interface { Close() error Stats() (StatsResults, error) } - -// ############################################################################## -// ############################### PUB/SUB ################################## -// ############################################################################## - -// PublishAck is returned when your client Publish a message on a topic. -// It returns the Topic, if the message is Enqueued or not and the number of -// subscribers. Note that Subscribers is a snapshot when published. -// it may not reflect the real subscribers count -type PublishAck struct { - Topic string - Enqueued bool - Subscribers int -} - -type Subscription interface { - GetID() string - GetTopic() string - Read() <-chan Message - Unsubscribe() error -} - -type Message struct { - Topic string - Timestamp time.Time - Payload []byte - Seq uint64 -} - -// ############################################################################## -// ################################# STATS ################################## -// ############################################################################## - -type Counters struct { - Published uint64 - Delivered uint64 - Failed uint64 - Dropped uint64 -} - -type StatsResults struct { - StartedAt time.Time - Open bool - Topics int - Subscribers int - Totals Counters - PerTopic map[string]TopicStats -} - -type TopicStats struct { - Subscribers int - Buffered int - Counters -} - -// ############################################################################## -// ############################# THIRD PARTY ################################ -// ############################################################################## - -type Logger interface { - Info(msg string, kv ...any) - Error(msg string, kv ...any) - Debug(msg string, kv ...any) -} - -type MetricsHooks interface { - IncPublished(topic string) - IncDelivered(topic string) - IncDropped(topic string) - IncFailed(topic string) -} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..d07930b --- /dev/null +++ b/logger.go @@ -0,0 +1,7 @@ +package thebus + +type Logger interface { + Info(msg string, kv ...any) + Error(msg string, kv ...any) + Debug(msg string, kv ...any) +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..7a77222 --- /dev/null +++ b/message.go @@ -0,0 +1,10 @@ +package thebus + +import "time" + +type Message struct { + Topic string + Timestamp time.Time + Payload []byte + Seq uint64 +} diff --git a/metrics.go b/metrics.go index a32a718..88cd026 100644 --- a/metrics.go +++ b/metrics.go @@ -1 +1,8 @@ package thebus + +type MetricsHooks interface { + IncPublished(topic string) + IncDelivered(topic string) + IncDropped(topic string) + IncFailed(topic string) +} diff --git a/publish.go b/publish.go index a32a718..ed6d5e7 100644 --- a/publish.go +++ b/publish.go @@ -1 +1,11 @@ package thebus + +// PublishAck is returned when your client Publish a message on a topic. +// It returns the Topic, if the message is Enqueued or not and the number of +// subscribers. Note that Subscribers is a snapshot when published. +// it may not reflect the real subscribers count +type PublishAck struct { + Topic string + Enqueued bool + Subscribers int +} diff --git a/stats.go b/stats.go index a32a718..b226e97 100644 --- a/stats.go +++ b/stats.go @@ -1 +1,35 @@ package thebus + +import ( + "sync/atomic" + "time" +) + +type Counters struct { + Published uint64 + Delivered uint64 + Failed uint64 + Dropped uint64 +} + +type StatsResults struct { + StartedAt time.Time + Open bool + Topics int + Subscribers int + Totals Counters + PerTopic map[string]TopicStats +} + +type TopicStats struct { + Subscribers int + Buffered int + Counters +} + +type atomicCounters struct { + Published atomic.Uint64 + Delivered atomic.Uint64 + Failed atomic.Uint64 + Dropped atomic.Uint64 +} diff --git a/subscribe.go b/subscribe.go index a32a718..aee5228 100644 --- a/subscribe.go +++ b/subscribe.go @@ -1 +1,35 @@ package thebus + +type Subscription interface { + GetID() string + GetTopic() string + Read() <-chan Message + Unsubscribe() error +} + +type subscription struct { + subscriptionID uuid.UUID + cfg SubscriptionConfig + topic string + messageChan chan Message + messages <-chan Message + unsubscribeFunc func() error +} + +var _ Subscription = (*subscription)(nil) + +func (s *subscription) GetID() string { + return s.subscriptionID.String() +} + +func (s *subscription) GetTopic() string { + return s.topic +} + +func (s *subscription) Read() <-chan Message { + return s.messages +} + +func (s *subscription) Unsubscribe() error { + return s.unsubscribeFunc() +} From 1e9e5140c579236f75730cf3d8d6eee995de1560 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Thu, 4 Sep 2025 09:05:24 +0200 Subject: [PATCH 08/14] TODO find a way to avoid dependecy to google uuid. Try to avoid any dependecy in this lib... --- id.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 id.go diff --git a/id.go b/id.go new file mode 100644 index 0000000..85d518b --- /dev/null +++ b/id.go @@ -0,0 +1,7 @@ +package thebus + +type ID string + +func NewID() ID { + return ID("") +} From 3e0159a44a2b3a223a8638bf8a825b364b81d7ce Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Sat, 6 Sep 2025 08:56:18 +0200 Subject: [PATCH 09/14] Adding a simple id generator and change the package to github.com/sebundefined/thebus/v1 for versioning --- config.go | 53 ++++-------------------- error.go | 1 + go.mod | 2 +- id.go | 114 +++++++++++++++++++++++++++++++++++++++++++++++++-- id_test.go | 19 +++++++++ subscribe.go | 51 ++++++++++++++++++++++- 6 files changed, 189 insertions(+), 51 deletions(-) create mode 100644 error.go create mode 100644 id_test.go diff --git a/config.go b/config.go index d108b70..ccef564 100644 --- a/config.go +++ b/config.go @@ -72,6 +72,7 @@ type Config struct { AutoDeleteEmptyTopics bool // default: true TopicIdleTTL time.Duration // if Janitor enabled (0 = off) JanitorInterval time.Duration // 0 = off + IDGenerator IDGenerator // default to DefaultIDGenerator // Default for subscribers (peuvent être override par Subscribe options) DefaultSubBufferSize int // default: 128 @@ -99,22 +100,7 @@ func DefaultConfig() *Config { DefaultSendTimeout: 200 * time.Millisecond, DefaultDropIfFull: true, DefaultStrategy: SubscriptionStrategyPayloadShared, - } -} - -type SubscriptionConfig struct { - Strategy SubscriptionStrategy - BufferSize int - SendTimeout time.Duration - DropIfFull bool -} - -func DefaultSubscriptionConfig() SubscriptionConfig { - return SubscriptionConfig{ - BufferSize: 128, - SendTimeout: 200 * time.Millisecond, - DropIfFull: true, - Strategy: SubscriptionStrategyPayloadShared, + IDGenerator: DefaultIDGenerator, } } @@ -189,6 +175,12 @@ func WithPanicHandler(handler func(topic string, v any)) Option { } } +func (cfg *Config) WithIDGenerator(IDGenerator IDGenerator) Option { + return func(cfg *Config) { + cfg.IDGenerator = IDGenerator + } +} + func BuildConfig(opts ...Option) *Config { cfg := DefaultConfig() for _, opt := range opts { @@ -196,32 +188,3 @@ func BuildConfig(opts ...Option) *Config { } return cfg } - -// SubscribeOption - -type SubscribeOption func(subCfg *SubscriptionConfig) - -func WithStrategy(strategy SubscriptionStrategy) SubscribeOption { - return func(subCfg *SubscriptionConfig) { - subCfg.Strategy = strategy - } -} - -func WithBufferSize(bufferSize int) SubscribeOption { - return func(subCfg *SubscriptionConfig) { - if bufferSize < 1 { - return - } - subCfg.BufferSize = bufferSize - } -} -func WithSendTimeout(timeout time.Duration) SubscribeOption { - return func(subCfg *SubscriptionConfig) { - subCfg.SendTimeout = timeout - } -} - -func WithDropIfFull(dropIfFull bool) SubscribeOption { - return func(subCfg *SubscriptionConfig) { - subCfg.DropIfFull = dropIfFull - } -} diff --git a/error.go b/error.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/error.go @@ -0,0 +1 @@ +package thebus diff --git a/go.mod b/go.mod index 9462ab1..38129c9 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/sebundefined/thebus +module github.com/sebundefined/thebus/v1 go 1.24 diff --git a/id.go b/id.go index 85d518b..ca681d4 100644 --- a/id.go +++ b/id.go @@ -1,7 +1,115 @@ package thebus -type ID string +import ( + "crypto/rand" + "sync" + "time" +) -func NewID() ID { - return ID("") +// IDGenerator - +type IDGenerator func() string + +func init() { + _, err := rand.Read(stateIDGen.entropy[:]) + if err != nil { + panic(err) + } +} + +// crockfordAlphabet see https://www.baeldung.com/cs/crockfords-base32-encoding +// for a simple article about it +const crockfordAlphabet = "0123456789ABCDEFGHJKMNPQRSTVWXYZ" + +// stateIDGenerator - allows me to keep track of the lastTs and +// generate the entropy again in case we change millisecond +type stateIDGenerator struct { + mu sync.Mutex + lastTs int64 + entropy [10]byte +} + +// next generate again the entropy if the ms increased. +// Get the latest if decrease and increase it if we are in the same ms +func (s *stateIDGenerator) next() (ts int64, e [10]byte) { + now := time.Now().UnixMilli() + if now > s.lastTs { + s.lastTs = now + _, err := rand.Read(s.entropy[:]) + if err != nil { + panic(err) + } + } else if now == s.lastTs { + incrementEntropyBE(&s.entropy) + } else { + now = s.lastTs + incrementEntropyBE(&s.entropy) + } + return now, s.entropy +} + +// stateIDGen - global var for the state +// see init for the default entropy +var stateIDGen = &stateIDGenerator{ + lastTs: time.Now().UnixMilli(), +} + +// DefaultIDGenerator - generate a new uniq id. +// See WithIDGenerator Option if you want to include your own generator (ex: uuid.UUID from Google) +func DefaultIDGenerator() string { + stateIDGen.mu.Lock() + ts, e := stateIDGen.next() + stateIDGen.mu.Unlock() + return encodeULIDLike(ts, e) +} + +// incrementEntropyBE increment the entropy. +// See it as a km/miles counter on a car. The latest number on the counter is increased +// each km/miles. If the increase operation result is 0, it means we have to increase the next number (index - 1) +func incrementEntropyBE(e *[10]byte) { + for i := 9; i >= 0; i-- { + e[i]++ + if e[i] != 0 { + return + } + } +} + +func encodeULIDLike(tsMillis int64, entropy [10]byte) string { + buf := [26]byte{} + writeTimestamp48ToBase32(uint64(tsMillis), buf[0:10]) + writeEntropy80ToBase32(entropy, buf[10:26]) + return string(buf[:]) +} + +// writeTimestamp48ToBase32 +func writeTimestamp48ToBase32(ts uint64, out []byte) { + if len(out) != 10 { + panic("writeTimestamp48ToBase32: out must be len 10") + } + shifts := [...]uint{45, 40, 35, 30, 25, 20, 15, 10, 5, 0} + for i, sh := range shifts { + idx := byte((ts >> sh) & 0x1F) // 0x1F = 5 bits + out[i] = crockfordAlphabet[idx] + } +} + +func writeEntropy80ToBase32(e [10]byte, out []byte) { + if len(out) != 16 { + panic("writeEntropy80ToBase32: out must be len 16") + } + var acc uint64 + bits := 0 + pos := 0 + + for i := 0; i < len(e); i++ { + acc = (acc << 8) | uint64(e[i]) // on pousse 8 bits + bits += 8 + // tant qu'on a au moins 5 bits en stock, on sort 1 char + for bits >= 5 { + idx := byte((acc >> uint(bits-5)) & 0x1F) // prend les 5 bits de tête + out[pos] = crockfordAlphabet[idx] + pos++ + bits -= 5 + } + } } diff --git a/id_test.go b/id_test.go new file mode 100644 index 0000000..0957d1f --- /dev/null +++ b/id_test.go @@ -0,0 +1,19 @@ +package thebus + +import ( + "testing" +) + +func TestDefaultIDGenerator(t *testing.T) { + var ids []string + for i := 0; i < 100; i++ { + ids = append(ids, DefaultIDGenerator()) + } + uniqValues := make(map[string]bool) + for _, id := range ids { + uniqValues[id] = true + } + if len(uniqValues) != len(ids) { + t.Errorf("uniqValues does not match len(ids): %v != %v", uniqValues, ids) + } +} diff --git a/subscribe.go b/subscribe.go index aee5228..c270ca4 100644 --- a/subscribe.go +++ b/subscribe.go @@ -1,5 +1,7 @@ package thebus +import "time" + type Subscription interface { GetID() string GetTopic() string @@ -8,7 +10,7 @@ type Subscription interface { } type subscription struct { - subscriptionID uuid.UUID + subscriptionID string cfg SubscriptionConfig topic string messageChan chan Message @@ -16,10 +18,26 @@ type subscription struct { unsubscribeFunc func() error } +type SubscriptionConfig struct { + Strategy SubscriptionStrategy + BufferSize int + SendTimeout time.Duration + DropIfFull bool +} + +func DefaultSubscriptionConfig() SubscriptionConfig { + return SubscriptionConfig{ + BufferSize: 128, + SendTimeout: 200 * time.Millisecond, + DropIfFull: true, + Strategy: SubscriptionStrategyPayloadShared, + } +} + var _ Subscription = (*subscription)(nil) func (s *subscription) GetID() string { - return s.subscriptionID.String() + return s.subscriptionID } func (s *subscription) GetTopic() string { @@ -33,3 +51,32 @@ func (s *subscription) Read() <-chan Message { func (s *subscription) Unsubscribe() error { return s.unsubscribeFunc() } + +// SubscribeOption - +type SubscribeOption func(subCfg *SubscriptionConfig) + +func WithStrategy(strategy SubscriptionStrategy) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.Strategy = strategy + } +} + +func WithBufferSize(bufferSize int) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + if bufferSize < 1 { + return + } + subCfg.BufferSize = bufferSize + } +} +func WithSendTimeout(timeout time.Duration) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.SendTimeout = timeout + } +} + +func WithDropIfFull(dropIfFull bool) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.DropIfFull = dropIfFull + } +} From 8a596f437b67e176bb8cb131754ade89ffc39bbd Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Sat, 6 Sep 2025 09:07:25 +0200 Subject: [PATCH 10/14] Deleting v1 after reading doc about golang versioning (not useful for v0 and v1, or if no breaking changes) Add test file for the future --- README.md | 6 +++++- error.go => bus_test.go | 0 config_test.go | 1 + fanout_test.go | 1 + go.mod | 2 +- 5 files changed, 8 insertions(+), 2 deletions(-) rename error.go => bus_test.go (100%) create mode 100644 config_test.go create mode 100644 fanout_test.go diff --git a/README.md b/README.md index 1f2568f..47e06b1 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,20 @@ thebus is a lightweight message-oriented middleware that provides a dead-simple Just install **thebus** in your project by using the following command. ```shell -go get -u github.com/sebundefined/thebus +go get -u github.com/sebundefined/thebus/v1 ``` ## Example Usage +### Simple + ```go package main ``` +### With custom options + ## Features diff --git a/error.go b/bus_test.go similarity index 100% rename from error.go rename to bus_test.go diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/config_test.go @@ -0,0 +1 @@ +package thebus diff --git a/fanout_test.go b/fanout_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/fanout_test.go @@ -0,0 +1 @@ +package thebus diff --git a/go.mod b/go.mod index 38129c9..9462ab1 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/sebundefined/thebus/v1 +module github.com/sebundefined/thebus go 1.24 From 372ef68cdfb17e9c5a51684bce4bec74d210f352 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Sat, 6 Sep 2025 09:34:12 +0200 Subject: [PATCH 11/14] Add error and bus bone --- bus.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ config.go | 9 +++++++- error.go | 13 ++++++++++++ errors.go | 1 - 4 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 error.go delete mode 100644 errors.go diff --git a/bus.go b/bus.go index a32a718..3b4d38e 100644 --- a/bus.go +++ b/bus.go @@ -1 +1,62 @@ package thebus + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type bus struct { + mutex sync.RWMutex + cfg *Config + open atomic.Bool + startedAt time.Time + //subscriptions map[string]*structState + totals atomicCounters +} + +var _ Bus = (*bus)(nil) + +func New(opts ...Option) (Bus, error) { + cfg := DefaultConfig() + for _, opt := range opts { + opt(cfg) + } + if err := cfg.Validate(); err != nil { + return nil, err + } + b := &bus{ + startedAt: time.Now(), + cfg: cfg, + totals: atomicCounters{}, + //subscriptions: make(map[string]*structState), + } + b.open.Store(true) + return b, nil +} + +func (b *bus) Publish(topic string, data []byte) (PublishAck, error) { + //TODO implement me + panic("implement me") +} + +func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) { + //TODO implement me + panic("implement me") +} + +func (b *bus) Unsubscribe(topic string, subscriberID string) error { + //TODO implement me + panic("implement me") +} + +func (b *bus) Close() error { + //TODO implement me + panic("implement me") +} + +func (b *bus) Stats() (StatsResults, error) { + //TODO implement me + panic("implement me") +} diff --git a/config.go b/config.go index ccef564..0059e5b 100644 --- a/config.go +++ b/config.go @@ -175,12 +175,19 @@ func WithPanicHandler(handler func(topic string, v any)) Option { } } -func (cfg *Config) WithIDGenerator(IDGenerator IDGenerator) Option { +func WithIDGenerator(IDGenerator IDGenerator) Option { return func(cfg *Config) { cfg.IDGenerator = IDGenerator } } +func (cfg *Config) Validate() error { + if cfg.IDGenerator == nil { + return ErrIDGeneratorNotSet + } + return nil +} + func BuildConfig(opts ...Option) *Config { cfg := DefaultConfig() for _, opt := range opts { diff --git a/error.go b/error.go new file mode 100644 index 0000000..83bd8a7 --- /dev/null +++ b/error.go @@ -0,0 +1,13 @@ +package thebus + +import "errors" + +var ( + ErrClosed = errors.New("thebus.closed") + ErrQueueFull = errors.New("thebus.queue.full") + ErrInvalidTopic = errors.New("thebus.invalid.topic") + ErrInvalidSubscriberID = errors.New("thebus.invalid.subscriberID") + ErrInvalidTopicName = errors.New("thebus.invalid.topic.name") + ErrInvalidTopicNameReserved = errors.New("thebus.invalid.topic.name.reserved") + ErrIDGeneratorNotSet = errors.New("thebus.idGenerator.not_set") +) diff --git a/errors.go b/errors.go deleted file mode 100644 index a32a718..0000000 --- a/errors.go +++ /dev/null @@ -1 +0,0 @@ -package thebus From e24a6803d93f286e4e72afd4f76e47844f1187f3 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Sat, 6 Sep 2025 10:47:33 +0200 Subject: [PATCH 12/14] Adding methods for getting the state and create it if needed. --- README.md | 5 +++-- bus.go | 63 +++++++++++++++++++++++++++++++++++++++++++--------- config.go | 6 ++++- message.go | 27 ++++++++++++++++++++++ subscribe.go | 30 ++++++++++++++++++------- 5 files changed, 110 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 47e06b1..415d604 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,15 @@ [![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/ci.yml) # thebus -thebus is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. +**thebus** is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. + ## Getting started Just install **thebus** in your project by using the following command. ```shell -go get -u github.com/sebundefined/thebus/v1 +go get -u github.com/sebundefined/thebus ``` ## Example Usage diff --git a/bus.go b/bus.go index 3b4d38e..caf031d 100644 --- a/bus.go +++ b/bus.go @@ -2,18 +2,19 @@ package thebus import ( "context" + "fmt" "sync" "sync/atomic" "time" ) type bus struct { - mutex sync.RWMutex - cfg *Config - open atomic.Bool - startedAt time.Time - //subscriptions map[string]*structState - totals atomicCounters + mutex sync.RWMutex + cfg *Config + open atomic.Bool + startedAt time.Time + subscriptions map[string]*topicState + totals atomicCounters } var _ Bus = (*bus)(nil) @@ -27,10 +28,10 @@ func New(opts ...Option) (Bus, error) { return nil, err } b := &bus{ - startedAt: time.Now(), - cfg: cfg, - totals: atomicCounters{}, - //subscriptions: make(map[string]*structState), + startedAt: time.Now(), + cfg: cfg, + totals: atomicCounters{}, + subscriptions: make(map[string]*topicState), } b.open.Store(true) return b, nil @@ -60,3 +61,45 @@ func (b *bus) Stats() (StatsResults, error) { //TODO implement me panic("implement me") } + +// getStateReadLocked +// Caller must hold RLock. +func (b *bus) getStateReadLocked(topic string) (*topicState, bool) { + state, ok := b.subscriptions[topic] + return state, ok +} + +// getOrCreateStateLocked ensure that the state for a specific topic is initialized. +// If the topic is found, it returns it otherwise, it creates a new one. +// getOrCreateStateLocked start the fanOut method automatically. +// Caller must hold Lock. +func (b *bus) getOrCreateStateLocked(topic string) (*topicState, bool, error) { + state, ok := b.subscriptions[topic] + created := false + if !ok { + if b.cfg.MaxTopics > 0 && len(b.subscriptions) >= b.cfg.MaxTopics { + return nil, + false, + fmt.Errorf("too many topics (max=%d)", b.cfg.MaxTopics) + } + created = true + // Create the chanel. Apply default in case of bad config + var queue chan messageRef + if b.cfg.TopicQueueSize <= 0 { + queue = make(chan messageRef, DefaultTopicQueueSize) + } else { + queue = make(chan messageRef, b.cfg.TopicQueueSize) + } + state = &topicState{ + subs: make(map[string]*subscription), + counters: atomicCounters{}, + inQueue: queue, + } + b.subscriptions[topic] = state + if state.started.CompareAndSwap(false, true) { + state.wg.Add(1) + // TODO run the fanout like go l.runFanOut(topic, state) + } + } + return state, created, nil +} diff --git a/config.go b/config.go index 0059e5b..0978134 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,10 @@ import ( "time" ) +const ( + DefaultTopicQueueSize = 1024 +) + // ############################################################################## // ################################## ENUM ################################## // ############################################################################## @@ -92,7 +96,7 @@ type Config struct { func DefaultConfig() *Config { return &Config{ - TopicQueueSize: 1024, + TopicQueueSize: DefaultTopicQueueSize, AutoDeleteEmptyTopics: true, TopicIdleTTL: 0 * time.Second, JanitorInterval: 0 * time.Second, diff --git a/message.go b/message.go index 7a77222..09cecc1 100644 --- a/message.go +++ b/message.go @@ -8,3 +8,30 @@ type Message struct { Payload []byte Seq uint64 } + +type messageRef struct { + topic string + ts time.Time + seq uint64 + payload []byte +} + +func newMessageRef(topic string, ts time.Time, seq uint64, payload []byte) messageRef { + return messageRef{ + topic: topic, + ts: ts, + seq: seq, + payload: payload, + } +} + +func newMessageRefCopy(topic string, ts time.Time, seq uint64, payload []byte) messageRef { + newPayload := make([]byte, len(payload)) + copy(newPayload, payload) + return messageRef{ + topic: topic, + ts: ts, + seq: seq, + payload: newPayload, + } +} diff --git a/subscribe.go b/subscribe.go index c270ca4..1c90bc5 100644 --- a/subscribe.go +++ b/subscribe.go @@ -1,6 +1,10 @@ package thebus -import "time" +import ( + "sync" + "sync/atomic" + "time" +) type Subscription interface { GetID() string @@ -9,6 +13,13 @@ type Subscription interface { Unsubscribe() error } +type SubscriptionConfig struct { + Strategy SubscriptionStrategy + BufferSize int + SendTimeout time.Duration + DropIfFull bool +} + type subscription struct { subscriptionID string cfg SubscriptionConfig @@ -18,13 +29,6 @@ type subscription struct { unsubscribeFunc func() error } -type SubscriptionConfig struct { - Strategy SubscriptionStrategy - BufferSize int - SendTimeout time.Duration - DropIfFull bool -} - func DefaultSubscriptionConfig() SubscriptionConfig { return SubscriptionConfig{ BufferSize: 128, @@ -80,3 +84,13 @@ func WithDropIfFull(dropIfFull bool) SubscribeOption { subCfg.DropIfFull = dropIfFull } } + +type topicState struct { + subs map[string]*subscription + started atomic.Bool + counters atomicCounters + inQueue chan messageRef + seq atomic.Uint64 + wg sync.WaitGroup + closed atomic.Bool +} From 4f5506db6af4dc6fd3e504de589187c7caf79668 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Sun, 7 Sep 2025 13:53:03 +0200 Subject: [PATCH 13/14] Add some test to complete before coding Publish and stats --- Makefile | 58 +++++++++++++++++++++ bus.go | 135 ++++++++++++++++++++++++++++++++++++++----------- fanout.go | 72 ++++++++++++++++++++++++++ fanout_test.go | 106 ++++++++++++++++++++++++++++++++++++++ message.go | 28 +++++----- subscribe.go | 37 ++++++++++++++ 6 files changed, 390 insertions(+), 46 deletions(-) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..23f1557 --- /dev/null +++ b/Makefile @@ -0,0 +1,58 @@ +# Based on amazing example from https://gist.github.com/serinth/16391e360692f6a000e5a10382d1148c +SERVICE ?= $(shell basename `go list`) +VERSION ?= $(shell git describe --tags --always --dirty --match=v* 2> /dev/null || cat $(PWD)/.version 2> /dev/null || echo v0) +PACKAGE ?= $(shell go list) +PACKAGES ?= $(shell go list ./...) +FILES ?= $(shell find . -type f -name '*.go' -not -path "./vendor/*") + +# Binaries +PROTOC ?= protoc + +.PHONY: help clean fmt lint vet test test-cover all + +default: help + +help: ## show this help + @echo 'usage: make [target] ...' + @echo '' + @echo 'targets:' + @egrep '^(.+)\:\ .*##\ (.+)' ${MAKEFILE_LIST} | sed 's/:.*##/#/' | column -t -c 2 -s '#' + +all: ## clean, format, build and unit test + make clean-all + make gofmt + make build + make test + +env: ## Print useful environment variables to stdout + echo $(CURDIR) + echo $(SERVICE) + echo $(PACKAGE) + echo $(VERSION) + +clean: ## go clean + go clean + +build: + go build main.go + +test: + go test -v ./... -short + +test-it: + go test -v ./... + +test-bench: ## run benchmark tests + go test -bench ./... + +# Generate test coverage +test-cover: ## Run test coverage and generate html report + rm -fr coverage + mkdir coverage + go list -f '{{if gt (len .TestGoFiles) 0}}"go test -covermode count -coverprofile {{.Name}}.coverprofile -coverpkg ./... {{.ImportPath}}"{{end}}' ./... | xargs -I {} bash -c {} + echo "mode: count" > coverage/cover.out + grep -h -v "^mode:" *.coverprofile >> "coverage/cover.out" + rm *.coverprofile + go tool cover -html=coverage/cover.out -o=coverage/cover.html + +test-all: test test-bench test-cover \ No newline at end of file diff --git a/bus.go b/bus.go index caf031d..854cdee 100644 --- a/bus.go +++ b/bus.go @@ -3,6 +3,7 @@ package thebus import ( "context" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -43,8 +44,72 @@ func (b *bus) Publish(topic string, data []byte) (PublishAck, error) { } func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) { - //TODO implement me - panic("implement me") + // Standard checks + if !b.open.Load() { + return nil, ErrClosed + } + if len(strings.TrimSpace(topic)) == 0 { + return nil, ErrInvalidTopic + } + + // Building the config based on the default one + // (check in the future the default @SebUndefined) + cfg := BuildSubscriptionConfig(opts...).Normalize() + + // Building the subscription + id := b.cfg.IDGenerator() + msgChan := make(chan Message, cfg.BufferSize) + sub := &subscription{ + subscriptionID: id, + cfg: cfg, + topic: topic, + messageChan: msgChan, + messages: (<-chan Message)(msgChan), + } + // Saving, function under lock so ok + err := b.withWriteState(topic, true, func(state *topicState) error { + // Recheck in case of closed before the first lock + // It is possible that someone close it pending we wait for the first lock + // I do it for avoiding weird state.... + if !b.open.Load() { + // unlock useless here because it is handled by withWriteState + return ErrClosed + } + if b.cfg.MaxSubscribersPerTopic > 0 && len(state.subs) >= b.cfg.MaxSubscribersPerTopic { + return fmt.Errorf("too many subscribers per topic (max: %d)", b.cfg.MaxSubscribersPerTopic) + } + state.subs[id] = sub + return nil + }) + if err != nil { + return nil, err + } + sub.unsubscribeFunc = b.buildUnsubscribeFunction(id, topic) + go func() { + select { + case <-ctx.Done(): + _ = sub.Unsubscribe() // idempotent + } + }() + + return sub, nil +} + +func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error { + return func() error { + b.mutex.Lock() + if state, ok := b.subscriptions[topic]; ok { + delete(state.subs, id) + if len(state.subs) == 0 && len(state.inQueue) == 0 { + if state.closed.CompareAndSwap(false, true) { // ← garde-fou + close(state.inQueue) + delete(b.subscriptions, topic) + } + } + } + b.mutex.Unlock() + return nil + } } func (b *bus) Unsubscribe(topic string, subscriberID string) error { @@ -62,44 +127,54 @@ func (b *bus) Stats() (StatsResults, error) { panic("implement me") } -// getStateReadLocked -// Caller must hold RLock. -func (b *bus) getStateReadLocked(topic string) (*topicState, bool) { - state, ok := b.subscriptions[topic] - return state, ok +func (b *bus) snapshotState(topic string) (*topicState, int, bool) { + b.mutex.RLock() + subsCount := 0 + st, ok := b.subscriptions[topic] + if ok { + subsCount = len(st.subs) + } + b.mutex.RUnlock() + return st, subsCount, ok } -// getOrCreateStateLocked ensure that the state for a specific topic is initialized. -// If the topic is found, it returns it otherwise, it creates a new one. -// getOrCreateStateLocked start the fanOut method automatically. -// Caller must hold Lock. -func (b *bus) getOrCreateStateLocked(topic string) (*topicState, bool, error) { +func (b *bus) withReadState(topic string, callback func(st *topicState) error) error { + b.mutex.RLock() + defer b.mutex.RUnlock() + return callback(b.subscriptions[topic]) +} + +func (b *bus) withWriteState(topic string, createIfNotExists bool, writeFunc func(state *topicState) error) error { + b.mutex.Lock() state, ok := b.subscriptions[topic] - created := false + start := false if !ok { - if b.cfg.MaxTopics > 0 && len(b.subscriptions) >= b.cfg.MaxTopics { - return nil, - false, - fmt.Errorf("too many topics (max=%d)", b.cfg.MaxTopics) + if !createIfNotExists { + err := writeFunc(nil) + b.mutex.Unlock() + return err } - created = true - // Create the chanel. Apply default in case of bad config - var queue chan messageRef - if b.cfg.TopicQueueSize <= 0 { - queue = make(chan messageRef, DefaultTopicQueueSize) - } else { - queue = make(chan messageRef, b.cfg.TopicQueueSize) + if b.cfg.MaxTopics > 0 && len(b.subscriptions) >= b.cfg.MaxTopics { + b.mutex.Unlock() + return fmt.Errorf("too many topics (max=%d)", b.cfg.MaxTopics) } - state = &topicState{ - subs: make(map[string]*subscription), - counters: atomicCounters{}, - inQueue: queue, + qSize := b.cfg.TopicQueueSize + if qSize <= 0 { + qSize = DefaultTopicQueueSize } + state = newTopicState(qSize) + // add the state b.subscriptions[topic] = state + if state.started.CompareAndSwap(false, true) { state.wg.Add(1) - // TODO run the fanout like go l.runFanOut(topic, state) + start = true } } - return state, created, nil + err := writeFunc(state) + b.mutex.Unlock() + if start { + go b.runFanOut(topic, state) + } + return err } diff --git a/fanout.go b/fanout.go index a32a718..7beb042 100644 --- a/fanout.go +++ b/fanout.go @@ -1 +1,73 @@ package thebus + +import "time" + +func (b *bus) runFanOut(topic string, state *topicState) { + defer state.wg.Done() + + timer := time.NewTimer(time.Hour) + if !timer.Stop() { + <-timer.C + } + + for mr := range state.inQueue { + // snapshot sous RLock + b.mutex.RLock() + subs := snapshotSubsLocked(state.subs) + b.mutex.RUnlock() + + for _, sub := range subs { + msg := makeMessage(topic, mr, sub) + if tryDeliver(sub, msg, timer) { + state.counters.Delivered.Add(1) + b.totals.Delivered.Add(1) + } else { + state.counters.Dropped.Add(1) + b.totals.Dropped.Add(1) + } + } + } +} + +// snapshotSubsLocked caller must hold RLock +func snapshotSubsLocked(m map[string]*subscription) []*subscription { + subs := make([]*subscription, 0, len(m)) + for _, s := range m { + subs = append(subs, s) + } + return subs +} + +func tryDeliver(sub *subscription, msg Message, timer *time.Timer) bool { + cfg := sub.cfg + if cfg.DropIfFull { + select { + case sub.messageChan <- msg: + return true + default: + return false + } + } + // timeout + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(cfg.SendTimeout) + + select { + case sub.messageChan <- msg: + // flosh timer if needed + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + return true + case <-timer.C: + return false + } +} diff --git a/fanout_test.go b/fanout_test.go index a32a718..0b2da45 100644 --- a/fanout_test.go +++ b/fanout_test.go @@ -1 +1,107 @@ package thebus + +import ( + "testing" + "time" +) + +func TestSnapshotSubsLocked(t *testing.T) { + tests := []struct { + name string + input map[string]*subscription + output []*subscription + }{ + { + name: "empty", + input: make(map[string]*subscription), + output: make([]*subscription, 0), + }, + { + name: "found", + input: map[string]*subscription{ + "ID1": { + subscriptionID: "ID1", + cfg: SubscriptionConfig{}, + topic: "", + messageChan: nil, + messages: nil, + unsubscribeFunc: nil, + }, + }, + output: []*subscription{ + { + subscriptionID: "ID1", + cfg: SubscriptionConfig{}, + topic: "", + messageChan: nil, + messages: nil, + unsubscribeFunc: nil, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + output := snapshotSubsLocked(test.input) + if len(test.output) != len(output) { + t.Errorf("len(input) = %d, want %d", len(test.input), len(test.output)) + } + }) + } +} + +type testInputTryDelivery struct { + sub *subscription + msg Message + timer *time.Timer +} + +func TestTryDeliver(t *testing.T) { + tests := []struct { + name string + input testInputTryDelivery + output bool + }{ + { + name: "success", + input: testInputTryDelivery{ + &subscription{}, + Message{}, + nil, + }, + output: true, + }, + { + name: "failure", + input: testInputTryDelivery{ + &subscription{}, + Message{}, + nil, + }, + output: false, + }, + { + name: "success_timeout", + input: testInputTryDelivery{ + &subscription{}, + Message{}, + nil, + }, + output: true, + }, + { + name: "failure_timeout", + input: testInputTryDelivery{ + &subscription{}, + Message{}, + nil, + }, + output: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + }) + } +} diff --git a/message.go b/message.go index 09cecc1..8b8e7d7 100644 --- a/message.go +++ b/message.go @@ -16,22 +16,18 @@ type messageRef struct { payload []byte } -func newMessageRef(topic string, ts time.Time, seq uint64, payload []byte) messageRef { - return messageRef{ - topic: topic, - ts: ts, - seq: seq, - payload: payload, +func makeMessage(topic string, mr messageRef, sub *subscription) Message { + msg := Message{ + Topic: topic, + Timestamp: mr.ts, + Seq: mr.seq, } -} - -func newMessageRefCopy(topic string, ts time.Time, seq uint64, payload []byte) messageRef { - newPayload := make([]byte, len(payload)) - copy(newPayload, payload) - return messageRef{ - topic: topic, - ts: ts, - seq: seq, - payload: newPayload, + if sub.cfg.Strategy == SubscriptionStrategyPayloadShared { + msg.Payload = mr.payload + } else { + buf := make([]byte, len(mr.payload)) + copy(buf, mr.payload) + msg.Payload = buf } + return msg } diff --git a/subscribe.go b/subscribe.go index 1c90bc5..1168b63 100644 --- a/subscribe.go +++ b/subscribe.go @@ -20,6 +20,19 @@ type SubscriptionConfig struct { DropIfFull bool } +func (cfg SubscriptionConfig) Normalize() SubscriptionConfig { + if !cfg.Strategy.IsValid() { + cfg.Strategy = SubscriptionStrategyPayloadShared + } + if cfg.BufferSize < 1 { + cfg.BufferSize = 128 + } + if cfg.SendTimeout <= 0 { + cfg.DropIfFull = true + } + return cfg +} + type subscription struct { subscriptionID string cfg SubscriptionConfig @@ -53,6 +66,9 @@ func (s *subscription) Read() <-chan Message { } func (s *subscription) Unsubscribe() error { + if s.unsubscribeFunc == nil { + return nil + } return s.unsubscribeFunc() } @@ -85,6 +101,14 @@ func WithDropIfFull(dropIfFull bool) SubscribeOption { } } +func BuildSubscriptionConfig(opts ...SubscribeOption) SubscriptionConfig { + cfg := DefaultSubscriptionConfig() + for _, opt := range opts { + opt(&cfg) + } + return cfg +} + type topicState struct { subs map[string]*subscription started atomic.Bool @@ -94,3 +118,16 @@ type topicState struct { wg sync.WaitGroup closed atomic.Bool } + +func newTopicState(queueSize int) *topicState { + var queue chan messageRef + if queueSize <= 0 { + queue = make(chan messageRef, DefaultTopicQueueSize) + } else { + queue = make(chan messageRef, queueSize) + } + return &topicState{ + subs: make(map[string]*subscription), + inQueue: queue, + } +} From 7e88afb0b2df1ec73ebc025be5fa74734eb7e7c1 Mon Sep 17 00:00:00 2001 From: SebUndefined Date: Mon, 8 Sep 2025 10:44:47 +0200 Subject: [PATCH 14/14] Add some test --- .github/workflows/ci.yml | 23 +++ bus.go | 86 +++++++--- bus_test.go | 351 +++++++++++++++++++++++++++++++++++++++ config.go | 122 ++++++-------- fanout_test.go | 107 ------------ logger.go | 13 +- message_test.go | 63 +++++++ subscribe.go | 57 +++++++ thebus_test.go | 52 +++++- 9 files changed, 675 insertions(+), 199 deletions(-) create mode 100644 .github/workflows/ci.yml delete mode 100644 fanout_test.go create mode 100644 message_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..0713b60 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,23 @@ +name: thebus_ci + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + name: thebus_test + runs-on: ubuntu-22.04 + + steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v5 + - name: Set up Go 1.24 + uses: actions/setup-go@v6 + with: + go-version: "1.24" + - run: go version \ No newline at end of file diff --git a/bus.go b/bus.go index 854cdee..72dd2a5 100644 --- a/bus.go +++ b/bus.go @@ -21,13 +21,7 @@ type bus struct { var _ Bus = (*bus)(nil) func New(opts ...Option) (Bus, error) { - cfg := DefaultConfig() - for _, opt := range opts { - opt(cfg) - } - if err := cfg.Validate(); err != nil { - return nil, err - } + cfg := BuildConfig(opts...).Normalize() b := &bus{ startedAt: time.Now(), cfg: cfg, @@ -100,8 +94,8 @@ func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error { b.mutex.Lock() if state, ok := b.subscriptions[topic]; ok { delete(state.subs, id) - if len(state.subs) == 0 && len(state.inQueue) == 0 { - if state.closed.CompareAndSwap(false, true) { // ← garde-fou + if len(state.subs) == 0 && len(state.inQueue) == 0 && b.cfg.AutoDeleteEmptyTopics { + if state.closed.CompareAndSwap(false, true) { close(state.inQueue) delete(b.subscriptions, topic) } @@ -118,24 +112,72 @@ func (b *bus) Unsubscribe(topic string, subscriberID string) error { } func (b *bus) Close() error { - //TODO implement me - panic("implement me") -} + // refuse new publish and subscribe + if !b.open.CompareAndSwap(true, false) { + return nil + } + b.mutex.Lock() + states := make([]*topicState, 0, len(b.subscriptions)) + for _, st := range b.subscriptions { + states = append(states, st) + } + // close the inQueue of each topic + for _, st := range states { + if st.closed.CompareAndSwap(false, true) { // ← idem ici + close(st.inQueue) + } + } + b.mutex.Unlock() -func (b *bus) Stats() (StatsResults, error) { - //TODO implement me - panic("implement me") + // Wait for the worker to stop + for _, st := range states { + st.wg.Wait() + } + + // Cleaning memory + b.mutex.Lock() + b.subscriptions = make(map[string]*topicState) // reset propre + b.mutex.Unlock() + + return nil } -func (b *bus) snapshotState(topic string) (*topicState, int, bool) { +func (b *bus) Stats() (StatsResults, error) { b.mutex.RLock() - subsCount := 0 - st, ok := b.subscriptions[topic] - if ok { - subsCount = len(st.subs) + defer b.mutex.RUnlock() + perTopic := make(map[string]TopicStats, len(b.subscriptions)) + subscriberCounts := 0 + for topic, state := range b.subscriptions { + buffered := 0 + for _, sub := range state.subs { + subscriberCounts++ + buffered += len(sub.messageChan) + } + perTopic[topic] = TopicStats{ + Subscribers: len(state.subs), + Buffered: buffered, + Counters: Counters{ + Published: state.counters.Published.Load(), + Delivered: state.counters.Delivered.Load(), + Failed: state.counters.Failed.Load(), + Dropped: state.counters.Dropped.Load(), + }, + } + } + s := StatsResults{ + StartedAt: b.startedAt, + Open: b.open.Load(), + Topics: len(b.subscriptions), + Subscribers: subscriberCounts, + Totals: Counters{ + Published: b.totals.Published.Load(), + Delivered: b.totals.Delivered.Load(), + Failed: b.totals.Failed.Load(), + Dropped: b.totals.Dropped.Load(), + }, + PerTopic: perTopic, } - b.mutex.RUnlock() - return st, subsCount, ok + return s, nil } func (b *bus) withReadState(topic string, callback func(st *topicState) error) error { diff --git a/bus_test.go b/bus_test.go index a32a718..8a2af37 100644 --- a/bus_test.go +++ b/bus_test.go @@ -1 +1,352 @@ package thebus + +import ( + "testing" + "time" +) + +func TestWithReadState(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withReadState("nope", func(st *topicState) error { + if st != nil { + t.Fatal("expected nil state for unknown topic") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + // Test if exists + err = bb.withWriteState("nope", true, func(st *topicState) error { + if st == nil { + t.Fatal("expected non-nil state for unknown topic") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestWithWriteStateNoCreation(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + called := false + err := bb.withWriteState("t", false, func(st *topicState) error { + called = true + if st != nil { + t.Fatal("expected nil state when createIfNotExists=false and topic missing") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + if !called { + t.Fatal("callback not called") + } +} + +func TestWithWriteStateCreateIfNotExists(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withWriteState("t", true, func(st *topicState) error { + if st == nil { + t.Fatal("state should be created") + } + if st.inQueue == nil { + t.Fatal("inQueue should be initialized") + } + if !st.started.Load() { + t.Fatal("worker should be marked started") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} +func TestWithWriteStateCreateMaxTopics(t *testing.T) { + b, _ := New(WithMaxTopics(1)) + defer b.Close() + bb := b.(*bus) + + // 1rst topic should be ok + if err := bb.withWriteState("t1", true, func(st *topicState) error { + if st == nil { + t.Fatal("st should not be nil") + } + return nil + }); err != nil { + t.Fatal(err) + } + + // use same topic, should not block + if err := bb.withWriteState("t1", true, func(st *topicState) error { + if st == nil { + t.Fatal("st should not be nil (existing topic)") + } + return nil + }); err != nil { + t.Fatalf("unexpected err on existing topic: %v", err) + } + + // Break ! max topic should border the creation + if err := bb.withWriteState("t2", true, func(st *topicState) error { return nil }); err == nil { + t.Fatal("expected error MaxTopics exceeded") + } + + // The topic 1 is stated (just for clarification + bb.mutex.RLock() + st := bb.subscriptions["t1"] + bb.mutex.RUnlock() + if st == nil || !st.started.Load() { + t.Fatal("topic t1 should exist and be started") + } +} + +func TestWithWriteStateCreateStartTopicRead(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withWriteState("t", true, func(st *topicState) error { + // Add a message for checking the health + select { + case st.inQueue <- messageRef{topic: "t", ts: time.Now(), payload: []byte("x")}: + default: + t.Fatal("inQueue should be writable") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + // Wait for worker to read, should not panic + time.Sleep(10 * time.Millisecond) +} + +func TestSnapshotSubsLocked(t *testing.T) { + m := map[string]*subscription{ + "A": {subscriptionID: "A"}, + } + out := snapshotSubsLocked(m) + if len(out) != 1 { + t.Fatalf("want 1, got %d", len(out)) + } + + // Mutate the map, should not modify the main map from the bus + delete(m, "A") + if len(out) != 1 { + t.Fatal("snapshot should be independent from map mutation") + } +} + +type testInputTryDelivery struct { + sub *subscription + msg Message + timer *time.Timer +} + +func TestTryDeliver(t *testing.T) { + tests := []struct { + name string + input testInputTryDelivery + output bool + }{ + { + name: "success", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: true, + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{}, + nil, + }, + output: true, + }, + { + name: "failure", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: true, + }, + }, + Message{ + Topic: "test", + Timestamp: time.Now(), + Payload: []byte("test"), + Seq: 0, + }, + nil, + }, + output: false, + }, + { + name: "success_timeout", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: false, + SendTimeout: 2 * time.Second, + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{}, + time.NewTimer(2 * time.Second), + }, + output: true, + }, + { + name: "failure_timeout", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID2", + cfg: SubscriptionConfig{ + DropIfFull: false, + SendTimeout: 50 * time.Millisecond, // court timeout + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{ + Topic: "test", + Timestamp: time.Now(), + Payload: []byte("x"), + }, + time.NewTimer(time.Hour), + }, + output: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // ugly, but for this use case specifically we fill the chan + // It will result in a timeout + if test.name == "failure_timeout" { + test.input.sub.messageChan <- Message{} + } + res := tryDeliver(test.input.sub, test.input.msg, test.input.timer) + if res != test.output { + t.Errorf("res = %v, want %v", res, test.output) + } + }) + } +} + +func TestUnsubscribeDeleteTopicOnLastSub(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + // create a topic and a sub manually + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + + // build unsubribe fonc and test it + unsub := bb.buildUnsubscribeFunction("S", "t") + if err := unsub(); err != nil { + t.Fatal(err) + } + + // check if deleted + bb.mutex.RLock() + _, ok := bb.subscriptions["t"] + bb.mutex.RUnlock() + if ok { + t.Fatal("topic should be deleted when last sub removed and queue empty") + } +} + +func TestUnsubscribeKeepTopicWhenAutoDeleteDisabled(t *testing.T) { + b, _ := New(WithAutoDeleteEmptyTopics(false)) + bb := b.(*bus) + + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + unsub := bb.buildUnsubscribeFunction("S", "t") + _ = unsub() + + bb.mutex.RLock() + _, ok := bb.subscriptions["t"] + bb.mutex.RUnlock() + if !ok { + t.Fatal("topic should remain when auto-delete is disabled") + } +} + +func TestRunFanOutDeliveredCounter(t *testing.T) { + b, _ := New() + bb := b.(*bus) + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + // push a message + _ = bb.withWriteState("t", true, func(st *topicState) error { + select { + case st.inQueue <- messageRef{ + topic: "t", + ts: time.Now(), + seq: 0, + payload: nil}: + + default: + t.Fatal("should not receive any messages") + } + return nil + }) + time.Sleep(20 * time.Millisecond) + bb.mutex.RLock() + ts := bb.subscriptions["t"] + del := ts.counters.Delivered.Load() + bb.mutex.RUnlock() + if del < 1 { + t.Fatalf("expected Delivered>=1, got %d", del) + } +} + +func TestRunFanOutDroppedCounter(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + _ = bb.withWriteState("t", true, func(st *topicState) error { + sub := &subscription{ + subscriptionID: "S", + cfg: SubscriptionConfig{BufferSize: 1, DropIfFull: true}, + messageChan: make(chan Message, 1), + } + sub.messageChan <- Message{} + st.subs["S"] = sub + return nil + }) + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.inQueue <- messageRef{topic: "t", payload: []byte("x"), ts: time.Now()} + return nil + }) + time.Sleep(20 * time.Millisecond) + + bb.mutex.RLock() + ts := bb.subscriptions["t"] + dropped := ts.counters.Dropped.Load() + bb.mutex.RUnlock() + if dropped < 1 { + t.Fatalf("expected Dropped>=1, got %d", dropped) + } +} diff --git a/config.go b/config.go index 0978134..3968001 100644 --- a/config.go +++ b/config.go @@ -1,70 +1,15 @@ package thebus import ( - "encoding/json" - "fmt" - "slices" - "strings" "time" ) const ( DefaultTopicQueueSize = 1024 + DefaultSubBufferSize = 128 + DefaultSendTimeout = 200 * time.Millisecond ) -// ############################################################################## -// ################################## ENUM ################################## -// ############################################################################## - -// SubscriptionStrategy define if the payload must be -// shared by the subscribers (SubscriptionStrategyPayloadShared, the default) -// or copied (SubscriptionStrategyPayloadClonedPerSubscriber) -type SubscriptionStrategy string - -const ( - SubscriptionStrategyUnknown SubscriptionStrategy = "UNKNOWN" - SubscriptionStrategyPayloadShared SubscriptionStrategy = "PAYLOAD_SHARED" - SubscriptionStrategyPayloadClonedPerSubscriber SubscriptionStrategy = "PAYLOAD_CLONED_PER_SUBSCRIBER" -) - -func (enum SubscriptionStrategy) String() string { - if len(strings.TrimSpace(string(enum))) == 0 { - return string(SubscriptionStrategyUnknown) - } - return string(enum) -} - -func SubscriptionStrategyValues() []SubscriptionStrategy { - return []SubscriptionStrategy{ - SubscriptionStrategyPayloadShared, - SubscriptionStrategyPayloadClonedPerSubscriber, - } -} - -func (enum SubscriptionStrategy) IsValid() bool { - if slices.Contains(SubscriptionStrategyValues(), enum) { - return true - } - return false -} - -func (enum SubscriptionStrategy) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`"%s"`, enum)), nil -} - -func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error { - var tmp string - if err := json.Unmarshal(data, &tmp); err != nil { - return err - } - fs := SubscriptionStrategy(tmp) - if !fs.IsValid() { - fs = SubscriptionStrategyUnknown - } - *enum = fs - return nil -} - // ############################################################################## // ################################ CONFIG ################################## // ############################################################################## @@ -94,17 +39,52 @@ type Config struct { PanicHandler func(topic string, v any) // default: nil (no recover) } +func (cfg *Config) Normalize() *Config { + if cfg.TopicQueueSize <= 0 { + cfg.TopicQueueSize = DefaultTopicQueueSize + } + if cfg.TopicIdleTTL < 0 { + cfg.TopicIdleTTL = 0 + } + if cfg.JanitorInterval < 0 { + cfg.JanitorInterval = 0 + } + if cfg.IDGenerator == nil { + cfg.IDGenerator = DefaultIDGenerator + } + if cfg.DefaultSubBufferSize <= 0 { + cfg.DefaultSubBufferSize = DefaultSubBufferSize + } + if cfg.DefaultSendTimeout <= 0 { + cfg.DefaultSendTimeout = DefaultSendTimeout + } + if !cfg.DefaultStrategy.IsValid() { + cfg.DefaultStrategy = SubscriptionStrategyPayloadShared + } + if cfg.MaxTopics < 0 { + cfg.MaxTopics = 0 + } + if cfg.MaxSubscribersPerTopic < 0 { + cfg.MaxSubscribersPerTopic = 0 + } + if cfg.Logger == nil { + cfg.Logger = &noopLogger{} + } + return cfg +} + func DefaultConfig() *Config { return &Config{ TopicQueueSize: DefaultTopicQueueSize, AutoDeleteEmptyTopics: true, TopicIdleTTL: 0 * time.Second, JanitorInterval: 0 * time.Second, - DefaultSubBufferSize: 128, - DefaultSendTimeout: 200 * time.Millisecond, + DefaultSubBufferSize: DefaultSubBufferSize, + DefaultSendTimeout: DefaultSendTimeout, DefaultDropIfFull: true, DefaultStrategy: SubscriptionStrategyPayloadShared, IDGenerator: DefaultIDGenerator, + Logger: NoopLogger(), } } @@ -120,9 +100,9 @@ func WithTopicQueueSize(size int) Option { } } -func WithAutoDeleteEmptyTopics() Option { +func WithAutoDeleteEmptyTopics(b bool) Option { return func(cfg *Config) { - cfg.AutoDeleteEmptyTopics = true + cfg.AutoDeleteEmptyTopics = b } } @@ -161,6 +141,19 @@ func WithDefaultStrategy(strategy SubscriptionStrategy) Option { cfg.DefaultStrategy = strategy } } + +func WithMaxTopics(max int) Option { + return func(cfg *Config) { + cfg.MaxTopics = max + } +} + +func WithMaxSubscribersPerTopic(max int) Option { + return func(cfg *Config) { + cfg.MaxSubscribersPerTopic = max + } +} + func WithLogger(logger Logger) Option { return func(cfg *Config) { cfg.Logger = logger @@ -185,13 +178,6 @@ func WithIDGenerator(IDGenerator IDGenerator) Option { } } -func (cfg *Config) Validate() error { - if cfg.IDGenerator == nil { - return ErrIDGeneratorNotSet - } - return nil -} - func BuildConfig(opts ...Option) *Config { cfg := DefaultConfig() for _, opt := range opts { diff --git a/fanout_test.go b/fanout_test.go deleted file mode 100644 index 0b2da45..0000000 --- a/fanout_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package thebus - -import ( - "testing" - "time" -) - -func TestSnapshotSubsLocked(t *testing.T) { - tests := []struct { - name string - input map[string]*subscription - output []*subscription - }{ - { - name: "empty", - input: make(map[string]*subscription), - output: make([]*subscription, 0), - }, - { - name: "found", - input: map[string]*subscription{ - "ID1": { - subscriptionID: "ID1", - cfg: SubscriptionConfig{}, - topic: "", - messageChan: nil, - messages: nil, - unsubscribeFunc: nil, - }, - }, - output: []*subscription{ - { - subscriptionID: "ID1", - cfg: SubscriptionConfig{}, - topic: "", - messageChan: nil, - messages: nil, - unsubscribeFunc: nil, - }, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - output := snapshotSubsLocked(test.input) - if len(test.output) != len(output) { - t.Errorf("len(input) = %d, want %d", len(test.input), len(test.output)) - } - }) - } -} - -type testInputTryDelivery struct { - sub *subscription - msg Message - timer *time.Timer -} - -func TestTryDeliver(t *testing.T) { - tests := []struct { - name string - input testInputTryDelivery - output bool - }{ - { - name: "success", - input: testInputTryDelivery{ - &subscription{}, - Message{}, - nil, - }, - output: true, - }, - { - name: "failure", - input: testInputTryDelivery{ - &subscription{}, - Message{}, - nil, - }, - output: false, - }, - { - name: "success_timeout", - input: testInputTryDelivery{ - &subscription{}, - Message{}, - nil, - }, - output: true, - }, - { - name: "failure_timeout", - input: testInputTryDelivery{ - &subscription{}, - Message{}, - nil, - }, - output: true, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - - }) - } -} diff --git a/logger.go b/logger.go index d07930b..3d8e64a 100644 --- a/logger.go +++ b/logger.go @@ -1,7 +1,18 @@ package thebus type Logger interface { + Debug(msg string, kv ...any) Info(msg string, kv ...any) + Warn(msg string, kv ...any) Error(msg string, kv ...any) - Debug(msg string, kv ...any) } + +type noopLogger struct{} + +func NoopLogger() Logger { + return &noopLogger{} +} +func (*noopLogger) Info(msg string, kv ...any) {} +func (*noopLogger) Error(msg string, kv ...any) {} +func (*noopLogger) Debug(msg string, kv ...any) {} +func (l *noopLogger) Warn(msg string, kv ...any) {} diff --git a/message_test.go b/message_test.go new file mode 100644 index 0000000..4aba2e6 --- /dev/null +++ b/message_test.go @@ -0,0 +1,63 @@ +package thebus + +import ( + "testing" + "time" +) + +func TestMakeMessage(t *testing.T) { + tests := []struct { + name string + topic string + messageRef messageRef + sub *subscription + outPut Message + }{ + { + name: "Payload Shared", + topic: "shared", + sub: &subscription{ + cfg: SubscriptionConfig{ + Strategy: SubscriptionStrategyPayloadShared, + }, + }, + messageRef: messageRef{ + topic: "shared", + ts: time.Now(), + seq: 0, + payload: []byte("payload"), + }, + }, + { + name: "Payload cloned", + topic: "cloned", + sub: &subscription{ + cfg: SubscriptionConfig{ + Strategy: SubscriptionStrategyPayloadShared, + }, + }, + messageRef: messageRef{ + topic: "cloned", + ts: time.Now(), + seq: 0, + payload: []byte("payload"), + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + m := makeMessage(test.topic, test.messageRef, test.sub) + var sameBacking bool + if len(m.Payload) > 0 && len(test.messageRef.payload) > 0 { + if &m.Payload[0] == &test.messageRef.payload[0] { + sameBacking = true + } + } + if test.sub.cfg.Strategy == SubscriptionStrategyPayloadShared && !sameBacking { + t.Fatal("Expected payload to be shared") + } else if test.sub.cfg.Strategy == SubscriptionStrategyPayloadClonedPerSubscriber && sameBacking { + t.Fatal("Expected payload to be cloned") + } + }) + } +} diff --git a/subscribe.go b/subscribe.go index 1168b63..e6e192c 100644 --- a/subscribe.go +++ b/subscribe.go @@ -1,11 +1,68 @@ package thebus import ( + "encoding/json" + "fmt" + "slices" + "strings" "sync" "sync/atomic" "time" ) +// ############################################################################## +// ################################## ENUM ################################## +// ############################################################################## + +// SubscriptionStrategy define if the payload must be +// shared by the subscribers (SubscriptionStrategyPayloadShared, the default) +// or copied (SubscriptionStrategyPayloadClonedPerSubscriber) +type SubscriptionStrategy string + +const ( + SubscriptionStrategyUnknown SubscriptionStrategy = "UNKNOWN" + SubscriptionStrategyPayloadShared SubscriptionStrategy = "PAYLOAD_SHARED" + SubscriptionStrategyPayloadClonedPerSubscriber SubscriptionStrategy = "PAYLOAD_CLONED_PER_SUBSCRIBER" +) + +func (enum SubscriptionStrategy) String() string { + if len(strings.TrimSpace(string(enum))) == 0 { + return string(SubscriptionStrategyUnknown) + } + return string(enum) +} + +func SubscriptionStrategyValues() []SubscriptionStrategy { + return []SubscriptionStrategy{ + SubscriptionStrategyPayloadShared, + SubscriptionStrategyPayloadClonedPerSubscriber, + } +} + +func (enum SubscriptionStrategy) IsValid() bool { + if slices.Contains(SubscriptionStrategyValues(), enum) { + return true + } + return false +} + +func (enum SubscriptionStrategy) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, enum)), nil +} + +func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error { + var tmp string + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + fs := SubscriptionStrategy(tmp) + if !fs.IsValid() { + fs = SubscriptionStrategyUnknown + } + *enum = fs + return nil +} + type Subscription interface { GetID() string GetTopic() string diff --git a/thebus_test.go b/thebus_test.go index a32a718..cd76d80 100644 --- a/thebus_test.go +++ b/thebus_test.go @@ -1 +1,51 @@ -package thebus +package thebus_test + +import "testing" + +func TestNew(t *testing.T) { + +} + +func TestClose(t *testing.T) {} + +func TestPublishOK(t *testing.T) {} +func TestSubscribeOK(t *testing.T) {} + +func TestIncrementSequence(t *testing.T) {} + +func TestNSubsSubOrder(t *testing.T) {} + +func TestPayloadStrategy(t *testing.T) {} + +func TestBackPressure(t *testing.T) {} + +func TestDropIfFull(t *testing.T) {} + +func TestNotDropIfFull(t *testing.T) {} + +func TestMaxTopicSize(t *testing.T) {} +func TestMaxSubscribersPerTopic(t *testing.T) {} + +func TestConcurrency(t *testing.T) { + +} + +func TestMultipleGoroutine(t *testing.T) { + +} + +func TestConfigAndLimits(t *testing.T) { + +} + +func TestUnsubscribe(t *testing.T) { + +} + +func TestUnsubscribePendingFanout(t *testing.T) { + +} + +func TestInheritedDefaults(t *testing.T) { + +}