Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 308 additions & 13 deletions internal/broker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,340 @@ package broker

import (
"context"
"github.com/pkg/errors"
"sync"
"time"

"github.com/arcana261/ubroker/pkg/ubroker"
"github.com/pkg/errors"
)

// New creates a new instance of ubroker.Broker
// with given `ttl`. `ttl` determines time in which
// we requeue an unacknowledged/unrequeued message
// automatically.
func New(ttl time.Duration) ubroker.Broker {
return &core{}
broker := &core{
ttl: ttl,
requests: make(chan interface{}),
deliveryChannel: make(chan ubroker.Delivery),
closed: make(chan bool, 1),
closing: make(chan bool, 1),
pending: make(map[int]ubroker.Message),
messages: []ubroker.Delivery{{}},
}

broker.wg.Add(1)
go broker.startDelivery()

return broker
}

type core struct {
// TODO: add required fields
nextID int
ttl time.Duration

mutex sync.Mutex
working sync.WaitGroup
wg sync.WaitGroup

requests chan interface{}
deliveryChannel chan ubroker.Delivery
closed chan bool
closing chan bool
pending map[int]ubroker.Message
messages []ubroker.Delivery
channel chan ubroker.Delivery
}

type acknowledgeRequest struct {
id int
response chan acknowledgeResponse
}

type acknowledgeResponse struct {
id int
err error
}

type requeueRequest struct {
id int
response chan requeueResponse
}

type requeueResponse struct {
id int
err error
}

type publishRequest struct {
message ubroker.Message
response chan publishResponse
}

type publishResponse struct {
err error
}

func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
// TODO:‌ implement me
return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented")
if isCanceledContext(ctx) {
return nil, ctx.Err()
}

if !c.startWorking() {
return nil, ubroker.ErrClosed
}
defer c.working.Done()

return c.deliveryChannel, nil
}

func (c *core) Acknowledge(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented")
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()

request := &acknowledgeRequest{
id: id,
response: make(chan acknowledgeResponse, 1),
}

select {
case <-ctx.Done():
return ctx.Err()
case c.requests <- request:
select {
case response := <-request.response:
return response.err
case <-ctx.Done():
return ctx.Err()
}
}
}

func (c *core) ReQueue(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented")
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()

request := &requeueRequest{
id: id,
response: make(chan requeueResponse, 1),
}

select {
case <-ctx.Done():
return ctx.Err()
case c.requests <- request:
select {
case response := <-request.response:
return response.err
case <-ctx.Done():
return ctx.Err()
}
}
}

func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented")
if isCanceledContext(ctx) {
return ctx.Err()
}

if !c.startWorking() {
return ubroker.ErrClosed
}
defer c.working.Done()

request := &publishRequest{
message: message,
response: make(chan publishResponse, 1),
}

select {
case <-ctx.Done():
return ctx.Err()
case <-c.closed:
return ubroker.ErrClosed
case c.requests <- request:
return nil
}
}

func (c *core) Close() error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented")
if !c.startClosing() {
return errors.New("can not close channel, closing in progress")
}
c.working.Wait()
close(c.closed)
c.wg.Wait()
close(c.deliveryChannel)

return nil
}

func (c *core) startDelivery() {
defer c.wg.Done()
for {
select {
case <-c.closed:
return

case request := <-c.requests:
if isAcknowledgeRequest(request) {
c.wg.Add(1)
req, _ := request.(*acknowledgeRequest)
req.response <- c.handleAcknowledge(req)
} else if isRequeueRequest(request) {
c.wg.Add(1)
req, _ := request.(*requeueRequest)
req.response <- c.handleRequeue(req)
} else if isPublishRequest(request) {
c.wg.Add(1)
req, _ := request.(*publishRequest)
req.response <- c.handlePublish(req)
} else {
panic(errors.New("UNKNOWN REQUEST"))
}

case c.channel <- c.messages[0]:
if c.channel != nil {
c.pending[c.messages[0].ID] = c.messages[0].Message
c.wg.Add(1)
go c.snooze(c.messages[0].ID)

c.messages = c.messages[1:]
if len(c.messages) == 0 {
c.channel = nil
c.messages = []ubroker.Delivery{{}}
}
}
}
}
}

func (c *core) startWorking() bool {
c.mutex.Lock()
defer c.mutex.Unlock()

select {
case <-c.closing:
return false
default:
c.working.Add(1)
return true
}
}

func (c *core) startClosing() bool {
c.mutex.Lock()
defer c.mutex.Unlock()

select {
case <-c.closing:
return false
default:
close(c.closing)
return true
}
}

func isCanceledContext(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func isAcknowledgeRequest(request interface{}) bool {
_, ok := request.(*acknowledgeRequest)
return ok
}

func isRequeueRequest(request interface{}) bool {
_, ok := request.(*requeueRequest)
return ok
}

func isPublishRequest(request interface{}) bool {
_, ok := request.(*publishRequest)
return ok
}

func (c *core) handleAcknowledge(request *acknowledgeRequest) acknowledgeResponse {
defer c.wg.Done()
_, ok := c.pending[request.id]
if !ok {
return acknowledgeResponse{id: request.id, err: ubroker.ErrInvalidID}
}
delete(c.pending, request.id)
return acknowledgeResponse{id: request.id, err: nil}
}

func (c *core) handleRequeue(request *requeueRequest) requeueResponse {
defer c.wg.Done()
message, ok := c.pending[request.id]
if !ok {
return requeueResponse{id: request.id, err: ubroker.ErrInvalidID}
}
delete(c.pending, request.id)
c.wg.Add(1)
c.handlePublish(&publishRequest{
message: message,
response: make(chan publishResponse, 1),
})
return requeueResponse{id: request.id, err: nil}
}

func (c *core) handlePublish(request *publishRequest) publishResponse {
defer c.wg.Done()

if c.channel == nil {
c.messages = []ubroker.Delivery{}
c.channel = c.deliveryChannel
}

id := c.nextID
c.nextID++
newDelivery := ubroker.Delivery{
ID: id,
Message: request.message,
}

c.messages = append(c.messages, newDelivery)

return publishResponse{err: nil}
}

func (c *core) snooze(id int) {
defer c.wg.Done()
ticker := time.NewTicker(c.ttl)
defer ticker.Stop()

select {
case <-c.closed:
return

case <-ticker.C:
request := &requeueRequest{
id: id,
response: make(chan requeueResponse, 1),
}
select {
case <-c.closed:
return

case c.requests <- request:
}
}
}