From ea57dd0ebcc3d53bfbd469db1df4ab0f28bb0fd4 Mon Sep 17 00:00:00 2001 From: reygar Date: Sat, 13 Apr 2019 00:16:14 +0430 Subject: [PATCH 1/2] Update core.go --- internal/broker/core.go | 218 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 198 insertions(+), 20 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..0c4592e 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -1,46 +1,224 @@ package broker - import ( "context" - "time" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + "sync" + "time" ) -// New creates a new instance of ubroker.Broker -// with given `ttl`. `ttl` determines time in which -// we requeue an unacknowledged/unrequeued message -// automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + //return &core{} + core := &Core{ + deliveryEntry: make(chan ubroker.Delivery, 100), + requeueEntry: make(chan requeue), + publishEntry: make([]request, 100), + acknowledgeEntry: make([]int, 100), + ttl: ttl, + delivered: false, + isClosed: false, + close: false, + id: 0, + } + return core } -type core struct { +type Core struct { // TODO: add required fields + ttl time.Duration + deliveryEntry chan ubroker.Delivery + requeueEntry chan requeue + publishEntry []request + acknowledgeEntry []int + delivered bool + isClosed bool + close bool + id int + queueGroup sync.WaitGroup + syncCore sync.Mutex } -func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") +type request struct { + msg ubroker.Delivery + ttlTime time.Time + error chan error +} +type acknowledge struct { + id int + error chan error +} +type requeue struct { + id int + error chan error } -func (c *core) Acknowledge(ctx context.Context, id int) error { +func (core *Core) startRequest() error { + core.syncCore.Lock() + if core.IsClosed() { + return ubroker.ErrClosed + } + defer core.syncCore.Unlock() + core.queueGroup.Add(1) + return nil +} +func (core *Core) IsClosed() bool { + if core.close == true { + return true + } else { + return false + } +} +func (core *Core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + //return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if ctx.Err() == context.Canceled { + return nil, ctx.Err() + } + if ctx.Err() == context.DeadlineExceeded { + return nil, ctx.Err() + } + core.queueGroup.Add(1) + defer core.queueGroup.Done() + core.syncCore.Lock() + defer core.syncCore.Unlock() + if core.isClosed == true { + return nil, ubroker.ErrClosed + } + core.delivered = true + return core.deliveryEntry, nil } -func (c *core) ReQueue(ctx context.Context, id int) error { +func (core *Core) Acknowledge(ctx context.Context, id int) error { // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if ctx.Err() == context.Canceled { + return ctx.Err() + } + if ctx.Err() == context.DeadlineExceeded { + return ctx.Err() + } + core.syncCore.Lock() + defer core.syncCore.Unlock() + if core.isClosed == true { + return ubroker.ErrClosed + } + if core.delivered == false { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } + var indexID = -1 + for index, message := range core.publishEntry { + if message.msg.ID == id { + indexID = index + break + } + } + var ackIndex = -1 + for index, ids := range core.acknowledgeEntry { + if ids == id { + ackIndex = index + break + } + } + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } + if ackIndex != -1 { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } + if time.Now().Sub(core.publishEntry[indexID].ttlTime) > core.ttl { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } else { + core.acknowledgeEntry = append(core.acknowledgeEntry, id) + return nil + } + } -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { +func (core *Core) ReQueue(ctx context.Context, id int) error { // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if ctx.Err() == context.Canceled { + return ctx.Err() + } + if ctx.Err() == context.DeadlineExceeded { + return ctx.Err() + } + core.syncCore.Lock() + defer core.syncCore.Unlock() + if core.isClosed == true { + return ubroker.ErrClosed + } + if core.delivered == false { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } + var indexID = -1 + for index, message := range core.publishEntry { + if message.msg.ID == id { + indexID = index + break + } + } + if indexID == -1 { + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } + if time.Now().Sub(core.publishEntry[indexID].ttlTime) > core.ttl { + _ = core.reRequest(core.publishEntry[indexID]) + core.publishEntry = append(core.publishEntry[:indexID], core.publishEntry[indexID+1:]...) + return errors.Wrap(ubroker.ErrInvalidID, "Error") + } else { + _ = core.reRequest(core.publishEntry[indexID]) + core.publishEntry = append(core.publishEntry[:indexID], core.publishEntry[indexID+1:]...) + return nil + + } } +func (core *Core) reRequest(req request) error { + core.id = core.id + 1 + var newMsg ubroker.Delivery + newMsg.Message = req.msg.Message + newMsg.ID = core.id + var newMessage = request{} + newMessage.msg = newMsg + newMessage.ttlTime = time.Now() + core.publishEntry = append(core.publishEntry, newMessage) + core.deliveryEntry <- newMsg + return nil +} +func (core *Core) Publish(ctx context.Context, message ubroker.Message) error { + // TODO:‌ implement me + //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if ctx.Err() == context.Canceled { + return ctx.Err() + } + if ctx.Err() == context.DeadlineExceeded { + return ctx.Err() + } + core.syncCore.Lock() + defer core.syncCore.Unlock() + if core.isClosed == true { + return ubroker.ErrClosed + } + core.id = core.id + 1 + var newMsg ubroker.Delivery + newMsg.Message = message + newMsg.ID = core.id + core.deliveryEntry <- newMsg + var newMessage = request{} + newMessage.msg = newMsg + newMessage.ttlTime = time.Now() + core.publishEntry = append(core.publishEntry, newMessage) + return nil -func (c *core) Close() error { +} +func (core *Core) Close() error { // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + if core.isClosed { + return nil + } + core.syncCore.Lock() + defer core.syncCore.Unlock() + core.isClosed = true + close(core.deliveryEntry) + return nil } From c0d87d61fe6d2eacbb4ac55cb042714ba1f8d915 Mon Sep 17 00:00:00 2001 From: reygar Date: Sat, 13 Apr 2019 00:32:50 +0430 Subject: [PATCH 2/2] Update core.go --- internal/broker/core.go | 67 ++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/internal/broker/core.go b/internal/broker/core.go index 0c4592e..fb2545c 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -1,4 +1,5 @@ package broker + import ( "context" "github.com/arcana261/ubroker/pkg/ubroker" @@ -37,7 +38,6 @@ type Core struct { queueGroup sync.WaitGroup syncCore sync.Mutex } - type request struct { msg ubroker.Delivery ttlTime time.Time @@ -70,7 +70,7 @@ func (core *Core) IsClosed() bool { } func (core *Core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { // TODO:‌ implement me - //return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") if ctx.Err() == context.Canceled { return nil, ctx.Err() } @@ -87,7 +87,6 @@ func (core *Core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) core.delivered = true return core.deliveryEntry, nil } - func (core *Core) Acknowledge(ctx context.Context, id int) error { // TODO:‌ implement me //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") @@ -105,35 +104,33 @@ func (core *Core) Acknowledge(ctx context.Context, id int) error { if core.delivered == false { return errors.Wrap(ubroker.ErrInvalidID, "Error") } - var indexID = -1 - for index, message := range core.publishEntry { + var index = -1 + for counter, message := range core.publishEntry { if message.msg.ID == id { - indexID = index + index = counter break } } - var ackIndex = -1 - for index, ids := range core.acknowledgeEntry { + var ackId = -1 + for counter, ids := range core.acknowledgeEntry { if ids == id { - ackIndex = index + ackId = counter break } } - if indexID == -1 { + if index == -1 { return errors.Wrap(ubroker.ErrInvalidID, "Error") } - if ackIndex != -1 { + if ackId != -1 { return errors.Wrap(ubroker.ErrInvalidID, "Error") } - if time.Now().Sub(core.publishEntry[indexID].ttlTime) > core.ttl { + if time.Now().Sub(core.publishEntry[index].ttlTime) > core.ttl { return errors.Wrap(ubroker.ErrInvalidID, "Error") } else { core.acknowledgeEntry = append(core.acknowledgeEntry, id) return nil } - } - func (core *Core) ReQueue(ctx context.Context, id int) error { // TODO:‌ implement me //return errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") @@ -151,37 +148,36 @@ func (core *Core) ReQueue(ctx context.Context, id int) error { if core.delivered == false { return errors.Wrap(ubroker.ErrInvalidID, "Error") } - var indexID = -1 - for index, message := range core.publishEntry { + var index = -1 + for counter, message := range core.publishEntry { if message.msg.ID == id { - indexID = index + index = counter break } } - if indexID == -1 { + if index == -1 { return errors.Wrap(ubroker.ErrInvalidID, "Error") } - if time.Now().Sub(core.publishEntry[indexID].ttlTime) > core.ttl { - _ = core.reRequest(core.publishEntry[indexID]) - core.publishEntry = append(core.publishEntry[:indexID], core.publishEntry[indexID+1:]...) + if time.Now().Sub(core.publishEntry[index].ttlTime) > core.ttl { + _ = core.reRequest(core.publishEntry[index]) + core.publishEntry = append(core.publishEntry[:index], core.publishEntry[index+1:]...) return errors.Wrap(ubroker.ErrInvalidID, "Error") } else { - _ = core.reRequest(core.publishEntry[indexID]) - core.publishEntry = append(core.publishEntry[:indexID], core.publishEntry[indexID+1:]...) + _ = core.reRequest(core.publishEntry[index]) + core.publishEntry = append(core.publishEntry[:index], core.publishEntry[index+1:]...) return nil - } } func (core *Core) reRequest(req request) error { core.id = core.id + 1 - var newMsg ubroker.Delivery - newMsg.Message = req.msg.Message - newMsg.ID = core.id + var newMessageBroker ubroker.Delivery + newMessageBroker.Message = req.msg.Message + newMessageBroker.ID = core.id var newMessage = request{} - newMessage.msg = newMsg + newMessage.msg = newMessageBroker newMessage.ttlTime = time.Now() core.publishEntry = append(core.publishEntry, newMessage) - core.deliveryEntry <- newMsg + core.deliveryEntry <- newMessageBroker return nil } func (core *Core) Publish(ctx context.Context, message ubroker.Message) error { @@ -198,17 +194,18 @@ func (core *Core) Publish(ctx context.Context, message ubroker.Message) error { if core.isClosed == true { return ubroker.ErrClosed } + //request{message,0,""} + //_ = core.reRequest(core.publishEntry[request{message,0,""}]) core.id = core.id + 1 - var newMsg ubroker.Delivery - newMsg.Message = message - newMsg.ID = core.id - core.deliveryEntry <- newMsg + var newMessageBroker ubroker.Delivery + newMessageBroker.Message = message + newMessageBroker.ID = core.id + core.deliveryEntry <- newMessageBroker var newMessage = request{} - newMessage.msg = newMsg + newMessage.msg = newMessageBroker newMessage.ttlTime = time.Now() core.publishEntry = append(core.publishEntry, newMessage) return nil - } func (core *Core) Close() error { // TODO:‌ implement me