From cbb5ff053697cfbd6f9e190655d2116f99aa5918 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 19 Feb 2025 21:06:19 +0200 Subject: [PATCH] feat: add support for Azure Service Bus --- azuresb/azuresb_test.go | 287 ++++++++++++++++++++++++++++++++++++++ azuresb/factory.go | 19 +++ azuresb/go.mod | 21 +++ azuresb/go.sum | 61 ++++++++ azuresb/message.go | 37 +++++ azuresb/publisher.go | 79 +++++++++++ azuresb/publisher_test.go | 176 +++++++++++++++++++++++ azuresb/readme.md | 93 ++++++++++++ azuresb/receiver.go | 218 +++++++++++++++++++++++++++++ azuresb/sender.go | 71 ++++++++++ azuresb/subscriber.go | 39 ++++++ azuresb/subscription.go | 160 +++++++++++++++++++++ 12 files changed, 1261 insertions(+) create mode 100644 azuresb/azuresb_test.go create mode 100644 azuresb/factory.go create mode 100644 azuresb/go.mod create mode 100644 azuresb/go.sum create mode 100644 azuresb/message.go create mode 100644 azuresb/publisher.go create mode 100644 azuresb/publisher_test.go create mode 100644 azuresb/readme.md create mode 100644 azuresb/receiver.go create mode 100644 azuresb/sender.go create mode 100644 azuresb/subscriber.go create mode 100644 azuresb/subscription.go diff --git a/azuresb/azuresb_test.go b/azuresb/azuresb_test.go new file mode 100644 index 0000000..55accc9 --- /dev/null +++ b/azuresb/azuresb_test.go @@ -0,0 +1,287 @@ +package azuresb_test + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/velmie/broker" + + "github.com/velmie/broker/azuresb" +) + +type fakeReceiver struct { + receiveFunc func(ctx context.Context) (*azuresb.Message, error) + closeFunc func() error +} + +func (fr *fakeReceiver) Receive(ctx context.Context) (*azuresb.Message, error) { + if fr.receiveFunc != nil { + return fr.receiveFunc(ctx) + } + return nil, nil +} + +func (fr *fakeReceiver) Close() error { + if fr.closeFunc != nil { + return fr.closeFunc() + } + return nil +} + +type fakeReceiverFactory struct { + receiver azuresb.Receiver + err error +} + +func (f *fakeReceiverFactory) CreateReceiver(topic string) (azuresb.Receiver, error) { + if f.err != nil { + return nil, f.err + } + return f.receiver, nil +} + +func TestMessage_CompleteAndAbandon(t *testing.T) { + t.Run("Complete calls completeFunc", func(t *testing.T) { + completeCalled := false + msg := azuresb.NewMessage([]byte("test"), nil, "id", + func() error { + completeCalled = true + return nil + }, + nil, + ) + if err := msg.Complete(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if !completeCalled { + t.Error("completeFunc was not called") + } + }) + + t.Run("Abandon calls abandonFunc", func(t *testing.T) { + abandonCalled := false + msg := azuresb.NewMessage([]byte("test"), nil, "id", + nil, + func() error { + abandonCalled = true + return nil + }, + ) + if err := msg.Abandon(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if !abandonCalled { + t.Error("abandonFunc was not called") + } + }) + + t.Run("Complete with nil completeFunc returns nil", func(t *testing.T) { + msg := azuresb.NewMessage([]byte("test"), nil, "id", nil, nil) + if err := msg.Complete(); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("Abandon with nil abandonFunc returns nil", func(t *testing.T) { + msg := azuresb.NewMessage([]byte("test"), nil, "id", nil, nil) + if err := msg.Abandon(); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) +} + +func TestSubscriber_Subscribe_HandlerSuccess(t *testing.T) { + var completeCalled bool + + fakeMsg := azuresb.NewMessage([]byte("payload"), map[string]string{"foo": "bar"}, "msg-123", + func() error { + completeCalled = true + return nil + }, + nil, + ) + + msgReturned := false + fr := &fakeReceiver{ + receiveFunc: func(ctx context.Context) (*azuresb.Message, error) { + if !msgReturned { + msgReturned = true + return fakeMsg, nil + } + <-ctx.Done() + return nil, ctx.Err() + }, + closeFunc: func() error { + return nil + }, + } + + factory := &fakeReceiverFactory{receiver: fr} + subscriber := azuresb.NewSubscriber(factory) + + handlerCalled := make(chan struct{}) + handler := func(e broker.Event) error { + if e.Topic() != "test-topic" { + t.Errorf("expected topic 'test-topic', got %s", e.Topic()) + } + msg := e.Message() + if string(msg.Body) != "payload" { + t.Errorf("expected payload 'payload', got %s", msg.Body) + } + close(handlerCalled) + return nil + } + + sub, err := subscriber.Subscribe("test-topic", handler, func(o *broker.SubscribeOptions) { + o.AutoAck = true + }) + if err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + + select { + case <-handlerCalled: + case <-time.After(1 * time.Second): + t.Fatal("handler was not called") + } + + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unsubscribe returned error: %v", err) + } + time.Sleep(100 * time.Millisecond) + if !completeCalled { + t.Error("expected complete to be called (auto ack), but it was not") + } +} + +func TestSubscriber_Subscribe_HandlerError(t *testing.T) { + var abandonCalled bool + + fakeMsg := azuresb.NewMessage([]byte("error-payload"), map[string]string{"key": "val"}, "msg-error", + nil, + func() error { + abandonCalled = true + return nil + }, + ) + + msgReturned := false + fr := &fakeReceiver{ + receiveFunc: func(ctx context.Context) (*azuresb.Message, error) { + if !msgReturned { + msgReturned = true + return fakeMsg, nil + } + <-ctx.Done() + return nil, ctx.Err() + }, + closeFunc: func() error { + return nil + }, + } + factory := &fakeReceiverFactory{receiver: fr} + subscriber := azuresb.NewSubscriber(factory) + + handlerCalled := make(chan struct{}) + handler := func(e broker.Event) error { + close(handlerCalled) + return errors.New("handler error") + } + + sub, err := subscriber.Subscribe("test-topic", handler, func(o *broker.SubscribeOptions) { + o.AutoAck = true + }) + if err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + + select { + case <-handlerCalled: + case <-time.After(1 * time.Second): + t.Fatal("handler was not called") + } + + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unsubscribe returned error: %v", err) + } + time.Sleep(100 * time.Millisecond) + if !abandonCalled { + t.Error("expected abandon to be called due to handler error, but it was not") + } +} + +func TestSubscription_UnsubscribeAndMethods(t *testing.T) { + fr := &fakeReceiver{ + receiveFunc: func(ctx context.Context) (*azuresb.Message, error) { + return nil, context.Canceled + }, + closeFunc: func() error { + return nil + }, + } + factory := &fakeReceiverFactory{receiver: fr} + subscriber := azuresb.NewSubscriber(factory) + + handler := func(e broker.Event) error { return nil } + + sub, err := subscriber.Subscribe("test-topic", handler) + if err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + + if sub.Topic() != "test-topic" { + t.Errorf("expected topic 'test-topic', got %s", sub.Topic()) + } + if sub.Handler() == nil { + t.Error("expected non-nil handler") + } + if sub.Options() == nil { + t.Error("expected non-nil subscribe options") + } + + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unsubscribe returned error: %v", err) + } + + select { + case <-sub.Done(): + case <-time.After(1 * time.Second): + t.Error("Done channel was not closed after unsubscribe") + } +} + +func TestSubscription_MultipleUnsubscribe(t *testing.T) { + fr := &fakeReceiver{ + receiveFunc: func(ctx context.Context) (*azuresb.Message, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + closeFunc: func() error { + return nil + }, + } + factory := &fakeReceiverFactory{receiver: fr} + subscriber := azuresb.NewSubscriber(factory) + + handler := func(e broker.Event) error { return nil } + + sub, err := subscriber.Subscribe("test-topic", handler) + if err != nil { + t.Fatalf("Subscribe returned error: %v", err) + } + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if err := sub.Unsubscribe(); err != nil { + t.Errorf("Unsubscribe returned error: %v", err) + } + }() + } + wg.Wait() +} diff --git a/azuresb/factory.go b/azuresb/factory.go new file mode 100644 index 0000000..9862944 --- /dev/null +++ b/azuresb/factory.go @@ -0,0 +1,19 @@ +package azuresb + +import "fmt" + +type DefaultReceiverFactory struct { + opts []AzureReceiverOption +} + +func NewDefaultReceiverFactory(opts ...AzureReceiverOption) *DefaultReceiverFactory { + return &DefaultReceiverFactory{opts: opts} +} + +func (f DefaultReceiverFactory) CreateReceiver(topic string) (Receiver, error) { + r, err := NewAzureReceiver(topic, f.opts...) + if err != nil { + return nil, fmt.Errorf("azuresb.DefaultReceiverFactory.CreateReceiver: %w", err) + } + return r, nil +} diff --git a/azuresb/go.mod b/azuresb/go.mod new file mode 100644 index 0000000..b02fd55 --- /dev/null +++ b/azuresb/go.mod @@ -0,0 +1,21 @@ +module github.com/velmie/broker/azuresb + +go 1.23.0 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0 + github.com/stretchr/testify v1.10.0 + github.com/velmie/broker v0.9.0 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/go-amqp v1.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/text v0.21.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/azuresb/go.sum b/azuresb/go.sum new file mode 100644 index 0000000..cb1d853 --- /dev/null +++ b/azuresb/go.sum @@ -0,0 +1,61 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0 h1:JNgM3Tz592fUHU2vgwgvOgKxo5s9Ki0y2wicBeckn70= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0/go.mod h1:6vUKmzY17h6dpn9ZLAhM4R/rcrltBeq52qZIkUR7Oro= +github.com/Azure/go-amqp v1.3.0 h1://1rikYhoIQNXJFXyoO/Rlb4+4EkHYfJceNtLlys2/4= +github.com/Azure/go-amqp v1.3.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 h1:kYRSnvJju5gYVyhkij+RTJ/VR6QIUaCfWeaFm2ycsjQ= +github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/velmie/broker v0.9.0 h1:WOyo4gPEl9hy8MyfDR1s3K/t4TSTdu/uYA6IgpPWsv4= +github.com/velmie/broker v0.9.0/go.mod h1:SS2XA9nsUaDeLYN8Z51lrZc0WFsY8m5cYWhCUblKZ40= +go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= +go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/azuresb/message.go b/azuresb/message.go new file mode 100644 index 0000000..4368c1c --- /dev/null +++ b/azuresb/message.go @@ -0,0 +1,37 @@ +package azuresb + +// Message represents a message received from Azure Service Bus. +type Message struct { + Body []byte + Header map[string]string + ID string + completeFunc func() error + abandonFunc func() error +} + +// NewMessage creates a new Message +func NewMessage(body []byte, header map[string]string, id string, completeFunc, abandonFunc func() error) *Message { + return &Message{ + Body: body, + Header: header, + ID: id, + completeFunc: completeFunc, + abandonFunc: abandonFunc, + } +} + +// Complete completes the message (acknowledges successful processing) +func (m *Message) Complete() error { + if m.completeFunc != nil { + return m.completeFunc() + } + return nil +} + +// Abandon releases the message (indicates that processing failed) +func (m *Message) Abandon() error { + if m.abandonFunc != nil { + return m.abandonFunc() + } + return nil +} diff --git a/azuresb/publisher.go b/azuresb/publisher.go new file mode 100644 index 0000000..f10e172 --- /dev/null +++ b/azuresb/publisher.go @@ -0,0 +1,79 @@ +package azuresb + +import ( + "context" + "sync" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/pkg/errors" + "github.com/velmie/broker" +) + +// ASBSender abstracts the methods of an Azure Service Bus sender +type ASBSender interface { + SendMessage(ctx context.Context, msg *azservicebus.Message, opts *azservicebus.SendMessageOptions) error +} + +// SenderFactory defines a factory interface for creating senders +type SenderFactory interface { + // CreateSender returns a new ASBSender for the given topic + CreateSender(topic string) (ASBSender, error) +} + +// Publisher implements the broker.Publisher interface for Azure Service Bus +type Publisher struct { + senderFactory SenderFactory + sendersByTopic sync.Map // map[string]ASBSender +} + +// NewPublisher creates a new Publisher using the provided SenderFactory +func NewPublisher(factory SenderFactory) *Publisher { + return &Publisher{ + senderFactory: factory, + } +} + +// Publish sends a message to the specified topic using Azure Service Bus +func (p *Publisher) Publish(topic string, message *broker.Message) error { + broker.SetIDHeader(message) + + value, ok := p.sendersByTopic.Load(topic) + var sender ASBSender + if ok { + sender = value.(ASBSender) + } else { + var err error + sender, err = p.senderFactory.CreateSender(topic) + if err != nil { + return errors.Wrapf(err, "azuresb: cannot create sender for topic %q", topic) + } + actual, loaded := p.sendersByTopic.LoadOrStore(topic, sender) + if loaded { + sender = actual.(ASBSender) + } + } + + asbMsg := &azservicebus.Message{ + Body: message.Body, + MessageID: &message.ID, + ApplicationProperties: stringMapToAnyMap(message.Header), + } + + ctx := message.Context() + if ctx == nil { + ctx = context.Background() + } + + if err := sender.SendMessage(ctx, asbMsg, nil); err != nil { + return errors.Wrap(err, "azuresb: cannot send message") + } + return nil +} + +func stringMapToAnyMap(in map[string]string) map[string]any { + out := make(map[string]any, len(in)) + for k, v := range in { + out[k] = v + } + return out +} diff --git a/azuresb/publisher_test.go b/azuresb/publisher_test.go new file mode 100644 index 0000000..67074e7 --- /dev/null +++ b/azuresb/publisher_test.go @@ -0,0 +1,176 @@ +package azuresb_test + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/stretchr/testify/require" + "github.com/velmie/broker" + + "github.com/velmie/broker/azuresb" +) + +type fakeASBSender struct { + sendCalled bool + sentMsg *azservicebus.Message + sendErr error + lastCtx context.Context +} + +func (f *fakeASBSender) SendMessage(ctx context.Context, msg *azservicebus.Message, opts *azservicebus.SendMessageOptions) error { + f.sendCalled = true + f.sentMsg = msg + f.lastCtx = ctx + return f.sendErr +} + +type fakeSenderFactory struct { + createSenderFunc func(topic string) (azuresb.ASBSender, error) + createCount int + mu sync.Mutex +} + +func (f *fakeSenderFactory) CreateSender(topic string) (azuresb.ASBSender, error) { + f.mu.Lock() + f.createCount++ + f.mu.Unlock() + return f.createSenderFunc(topic) +} + +func anyMapToHeader(in map[string]any) broker.Header { + out := make(broker.Header, len(in)) + for k, v := range in { + if s, ok := v.(string); ok { + out[k] = s + } + } + return out +} + +func TestPublisherPublishSuccess(t *testing.T) { + fakeSender := &fakeASBSender{} + factory := &fakeSenderFactory{ + createSenderFunc: func(topic string) (azuresb.ASBSender, error) { + return fakeSender, nil + }, + } + + pub := azuresb.NewPublisher(factory) + topic := "test-topic" + msgID := "msg-123" + + message := broker.NewMessage() + message.Body = []byte("test-body") + message.ID = msgID + message.Header = broker.Header{} + + err := pub.Publish(topic, message) + require.NoError(t, err) + + require.True(t, fakeSender.sendCalled, "expected SendMessage to be called") + require.NotNil(t, fakeSender.sentMsg) + + require.Equal(t, []byte("test-body"), fakeSender.sentMsg.Body) + require.NotNil(t, fakeSender.sentMsg.MessageID) + require.Equal(t, msgID, *fakeSender.sentMsg.MessageID) + + expectedHeader := message.Header + require.Equal(t, expectedHeader, anyMapToHeader(fakeSender.sentMsg.ApplicationProperties)) +} + +func TestPublisherPublishSenderError(t *testing.T) { + fakeSender := &fakeASBSender{ + sendErr: errors.New("send failed"), + } + factory := &fakeSenderFactory{ + createSenderFunc: func(topic string) (azuresb.ASBSender, error) { + return fakeSender, nil + }, + } + pub := azuresb.NewPublisher(factory) + topic := "test-topic" + + message := broker.NewMessage() + message.Body = []byte("body") + message.ID = "id1" + message.Header = broker.Header{} + + err := pub.Publish(topic, message) + require.Error(t, err) + require.Contains(t, err.Error(), "azuresb: cannot send message") +} + +func TestPublisherCreateSenderError(t *testing.T) { + factory := &fakeSenderFactory{ + createSenderFunc: func(topic string) (azuresb.ASBSender, error) { + return nil, errors.New("create failed") + }, + } + pub := azuresb.NewPublisher(factory) + topic := "test-topic" + + message := broker.NewMessage() + message.Body = []byte("body") + message.ID = "id1" + message.Header = broker.Header{} + + err := pub.Publish(topic, message) + require.Error(t, err) + require.Contains(t, err.Error(), "azuresb: cannot create sender") +} + +func TestPublisherCaching(t *testing.T) { + callCount := 0 + fakeSender := &fakeASBSender{} + factory := &fakeSenderFactory{ + createSenderFunc: func(topic string) (azuresb.ASBSender, error) { + callCount++ + return fakeSender, nil + }, + } + + pub := azuresb.NewPublisher(factory) + topic := "test-topic" + + message1 := broker.NewMessage() + message1.Body = []byte("body1") + message1.ID = "id1" + message1.Header = broker.Header{} + err := pub.Publish(topic, message1) + require.NoError(t, err) + + message2 := broker.NewMessage() + message2.Body = []byte("body2") + message2.ID = "id2" + message2.Header = broker.Header{} + err = pub.Publish(topic, message2) + require.NoError(t, err) + + require.Equal(t, 1, callCount, "expected CreateSender to be called only once for the same topic") +} + +func TestPublisherCustomContext(t *testing.T) { + fakeSender := &fakeASBSender{} + factory := &fakeSenderFactory{ + createSenderFunc: func(topic string) (azuresb.ASBSender, error) { + return fakeSender, nil + }, + } + pub := azuresb.NewPublisher(factory) + topic := "test-topic" + + customCtx := context.WithValue(context.Background(), "key", "value") + message := broker.NewMessageWithContext(customCtx) + message.Body = []byte("body") + message.ID = "id" + message.Header = broker.Header{} + + err := pub.Publish(topic, message) + require.NoError(t, err) + + require.NotNil(t, fakeSender.lastCtx) + require.Equal(t, "value", fakeSender.lastCtx.Value("key")) +} diff --git a/azuresb/readme.md b/azuresb/readme.md new file mode 100644 index 0000000..346cfbf --- /dev/null +++ b/azuresb/readme.md @@ -0,0 +1,93 @@ +# azuresb + +The package provides a simple, idiomatic API for both publishing and subscribing to messages via Azure Service Bus, +abstracting away the complexity of the underlying Azure SDK. + + +## Usage + +### Publisher example + +Publish messages to an Azure Service Bus queue or topic using a sender factory and publisher. + +```go +package main + +import ( + "log" + + "github.com/velmie/broker" + "github.com/velmie/broker/azuresb" +) + +func main() { + // Initialize the sender factory with your Azure Service Bus connection string + senderFactory := azuresb.NewDefaultPublisherFactory( + azuresb.WithConnectionString("your-connection-string"), + ) + + // Create a new publisher + publisher := azuresb.NewPublisher(senderFactory) + + // Create a broker message + msg := broker.NewMessage() + msg.Body = []byte("Hello, Azure Service Bus!") + msg.ID = "msg-001" + msg.Header.Set("Content-Type", "text/plain") + + // Publish the message to a queue or topic + if err := publisher.Publish("my-queue", msg); err != nil { + log.Fatalf("publish message error: %v", err) + } + log.Println("message published successfully!") +} + +``` + +### Subscriber Example + +Subscribe to messages from an Azure Service Bus queue (or topic subscription) with a custom handler. + +```go +package main + +import ( + "log" + + "github.com/velmie/broker" + "github.com/velmie/broker/azuresb" +) + +func main() { + // Create a receiver factory with your Azure Service Bus connection string. + receiverFactory := azuresb.NewDefaultReceiverFactory( + azuresb.WithConnectionString("your-connection-string"), + // if you're using topics with subscriptions + // azuresb.WithSubscriptionName("your-subscription-name"), + // azuresb.WithReceiverType(azuresb.ReceiverTypeSubscription), + ) + + // Create a new subscriber + subscriber := azuresb.NewSubscriber(receiverFactory) + + // Define the message handler. + handler := func(e broker.Event) error { + msg := e.Message() + log.Printf("Received message with ID %s: %s", msg.ID, string(msg.Body)) + // You can optionally call e.Ack() here if AutoAck is not enabled. + return nil + } + + // Subscribe to a queue or topic + sub, err := subscriber.Subscribe("my-queue", handler, func(o *broker.SubscribeOptions) { + o.AutoAck = true // Automatically acknowledge messages after successful handling + }) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // Wait until the subscription is terminated (e.g., via an external signal). + <-sub.Done() + log.Println("Subscription closed.") +} +``` \ No newline at end of file diff --git a/azuresb/receiver.go b/azuresb/receiver.go new file mode 100644 index 0000000..00e89cd --- /dev/null +++ b/azuresb/receiver.go @@ -0,0 +1,218 @@ +package azuresb + +import ( + "context" + "errors" + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +const ( + ReceiverTypeQueue = ReceiverType("queue") + ReceiverTypeSubscription = ReceiverType("subscription") +) + +// asbReceiver abstracts the methods of the Azure Service Bus receiver +type asbReceiver interface { + ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) + CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error + AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error + Close(ctx context.Context) error +} + +// Receiver is an interface that abstracts the Azure Service Bus message receiver +type Receiver interface { + // Receive retrieves a single message + // It should block until a message is received or the context is cancelled + Receive(ctx context.Context) (*Message, error) + // Close closes the receiver and releases its resources + Close() error +} + +type ReceiverType string + +// AzureReceiver implements the Receiver interface using the Azure Service Bus SDK +type AzureReceiver struct { + receiver asbReceiver + client *azservicebus.Client + managedClient bool +} + +// AzureReceiverOption configures the creation of an AzureReceiver +type AzureReceiverOption func(*azureReceiverConfig) + +type azureReceiverConfig struct { + existingReceiver asbReceiver + client *azservicebus.Client + connectionString string + receiverType ReceiverType + subscriptionName string + receiverOptions *azservicebus.ReceiverOptions +} + +// WithExistingReceiver allows supplying an already created asbReceiver +func WithExistingReceiver(r asbReceiver) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.existingReceiver = r + } +} + +// WithReceiverClient allows providing an existing azservicebus.Client +func WithReceiverClient(client *azservicebus.Client) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.client = client + } +} + +// WithReceiverConnectionString instructs NewAzureReceiver to create its own client using the given connection string +func WithReceiverConnectionString(connStr string) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.connectionString = connStr + } +} + +// WithReceiverType explicitly sets the receiver type. +// Defaults to ReceiverTypeQueue. +func WithReceiverType(rt ReceiverType) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.receiverType = rt + } +} + +// WithSubscriptionName sets the subscription name and marks the receiver type as ReceiverTypeSubscription. +// When using a subscription receiver, the topic provided to Subscribe is treated as the topic name. +func WithSubscriptionName(subscriptionName string) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.receiverType = ReceiverTypeSubscription + cfg.subscriptionName = subscriptionName + } +} + +// WithReceiverOptions allows passing custom Azure receiver options (azservicebus.ReceiverOptions) +// to NewReceiverForSubscription or NewReceiverForQueue. +func WithReceiverOptions(options *azservicebus.ReceiverOptions) AzureReceiverOption { + return func(cfg *azureReceiverConfig) { + cfg.receiverOptions = options + } +} + +// NewAzureReceiver creates a new AzureReceiver using the given topic and options +// The topic parameter (provided by the Subscriber) is used as the queue name for queue receivers +// or as the topic name for subscription receivers +func NewAzureReceiver(topic string, opts ...AzureReceiverOption) (Receiver, error) { + var cfg azureReceiverConfig + cfg.receiverType = ReceiverTypeQueue + + for _, opt := range opts { + opt(&cfg) + } + + if cfg.existingReceiver != nil { + return &AzureReceiver{ + receiver: cfg.existingReceiver, + client: nil, + managedClient: false, + }, nil + } + + if cfg.client != nil { + return createReceiverFromClient(topic, cfg, cfg.client) + } + + if cfg.connectionString != "" { + client, err := azservicebus.NewClientFromConnectionString(cfg.connectionString, nil) + if err != nil { + return nil, fmt.Errorf("failed to create service bus client: %w", err) + } + ar, err := createReceiverFromClient(topic, cfg, client) + if err != nil { + return nil, err + } + azureReceiver := ar.(*AzureReceiver) + azureReceiver.client = client + azureReceiver.managedClient = true + return azureReceiver, nil + } + + return nil, errors.New("insufficient configuration: provide an existing receiver, client, or connection string") +} + +func createReceiverFromClient(topic string, cfg azureReceiverConfig, client *azservicebus.Client) (Receiver, error) { + if cfg.receiverType == ReceiverTypeSubscription { + if cfg.subscriptionName == "" { + return nil, errors.New("subscription name must be provided for a subscription receiver") + } + r, err := client.NewReceiverForSubscription(topic, cfg.subscriptionName, cfg.receiverOptions) + if err != nil { + return nil, fmt.Errorf( + "failed to create receiver for subscription %q on topic %q: %w", + cfg.subscriptionName, + topic, + err, + ) + } + return &AzureReceiver{ + receiver: r, + client: client, + managedClient: false, + }, nil + } + + r, err := client.NewReceiverForQueue(topic, cfg.receiverOptions) + if err != nil { + return nil, fmt.Errorf("failed to create receiver for queue %q: %w", topic, err) + } + return &AzureReceiver{ + receiver: r, + client: client, + managedClient: false, + }, nil +} + +func (r *AzureReceiver) Receive(ctx context.Context) (*Message, error) { + msgs, err := r.receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + return nil, fmt.Errorf("failed to receive messages: %w", err) + } + if len(msgs) == 0 { + return nil, fmt.Errorf("no messages received") + } + receivedMsg := msgs[0] + + // Define functions to complete or abandon the message. + completeFunc := func() error { + if err := r.receiver.CompleteMessage(ctx, receivedMsg, nil); err != nil { + return fmt.Errorf("failed to complete message %q: %w", receivedMsg.MessageID, err) + } + return nil + } + abandonFunc := func() error { + if err := r.receiver.AbandonMessage(ctx, receivedMsg, nil); err != nil { + return fmt.Errorf("failed to abandon message %q: %w", receivedMsg.MessageID, err) + } + return nil + } + + headers := make(map[string]string) + for k, v := range receivedMsg.ApplicationProperties { + headers[k] = fmt.Sprintf("%v", v) + } + + // Wrap the Azure SDK message into our domain-specific Message. + msg := NewMessage(receivedMsg.Body, headers, receivedMsg.MessageID, completeFunc, abandonFunc) + return msg, nil +} + +func (r *AzureReceiver) Close() error { + var lastErr error + if err := r.receiver.Close(context.Background()); err != nil { + lastErr = fmt.Errorf("failed to close receiver: %w", err) + } + if r.managedClient && r.client != nil { + if err := r.client.Close(context.Background()); err != nil { + lastErr = fmt.Errorf("failed to close client: %w", err) + } + } + return lastErr +} diff --git a/azuresb/sender.go b/azuresb/sender.go new file mode 100644 index 0000000..27aeaf9 --- /dev/null +++ b/azuresb/sender.go @@ -0,0 +1,71 @@ +package azuresb + +import ( + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +// AzureSenderOption configures an Azure sender +type AzureSenderOption func(*azureSenderConfig) + +type azureSenderConfig struct { + client *azservicebus.Client + connectionString string +} + +// WithSenderClient sets an existing azservicebus.Client for the sender +func WithSenderClient(client *azservicebus.Client) AzureSenderOption { + return func(cfg *azureSenderConfig) { + cfg.client = client + } +} + +// WithSenderConnectionString sets the connection string for creating a new client +func WithSenderConnectionString(connStr string) AzureSenderOption { + return func(cfg *azureSenderConfig) { + cfg.connectionString = connStr + } +} + +// NewAzureSender creates a new ASBSender for the given topic +func NewAzureSender(topic string, opts ...AzureSenderOption) (ASBSender, error) { + var cfg azureSenderConfig + for _, opt := range opts { + opt(&cfg) + } + if cfg.client == nil { + if cfg.connectionString == "" { + return nil, fmt.Errorf("azuresb: no client or connection string provided") + } + client, err := azservicebus.NewClientFromConnectionString(cfg.connectionString, nil) + if err != nil { + return nil, fmt.Errorf("azuresb: failed to create client: %w", err) + } + cfg.client = client + } + sender, err := cfg.client.NewSender(topic, nil) + if err != nil { + return nil, fmt.Errorf("azuresb: failed to create sender for topic %q: %w", topic, err) + } + return sender, nil +} + +// DefaultSenderFactory is the default implementation of the SenderFactory interface +type DefaultSenderFactory struct { + opts []AzureSenderOption +} + +// NewDefaultSenderFactory creates a new DefaultSenderFactory with the given options +func NewDefaultSenderFactory(opts ...AzureSenderOption) *DefaultSenderFactory { + return &DefaultSenderFactory{opts: opts} +} + +// CreateSender creates a new ASBSender for the given topic +func (f DefaultSenderFactory) CreateSender(topic string) (ASBSender, error) { + s, err := NewAzureSender(topic, f.opts...) + if err != nil { + return nil, fmt.Errorf("azuresb.DefaultSenderFactory.CreateSender: %w", err) + } + return s, nil +} diff --git a/azuresb/subscriber.go b/azuresb/subscriber.go new file mode 100644 index 0000000..1458b84 --- /dev/null +++ b/azuresb/subscriber.go @@ -0,0 +1,39 @@ +package azuresb + +import ( + "fmt" + + "github.com/velmie/broker" +) + +type ReceiverFactory interface { + CreateReceiver(topic string) (Receiver, error) +} + +// Subscriber is an Azure Service Bus subscriber implementation that conforms to the broker.Subscriber interface +type Subscriber struct { + receiverFactory ReceiverFactory +} + +func NewSubscriber(factory ReceiverFactory) *Subscriber { + return &Subscriber{ + receiverFactory: factory, + } +} + +// Subscribe subscribes to a specific topic and starts receiving messages +func (s *Subscriber) Subscribe(topic string, handler broker.Handler, options ...broker.SubscribeOption) (broker.Subscription, error) { + receiver, err := s.receiverFactory.CreateReceiver(topic) + if err != nil { + return nil, fmt.Errorf("azuresb: failed to create receiver for topic %q: %w", topic, err) + } + + opts := broker.DefaultSubscribeOptions() + for _, o := range options { + o(opts) + } + + sub := newSubscription(topic, receiver, handler, opts) + sub.start() + return sub, nil +} diff --git a/azuresb/subscription.go b/azuresb/subscription.go new file mode 100644 index 0000000..4260d6a --- /dev/null +++ b/azuresb/subscription.go @@ -0,0 +1,160 @@ +package azuresb + +import ( + "context" + "fmt" + "sync" + + "github.com/velmie/broker" +) + +// subscription implements broker.Subscription for Azure Service Bus. +type subscription struct { + topic string + receiver Receiver + handler broker.Handler + options *broker.SubscribeOptions + done chan struct{} + once sync.Once + wg sync.WaitGroup + cancelFunc context.CancelFunc +} + +// newSubscription creates a new subscription instance. +func newSubscription(topic string, receiver Receiver, handler broker.Handler, options *broker.SubscribeOptions) *subscription { + return &subscription{ + topic: topic, + receiver: receiver, + handler: handler, + options: options, + done: make(chan struct{}), + } +} + +// start begins receiving messages in a separate goroutine. +func (s *subscription) start() { + ctx, cancel := context.WithCancel(context.Background()) + s.cancelFunc = cancel + s.wg.Add(1) + go s.receiveLoop(ctx) +} + +// receiveLoop continuously receives messages and processes them. +func (s *subscription) receiveLoop(ctx context.Context) { + defer s.wg.Done() + for { + select { + case <-s.done: + return + default: + } + + msg, err := s.receiver.Receive(ctx) + if err != nil { + select { + case <-ctx.Done(): + return + default: + } + if s.options.ErrorHandler != nil { + s.options.ErrorHandler(err, s) + } + continue + } + + brokerMsg := &broker.Message{ + ID: msg.ID, + Body: msg.Body, + Header: msg.Header, + } + + err = s.handler(&azureEvent{ + topic: s.topic, + message: brokerMsg, + azureMsg: msg, + }) + if err != nil { + if aErr := msg.Abandon(); aErr != nil { + if s.options.Logger != nil { + errM := fmt.Sprintf("azuresb.subscription.receiveLoop: Abandon message: %s", aErr) + s.options.Logger.Error(errM) + } + } + if s.options.ErrorHandler != nil { + s.options.ErrorHandler(err, s) + } + } else { + if s.options.AutoAck { + if cErr := msg.Complete(); cErr != nil { + if s.options.Logger != nil { + errM := fmt.Sprintf("azuresb.subscription.receiveLoop: Complete message: %s", cErr) + s.options.Logger.Error(errM) + } + } + } + } + } +} + +// Unsubscribe stops receiving messages and closes the subscription. +func (s *subscription) Unsubscribe() error { + s.once.Do(func() { + close(s.done) + if s.cancelFunc != nil { + s.cancelFunc() + } + s.wg.Wait() + _ = s.receiver.Close() + }) + return nil +} + +// Done returns a channel that is closed when the subscription is unsubscribed. +func (s *subscription) Done() <-chan struct{} { + return s.done +} + +// Topic returns the topic of the subscription. +func (s *subscription) Topic() string { + return s.topic +} + +// InitOptions returns the initial subscription options; not supported so returns nil. +func (s *subscription) InitOptions() []broker.SubscribeOption { + return nil +} + +// Options returns the subscription options. +func (s *subscription) Options() *broker.SubscribeOptions { + return s.options +} + +// Handler returns the broker.Handler for this subscription. +func (s *subscription) Handler() broker.Handler { + return s.handler +} + +// azureEvent implements broker.Event for Azure Service Bus messages. +type azureEvent struct { + topic string + message *broker.Message + azureMsg *Message +} + +// Topic returns the event topic. +func (e *azureEvent) Topic() string { + return e.topic +} + +// Message returns the broker.Message. +func (e *azureEvent) Message() *broker.Message { + return e.message +} + +// Ack completes the Azure Service Bus message. +func (e *azureEvent) Ack() error { + if err := e.azureMsg.Complete(); err != nil { + return fmt.Errorf("azuresb: failed to complete message: %w", err) + } + return nil +}