Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/sebundefined/thebus.svg)](https://pkg.go.dev/github.com/sebundefined/thebus)
[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/release.yml)
[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/release.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/release.yml)

# thebus
**thebus** is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging.
Expand Down
22 changes: 22 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,32 @@ import (
"context"
)

// Bus is the main interface of thebus.
// It provides a publish/subscribe API for sending and receiving messages
// on named topics. A Bus implementation is safe for concurrent use
// by multiple goroutines.
type Bus interface {
// Publish sends a message on the given topic.
// If there are subscribers, the message is enqueued in the topic’s buffer
// and delivered asynchronously. If the buffer is full, behavior depends on
// configuration: the call could return ErrQueueFull.
// Returns a PublishAck indicating whether the message was enqueued
// and how many subscribers were present at publish.
Publish(topic string, data []byte) (PublishAck, error)
// Subscribe registers a new subscription to the given topic.
// A subscription receives all messages published after it is created.
// Options (buffer size, drop policy, copy strategy, etc.) can be set
// via SubscribeOption functions.
// The subscription is automatically unsubscribed when the provided
// context is canceled.
Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error)
// Unsubscribe removes a subscriber by ID from a topic.
// It is safe to call multiple times; redundant calls are ignored.
Unsubscribe(topic string, subscriberID string) error
// Close shuts down the bus and all topics.
// After Close, no further publish or subscribe is allowed. Existing
// subscriptions are closed and workers are stopped. Close is idempotent.
Close() error
// Stats returns runtime statistics about the bus, topics, and subscribers.
Stats() (StatsResults, error)
}
114 changes: 67 additions & 47 deletions bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type bus struct {

var _ Bus = (*bus)(nil)

// New creates a new Bus instance with the given options.
// It applies default values, normalizes the configuration, and starts the bus in an open state.
// The returned Bus is ready to accept Publish and Subscribe calls.
func New(opts ...Option) (Bus, error) {
cfg := BuildConfig(opts...).Normalize()
b := &bus{
Expand All @@ -32,6 +35,7 @@ func New(opts ...Option) (Bus, error) {
return b, nil
}

// Publish a message on a specific topic. It returns a PublishAck and/or an error.
func (b *bus) Publish(topic string, data []byte) (PublishAck, error) {
now := time.Now().UTC()
if len(strings.TrimSpace(topic)) == 0 {
Expand All @@ -40,59 +44,52 @@ func (b *bus) Publish(topic string, data []byte) (PublishAck, error) {
if !b.open.Load() {
return PublishAck{}, ErrClosed
}
subscribersCount := 0
var state *topicState
var ack PublishAck
var errOut error
err := b.withReadState(topic, func(st *topicState) error {
if st == nil {
if st == nil || len(st.subs) == 0 {
ack = PublishAck{Topic: topic, Enqueued: false, Subscribers: 0}
return nil
}
subscribersCount = len(st.subs)
state = st
seq := st.seq.Add(1)

payload := data
if b.cfg.CopyOnPublish {
cp := make([]byte, len(data))
copy(cp, data)
payload = cp
}

mr := messageRef{
topic: topic,
ts: now,
seq: seq,
payload: payload,
}

select {
case st.inQueue <- mr:
st.counters.Published.Add(1)
b.totals.Published.Add(1)
ack = PublishAck{
Topic: topic,
Enqueued: true,
Subscribers: len(st.subs),
}
default:
ack = PublishAck{
Topic: topic,
Enqueued: false,
Subscribers: len(st.subs),
}
errOut = ErrQueueFull
}
return nil
})
if err != nil {
return PublishAck{}, err
}

if subscribersCount == 0 {
return PublishAck{
Topic: topic,
Enqueued: false,
Subscribers: subscribersCount,
}, nil
}

b.mutex.RLock() // protect against close
cur := b.subscriptions[topic]
if cur != state || cur == nil {
b.mutex.RUnlock()
return PublishAck{Topic: topic, Enqueued: false, Subscribers: subscribersCount}, nil
}
seq := state.seq.Add(1)
mr := messageRef{
topic: topic,
ts: now,
seq: seq,
payload: data,
}
select {
case state.inQueue <- mr:
b.totals.Published.Add(1)
state.counters.Published.Add(1)
b.mutex.RUnlock()
return PublishAck{
Topic: topic,
Enqueued: true,
Subscribers: subscribersCount,
}, nil
default:
b.mutex.RUnlock()
return PublishAck{
Topic: topic,
Enqueued: false,
Subscribers: subscribersCount,
}, ErrQueueFull
}
return ack, errOut
}

func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) {
Expand Down Expand Up @@ -165,8 +162,31 @@ func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error {
}

func (b *bus) Unsubscribe(topic string, subscriberID string) error {
//TODO implement me
panic("implement me")
// Standard check
if !b.open.Load() {
return nil
}
if len(strings.TrimSpace(topic)) == 0 {
return ErrInvalidTopic
}
b.mutex.RLock()
state, ok := b.subscriptions[topic]
if !ok {
b.mutex.RUnlock()
return nil
}
sub, ok := state.subs[subscriberID]
if !ok {
b.mutex.RUnlock()
return nil
}
b.mutex.RUnlock()
err := sub.unsubscribeFunc()
if err != nil {
return err
}

return nil
}

func (b *bus) Close() error {
Expand Down
9 changes: 8 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type Config struct {
TopicIdleTTL time.Duration // if Janitor enabled (0 = off)
JanitorInterval time.Duration // 0 = off
IDGenerator IDGenerator // default to DefaultIDGenerator
CopyOnPublish bool // default false

// Default for subscribers (peuvent être override par Subscribe options)
// Default for subscribers (Can be overridden by sub)
DefaultSubBufferSize int // default: 128
DefaultSendTimeout time.Duration // default: 200ms
DefaultDropIfFull bool // default: true
Expand Down Expand Up @@ -178,6 +179,12 @@ func WithIDGenerator(IDGenerator IDGenerator) Option {
}
}

func WithCopyOnPublish(b bool) Option {
return func(cfg *Config) {
cfg.CopyOnPublish = b
}
}

func BuildConfig(opts ...Option) *Config {
cfg := DefaultConfig()
for _, opt := range opts {
Expand Down
1 change: 1 addition & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package thebus

import "time"

// Message represents a message delivered to a subscriber.
type Message struct {
Topic string
Timestamp time.Time
Expand Down
3 changes: 3 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"time"
)

// Counters holds numeric counters for published, delivered, etc.
type Counters struct {
Published uint64
Delivered uint64
Failed uint64
Dropped uint64
}

// StatsResults represents aggregated statistics of the bus.
type StatsResults struct {
StartedAt time.Time
Open bool
Expand All @@ -21,6 +23,7 @@ type StatsResults struct {
PerTopic map[string]TopicStats
}

// TopicStats represents statistics for a single topic.
type TopicStats struct {
Subscribers int
Buffered int
Expand Down
5 changes: 5 additions & 0 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,15 @@ func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error {
return nil
}

// Subscription represents a live subscription to a topic.
type Subscription interface {
// GetID returns the unique ID of the subscription.
GetID() string
// GetTopic returns the topic name of the subscription.
GetTopic() string
// Read returns the channel from which messages can be consumed.
Read() <-chan Message
// Unsubscribe cancels the subscription.
Unsubscribe() error
}

Expand Down
Loading