diff --git a/README.md b/README.md index 9c7baf3..a501fd1 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/sebundefined/thebus.svg)](https://pkg.go.dev/github.com/sebundefined/thebus) -[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/release.yml) +[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/release.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/release.yml) # thebus **thebus** is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. diff --git a/api.go b/api.go index 60bc999..4839293 100644 --- a/api.go +++ b/api.go @@ -4,10 +4,32 @@ import ( "context" ) +// Bus is the main interface of thebus. +// It provides a publish/subscribe API for sending and receiving messages +// on named topics. A Bus implementation is safe for concurrent use +// by multiple goroutines. type Bus interface { + // Publish sends a message on the given topic. + // If there are subscribers, the message is enqueued in the topic’s buffer + // and delivered asynchronously. If the buffer is full, behavior depends on + // configuration: the call could return ErrQueueFull. + // Returns a PublishAck indicating whether the message was enqueued + // and how many subscribers were present at publish. Publish(topic string, data []byte) (PublishAck, error) + // Subscribe registers a new subscription to the given topic. + // A subscription receives all messages published after it is created. + // Options (buffer size, drop policy, copy strategy, etc.) can be set + // via SubscribeOption functions. + // The subscription is automatically unsubscribed when the provided + // context is canceled. Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) + // Unsubscribe removes a subscriber by ID from a topic. + // It is safe to call multiple times; redundant calls are ignored. Unsubscribe(topic string, subscriberID string) error + // Close shuts down the bus and all topics. + // After Close, no further publish or subscribe is allowed. Existing + // subscriptions are closed and workers are stopped. Close is idempotent. Close() error + // Stats returns runtime statistics about the bus, topics, and subscribers. Stats() (StatsResults, error) } diff --git a/bus.go b/bus.go index 9452496..6a4f17e 100644 --- a/bus.go +++ b/bus.go @@ -20,6 +20,9 @@ type bus struct { var _ Bus = (*bus)(nil) +// New creates a new Bus instance with the given options. +// It applies default values, normalizes the configuration, and starts the bus in an open state. +// The returned Bus is ready to accept Publish and Subscribe calls. func New(opts ...Option) (Bus, error) { cfg := BuildConfig(opts...).Normalize() b := &bus{ @@ -32,6 +35,7 @@ func New(opts ...Option) (Bus, error) { return b, nil } +// Publish a message on a specific topic. It returns a PublishAck and/or an error. func (b *bus) Publish(topic string, data []byte) (PublishAck, error) { now := time.Now().UTC() if len(strings.TrimSpace(topic)) == 0 { @@ -40,59 +44,52 @@ func (b *bus) Publish(topic string, data []byte) (PublishAck, error) { if !b.open.Load() { return PublishAck{}, ErrClosed } - subscribersCount := 0 - var state *topicState + var ack PublishAck + var errOut error err := b.withReadState(topic, func(st *topicState) error { - if st == nil { + if st == nil || len(st.subs) == 0 { + ack = PublishAck{Topic: topic, Enqueued: false, Subscribers: 0} return nil } - subscribersCount = len(st.subs) - state = st + seq := st.seq.Add(1) + + payload := data + if b.cfg.CopyOnPublish { + cp := make([]byte, len(data)) + copy(cp, data) + payload = cp + } + + mr := messageRef{ + topic: topic, + ts: now, + seq: seq, + payload: payload, + } + + select { + case st.inQueue <- mr: + st.counters.Published.Add(1) + b.totals.Published.Add(1) + ack = PublishAck{ + Topic: topic, + Enqueued: true, + Subscribers: len(st.subs), + } + default: + ack = PublishAck{ + Topic: topic, + Enqueued: false, + Subscribers: len(st.subs), + } + errOut = ErrQueueFull + } return nil }) if err != nil { return PublishAck{}, err } - - if subscribersCount == 0 { - return PublishAck{ - Topic: topic, - Enqueued: false, - Subscribers: subscribersCount, - }, nil - } - - b.mutex.RLock() // protect against close - cur := b.subscriptions[topic] - if cur != state || cur == nil { - b.mutex.RUnlock() - return PublishAck{Topic: topic, Enqueued: false, Subscribers: subscribersCount}, nil - } - seq := state.seq.Add(1) - mr := messageRef{ - topic: topic, - ts: now, - seq: seq, - payload: data, - } - select { - case state.inQueue <- mr: - b.totals.Published.Add(1) - state.counters.Published.Add(1) - b.mutex.RUnlock() - return PublishAck{ - Topic: topic, - Enqueued: true, - Subscribers: subscribersCount, - }, nil - default: - b.mutex.RUnlock() - return PublishAck{ - Topic: topic, - Enqueued: false, - Subscribers: subscribersCount, - }, ErrQueueFull - } + return ack, errOut } func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) { @@ -165,8 +162,31 @@ func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error { } func (b *bus) Unsubscribe(topic string, subscriberID string) error { - //TODO implement me - panic("implement me") + // Standard check + if !b.open.Load() { + return nil + } + if len(strings.TrimSpace(topic)) == 0 { + return ErrInvalidTopic + } + b.mutex.RLock() + state, ok := b.subscriptions[topic] + if !ok { + b.mutex.RUnlock() + return nil + } + sub, ok := state.subs[subscriberID] + if !ok { + b.mutex.RUnlock() + return nil + } + b.mutex.RUnlock() + err := sub.unsubscribeFunc() + if err != nil { + return err + } + + return nil } func (b *bus) Close() error { diff --git a/config.go b/config.go index 3968001..bbb58f7 100644 --- a/config.go +++ b/config.go @@ -22,8 +22,9 @@ type Config struct { TopicIdleTTL time.Duration // if Janitor enabled (0 = off) JanitorInterval time.Duration // 0 = off IDGenerator IDGenerator // default to DefaultIDGenerator + CopyOnPublish bool // default false - // Default for subscribers (peuvent être override par Subscribe options) + // Default for subscribers (Can be overridden by sub) DefaultSubBufferSize int // default: 128 DefaultSendTimeout time.Duration // default: 200ms DefaultDropIfFull bool // default: true @@ -178,6 +179,12 @@ func WithIDGenerator(IDGenerator IDGenerator) Option { } } +func WithCopyOnPublish(b bool) Option { + return func(cfg *Config) { + cfg.CopyOnPublish = b + } +} + func BuildConfig(opts ...Option) *Config { cfg := DefaultConfig() for _, opt := range opts { diff --git a/message.go b/message.go index 8b8e7d7..0629f05 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,7 @@ package thebus import "time" +// Message represents a message delivered to a subscriber. type Message struct { Topic string Timestamp time.Time diff --git a/stats.go b/stats.go index b226e97..a79f5b9 100644 --- a/stats.go +++ b/stats.go @@ -5,6 +5,7 @@ import ( "time" ) +// Counters holds numeric counters for published, delivered, etc. type Counters struct { Published uint64 Delivered uint64 @@ -12,6 +13,7 @@ type Counters struct { Dropped uint64 } +// StatsResults represents aggregated statistics of the bus. type StatsResults struct { StartedAt time.Time Open bool @@ -21,6 +23,7 @@ type StatsResults struct { PerTopic map[string]TopicStats } +// TopicStats represents statistics for a single topic. type TopicStats struct { Subscribers int Buffered int diff --git a/subscribe.go b/subscribe.go index e6e192c..a5283e0 100644 --- a/subscribe.go +++ b/subscribe.go @@ -63,10 +63,15 @@ func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error { return nil } +// Subscription represents a live subscription to a topic. type Subscription interface { + // GetID returns the unique ID of the subscription. GetID() string + // GetTopic returns the topic name of the subscription. GetTopic() string + // Read returns the channel from which messages can be consumed. Read() <-chan Message + // Unsubscribe cancels the subscription. Unsubscribe() error }