From 3d0bf078e4314e61f239ed08272b2c244be5cb76 Mon Sep 17 00:00:00 2001 From: AMIRmh Date: Fri, 12 Apr 2019 17:47:32 +0430 Subject: [PATCH 1/8] all tests are passed --- internal/broker/core.go | 160 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 146 insertions(+), 14 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..40f9e3b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,10 +2,15 @@ package broker import ( "context" + "math" "time" - "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/pkg/errors" + "github.com/amirmh/ubroker/pkg/ubroker" + "sync" +) + +var ( + lastID int ) // New creates a new instance of ubroker.Broker @@ -13,34 +18,161 @@ import ( // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + return &core{ + isClosed: make(chan bool, 1), + ttl: ttl, + deliveryChannel: make(chan ubroker.Delivery, 1), + processingQueue: make(map[int]waitForAckStruct), + processingMutex: &sync.Mutex{}, + publishMutex: &sync.Mutex{}, + publishOrderMutex: &sync.Mutex{}, + writeWaitGp: &sync.WaitGroup{}, + lastMsg: ubroker.Delivery{ID:-1}, + publishQueue: make(chan ubroker.Message, math.MaxInt16), + } } + type core struct { - // TODO: add required fields + isClosed chan bool + ttl time.Duration + deliveryChannel chan ubroker.Delivery + processingQueue map[int]waitForAckStruct + processingMutex *sync.Mutex + publishMutex *sync.Mutex + publishOrderMutex *sync.Mutex + lastMsg ubroker.Delivery + publishQueue chan ubroker.Message + writeWaitGp *sync.WaitGroup +} + +type waitForAckStruct struct { + message ubroker.Delivery + ackChannnel chan interface{} +} + +func (c *core) waitForAck(ctx context.Context, ubrokerMsg waitForAckStruct) { + select { + case <-time.After(c.ttl): + _ = c.ReQueue(ctx, ubrokerMsg.message.ID) + case <-ctx.Done(): + return + case <-c.isClosed: + return + case <-ubrokerMsg.ackChannnel: + return + } } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-c.isClosed: + return nil, ubroker.ErrClosed + default: + } + + return c.deliveryChannel, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + + //fmt.Println("lll") + c.processingMutex.Lock() + defer c.processingMutex.Unlock() + waited, ok := c.processingQueue[id] + if !ok { + return ubroker.ErrInvalidID + } + close(waited.ackChannnel) + delete(c.processingQueue, id) + //fmt.Println("uuu") + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + + //fmt.Println("ll") + c.processingMutex.Lock() + defer c.processingMutex.Unlock() + waited, ok := c.processingQueue[id] + if !ok { + return ubroker.ErrInvalidID + } + close(waited.ackChannnel) + delete(c.processingQueue, id) + _ = c.Publish(ctx, waited.message.Message) + //fmt.Println("uu") + return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.isClosed: + return ubroker.ErrClosed + default: + } + + + c.writeWaitGp.Add(1) + c.publishQueue <- message + go func() { + defer c.writeWaitGp.Done() + c.publishMutex.Lock() + + lastID++ + brokerMsg := ubroker.Delivery{ + Message: <-c.publishQueue, + ID: lastID, + } + processingMsg := waitForAckStruct{ + message: brokerMsg, + ackChannnel: make(chan interface{}, 1), + } + + c.processingMutex.Lock() + c.processingQueue[processingMsg.message.ID] = processingMsg + c.processingMutex.Unlock() + + select { + case <-c.isClosed: + return + default: + c.deliveryChannel <- brokerMsg + go c.waitForAck(ctx, processingMsg) + + } + c.publishMutex.Unlock() + + }() + + + return nil } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + c.writeWaitGp.Wait() + close(c.deliveryChannel) + close(c.isClosed) + close(c.publishQueue) + return nil } From fd62883e305b4bddea0c2261b20013d8ea654844 Mon Sep 17 00:00:00 2001 From: AMIRmh Date: Fri, 12 Apr 2019 18:41:05 +0430 Subject: [PATCH 2/8] the race problem solved --- internal/broker/core.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 40f9e3b..05b2853 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -3,6 +3,7 @@ package broker import ( "context" "math" + "sync/atomic" "time" "github.com/amirmh/ubroker/pkg/ubroker" @@ -10,7 +11,7 @@ import ( ) var ( - lastID int + lastID int32 ) // New creates a new instance of ubroker.Broker @@ -132,23 +133,27 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { default: } - - c.writeWaitGp.Add(1) - c.publishQueue <- message + c.publishOrderMutex.Lock() + //if !<-c.isClosed { + c.publishQueue <- message + //} + c.publishOrderMutex.Unlock() go func() { - defer c.writeWaitGp.Done() + //c.writeWaitGp.Add(1) + //defer c.writeWaitGp.Done() c.publishMutex.Lock() - lastID++ + atomic.AddInt32(&lastID, 1) brokerMsg := ubroker.Delivery{ Message: <-c.publishQueue, - ID: lastID, + ID: int(atomic.LoadInt32(&lastID)), } processingMsg := waitForAckStruct{ message: brokerMsg, ackChannnel: make(chan interface{}, 1), } + c.processingMutex.Lock() c.processingQueue[processingMsg.message.ID] = processingMsg c.processingMutex.Unlock() @@ -170,9 +175,13 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { } func (c *core) Close() error { - c.writeWaitGp.Wait() + //c.writeWaitGp.Wait() + c.publishMutex.Lock() + c.publishOrderMutex.Lock() close(c.deliveryChannel) close(c.isClosed) close(c.publishQueue) + c.publishOrderMutex.Unlock() + c.publishMutex.Unlock() return nil } From ec08196241a8a6a326a633b142004fd172384122 Mon Sep 17 00:00:00 2001 From: AMIRmh Date: Fri, 12 Apr 2019 18:43:22 +0430 Subject: [PATCH 3/8] all commite --- .idea/modules.xml | 8 + .idea/ubroker.iml | 9 + .idea/vcs.xml | 6 + .idea/workspace.xml | 488 +++++++++++++++++++++++++++++++++++ README.md | 2 +- cmd/ubroker/main.go | 4 +- internal/broker/core_test.go | 12 +- internal/server/http.go | 2 +- internal/server/http_test.go | 4 +- 9 files changed, 527 insertions(+), 8 deletions(-) create mode 100644 .idea/modules.xml create mode 100644 .idea/ubroker.iml create mode 100644 .idea/vcs.xml create mode 100644 .idea/workspace.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..e312037 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/ubroker.iml b/.idea/ubroker.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/ubroker.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..b78c7b3 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,488 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + arcana261 + testack + cancel + testdata + append + race + lastID + + + amirmh + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +