Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions azuresb/azuresb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
package azuresb_test

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/velmie/broker"

"github.com/velmie/broker/azuresb"
)

type fakeReceiver struct {
receiveFunc func(ctx context.Context) (*azuresb.Message, error)
closeFunc func() error
}

func (fr *fakeReceiver) Receive(ctx context.Context) (*azuresb.Message, error) {
if fr.receiveFunc != nil {
return fr.receiveFunc(ctx)
}
return nil, nil
}

func (fr *fakeReceiver) Close() error {
if fr.closeFunc != nil {
return fr.closeFunc()
}
return nil
}

type fakeReceiverFactory struct {
receiver azuresb.Receiver
err error
}

func (f *fakeReceiverFactory) CreateReceiver(topic string) (azuresb.Receiver, error) {
if f.err != nil {
return nil, f.err
}
return f.receiver, nil
}

func TestMessage_CompleteAndAbandon(t *testing.T) {
t.Run("Complete calls completeFunc", func(t *testing.T) {
completeCalled := false
msg := azuresb.NewMessage([]byte("test"), nil, "id",
func() error {
completeCalled = true
return nil
},
nil,
)
if err := msg.Complete(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if !completeCalled {
t.Error("completeFunc was not called")
}
})

t.Run("Abandon calls abandonFunc", func(t *testing.T) {
abandonCalled := false
msg := azuresb.NewMessage([]byte("test"), nil, "id",
nil,
func() error {
abandonCalled = true
return nil
},
)
if err := msg.Abandon(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if !abandonCalled {
t.Error("abandonFunc was not called")
}
})

t.Run("Complete with nil completeFunc returns nil", func(t *testing.T) {
msg := azuresb.NewMessage([]byte("test"), nil, "id", nil, nil)
if err := msg.Complete(); err != nil {
t.Errorf("unexpected error: %v", err)
}
})

t.Run("Abandon with nil abandonFunc returns nil", func(t *testing.T) {
msg := azuresb.NewMessage([]byte("test"), nil, "id", nil, nil)
if err := msg.Abandon(); err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}

func TestSubscriber_Subscribe_HandlerSuccess(t *testing.T) {
var completeCalled bool

fakeMsg := azuresb.NewMessage([]byte("payload"), map[string]string{"foo": "bar"}, "msg-123",
func() error {
completeCalled = true
return nil
},
nil,
)

msgReturned := false
fr := &fakeReceiver{
receiveFunc: func(ctx context.Context) (*azuresb.Message, error) {
if !msgReturned {
msgReturned = true
return fakeMsg, nil
}
<-ctx.Done()
return nil, ctx.Err()
},
closeFunc: func() error {
return nil
},
}

factory := &fakeReceiverFactory{receiver: fr}
subscriber := azuresb.NewSubscriber(factory)

handlerCalled := make(chan struct{})
handler := func(e broker.Event) error {
if e.Topic() != "test-topic" {
t.Errorf("expected topic 'test-topic', got %s", e.Topic())
}
msg := e.Message()
if string(msg.Body) != "payload" {
t.Errorf("expected payload 'payload', got %s", msg.Body)
}
close(handlerCalled)
return nil
}

sub, err := subscriber.Subscribe("test-topic", handler, func(o *broker.SubscribeOptions) {
o.AutoAck = true
})
if err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}

select {
case <-handlerCalled:
case <-time.After(1 * time.Second):
t.Fatal("handler was not called")
}

if err := sub.Unsubscribe(); err != nil {
t.Errorf("Unsubscribe returned error: %v", err)
}
time.Sleep(100 * time.Millisecond)
if !completeCalled {
t.Error("expected complete to be called (auto ack), but it was not")
}
}

func TestSubscriber_Subscribe_HandlerError(t *testing.T) {
var abandonCalled bool

fakeMsg := azuresb.NewMessage([]byte("error-payload"), map[string]string{"key": "val"}, "msg-error",
nil,
func() error {
abandonCalled = true
return nil
},
)

msgReturned := false
fr := &fakeReceiver{
receiveFunc: func(ctx context.Context) (*azuresb.Message, error) {
if !msgReturned {
msgReturned = true
return fakeMsg, nil
}
<-ctx.Done()
return nil, ctx.Err()
},
closeFunc: func() error {
return nil
},
}
factory := &fakeReceiverFactory{receiver: fr}
subscriber := azuresb.NewSubscriber(factory)

handlerCalled := make(chan struct{})
handler := func(e broker.Event) error {
close(handlerCalled)
return errors.New("handler error")
}

sub, err := subscriber.Subscribe("test-topic", handler, func(o *broker.SubscribeOptions) {
o.AutoAck = true
})
if err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}

select {
case <-handlerCalled:
case <-time.After(1 * time.Second):
t.Fatal("handler was not called")
}

if err := sub.Unsubscribe(); err != nil {
t.Errorf("Unsubscribe returned error: %v", err)
}
time.Sleep(100 * time.Millisecond)
if !abandonCalled {
t.Error("expected abandon to be called due to handler error, but it was not")
}
}

func TestSubscription_UnsubscribeAndMethods(t *testing.T) {
fr := &fakeReceiver{
receiveFunc: func(ctx context.Context) (*azuresb.Message, error) {
return nil, context.Canceled
},
closeFunc: func() error {
return nil
},
}
factory := &fakeReceiverFactory{receiver: fr}
subscriber := azuresb.NewSubscriber(factory)

handler := func(e broker.Event) error { return nil }

sub, err := subscriber.Subscribe("test-topic", handler)
if err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}

if sub.Topic() != "test-topic" {
t.Errorf("expected topic 'test-topic', got %s", sub.Topic())
}
if sub.Handler() == nil {
t.Error("expected non-nil handler")
}
if sub.Options() == nil {
t.Error("expected non-nil subscribe options")
}

if err := sub.Unsubscribe(); err != nil {
t.Errorf("Unsubscribe returned error: %v", err)
}

select {
case <-sub.Done():
case <-time.After(1 * time.Second):
t.Error("Done channel was not closed after unsubscribe")
}
}

func TestSubscription_MultipleUnsubscribe(t *testing.T) {
fr := &fakeReceiver{
receiveFunc: func(ctx context.Context) (*azuresb.Message, error) {
<-ctx.Done()
return nil, ctx.Err()
},
closeFunc: func() error {
return nil
},
}
factory := &fakeReceiverFactory{receiver: fr}
subscriber := azuresb.NewSubscriber(factory)

handler := func(e broker.Event) error { return nil }

sub, err := subscriber.Subscribe("test-topic", handler)
if err != nil {
t.Fatalf("Subscribe returned error: %v", err)
}

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err := sub.Unsubscribe(); err != nil {
t.Errorf("Unsubscribe returned error: %v", err)
}
}()
}
wg.Wait()
}
19 changes: 19 additions & 0 deletions azuresb/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package azuresb

import "fmt"

type DefaultReceiverFactory struct {
opts []AzureReceiverOption
}

func NewDefaultReceiverFactory(opts ...AzureReceiverOption) *DefaultReceiverFactory {
return &DefaultReceiverFactory{opts: opts}
}

func (f DefaultReceiverFactory) CreateReceiver(topic string) (Receiver, error) {
r, err := NewAzureReceiver(topic, f.opts...)
if err != nil {
return nil, fmt.Errorf("azuresb.DefaultReceiverFactory.CreateReceiver: %w", err)
}
return r, nil
}
21 changes: 21 additions & 0 deletions azuresb/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module github.com/velmie/broker/azuresb

go 1.23.0

require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0
github.com/stretchr/testify v1.10.0
github.com/velmie/broker v0.9.0
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/go-amqp v1.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
61 changes: 61 additions & 0 deletions azuresb/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0 h1:JNgM3Tz592fUHU2vgwgvOgKxo5s9Ki0y2wicBeckn70=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.8.0/go.mod h1:6vUKmzY17h6dpn9ZLAhM4R/rcrltBeq52qZIkUR7Oro=
github.com/Azure/go-amqp v1.3.0 h1://1rikYhoIQNXJFXyoO/Rlb4+4EkHYfJceNtLlys2/4=
github.com/Azure/go-amqp v1.3.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 h1:kYRSnvJju5gYVyhkij+RTJ/VR6QIUaCfWeaFm2ycsjQ=
github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/velmie/broker v0.9.0 h1:WOyo4gPEl9hy8MyfDR1s3K/t4TSTdu/uYA6IgpPWsv4=
github.com/velmie/broker v0.9.0/go.mod h1:SS2XA9nsUaDeLYN8Z51lrZc0WFsY8m5cYWhCUblKZ40=
go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo=
go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading