From cf94d2fcfaef8cdd0392808ae125af03e49fa45e Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:30:51 +0430 Subject: [PATCH 1/9] all test passed --- internal/broker/core.go | 207 +++++++++++++++++++++++++++++++++-- internal/broker/core_test.go | 17 ++- internal/server/http_test.go | 4 +- 3 files changed, 210 insertions(+), 18 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..403576f 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,6 +2,7 @@ package broker import ( "context" + "sync" "time" "github.com/arcana261/ubroker/pkg/ubroker" @@ -12,35 +13,217 @@ import ( // with given `ttl`. `ttl` determines time in which // we requeue an unacknowledged/unrequeued message // automatically. + func New(ttl time.Duration) ubroker.Broker { - return &core{} + + c := &core{ + delFlag: false, + isClosed: false, + delChan: make(chan ubroker.Delivery, 100), + mainQ: make([]messageType, 0), + lastID: 0, + ackedMessageID: make([]int, 0), + ttl: ttl, + } + return c } type core struct { - // TODO: add required fields + sync.Mutex + isClosed bool + delChan chan ubroker.Delivery + lastID int + mainQ []messageType + ttl time.Duration + delFlag bool + ackedMessageID []int +} + +type messageType struct { + msg ubroker.Delivery + ttlTime time.Time } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + + switch ctx.Err() { + case context.Canceled: + return nil, ctx.Err() + case context.DeadlineExceeded: + return nil, ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return nil, ubroker.ErrClosed + } + c.delFlag = true + + return c.delChan, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + + switch ctx.Err() { + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return ubroker.ErrClosed + } + + // check delivery done + if c.delFlag == false { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + // published + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == id { + indexID = index + break + } + } + + // acked befor + var ackIndex = -1 + for index, ids := range c.ackedMessageID { + if ids == id { + ackIndex = index + break + } + } + + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + if ackIndex != -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + // checked ttl time + if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } else { + // acked + c.ackedMessageID = append(c.ackedMessageID, id) + return nil + } + + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + + //c.wg.Done() + //defer c.wg.Done() + + switch ctx.Err() { + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return ubroker.ErrClosed + } + + //check delivery + if c.delFlag == false { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + // published? + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == id { + indexID = index + break + } + } + + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + // check ttl time + if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { + c.doReQ(c.mainQ[indexID]) + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } else { + c.doReQ(c.mainQ[indexID]) + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + return nil + + } + return nil +} + +func (c *core) doReQ(msg1 messageType) error { + //c.wg.Add(1) + //defer c.wg.Done() + c.lastID = c.lastID + 1 + var newMsg ubroker.Delivery + newMsg.Message = msg1.msg.Message + newMsg.ID = c.lastID + var newnewmsg = messageType{} + newnewmsg.msg = newMsg + newnewmsg.ttlTime = time.Now() + + c.mainQ = append(c.mainQ, newnewmsg) + //send message to channel + c.delChan <- newMsg + return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + //c.wg.Add(1) + //defer c.wg.Done() + + switch ctx.Err() { + + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + + if c.isClosed == true { + return ubroker.ErrClosed + } + + c.lastID = c.lastID + 1 + var newMsg ubroker.Delivery + newMsg.Message = message + newMsg.ID = c.lastID + //send message to channel + c.delChan <- newMsg + + var newnewmsg = messageType{} + newnewmsg.msg = newMsg + newnewmsg.ttlTime = time.Now() + c.mainQ = append(c.mainQ, newnewmsg) + return nil } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + //c.wg.Wait() + if c.isClosed { + return nil + } + c.Lock() + defer c.Unlock() + c.isClosed = true + close(c.delChan) + + return nil } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..6309a58 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -3,6 +3,8 @@ package broker_test import ( "context" "fmt" + "github.com/msafari18/ubroker/pkg/ubroker" + "github.com/stretchr/testify/assert" "math/rand" "runtime" "sync" @@ -10,10 +12,9 @@ import ( "time" "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -333,21 +334,29 @@ func (s *CoreBrokerTestSuite) TestDataRace() { defer ticker.Stop() s.prepareTest(1 * time.Second) - + var i int wg.Add(1) go func() { defer wg.Done() for { + //fmt.Print("---------->") select { case <-ticker.C: + //fmt.Print("---------->") return default: + i = i + 1 + //fmt.Print("---------->") err := s.broker.Publish(context.Background(), ubroker.Message{ Body: fmt.Sprint(rand.Intn(1000)), }) + //fmt.Print(i) + //fmt.Print("\n") + if err == ubroker.ErrClosed { + //fmt.Print("------------------") return } s.Nil(err) @@ -362,7 +371,7 @@ func (s *CoreBrokerTestSuite) TestDataRace() { wg.Add(1) go func() { defer wg.Done() - + //fmt.Print("-------------->") delivery, err := s.broker.Delivery(context.Background()) s.Nil(err) if err != nil { diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..0a0bc6b 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,10 +9,10 @@ import ( "strings" "testing" + "github.com/arcana261/testify/mock" + "github.com/arcana261/testify/suite" "github.com/arcana261/ubroker/internal/server" "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) type mockBroker struct { From 270b7e4dbeab9ad53876fb0adedda7f482ed3d96 Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:51:41 +0430 Subject: [PATCH 2/9] import corrected --- internal/broker/core_test.go | 5 ++--- internal/server/http_test.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 6309a58..15b11da 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -3,8 +3,6 @@ package broker_test import ( "context" "fmt" - "github.com/msafari18/ubroker/pkg/ubroker" - "github.com/stretchr/testify/assert" "math/rand" "runtime" "sync" @@ -12,9 +10,10 @@ import ( "time" "github.com/arcana261/ubroker/internal/broker" - + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 0a0bc6b..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,10 +9,10 @@ import ( "strings" "testing" - "github.com/arcana261/testify/mock" - "github.com/arcana261/testify/suite" "github.com/arcana261/ubroker/internal/server" "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" ) type mockBroker struct { From 4c9c2e810f012f493b51ba697f839536e1526153 Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:55:08 +0430 Subject: [PATCH 3/9] corrected --- internal/broker/core_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 15b11da..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -333,29 +333,21 @@ func (s *CoreBrokerTestSuite) TestDataRace() { defer ticker.Stop() s.prepareTest(1 * time.Second) - var i int + wg.Add(1) go func() { defer wg.Done() for { - //fmt.Print("---------->") select { case <-ticker.C: - //fmt.Print("---------->") return default: - i = i + 1 - //fmt.Print("---------->") err := s.broker.Publish(context.Background(), ubroker.Message{ Body: fmt.Sprint(rand.Intn(1000)), }) - //fmt.Print(i) - //fmt.Print("\n") - if err == ubroker.ErrClosed { - //fmt.Print("------------------") return } s.Nil(err) @@ -370,7 +362,7 @@ func (s *CoreBrokerTestSuite) TestDataRace() { wg.Add(1) go func() { defer wg.Done() - //fmt.Print("-------------->") + delivery, err := s.broker.Delivery(context.Background()) s.Nil(err) if err != nil { From d1edaf31eff55a410a80a45691e2f780c2155eef Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 19 Apr 2019 19:47:07 +0430 Subject: [PATCH 4/9] bugs fixed --- internal/broker/core.go | 87 ++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 403576f..7b767cf 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -42,6 +42,7 @@ type core struct { type messageType struct { msg ubroker.Delivery ttlTime time.Time + ackChan chan int } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { @@ -58,7 +59,6 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { return nil, ubroker.ErrClosed } c.delFlag = true - return c.delChan, nil } @@ -102,41 +102,34 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if indexID == -1 { return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - if ackIndex != -1 { return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - // checked ttl time - if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } else { - // acked - c.ackedMessageID = append(c.ackedMessageID, id) - return nil - } - + // acked + c.ackedMessageID = append(c.ackedMessageID, id) + //fmt.Print("-------------->",indexID) + c.mainQ[indexID].ackChan <- 1 return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - //c.wg.Done() - //defer c.wg.Done() - switch ctx.Err() { case context.Canceled: return ctx.Err() case context.DeadlineExceeded: return ctx.Err() } + c.Lock() - defer c.Unlock() if c.isClosed == true { + c.Unlock() return ubroker.ErrClosed } //check delivery if c.delFlag == false { + c.Unlock() return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } @@ -148,27 +141,23 @@ func (c *core) ReQueue(ctx context.Context, id int) error { break } } - if indexID == -1 { + c.Unlock() return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - // check ttl time - if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { - c.doReQ(c.mainQ[indexID]) - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } else { - c.doReQ(c.mainQ[indexID]) - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - return nil - } + var msg = c.mainQ[indexID] + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + c.doReQ(msg) + return nil } func (c *core) doReQ(msg1 messageType) error { - //c.wg.Add(1) - //defer c.wg.Done() + if c.isClosed == true { + c.Unlock() + return ubroker.ErrClosed + } c.lastID = c.lastID + 1 var newMsg ubroker.Delivery newMsg.Message = msg1.msg.Message @@ -176,17 +165,41 @@ func (c *core) doReQ(msg1 messageType) error { var newnewmsg = messageType{} newnewmsg.msg = newMsg newnewmsg.ttlTime = time.Now() - + newnewmsg.ackChan = make(chan int, 2) c.mainQ = append(c.mainQ, newnewmsg) + //send message to channel c.delChan <- newMsg + c.Unlock() + + go c.checkTTL(newnewmsg) return nil } -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - //c.wg.Add(1) - //defer c.wg.Done() +func (c *core) checkTTL(msg messageType) { + select { + case <-time.After(c.ttl): + c.Lock() + // remove from mainQ + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == msg.msg.ID { + indexID = index + break + } + } + if indexID != -1 { + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + } + // call reQ again + c.doReQ(msg) + return + case <-msg.ackChan: + return + } +} +func (c *core) Publish(ctx context.Context, message ubroker.Message) error { switch ctx.Err() { case context.Canceled: @@ -195,9 +208,8 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { return ctx.Err() } c.Lock() - defer c.Unlock() - if c.isClosed == true { + c.Unlock() return ubroker.ErrClosed } @@ -207,16 +219,19 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { newMsg.ID = c.lastID //send message to channel c.delChan <- newMsg - var newnewmsg = messageType{} newnewmsg.msg = newMsg newnewmsg.ttlTime = time.Now() + newnewmsg.ackChan = make(chan int, 2) c.mainQ = append(c.mainQ, newnewmsg) + c.Unlock() + + go c.checkTTL(newnewmsg) return nil } func (c *core) Close() error { - //c.wg.Wait() + if c.isClosed { return nil } From f07346487e30df65380ae94b4d332765792db18d Mon Sep 17 00:00:00 2001 From: Monireh Safari Date: Fri, 24 May 2019 14:30:15 +0430 Subject: [PATCH 5/9] Delete core.go --- internal/broker/core.go | 244 ---------------------------------------- 1 file changed, 244 deletions(-) delete mode 100644 internal/broker/core.go diff --git a/internal/broker/core.go b/internal/broker/core.go deleted file mode 100644 index 7b767cf..0000000 --- a/internal/broker/core.go +++ /dev/null @@ -1,244 +0,0 @@ -package broker - -import ( - "context" - "sync" - "time" - - "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/pkg/errors" -) - -// New creates a new instance of ubroker.Broker -// with given `ttl`. `ttl` determines time in which -// we requeue an unacknowledged/unrequeued message -// automatically. - -func New(ttl time.Duration) ubroker.Broker { - - c := &core{ - delFlag: false, - isClosed: false, - delChan: make(chan ubroker.Delivery, 100), - mainQ: make([]messageType, 0), - lastID: 0, - ackedMessageID: make([]int, 0), - ttl: ttl, - } - return c -} - -type core struct { - sync.Mutex - isClosed bool - delChan chan ubroker.Delivery - lastID int - mainQ []messageType - ttl time.Duration - delFlag bool - ackedMessageID []int -} - -type messageType struct { - msg ubroker.Delivery - ttlTime time.Time - ackChan chan int -} - -func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - - switch ctx.Err() { - case context.Canceled: - return nil, ctx.Err() - case context.DeadlineExceeded: - return nil, ctx.Err() - } - c.Lock() - defer c.Unlock() - if c.isClosed == true { - return nil, ubroker.ErrClosed - } - c.delFlag = true - return c.delChan, nil -} - -func (c *core) Acknowledge(ctx context.Context, id int) error { - - switch ctx.Err() { - case context.Canceled: - return ctx.Err() - case context.DeadlineExceeded: - return ctx.Err() - } - c.Lock() - defer c.Unlock() - if c.isClosed == true { - return ubroker.ErrClosed - } - - // check delivery done - if c.delFlag == false { - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } - - // published - var indexID = -1 - for index, message := range c.mainQ { - if message.msg.ID == id { - indexID = index - break - } - } - - // acked befor - var ackIndex = -1 - for index, ids := range c.ackedMessageID { - if ids == id { - ackIndex = index - break - } - } - - if indexID == -1 { - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } - if ackIndex != -1 { - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } - // acked - c.ackedMessageID = append(c.ackedMessageID, id) - //fmt.Print("-------------->",indexID) - c.mainQ[indexID].ackChan <- 1 - return nil -} - -func (c *core) ReQueue(ctx context.Context, id int) error { - - switch ctx.Err() { - case context.Canceled: - return ctx.Err() - case context.DeadlineExceeded: - return ctx.Err() - } - - c.Lock() - if c.isClosed == true { - c.Unlock() - return ubroker.ErrClosed - } - - //check delivery - if c.delFlag == false { - c.Unlock() - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } - - // published? - var indexID = -1 - for index, message := range c.mainQ { - if message.msg.ID == id { - indexID = index - break - } - } - if indexID == -1 { - c.Unlock() - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } - - var msg = c.mainQ[indexID] - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - c.doReQ(msg) - - return nil -} - -func (c *core) doReQ(msg1 messageType) error { - if c.isClosed == true { - c.Unlock() - return ubroker.ErrClosed - } - c.lastID = c.lastID + 1 - var newMsg ubroker.Delivery - newMsg.Message = msg1.msg.Message - newMsg.ID = c.lastID - var newnewmsg = messageType{} - newnewmsg.msg = newMsg - newnewmsg.ttlTime = time.Now() - newnewmsg.ackChan = make(chan int, 2) - c.mainQ = append(c.mainQ, newnewmsg) - - //send message to channel - c.delChan <- newMsg - c.Unlock() - - go c.checkTTL(newnewmsg) - return nil -} - -func (c *core) checkTTL(msg messageType) { - select { - case <-time.After(c.ttl): - c.Lock() - // remove from mainQ - var indexID = -1 - for index, message := range c.mainQ { - if message.msg.ID == msg.msg.ID { - indexID = index - break - } - } - if indexID != -1 { - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - } - // call reQ again - c.doReQ(msg) - return - case <-msg.ackChan: - return - } -} - -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - switch ctx.Err() { - - case context.Canceled: - return ctx.Err() - case context.DeadlineExceeded: - return ctx.Err() - } - c.Lock() - if c.isClosed == true { - c.Unlock() - return ubroker.ErrClosed - } - - c.lastID = c.lastID + 1 - var newMsg ubroker.Delivery - newMsg.Message = message - newMsg.ID = c.lastID - //send message to channel - c.delChan <- newMsg - var newnewmsg = messageType{} - newnewmsg.msg = newMsg - newnewmsg.ttlTime = time.Now() - newnewmsg.ackChan = make(chan int, 2) - c.mainQ = append(c.mainQ, newnewmsg) - c.Unlock() - - go c.checkTTL(newnewmsg) - return nil -} - -func (c *core) Close() error { - - if c.isClosed { - return nil - } - c.Lock() - defer c.Unlock() - c.isClosed = true - close(c.delChan) - - return nil -} From 06e50304f64087a1e5f0561f9dd33fc3b2f9477b Mon Sep 17 00:00:00 2001 From: Monireh Safari Date: Fri, 24 May 2019 14:36:35 +0430 Subject: [PATCH 6/9] change core.go --- internal/broker/core.go | 342 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 internal/broker/core.go diff --git a/internal/broker/core.go b/internal/broker/core.go new file mode 100644 index 0000000..0a6d5b0 --- /dev/null +++ b/internal/broker/core.go @@ -0,0 +1,342 @@ +package broker + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/msafari18/ubroker/pkg/ubroker" +) + +// New creates a new instance of ubroker.Broker +// with given `ttl`. `ttl` determines time in which +// we requeue an unacknowledged/unrequeued message +// automatically. +func New(ttl time.Duration) ubroker.Broker { + broker := &core{ + ttl: ttl, + requests: make(chan interface{}), + deliveryChannel: make(chan *ubroker.Delivery), + closed: make(chan bool, 1), + closing: make(chan bool, 1), + pending: make(map[int32]*ubroker.Message), + messages: []*ubroker.Delivery{{}}, + } + + broker.wg.Add(1) + go broker.startDelivery() + + return broker +} + +type core struct { + nextID int32 + ttl time.Duration + + mutex sync.Mutex + working sync.WaitGroup + wg sync.WaitGroup + + requests chan interface{} + deliveryChannel chan *ubroker.Delivery + closed chan bool + closing chan bool + pending map[int32]*ubroker.Message + messages []*ubroker.Delivery + channel chan *ubroker.Delivery +} + +type acknowledgeRequest struct { + id int32 + response chan acknowledgeResponse +} + +type acknowledgeResponse struct { + id int32 + err error +} + +type requeueRequest struct { + id int32 + response chan requeueResponse +} + +type requeueResponse struct { + id int32 + err error +} + +type publishRequest struct { + message *ubroker.Message + response chan publishResponse +} + +type publishResponse struct { + err error +} + +func (c *core) Delivery(ctx context.Context) (<-chan *ubroker.Delivery, error) { + if isCanceledContext(ctx) { + return nil, ctx.Err() + } + + if !c.startWorking() { + return nil, ubroker.ErrClosed + } + defer c.working.Done() + + return c.deliveryChannel, nil +} + +func (c *core) Acknowledge(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &acknowledgeRequest{ + id: id, + response: make(chan acknowledgeResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (c *core) ReQueue(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (c *core) Publish(ctx context.Context, message *ubroker.Message) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &publishRequest{ + message: message, + response: make(chan publishResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.closed: + return ubroker.ErrClosed + case c.requests <- request: + return nil + } +} + +func (c *core) Close() error { + if !c.startClosing() { + return errors.New("can not close channel, closing in progress") + } + c.working.Wait() + close(c.closed) + c.wg.Wait() + close(c.deliveryChannel) + + return nil +} + +func (c *core) startDelivery() { + defer c.wg.Done() + for { + select { + case <-c.closed: + return + + case request := <-c.requests: + if isAcknowledgeRequest(request) { + c.wg.Add(1) + req, _ := request.(*acknowledgeRequest) + req.response <- c.handleAcknowledge(req) + } else if isRequeueRequest(request) { + c.wg.Add(1) + req, _ := request.(*requeueRequest) + req.response <- c.handleRequeue(req) + } else if isPublishRequest(request) { + c.wg.Add(1) + req, _ := request.(*publishRequest) + req.response <- c.handlePublish(req) + } else { + panic(errors.New("UNKNOWN REQUEST")) + } + + case c.channel <- c.messages[0]: + if c.channel != nil { + c.pending[c.messages[0].Id] = c.messages[0].Message + c.wg.Add(1) + go c.snooze(c.messages[0].Id) + + c.messages = c.messages[1:] + if len(c.messages) == 0 { + c.channel = nil + c.messages = []*ubroker.Delivery{{}} + } + } + } + } +} + +func (c *core) startWorking() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + c.working.Add(1) + return true + } +} + +func (c *core) startClosing() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + close(c.closing) + return true + } +} + +func isCanceledContext(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + +func isAcknowledgeRequest(request interface{}) bool { + _, ok := request.(*acknowledgeRequest) + return ok +} + +func isRequeueRequest(request interface{}) bool { + _, ok := request.(*requeueRequest) + return ok +} + +func isPublishRequest(request interface{}) bool { + _, ok := request.(*publishRequest) + return ok +} + +func (c *core) handleAcknowledge(request *acknowledgeRequest) acknowledgeResponse { + defer c.wg.Done() + _, ok := c.pending[request.id] + if !ok { + return acknowledgeResponse{id: request.id, err: ubroker.ErrInvalidID} + } + delete(c.pending, request.id) + return acknowledgeResponse{id: request.id, err: nil} +} + +func (c *core) handleRequeue(request *requeueRequest) requeueResponse { + defer c.wg.Done() + message, ok := c.pending[request.id] + if !ok { + return requeueResponse{id: request.id, err: ubroker.ErrInvalidID} + } + delete(c.pending, request.id) + c.wg.Add(1) + c.handlePublish(&publishRequest{ + message: message, + response: make(chan publishResponse, 1), + }) + return requeueResponse{id: request.id, err: nil} +} + +func (c *core) handlePublish(request *publishRequest) publishResponse { + defer c.wg.Done() + + if c.channel == nil { + c.messages = []*ubroker.Delivery{} + c.channel = c.deliveryChannel + } + + id := c.nextID + c.nextID++ + newDelivery := ubroker.Delivery{ + Id: id, + Message: request.message, + } + + c.messages = append(c.messages, &newDelivery) + + return publishResponse{err: nil} +} + +func (c *core) snooze(id int32) { + defer c.wg.Done() + ticker := time.NewTicker(c.ttl) + defer ticker.Stop() + + select { + case <-c.closed: + return + + case <-ticker.C: + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), + } + select { + case <-c.closed: + return + + case c.requests <- request: + } + } +} From ed0054cdca2ca5679285aab02060f5d4541877eb Mon Sep 17 00:00:00 2001 From: Monireh Safari Date: Fri, 24 May 2019 14:38:00 +0430 Subject: [PATCH 7/9] Update core.go --- internal/broker/core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 0a6d5b0..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/msafari18/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker From 0a9b98faf672abeb8979579fed0ff7402728b3b2 Mon Sep 17 00:00:00 2001 From: Monireh Safari Date: Fri, 24 May 2019 14:44:00 +0430 Subject: [PATCH 8/9] update grpc.go --- internal/server/grpc.go | 75 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..9215dd8 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -2,9 +2,10 @@ package server import ( "context" + "io" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" + "github.com/msafari18/ubroker/pkg/ubroker" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -20,17 +21,81 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { } func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") + + ctx := stream.Context(); + delChan, err := s.broker.Delivery(ctx) + + if err != nil { + if err == ubroker.ErrClosed { + return status.Error(codes.Unavailable, "Unavailable") + } + return nil; + } + + for { + _, recErr := stream.Recv() + if recErr == io.EOF { + return nil + } + if recErr != nil { + return nil; + } + + message := <- delChan + if message == nil { + return status.Error(codes.Unavailable, "Unavailable") + } + + sendErr := stream.Send(message) + if sendErr != nil { + return nil + } + + } } func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + + err := s.broker.Acknowledge(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed { + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + } + if err == ubroker.ErrInvalidID { + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidID") + } + return &empty.Empty{}, nil; + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.ReQueue(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed { + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + } + if err == ubroker.ErrInvalidID { + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidID") + } + return &empty.Empty{}, nil; + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Publish(ctx, request) + + if err != nil { + if err == ubroker.ErrClosed { + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + } + if err == ubroker.ErrInvalidID { + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidID") + } + return &empty.Empty{}, nil; + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } From 725f29581def5522372716672366f002ef2a6e20 Mon Sep 17 00:00:00 2001 From: Monireh Safari Date: Fri, 24 May 2019 14:44:53 +0430 Subject: [PATCH 9/9] Update grpc.go --- internal/server/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 9215dd8..8a028f0 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -5,7 +5,7 @@ import ( "io" "github.com/golang/protobuf/ptypes/empty" - "github.com/msafari18/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" )