Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
514232e
add .idea to .gitignore
Meshkati Apr 11, 2019
041e71a
Change path names to my workstation
Meshkati Apr 11, 2019
e32592a
internal/broker: Implement filterContextError()
Meshkati Apr 12, 2019
2e79839
internal/broker: Implement basic Delivery()
Meshkati Apr 12, 2019
de37047
internal/broker: Implement basic ttlHandler()
Meshkati Apr 12, 2019
d6c4392
internal/broker: Implement basic Publish()
Meshkati Apr 12, 2019
29835cd
internal/broker: Delete delivery ACK channel from map after ACK received
Meshkati Apr 12, 2019
1275a4f
internal/broker: Implement basic Acknowledge()
Meshkati Apr 12, 2019
c361d5e
internal/broker: Implement basic ReQueue()
Meshkati Apr 12, 2019
0bd6154
internal/broker: Consider negative IDs as invalid
Meshkati Apr 12, 2019
3ea4b74
internal/broker: Add mutex on sequenceNumber
Meshkati Apr 12, 2019
da22f26
internal/broker: Generate new ID on ReQueue()
Meshkati Apr 12, 2019
4e2307e
internal/broker: Check if the delivery is started or not, in ACK and REQ
Meshkati Apr 12, 2019
501d7e3
internal/broker: Closing the main channel on Broker.close()
Meshkati Apr 12, 2019
e124d9c
internal/broker: Add pendingMutex for the pending map
Meshkati Apr 12, 2019
b97d27e
internal/broker: Solve race condition on multiple Acknowledge()
Meshkati Apr 12, 2019
4f308ab
internal/broker: Solved race condition on multiple ReQueue()
Meshkati Apr 12, 2019
b095f49
internal/broker: Solved race condition on Close()
Meshkati Apr 12, 2019
cfbe110
all: change GOPATH to arcana261
Meshkati Apr 12, 2019
5c50dd1
Revert "all: change GOPATH to arcana261"
Meshkati Apr 12, 2019
bd03860
Revert "Revert "all: change GOPATH to arcana261""
Meshkati Apr 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/ubroker
/ubroker
.idea
183 changes: 171 additions & 12 deletions internal/broker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package broker

import (
"context"
"sync"
"time"

"github.com/arcana261/ubroker/pkg/ubroker"
Expand All @@ -10,37 +11,195 @@ import (

// New creates a new instance of ubroker.Broker
// with given `ttl`. `ttl` determines time in which
// we requeue an unacknowledged/unrequeued message
// we requeue an unacknowledged/un-re-queued message
// automatically.
func New(ttl time.Duration) ubroker.Broker {
return &core{}
return &core{
sequenceNumber: -1,
mainChannel: make(chan ubroker.Delivery, 100),
ackMap: make(map[int]chan bool),
pendingMap: make(map[int]ubroker.Message),
requeueMap: make(map[int]bool),
ttl: ttl,
closed: false,
}
}

type core struct {
// TODO: add required fields
sequenceNumber int
mainChannel chan ubroker.Delivery
ackMap map[int]chan bool
pendingMap map[int]ubroker.Message
requeueMap map[int]bool
ttl time.Duration
sequenceMutex sync.Mutex
closed bool
deliveryStarted bool
}

func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
// TODO:‌ implement me
return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented")
// check if context has error
if err := filterContextError(ctx); err != nil {
return nil, ctx.Err()
}
// handling race condition
c.sequenceMutex.Lock()
defer c.sequenceMutex.Unlock()
// checking the broker
if c.closed {
return nil, errors.Wrap(ubroker.ErrClosed, "The broker is closed.")
}

c.deliveryStarted = true

return c.mainChannel, nil
}

func (c *core) Acknowledge(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented")
// check if context has error
if err := filterContextError(ctx); err != nil {
return err
}
// handling race condition
c.sequenceMutex.Lock()
defer c.sequenceMutex.Unlock()
// checking the broker
if c.closed {
//return errors.Wrap(ubroker.ErrClosed, "Acknowledge:: The broker is closed.")
return ubroker.ErrClosed
}
// check if delivery started
if !c.deliveryStarted {
return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet")
}
// check if the id is not exists
if id > c.sequenceNumber || id < 0 {
return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+" is not committed yet.")
}
// check if it's going to re-acknowledgment
if _, ok := c.ackMap[id]; !ok {
return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Message with id="+string(id)+"has been ACKed before.")
}
// everything is "probably" Ok, so we're going to mark the ACK
c.ackMap[id] <- true
// removing the message id from maps
delete(c.ackMap, id)
delete(c.pendingMap, id)

return nil
}

func (c *core) ReQueue(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented")
// check if context has error
if err := filterContextError(ctx); err != nil {
return err
}
// handling race condition
c.sequenceMutex.Lock()
defer c.sequenceMutex.Unlock()
// checking the broker
if c.closed {
//return errors.Wrap(ubroker.ErrClosed, "ReQueue:: The broker is closed.")
return ubroker.ErrClosed
}
// check if delivery started
if !c.deliveryStarted {
return errors.Wrap(ubroker.ErrInvalidID, "Acknowledge:: Delivery is not started yet")
}
// check if the id is not exists
if id > c.sequenceNumber || id < 0 {
return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is not committed yet.")
}
////check if it's going to re-queue the message
if _, ok := c.requeueMap[id]; ok {
return errors.Wrap(ubroker.ErrInvalidID, "ReQueue:: Message with id="+string(id)+" is already in the queue.")
}

// everything is "probably" Ok, so we're going to put the message in queue
tMessage := c.pendingMap[id]

c.sequenceNumber++
tDelivery := ubroker.Delivery{
Message: tMessage,
ID: c.sequenceNumber,
}
// setting the acknowledge channel
c.ackMap[tDelivery.ID] = make(chan bool, 1)
c.pendingMap[tDelivery.ID] = tDelivery.Message

go c.ttlHandler(ctx, tDelivery)
// invalidate the previous id
c.ackMap[id] <- false
// removing the message id from maps
delete(c.ackMap, id)
delete(c.pendingMap, id)
c.requeueMap[id] = true
// pushing the message into the main channel
c.mainChannel <- tDelivery

return nil
}

func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented")
// check if context has error
if err := filterContextError(ctx); err != nil {
return err
}
// handling race condition
c.sequenceMutex.Lock()
defer c.sequenceMutex.Unlock()
// checking the broker
if c.closed {
//return errors.Wrap(ubroker.ErrClosed, "Publish:: The broker is closed.")
return ubroker.ErrClosed
}
// Pushing into the channel
c.sequenceNumber++
delivery := ubroker.Delivery{
ID: c.sequenceNumber,
Message: message,
}
c.ackMap[delivery.ID] = make(chan bool, 1)
c.pendingMap[delivery.ID] = delivery.Message

go c.ttlHandler(ctx, delivery)
// push the message to channel
c.mainChannel <- delivery

return nil
}

func (c *core) Close() error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented")
c.sequenceMutex.Lock()
defer c.sequenceMutex.Unlock()

c.closed = true
close(c.mainChannel)

return nil
}

// Checks if the context has an error, returns true if the deadline has exceeded, or context had canceled
func filterContextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled {
return ctx.Err()
}

return nil
}

// Sets a timeout for TTL and re-queue the message after that time
func (c *core) ttlHandler(ctx context.Context, delivery ubroker.Delivery) {
c.sequenceMutex.Lock()
ch := c.ackMap[delivery.ID]
c.sequenceMutex.Unlock()

select {
case <-time.After(c.ttl):
_ = c.ReQueue(ctx, delivery.ID)
return
case <-ch:
return
}
}
8 changes: 4 additions & 4 deletions pkg/ubroker/ubroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Broker interface {
// messages to consumers.
// We require following:
//
// 1. Resulting read-only channel is unique (it doesn
// 1. Resulting read-only channel is unique (it does
// not change each time you call it)
// 2. If `ctx` is canceled or timed out, `ctx.Err()` is
// returned
Expand All @@ -36,8 +36,8 @@ type Broker interface {
Delivery(ctx context.Context) (<-chan Delivery, error)

// Acknowledge is called by clients to declare that
// specified message id has been successfuly processed
// and should not be requeued to queue and we have to
// specified message id has been successfully processed
// and should not be re-queued to queue and we have to
// remove it.
// We demand following:
//
Expand Down Expand Up @@ -84,7 +84,7 @@ type HTTPServer interface {

// Message encapsulates a queued message
type Message struct {
// Body is an abitrary client-defined string
// Body is an arbitrary client-defined string
Body string `json:"body"`
}

Expand Down