From 514232e1a7a6a03f582cfaae441f28d37e99169f Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Thu, 11 Apr 2019 20:53:19 +0430 Subject: [PATCH 01/21] add .idea to .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 620a673..305ac46 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/ubroker \ No newline at end of file +/ubroker +.idea \ No newline at end of file From 041e71ad402130b11b52288294b2813c61c108e1 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Thu, 11 Apr 2019 20:53:41 +0430 Subject: [PATCH 02/21] Change path names to my workstation --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..cc2be3e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/meshkati/ubroker/internal/broker" + "github.com/meshkati/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..dec12f6 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..fe5b0c0 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/internal/broker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1f336d2 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..95a4b1d 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/internal/server" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From e32592aba53529f2283181c878362b8acb307346 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 15:02:42 +0430 Subject: [PATCH 03/21] internal/broker: Implement filterContextError() To check context problems --- internal/broker/core.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index dec12f6..e5f4130 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -10,7 +10,7 @@ import ( // New creates a new instance of ubroker.Broker // with given `ttl`. `ttl` determines time in which -// we requeue an unacknowledged/unrequeued message +// we requeue an unacknowledged/un-re-queued message // automatically. func New(ttl time.Duration) ubroker.Broker { return &core{} @@ -44,3 +44,12 @@ func (c *core) Close() error { // TODO:‌ implement me return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") } + +// Checks if the context has an error, returns true if the deadline has exceeded, or context had canceled +func filterContextError(ctx context.Context) error { + if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled { + return ctx.Err() + } + + return nil +} From 2e79839041737cd8ee7a0b5149e1911930c5bcf1 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 15:25:58 +0430 Subject: [PATCH 04/21] internal/broker: Implement basic Delivery() --- internal/broker/core.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index e5f4130..beb6837 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -13,16 +13,35 @@ import ( // we requeue an unacknowledged/un-re-queued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + return &core{ + sequenceNumber: -1, + mainChannel: make(chan ubroker.Delivery), + ttl: ttl, + closed: false, + } } type core struct { // TODO: add required fields + sequenceNumber int + mainChannel chan ubroker.Delivery + + ttl time.Duration + + closed bool } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + // check if context has error + if err := filterContextError(ctx); err != nil { + return nil, ctx.Err() + } + // checking the broker + if c.closed { + return nil, errors.Wrap(ubroker.ErrClosed, "The broker is closed.") + } + + return c.mainChannel, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { @@ -42,7 +61,9 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { func (c *core) Close() error { // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + c.closed = true + + return nil } // Checks if the context has an error, returns true if the deadline has exceeded, or context had canceled From de37047bd40ae333de5e512aa6f98b1c2b9a9ee3 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 16:42:10 +0430 Subject: [PATCH 05/21] internal/broker: Implement basic ttlHandler() --- internal/broker/core.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/broker/core.go b/internal/broker/core.go index beb6837..48c8243 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -74,3 +74,14 @@ func filterContextError(ctx context.Context) error { return nil } + +// Sets a timeout for TTL and re-queue the message after that time +func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { + // TODO: Handle race condition + select { + case <- time.After(c.ttl): + // TODO: re-queue + case <- c.ackMap[delivery.ID]: + return + } +} From d6c4392d308b0ee1396eedfb27d92ff2f18d25fe Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 16:43:04 +0430 Subject: [PATCH 06/21] internal/broker: Implement basic Publish() --- internal/broker/core.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 48c8243..1aac965 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -16,6 +16,7 @@ func New(ttl time.Duration) ubroker.Broker { return &core{ sequenceNumber: -1, mainChannel: make(chan ubroker.Delivery), + ackMap: make(map[int]chan bool), ttl: ttl, closed: false, } @@ -23,8 +24,9 @@ func New(ttl time.Duration) ubroker.Broker { type core struct { // TODO: add required fields - sequenceNumber int - mainChannel chan ubroker.Delivery + sequenceNumber int + mainChannel chan ubroker.Delivery + ackMap map[int]chan bool ttl time.Duration @@ -55,8 +57,27 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + // check if context has error + if err := filterContextError(ctx); err != nil { + return err + } + // checking the broker + if c.closed { + return errors.Wrap(ubroker.ErrClosed, "Publish:: The broker is closed.") + } + // Pushing into the channel + // TODO: Handle race condition on the sequence number + c.sequenceNumber++ + delivery := ubroker.Delivery{ + ID: c.sequenceNumber, + Message: message, + } + c.ackMap[delivery.ID] = make(chan bool, 1) + go c.ttlHandler(ctx, delivery) + // push the message to channel + c.mainChannel <- delivery + + return nil } func (c *core) Close() error { From 29835cd52f0cf670a0e6b3833f7b920674ffc42d Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 16:45:56 +0430 Subject: [PATCH 07/21] internal/broker: Delete delivery ACK channel from map after ACK received --- internal/broker/core.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/broker/core.go b/internal/broker/core.go index 1aac965..557c878 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -103,6 +103,7 @@ func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { case <- time.After(c.ttl): // TODO: re-queue case <- c.ackMap[delivery.ID]: + delete(c.ackMap, delivery.ID) return } } From 1275a4f9d13148bc5e4532cc8d923d7aa3d26dc0 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 17:06:16 +0430 Subject: [PATCH 08/21] internal/broker: Implement basic Acknowledge() --- internal/broker/core.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 557c878..658219e 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -16,7 +16,7 @@ func New(ttl time.Duration) ubroker.Broker { return &core{ sequenceNumber: -1, mainChannel: make(chan ubroker.Delivery), - ackMap: make(map[int]chan bool), + ackMap: make(map[int]chan bool), ttl: ttl, closed: false, } @@ -24,9 +24,9 @@ func New(ttl time.Duration) ubroker.Broker { type core struct { // TODO: add required fields - sequenceNumber int - mainChannel chan ubroker.Delivery - ackMap map[int]chan bool + sequenceNumber int + mainChannel chan ubroker.Delivery + ackMap map[int]chan bool ttl time.Duration @@ -47,8 +47,27 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + // check if context has error + if err := filterContextError(ctx); err != nil { + return err + } + // checking the broker + if c.closed { + return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") + } + // check if the id is not existed + // TODO: Handle race condition + if id > c.sequenceNumber { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.") + } + // check if it's going to re-acknowledgment + if _, ok := c.ackMap[id]; !ok { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: This id has been ACKed before.") + } + // everything is "probably" Ok, so we're going to mark the ACK + c.ackMap[id] <- true + + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { @@ -100,9 +119,9 @@ func filterContextError(ctx context.Context) error { func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { // TODO: Handle race condition select { - case <- time.After(c.ttl): + case <-time.After(c.ttl): // TODO: re-queue - case <- c.ackMap[delivery.ID]: + case <-c.ackMap[delivery.ID]: delete(c.ackMap, delivery.ID) return } From c361d5e503b15c8dcc7a9b698eda1e5852505202 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 18:44:09 +0430 Subject: [PATCH 09/21] internal/broker: Implement basic ReQueue() --- internal/broker/core.go | 43 ++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 658219e..802a0bf 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -17,8 +17,10 @@ func New(ttl time.Duration) ubroker.Broker { sequenceNumber: -1, mainChannel: make(chan ubroker.Delivery), ackMap: make(map[int]chan bool), - ttl: ttl, - closed: false, + pendingMap: make(map[int]ubroker.Delivery), + + ttl: ttl, + closed: false, } } @@ -27,6 +29,7 @@ type core struct { sequenceNumber int mainChannel chan ubroker.Delivery ackMap map[int]chan bool + pendingMap map[int]ubroker.Delivery ttl time.Duration @@ -55,14 +58,14 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") } - // check if the id is not existed + // check if the id is not exists // TODO: Handle race condition if id > c.sequenceNumber { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.") } // check if it's going to re-acknowledgment if _, ok := c.ackMap[id]; !ok { - return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: This id has been ACKed before.") + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+"has been ACKed before.") } // everything is "probably" Ok, so we're going to mark the ACK c.ackMap[id] <- true @@ -71,8 +74,31 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + // check if context has error + if err := filterContextError(ctx); err != nil { + return err + } + // checking the broker + if c.closed { + return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.") + } + // check if the id is not exists + // TODO: Handle race condition + if id > c.sequenceNumber { + return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") + } + // check if it's going to re-queue the message + if _, ok := c.ackMap[id]; ok { + return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") + } + // everything is "probably" Ok, so we're going to put the message in queue + c.ackMap[id] = make(chan bool, 1) + tDelivery := c.pendingMap[id] + go c.ttlHandler(ctx, tDelivery) + // pushing the message into the main channel + c.mainChannel <- tDelivery + + return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { @@ -117,12 +143,15 @@ func filterContextError(ctx context.Context) error { // Sets a timeout for TTL and re-queue the message after that time func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { + // persisting the un-acknowledged message + c.pendingMap[delivery.ID] = delivery // TODO: Handle race condition select { case <-time.After(c.ttl): - // TODO: re-queue + _ = c.ReQueue(ctx, delivery.ID) case <-c.ackMap[delivery.ID]: delete(c.ackMap, delivery.ID) + delete(c.pendingMap, delivery.ID) return } } From 0bd6154ec26bf61fd1065fe212cb3a91352105da Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 20:43:29 +0430 Subject: [PATCH 10/21] internal/broker: Consider negative IDs as invalid --- internal/broker/core.go | 6 +++--- pkg/ubroker/ubroker.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 802a0bf..7c6f2dc 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -15,7 +15,7 @@ import ( func New(ttl time.Duration) ubroker.Broker { return &core{ sequenceNumber: -1, - mainChannel: make(chan ubroker.Delivery), + mainChannel: make(chan ubroker.Delivery, 100), ackMap: make(map[int]chan bool), pendingMap: make(map[int]ubroker.Delivery), @@ -60,7 +60,7 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } // check if the id is not exists // TODO: Handle race condition - if id > c.sequenceNumber { + if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.") } // check if it's going to re-acknowledgment @@ -84,7 +84,7 @@ func (c *core) ReQueue(ctx context.Context, id int) error { } // check if the id is not exists // TODO: Handle race condition - if id > c.sequenceNumber { + if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") } // check if it's going to re-queue the message diff --git a/pkg/ubroker/ubroker.go b/pkg/ubroker/ubroker.go index 18268cd..a6bca1e 100644 --- a/pkg/ubroker/ubroker.go +++ b/pkg/ubroker/ubroker.go @@ -27,7 +27,7 @@ type Broker interface { // messages to consumers. // We require following: // - // 1. Resulting read-only channel is unique (it doesn + // 1. Resulting read-only channel is unique (it does // not change each time you call it) // 2. If `ctx` is canceled or timed out, `ctx.Err()` is // returned @@ -36,8 +36,8 @@ type Broker interface { Delivery(ctx context.Context) (<-chan Delivery, error) // Acknowledge is called by clients to declare that - // specified message id has been successfuly processed - // and should not be requeued to queue and we have to + // specified message id has been successfully processed + // and should not be re-queued to queue and we have to // remove it. // We demand following: // @@ -84,7 +84,7 @@ type HTTPServer interface { // Message encapsulates a queued message type Message struct { - // Body is an abitrary client-defined string + // Body is an arbitrary client-defined string Body string `json:"body"` } From 3ea4b740e84dc0e5fdd30f0d06231f27a752141c Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 21:51:16 +0430 Subject: [PATCH 11/21] internal/broker: Add mutex on sequenceNumber --- internal/broker/core.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 7c6f2dc..7fc9244 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,6 +2,7 @@ package broker import ( "context" + "sync" "time" "github.com/meshkati/ubroker/pkg/ubroker" @@ -32,7 +33,7 @@ type core struct { pendingMap map[int]ubroker.Delivery ttl time.Duration - + sequenceMutex sync.Mutex closed bool } @@ -58,8 +59,10 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // check if the id is not exists - // TODO: Handle race condition if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.") } @@ -82,8 +85,10 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.") } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // check if the id is not exists - // TODO: Handle race condition if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") } @@ -110,8 +115,10 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "Publish:: The broker is closed.") } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // Pushing into the channel - // TODO: Handle race condition on the sequence number c.sequenceNumber++ delivery := ubroker.Delivery{ ID: c.sequenceNumber, From da22f265212450dae86ea3db340e537202b558d9 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 21:51:44 +0430 Subject: [PATCH 12/21] internal/broker: Generate new ID on ReQueue() --- internal/broker/core.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 7fc9244..dcf569c 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -18,7 +18,7 @@ func New(ttl time.Duration) ubroker.Broker { sequenceNumber: -1, mainChannel: make(chan ubroker.Delivery, 100), ackMap: make(map[int]chan bool), - pendingMap: make(map[int]ubroker.Delivery), + pendingMap: make(map[int]ubroker.Message), ttl: ttl, closed: false, @@ -30,11 +30,11 @@ type core struct { sequenceNumber int mainChannel chan ubroker.Delivery ackMap map[int]chan bool - pendingMap map[int]ubroker.Delivery + pendingMap map[int]ubroker.Message - ttl time.Duration + ttl time.Duration sequenceMutex sync.Mutex - closed bool + closed bool } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { @@ -93,12 +93,20 @@ func (c *core) ReQueue(ctx context.Context, id int) error { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") } // check if it's going to re-queue the message - if _, ok := c.ackMap[id]; ok { - return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") - } + //if _, ok := c.ackMap[id]; ok { + // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") + //} + // everything is "probably" Ok, so we're going to put the message in queue c.ackMap[id] = make(chan bool, 1) - tDelivery := c.pendingMap[id] + tMessage := c.pendingMap[id] + + c.sequenceNumber++ + tDelivery := ubroker.Delivery{ + Message: tMessage, + ID: c.sequenceNumber, + } + go c.ttlHandler(ctx, tDelivery) // pushing the message into the main channel c.mainChannel <- tDelivery @@ -151,7 +159,7 @@ func filterContextError(ctx context.Context) error { // Sets a timeout for TTL and re-queue the message after that time func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { // persisting the un-acknowledged message - c.pendingMap[delivery.ID] = delivery + c.pendingMap[delivery.ID] = delivery.Message // TODO: Handle race condition select { case <-time.After(c.ttl): From 4e2307e60471efd2ee93c85df1e034f91efa49e6 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 22:07:52 +0430 Subject: [PATCH 13/21] internal/broker: Check if the delivery is started or not, in ACK and REQ --- internal/broker/core.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index dcf569c..30e8557 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -27,14 +27,14 @@ func New(ttl time.Duration) ubroker.Broker { type core struct { // TODO: add required fields - sequenceNumber int - mainChannel chan ubroker.Delivery - ackMap map[int]chan bool - pendingMap map[int]ubroker.Message - - ttl time.Duration - sequenceMutex sync.Mutex - closed bool + sequenceNumber int + mainChannel chan ubroker.Delivery + ackMap map[int]chan bool + pendingMap map[int]ubroker.Message + ttl time.Duration + sequenceMutex sync.Mutex + closed bool + deliveryStarted bool } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { @@ -47,6 +47,8 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { return nil, errors.Wrap(ubroker.ErrClosed, "The broker is closed.") } + c.deliveryStarted = true + return c.mainChannel, nil } @@ -59,6 +61,10 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") } + // check if delivery started + if !c.deliveryStarted { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet") + } // handling race condition c.sequenceMutex.Lock() defer c.sequenceMutex.Unlock() @@ -85,6 +91,10 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if c.closed { return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.") } + // check if delivery started + if !c.deliveryStarted { + return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet") + } // handling race condition c.sequenceMutex.Lock() defer c.sequenceMutex.Unlock() @@ -104,7 +114,7 @@ func (c *core) ReQueue(ctx context.Context, id int) error { c.sequenceNumber++ tDelivery := ubroker.Delivery{ Message: tMessage, - ID: c.sequenceNumber, + ID: c.sequenceNumber, } go c.ttlHandler(ctx, tDelivery) From 501d7e355806f3ba8e373f65914e49b0706f50b2 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 22:29:14 +0430 Subject: [PATCH 14/21] internal/broker: Closing the main channel on Broker.close() --- internal/broker/core.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/broker/core.go b/internal/broker/core.go index 30e8557..909b971 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -153,6 +153,7 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { func (c *core) Close() error { // TODO:‌ implement me c.closed = true + close(c.mainChannel) return nil } From e124d9c7949ee35dd061d3768735523b865f99ba Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Fri, 12 Apr 2019 23:04:06 +0430 Subject: [PATCH 15/21] internal/broker: Add pendingMutex for the pending map --- internal/broker/core.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 909b971..507fb08 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -33,6 +33,7 @@ type core struct { pendingMap map[int]ubroker.Message ttl time.Duration sequenceMutex sync.Mutex + pendingMutex sync.Mutex closed bool deliveryStarted bool } @@ -108,7 +109,6 @@ func (c *core) ReQueue(ctx context.Context, id int) error { //} // everything is "probably" Ok, so we're going to put the message in queue - c.ackMap[id] = make(chan bool, 1) tMessage := c.pendingMap[id] c.sequenceNumber++ @@ -116,10 +116,14 @@ func (c *core) ReQueue(ctx context.Context, id int) error { Message: tMessage, ID: c.sequenceNumber, } + // setting the acknowledge channel + c.ackMap[c.sequenceNumber] = make(chan bool, 1) go c.ttlHandler(ctx, tDelivery) // pushing the message into the main channel c.mainChannel <- tDelivery + // invalidate the previous id + c.ackMap[id] <- false return nil } @@ -169,13 +173,18 @@ func filterContextError(ctx context.Context) error { // Sets a timeout for TTL and re-queue the message after that time func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { - // persisting the un-acknowledged message + c.pendingMutex.Lock() c.pendingMap[delivery.ID] = delivery.Message + c.pendingMutex.Unlock() // TODO: Handle race condition select { case <-time.After(c.ttl): _ = c.ReQueue(ctx, delivery.ID) case <-c.ackMap[delivery.ID]: + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() + delete(c.ackMap, delivery.ID) delete(c.pendingMap, delivery.ID) return From b97d27ef580d1e6e0290b61a60a31835ee862ffd Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 00:55:39 +0430 Subject: [PATCH 16/21] internal/broker: Solve race condition on multiple Acknowledge() --- internal/broker/core.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 507fb08..e4e98e1 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -79,6 +79,9 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { } // everything is "probably" Ok, so we're going to mark the ACK c.ackMap[id] <- true + // removing the message id from maps + delete(c.ackMap, id) + delete(c.pendingMap, id) return nil } @@ -103,11 +106,17 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") } - // check if it's going to re-queue the message + ////check if it's going to re-queue the message //if _, ok := c.ackMap[id]; ok { // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") //} + //// check if it's going to re'queue the message (2nd approach) + //c.pendingMutex.Lock() + //if _, ok := c.pendingMap[id]; ok { + // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") + //} + // everything is "probably" Ok, so we're going to put the message in queue tMessage := c.pendingMap[id] @@ -173,20 +182,17 @@ func filterContextError(ctx context.Context) error { // Sets a timeout for TTL and re-queue the message after that time func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { - c.pendingMutex.Lock() + //c.pendingMutex.Lock() c.pendingMap[delivery.ID] = delivery.Message - c.pendingMutex.Unlock() - // TODO: Handle race condition + //c.pendingMutex.Unlock() + c.sequenceMutex.Lock() + ch := c.ackMap[delivery.ID] + c.sequenceMutex.Unlock() + select { case <-time.After(c.ttl): _ = c.ReQueue(ctx, delivery.ID) - case <-c.ackMap[delivery.ID]: - // handling race condition - c.sequenceMutex.Lock() - defer c.sequenceMutex.Unlock() - - delete(c.ackMap, delivery.ID) - delete(c.pendingMap, delivery.ID) + case <-ch: return } } From 4f308ab71bbcf8450433f23ee6594ca661fd7a53 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 01:14:19 +0430 Subject: [PATCH 17/21] internal/broker: Solved race condition on multiple ReQueue() --- internal/broker/core.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index e4e98e1..f8adb91 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -19,7 +19,7 @@ func New(ttl time.Duration) ubroker.Broker { mainChannel: make(chan ubroker.Delivery, 100), ackMap: make(map[int]chan bool), pendingMap: make(map[int]ubroker.Message), - + requeueMap: make(map[int]bool), ttl: ttl, closed: false, } @@ -31,9 +31,10 @@ type core struct { mainChannel chan ubroker.Delivery ackMap map[int]chan bool pendingMap map[int]ubroker.Message + requeueMap map[int]bool ttl time.Duration sequenceMutex sync.Mutex - pendingMutex sync.Mutex + pendingMutex sync.Mutex closed bool deliveryStarted bool } @@ -116,6 +117,9 @@ func (c *core) ReQueue(ctx context.Context, id int) error { //if _, ok := c.pendingMap[id]; ok { // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") //} + if _, ok := c.requeueMap[id]; ok { + return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") + } // everything is "probably" Ok, so we're going to put the message in queue tMessage := c.pendingMap[id] @@ -126,13 +130,18 @@ func (c *core) ReQueue(ctx context.Context, id int) error { ID: c.sequenceNumber, } // setting the acknowledge channel - c.ackMap[c.sequenceNumber] = make(chan bool, 1) + c.ackMap[tDelivery.ID] = make(chan bool, 1) + c.pendingMap[tDelivery.ID] = tDelivery.Message go c.ttlHandler(ctx, tDelivery) - // pushing the message into the main channel - c.mainChannel <- tDelivery // invalidate the previous id c.ackMap[id] <- false + // removing the message id from maps + delete(c.ackMap, id) + delete(c.pendingMap, id) + c.requeueMap[id] = true + // pushing the message into the main channel + c.mainChannel <- tDelivery return nil } @@ -156,6 +165,8 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { Message: message, } c.ackMap[delivery.ID] = make(chan bool, 1) + c.pendingMap[delivery.ID] = delivery.Message + go c.ttlHandler(ctx, delivery) // push the message to channel c.mainChannel <- delivery @@ -182,9 +193,6 @@ func filterContextError(ctx context.Context) error { // Sets a timeout for TTL and re-queue the message after that time func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { - //c.pendingMutex.Lock() - c.pendingMap[delivery.ID] = delivery.Message - //c.pendingMutex.Unlock() c.sequenceMutex.Lock() ch := c.ackMap[delivery.ID] c.sequenceMutex.Unlock() From b095f4921c69f3794592b75351046170bacd4d92 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 02:38:30 +0430 Subject: [PATCH 18/21] internal/broker: Solved race condition on Close() --- internal/broker/core.go | 45 ++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f8adb91..087fb6f 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -34,7 +34,6 @@ type core struct { requeueMap map[int]bool ttl time.Duration sequenceMutex sync.Mutex - pendingMutex sync.Mutex closed bool deliveryStarted bool } @@ -44,6 +43,9 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { if err := filterContextError(ctx); err != nil { return nil, ctx.Err() } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // checking the broker if c.closed { return nil, errors.Wrap(ubroker.ErrClosed, "The broker is closed.") @@ -59,17 +61,18 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if err := filterContextError(ctx); err != nil { return err } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // checking the broker if c.closed { - return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") + //return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.") + return ubroker.ErrClosed } // check if delivery started if !c.deliveryStarted { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet") } - // handling race condition - c.sequenceMutex.Lock() - defer c.sequenceMutex.Unlock() // check if the id is not exists if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.") @@ -92,31 +95,23 @@ func (c *core) ReQueue(ctx context.Context, id int) error { if err := filterContextError(ctx); err != nil { return err } + // handling race condition + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() // checking the broker if c.closed { - return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.") + //return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.") + return ubroker.ErrClosed } // check if delivery started if !c.deliveryStarted { return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet") } - // handling race condition - c.sequenceMutex.Lock() - defer c.sequenceMutex.Unlock() // check if the id is not exists if id > c.sequenceNumber || id < 0 { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.") } ////check if it's going to re-queue the message - //if _, ok := c.ackMap[id]; ok { - // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") - //} - - //// check if it's going to re'queue the message (2nd approach) - //c.pendingMutex.Lock() - //if _, ok := c.pendingMap[id]; ok { - // return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") - //} if _, ok := c.requeueMap[id]; ok { return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.") } @@ -151,13 +146,14 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { if err := filterContextError(ctx); err != nil { return err } - // checking the broker - if c.closed { - return errors.Wrap(ubroker.ErrClosed, "Publish:: The broker is closed.") - } // handling race condition c.sequenceMutex.Lock() defer c.sequenceMutex.Unlock() + // checking the broker + if c.closed { + //return errors.Wrap(ubroker.ErrClosed, "Publish:: The broker is closed.") + return ubroker.ErrClosed + } // Pushing into the channel c.sequenceNumber++ delivery := ubroker.Delivery{ @@ -175,7 +171,9 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { - // TODO:‌ implement me + c.sequenceMutex.Lock() + defer c.sequenceMutex.Unlock() + c.closed = true close(c.mainChannel) @@ -200,6 +198,7 @@ func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) { select { case <-time.After(c.ttl): _ = c.ReQueue(ctx, delivery.ID) + return case <-ch: return } From cfbe11009c8ff0dedda167349594c938c8ae385c Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 02:42:30 +0430 Subject: [PATCH 19/21] all: change GOPATH to arcana261 --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index cc2be3e..40e5f9e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/meshkati/ubroker/internal/broker" - "github.com/meshkati/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 087fb6f..e1d39d3 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index fe5b0c0..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/meshkati/ubroker/internal/broker" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 1f336d2..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 95a4b1d..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/meshkati/ubroker/internal/server" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From 5c50dd101e3261fdc5ded16aa578317c3323a8e2 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 02:42:30 +0430 Subject: [PATCH 20/21] Revert "all: change GOPATH to arcana261" This reverts commit cfbe11009c8ff0dedda167349594c938c8ae385c. --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..cc2be3e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/meshkati/ubroker/internal/broker" + "github.com/meshkati/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index e1d39d3..087fb6f 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..fe5b0c0 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/internal/broker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1f336d2 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..95a4b1d 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/meshkati/ubroker/internal/server" + "github.com/meshkati/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From bd038603b45c99d842c72539c8340c192e87e350 Mon Sep 17 00:00:00 2001 From: Seyed Mostafa Meshkati Date: Sat, 13 Apr 2019 02:42:30 +0430 Subject: [PATCH 21/21] Revert "Revert "all: change GOPATH to arcana261"" This reverts commit 5c50dd101e3261fdc5ded16aa578317c3323a8e2. --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index cc2be3e..40e5f9e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/meshkati/ubroker/internal/broker" - "github.com/meshkati/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 087fb6f..e1d39d3 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index fe5b0c0..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/meshkati/ubroker/internal/broker" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index 1f336d2..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 95a4b1d..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/meshkati/ubroker/internal/server" - "github.com/meshkati/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" )