diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..e312037
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/ubroker.iml b/.idea/ubroker.iml
new file mode 100644
index 0000000..5e764c4
--- /dev/null
+++ b/.idea/ubroker.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 0000000..49f719b
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,481 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ arcana261
+ testack
+ cancel
+ testdata
+ append
+ race
+ lastID
+
+
+ amirmh
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ file://$PROJECT_DIR$/internal/broker/core_test.go
+ 329
+
+
+
+ file://$PROJECT_DIR$/internal/broker/core_test.go
+ 29
+
+
+
+ file://$PROJECT_DIR$/internal/broker/core_test.go
+ 40
+
+
+
+ file://$PROJECT_DIR$/internal/broker/core_test.go
+ 58
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index a573047..002a79d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,8 +3,5 @@ language: go
go:
- 1.12.x
-before_install:
- - make dev-dependencies
-
script:
- make check
diff --git a/.travis.yml1 b/.travis.yml1
new file mode 100644
index 0000000..a573047
--- /dev/null
+++ b/.travis.yml1
@@ -0,0 +1,10 @@
+language: go
+
+go:
+ - 1.12.x
+
+before_install:
+ - make dev-dependencies
+
+script:
+ - make check
diff --git a/internal/broker/core.go b/internal/broker/core.go
index f9b0a8b..2b56409 100644
--- a/internal/broker/core.go
+++ b/internal/broker/core.go
@@ -2,10 +2,16 @@ package broker
import (
"context"
+ "math"
+ "sync/atomic"
"time"
"github.com/arcana261/ubroker/pkg/ubroker"
- "github.com/pkg/errors"
+ "sync"
+)
+
+var (
+ lastID int32
)
// New creates a new instance of ubroker.Broker
@@ -13,34 +19,175 @@ import (
// we requeue an unacknowledged/unrequeued message
// automatically.
func New(ttl time.Duration) ubroker.Broker {
- return &core{}
+ return &core{
+ isClosed: make(chan bool, 1),
+ ttl: ttl,
+ deliveryChannel: make(chan ubroker.Delivery, 1),
+ processingQueue: make(map[int]waitForAckStruct),
+ processingMutex: &sync.Mutex{},
+ publishMutex: &sync.Mutex{},
+ publishOrderMutex: &sync.Mutex{},
+ writeWaitGp: &sync.WaitGroup{},
+ lastMsg: ubroker.Delivery{ID:-1},
+ publishQueue: make(chan ubroker.Message, math.MaxInt16),
+ }
}
+
type core struct {
- // TODO: add required fields
+ isClosed chan bool
+ ttl time.Duration
+ deliveryChannel chan ubroker.Delivery
+ processingQueue map[int]waitForAckStruct
+ processingMutex *sync.Mutex
+ publishMutex *sync.Mutex
+ publishOrderMutex *sync.Mutex
+ lastMsg ubroker.Delivery
+ publishQueue chan ubroker.Message
+ writeWaitGp *sync.WaitGroup
+}
+
+type waitForAckStruct struct {
+ message ubroker.Delivery
+ ackChannnel chan interface{}
+}
+
+func (c *core) waitForAck(ctx context.Context, ubrokerMsg waitForAckStruct) {
+ select {
+ case <-time.After(c.ttl):
+ _ = c.ReQueue(ctx, ubrokerMsg.message.ID)
+ case <-ctx.Done():
+ return
+ case <-c.isClosed:
+ return
+ case <-ubrokerMsg.ackChannnel:
+ return
+ }
}
func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) {
- // TODO: implement me
- return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented")
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-c.isClosed:
+ return nil, ubroker.ErrClosed
+ default:
+ }
+
+ return c.deliveryChannel, nil
}
func (c *core) Acknowledge(ctx context.Context, id int) error {
- // TODO: implement me
- return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented")
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.isClosed:
+ return ubroker.ErrClosed
+ default:
+ }
+
+ //fmt.Println("lll")
+ c.processingMutex.Lock()
+ defer c.processingMutex.Unlock()
+ waited, ok := c.processingQueue[id]
+ if !ok {
+ return ubroker.ErrInvalidID
+ }
+ close(waited.ackChannnel)
+ delete(c.processingQueue, id)
+ //fmt.Println("uuu")
+ return nil
}
func (c *core) ReQueue(ctx context.Context, id int) error {
- // TODO: implement me
- return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented")
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.isClosed:
+ return ubroker.ErrClosed
+ default:
+ }
+
+ //fmt.Println("ll")
+ c.processingMutex.Lock()
+ defer c.processingMutex.Unlock()
+ waited, ok := c.processingQueue[id]
+ if !ok {
+ return ubroker.ErrInvalidID
+ }
+ close(waited.ackChannnel)
+ delete(c.processingQueue, id)
+ _ = c.Publish(ctx, waited.message.Message)
+ //fmt.Println("uu")
+ return nil
}
func (c *core) Publish(ctx context.Context, message ubroker.Message) error {
- // TODO: implement me
- return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented")
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.isClosed:
+ return ubroker.ErrClosed
+ default:
+ }
+
+ c.publishOrderMutex.Lock()
+
+ select {
+ case <-c.isClosed:
+ return nil
+ default:
+ }
+
+ c.publishQueue <- message
+
+ c.publishOrderMutex.Unlock()
+ go func() {
+ //c.writeWaitGp.Add(1)
+ //defer c.writeWaitGp.Done()
+ c.publishMutex.Lock()
+
+ atomic.AddInt32(&lastID, 1)
+ brokerMsg := ubroker.Delivery{
+ Message: <-c.publishQueue,
+ ID: int(atomic.LoadInt32(&lastID)),
+ }
+ processingMsg := waitForAckStruct{
+ message: brokerMsg,
+ ackChannnel: make(chan interface{}, 1),
+ }
+
+
+ c.processingMutex.Lock()
+ c.processingQueue[processingMsg.message.ID] = processingMsg
+ c.processingMutex.Unlock()
+
+ select {
+ case <-c.isClosed:
+ return
+ default:
+ c.deliveryChannel <- brokerMsg
+ go c.waitForAck(ctx, processingMsg)
+
+ }
+ c.publishMutex.Unlock()
+
+ }()
+
+
+ return nil
}
func (c *core) Close() error {
- // TODO: implement me
- return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented")
+ //c.writeWaitGp.Wait()
+ c.publishMutex.Lock()
+ c.publishOrderMutex.Lock()
+ close(c.deliveryChannel)
+ close(c.isClosed)
+ close(c.publishQueue)
+ c.publishOrderMutex.Unlock()
+ c.publishMutex.Unlock()
+ return nil
}
diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go
index 9d6530d..b57dc2c 100644
--- a/internal/broker/core_test.go
+++ b/internal/broker/core_test.go
@@ -375,12 +375,14 @@ func (s *CoreBrokerTestSuite) TestDataRace() {
return
case msg := <-delivery:
+
err = s.broker.Acknowledge(context.Background(), msg.ID)
if err == ubroker.ErrClosed {
return
}
s.Nil(err)
if err != nil {
+ fmt.Println(err)
return
}
}
@@ -400,19 +402,25 @@ func (s *CoreBrokerTestSuite) TestDataRace() {
for {
select {
case <-ticker.C:
+
return
case msg := <-delivery:
+
err = s.broker.ReQueue(context.Background(), msg.ID)
if err == ubroker.ErrClosed {
+
return
}
s.Nil(err)
if err != nil {
+ fmt.Println(err)
+
return
}
}
}
+
}()
wg.Add(1)