From 3edc212f6fc711bd4e109a0468e6f4825ab956c9 Mon Sep 17 00:00:00 2001 From: Mohamad mehdi Kharatizadeh Date: Thu, 21 Mar 2019 05:28:30 +0330 Subject: [PATCH 1/7] Remove benchmark from all tests because it takes a lot of time --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index 2ef17ad..5bc7475 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,8 @@ help: ## Display this help screen check: dev-dependencies ## Run unit tests go test ./... go test -race ./... + +benchmark: dev-dependencies ## Run benchmarks go test -bench . ./... dependencies: ##‌ Download dependencies From 75ab14d80bf82dbfe6c7a987409a734bab2dc82b Mon Sep 17 00:00:00 2001 From: AmirHossein Roozbahany Date: Fri, 22 Mar 2019 02:41:00 +0430 Subject: [PATCH 2/7] Add http server tests --- internal/server/http.go | 4 +- internal/server/http_test.go | 213 +++++++++++++++++++++++++++++++++++ 2 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 internal/server/http_test.go diff --git a/internal/server/http.go b/internal/server/http.go index 4551111..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -169,6 +169,7 @@ func (s *httpServer) handleAcknowledge(writer http.ResponseWriter, request *http err = s.broker.Acknowledge(ctx, id) if err != nil { s.handleError(writer, request, err) + return } s.handleOK(writer, request) @@ -191,9 +192,10 @@ func (s *httpServer) handleReQueue(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Acknowledge(ctx, id) + err = s.broker.ReQueue(ctx, id) if err != nil { s.handleError(writer, request, err) + return } s.handleOK(writer, request) diff --git a/internal/server/http_test.go b/internal/server/http_test.go new file mode 100644 index 0000000..d29d161 --- /dev/null +++ b/internal/server/http_test.go @@ -0,0 +1,213 @@ +package server_test + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type mockBroker struct { + mock.Mock +} + +func (m *mockBroker) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockBroker) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { + args := m.Called(ctx) + return args.Get(0).(<-chan ubroker.Delivery), args.Error(1) +} + +func (m *mockBroker) Acknowledge(ctx context.Context, id int) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) ReQueue(ctx context.Context, id int) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) Publish(ctx context.Context, message ubroker.Message) error { + args := m.Called(ctx, message) + return args.Error(0) +} + +type HTTPServerTestSuite struct { + suite.Suite + t *testing.T + broker *mockBroker + server ubroker.HTTPServer +} + +func TestHTTPServerTestSuite(t *testing.T) { + suite.Run(t, &HTTPServerTestSuite{t: t}) +} + +func (s *HTTPServerTestSuite) prepareTest() { + s.broker = new(mockBroker) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.server = server.NewHTTP(s.broker, ":0") + s.server.Run() +} + +func (s *HTTPServerTestSuite) TestEmptyFetch() { + s.broker = new(mockBroker) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.server = server.NewHTTP(s.broker, ":0") + s.server.Run() + + s.assertHTTPFetch(`{"error": "context deadline exceeded"}`, 408) +} + +func (s *HTTPServerTestSuite) TestPublish() { + s.prepareTest() + s.broker.On("Publish", mock.Anything, mock.Anything).Return(nil) + s.httpPublish(`{"body": "hello"}`) + s.broker.AssertCalled( + s.t, + "Publish", + mock.Anything, + ubroker.Message{Body: "hello"}, + ) +} + +func (s *HTTPServerTestSuite) TestFailedReQueue() { + s.prepareTest() + s.broker.On("ReQueue", mock.Anything, 123).Return(ubroker.ErrInvalidID) + body := `{"error": "id is invalid"}` + s.httpReQueue(123, 400, &body) + s.broker.AssertCalled( + s.t, + "ReQueue", + mock.Anything, + 123, + ) +} + +func (s *HTTPServerTestSuite) TestReQueue() { + s.prepareTest() + s.broker.On("ReQueue", mock.Anything, 123).Return(nil) + s.httpReQueue(123, 200, nil) + s.broker.AssertCalled( + s.t, + "ReQueue", + mock.Anything, + 123, + ) +} + +func (s *HTTPServerTestSuite) TestFailedAcknowledge() { + s.prepareTest() + s.broker.On("Acknowledge", mock.Anything, 123).Return(ubroker.ErrInvalidID) + body := `{"error": "id is invalid"}` + s.httpAcknowledge(123, 400, &body) + s.broker.AssertCalled( + s.t, + "Acknowledge", + mock.Anything, + 123, + ) +} + +func (s *HTTPServerTestSuite) TestAcknowledge() { + s.prepareTest() + s.broker.On("Acknowledge", mock.Anything, 123).Return(nil) + s.httpAcknowledge(123, 200, nil) + s.broker.AssertCalled( + s.t, + "Acknowledge", + mock.Anything, + 123, + ) +} + +func (s *HTTPServerTestSuite) httpPublish(message string) { + resp := httptest.NewRecorder() + + req, err := http.NewRequest( + "POST", + "/publish", + strings.NewReader(message), + ) + if err != nil { + s.Fail(err.Error()) + } + + s.server.ServeHTTP(resp, req) + + if _, err := ioutil.ReadAll(resp.Body); err != nil { + s.Fail(err.Error()) + } else { + s.Equal(200, resp.Code) + } +} + +func (s *HTTPServerTestSuite) httpReQueue(id int, rspCode int, rspBody *string) { + resp := httptest.NewRecorder() + + req, err := http.NewRequest("POST", fmt.Sprintf("/requeue/%v", id), nil) + if err != nil { + s.Fail(err.Error()) + } + + s.server.ServeHTTP(resp, req) + + if p, err := ioutil.ReadAll(resp.Body); err != nil { + s.Fail(err.Error()) + } else { + s.Equal(rspCode, resp.Code) + if rspBody != nil { + s.JSONEq(*rspBody, string(p)) + } + } +} + +func (s *HTTPServerTestSuite) httpAcknowledge(id int, rspCode int, rspBody *string) { + resp := httptest.NewRecorder() + + req, err := http.NewRequest("POST", fmt.Sprintf("/acknowledge/%v", id), nil) + if err != nil { + s.Fail(err.Error()) + } + + s.server.ServeHTTP(resp, req) + + if p, err := ioutil.ReadAll(resp.Body); err != nil { + s.Fail(err.Error()) + } else { + s.Equal(rspCode, resp.Code) + if rspBody != nil { + s.JSONEq(*rspBody, string(p)) + } + } +} + +func (s *HTTPServerTestSuite) assertHTTPFetch(body string, code int) { + resp := httptest.NewRecorder() + + req, err := http.NewRequest("GET", "/fetch", nil) + if err != nil { + s.Fail(err.Error()) + } + + s.server.ServeHTTP(resp, req) + + if p, err := ioutil.ReadAll(resp.Body); err != nil { + s.Fail(err.Error()) + } else { + s.Equal(code, resp.Code) + s.JSONEq(body, string(p)) + } +} From fb364ac0cc0351d8df0fa616accb503f9b4a1ef2 Mon Sep 17 00:00:00 2001 From: Mohamad mehdi Kharatizadeh Date: Mon, 25 Mar 2019 17:43:31 +0430 Subject: [PATCH 3/7] make dev-dependencies --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1b8661a..062a3b2 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ cd $GOPATH/github.com/ git clone cd ubroker -make dependencies +make dev-dependencies ``` 3. Open `internal/broker/core.go` file. This file lays out an implementation of `pkg.ubroker.Broker` interface located in `pkg/ubroker/ubroker.go` file. You should read the GoDoc for the interface. We have accompanies the code with tests that let's you on a TDD‌(Test Driven Development) approach of implementing this interface. You can read tests in `internal/broker/core_test.go` file to know what is missing and what needs to be implemented. From cdf1184afddb6fedfa4bb041e481f34d79d1f715 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Tue, 9 Apr 2019 18:36:45 +0430 Subject: [PATCH 4/7] tests passed --- cmd/ubroker/main.go | 4 +- internal/broker/core.go | 182 ++++++++++++++++++++++++++++++++--- internal/broker/core_test.go | 4 +- internal/server/http.go | 2 +- internal/server/http_test.go | 4 +- 5 files changed, 175 insertions(+), 21 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..7ed3011 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/maedeazad/ubroker/internal/broker" + "github.com/maedeazad/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..46215fa 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -4,43 +4,197 @@ import ( "context" "time" - "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/pkg/errors" + "github.com/maedeazad/ubroker/pkg/ubroker" + // "github.com/pkg/errors" + + // "container/list" + "sync" + + "log" ) +func print(s ...interface{}){ + log.Println(s...) + + // br := bufio.NewWriter(os.Stdout) + // logger := log.New(br, "", log.Ldate) + // logger.Printf("%s\n", s) + // br.Flush() +} + // New creates a new instance of ubroker.Broker // with given `ttl`. `ttl` determines time in which // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + ///print("New") + // TODO timer for ttl + return &core{ + maxId: 0, + ttl: ttl, + msgs: make(map[int]*QueueElement), + closed: false, + deliveryChannel: make(chan ubroker.Delivery, 10000), + deliveryCalled: false, + } +} + +type QueueElement struct { + sync.Mutex + + id int + msg string + core *core + timerCanceledChan chan struct{} +} + +func (e *QueueElement) activateTimer(expTimer *time.Timer) { + go func() { + select { + case <-expTimer.C: + e.core.reQueue(e.id) + ///print("timer requeue", e.id, err) + case <-e.timerCanceledChan: + ///print("timer aborted", e.id) + } + }() +} + +func (e *QueueElement) resetTimer(ttl time.Duration) { + e.stopTimer() + e.activateTimer(time.NewTimer(e.core.ttl)) +} + +func (e *QueueElement) stopTimer() { + select { + case e.timerCanceledChan <- struct{}{}: + default: + } } type core struct { - // TODO: add required fields + sync.Mutex + + maxId int + ttl time.Duration + msgs map[int]*QueueElement + closed bool + deliveryChannel chan ubroker.Delivery + deliveryCalled bool } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + ///print("Delivery") + if err := ctx.Err(); err != nil { + return nil, err + } + c.Lock() + defer c.Unlock() + if c.closed { + return nil, ubroker.ErrClosed + } + + c.deliveryCalled = true + + return c.deliveryChannel, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + ///print("Acknowledge", id) + if err := ctx.Err(); err != nil { + return err + } + c.Lock() + defer c.Unlock() + if c.closed { + return ubroker.ErrClosed + } + if !c.deliveryCalled{ + return ubroker.ErrInvalidID + } + + if val, ok := c.msgs[id]; ok { + val.stopTimer() + delete(c.msgs, id) + }else{ + // ///print("Acknowledge-not-exists") + return ubroker.ErrInvalidID + } + + // ///print("Acknowledge-ok") + return nil +} + +func (c *core) reQueue(id int) error { + c.Lock() + defer c.Unlock() + if c.closed { + return ubroker.ErrClosed + } + if !c.deliveryCalled{ + return ubroker.ErrInvalidID + } + + if val, ok := c.msgs[id]; ok { + delete(c.msgs, id) + val.stopTimer() + + c.addNewMessage(ubroker.Message{Body: val.msg}) + }else{ + return ubroker.ErrInvalidID + } + + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + ///print("ReQueue", id) + if err := ctx.Err(); err != nil { + return err + } + return c.reQueue(id) +} + +func (c *core) addNewMessage(message ubroker.Message){ + c.maxId += 1 + c.msgs[c.maxId] = &QueueElement{ + id: c.maxId, + msg: message.Body, + core: c, + timerCanceledChan: make(chan struct{}), + } + c.msgs[c.maxId].activateTimer(time.NewTimer(c.ttl)) + c.deliveryChannel <- ubroker.Delivery{ + Message: message, + ID: c.maxId, + } } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + ///print("Publish") + if err := ctx.Err(); err != nil { + return err + } + c.Lock() + defer c.Unlock() + if c.closed { + return ubroker.ErrClosed + } + + c.addNewMessage(message) + return nil } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + ///print("Close") + c.Lock() + defer c.Unlock() + if c.closed { + return ubroker.ErrClosed + } + + c.closed = true + close(c.deliveryChannel) + + return nil } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..ad41e7b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/internal/broker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1a8378d 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..19c22d3 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/internal/server" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From 5a44b8119225211dd30e9e916527815e2e58cfba Mon Sep 17 00:00:00 2001 From: maedeazad Date: Tue, 9 Apr 2019 18:40:08 +0430 Subject: [PATCH 5/7] cleaning --- internal/broker/core.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 46215fa..66ec298 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,21 +5,14 @@ import ( "time" "github.com/maedeazad/ubroker/pkg/ubroker" - // "github.com/pkg/errors" - // "container/list" "sync" - "log" ) func print(s ...interface{}){ log.Println(s...) - // br := bufio.NewWriter(os.Stdout) - // logger := log.New(br, "", log.Ldate) - // logger.Printf("%s\n", s) - // br.Flush() } // New creates a new instance of ubroker.Broker @@ -27,8 +20,6 @@ func print(s ...interface{}){ // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - ///print("New") - // TODO timer for ttl return &core{ maxId: 0, ttl: ttl, @@ -53,9 +44,7 @@ func (e *QueueElement) activateTimer(expTimer *time.Timer) { select { case <-expTimer.C: e.core.reQueue(e.id) - ///print("timer requeue", e.id, err) case <-e.timerCanceledChan: - ///print("timer aborted", e.id) } }() } @@ -84,7 +73,6 @@ type core struct { } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - ///print("Delivery") if err := ctx.Err(); err != nil { return nil, err } @@ -100,7 +88,6 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { } func (c *core) Acknowledge(ctx context.Context, id int) error { - ///print("Acknowledge", id) if err := ctx.Err(); err != nil { return err } @@ -117,11 +104,9 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { val.stopTimer() delete(c.msgs, id) }else{ - // ///print("Acknowledge-not-exists") return ubroker.ErrInvalidID } - // ///print("Acknowledge-ok") return nil } @@ -148,7 +133,6 @@ func (c *core) reQueue(id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - ///print("ReQueue", id) if err := ctx.Err(); err != nil { return err } @@ -171,7 +155,6 @@ func (c *core) addNewMessage(message ubroker.Message){ } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - ///print("Publish") if err := ctx.Err(); err != nil { return err } @@ -186,7 +169,6 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { - ///print("Close") c.Lock() defer c.Unlock() if c.closed { From 3ab3e55317f6bda2fcb4c5315690a1d7ef703146 Mon Sep 17 00:00:00 2001 From: maedeazad <49020330+maedeazad@users.noreply.github.com> Date: Tue, 9 Apr 2019 21:22:32 +0430 Subject: [PATCH 6/7] Update Build Status --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 062a3b2..fabbb5e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/arcana261/ubroker.svg?branch=master)](https://travis-ci.org/arcana261/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![Build Status](https://travis-ci.org/maedeazad/ubroker.svg?branch=master)](https://travis-ci.org/maedeazad/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) # ubroker From c22a0f87412a10e7d2928853a4a5a215af63bff7 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Sat, 13 Apr 2019 19:14:59 +0430 Subject: [PATCH 7/7] revert import paths for pull request to the original repo --- README.md | 2 +- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index fabbb5e..062a3b2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/maedeazad/ubroker.svg?branch=master)](https://travis-ci.org/maedeazad/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![Build Status](https://travis-ci.org/arcana261/ubroker.svg?branch=master)](https://travis-ci.org/arcana261/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) # ubroker diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 7ed3011..40e5f9e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/maedeazad/ubroker/internal/broker" - "github.com/maedeazad/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 66ec298..eda3079 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "sync" "log" diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index ad41e7b..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/maedeazad/ubroker/internal/broker" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 1a8378d..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 19c22d3..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/maedeazad/ubroker/internal/server" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" )