Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ version: 2
jobs:
build:
docker:
- image: circleci/golang:1.17
- image: bigtruedata/gcloud-pubsub-emulator
command: "start --host-port=0.0.0.0:8085"
- image: cimg/go:1.22
- image: messagebird/gcloud-pubsub-emulator
- image: nsqio/nsq
command: "nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=127.0.0.1:4150 --broadcast-address=127.0.0.1 --data-path=/var/opt --mem-queue-size=1000"
- image: nsqio/nsq
command: "nsqlookupd"
working_directory: /go/src/github.com/pcelvng/task
working_directory: ~/task
steps:
- checkout
- run: go get -v -t -d ./...
- run: go test -cover -v ./...
- run: go test -race -cover -v ./...
10 changes: 0 additions & 10 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion bus/info/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type Consumer struct {
Bus string `json:"bus,omitempty"`
Topic string `json:"topic,omitempty"`
Channel string `json:"channel,omitempty"`
Received int `json:"received"`
Received int32 `json:"received"`
}

type Producer struct {
Expand Down
48 changes: 35 additions & 13 deletions bus/nop/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,41 @@ package nop

import (
"errors"
"sync"

"github.com/pcelvng/task/bus/info"
)

const (
// Init_err returns err on NewConsumer
InitErr = "init_err"
MsgErr = "msg_err" // returns err on Consumer.Msg() call.
//Err = "err" // every method returns an error
InitErr = "init_err" // returns err on NewConsumer
MsgErr = "msg_err" // returns err on Consumer.Msg() call.
MsgDone = "msg_done" // returns a nil task message done=true on Consumer.Msg() call.
MsgMsgDone = "msg_msg_done" // returns a non-nil task message and done=true Consumer.Msg() call.
Repeat = "repeat" // message will not be removed from queue and consumer with always return a message
StopErr = "stop_err" // returns err on Stop() method call
)

// FakeMsg can be set to control the returned
// Msg() msg value.
var FakeMsg = []byte(`{"type":"test","info":"test-info","created":"2017-01-01T00:00:01Z"}`)
const FakeMsg = `{"type":"test","info":"test-info","created":"2017-01-01T00:00:01Z"}`

// NewConsumer returns a nop (no-operation) Consumer.
// Will return *Consumer == nil and err != nil
// if mock == "init_err".
func NewConsumer(mock string) (*Consumer, error) {
func NewConsumer(mock string, msgs ...string) (*Consumer, error) {
if mock == "init_err" {
return nil, errors.New(mock)
}

return &Consumer{Mock: mock, Stats: info.Consumer{Bus: "nop"}}, nil
if len(msgs) == 0 {
msgs = []string{FakeMsg}
}

return &Consumer{
Mock: mock,
Stats: info.Consumer{Bus: "nop"},
messages: msgs,
}, nil
}

// Consumer is a no-operation consumer. It
Expand All @@ -46,30 +54,44 @@ type Consumer struct {
// - "stop_err" - returns err on Stop() method call
Mock string
Stats info.Consumer
mu sync.Mutex

// Message queue that will be returned when called.
messages []string
index int
}

// Msg will always return a fake task message unless err != nil
// or Mock == "msg_done".
func (c *Consumer) Msg() (msg []byte, done bool, err error) {
if c.Mock == "msg_err" {
return msg, false, errors.New(c.Mock)
c.mu.Lock()
defer c.mu.Unlock()
if c.Mock == MsgErr {
return nil, false, errors.New(c.Mock)
}

if c.Mock == "msg_done" {
return msg, true, err
if c.Mock == MsgDone || c.index >= len(c.messages) {
return nil, true, nil
}

if c.Mock == "msg_msg_done" {
if c.Mock == MsgMsgDone {
done = true
}

// set fake msg
msg = []byte(c.messages[c.index])
c.Stats.Received++
msg = FakeMsg
c.index++
if c.Mock == Repeat && c.index >= len(c.messages) {
c.index = 0
}

return msg, done, err
}

func (c *Consumer) Info() info.Consumer {
c.mu.Lock()
defer c.mu.Unlock()
return c.Stats
}

Expand Down
11 changes: 7 additions & 4 deletions bus/nop/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestNewConsumer(t *testing.T) {

func TestConsumer_Msg(t *testing.T) {
type Output struct {
msg []byte
msg string
done bool
}
fn := func(args ...interface{}) (interface{}, error) {
Expand All @@ -33,22 +33,25 @@ func TestConsumer_Msg(t *testing.T) {
return Output{}, err
}
msg, done, err := c.Msg()
return Output{msg, done}, err
return Output{string(msg), done}, err
}

trial.New(fn, trial.Cases{
"default": {
Input: "",
Expected: Output{msg: FakeMsg},
},
"repeat": {
Input: Repeat,
Expected: Output{msg: FakeMsg},
},
"keyword: msg_err": {
Input: MsgErr,
ShouldErr: true,
},
"keyword: msg_done": {
Input: MsgDone,
Expected: Output{
msg: nil,
done: true,
},
},
Expand All @@ -63,7 +66,7 @@ func TestConsumer_Msg(t *testing.T) {
}

func TestConsumer_Info(t *testing.T) {
c, _ := NewConsumer("")
c, _ := NewConsumer("repeat")
for i := 0; i < 10; i++ {
c.Msg()
}
Expand Down
2 changes: 1 addition & 1 deletion bus/nop/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Producer struct {
// - "send_err" - returns err when Producer.Send() is called.
// - "stop_err" - returns err on Stop() method call
Mock string
Messages map[string][]string // [topic]Messages
Messages map[string][]string // [topic]messages
Stats info.Producer
mu sync.Mutex
}
Expand Down
15 changes: 9 additions & 6 deletions bus/nsq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

gonsq "github.com/nsqio/go-nsq"

"github.com/pcelvng/task/bus/info"
)

Expand Down Expand Up @@ -59,19 +60,21 @@ func NewConsumer(topic, channel string, opt *Option) (*Consumer, error) {
}

func (c *Consumer) checkMaxInFlight(topic, channel string) {
// consumer is consider locked up if
// consumer is considered locked up if
// 1. channel has depth
// 2. msqRequested > msqReceived // is waiting for a message
// 3. prevReceived = msgReceived // no new message has come in time limit
maxInFlight := 2
var prev int64
var prev, msgReq, msgRec int64
for ; ; time.Sleep(10 * time.Second) {
depth := getDepth(c.opt.LookupdAddrs, topic, channel)
if depth <= 0 || c.msgRequested <= c.msgReceived {
msgReq = atomic.LoadInt64(&c.msgRequested)
msgRec = atomic.LoadInt64(&c.msgReceived)
if depth <= 0 || msgReq <= msgRec {
continue
}

if prev == c.msgReceived {
if prev == msgRec {
//set maxinflight to 2, wait
log.Printf("lockup detected: maxInFlight set to %d", maxInFlight)
c.consumer.ChangeMaxInFlight(maxInFlight)
Expand All @@ -80,7 +83,7 @@ func (c *Consumer) checkMaxInFlight(topic, channel string) {
maxInFlight = 2
}
} else {
prev = c.msgReceived
prev = msgRec
}

}
Expand Down Expand Up @@ -256,7 +259,7 @@ func (c *Consumer) Msg() (msg []byte, done bool, err error) {
case <-c.ctx.Done():
done = true
}
c.info.Received++
atomic.AddInt32(&c.info.Received, 1)
return msg, done, err
}

Expand Down
7 changes: 3 additions & 4 deletions bus/nsq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ func TestNewConsumer(t *testing.T) {
channel string
opts Option
}
fn := func(in trial.Input) (interface{}, error) {
v := in.Interface().(input)
c, err := NewConsumer(v.topic, v.channel, &v.opts)
fn := func(in input) (bool, error) {
c, err := NewConsumer(in.topic, in.channel, &in.opts)
isValid := c != nil
if isValid && err == nil {
err = c.Stop()
}
return isValid, err
}
cases := trial.Cases{
cases := trial.Cases[input, bool]{
"blank": {
Input: input{},
ShouldErr: true,
Expand Down
29 changes: 15 additions & 14 deletions bus/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package pubsub

import (
"context"
"log"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/hydronica/trial"
)

var skipPubsub bool
var skipText = "\x1b[1;34mSKIP: pubsub not running\x1b[0m"

const pubsub_host = "127.0.0.1:8681"

func TestMain(t *testing.M) {
opts := &Option{Host: "127.0.0.1:8085"}
opts := &Option{Host: pubsub_host, ProjectID: "task-test"}
_, err := Topics(opts)
if err != nil {
log.Println("pubsub not running", err)
Expand All @@ -29,29 +29,30 @@ func TestNewConsumer(t *testing.T) {
if skipPubsub {
t.Skip(skipText)
}
fn := func(in trial.Input) (interface{}, error) {
opts := in.Interface().(*Option)
fn := func(opts *Option) (bool, error) {
os.Setenv("PUBSUB_EMULATOR_HOST", "")

c, err := opts.NewConsumer()
return c != nil, err
}

cases := trial.Cases{
cases := trial.Cases[*Option, bool]{
"local host": {
Input: NewOption("127.0.0.1:8085", "test", "topic1-sub", "topic", ""),
Input: NewOption(pubsub_host, "test", "topic1-sub", "topic", ""),
Expected: true,
},
"missing topic": {
Input: &Option{
Host: "127.0.0.1:8085",
Host: pubsub_host,
ProjectID: "task-test",
},
ShouldErr: true,
},
"missing subscription": {
Input: &Option{
Host: "127.0.0.1:8085",
Topic: "topic1",
Host: pubsub_host,
ProjectID: "task-test",
Topic: "topic1",
},
ShouldErr: true,
},
Expand All @@ -68,7 +69,7 @@ func TestTopics(t *testing.T) {
if skipPubsub {
t.Skip(skipText)
}
opts := &Option{Host: "127.0.0.1:8085"}
opts := &Option{Host: pubsub_host, ProjectID: "task-test"}
s, err := Topics(opts)
if err != nil {
t.Fatal(err)
Expand All @@ -82,7 +83,7 @@ func TestProducer(t *testing.T) {
if skipPubsub {
t.Skip(skipText)
}
opts := &Option{Host: "127.0.0.1:8085"}
opts := &Option{Host: pubsub_host, ProjectID: "task-test"}
producer, err := opts.NewProducer()
if err != nil {
log.Fatal("new", err)
Expand Down Expand Up @@ -126,8 +127,9 @@ func TestConsumer_Msg(t *testing.T) {
}
// setup
opts := &Option{
Host: "127.0.0.1:8085",
Host: pubsub_host,
Topic: "topic1",
ProjectID: "task-test",
Subscription: "topic1-sub",
}
consumer, err := opts.NewConsumer()
Expand Down Expand Up @@ -164,7 +166,6 @@ func TestConsumer_Msg(t *testing.T) {
}

consumer, _ = opts.NewConsumer()
consumer.ctx, _ = context.WithTimeout(consumer.ctx, 5*time.Second)
tName = "read messages"
msgs := make([]string, 0)
for i := 0; i < msgCount; i++ {
Expand Down
Loading