From cf94d2fcfaef8cdd0392808ae125af03e49fa45e Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:30:51 +0430 Subject: [PATCH 1/4] all test passed --- internal/broker/core.go | 207 +++++++++++++++++++++++++++++++++-- internal/broker/core_test.go | 17 ++- internal/server/http_test.go | 4 +- 3 files changed, 210 insertions(+), 18 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..403576f 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,6 +2,7 @@ package broker import ( "context" + "sync" "time" "github.com/arcana261/ubroker/pkg/ubroker" @@ -12,35 +13,217 @@ import ( // with given `ttl`. `ttl` determines time in which // we requeue an unacknowledged/unrequeued message // automatically. + func New(ttl time.Duration) ubroker.Broker { - return &core{} + + c := &core{ + delFlag: false, + isClosed: false, + delChan: make(chan ubroker.Delivery, 100), + mainQ: make([]messageType, 0), + lastID: 0, + ackedMessageID: make([]int, 0), + ttl: ttl, + } + return c } type core struct { - // TODO: add required fields + sync.Mutex + isClosed bool + delChan chan ubroker.Delivery + lastID int + mainQ []messageType + ttl time.Duration + delFlag bool + ackedMessageID []int +} + +type messageType struct { + msg ubroker.Delivery + ttlTime time.Time } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + + switch ctx.Err() { + case context.Canceled: + return nil, ctx.Err() + case context.DeadlineExceeded: + return nil, ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return nil, ubroker.ErrClosed + } + c.delFlag = true + + return c.delChan, nil } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + + switch ctx.Err() { + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return ubroker.ErrClosed + } + + // check delivery done + if c.delFlag == false { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + // published + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == id { + indexID = index + break + } + } + + // acked befor + var ackIndex = -1 + for index, ids := range c.ackedMessageID { + if ids == id { + ackIndex = index + break + } + } + + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + if ackIndex != -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + // checked ttl time + if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } else { + // acked + c.ackedMessageID = append(c.ackedMessageID, id) + return nil + } + + return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + + //c.wg.Done() + //defer c.wg.Done() + + switch ctx.Err() { + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + if c.isClosed == true { + return ubroker.ErrClosed + } + + //check delivery + if c.delFlag == false { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + + // published? + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == id { + indexID = index + break + } + } + + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } + // check ttl time + if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { + c.doReQ(c.mainQ[indexID]) + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") + } else { + c.doReQ(c.mainQ[indexID]) + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + return nil + + } + return nil +} + +func (c *core) doReQ(msg1 messageType) error { + //c.wg.Add(1) + //defer c.wg.Done() + c.lastID = c.lastID + 1 + var newMsg ubroker.Delivery + newMsg.Message = msg1.msg.Message + newMsg.ID = c.lastID + var newnewmsg = messageType{} + newnewmsg.msg = newMsg + newnewmsg.ttlTime = time.Now() + + c.mainQ = append(c.mainQ, newnewmsg) + //send message to channel + c.delChan <- newMsg + return nil } func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + //c.wg.Add(1) + //defer c.wg.Done() + + switch ctx.Err() { + + case context.Canceled: + return ctx.Err() + case context.DeadlineExceeded: + return ctx.Err() + } + c.Lock() + defer c.Unlock() + + if c.isClosed == true { + return ubroker.ErrClosed + } + + c.lastID = c.lastID + 1 + var newMsg ubroker.Delivery + newMsg.Message = message + newMsg.ID = c.lastID + //send message to channel + c.delChan <- newMsg + + var newnewmsg = messageType{} + newnewmsg.msg = newMsg + newnewmsg.ttlTime = time.Now() + c.mainQ = append(c.mainQ, newnewmsg) + return nil } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + //c.wg.Wait() + if c.isClosed { + return nil + } + c.Lock() + defer c.Unlock() + c.isClosed = true + close(c.delChan) + + return nil } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..6309a58 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -3,6 +3,8 @@ package broker_test import ( "context" "fmt" + "github.com/msafari18/ubroker/pkg/ubroker" + "github.com/stretchr/testify/assert" "math/rand" "runtime" "sync" @@ -10,10 +12,9 @@ import ( "time" "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -333,21 +334,29 @@ func (s *CoreBrokerTestSuite) TestDataRace() { defer ticker.Stop() s.prepareTest(1 * time.Second) - + var i int wg.Add(1) go func() { defer wg.Done() for { + //fmt.Print("---------->") select { case <-ticker.C: + //fmt.Print("---------->") return default: + i = i + 1 + //fmt.Print("---------->") err := s.broker.Publish(context.Background(), ubroker.Message{ Body: fmt.Sprint(rand.Intn(1000)), }) + //fmt.Print(i) + //fmt.Print("\n") + if err == ubroker.ErrClosed { + //fmt.Print("------------------") return } s.Nil(err) @@ -362,7 +371,7 @@ func (s *CoreBrokerTestSuite) TestDataRace() { wg.Add(1) go func() { defer wg.Done() - + //fmt.Print("-------------->") delivery, err := s.broker.Delivery(context.Background()) s.Nil(err) if err != nil { diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..0a0bc6b 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,10 +9,10 @@ import ( "strings" "testing" + "github.com/arcana261/testify/mock" + "github.com/arcana261/testify/suite" "github.com/arcana261/ubroker/internal/server" "github.com/arcana261/ubroker/pkg/ubroker" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) type mockBroker struct { From 270b7e4dbeab9ad53876fb0adedda7f482ed3d96 Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:51:41 +0430 Subject: [PATCH 2/4] import corrected --- internal/broker/core_test.go | 5 ++--- internal/server/http_test.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 6309a58..15b11da 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -3,8 +3,6 @@ package broker_test import ( "context" "fmt" - "github.com/msafari18/ubroker/pkg/ubroker" - "github.com/stretchr/testify/assert" "math/rand" "runtime" "sync" @@ -12,9 +10,10 @@ import ( "time" "github.com/arcana261/ubroker/internal/broker" - + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 0a0bc6b..d29d161 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -9,10 +9,10 @@ import ( "strings" "testing" - "github.com/arcana261/testify/mock" - "github.com/arcana261/testify/suite" "github.com/arcana261/ubroker/internal/server" "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" ) type mockBroker struct { From 4c9c2e810f012f493b51ba697f839536e1526153 Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 12 Apr 2019 12:55:08 +0430 Subject: [PATCH 3/4] corrected --- internal/broker/core_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 15b11da..9d6530d 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -333,29 +333,21 @@ func (s *CoreBrokerTestSuite) TestDataRace() { defer ticker.Stop() s.prepareTest(1 * time.Second) - var i int + wg.Add(1) go func() { defer wg.Done() for { - //fmt.Print("---------->") select { case <-ticker.C: - //fmt.Print("---------->") return default: - i = i + 1 - //fmt.Print("---------->") err := s.broker.Publish(context.Background(), ubroker.Message{ Body: fmt.Sprint(rand.Intn(1000)), }) - //fmt.Print(i) - //fmt.Print("\n") - if err == ubroker.ErrClosed { - //fmt.Print("------------------") return } s.Nil(err) @@ -370,7 +362,7 @@ func (s *CoreBrokerTestSuite) TestDataRace() { wg.Add(1) go func() { defer wg.Done() - //fmt.Print("-------------->") + delivery, err := s.broker.Delivery(context.Background()) s.Nil(err) if err != nil { From d1edaf31eff55a410a80a45691e2f780c2155eef Mon Sep 17 00:00:00 2001 From: msafari18 Date: Fri, 19 Apr 2019 19:47:07 +0430 Subject: [PATCH 4/4] bugs fixed --- internal/broker/core.go | 87 ++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 403576f..7b767cf 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -42,6 +42,7 @@ type core struct { type messageType struct { msg ubroker.Delivery ttlTime time.Time + ackChan chan int } func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { @@ -58,7 +59,6 @@ func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { return nil, ubroker.ErrClosed } c.delFlag = true - return c.delChan, nil } @@ -102,41 +102,34 @@ func (c *core) Acknowledge(ctx context.Context, id int) error { if indexID == -1 { return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - if ackIndex != -1 { return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - // checked ttl time - if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } else { - // acked - c.ackedMessageID = append(c.ackedMessageID, id) - return nil - } - + // acked + c.ackedMessageID = append(c.ackedMessageID, id) + //fmt.Print("-------------->",indexID) + c.mainQ[indexID].ackChan <- 1 return nil } func (c *core) ReQueue(ctx context.Context, id int) error { - //c.wg.Done() - //defer c.wg.Done() - switch ctx.Err() { case context.Canceled: return ctx.Err() case context.DeadlineExceeded: return ctx.Err() } + c.Lock() - defer c.Unlock() if c.isClosed == true { + c.Unlock() return ubroker.ErrClosed } //check delivery if c.delFlag == false { + c.Unlock() return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } @@ -148,27 +141,23 @@ func (c *core) ReQueue(ctx context.Context, id int) error { break } } - if indexID == -1 { + c.Unlock() return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") } - // check ttl time - if time.Now().Sub(c.mainQ[indexID].ttlTime) > c.ttl { - c.doReQ(c.mainQ[indexID]) - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - return errors.Wrap(ubroker.ErrInvalidID, "id is invalid") - } else { - c.doReQ(c.mainQ[indexID]) - c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) - return nil - } + var msg = c.mainQ[indexID] + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + c.doReQ(msg) + return nil } func (c *core) doReQ(msg1 messageType) error { - //c.wg.Add(1) - //defer c.wg.Done() + if c.isClosed == true { + c.Unlock() + return ubroker.ErrClosed + } c.lastID = c.lastID + 1 var newMsg ubroker.Delivery newMsg.Message = msg1.msg.Message @@ -176,17 +165,41 @@ func (c *core) doReQ(msg1 messageType) error { var newnewmsg = messageType{} newnewmsg.msg = newMsg newnewmsg.ttlTime = time.Now() - + newnewmsg.ackChan = make(chan int, 2) c.mainQ = append(c.mainQ, newnewmsg) + //send message to channel c.delChan <- newMsg + c.Unlock() + + go c.checkTTL(newnewmsg) return nil } -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - //c.wg.Add(1) - //defer c.wg.Done() +func (c *core) checkTTL(msg messageType) { + select { + case <-time.After(c.ttl): + c.Lock() + // remove from mainQ + var indexID = -1 + for index, message := range c.mainQ { + if message.msg.ID == msg.msg.ID { + indexID = index + break + } + } + if indexID != -1 { + c.mainQ = append(c.mainQ[:indexID], c.mainQ[indexID+1:]...) + } + // call reQ again + c.doReQ(msg) + return + case <-msg.ackChan: + return + } +} +func (c *core) Publish(ctx context.Context, message ubroker.Message) error { switch ctx.Err() { case context.Canceled: @@ -195,9 +208,8 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { return ctx.Err() } c.Lock() - defer c.Unlock() - if c.isClosed == true { + c.Unlock() return ubroker.ErrClosed } @@ -207,16 +219,19 @@ func (c *core) Publish(ctx context.Context, message ubroker.Message) error { newMsg.ID = c.lastID //send message to channel c.delChan <- newMsg - var newnewmsg = messageType{} newnewmsg.msg = newMsg newnewmsg.ttlTime = time.Now() + newnewmsg.ackChan = make(chan int, 2) c.mainQ = append(c.mainQ, newnewmsg) + c.Unlock() + + go c.checkTTL(newnewmsg) return nil } func (c *core) Close() error { - //c.wg.Wait() + if c.isClosed { return nil }