diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..0713b60 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,23 @@ +name: thebus_ci + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + name: thebus_test + runs-on: ubuntu-22.04 + + steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v5 + - name: Set up Go 1.24 + uses: actions/setup-go@v6 + with: + go-version: "1.24" + - run: go version \ No newline at end of file diff --git a/.gitignore b/.gitignore index aaadf73..ed20272 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,5 @@ go.work.sum .env # Editor/IDE -# .idea/ -# .vscode/ +.idea/ +.vscode/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..23f1557 --- /dev/null +++ b/Makefile @@ -0,0 +1,58 @@ +# Based on amazing example from https://gist.github.com/serinth/16391e360692f6a000e5a10382d1148c +SERVICE ?= $(shell basename `go list`) +VERSION ?= $(shell git describe --tags --always --dirty --match=v* 2> /dev/null || cat $(PWD)/.version 2> /dev/null || echo v0) +PACKAGE ?= $(shell go list) +PACKAGES ?= $(shell go list ./...) +FILES ?= $(shell find . -type f -name '*.go' -not -path "./vendor/*") + +# Binaries +PROTOC ?= protoc + +.PHONY: help clean fmt lint vet test test-cover all + +default: help + +help: ## show this help + @echo 'usage: make [target] ...' + @echo '' + @echo 'targets:' + @egrep '^(.+)\:\ .*##\ (.+)' ${MAKEFILE_LIST} | sed 's/:.*##/#/' | column -t -c 2 -s '#' + +all: ## clean, format, build and unit test + make clean-all + make gofmt + make build + make test + +env: ## Print useful environment variables to stdout + echo $(CURDIR) + echo $(SERVICE) + echo $(PACKAGE) + echo $(VERSION) + +clean: ## go clean + go clean + +build: + go build main.go + +test: + go test -v ./... -short + +test-it: + go test -v ./... + +test-bench: ## run benchmark tests + go test -bench ./... + +# Generate test coverage +test-cover: ## Run test coverage and generate html report + rm -fr coverage + mkdir coverage + go list -f '{{if gt (len .TestGoFiles) 0}}"go test -covermode count -coverprofile {{.Name}}.coverprofile -coverpkg ./... {{.ImportPath}}"{{end}}' ./... | xargs -I {} bash -c {} + echo "mode: count" > coverage/cover.out + grep -h -v "^mode:" *.coverprofile >> "coverage/cover.out" + rm *.coverprofile + go tool cover -html=coverage/cover.out -o=coverage/cover.html + +test-all: test test-bench test-cover \ No newline at end of file diff --git a/README.md b/README.md index 68a9527..415d604 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,39 @@ +[![Go Reference](https://pkg.go.dev/badge/github.com/sebundefined/thebus/v1.svg)](https://pkg.go.dev/github.com/sebundefined/thebus/v1) +[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/ci.yml) + # thebus -thebus is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. +**thebus** is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging. + + +## Getting started + +Just install **thebus** in your project by using the following command. + +```shell +go get -u github.com/sebundefined/thebus +``` +## Example Usage + +### Simple + +```go +package main + + +``` + +### With custom options + + +## Features + + +## Why would you choose a thebus for your app ? + +## Testing + +See [CONTRIBUTING.md](./CONTRIBUTING.md) for instructions. + +## Versioning + + diff --git a/api.go b/api.go new file mode 100644 index 0000000..60bc999 --- /dev/null +++ b/api.go @@ -0,0 +1,13 @@ +package thebus + +import ( + "context" +) + +type Bus interface { + Publish(topic string, data []byte) (PublishAck, error) + Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) + Unsubscribe(topic string, subscriberID string) error + Close() error + Stats() (StatsResults, error) +} diff --git a/bench_test.go b/bench_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/bench_test.go @@ -0,0 +1 @@ +package thebus diff --git a/bus.go b/bus.go new file mode 100644 index 0000000..72dd2a5 --- /dev/null +++ b/bus.go @@ -0,0 +1,222 @@ +package thebus + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" +) + +type bus struct { + mutex sync.RWMutex + cfg *Config + open atomic.Bool + startedAt time.Time + subscriptions map[string]*topicState + totals atomicCounters +} + +var _ Bus = (*bus)(nil) + +func New(opts ...Option) (Bus, error) { + cfg := BuildConfig(opts...).Normalize() + b := &bus{ + startedAt: time.Now(), + cfg: cfg, + totals: atomicCounters{}, + subscriptions: make(map[string]*topicState), + } + b.open.Store(true) + return b, nil +} + +func (b *bus) Publish(topic string, data []byte) (PublishAck, error) { + //TODO implement me + panic("implement me") +} + +func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) { + // Standard checks + if !b.open.Load() { + return nil, ErrClosed + } + if len(strings.TrimSpace(topic)) == 0 { + return nil, ErrInvalidTopic + } + + // Building the config based on the default one + // (check in the future the default @SebUndefined) + cfg := BuildSubscriptionConfig(opts...).Normalize() + + // Building the subscription + id := b.cfg.IDGenerator() + msgChan := make(chan Message, cfg.BufferSize) + sub := &subscription{ + subscriptionID: id, + cfg: cfg, + topic: topic, + messageChan: msgChan, + messages: (<-chan Message)(msgChan), + } + // Saving, function under lock so ok + err := b.withWriteState(topic, true, func(state *topicState) error { + // Recheck in case of closed before the first lock + // It is possible that someone close it pending we wait for the first lock + // I do it for avoiding weird state.... + if !b.open.Load() { + // unlock useless here because it is handled by withWriteState + return ErrClosed + } + if b.cfg.MaxSubscribersPerTopic > 0 && len(state.subs) >= b.cfg.MaxSubscribersPerTopic { + return fmt.Errorf("too many subscribers per topic (max: %d)", b.cfg.MaxSubscribersPerTopic) + } + state.subs[id] = sub + return nil + }) + if err != nil { + return nil, err + } + sub.unsubscribeFunc = b.buildUnsubscribeFunction(id, topic) + go func() { + select { + case <-ctx.Done(): + _ = sub.Unsubscribe() // idempotent + } + }() + + return sub, nil +} + +func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error { + return func() error { + b.mutex.Lock() + if state, ok := b.subscriptions[topic]; ok { + delete(state.subs, id) + if len(state.subs) == 0 && len(state.inQueue) == 0 && b.cfg.AutoDeleteEmptyTopics { + if state.closed.CompareAndSwap(false, true) { + close(state.inQueue) + delete(b.subscriptions, topic) + } + } + } + b.mutex.Unlock() + return nil + } +} + +func (b *bus) Unsubscribe(topic string, subscriberID string) error { + //TODO implement me + panic("implement me") +} + +func (b *bus) Close() error { + // refuse new publish and subscribe + if !b.open.CompareAndSwap(true, false) { + return nil + } + b.mutex.Lock() + states := make([]*topicState, 0, len(b.subscriptions)) + for _, st := range b.subscriptions { + states = append(states, st) + } + // close the inQueue of each topic + for _, st := range states { + if st.closed.CompareAndSwap(false, true) { // ← idem ici + close(st.inQueue) + } + } + b.mutex.Unlock() + + // Wait for the worker to stop + for _, st := range states { + st.wg.Wait() + } + + // Cleaning memory + b.mutex.Lock() + b.subscriptions = make(map[string]*topicState) // reset propre + b.mutex.Unlock() + + return nil +} + +func (b *bus) Stats() (StatsResults, error) { + b.mutex.RLock() + defer b.mutex.RUnlock() + perTopic := make(map[string]TopicStats, len(b.subscriptions)) + subscriberCounts := 0 + for topic, state := range b.subscriptions { + buffered := 0 + for _, sub := range state.subs { + subscriberCounts++ + buffered += len(sub.messageChan) + } + perTopic[topic] = TopicStats{ + Subscribers: len(state.subs), + Buffered: buffered, + Counters: Counters{ + Published: state.counters.Published.Load(), + Delivered: state.counters.Delivered.Load(), + Failed: state.counters.Failed.Load(), + Dropped: state.counters.Dropped.Load(), + }, + } + } + s := StatsResults{ + StartedAt: b.startedAt, + Open: b.open.Load(), + Topics: len(b.subscriptions), + Subscribers: subscriberCounts, + Totals: Counters{ + Published: b.totals.Published.Load(), + Delivered: b.totals.Delivered.Load(), + Failed: b.totals.Failed.Load(), + Dropped: b.totals.Dropped.Load(), + }, + PerTopic: perTopic, + } + return s, nil +} + +func (b *bus) withReadState(topic string, callback func(st *topicState) error) error { + b.mutex.RLock() + defer b.mutex.RUnlock() + return callback(b.subscriptions[topic]) +} + +func (b *bus) withWriteState(topic string, createIfNotExists bool, writeFunc func(state *topicState) error) error { + b.mutex.Lock() + state, ok := b.subscriptions[topic] + start := false + if !ok { + if !createIfNotExists { + err := writeFunc(nil) + b.mutex.Unlock() + return err + } + if b.cfg.MaxTopics > 0 && len(b.subscriptions) >= b.cfg.MaxTopics { + b.mutex.Unlock() + return fmt.Errorf("too many topics (max=%d)", b.cfg.MaxTopics) + } + qSize := b.cfg.TopicQueueSize + if qSize <= 0 { + qSize = DefaultTopicQueueSize + } + state = newTopicState(qSize) + // add the state + b.subscriptions[topic] = state + + if state.started.CompareAndSwap(false, true) { + state.wg.Add(1) + start = true + } + } + err := writeFunc(state) + b.mutex.Unlock() + if start { + go b.runFanOut(topic, state) + } + return err +} diff --git a/bus_test.go b/bus_test.go new file mode 100644 index 0000000..8a2af37 --- /dev/null +++ b/bus_test.go @@ -0,0 +1,352 @@ +package thebus + +import ( + "testing" + "time" +) + +func TestWithReadState(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withReadState("nope", func(st *topicState) error { + if st != nil { + t.Fatal("expected nil state for unknown topic") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + // Test if exists + err = bb.withWriteState("nope", true, func(st *topicState) error { + if st == nil { + t.Fatal("expected non-nil state for unknown topic") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + +func TestWithWriteStateNoCreation(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + called := false + err := bb.withWriteState("t", false, func(st *topicState) error { + called = true + if st != nil { + t.Fatal("expected nil state when createIfNotExists=false and topic missing") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + if !called { + t.Fatal("callback not called") + } +} + +func TestWithWriteStateCreateIfNotExists(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withWriteState("t", true, func(st *topicState) error { + if st == nil { + t.Fatal("state should be created") + } + if st.inQueue == nil { + t.Fatal("inQueue should be initialized") + } + if !st.started.Load() { + t.Fatal("worker should be marked started") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} +func TestWithWriteStateCreateMaxTopics(t *testing.T) { + b, _ := New(WithMaxTopics(1)) + defer b.Close() + bb := b.(*bus) + + // 1rst topic should be ok + if err := bb.withWriteState("t1", true, func(st *topicState) error { + if st == nil { + t.Fatal("st should not be nil") + } + return nil + }); err != nil { + t.Fatal(err) + } + + // use same topic, should not block + if err := bb.withWriteState("t1", true, func(st *topicState) error { + if st == nil { + t.Fatal("st should not be nil (existing topic)") + } + return nil + }); err != nil { + t.Fatalf("unexpected err on existing topic: %v", err) + } + + // Break ! max topic should border the creation + if err := bb.withWriteState("t2", true, func(st *topicState) error { return nil }); err == nil { + t.Fatal("expected error MaxTopics exceeded") + } + + // The topic 1 is stated (just for clarification + bb.mutex.RLock() + st := bb.subscriptions["t1"] + bb.mutex.RUnlock() + if st == nil || !st.started.Load() { + t.Fatal("topic t1 should exist and be started") + } +} + +func TestWithWriteStateCreateStartTopicRead(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + err := bb.withWriteState("t", true, func(st *topicState) error { + // Add a message for checking the health + select { + case st.inQueue <- messageRef{topic: "t", ts: time.Now(), payload: []byte("x")}: + default: + t.Fatal("inQueue should be writable") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + // Wait for worker to read, should not panic + time.Sleep(10 * time.Millisecond) +} + +func TestSnapshotSubsLocked(t *testing.T) { + m := map[string]*subscription{ + "A": {subscriptionID: "A"}, + } + out := snapshotSubsLocked(m) + if len(out) != 1 { + t.Fatalf("want 1, got %d", len(out)) + } + + // Mutate the map, should not modify the main map from the bus + delete(m, "A") + if len(out) != 1 { + t.Fatal("snapshot should be independent from map mutation") + } +} + +type testInputTryDelivery struct { + sub *subscription + msg Message + timer *time.Timer +} + +func TestTryDeliver(t *testing.T) { + tests := []struct { + name string + input testInputTryDelivery + output bool + }{ + { + name: "success", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: true, + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{}, + nil, + }, + output: true, + }, + { + name: "failure", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: true, + }, + }, + Message{ + Topic: "test", + Timestamp: time.Now(), + Payload: []byte("test"), + Seq: 0, + }, + nil, + }, + output: false, + }, + { + name: "success_timeout", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID1", + cfg: SubscriptionConfig{ + DropIfFull: false, + SendTimeout: 2 * time.Second, + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{}, + time.NewTimer(2 * time.Second), + }, + output: true, + }, + { + name: "failure_timeout", + input: testInputTryDelivery{ + &subscription{ + subscriptionID: "ID2", + cfg: SubscriptionConfig{ + DropIfFull: false, + SendTimeout: 50 * time.Millisecond, // court timeout + }, + topic: "test", + messageChan: make(chan Message, 1), + }, + Message{ + Topic: "test", + Timestamp: time.Now(), + Payload: []byte("x"), + }, + time.NewTimer(time.Hour), + }, + output: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // ugly, but for this use case specifically we fill the chan + // It will result in a timeout + if test.name == "failure_timeout" { + test.input.sub.messageChan <- Message{} + } + res := tryDeliver(test.input.sub, test.input.msg, test.input.timer) + if res != test.output { + t.Errorf("res = %v, want %v", res, test.output) + } + }) + } +} + +func TestUnsubscribeDeleteTopicOnLastSub(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + // create a topic and a sub manually + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + + // build unsubribe fonc and test it + unsub := bb.buildUnsubscribeFunction("S", "t") + if err := unsub(); err != nil { + t.Fatal(err) + } + + // check if deleted + bb.mutex.RLock() + _, ok := bb.subscriptions["t"] + bb.mutex.RUnlock() + if ok { + t.Fatal("topic should be deleted when last sub removed and queue empty") + } +} + +func TestUnsubscribeKeepTopicWhenAutoDeleteDisabled(t *testing.T) { + b, _ := New(WithAutoDeleteEmptyTopics(false)) + bb := b.(*bus) + + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + unsub := bb.buildUnsubscribeFunction("S", "t") + _ = unsub() + + bb.mutex.RLock() + _, ok := bb.subscriptions["t"] + bb.mutex.RUnlock() + if !ok { + t.Fatal("topic should remain when auto-delete is disabled") + } +} + +func TestRunFanOutDeliveredCounter(t *testing.T) { + b, _ := New() + bb := b.(*bus) + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.subs["S"] = &subscription{subscriptionID: "S", messageChan: make(chan Message, 1)} + return nil + }) + // push a message + _ = bb.withWriteState("t", true, func(st *topicState) error { + select { + case st.inQueue <- messageRef{ + topic: "t", + ts: time.Now(), + seq: 0, + payload: nil}: + + default: + t.Fatal("should not receive any messages") + } + return nil + }) + time.Sleep(20 * time.Millisecond) + bb.mutex.RLock() + ts := bb.subscriptions["t"] + del := ts.counters.Delivered.Load() + bb.mutex.RUnlock() + if del < 1 { + t.Fatalf("expected Delivered>=1, got %d", del) + } +} + +func TestRunFanOutDroppedCounter(t *testing.T) { + b, _ := New() + bb := b.(*bus) + + _ = bb.withWriteState("t", true, func(st *topicState) error { + sub := &subscription{ + subscriptionID: "S", + cfg: SubscriptionConfig{BufferSize: 1, DropIfFull: true}, + messageChan: make(chan Message, 1), + } + sub.messageChan <- Message{} + st.subs["S"] = sub + return nil + }) + _ = bb.withWriteState("t", true, func(st *topicState) error { + st.inQueue <- messageRef{topic: "t", payload: []byte("x"), ts: time.Now()} + return nil + }) + time.Sleep(20 * time.Millisecond) + + bb.mutex.RLock() + ts := bb.subscriptions["t"] + dropped := ts.counters.Dropped.Load() + bb.mutex.RUnlock() + if dropped < 1 { + t.Fatalf("expected Dropped>=1, got %d", dropped) + } +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..3968001 --- /dev/null +++ b/config.go @@ -0,0 +1,187 @@ +package thebus + +import ( + "time" +) + +const ( + DefaultTopicQueueSize = 1024 + DefaultSubBufferSize = 128 + DefaultSendTimeout = 200 * time.Millisecond +) + +// ############################################################################## +// ################################ CONFIG ################################## +// ############################################################################## + +// Config is the main configuration for thebus +type Config struct { + // Topics / queues + TopicQueueSize int // default: 1024 + AutoDeleteEmptyTopics bool // default: true + TopicIdleTTL time.Duration // if Janitor enabled (0 = off) + JanitorInterval time.Duration // 0 = off + IDGenerator IDGenerator // default to DefaultIDGenerator + + // Default for subscribers (peuvent être override par Subscribe options) + DefaultSubBufferSize int // default: 128 + DefaultSendTimeout time.Duration // default: 200ms + DefaultDropIfFull bool // default: true + DefaultStrategy SubscriptionStrategy // default: SubscriptionStrategyPayloadShared + + // Limits (0 = unlimited) + MaxTopics int // default : 0 (unlimited) + MaxSubscribersPerTopic int // default : 0 (unlimited) + + // Observability + Logger Logger // default: nopLogger + Metrics MetricsHooks // default: nopMetrics + PanicHandler func(topic string, v any) // default: nil (no recover) +} + +func (cfg *Config) Normalize() *Config { + if cfg.TopicQueueSize <= 0 { + cfg.TopicQueueSize = DefaultTopicQueueSize + } + if cfg.TopicIdleTTL < 0 { + cfg.TopicIdleTTL = 0 + } + if cfg.JanitorInterval < 0 { + cfg.JanitorInterval = 0 + } + if cfg.IDGenerator == nil { + cfg.IDGenerator = DefaultIDGenerator + } + if cfg.DefaultSubBufferSize <= 0 { + cfg.DefaultSubBufferSize = DefaultSubBufferSize + } + if cfg.DefaultSendTimeout <= 0 { + cfg.DefaultSendTimeout = DefaultSendTimeout + } + if !cfg.DefaultStrategy.IsValid() { + cfg.DefaultStrategy = SubscriptionStrategyPayloadShared + } + if cfg.MaxTopics < 0 { + cfg.MaxTopics = 0 + } + if cfg.MaxSubscribersPerTopic < 0 { + cfg.MaxSubscribersPerTopic = 0 + } + if cfg.Logger == nil { + cfg.Logger = &noopLogger{} + } + return cfg +} + +func DefaultConfig() *Config { + return &Config{ + TopicQueueSize: DefaultTopicQueueSize, + AutoDeleteEmptyTopics: true, + TopicIdleTTL: 0 * time.Second, + JanitorInterval: 0 * time.Second, + DefaultSubBufferSize: DefaultSubBufferSize, + DefaultSendTimeout: DefaultSendTimeout, + DefaultDropIfFull: true, + DefaultStrategy: SubscriptionStrategyPayloadShared, + IDGenerator: DefaultIDGenerator, + Logger: NoopLogger(), + } +} + +// ############################################################################## +// ############################### OPTIONS ################################## +// ############################################################################## + +type Option func(*Config) + +func WithTopicQueueSize(size int) Option { + return func(cfg *Config) { + cfg.TopicQueueSize = size + } +} + +func WithAutoDeleteEmptyTopics(b bool) Option { + return func(cfg *Config) { + cfg.AutoDeleteEmptyTopics = b + } +} + +func WithTopicIdleTTL(ttl time.Duration) Option { + return func(cfg *Config) { + cfg.TopicIdleTTL = ttl + } +} + +func WithJanitorInterval(interval time.Duration) Option { + return func(cfg *Config) { + cfg.JanitorInterval = interval + } +} + +func WithDefaultSubBufferSize(size int) Option { + return func(cfg *Config) { + cfg.DefaultSubBufferSize = size + } +} + +func WithDefaultSendTimeout(timeout time.Duration) Option { + return func(cfg *Config) { + cfg.DefaultSendTimeout = timeout + } +} + +func WithDefaultDropIfFull() Option { + return func(cfg *Config) { + cfg.DefaultDropIfFull = true + } +} + +func WithDefaultStrategy(strategy SubscriptionStrategy) Option { + return func(cfg *Config) { + cfg.DefaultStrategy = strategy + } +} + +func WithMaxTopics(max int) Option { + return func(cfg *Config) { + cfg.MaxTopics = max + } +} + +func WithMaxSubscribersPerTopic(max int) Option { + return func(cfg *Config) { + cfg.MaxSubscribersPerTopic = max + } +} + +func WithLogger(logger Logger) Option { + return func(cfg *Config) { + cfg.Logger = logger + } +} + +func WithMetrics(metrics MetricsHooks) Option { + return func(cfg *Config) { + cfg.Metrics = metrics + } +} + +func WithPanicHandler(handler func(topic string, v any)) Option { + return func(cfg *Config) { + cfg.PanicHandler = handler + } +} + +func WithIDGenerator(IDGenerator IDGenerator) Option { + return func(cfg *Config) { + cfg.IDGenerator = IDGenerator + } +} + +func BuildConfig(opts ...Option) *Config { + cfg := DefaultConfig() + for _, opt := range opts { + opt(cfg) + } + return cfg +} diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/config_test.go @@ -0,0 +1 @@ +package thebus diff --git a/error.go b/error.go new file mode 100644 index 0000000..83bd8a7 --- /dev/null +++ b/error.go @@ -0,0 +1,13 @@ +package thebus + +import "errors" + +var ( + ErrClosed = errors.New("thebus.closed") + ErrQueueFull = errors.New("thebus.queue.full") + ErrInvalidTopic = errors.New("thebus.invalid.topic") + ErrInvalidSubscriberID = errors.New("thebus.invalid.subscriberID") + ErrInvalidTopicName = errors.New("thebus.invalid.topic.name") + ErrInvalidTopicNameReserved = errors.New("thebus.invalid.topic.name.reserved") + ErrIDGeneratorNotSet = errors.New("thebus.idGenerator.not_set") +) diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/example_test.go @@ -0,0 +1 @@ +package thebus diff --git a/fanout.go b/fanout.go new file mode 100644 index 0000000..7beb042 --- /dev/null +++ b/fanout.go @@ -0,0 +1,73 @@ +package thebus + +import "time" + +func (b *bus) runFanOut(topic string, state *topicState) { + defer state.wg.Done() + + timer := time.NewTimer(time.Hour) + if !timer.Stop() { + <-timer.C + } + + for mr := range state.inQueue { + // snapshot sous RLock + b.mutex.RLock() + subs := snapshotSubsLocked(state.subs) + b.mutex.RUnlock() + + for _, sub := range subs { + msg := makeMessage(topic, mr, sub) + if tryDeliver(sub, msg, timer) { + state.counters.Delivered.Add(1) + b.totals.Delivered.Add(1) + } else { + state.counters.Dropped.Add(1) + b.totals.Dropped.Add(1) + } + } + } +} + +// snapshotSubsLocked caller must hold RLock +func snapshotSubsLocked(m map[string]*subscription) []*subscription { + subs := make([]*subscription, 0, len(m)) + for _, s := range m { + subs = append(subs, s) + } + return subs +} + +func tryDeliver(sub *subscription, msg Message, timer *time.Timer) bool { + cfg := sub.cfg + if cfg.DropIfFull { + select { + case sub.messageChan <- msg: + return true + default: + return false + } + } + // timeout + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(cfg.SendTimeout) + + select { + case sub.messageChan <- msg: + // flosh timer if needed + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + return true + case <-timer.C: + return false + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9462ab1 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/sebundefined/thebus + +go 1.24 diff --git a/id.go b/id.go new file mode 100644 index 0000000..ca681d4 --- /dev/null +++ b/id.go @@ -0,0 +1,115 @@ +package thebus + +import ( + "crypto/rand" + "sync" + "time" +) + +// IDGenerator - +type IDGenerator func() string + +func init() { + _, err := rand.Read(stateIDGen.entropy[:]) + if err != nil { + panic(err) + } +} + +// crockfordAlphabet see https://www.baeldung.com/cs/crockfords-base32-encoding +// for a simple article about it +const crockfordAlphabet = "0123456789ABCDEFGHJKMNPQRSTVWXYZ" + +// stateIDGenerator - allows me to keep track of the lastTs and +// generate the entropy again in case we change millisecond +type stateIDGenerator struct { + mu sync.Mutex + lastTs int64 + entropy [10]byte +} + +// next generate again the entropy if the ms increased. +// Get the latest if decrease and increase it if we are in the same ms +func (s *stateIDGenerator) next() (ts int64, e [10]byte) { + now := time.Now().UnixMilli() + if now > s.lastTs { + s.lastTs = now + _, err := rand.Read(s.entropy[:]) + if err != nil { + panic(err) + } + } else if now == s.lastTs { + incrementEntropyBE(&s.entropy) + } else { + now = s.lastTs + incrementEntropyBE(&s.entropy) + } + return now, s.entropy +} + +// stateIDGen - global var for the state +// see init for the default entropy +var stateIDGen = &stateIDGenerator{ + lastTs: time.Now().UnixMilli(), +} + +// DefaultIDGenerator - generate a new uniq id. +// See WithIDGenerator Option if you want to include your own generator (ex: uuid.UUID from Google) +func DefaultIDGenerator() string { + stateIDGen.mu.Lock() + ts, e := stateIDGen.next() + stateIDGen.mu.Unlock() + return encodeULIDLike(ts, e) +} + +// incrementEntropyBE increment the entropy. +// See it as a km/miles counter on a car. The latest number on the counter is increased +// each km/miles. If the increase operation result is 0, it means we have to increase the next number (index - 1) +func incrementEntropyBE(e *[10]byte) { + for i := 9; i >= 0; i-- { + e[i]++ + if e[i] != 0 { + return + } + } +} + +func encodeULIDLike(tsMillis int64, entropy [10]byte) string { + buf := [26]byte{} + writeTimestamp48ToBase32(uint64(tsMillis), buf[0:10]) + writeEntropy80ToBase32(entropy, buf[10:26]) + return string(buf[:]) +} + +// writeTimestamp48ToBase32 +func writeTimestamp48ToBase32(ts uint64, out []byte) { + if len(out) != 10 { + panic("writeTimestamp48ToBase32: out must be len 10") + } + shifts := [...]uint{45, 40, 35, 30, 25, 20, 15, 10, 5, 0} + for i, sh := range shifts { + idx := byte((ts >> sh) & 0x1F) // 0x1F = 5 bits + out[i] = crockfordAlphabet[idx] + } +} + +func writeEntropy80ToBase32(e [10]byte, out []byte) { + if len(out) != 16 { + panic("writeEntropy80ToBase32: out must be len 16") + } + var acc uint64 + bits := 0 + pos := 0 + + for i := 0; i < len(e); i++ { + acc = (acc << 8) | uint64(e[i]) // on pousse 8 bits + bits += 8 + // tant qu'on a au moins 5 bits en stock, on sort 1 char + for bits >= 5 { + idx := byte((acc >> uint(bits-5)) & 0x1F) // prend les 5 bits de tête + out[pos] = crockfordAlphabet[idx] + pos++ + bits -= 5 + } + } +} diff --git a/id_test.go b/id_test.go new file mode 100644 index 0000000..0957d1f --- /dev/null +++ b/id_test.go @@ -0,0 +1,19 @@ +package thebus + +import ( + "testing" +) + +func TestDefaultIDGenerator(t *testing.T) { + var ids []string + for i := 0; i < 100; i++ { + ids = append(ids, DefaultIDGenerator()) + } + uniqValues := make(map[string]bool) + for _, id := range ids { + uniqValues[id] = true + } + if len(uniqValues) != len(ids) { + t.Errorf("uniqValues does not match len(ids): %v != %v", uniqValues, ids) + } +} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..3d8e64a --- /dev/null +++ b/logger.go @@ -0,0 +1,18 @@ +package thebus + +type Logger interface { + Debug(msg string, kv ...any) + Info(msg string, kv ...any) + Warn(msg string, kv ...any) + Error(msg string, kv ...any) +} + +type noopLogger struct{} + +func NoopLogger() Logger { + return &noopLogger{} +} +func (*noopLogger) Info(msg string, kv ...any) {} +func (*noopLogger) Error(msg string, kv ...any) {} +func (*noopLogger) Debug(msg string, kv ...any) {} +func (l *noopLogger) Warn(msg string, kv ...any) {} diff --git a/message.go b/message.go new file mode 100644 index 0000000..8b8e7d7 --- /dev/null +++ b/message.go @@ -0,0 +1,33 @@ +package thebus + +import "time" + +type Message struct { + Topic string + Timestamp time.Time + Payload []byte + Seq uint64 +} + +type messageRef struct { + topic string + ts time.Time + seq uint64 + payload []byte +} + +func makeMessage(topic string, mr messageRef, sub *subscription) Message { + msg := Message{ + Topic: topic, + Timestamp: mr.ts, + Seq: mr.seq, + } + if sub.cfg.Strategy == SubscriptionStrategyPayloadShared { + msg.Payload = mr.payload + } else { + buf := make([]byte, len(mr.payload)) + copy(buf, mr.payload) + msg.Payload = buf + } + return msg +} diff --git a/message_test.go b/message_test.go new file mode 100644 index 0000000..4aba2e6 --- /dev/null +++ b/message_test.go @@ -0,0 +1,63 @@ +package thebus + +import ( + "testing" + "time" +) + +func TestMakeMessage(t *testing.T) { + tests := []struct { + name string + topic string + messageRef messageRef + sub *subscription + outPut Message + }{ + { + name: "Payload Shared", + topic: "shared", + sub: &subscription{ + cfg: SubscriptionConfig{ + Strategy: SubscriptionStrategyPayloadShared, + }, + }, + messageRef: messageRef{ + topic: "shared", + ts: time.Now(), + seq: 0, + payload: []byte("payload"), + }, + }, + { + name: "Payload cloned", + topic: "cloned", + sub: &subscription{ + cfg: SubscriptionConfig{ + Strategy: SubscriptionStrategyPayloadShared, + }, + }, + messageRef: messageRef{ + topic: "cloned", + ts: time.Now(), + seq: 0, + payload: []byte("payload"), + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + m := makeMessage(test.topic, test.messageRef, test.sub) + var sameBacking bool + if len(m.Payload) > 0 && len(test.messageRef.payload) > 0 { + if &m.Payload[0] == &test.messageRef.payload[0] { + sameBacking = true + } + } + if test.sub.cfg.Strategy == SubscriptionStrategyPayloadShared && !sameBacking { + t.Fatal("Expected payload to be shared") + } else if test.sub.cfg.Strategy == SubscriptionStrategyPayloadClonedPerSubscriber && sameBacking { + t.Fatal("Expected payload to be cloned") + } + }) + } +} diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..88cd026 --- /dev/null +++ b/metrics.go @@ -0,0 +1,8 @@ +package thebus + +type MetricsHooks interface { + IncPublished(topic string) + IncDelivered(topic string) + IncDropped(topic string) + IncFailed(topic string) +} diff --git a/prom/.gitkeep b/prom/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/publish.go b/publish.go new file mode 100644 index 0000000..ed6d5e7 --- /dev/null +++ b/publish.go @@ -0,0 +1,11 @@ +package thebus + +// PublishAck is returned when your client Publish a message on a topic. +// It returns the Topic, if the message is Enqueued or not and the number of +// subscribers. Note that Subscribers is a snapshot when published. +// it may not reflect the real subscribers count +type PublishAck struct { + Topic string + Enqueued bool + Subscribers int +} diff --git a/slog/logger.go b/slog/logger.go new file mode 100644 index 0000000..9d4a564 --- /dev/null +++ b/slog/logger.go @@ -0,0 +1,3 @@ +package slog + +type Logger interface{} diff --git a/state.go b/state.go new file mode 100644 index 0000000..a32a718 --- /dev/null +++ b/state.go @@ -0,0 +1 @@ +package thebus diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..b226e97 --- /dev/null +++ b/stats.go @@ -0,0 +1,35 @@ +package thebus + +import ( + "sync/atomic" + "time" +) + +type Counters struct { + Published uint64 + Delivered uint64 + Failed uint64 + Dropped uint64 +} + +type StatsResults struct { + StartedAt time.Time + Open bool + Topics int + Subscribers int + Totals Counters + PerTopic map[string]TopicStats +} + +type TopicStats struct { + Subscribers int + Buffered int + Counters +} + +type atomicCounters struct { + Published atomic.Uint64 + Delivered atomic.Uint64 + Failed atomic.Uint64 + Dropped atomic.Uint64 +} diff --git a/subscribe.go b/subscribe.go new file mode 100644 index 0000000..e6e192c --- /dev/null +++ b/subscribe.go @@ -0,0 +1,190 @@ +package thebus + +import ( + "encoding/json" + "fmt" + "slices" + "strings" + "sync" + "sync/atomic" + "time" +) + +// ############################################################################## +// ################################## ENUM ################################## +// ############################################################################## + +// SubscriptionStrategy define if the payload must be +// shared by the subscribers (SubscriptionStrategyPayloadShared, the default) +// or copied (SubscriptionStrategyPayloadClonedPerSubscriber) +type SubscriptionStrategy string + +const ( + SubscriptionStrategyUnknown SubscriptionStrategy = "UNKNOWN" + SubscriptionStrategyPayloadShared SubscriptionStrategy = "PAYLOAD_SHARED" + SubscriptionStrategyPayloadClonedPerSubscriber SubscriptionStrategy = "PAYLOAD_CLONED_PER_SUBSCRIBER" +) + +func (enum SubscriptionStrategy) String() string { + if len(strings.TrimSpace(string(enum))) == 0 { + return string(SubscriptionStrategyUnknown) + } + return string(enum) +} + +func SubscriptionStrategyValues() []SubscriptionStrategy { + return []SubscriptionStrategy{ + SubscriptionStrategyPayloadShared, + SubscriptionStrategyPayloadClonedPerSubscriber, + } +} + +func (enum SubscriptionStrategy) IsValid() bool { + if slices.Contains(SubscriptionStrategyValues(), enum) { + return true + } + return false +} + +func (enum SubscriptionStrategy) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, enum)), nil +} + +func (enum *SubscriptionStrategy) UnmarshalJSON(data []byte) error { + var tmp string + if err := json.Unmarshal(data, &tmp); err != nil { + return err + } + fs := SubscriptionStrategy(tmp) + if !fs.IsValid() { + fs = SubscriptionStrategyUnknown + } + *enum = fs + return nil +} + +type Subscription interface { + GetID() string + GetTopic() string + Read() <-chan Message + Unsubscribe() error +} + +type SubscriptionConfig struct { + Strategy SubscriptionStrategy + BufferSize int + SendTimeout time.Duration + DropIfFull bool +} + +func (cfg SubscriptionConfig) Normalize() SubscriptionConfig { + if !cfg.Strategy.IsValid() { + cfg.Strategy = SubscriptionStrategyPayloadShared + } + if cfg.BufferSize < 1 { + cfg.BufferSize = 128 + } + if cfg.SendTimeout <= 0 { + cfg.DropIfFull = true + } + return cfg +} + +type subscription struct { + subscriptionID string + cfg SubscriptionConfig + topic string + messageChan chan Message + messages <-chan Message + unsubscribeFunc func() error +} + +func DefaultSubscriptionConfig() SubscriptionConfig { + return SubscriptionConfig{ + BufferSize: 128, + SendTimeout: 200 * time.Millisecond, + DropIfFull: true, + Strategy: SubscriptionStrategyPayloadShared, + } +} + +var _ Subscription = (*subscription)(nil) + +func (s *subscription) GetID() string { + return s.subscriptionID +} + +func (s *subscription) GetTopic() string { + return s.topic +} + +func (s *subscription) Read() <-chan Message { + return s.messages +} + +func (s *subscription) Unsubscribe() error { + if s.unsubscribeFunc == nil { + return nil + } + return s.unsubscribeFunc() +} + +// SubscribeOption - +type SubscribeOption func(subCfg *SubscriptionConfig) + +func WithStrategy(strategy SubscriptionStrategy) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.Strategy = strategy + } +} + +func WithBufferSize(bufferSize int) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + if bufferSize < 1 { + return + } + subCfg.BufferSize = bufferSize + } +} +func WithSendTimeout(timeout time.Duration) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.SendTimeout = timeout + } +} + +func WithDropIfFull(dropIfFull bool) SubscribeOption { + return func(subCfg *SubscriptionConfig) { + subCfg.DropIfFull = dropIfFull + } +} + +func BuildSubscriptionConfig(opts ...SubscribeOption) SubscriptionConfig { + cfg := DefaultSubscriptionConfig() + for _, opt := range opts { + opt(&cfg) + } + return cfg +} + +type topicState struct { + subs map[string]*subscription + started atomic.Bool + counters atomicCounters + inQueue chan messageRef + seq atomic.Uint64 + wg sync.WaitGroup + closed atomic.Bool +} + +func newTopicState(queueSize int) *topicState { + var queue chan messageRef + if queueSize <= 0 { + queue = make(chan messageRef, DefaultTopicQueueSize) + } else { + queue = make(chan messageRef, queueSize) + } + return &topicState{ + subs: make(map[string]*subscription), + inQueue: queue, + } +} diff --git a/testkit/.gitkeep b/testkit/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/thebus_test.go b/thebus_test.go new file mode 100644 index 0000000..cd76d80 --- /dev/null +++ b/thebus_test.go @@ -0,0 +1,51 @@ +package thebus_test + +import "testing" + +func TestNew(t *testing.T) { + +} + +func TestClose(t *testing.T) {} + +func TestPublishOK(t *testing.T) {} +func TestSubscribeOK(t *testing.T) {} + +func TestIncrementSequence(t *testing.T) {} + +func TestNSubsSubOrder(t *testing.T) {} + +func TestPayloadStrategy(t *testing.T) {} + +func TestBackPressure(t *testing.T) {} + +func TestDropIfFull(t *testing.T) {} + +func TestNotDropIfFull(t *testing.T) {} + +func TestMaxTopicSize(t *testing.T) {} +func TestMaxSubscribersPerTopic(t *testing.T) {} + +func TestConcurrency(t *testing.T) { + +} + +func TestMultipleGoroutine(t *testing.T) { + +} + +func TestConfigAndLimits(t *testing.T) { + +} + +func TestUnsubscribe(t *testing.T) { + +} + +func TestUnsubscribePendingFanout(t *testing.T) { + +} + +func TestInheritedDefaults(t *testing.T) { + +}