From a02f4d8bf8d84ba0aa55c24fb07aa491821a6fae Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 29 Mar 2019 17:14:37 +0430 Subject: [PATCH 01/21] Bits of understanding, guiding comments moved to core.go --- internal/broker/core.go | 58 +++++++++++++++++++++++++++++++++++++---- pkg/ubroker/ubroker.go | 4 +-- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..69e0f3d 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -18,29 +18,77 @@ func New(ttl time.Duration) ubroker.Broker { type core struct { // TODO: add required fields + // 1- A message id generation routine + // 2- A dictionary of message values and id keys + } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me + // Delivery returns a channel which continuously supplies + // messages to consumers. + // We require following: + // + // 1. Resulting read-only channel is unique (it does + // not change each time you call it) + // 2. If `ctx` is canceled or timed out, `ctx.Err()` is + // returned + // 3. If broker is closed, `ErrClosed` is returned + // 4. should be thread-safe return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me + // Acknowledge is called by clients to declare that + // specified message id has been successfuly processed + // and should not be requeued to queue and we have to + // remove it. + // We demand following: + // + // 1. Non-existing ids should cause ErrInvalidID + // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID + // 3. Should prevent requeue due to TTL + // 4. If `ctx` is canceled or timed out, `ctx.Err()` is + // returned + // 5. If broker is closed, `ErrClosed` is returned + // 6. should be thread-safe return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me + // ReQueue is called by clients to declare that + // specified message id should be put back in + // front of the queue. + // We demand following: + // + // 1. Non-existing ids should cause ErrInvalidID + // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID + // 3. Should prevent requeue due to TTL + // 4. If `ctx` is canceled or timed out, `ctx.Err()` is + // returned + // 5. If broker is closed, `ErrClosed` is returned + // 6. should be thread-safe return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me + // Publish is used to enqueue a new message to broker + // We demand following: + // + // 1. If `ctx` is canceled or timed out, `ctx.Err()` is + // returned + // 2. If broker is closed, `ErrClosed` is returned + // 3. should be thread-safe return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") } func (c *core) Close() error { - // TODO:‌ implement me + // Broker interface implements io.Closer interface + // which supplies us with method `Close() error`. + // We require following: + // + // 1. closing of a closed broker should result in `nil` + // 2. should be thread-safe + // 3. all other operations after closing broker should result + // in ErrClosed error return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") } diff --git a/pkg/ubroker/ubroker.go b/pkg/ubroker/ubroker.go index 18268cd..92de13c 100644 --- a/pkg/ubroker/ubroker.go +++ b/pkg/ubroker/ubroker.go @@ -9,7 +9,7 @@ import ( // Broker interface defines functionalities of a // message broker system. // -// we require our message broker to timeout and requeue +// We require our message broker to timeout and requeue // unacknowledged messages automatically. We also require // that implementations should be thread-safe. type Broker interface { @@ -27,7 +27,7 @@ type Broker interface { // messages to consumers. // We require following: // - // 1. Resulting read-only channel is unique (it doesn + // 1. Resulting read-only channel is unique (it does // not change each time you call it) // 2. If `ctx` is canceled or timed out, `ctx.Err()` is // returned From 875d0dcee50f3fcbc6e9f2f66289c5eb8fbf07dd Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 29 Mar 2019 17:19:34 +0430 Subject: [PATCH 02/21] Fixing repo adresses --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..ff87a3a 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 69e0f3d..a4cf9ac 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -4,8 +4,8 @@ import ( "context" "time" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "github.com/sneyes/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..19d29e8 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..06b17d6 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,8 +17,8 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "github.com/sneyes/ubroker/pkg/ubroker" ) var ( diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..7a26d94 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/server" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From 5525a8f8da71b62accb498e5fb5ef716fd076cb6 Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 5 Apr 2019 00:15:10 +0430 Subject: [PATCH 03/21] error checks, core struct --- internal/broker/core.go | 59 ++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index a4cf9ac..69d75f2 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -1,6 +1,7 @@ package broker import ( + "container/list" "context" "time" @@ -13,10 +14,27 @@ import ( // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + result := &core{ + ttl: ttl, + } + result.messageList = *list.New() + a := &coreMsg{} + result.messageList.PushFront(a) + result.deliveryChan = make(chan ubroker.Delivery) + + return result +} + +type coreMsg struct { + msgD ubroker.Delivery + timeInQueue time.Time } type core struct { + messageList list.List + ttl time.Duration + deliveryChan <-chan ubroker.Delivery + isClosed bool // TODO: add required fields // 1- A message id generation routine // 2- A dictionary of message values and id keys @@ -24,24 +42,20 @@ type core struct { } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // Delivery returns a channel which continuously supplies - // messages to consumers. + // Delivery returns a channel which continuously supplies messages to consumers. // We require following: - // - // 1. Resulting read-only channel is unique (it does - // not change each time you call it) - // 2. If `ctx` is canceled or timed out, `ctx.Err()` is - // returned - // 3. If broker is closed, `ErrClosed` is returned // 4. should be thread-safe - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if ctx.Err() != nil { + return nil, ctx.Err() + } + if c.isClosed { + return nil, errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") + } + return c.deliveryChan, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // Acknowledge is called by clients to declare that - // specified message id has been successfuly processed - // and should not be requeued to queue and we have to - // remove it. + // Acknowledge is called by clients to declare that specified message id has been successfuly processed and should not be requeued to queue and we have to remove it. // We demand following: // // 1. Non-existing ids should cause ErrInvalidID @@ -55,10 +69,7 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - // ReQueue is called by clients to declare that - // specified message id should be put back in - // front of the queue. - // We demand following: + // ReQueue is called by clients to declare that specified message id should be put back in front of the queue. We demand following: // // 1. Non-existing ids should cause ErrInvalidID // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID @@ -67,6 +78,12 @@ func (c *core) ReQueue(ctx context.Context, id int) error { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe + if ctx.Err() != nil { + return ctx.Err() + } + if c.isClosed { + return errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") + } return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") } @@ -78,6 +95,12 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { // returned // 2. If broker is closed, `ErrClosed` is returned // 3. should be thread-safe + if ctx.Err() != nil { + return ctx.Err() + } + if c.isClosed { + return errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") + } return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") } From 058578494a8051fcc46742ae2833491da12134b8 Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 5 Apr 2019 13:46:56 +0430 Subject: [PATCH 04/21] DONE: publish --- internal/broker/core.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 69d75f2..3fcfc8c 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -3,6 +3,8 @@ package broker import ( "container/list" "context" + "math/rand" + "sync" "time" "github.com/pkg/errors" @@ -18,9 +20,8 @@ func New(ttl time.Duration) ubroker.Broker { ttl: ttl, } result.messageList = *list.New() - a := &coreMsg{} - result.messageList.PushFront(a) result.deliveryChan = make(chan ubroker.Delivery) + result.idSet = make(map[int]bool) return result } @@ -35,6 +36,8 @@ type core struct { ttl time.Duration deliveryChan <-chan ubroker.Delivery isClosed bool + idSet map[int]bool + mutex sync.Mutex // TODO: add required fields // 1- A message id generation routine // 2- A dictionary of message values and id keys @@ -88,20 +91,24 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // Publish is used to enqueue a new message to broker - // We demand following: - // - // 1. If `ctx` is canceled or timed out, `ctx.Err()` is - // returned - // 2. If broker is closed, `ErrClosed` is returned - // 3. should be thread-safe + c.mutex.Lock() + defer c.mutex.Unlock() + msg := new(coreMsg) + msg.msgD.Message = message + msg.timeInQueue = time.Now() + msg.msgD.ID = rand.Int() + for c.idSet[msg.msgD.ID] { + msg.msgD.ID = rand.Int() + } + c.idSet[msg.msgD.ID] = true + if ctx.Err() != nil { return ctx.Err() } if c.isClosed { return errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") } - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + return nil } func (c *core) Close() error { From 34e9d8ca9c4af8c76af1068fa9181ad75ecfd4b5 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 6 Apr 2019 15:25:04 +0430 Subject: [PATCH 05/21] Delivery Done Fixed: messageList and deliveryChan --- internal/broker/core.go | 56 +++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 3fcfc8c..5d59ab9 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -1,7 +1,6 @@ package broker import ( - "container/list" "context" "math/rand" "sync" @@ -19,8 +18,8 @@ func New(ttl time.Duration) ubroker.Broker { result := &core{ ttl: ttl, } - result.messageList = *list.New() - result.deliveryChan = make(chan ubroker.Delivery) + result.messageList = make([]coreMsg, 0) + result.deliveryChan = make(chan ubroker.Delivery, 100000000) result.idSet = make(map[int]bool) return result @@ -32,27 +31,26 @@ type coreMsg struct { } type core struct { - messageList list.List + messageList []coreMsg ttl time.Duration - deliveryChan <-chan ubroker.Delivery + deliveryChan chan ubroker.Delivery isClosed bool idSet map[int]bool mutex sync.Mutex - // TODO: add required fields - // 1- A message id generation routine - // 2- A dictionary of message values and id keys - } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // Delivery returns a channel which continuously supplies messages to consumers. - // We require following: - // 4. should be thread-safe + c.mutex.Lock() + defer c.mutex.Unlock() if ctx.Err() != nil { return nil, ctx.Err() } if c.isClosed { - return nil, errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") + return nil, errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + } + for index, value := range c.messageList { + _ = index + c.deliveryChan <- value.msgD } return c.deliveryChan, nil } @@ -81,11 +79,13 @@ func (c *core) ReQueue(ctx context.Context, id int) error { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe + c.mutex.Lock() + defer c.mutex.Unlock() if ctx.Err() != nil { return ctx.Err() } if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") + return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") } return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") } @@ -93,6 +93,13 @@ func (c *core) ReQueue(ctx context.Context, id int) error { func (c *core) Publish(ctx context.Context, message ubroker.Message) error { c.mutex.Lock() defer c.mutex.Unlock() + if ctx.Err() != nil { + return ctx.Err() + } + if c.isClosed { + return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + } + msg := new(coreMsg) msg.msgD.Message = message msg.timeInQueue = time.Now() @@ -101,24 +108,13 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { msg.msgD.ID = rand.Int() } c.idSet[msg.msgD.ID] = true - - if ctx.Err() != nil { - return ctx.Err() - } - if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "delivery error, Broker is closed") - } + c.messageList = append(c.messageList, *msg) return nil } func (c *core) Close() error { - // Broker interface implements io.Closer interface - // which supplies us with method `Close() error`. - // We require following: - // - // 1. closing of a closed broker should result in `nil` - // 2. should be thread-safe - // 3. all other operations after closing broker should result - // in ErrClosed error - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + c.mutex.Lock() + defer c.mutex.Unlock() + c.isClosed = true + return nil } From c0551b83c2a1f4190e65ac7fa76e6bc5781eefc0 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 6 Apr 2019 15:48:01 +0430 Subject: [PATCH 06/21] Acknowledge Done --- internal/broker/core.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 5d59ab9..63fd52e 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -55,6 +55,10 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { return c.deliveryChan, nil } +func removeMessage(slice []coreMsg, s int) []coreMsg { + return append(slice[:s], slice[s+1:]...) +} + func (c *core) Acknowledge(ctx context.Context, id int) error { // Acknowledge is called by clients to declare that specified message id has been successfuly processed and should not be requeued to queue and we have to remove it. // We demand following: @@ -62,11 +66,25 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { // 1. Non-existing ids should cause ErrInvalidID // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID // 3. Should prevent requeue due to TTL - // 4. If `ctx` is canceled or timed out, `ctx.Err()` is - // returned - // 5. If broker is closed, `ErrClosed` is returned - // 6. should be thread-safe - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + c.mutex.Lock() + defer c.mutex.Unlock() + if ctx.Err() != nil { + return ctx.Err() + } + if c.isClosed { + return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + } + ackMessageIndex := -1 + for index, value := range c.messageList { + if value.msgD.ID == id { + ackMessageIndex = index + } + } + if ackMessageIndex == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + } + c.messageList = removeMessage(c.messageList, ackMessageIndex) + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { From 4406433d01c6628993dde899e91d51437852c161 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 6 Apr 2019 15:54:19 +0430 Subject: [PATCH 07/21] Fix Close() --- internal/broker/core.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 63fd52e..b8a7cc3 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -60,12 +60,6 @@ func removeMessage(slice []coreMsg, s int) []coreMsg { } func (c *core) Acknowledge(ctx context.Context, id int) error { - // Acknowledge is called by clients to declare that specified message id has been successfuly processed and should not be requeued to queue and we have to remove it. - // We demand following: - // - // 1. Non-existing ids should cause ErrInvalidID - // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID - // 3. Should prevent requeue due to TTL c.mutex.Lock() defer c.mutex.Unlock() if ctx.Err() != nil { @@ -134,5 +128,6 @@ func (c *core) Close() error { c.mutex.Lock() defer c.mutex.Unlock() c.isClosed = true + close(c.deliveryChan) return nil } From 0abe5678aa021265c62c4f328b81d1a6631d9491 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 6 Apr 2019 17:49:27 +0430 Subject: [PATCH 08/21] All tests completed except DataRace --- internal/broker/core.go | 51 +++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index b8a7cc3..ef4b1ab 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -66,31 +66,39 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { return ctx.Err() } if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + return errors.Wrap(ubroker.ErrClosed, "Acknowledge error, Broker is closed") } ackMessageIndex := -1 for index, value := range c.messageList { if value.msgD.ID == id { ackMessageIndex = index + break } } if ackMessageIndex == -1 { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") } + + if time.Now().Sub(c.messageList[ackMessageIndex].timeInQueue) > c.ttl { + go c.ReQueue(ctx, id) + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + } c.messageList = removeMessage(c.messageList, ackMessageIndex) + for len(c.deliveryChan) > 0 { + <-c.deliveryChan + } + for index, value := range c.messageList { + _ = index + c.deliveryChan <- value.msgD + } return nil } func (c *core) ReQueue(ctx context.Context, id int) error { // ReQueue is called by clients to declare that specified message id should be put back in front of the queue. We demand following: // - // 1. Non-existing ids should cause ErrInvalidID // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID // 3. Should prevent requeue due to TTL - // 4. If `ctx` is canceled or timed out, `ctx.Err()` is - // returned - // 5. If broker is closed, `ErrClosed` is returned - // 6. should be thread-safe c.mutex.Lock() defer c.mutex.Unlock() if ctx.Err() != nil { @@ -99,7 +107,35 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if c.isClosed { return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") } - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + requeueMessageIndex := -1 + requeueMessageValue := &coreMsg{} + for index, value := range c.messageList { + if value.msgD.ID == id { + requeueMessageIndex = index + requeueMessageValue = &value + break + } + } + if requeueMessageIndex == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + } + requeueMessageValue.timeInQueue = time.Now() + requeueMessageValue.msgD.ID = rand.Int() + for c.idSet[requeueMessageValue.msgD.ID] { + requeueMessageValue.msgD.ID = rand.Int() + } + c.idSet[requeueMessageValue.msgD.ID] = true + c.messageList = removeMessage(c.messageList, requeueMessageIndex) + c.messageList = append(c.messageList, *requeueMessageValue) + + for len(c.deliveryChan) > 0 { + <-c.deliveryChan + } + for index, value := range c.messageList { + _ = index + c.deliveryChan <- value.msgD + } + return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { @@ -121,6 +157,7 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } c.idSet[msg.msgD.ID] = true c.messageList = append(c.messageList, *msg) + c.deliveryChan <- msg.msgD return nil } From 29c05b805c987e66bc1f41e18a2b3f1e3dc07083 Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 12 Apr 2019 14:19:43 +0430 Subject: [PATCH 09/21] Typos --- .gitignore | 3 ++- internal/broker/core.go | 18 +++++++----------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 620a673..134f855 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/ubroker \ No newline at end of file +/ubroker +*.test \ No newline at end of file diff --git a/internal/broker/core.go b/internal/broker/core.go index ef4b1ab..62a4bde 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -48,10 +48,10 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { if c.isClosed { return nil, errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") } - for index, value := range c.messageList { - _ = index - c.deliveryChan <- value.msgD - } + // for index, value := range c.messageList { + // _ = index + // c.deliveryChan <- value.msgD + // } return c.deliveryChan, nil } @@ -95,17 +95,13 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - // ReQueue is called by clients to declare that specified message id should be put back in front of the queue. We demand following: - // - // 2. Re-acknowledgement and Requeue of id should cause ErrInvalidID - // 3. Should prevent requeue due to TTL c.mutex.Lock() defer c.mutex.Unlock() if ctx.Err() != nil { return ctx.Err() } if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + return errors.Wrap(ubroker.ErrClosed, "Requeue error, Broker is closed") } requeueMessageIndex := -1 requeueMessageValue := &coreMsg{} @@ -117,7 +113,7 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } } if requeueMessageIndex == -1 { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + return errors.Wrap(ubroker.ErrInvalidID, "Requeue error, ID not found") } requeueMessageValue.timeInQueue = time.Now() requeueMessageValue.msgD.ID = rand.Int() @@ -145,7 +141,7 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { return ctx.Err() } if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + return errors.Wrap(ubroker.ErrClosed, "Publish error, Broker is closed") } msg := new(coreMsg) From e82980457b0296654151549853bd51d25f667266 Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 12 Apr 2019 14:43:52 +0430 Subject: [PATCH 10/21] mutex simplified --- internal/broker/core.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 62a4bde..d8dc74a 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -36,12 +36,12 @@ type core struct { deliveryChan chan ubroker.Delivery isClosed bool idSet map[int]bool - mutex sync.Mutex + sync.Mutex } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.Lock() + defer c.Unlock() if ctx.Err() != nil { return nil, ctx.Err() } @@ -60,8 +60,8 @@ func removeMessage(slice []coreMsg, s int) []coreMsg { } func (c *core) Acknowledge(ctx context.Context, id int) error { - c.mutex.Lock() - defer c.mutex.Unlock() + c.Lock() + defer c.Unlock() if ctx.Err() != nil { return ctx.Err() } @@ -95,8 +95,8 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - c.mutex.Lock() - defer c.mutex.Unlock() + c.Lock() + defer c.Unlock() if ctx.Err() != nil { return ctx.Err() } @@ -135,8 +135,8 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - c.mutex.Lock() - defer c.mutex.Unlock() + c.Lock() + defer c.Unlock() if ctx.Err() != nil { return ctx.Err() } @@ -158,8 +158,11 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { - c.mutex.Lock() - defer c.mutex.Unlock() + if c.isClosed { + return nil + } + c.Lock() + defer c.Unlock() c.isClosed = true close(c.deliveryChan) return nil From 942f45961ff302c6f7bd9198e6c89d3f46579787 Mon Sep 17 00:00:00 2001 From: sneyes Date: Fri, 12 Apr 2019 22:42:53 +0430 Subject: [PATCH 11/21] Added waitgroups, TTL check fixes --- internal/broker/core.go | 112 +++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index d8dc74a..4668dcc 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -10,21 +10,6 @@ import ( "github.com/sneyes/ubroker/pkg/ubroker" ) -// New creates a new instance of ubroker.Broker -// with given `ttl`. `ttl` determines time in which -// we requeue an unacknowledged/unrequeued message -// automatically. -func New(ttl time.Duration) ubroker.Broker { - result := &core{ - ttl: ttl, - } - result.messageList = make([]coreMsg, 0) - result.deliveryChan = make(chan ubroker.Delivery, 100000000) - result.idSet = make(map[int]bool) - - return result -} - type coreMsg struct { msgD ubroker.Delivery timeInQueue time.Time @@ -34,39 +19,58 @@ type core struct { messageList []coreMsg ttl time.Duration deliveryChan chan ubroker.Delivery - isClosed bool + isClosed chan bool idSet map[int]bool + sync.WaitGroup sync.Mutex } -func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { +//New broker +func New(ttl time.Duration) ubroker.Broker { + result := &core{ + ttl: ttl, + } + result.messageList = make([]coreMsg, 0) + result.deliveryChan = make(chan ubroker.Delivery, 100000) + result.idSet = make(map[int]bool) + result.isClosed = make(chan bool, 1) + + return result +} + +func (c *core) checksAndLocks(ctx context.Context) error { c.Lock() defer c.Unlock() - if ctx.Err() != nil { - return nil, ctx.Err() + c.Add(1) + select { + case <-c.isClosed: + return ubroker.ErrClosed + default: } - if c.isClosed { - return nil, errors.Wrap(ubroker.ErrClosed, "Delivery error, Broker is closed") + + if ctx.Err() != nil { + return ctx.Err() } - // for index, value := range c.messageList { - // _ = index - // c.deliveryChan <- value.msgD - // } - return c.deliveryChan, nil + return nil } func removeMessage(slice []coreMsg, s int) []coreMsg { return append(slice[:s], slice[s+1:]...) } +func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { + defer c.Done() + err := c.checksAndLocks(ctx) + if err != nil { + return nil, errors.Wrap(err, "Delivery error, Broker is closed") + } + return c.deliveryChan, nil +} func (c *core) Acknowledge(ctx context.Context, id int) error { - c.Lock() - defer c.Unlock() - if ctx.Err() != nil { - return ctx.Err() - } - if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Acknowledge error, Broker is closed") + defer c.Done() + err := c.checksAndLocks(ctx) + if err != nil { + return errors.Wrap(err, "Acknowledge error, Broker is closed") } ackMessageIndex := -1 for index, value := range c.messageList { @@ -80,7 +84,6 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } if time.Now().Sub(c.messageList[ackMessageIndex].timeInQueue) > c.ttl { - go c.ReQueue(ctx, id) return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") } c.messageList = removeMessage(c.messageList, ackMessageIndex) @@ -95,13 +98,10 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - c.Lock() - defer c.Unlock() - if ctx.Err() != nil { - return ctx.Err() - } - if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Requeue error, Broker is closed") + defer c.Done() + err := c.checksAndLocks(ctx) + if err != nil { + return errors.Wrap(err, "Requeue error, Broker is closed") } requeueMessageIndex := -1 requeueMessageValue := &coreMsg{} @@ -115,6 +115,7 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if requeueMessageIndex == -1 { return errors.Wrap(ubroker.ErrInvalidID, "Requeue error, ID not found") } + ttlErr := time.Now().Sub(c.messageList[requeueMessageIndex].timeInQueue) > c.ttl requeueMessageValue.timeInQueue = time.Now() requeueMessageValue.msgD.ID = rand.Int() for c.idSet[requeueMessageValue.msgD.ID] { @@ -131,19 +132,18 @@ func (c *core) ReQueue(ctx context.Context, id int) error { _ = index c.deliveryChan <- value.msgD } + if ttlErr { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + } return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - c.Lock() - defer c.Unlock() - if ctx.Err() != nil { - return ctx.Err() + defer c.Done() + err := c.checksAndLocks(ctx) + if err != nil { + return errors.Wrap(err, "Publish error, Broker is closed") } - if c.isClosed { - return errors.Wrap(ubroker.ErrClosed, "Publish error, Broker is closed") - } - msg := new(coreMsg) msg.msgD.Message = message msg.timeInQueue = time.Now() @@ -158,12 +158,16 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { - if c.isClosed { - return nil - } c.Lock() - defer c.Unlock() - c.isClosed = true + select { + case <-c.isClosed: + _ = c + default: + close(c.isClosed) + } + c.Unlock() + + c.Wait() close(c.deliveryChan) return nil } From 02b029d71a1ad33d7b28a6678a6a538958cf6225 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 13 Apr 2019 01:41:23 +0430 Subject: [PATCH 12/21] And still, Data-Race is bugged :( --- internal/broker/core.go | 72 ++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 4668dcc..d3a023d 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -20,8 +20,8 @@ type core struct { ttl time.Duration deliveryChan chan ubroker.Delivery isClosed chan bool - idSet map[int]bool - sync.WaitGroup + idSet []int + wg *sync.WaitGroup sync.Mutex } @@ -29,19 +29,24 @@ type core struct { func New(ttl time.Duration) ubroker.Broker { result := &core{ ttl: ttl, + wg: &sync.WaitGroup{}, } result.messageList = make([]coreMsg, 0) result.deliveryChan = make(chan ubroker.Delivery, 100000) - result.idSet = make(map[int]bool) + result.idSet = make([]int, 0) result.isClosed = make(chan bool, 1) return result } - -func (c *core) checksAndLocks(ctx context.Context) error { - c.Lock() - defer c.Unlock() - c.Add(1) +func indexOf(value int, array []int) int { + for p, v := range array { + if v == value { + return p + } + } + return -1 +} +func (c *core) errorChecks(ctx context.Context) error { select { case <-c.isClosed: return ubroker.ErrClosed @@ -58,20 +63,20 @@ func removeMessage(slice []coreMsg, s int) []coreMsg { return append(slice[:s], slice[s+1:]...) } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - defer c.Done() - err := c.checksAndLocks(ctx) + err := c.errorChecks(ctx) if err != nil { - return nil, errors.Wrap(err, "Delivery error, Broker is closed") + return nil, errors.Wrap(err, "Delivery error") } return c.deliveryChan, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - defer c.Done() - err := c.checksAndLocks(ctx) + err := c.errorChecks(ctx) if err != nil { - return errors.Wrap(err, "Acknowledge error, Broker is closed") + return errors.Wrap(err, "Acknowledge error") } + c.Lock() + defer c.Unlock() ackMessageIndex := -1 for index, value := range c.messageList { if value.msgD.ID == id { @@ -98,11 +103,14 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - defer c.Done() - err := c.checksAndLocks(ctx) + err := c.errorChecks(ctx) if err != nil { - return errors.Wrap(err, "Requeue error, Broker is closed") + return errors.Wrap(err, "Requeue error") } + c.wg.Add(1) + c.Lock() + defer c.Unlock() + defer c.wg.Done() requeueMessageIndex := -1 requeueMessageValue := &coreMsg{} for index, value := range c.messageList { @@ -118,10 +126,10 @@ func (c *core) ReQueue(ctx context.Context, id int) error { ttlErr := time.Now().Sub(c.messageList[requeueMessageIndex].timeInQueue) > c.ttl requeueMessageValue.timeInQueue = time.Now() requeueMessageValue.msgD.ID = rand.Int() - for c.idSet[requeueMessageValue.msgD.ID] { + for indexOf(requeueMessageValue.msgD.ID, c.idSet) != -1 { requeueMessageValue.msgD.ID = rand.Int() } - c.idSet[requeueMessageValue.msgD.ID] = true + c.idSet = append(c.idSet, requeueMessageValue.msgD.ID) c.messageList = removeMessage(c.messageList, requeueMessageIndex) c.messageList = append(c.messageList, *requeueMessageValue) @@ -139,35 +147,31 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - defer c.Done() - err := c.checksAndLocks(ctx) + err := c.errorChecks(ctx) if err != nil { - return errors.Wrap(err, "Publish error, Broker is closed") + return errors.Wrap(err, "Publish error") } + c.wg.Add(1) + c.Lock() + defer c.Unlock() + defer c.wg.Done() msg := new(coreMsg) msg.msgD.Message = message msg.timeInQueue = time.Now() msg.msgD.ID = rand.Int() - for c.idSet[msg.msgD.ID] { + for indexOf(msg.msgD.ID, c.idSet) != -1 { msg.msgD.ID = rand.Int() } - c.idSet[msg.msgD.ID] = true + c.idSet = append(c.idSet, msg.msgD.ID) + c.messageList = append(c.messageList, *msg) c.deliveryChan <- msg.msgD return nil } func (c *core) Close() error { - c.Lock() - select { - case <-c.isClosed: - _ = c - default: - close(c.isClosed) - } - c.Unlock() - - c.Wait() + c.wg.Wait() + close(c.isClosed) close(c.deliveryChan) return nil } From 5a04421a29b2748abc76bfc2786922b60f78b8bc Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 20 Apr 2019 00:57:29 +0430 Subject: [PATCH 13/21] FIXED, Finally! Found the most stupid mistake... --- internal/broker/core.go | 84 ++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index d3a023d..4e62cfe 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,7 +2,6 @@ package broker import ( "context" - "math/rand" "sync" "time" @@ -16,12 +15,13 @@ type coreMsg struct { } type core struct { - messageList []coreMsg - ttl time.Duration - deliveryChan chan ubroker.Delivery - isClosed chan bool - idSet []int - wg *sync.WaitGroup + messageList []coreMsg + ttl time.Duration + deliveryChan chan ubroker.Delivery + deliveryGiven bool + isClosed chan bool + idSet []int + wg *sync.WaitGroup sync.Mutex } @@ -35,7 +35,7 @@ func New(ttl time.Duration) ubroker.Broker { result.deliveryChan = make(chan ubroker.Delivery, 100000) result.idSet = make([]int, 0) result.isClosed = make(chan bool, 1) - + result.deliveryGiven = false return result } func indexOf(value int, array []int) int { @@ -58,25 +58,35 @@ func (c *core) errorChecks(ctx context.Context) error { } return nil } - func removeMessage(slice []coreMsg, s int) []coreMsg { return append(slice[:s], slice[s+1:]...) } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { + c.Lock() + defer c.Unlock() err := c.errorChecks(ctx) if err != nil { return nil, errors.Wrap(err, "Delivery error") } + c.deliveryGiven = true return c.deliveryChan, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { + c.Lock() + defer c.Unlock() + c.wg.Add(1) + defer c.wg.Done() err := c.errorChecks(ctx) if err != nil { + if err == ubroker.ErrClosed { + return err + } return errors.Wrap(err, "Acknowledge error") } - c.Lock() - defer c.Unlock() + if !c.deliveryGiven { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, not delivered yet") + } ackMessageIndex := -1 for index, value := range c.messageList { if value.msgD.ID == id { @@ -89,27 +99,26 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } if time.Now().Sub(c.messageList[ackMessageIndex].timeInQueue) > c.ttl { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge TTL error, ID not found") } c.messageList = removeMessage(c.messageList, ackMessageIndex) - for len(c.deliveryChan) > 0 { - <-c.deliveryChan - } - for index, value := range c.messageList { - _ = index - c.deliveryChan <- value.msgD - } return nil } func (c *core) ReQueue(ctx context.Context, id int) error { + c.Lock() + defer c.Unlock() err := c.errorChecks(ctx) if err != nil { + if err == ubroker.ErrClosed { + return err + } return errors.Wrap(err, "Requeue error") } + if !c.deliveryGiven { + return errors.Wrap(ubroker.ErrInvalidID, "Requeue error, not delivered yet") + } c.wg.Add(1) - c.Lock() - defer c.Unlock() defer c.wg.Done() requeueMessageIndex := -1 requeueMessageValue := &coreMsg{} @@ -125,42 +134,39 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } ttlErr := time.Now().Sub(c.messageList[requeueMessageIndex].timeInQueue) > c.ttl requeueMessageValue.timeInQueue = time.Now() - requeueMessageValue.msgD.ID = rand.Int() + requeueMessageValue.msgD.ID++ for indexOf(requeueMessageValue.msgD.ID, c.idSet) != -1 { - requeueMessageValue.msgD.ID = rand.Int() + requeueMessageValue.msgD.ID++ } c.idSet = append(c.idSet, requeueMessageValue.msgD.ID) c.messageList = removeMessage(c.messageList, requeueMessageIndex) c.messageList = append(c.messageList, *requeueMessageValue) - for len(c.deliveryChan) > 0 { - <-c.deliveryChan - } - for index, value := range c.messageList { - _ = index - c.deliveryChan <- value.msgD - } + c.deliveryChan <- requeueMessageValue.msgD if ttlErr { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") + return errors.Wrap(ubroker.ErrInvalidID, "Requeue TTL error, ID not found") } return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { + c.Lock() + defer c.Unlock() err := c.errorChecks(ctx) if err != nil { + if err == ubroker.ErrClosed { + return err + } return errors.Wrap(err, "Publish error") } c.wg.Add(1) - c.Lock() - defer c.Unlock() defer c.wg.Done() msg := new(coreMsg) msg.msgD.Message = message msg.timeInQueue = time.Now() - msg.msgD.ID = rand.Int() + msg.msgD.ID = 1 for indexOf(msg.msgD.ID, c.idSet) != -1 { - msg.msgD.ID = rand.Int() + msg.msgD.ID++ } c.idSet = append(c.idSet, msg.msgD.ID) @@ -170,6 +176,14 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { + c.Lock() + defer c.Unlock() + select { + case <-c.isClosed: + return nil + default: + c.isClosed <- true + } c.wg.Wait() close(c.isClosed) close(c.deliveryChan) From 8c6a70e9ebcac779b37b2b03de9e07bc64defa0e Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 20 Apr 2019 01:24:41 +0430 Subject: [PATCH 14/21] Repository address fixes --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index ff87a3a..40e5f9e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 4e62cfe..b0d4fff 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,8 +5,8 @@ import ( "sync" "time" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" - "github.com/sneyes/ubroker/pkg/ubroker" ) type coreMsg struct { diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 19d29e8..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" diff --git a/internal/server/http.go b/internal/server/http.go index 06b17d6..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,8 +17,8 @@ import ( "github.com/sirupsen/logrus" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" - "github.com/sneyes/ubroker/pkg/ubroker" ) var ( diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 7a26d94..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/sneyes/ubroker/internal/server" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From 99b452bd1f904ae99401b286f02327f91a3b3261 Mon Sep 17 00:00:00 2001 From: sneyes Date: Mon, 13 May 2019 14:47:38 +0430 Subject: [PATCH 15/21] Phase 2 init --- .gitignore | 2 +- .travis.yml | 6 +- Makefile | 25 +- README.md | 42 ++- api/ubroker.proto | 51 ++++ cmd/ubroker/main.go | 29 +- internal/broker/core.go | 425 +++++++++++++++++++---------- internal/broker/core_test.go | 132 +++++---- internal/server/grpc.go | 36 +++ internal/server/grpc_test.go | 383 ++++++++++++++++++++++++++ internal/server/http.go | 18 +- internal/server/http_test.go | 66 ++--- internal/server/moc_broker_test.go | 44 +++ pkg/ubroker/ubroker.go | 28 +- 14 files changed, 988 insertions(+), 299 deletions(-) create mode 100644 api/ubroker.proto create mode 100644 internal/server/grpc.go create mode 100644 internal/server/grpc_test.go create mode 100644 internal/server/moc_broker_test.go diff --git a/.gitignore b/.gitignore index 134f855..4b66428 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ /ubroker -*.test \ No newline at end of file +*.pb.go \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index a573047..ae3a1eb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,9 @@ go: - 1.12.x before_install: - - make dev-dependencies + - curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.7.1/protoc-3.7.1-linux-x86_64.zip + - unzip protoc-3.7.1-linux-x86_64.zip -d protoc3 + - PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make dev-dependencies script: - - make check + - PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make check diff --git a/Makefile b/Makefile index 5bc7475..db00acb 100644 --- a/Makefile +++ b/Makefile @@ -1,23 +1,36 @@ -.PHONY: check help dependencies dev-dependencies +.PHONY: check help dependencies dev-dependencies .pre-check-go generate SRCS = $(patsubst ./%,%,$(shell find . -name "*.go" -not -path "*vendor*")) help: ## Display this help screen @grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' -check: dev-dependencies ## Run unit tests +check: dev-dependencies | generate ## Run unit tests go test ./... go test -race ./... -benchmark: dev-dependencies ## Run benchmarks +benchmark: dev-dependencies | generate ## Run benchmarks go test -bench . ./... -dependencies: ##‌ Download dependencies +dependencies: | generate ##‌ Download dependencies go get -v ./... -dev-dependencies: dependencies ##‌ Download development dependencies +dev-dependencies: dependencies | generate ##‌ Download development dependencies go get -v github.com/stretchr/testify/suite go get -v github.com/stretchr/testify/assert + go get -v github.com/phayes/freeport -ubroker: $(SRCS) | dependencies ##‌ Compile us +ubroker: $(SRCS) pkg/ubroker/ubroker.pb.go | dependencies generate ##‌ Compile us go build -o ubroker ./cmd/ubroker + +generate: pkg/ubroker/ubroker.pb.go + +pkg/ubroker/ubroker.pb.go: api/ubroker.proto | .pre-check-go + $(PROTOC) $(PROTOC_OPTIONS) --go_out=plugins=grpc:$(GOPATH)/src api/ubroker.proto + +.pre-check-go: + go get -v github.com/golang/protobuf/protoc-gen-go + go get -v github.com/vektra/mockery/.../ + +PROTOC ?= protoc +PROTOC_OPTIONS ?= diff --git a/README.md b/README.md index 062a3b2..331a550 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,43 @@ # ubroker +Welcome to phase2! + +# Current Phase 2 + +Greetings! As you may already know, In this phase of the project we are about to take +our broker system which we implemented prevously, take out it's HTTP‌ (JSON) layer and replace +it with a neat gRPC‌ mechanism ! + +Before we begin any further, I'd like to point out that our master branch has changed and +contains implementation made by your friend, and ofcourse, my dear colleague at Cafebazaar, +[Milad Norouzi](https://github.com/miladosos) as reference implementation. I've made slight +alterations to his implementations to make it compatible with our new protocol buffer specification. + +So.. list of changes I've made to master branch are as follows: + +* `api/ubroker.proto`: This file now contains proto specification for RPC's we'd like to have. +* `Makefile`: I've added a dependency for tests (a library which generates random free ports) and ofcourse, directives to generate GO‌ code from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`. The first time you try to execute `make check` or `make ubroker` this autogenerated file will be generated from proto file. +* `pkg/ubroker/ubroker.go`: I've modified our `Broker` interface to use structs defined in our protobuffer `api/ubroker.proto`. +* `internal/broker/core.go`: Contains implementation from my colleague [Milad Norouzi](https://github.com/miladosos) +* `internal/broker/core_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/http.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/http_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/grpc.go`: This new file is the file you shoud implement (which implements `BrokerServer` autogenerated from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`) +* `internal/server/grpc_test.go`: This new file contains tests that I'd like to PASS :) and you should too! +* `cmd/ubroker/main.go`: I've made modifications to start gRPC‌ server from implementation in `internal/server/grpc.go`. + +Your task is as follows: +1. Make sure you have `protoc` installed. (the protobuf compiler) +2. Run `make dev-dependencies` to download all dependencies and ofcourse, autogenerate `pkg/ubroker/ubroker.pb.go` +3. Implement file at `internal/server/grpc.go` +4. Submit your pull request! + +# Previous Phase 1 + Happy new year to you, students! -We wanted to set the stage for continuous series of excersices to develop +We wanted to set the stage for continuous series of exercises to develop a message broker. A message broker is a system that acts as a hub to distribute messages in a @@ -18,12 +52,12 @@ various benefits of using messaging systems, which the most important are: And much more! I highly encourage you two watch [this](https://www.youtube.com/watch?v=rXi5CLjIQ9k) awesome video! I also encourage you to look at RabbitMQ design. -So, the messaging system is expected to do following opration: +So, the messaging system is expected to do following operation: 1. Provide a `/publish` HTTP API that let's clients publish messages to a queue. Or to say enqueue messages in our queue. 2. Provide a `/fetch` HTTP‌ API that let's clients fetch messages from queue. -3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` gurantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfuly and gracefully processed them. This is why this system is called to have a `at least once` gurantee because clients might see messages **at least once**. By stating that our gurantee is at-least once, we are not referring to a randomized behaviour that we might re-send a message to clients, but rather clients might see messages more than once becuase of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfuly processed fetched message. -4. Provde a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them. +3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` guarantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfully and gracefully processed them. This is why this system is called to have an `at least once` guarantee because clients might see messages **at least once**. By stating that our gaurantee is at-least once, we are not referring to a randomized behavior that we might re-send a message to clients, but rather clients might see messages more than once because of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfully processed fetched message. +4. Provide a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them. Now... Don't you guys worry! We have already laid out a boilerplate code beautifully so that you can learn how to code in GO and also validate your results! The steps are as follows: diff --git a/api/ubroker.proto b/api/ubroker.proto new file mode 100644 index 0000000..7b2c23c --- /dev/null +++ b/api/ubroker.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package ubroker; +option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; + +import "google/protobuf/empty.proto"; + +service Broker { + // Fetch should return a single Delivery per FetchRequest. + // Should return: + // Unavailable: If broker has been closed + rpc Fetch(stream FetchRequest) returns (stream Delivery); + + // Acknowledge a message + // Should return: + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + rpc Acknowledge(AcknowledgeRequest) returns (google.protobuf.Empty); + + // ReQueue a message + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + rpc ReQueue(ReQueueRequest) returns (google.protobuf.Empty); + + // Publish message to Queue + // OK: on success + // Unavailable: If broker has been closed + rpc Publish(Message) returns (google.protobuf.Empty); +} + +message Message { + bytes body = 1; +} + +message Delivery { + Message message = 1; + int32 id = 2; +} + +message FetchRequest { +} + +message AcknowledgeRequest { + int32 id = 1; +} + +message ReQueueRequest { + int32 id = 1; +} \ No newline at end of file diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -3,10 +3,15 @@ package main import ( "flag" "fmt" + "log" + "net" "os" "os/signal" "time" + "github.com/arcana261/ubroker/pkg/ubroker" + "google.golang.org/grpc" + "github.com/arcana261/ubroker/internal/broker" "github.com/arcana261/ubroker/internal/server" ) @@ -19,13 +24,15 @@ func main() { broker := broker.New(time.Duration(*ttlPtr) * time.Millisecond) endpoint := fmt.Sprintf(":%d", *portPtr) - srv := server.NewHTTP(broker, endpoint) + servicer := server.NewGRPC(broker) - if err := srv.Run(); err != nil { - panic(err.Error()) - } + grpcServer := grpc.NewServer() + ubroker.RegisterBrokerServer(grpcServer, servicer) - fmt.Printf("listening on %s\n", endpoint) + listener, err := net.Listen("tcp", endpoint) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } signalChan := make(chan os.Signal, 1) cleanupDone := make(chan struct{}) @@ -35,11 +42,17 @@ func main() { fmt.Printf("\nReceived an interrupt, stopping services...\n\n") close(cleanupDone) }() + + go func() { + if err := grpcServer.Serve(listener); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + fmt.Printf("listening on %s\n", endpoint) <-cleanupDone - if err := srv.Close(); err != nil { - panic(err.Error()) - } + grpcServer.GracefulStop() if err := broker.Close(); err != nil { panic(err.Error()) diff --git a/internal/broker/core.go b/internal/broker/core.go index b0d4fff..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,187 +5,338 @@ import ( "sync" "time" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + + "github.com/arcana261/ubroker/pkg/ubroker" ) -type coreMsg struct { - msgD ubroker.Delivery - timeInQueue time.Time +// New creates a new instance of ubroker.Broker +// with given `ttl`. `ttl` determines time in which +// we requeue an unacknowledged/unrequeued message +// automatically. +func New(ttl time.Duration) ubroker.Broker { + broker := &core{ + ttl: ttl, + requests: make(chan interface{}), + deliveryChannel: make(chan *ubroker.Delivery), + closed: make(chan bool, 1), + closing: make(chan bool, 1), + pending: make(map[int32]*ubroker.Message), + messages: []*ubroker.Delivery{{}}, + } + + broker.wg.Add(1) + go broker.startDelivery() + + return broker } type core struct { - messageList []coreMsg - ttl time.Duration - deliveryChan chan ubroker.Delivery - deliveryGiven bool - isClosed chan bool - idSet []int - wg *sync.WaitGroup - sync.Mutex + nextID int32 + ttl time.Duration + + mutex sync.Mutex + working sync.WaitGroup + wg sync.WaitGroup + + requests chan interface{} + deliveryChannel chan *ubroker.Delivery + closed chan bool + closing chan bool + pending map[int32]*ubroker.Message + messages []*ubroker.Delivery + channel chan *ubroker.Delivery } -//New broker -func New(ttl time.Duration) ubroker.Broker { - result := &core{ - ttl: ttl, - wg: &sync.WaitGroup{}, - } - result.messageList = make([]coreMsg, 0) - result.deliveryChan = make(chan ubroker.Delivery, 100000) - result.idSet = make([]int, 0) - result.isClosed = make(chan bool, 1) - result.deliveryGiven = false - return result -} -func indexOf(value int, array []int) int { - for p, v := range array { - if v == value { - return p - } +type acknowledgeRequest struct { + id int32 + response chan acknowledgeResponse +} + +type acknowledgeResponse struct { + id int32 + err error +} + +type requeueRequest struct { + id int32 + response chan requeueResponse +} + +type requeueResponse struct { + id int32 + err error +} + +type publishRequest struct { + message *ubroker.Message + response chan publishResponse +} + +type publishResponse struct { + err error +} + +func (c *core) Delivery(ctx context.Context) (<-chan *ubroker.Delivery, error) { + if isCanceledContext(ctx) { + return nil, ctx.Err() } - return -1 + + if !c.startWorking() { + return nil, ubroker.ErrClosed + } + defer c.working.Done() + + return c.deliveryChannel, nil } -func (c *core) errorChecks(ctx context.Context) error { - select { - case <-c.isClosed: + +func (c *core) Acknowledge(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { return ubroker.ErrClosed - default: + } + defer c.working.Done() + + request := &acknowledgeRequest{ + id: id, + response: make(chan acknowledgeResponse, 1), } - if ctx.Err() != nil { + select { + case <-ctx.Done(): return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() + } } - return nil -} -func removeMessage(slice []coreMsg, s int) []coreMsg { - return append(slice[:s], slice[s+1:]...) } -func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - c.Lock() - defer c.Unlock() - err := c.errorChecks(ctx) - if err != nil { - return nil, errors.Wrap(err, "Delivery error") + +func (c *core) ReQueue(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() } - c.deliveryGiven = true - return c.deliveryChan, nil -} -func (c *core) Acknowledge(ctx context.Context, id int) error { - c.Lock() - defer c.Unlock() - c.wg.Add(1) - defer c.wg.Done() - err := c.errorChecks(ctx) - if err != nil { - if err == ubroker.ErrClosed { - return err - } - return errors.Wrap(err, "Acknowledge error") + if !c.startWorking() { + return ubroker.ErrClosed } - if !c.deliveryGiven { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, not delivered yet") + defer c.working.Done() + + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), } - ackMessageIndex := -1 - for index, value := range c.messageList { - if value.msgD.ID == id { - ackMessageIndex = index - break + + select { + case <-ctx.Done(): + return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() } } - if ackMessageIndex == -1 { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge error, ID not found") +} + +func (c *core) Publish(ctx context.Context, message *ubroker.Message) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed } + defer c.working.Done() - if time.Now().Sub(c.messageList[ackMessageIndex].timeInQueue) > c.ttl { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge TTL error, ID not found") + request := &publishRequest{ + message: message, + response: make(chan publishResponse, 1), } - c.messageList = removeMessage(c.messageList, ackMessageIndex) - return nil -} -func (c *core) ReQueue(ctx context.Context, id int) error { - c.Lock() - defer c.Unlock() - err := c.errorChecks(ctx) - if err != nil { - if err == ubroker.ErrClosed { - return err - } - return errors.Wrap(err, "Requeue error") + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.closed: + return ubroker.ErrClosed + case c.requests <- request: + return nil } - if !c.deliveryGiven { - return errors.Wrap(ubroker.ErrInvalidID, "Requeue error, not delivered yet") +} + +func (c *core) Close() error { + if !c.startClosing() { + return errors.New("can not close channel, closing in progress") } - c.wg.Add(1) + c.working.Wait() + close(c.closed) + c.wg.Wait() + close(c.deliveryChannel) + + return nil +} + +func (c *core) startDelivery() { defer c.wg.Done() - requeueMessageIndex := -1 - requeueMessageValue := &coreMsg{} - for index, value := range c.messageList { - if value.msgD.ID == id { - requeueMessageIndex = index - requeueMessageValue = &value - break + for { + select { + case <-c.closed: + return + + case request := <-c.requests: + if isAcknowledgeRequest(request) { + c.wg.Add(1) + req, _ := request.(*acknowledgeRequest) + req.response <- c.handleAcknowledge(req) + } else if isRequeueRequest(request) { + c.wg.Add(1) + req, _ := request.(*requeueRequest) + req.response <- c.handleRequeue(req) + } else if isPublishRequest(request) { + c.wg.Add(1) + req, _ := request.(*publishRequest) + req.response <- c.handlePublish(req) + } else { + panic(errors.New("UNKNOWN REQUEST")) + } + + case c.channel <- c.messages[0]: + if c.channel != nil { + c.pending[c.messages[0].Id] = c.messages[0].Message + c.wg.Add(1) + go c.snooze(c.messages[0].Id) + + c.messages = c.messages[1:] + if len(c.messages) == 0 { + c.channel = nil + c.messages = []*ubroker.Delivery{{}} + } + } } } - if requeueMessageIndex == -1 { - return errors.Wrap(ubroker.ErrInvalidID, "Requeue error, ID not found") +} + +func (c *core) startWorking() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + c.working.Add(1) + return true } - ttlErr := time.Now().Sub(c.messageList[requeueMessageIndex].timeInQueue) > c.ttl - requeueMessageValue.timeInQueue = time.Now() - requeueMessageValue.msgD.ID++ - for indexOf(requeueMessageValue.msgD.ID, c.idSet) != -1 { - requeueMessageValue.msgD.ID++ +} + +func (c *core) startClosing() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + close(c.closing) + return true } - c.idSet = append(c.idSet, requeueMessageValue.msgD.ID) - c.messageList = removeMessage(c.messageList, requeueMessageIndex) - c.messageList = append(c.messageList, *requeueMessageValue) +} - c.deliveryChan <- requeueMessageValue.msgD - if ttlErr { - return errors.Wrap(ubroker.ErrInvalidID, "Requeue TTL error, ID not found") +func isCanceledContext(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false } - return nil } -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - c.Lock() - defer c.Unlock() - err := c.errorChecks(ctx) - if err != nil { - if err == ubroker.ErrClosed { - return err - } - return errors.Wrap(err, "Publish error") +func isAcknowledgeRequest(request interface{}) bool { + _, ok := request.(*acknowledgeRequest) + return ok +} + +func isRequeueRequest(request interface{}) bool { + _, ok := request.(*requeueRequest) + return ok +} + +func isPublishRequest(request interface{}) bool { + _, ok := request.(*publishRequest) + return ok +} + +func (c *core) handleAcknowledge(request *acknowledgeRequest) acknowledgeResponse { + defer c.wg.Done() + _, ok := c.pending[request.id] + if !ok { + return acknowledgeResponse{id: request.id, err: ubroker.ErrInvalidID} } + delete(c.pending, request.id) + return acknowledgeResponse{id: request.id, err: nil} +} + +func (c *core) handleRequeue(request *requeueRequest) requeueResponse { + defer c.wg.Done() + message, ok := c.pending[request.id] + if !ok { + return requeueResponse{id: request.id, err: ubroker.ErrInvalidID} + } + delete(c.pending, request.id) c.wg.Add(1) + c.handlePublish(&publishRequest{ + message: message, + response: make(chan publishResponse, 1), + }) + return requeueResponse{id: request.id, err: nil} +} + +func (c *core) handlePublish(request *publishRequest) publishResponse { defer c.wg.Done() - msg := new(coreMsg) - msg.msgD.Message = message - msg.timeInQueue = time.Now() - msg.msgD.ID = 1 - for indexOf(msg.msgD.ID, c.idSet) != -1 { - msg.msgD.ID++ + + if c.channel == nil { + c.messages = []*ubroker.Delivery{} + c.channel = c.deliveryChannel } - c.idSet = append(c.idSet, msg.msgD.ID) - c.messageList = append(c.messageList, *msg) - c.deliveryChan <- msg.msgD - return nil + id := c.nextID + c.nextID++ + newDelivery := ubroker.Delivery{ + Id: id, + Message: request.message, + } + + c.messages = append(c.messages, &newDelivery) + + return publishResponse{err: nil} } -func (c *core) Close() error { - c.Lock() - defer c.Unlock() +func (c *core) snooze(id int32) { + defer c.wg.Done() + ticker := time.NewTicker(c.ttl) + defer ticker.Stop() + select { - case <-c.isClosed: - return nil - default: - c.isClosed <- true + case <-c.closed: + return + + case <-ticker.C: + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), + } + select { + case <-c.closed: + return + + case c.requests <- request: + } } - c.wg.Wait() - close(c.isClosed) - close(c.deliveryChan) - return nil } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -34,7 +34,7 @@ func BenchmarkPublish(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } } @@ -43,7 +43,7 @@ func BenchmarkDelivery(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) @@ -61,12 +61,12 @@ func BenchmarkAcknowledge(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -75,7 +75,7 @@ func BenchmarkAcknowledge(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Acknowledge(context.Background(), deliveries[i].ID)) + s.Nil(broker.Acknowledge(context.Background(), deliveries[i].Id)) } } @@ -84,12 +84,12 @@ func BenchmarkReQueue(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -98,7 +98,7 @@ func BenchmarkReQueue(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.ReQueue(context.Background(), deliveries[i].ID)) + s.Nil(broker.ReQueue(context.Background(), deliveries[i].Id)) } } @@ -123,7 +123,7 @@ func (s *CoreBrokerTestSuite) TestPublishedMessageShouldAppearInDeliveryOnce() { s.publish("hello1") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Equal("hello1", msg.Message.Body) + s.Equal("hello1", string(msg.Message.Body)) s.assertEmpty(delivery) } @@ -133,13 +133,13 @@ func (s *CoreBrokerTestSuite) TestPublishShouldPreserveOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.NotEqual(messages[0].ID, messages[1].ID) - s.NotEqual(messages[0].ID, messages[2].ID) - s.NotEqual(messages[1].ID, messages[2].ID) - s.Equal("hello1", messages[0].Message.Body) - s.Equal("hello2", messages[1].Message.Body) - s.Equal("hello3", messages[2].Message.Body) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.NotEqual(messages[0].Id, messages[1].Id) + s.NotEqual(messages[0].Id, messages[2].Id) + s.NotEqual(messages[1].Id, messages[2].Id) + s.Equal("hello1", string(messages[0].Message.Body)) + s.Equal("hello2", string(messages[1].Message.Body)) + s.Equal("hello3", string(messages[2].Message.Body)) } func (s *CoreBrokerTestSuite) TestDeliveriesShouldBeUnique() { @@ -153,7 +153,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeAcknowledgeablePreemptively( s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.Acknowledge(context.Background(), int32(id))) } } @@ -161,7 +162,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeQueueablePreemptively() { s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.ReQueue(context.Background(), int32(id))) } } @@ -169,15 +171,15 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeAcknowledgeable() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeAcknowledgedTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndependently() { @@ -186,10 +188,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndep s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.Acknowledge(context.Background(), messages[0].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[1].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.Acknowledge(context.Background(), messages[0].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[1].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[2].Id)) } func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() { @@ -197,7 +199,7 @@ func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() s.publish("hello") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) s.assertEmpty(delivery) } @@ -206,9 +208,9 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeReQueueable() { s.publish("hello") delivery := s.getDelivery(context.Background()) msg1 := <-delivery - s.Nil(s.broker.ReQueue(context.Background(), msg1.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg1.Id)) msg2 := <-delivery - s.NotEqual(msg1.ID, msg2.ID) + s.NotEqual(msg1.Id, msg2.Id) s.Equal(msg1.Message.Body, msg2.Message.Body) } @@ -216,8 +218,8 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeReQueueableTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.ReQueue(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndependently() { @@ -226,10 +228,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndepende s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) @@ -241,10 +243,10 @@ func (s *CoreBrokerTestSuite) TestReQueueCouldBreakOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) @@ -256,32 +258,35 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldReQueueUponHalfSecondTTL() { delivery := s.getDelivery(context.Background()) msg1 := <-delivery time.Sleep(250 * time.Millisecond) - s.Nil(s.broker.Acknowledge(context.Background(), msg1.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg1.Id)) s.publish("hello2") msg2 := <-delivery time.Sleep(750 * time.Millisecond) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.ID)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.Id)) msg3 := <-delivery - s.NotEqual(msg1.ID, msg3.ID) - s.NotEqual(msg2.ID, msg3.ID) - s.Equal("hello2", msg3.Message.Body) - s.Nil(s.broker.Acknowledge(context.Background(), msg3.ID)) + s.NotEqual(msg1.Id, msg3.Id) + s.NotEqual(msg2.Id, msg3.Id) + s.Equal("hello2", string(msg3.Message.Body)) + s.Nil(s.broker.Acknowledge(context.Background(), msg3.Id)) } func (s *CoreBrokerTestSuite) TestPublishShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Publish(context.Background(), ubroker.Message{})) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Publish(context.Background(), &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestAcknowledgeShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Acknowledge(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Acknowledge(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestReQueueShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.ReQueue(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.ReQueue(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldFailOnClosedBroker() { @@ -324,7 +329,8 @@ func (s *CoreBrokerTestSuite) TestPublishShouldFailOnCanceledContext() { s.prepareTest(1 * time.Second) ctx, cancel := context.WithCancel(context.Background()) cancel() - s.assertErrorEquals(ctx.Err(), s.broker.Publish(ctx, ubroker.Message{})) + s.assertErrorEquals(ctx.Err(), + s.broker.Publish(ctx, &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestDataRace() { @@ -344,8 +350,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { return default: - err := s.broker.Publish(context.Background(), ubroker.Message{ - Body: fmt.Sprint(rand.Intn(1000)), + err := s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(fmt.Sprint(rand.Intn(1000))), }) if err == ubroker.ErrClosed { return @@ -374,8 +380,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.Acknowledge(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.Acknowledge(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -402,8 +412,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.ReQueue(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.ReQueue(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -427,8 +441,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { } func (s *CoreBrokerTestSuite) publish(body string) { - s.Nil(s.broker.Publish(context.Background(), ubroker.Message{ - Body: body, + s.Nil(s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(body), })) } @@ -440,7 +454,7 @@ func (s *CoreBrokerTestSuite) assertErrorEquals(expected error, actual error) { } } -func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.Delivery { +func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan *ubroker.Delivery { result, err := s.broker.Delivery(ctx) if err != nil { s.FailNow(err.Error(), "could not obtain delivery") @@ -449,7 +463,7 @@ func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.De return result } -func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan ubroker.Delivery) { +func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan *ubroker.Delivery) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() diff --git a/internal/server/grpc.go b/internal/server/grpc.go new file mode 100644 index 0000000..2b393ed --- /dev/null +++ b/internal/server/grpc.go @@ -0,0 +1,36 @@ +package server + +import ( + "context" + + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type grpcServicer struct { + broker ubroker.Broker +} + +func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { + return &grpcServicer{ + broker: broker, + } +} + +func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { + return status.Error(codes.Unimplemented, "not implemented") +} + +func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { + return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") +} + +func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { + return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") +} + +func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { + return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") +} diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go new file mode 100644 index 0000000..ec28f00 --- /dev/null +++ b/internal/server/grpc_test.go @@ -0,0 +1,383 @@ +package server_test + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/mock" + + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/phayes/freeport" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/stretchr/testify/suite" +) + +type GRPCServerTestSuite struct { + suite.Suite +} + +func TestGRPCServerTestSuite(t *testing.T) { + suite.Run(t, new(GRPCServerTestSuite)) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + result, err := stream.Recv() + s.Nil(err) + + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err := stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfInvalidID() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(ubroker.ErrInvalidID) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.InvalidArgument, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnOKOnSuccess() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnUnavailableIfInvalidID() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(ubroker.ErrInvalidID) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.InvalidArgument, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnOKOnSuccess() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestPublishShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Publish(ctx, &ubroker.Message{ + Body: []byte("hello"), + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestPublishShouldReturnOKIfSuccessful() { + broker := &mockBroker{} + broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Publish(ctx, &ubroker.Message{ + Body: []byte("hello"), + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) makeChannel(args ...string) <-chan *ubroker.Delivery { + result := make(chan *ubroker.Delivery, len(args)) + var id int32 + + for _, arg := range args { + result <- &ubroker.Delivery{ + Id: id, + Message: &ubroker.Message{ + Body: []byte(arg), + }, + } + + id = id + 1 + } + + close(result) + return result +} + +func (s *GRPCServerTestSuite) assertStatusCode(code codes.Code, err error) { + if code == codes.OK { + s.Nil(err) + return + } + + grpcStatus, ok := status.FromError(err) + s.True(ok) + + if grpcStatus != nil { + s.Equal(code, grpcStatus.Code()) + } +} + +func (s *GRPCServerTestSuite) runTest( + tester func(ctx context.Context, client ubroker.BrokerClient), + broker ubroker.Broker) { + + port, err := freeport.GetFreePort() + if err != nil { + s.FailNow(err.Error(), "failed to obtain free port") + } + + endpoint := fmt.Sprintf("127.0.0.1:%d", port) + servicer := server.NewGRPC(broker) + + grpcServer := grpc.NewServer() + ubroker.RegisterBrokerServer(grpcServer, servicer) + + listener, err := net.Listen("tcp", endpoint) + if err != nil { + s.FailNow(err.Error(), "failed to open listener") + } + + go func() { + grpcServer.Serve(listener) + }() + + dialCtx, dialCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer dialCtxCancel() + + conn, err := grpc.DialContext(dialCtx, endpoint, + grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + s.FailNow(err.Error(), "failed to dial to gRPC‌ server") + } + + client := ubroker.NewBrokerClient(conn) + + clientCtx, clientCtxCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer clientCtxCancel() + + tester(clientCtx, client) + + err = conn.Close() + if err != nil { + s.FailNow(err.Error(), "failed to properly close gRPC‌ client connection") + } + + grpcServer.GracefulStop() +} diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "regexp" "strconv" @@ -13,6 +12,7 @@ import ( "sync" "time" + "github.com/golang/protobuf/jsonpb" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -37,7 +37,7 @@ func init() { type httpServer struct { broker ubroker.Broker router *mux.Router - delivery <-chan ubroker.Delivery + delivery <-chan *ubroker.Delivery endpoint string server *http.Server mutex sync.Mutex @@ -126,14 +126,8 @@ func (s *httpServer) ServeHTTP(writer http.ResponseWriter, request *http.Request } func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Request) { - data, err := ioutil.ReadAll(request.Body) - if err != nil { - s.handleError(writer, request, err) - return - } - var msg ubroker.Message - err = json.Unmarshal(data, &msg) + err := jsonpb.Unmarshal(request.Body, &msg) if err != nil { s.handleError(writer, request, err) } @@ -141,7 +135,7 @@ func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Publish(ctx, msg) + err = s.broker.Publish(ctx, &msg) if err != nil { s.handleError(writer, request, err) } @@ -166,7 +160,7 @@ func (s *httpServer) handleAcknowledge(writer http.ResponseWriter, request *http ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Acknowledge(ctx, id) + err = s.broker.Acknowledge(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return @@ -192,7 +186,7 @@ func (s *httpServer) handleReQueue(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.ReQueue(ctx, id) + err = s.broker.ReQueue(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..aff3746 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -1,7 +1,6 @@ package server_test import ( - "context" "fmt" "io/ioutil" "net/http" @@ -15,35 +14,6 @@ import ( "github.com/stretchr/testify/suite" ) -type mockBroker struct { - mock.Mock -} - -func (m *mockBroker) Close() error { - args := m.Called() - return args.Error(0) -} - -func (m *mockBroker) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - args := m.Called(ctx) - return args.Get(0).(<-chan ubroker.Delivery), args.Error(1) -} - -func (m *mockBroker) Acknowledge(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) ReQueue(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) Publish(ctx context.Context, message ubroker.Message) error { - args := m.Called(ctx, message) - return args.Error(0) -} - type HTTPServerTestSuite struct { suite.Suite t *testing.T @@ -57,14 +27,14 @@ func TestHTTPServerTestSuite(t *testing.T) { func (s *HTTPServerTestSuite) prepareTest() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() } func (s *HTTPServerTestSuite) TestEmptyFetch() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() @@ -73,63 +43,63 @@ func (s *HTTPServerTestSuite) TestEmptyFetch() { func (s *HTTPServerTestSuite) TestPublish() { s.prepareTest() - s.broker.On("Publish", mock.Anything, mock.Anything).Return(nil) - s.httpPublish(`{"body": "hello"}`) - s.broker.AssertCalled( - s.t, - "Publish", - mock.Anything, - ubroker.Message{Body: "hello"}, - ) + + s.broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Return(nil) + + s.httpPublish(`{"body":"aGVsbG8="}`) + s.broker.AssertExpectations(s.T()) } func (s *HTTPServerTestSuite) TestFailedReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpReQueue(123, 400, &body) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(nil) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(nil) s.httpReQueue(123, 200, nil) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestFailedAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpAcknowledge(123, 400, &body) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(nil) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(nil) s.httpAcknowledge(123, 200, nil) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go new file mode 100644 index 0000000..d3a0fa4 --- /dev/null +++ b/internal/server/moc_broker_test.go @@ -0,0 +1,44 @@ +package server_test + +import ( + "context" + + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/stretchr/testify/mock" +) + +type mockBroker struct { + mock.Mock +} + +func (m *mockBroker) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockBroker) Delivery(ctx context.Context) (<-chan *ubroker.Delivery, error) { + args := m.Called(ctx) + + var res0 <-chan *ubroker.Delivery + + if args.Get(0) != nil { + res0 = args.Get(0).(<-chan *ubroker.Delivery) + } + + return res0, args.Error(1) +} + +func (m *mockBroker) Acknowledge(ctx context.Context, id int32) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) ReQueue(ctx context.Context, id int32) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) Publish(ctx context.Context, message *ubroker.Message) error { + args := m.Called(ctx, message) + return args.Error(0) +} diff --git a/pkg/ubroker/ubroker.go b/pkg/ubroker/ubroker.go index 92de13c..cbc090d 100644 --- a/pkg/ubroker/ubroker.go +++ b/pkg/ubroker/ubroker.go @@ -9,7 +9,7 @@ import ( // Broker interface defines functionalities of a // message broker system. // -// We require our message broker to timeout and requeue +// we require our message broker to timeout and requeue // unacknowledged messages automatically. We also require // that implementations should be thread-safe. type Broker interface { @@ -27,13 +27,13 @@ type Broker interface { // messages to consumers. // We require following: // - // 1. Resulting read-only channel is unique (it does + // 1. Resulting read-only channel is unique (it doesn // not change each time you call it) // 2. If `ctx` is canceled or timed out, `ctx.Err()` is // returned // 3. If broker is closed, `ErrClosed` is returned // 4. should be thread-safe - Delivery(ctx context.Context) (<-chan Delivery, error) + Delivery(ctx context.Context) (<-chan *Delivery, error) // Acknowledge is called by clients to declare that // specified message id has been successfuly processed @@ -48,7 +48,7 @@ type Broker interface { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe - Acknowledge(ctx context.Context, id int) error + Acknowledge(ctx context.Context, id int32) error // ReQueue is called by clients to declare that // specified message id should be put back in @@ -62,7 +62,7 @@ type Broker interface { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe - ReQueue(ctx context.Context, id int) error + ReQueue(ctx context.Context, id int32) error // Publish is used to enqueue a new message to broker // We demand following: @@ -71,7 +71,7 @@ type Broker interface { // returned // 2. If broker is closed, `ErrClosed` is returned // 3. should be thread-safe - Publish(ctx context.Context, message Message) error + Publish(ctx context.Context, message *Message) error } // HTTPServer defines an HTTP‌ API‌ server provider @@ -81,19 +81,3 @@ type HTTPServer interface { Run() error } - -// Message encapsulates a queued message -type Message struct { - // Body is an abitrary client-defined string - Body string `json:"body"` -} - -// Delivery encapsulates a message fetched -// from queue -type Delivery struct { - // Message is the message fetched from queue - Message Message `json:"message"` - - // ID - ID int `json:"id"` -} From abac78ebd3bc77d5ebd93f265467a3747ad596ff Mon Sep 17 00:00:00 2001 From: sneyes Date: Mon, 13 May 2019 14:54:22 +0430 Subject: [PATCH 16/21] Namespace updates --- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 +++--- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/grpc.go | 2 +- internal/server/grpc_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- internal/server/moc_broker_test.go | 2 +- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/api/ubroker.proto b/api/ubroker.proto index 7b2c23c..088dba4 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; +option go_package = "github.com/sneyes/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 95e9562..4047671 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index c040e9b..67481bf 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 0c3780b..9c07490 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..396a8cb 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -3,7 +3,7 @@ package server import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index ec28f00..6549772 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/server" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index 1badf2c..b02836d 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index aff3746..400989d 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/server" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index d3a0fa4..1388cee 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" ) From d3847994a7107e1c7cfb11953c8d714b9fefc2a1 Mon Sep 17 00:00:00 2001 From: sneyes Date: Thu, 23 May 2019 15:27:19 +0430 Subject: [PATCH 17/21] gRPC ready --- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 ++--- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 +-- internal/server/grpc.go | 41 ++++++++++++++++++++++++++---- internal/server/grpc_test.go | 4 +-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 +-- internal/server/moc_broker_test.go | 2 +- 9 files changed, 49 insertions(+), 18 deletions(-) diff --git a/api/ubroker.proto b/api/ubroker.proto index 088dba4..7b2c23c 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/sneyes/ubroker/pkg/ubroker"; +option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 4047671..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 67481bf..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9c07490..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 396a8cb..bcf6400 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -3,7 +3,7 @@ package server import ( "context" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -13,6 +13,17 @@ type grpcServicer struct { broker ubroker.Broker } +func getError(msg error) error { + switch msg { + case nil: + return nil + case ubroker.ErrClosed: + return status.Error(codes.Unavailable, "Broker is closed") + default: + return status.Error(codes.Unknown, "Unknown Error") + } +} + func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { return &grpcServicer{ broker: broker, @@ -20,17 +31,37 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { } func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") + delivery, errMsg := s.broker.Delivery(context.Background()) + if errMsg != nil { + return getError(errMsg) + } + + for { + _, streamError := stream.Recv() + if streamError != nil { + return getError(streamError) + } + + delMsg, delSuccess := <-delivery + if delSuccess { + stream.Send(delMsg) + } else { + return getError(ubroker.ErrClosed) + } + } } func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + errMsg := s.broker.Acknowledge(ctx, request.GetId()) + return &empty.Empty{}, getError(errMsg) } func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + errMsg := s.broker.ReQueue(ctx, request.GetId()) + return &empty.Empty{}, getError(errMsg) } func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + errMsg := s.broker.Publish(ctx, request) + return &empty.Empty{}, getError(errMsg) } diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index 6549772..ec28f00 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/sneyes/ubroker/internal/server" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index b02836d..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 400989d..aff3746 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/sneyes/ubroker/internal/server" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index 1388cee..d3a0fa4 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" ) From 029b7dc5d18dadc957c39234cf4a6f8d578a56ca Mon Sep 17 00:00:00 2001 From: sneyes Date: Thu, 23 May 2019 15:40:29 +0430 Subject: [PATCH 18/21] Go ENV fix --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index db00acb..cd4755e 100644 --- a/Makefile +++ b/Makefile @@ -32,5 +32,6 @@ pkg/ubroker/ubroker.pb.go: api/ubroker.proto | .pre-check-go go get -v github.com/golang/protobuf/protoc-gen-go go get -v github.com/vektra/mockery/.../ +GOPATH ?= $(shell go env GOPATH) PROTOC ?= protoc PROTOC_OPTIONS ?= From c2d06d9626f630470952e28ef4becd66701c8fe8 Mon Sep 17 00:00:00 2001 From: sneyes Date: Thu, 23 May 2019 15:54:47 +0430 Subject: [PATCH 19/21] one error was missing --- internal/server/grpc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/server/grpc.go b/internal/server/grpc.go index bcf6400..a78d4c2 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -19,6 +19,8 @@ func getError(msg error) error { return nil case ubroker.ErrClosed: return status.Error(codes.Unavailable, "Broker is closed") + case ubroker.ErrInvalidID: + return status.Error(codes.InvalidArgument, "Invalid Id") default: return status.Error(codes.Unknown, "Unknown Error") } From 32b2f0c91b5e25c73fd479db214fc426f7c781e3 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 25 May 2019 00:44:31 +0430 Subject: [PATCH 20/21] My last try to fix travis error --- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 +++--- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/grpc.go | 2 +- internal/server/grpc_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- internal/server/moc_broker_test.go | 2 +- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/api/ubroker.proto b/api/ubroker.proto index 7b2c23c..088dba4 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; +option go_package = "github.com/sneyes/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 95e9562..4047671 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index c040e9b..67481bf 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 0c3780b..9c07490 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/broker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index a78d4c2..2be825c 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -3,7 +3,7 @@ package server import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index ec28f00..6549772 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/server" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index 1badf2c..b02836d 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index aff3746..400989d 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/internal/server" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index d3a0fa4..1388cee 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/sneyes/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" ) From 2ebdb18dba129d1f69884e1e83e7c7a167a81a09 Mon Sep 17 00:00:00 2001 From: sneyes Date: Sat, 25 May 2019 21:40:23 +0430 Subject: [PATCH 21/21] Testing the other way around --- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 +++--- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/grpc.go | 2 +- internal/server/grpc_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- internal/server/moc_broker_test.go | 2 +- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/api/ubroker.proto b/api/ubroker.proto index 088dba4..7b2c23c 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/sneyes/ubroker/pkg/ubroker"; +option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 4047671..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 67481bf..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9c07490..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/sneyes/ubroker/internal/broker" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2be825c..a78d4c2 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -3,7 +3,7 @@ package server import ( "context" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index 6549772..ec28f00 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/sneyes/ubroker/internal/server" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index b02836d..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 400989d..aff3746 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/sneyes/ubroker/internal/server" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index 1388cee..d3a0fa4 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/sneyes/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" )