Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 149 additions & 13 deletions internal/broker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,178 @@ import (
"time"

"github.com/arcana261/ubroker/pkg/ubroker"
"github.com/pkg/errors"

"sync"
"log"
)

func print(s ...interface{}){
log.Println(s...)

}

// New creates a new instance of ubroker.Broker
// with given `ttl`. `ttl` determines time in which
// we requeue an unacknowledged/unrequeued message
// automatically.
func New(ttl time.Duration) ubroker.Broker {
return &core{}
return &core{
maxId: 0,
ttl: ttl,
msgs: make(map[int]*QueueElement),
closed: false,
deliveryChannel: make(chan ubroker.Delivery, 10000),
deliveryCalled: false,
}
}

type QueueElement struct {
sync.Mutex

id int
msg string
core *core
timerCanceledChan chan struct{}
}

func (e *QueueElement) activateTimer(expTimer *time.Timer) {
go func() {
select {
case <-expTimer.C:
e.core.reQueue(e.id)
case <-e.timerCanceledChan:
}
}()
}

func (e *QueueElement) resetTimer(ttl time.Duration) {
e.stopTimer()
e.activateTimer(time.NewTimer(e.core.ttl))
}

func (e *QueueElement) stopTimer() {
select {
case e.timerCanceledChan <- struct{}{}:
default:
}
}

type core struct {
// TODO: add required fields
sync.Mutex

maxId int
ttl time.Duration
msgs map[int]*QueueElement
closed bool
deliveryChannel chan ubroker.Delivery
deliveryCalled bool
}

func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
// TODO:‌ implement me
return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented")
if err := ctx.Err(); err != nil {
return nil, err
}
c.Lock()
defer c.Unlock()
if c.closed {
return nil, ubroker.ErrClosed
}

c.deliveryCalled = true

return c.deliveryChannel, nil
}

func (c *core) Acknowledge(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented")
if err := ctx.Err(); err != nil {
return err
}
c.Lock()
defer c.Unlock()
if c.closed {
return ubroker.ErrClosed
}
if !c.deliveryCalled{
return ubroker.ErrInvalidID
}

if val, ok := c.msgs[id]; ok {
val.stopTimer()
delete(c.msgs, id)
}else{
return ubroker.ErrInvalidID
}

return nil
}

func (c *core) reQueue(id int) error {
c.Lock()
defer c.Unlock()
if c.closed {
return ubroker.ErrClosed
}
if !c.deliveryCalled{
return ubroker.ErrInvalidID
}

if val, ok := c.msgs[id]; ok {
delete(c.msgs, id)
val.stopTimer()

c.addNewMessage(ubroker.Message{Body: val.msg})
}else{
return ubroker.ErrInvalidID
}

return nil
}

func (c *core) ReQueue(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented")
if err := ctx.Err(); err != nil {
return err
}
return c.reQueue(id)
}

func (c *core) addNewMessage(message ubroker.Message){
c.maxId += 1
c.msgs[c.maxId] = &QueueElement{
id: c.maxId,
msg: message.Body,
core: c,
timerCanceledChan: make(chan struct{}),
}
c.msgs[c.maxId].activateTimer(time.NewTimer(c.ttl))
c.deliveryChannel <- ubroker.Delivery{
Message: message,
ID: c.maxId,
}
}

func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented")
if err := ctx.Err(); err != nil {
return err
}
c.Lock()
defer c.Unlock()
if c.closed {
return ubroker.ErrClosed
}

c.addNewMessage(message)
return nil
}

func (c *core) Close() error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented")
c.Lock()
defer c.Unlock()
if c.closed {
return ubroker.ErrClosed
}

c.closed = true
close(c.deliveryChannel)

return nil
}