From 6d221382c54eea8455e5e2b7de80a984ce8935a5 Mon Sep 17 00:00:00 2001 From: MohammadHossein Date: Fri, 12 Apr 2019 20:59:53 +0430 Subject: [PATCH 1/4] tests passed --- cmd/ubroker/main.go | 4 +- internal/broker/core.go | 156 +++++++++++++++++++++++++++++++---- internal/broker/core_test.go | 8 +- internal/server/http.go | 2 +- internal/server/http_test.go | 4 +- 5 files changed, 151 insertions(+), 23 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..97327f9 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/MohammadHossein/ubroker/internal/broker" + "github.com/MohammadHossein/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..427fc31 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,45 +2,171 @@ package broker import ( "context" + "math" + "sync" "time" - "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/pkg/errors" + "github.com/MohammadHossein/ubroker/pkg/ubroker" ) +var ( + lastMessageID = 0 +) // New creates a new instance of ubroker.Broker // with given `ttl`. `ttl` determines time in which // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + return &core{ + ttl: ttl, + isClosed: make(chan bool, 1), + deliveryChannel: make(chan ubroker.Delivery, 1), + toDeliverChannel: make(chan ubroker.Delivery, math.MaxInt16), + messageChannels: make(map[int]chan bool, math.MaxInt16), + deliveredMessages: make(map[int]ubroker.Delivery, math.MaxInt16), + doneMessages: make(map[int]ubroker.Delivery, math.MaxInt16), + mutex: sync.Mutex{}, + publishMutex: sync.Mutex{}, + } } type core struct { - // TODO: add required fields + isClosed chan bool + ttl time.Duration + deliveryChannel chan ubroker.Delivery + toDeliverChannel chan ubroker.Delivery + messageChannels map[int]chan bool + deliveredMessages map[int]ubroker.Delivery + doneMessages map[int]ubroker.Delivery + mutex sync.Mutex + publishMutex sync.Mutex } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-c.isClosed: + return nil, ubroker.ErrClosed + default: + } + if c.deliveryChannel == nil { + c.deliveryChannel = make(chan ubroker.Delivery, 1) + } + return c.deliveryChannel, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + c.mutex.Lock() + defer c.mutex.Unlock() + if _, ok := c.deliveredMessages[id]; !ok { + return ubroker.ErrInvalidID + } + if _, ok := c.doneMessages[id]; ok { + return ubroker.ErrInvalidID + } + c.doneMessages[id] = c.deliveredMessages[id] + delete(c.deliveredMessages, id) + close(c.messageChannels[id]) + return nil + //return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + c.mutex.Lock() + if _, ok := c.deliveredMessages[id]; !ok { + c.mutex.Unlock() + return ubroker.ErrInvalidID + } + message := c.deliveredMessages[id].Message + delete(c.deliveredMessages, id) + c.mutex.Unlock() + err := c.Publish(ctx, message) + return err } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") -} + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + id := lastMessageID + lastMessageID++ + br := ubroker.Delivery{Message: message, ID: id} + c.mutex.Lock() + c.messageChannels[id] = make(chan bool) + c.toDeliverChannel <- br + c.mutex.Unlock() + + go func() { + c.publishMutex.Lock() + defer c.publishMutex.Unlock() + m := <-c.toDeliverChannel + c.mutex.Lock() + c.deliveredMessages[m.ID] = m + c.mutex.Unlock() + for ; len(c.deliveryChannel) != 0; { + select { + case <-c.isClosed: + return + default: + + } + } + select { + case <-c.isClosed: + return + default: + c.deliveryChannel <- m + go c.WaitAck(ctx, m) + } + }() + return nil +} func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + c.mutex.Lock() + defer c.mutex.Unlock() + select { + case <-c.isClosed: + return nil + default: + close(c.isClosed) + close(c.deliveryChannel) + return nil + } +} +func (c *core) WaitAck(ctx context.Context, message ubroker.Delivery) { + c.mutex.Lock() + channel := c.messageChannels[message.ID] + c.mutex.Unlock() + d := time.After(c.ttl) + select { + case <-channel: + return + case <-d: + _ = c.ReQueue(ctx, message.ID) + return + case <-c.isClosed: + return + case <-ctx.Done(): + + } } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..6b66ea6 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/MohammadHossein/ubroker/internal/broker" + "github.com/MohammadHossein/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -208,7 +208,7 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeReQueueable() { msg1 := <-delivery s.Nil(s.broker.ReQueue(context.Background(), msg1.ID)) msg2 := <-delivery - s.NotEqual(msg1.ID, msg2.ID) + s.NotEqual(msg1.ID, msg2.ID) s.Equal(msg1.Message.Body, msg2.Message.Body) } @@ -260,6 +260,8 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldReQueueUponHalfSecondTTL() { s.publish("hello2") msg2 := <-delivery time.Sleep(750 * time.Millisecond) + fmt.Println(msg1) + fmt.Println(msg2) s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.ID)) s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.ID)) msg3 := <-delivery diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..d787f3c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/MohammadHossein/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..482b642 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/MohammadHossein/ubroker/internal/server" + "github.com/MohammadHossein/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) From 12e6baa49dbf48eab3a7f370bc04194b90278701 Mon Sep 17 00:00:00 2001 From: MohammadHossein Date: Fri, 12 Apr 2019 21:13:28 +0430 Subject: [PATCH 2/4] remove prints --- internal/broker/core_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 6b66ea6..2b650a4 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -260,8 +260,6 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldReQueueUponHalfSecondTTL() { s.publish("hello2") msg2 := <-delivery time.Sleep(750 * time.Millisecond) - fmt.Println(msg1) - fmt.Println(msg2) s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.ID)) s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.ID)) msg3 := <-delivery From 788151a682999c34487cce6d4e1a0517a3fc897b Mon Sep 17 00:00:00 2001 From: MohammadHossein Date: Fri, 12 Apr 2019 21:58:45 +0430 Subject: [PATCH 3/4] race tests passed --- internal/broker/core.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 427fc31..61caa7e 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -11,6 +11,7 @@ import ( var ( lastMessageID = 0 + mutexId = sync.Mutex{} ) // New creates a new instance of ubroker.Broker // with given `ttl`. `ttl` determines time in which @@ -27,6 +28,8 @@ func New(ttl time.Duration) ubroker.Broker { doneMessages: make(map[int]ubroker.Delivery, math.MaxInt16), mutex: sync.Mutex{}, publishMutex: sync.Mutex{}, + deliveryMutex: sync.Mutex{}, + } } @@ -40,6 +43,7 @@ type core struct { doneMessages map[int]ubroker.Delivery mutex sync.Mutex publishMutex sync.Mutex + deliveryMutex sync.Mutex } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { @@ -108,8 +112,7 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { return ubroker.ErrClosed default: } - id := lastMessageID - lastMessageID++ + id := getID() br := ubroker.Delivery{Message: message, ID: id} c.mutex.Lock() c.messageChannels[id] = make(chan bool) @@ -135,7 +138,9 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { case <-c.isClosed: return default: + c.deliveryMutex.Lock() c.deliveryChannel <- m + c.deliveryMutex.Unlock() go c.WaitAck(ctx, m) } }() @@ -149,7 +154,9 @@ func (c *core) Close() error { return nil default: close(c.isClosed) + c.deliveryMutex.Lock() close(c.deliveryChannel) + c.deliveryMutex.Unlock() return nil } } @@ -170,3 +177,9 @@ func (c *core) WaitAck(ctx context.Context, message ubroker.Delivery) { } } +func getID() int { + mutexId.Lock() + defer mutexId.Unlock() + lastMessageID++ + return lastMessageID +} \ No newline at end of file From a6aa9036c2899905479a0f6fb024a09b85dd7199 Mon Sep 17 00:00:00 2001 From: MohammadHossein Date: Fri, 12 Apr 2019 22:07:42 +0430 Subject: [PATCH 4/4] change MohammadHossein to arcana261 --- cmd/ubroker/main.go | 4 ++-- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 97327f9..40e5f9e 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -7,8 +7,8 @@ import ( "os/signal" "time" - "github.com/MohammadHossein/ubroker/internal/broker" - "github.com/MohammadHossein/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 61caa7e..afab977 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/MohammadHossein/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" ) var ( diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 2b650a4..dbd3ffc 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/MohammadHossein/ubroker/internal/broker" - "github.com/MohammadHossein/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/http.go b/internal/server/http.go index d787f3c..6dcac4c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/MohammadHossein/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 482b642..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,8 +9,8 @@ import ( "strings" "testing" - "github.com/MohammadHossein/ubroker/internal/server" - "github.com/MohammadHossein/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" )