Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/ubroker.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

481 changes: 481 additions & 0 deletions .idea/workspace.xml

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@ language: go
go:
- 1.12.x

before_install:
- make dev-dependencies

script:
- make check
10 changes: 10 additions & 0 deletions .travis.yml1
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
language: go

go:
- 1.12.x

before_install:
- make dev-dependencies

script:
- make check
173 changes: 160 additions & 13 deletions internal/broker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,192 @@ package broker

import (
"context"
"math"
"sync/atomic"
"time"

"github.com/arcana261/ubroker/pkg/ubroker"
"github.com/pkg/errors"
"sync"
)

var (
lastID int32
)

// New creates a new instance of ubroker.Broker
// with given `ttl`. `ttl` determines time in which
// we requeue an unacknowledged/unrequeued message
// automatically.
func New(ttl time.Duration) ubroker.Broker {
return &core{}
return &core{
isClosed: make(chan bool, 1),
ttl: ttl,
deliveryChannel: make(chan ubroker.Delivery, 1),
processingQueue: make(map[int]waitForAckStruct),
processingMutex: &sync.Mutex{},
publishMutex: &sync.Mutex{},
publishOrderMutex: &sync.Mutex{},
writeWaitGp: &sync.WaitGroup{},
lastMsg: ubroker.Delivery{ID:-1},
publishQueue: make(chan ubroker.Message, math.MaxInt16),
}
}


type core struct {
// TODO: add required fields
isClosed chan bool
ttl time.Duration
deliveryChannel chan ubroker.Delivery
processingQueue map[int]waitForAckStruct
processingMutex *sync.Mutex
publishMutex *sync.Mutex
publishOrderMutex *sync.Mutex
lastMsg ubroker.Delivery
publishQueue chan ubroker.Message
writeWaitGp *sync.WaitGroup
}

type waitForAckStruct struct {
message ubroker.Delivery
ackChannnel chan interface{}
}

func (c *core) waitForAck(ctx context.Context, ubrokerMsg waitForAckStruct) {
select {
case <-time.After(c.ttl):
_ = c.ReQueue(ctx, ubrokerMsg.message.ID)
case <-ctx.Done():
return
case <-c.isClosed:
return
case <-ubrokerMsg.ackChannnel:
return
}
}

func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
// TODO:‌ implement me
return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented")

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-c.isClosed:
return nil, ubroker.ErrClosed
default:
}

return c.deliveryChannel, nil
}

func (c *core) Acknowledge(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented")

select {
case <-ctx.Done():
return ctx.Err()
case <-c.isClosed:
return ubroker.ErrClosed
default:
}

//fmt.Println("lll")
c.processingMutex.Lock()
defer c.processingMutex.Unlock()
waited, ok := c.processingQueue[id]
if !ok {
return ubroker.ErrInvalidID
}
close(waited.ackChannnel)
delete(c.processingQueue, id)
//fmt.Println("uuu")
return nil
}

func (c *core) ReQueue(ctx context.Context, id int) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented")
select {
case <-ctx.Done():
return ctx.Err()
case <-c.isClosed:
return ubroker.ErrClosed
default:
}

//fmt.Println("ll")
c.processingMutex.Lock()
defer c.processingMutex.Unlock()
waited, ok := c.processingQueue[id]
if !ok {
return ubroker.ErrInvalidID
}
close(waited.ackChannnel)
delete(c.processingQueue, id)
_ = c.Publish(ctx, waited.message.Message)
//fmt.Println("uu")
return nil
}

func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented")
select {
case <-ctx.Done():
return ctx.Err()
case <-c.isClosed:
return ubroker.ErrClosed
default:
}

c.publishOrderMutex.Lock()

select {
case <-c.isClosed:
return nil
default:
}

c.publishQueue <- message

c.publishOrderMutex.Unlock()
go func() {
//c.writeWaitGp.Add(1)
//defer c.writeWaitGp.Done()
c.publishMutex.Lock()

atomic.AddInt32(&lastID, 1)
brokerMsg := ubroker.Delivery{
Message: <-c.publishQueue,
ID: int(atomic.LoadInt32(&lastID)),
}
processingMsg := waitForAckStruct{
message: brokerMsg,
ackChannnel: make(chan interface{}, 1),
}


c.processingMutex.Lock()
c.processingQueue[processingMsg.message.ID] = processingMsg
c.processingMutex.Unlock()

select {
case <-c.isClosed:
return
default:
c.deliveryChannel <- brokerMsg
go c.waitForAck(ctx, processingMsg)

}
c.publishMutex.Unlock()

}()


return nil
}

func (c *core) Close() error {
// TODO:‌ implement me
return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented")
//c.writeWaitGp.Wait()
c.publishMutex.Lock()
c.publishOrderMutex.Lock()
close(c.deliveryChannel)
close(c.isClosed)
close(c.publishQueue)
c.publishOrderMutex.Unlock()
c.publishMutex.Unlock()
return nil
}
8 changes: 8 additions & 0 deletions internal/broker/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,14 @@ func (s *CoreBrokerTestSuite) TestDataRace() {
return

case msg := <-delivery:

err = s.broker.Acknowledge(context.Background(), msg.ID)
if err == ubroker.ErrClosed {
return
}
s.Nil(err)
if err != nil {
fmt.Println(err)
return
}
}
Expand All @@ -400,19 +402,25 @@ func (s *CoreBrokerTestSuite) TestDataRace() {
for {
select {
case <-ticker.C:

return

case msg := <-delivery:

err = s.broker.ReQueue(context.Background(), msg.ID)
if err == ubroker.ErrClosed {

return
}
s.Nil(err)
if err != nil {
fmt.Println(err)

return
}
}
}

}()

wg.Add(1)
Expand Down