Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: thebus_ci

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
test:
name: thebus_test
runs-on: ubuntu-22.04

steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v5
- name: Set up Go 1.24
uses: actions/setup-go@v6
with:
go-version: "1.24"
- run: go version
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ go.work.sum
.env

# Editor/IDE
# .idea/
# .vscode/
.idea/
.vscode/
58 changes: 58 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Based on amazing example from https://gist.github.com/serinth/16391e360692f6a000e5a10382d1148c
SERVICE ?= $(shell basename `go list`)
VERSION ?= $(shell git describe --tags --always --dirty --match=v* 2> /dev/null || cat $(PWD)/.version 2> /dev/null || echo v0)
PACKAGE ?= $(shell go list)
PACKAGES ?= $(shell go list ./...)
FILES ?= $(shell find . -type f -name '*.go' -not -path "./vendor/*")

# Binaries
PROTOC ?= protoc

.PHONY: help clean fmt lint vet test test-cover all

default: help

help: ## show this help
@echo 'usage: make [target] ...'
@echo ''
@echo 'targets:'
@egrep '^(.+)\:\ .*##\ (.+)' ${MAKEFILE_LIST} | sed 's/:.*##/#/' | column -t -c 2 -s '#'

all: ## clean, format, build and unit test
make clean-all
make gofmt
make build
make test

env: ## Print useful environment variables to stdout
echo $(CURDIR)
echo $(SERVICE)
echo $(PACKAGE)
echo $(VERSION)

clean: ## go clean
go clean

build:
go build main.go

test:
go test -v ./... -short

test-it:
go test -v ./...

test-bench: ## run benchmark tests
go test -bench ./...

# Generate test coverage
test-cover: ## Run test coverage and generate html report
rm -fr coverage
mkdir coverage
go list -f '{{if gt (len .TestGoFiles) 0}}"go test -covermode count -coverprofile {{.Name}}.coverprofile -coverpkg ./... {{.ImportPath}}"{{end}}' ./... | xargs -I {} bash -c {}
echo "mode: count" > coverage/cover.out
grep -h -v "^mode:" *.coverprofile >> "coverage/cover.out"
rm *.coverprofile
go tool cover -html=coverage/cover.out -o=coverage/cover.html

test-all: test test-bench test-cover
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,39 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/sebundefined/thebus/v1.svg)](https://pkg.go.dev/github.com/sebundefined/thebus/v1)
[![Build Status](https://github.com/sebundefined/thebus/actions/workflows/ci.yml/badge.svg)](https://github.com/sebudefined/thebus/actions/workflows/ci.yml)

# thebus
thebus is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging.
**thebus** is a lightweight message-oriented middleware that provides a dead-simple API for in-process pub/sub messaging.


## Getting started

Just install **thebus** in your project by using the following command.

```shell
go get -u github.com/sebundefined/thebus
```
## Example Usage

### Simple

```go
package main


```

### With custom options


## Features


## Why would you choose a thebus for your app ?

## Testing

See [CONTRIBUTING.md](./CONTRIBUTING.md) for instructions.

## Versioning


13 changes: 13 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package thebus

import (
"context"
)

type Bus interface {
Publish(topic string, data []byte) (PublishAck, error)
Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error)
Unsubscribe(topic string, subscriberID string) error
Close() error
Stats() (StatsResults, error)
}
1 change: 1 addition & 0 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package thebus
222 changes: 222 additions & 0 deletions bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package thebus

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
)

type bus struct {
mutex sync.RWMutex
cfg *Config
open atomic.Bool
startedAt time.Time
subscriptions map[string]*topicState
totals atomicCounters
}

var _ Bus = (*bus)(nil)

func New(opts ...Option) (Bus, error) {
cfg := BuildConfig(opts...).Normalize()
b := &bus{
startedAt: time.Now(),
cfg: cfg,
totals: atomicCounters{},
subscriptions: make(map[string]*topicState),
}
b.open.Store(true)
return b, nil
}

func (b *bus) Publish(topic string, data []byte) (PublishAck, error) {
//TODO implement me
panic("implement me")
}

func (b *bus) Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (Subscription, error) {
// Standard checks
if !b.open.Load() {
return nil, ErrClosed
}
if len(strings.TrimSpace(topic)) == 0 {
return nil, ErrInvalidTopic
}

// Building the config based on the default one
// (check in the future the default @SebUndefined)
cfg := BuildSubscriptionConfig(opts...).Normalize()

// Building the subscription
id := b.cfg.IDGenerator()
msgChan := make(chan Message, cfg.BufferSize)
sub := &subscription{
subscriptionID: id,
cfg: cfg,
topic: topic,
messageChan: msgChan,
messages: (<-chan Message)(msgChan),
}
// Saving, function under lock so ok
err := b.withWriteState(topic, true, func(state *topicState) error {
// Recheck in case of closed before the first lock
// It is possible that someone close it pending we wait for the first lock
// I do it for avoiding weird state....
if !b.open.Load() {
// unlock useless here because it is handled by withWriteState
return ErrClosed
}
if b.cfg.MaxSubscribersPerTopic > 0 && len(state.subs) >= b.cfg.MaxSubscribersPerTopic {
return fmt.Errorf("too many subscribers per topic (max: %d)", b.cfg.MaxSubscribersPerTopic)
}
state.subs[id] = sub
return nil
})
if err != nil {
return nil, err
}
sub.unsubscribeFunc = b.buildUnsubscribeFunction(id, topic)
go func() {
select {
case <-ctx.Done():
_ = sub.Unsubscribe() // idempotent
}
}()

return sub, nil
}

func (b *bus) buildUnsubscribeFunction(id string, topic string) func() error {
return func() error {
b.mutex.Lock()
if state, ok := b.subscriptions[topic]; ok {
delete(state.subs, id)
if len(state.subs) == 0 && len(state.inQueue) == 0 && b.cfg.AutoDeleteEmptyTopics {
if state.closed.CompareAndSwap(false, true) {
close(state.inQueue)
delete(b.subscriptions, topic)
}
}
}
b.mutex.Unlock()
return nil
}
}

func (b *bus) Unsubscribe(topic string, subscriberID string) error {
//TODO implement me
panic("implement me")
}

func (b *bus) Close() error {
// refuse new publish and subscribe
if !b.open.CompareAndSwap(true, false) {
return nil
}
b.mutex.Lock()
states := make([]*topicState, 0, len(b.subscriptions))
for _, st := range b.subscriptions {
states = append(states, st)
}
// close the inQueue of each topic
for _, st := range states {
if st.closed.CompareAndSwap(false, true) { // ← idem ici
close(st.inQueue)
}
}
b.mutex.Unlock()

// Wait for the worker to stop
for _, st := range states {
st.wg.Wait()
}

// Cleaning memory
b.mutex.Lock()
b.subscriptions = make(map[string]*topicState) // reset propre
b.mutex.Unlock()

return nil
}

func (b *bus) Stats() (StatsResults, error) {
b.mutex.RLock()
defer b.mutex.RUnlock()
perTopic := make(map[string]TopicStats, len(b.subscriptions))
subscriberCounts := 0
for topic, state := range b.subscriptions {
buffered := 0
for _, sub := range state.subs {
subscriberCounts++
buffered += len(sub.messageChan)
}
perTopic[topic] = TopicStats{
Subscribers: len(state.subs),
Buffered: buffered,
Counters: Counters{
Published: state.counters.Published.Load(),
Delivered: state.counters.Delivered.Load(),
Failed: state.counters.Failed.Load(),
Dropped: state.counters.Dropped.Load(),
},
}
}
s := StatsResults{
StartedAt: b.startedAt,
Open: b.open.Load(),
Topics: len(b.subscriptions),
Subscribers: subscriberCounts,
Totals: Counters{
Published: b.totals.Published.Load(),
Delivered: b.totals.Delivered.Load(),
Failed: b.totals.Failed.Load(),
Dropped: b.totals.Dropped.Load(),
},
PerTopic: perTopic,
}
return s, nil
}

func (b *bus) withReadState(topic string, callback func(st *topicState) error) error {
b.mutex.RLock()
defer b.mutex.RUnlock()
return callback(b.subscriptions[topic])
}

func (b *bus) withWriteState(topic string, createIfNotExists bool, writeFunc func(state *topicState) error) error {
b.mutex.Lock()
state, ok := b.subscriptions[topic]
start := false
if !ok {
if !createIfNotExists {
err := writeFunc(nil)
b.mutex.Unlock()
return err
}
if b.cfg.MaxTopics > 0 && len(b.subscriptions) >= b.cfg.MaxTopics {
b.mutex.Unlock()
return fmt.Errorf("too many topics (max=%d)", b.cfg.MaxTopics)
}
qSize := b.cfg.TopicQueueSize
if qSize <= 0 {
qSize = DefaultTopicQueueSize
}
state = newTopicState(qSize)
// add the state
b.subscriptions[topic] = state

if state.started.CompareAndSwap(false, true) {
state.wg.Add(1)
start = true
}
}
err := writeFunc(state)
b.mutex.Unlock()
if start {
go b.runFanOut(topic, state)
}
return err
}
Loading
Loading