diff --git a/.gitignore b/.gitignore index 620a673..4b66428 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/ubroker \ No newline at end of file +/ubroker +*.pb.go \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index a573047..ae3a1eb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,9 @@ go: - 1.12.x before_install: - - make dev-dependencies + - curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.7.1/protoc-3.7.1-linux-x86_64.zip + - unzip protoc-3.7.1-linux-x86_64.zip -d protoc3 + - PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make dev-dependencies script: - - make check + - PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make check diff --git a/Makefile b/Makefile index 5bc7475..cd4755e 100644 --- a/Makefile +++ b/Makefile @@ -1,23 +1,37 @@ -.PHONY: check help dependencies dev-dependencies +.PHONY: check help dependencies dev-dependencies .pre-check-go generate SRCS = $(patsubst ./%,%,$(shell find . -name "*.go" -not -path "*vendor*")) help: ## Display this help screen @grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' -check: dev-dependencies ## Run unit tests +check: dev-dependencies | generate ## Run unit tests go test ./... go test -race ./... -benchmark: dev-dependencies ## Run benchmarks +benchmark: dev-dependencies | generate ## Run benchmarks go test -bench . ./... -dependencies: ##‌ Download dependencies +dependencies: | generate ##‌ Download dependencies go get -v ./... -dev-dependencies: dependencies ##‌ Download development dependencies +dev-dependencies: dependencies | generate ##‌ Download development dependencies go get -v github.com/stretchr/testify/suite go get -v github.com/stretchr/testify/assert + go get -v github.com/phayes/freeport -ubroker: $(SRCS) | dependencies ##‌ Compile us +ubroker: $(SRCS) pkg/ubroker/ubroker.pb.go | dependencies generate ##‌ Compile us go build -o ubroker ./cmd/ubroker + +generate: pkg/ubroker/ubroker.pb.go + +pkg/ubroker/ubroker.pb.go: api/ubroker.proto | .pre-check-go + $(PROTOC) $(PROTOC_OPTIONS) --go_out=plugins=grpc:$(GOPATH)/src api/ubroker.proto + +.pre-check-go: + go get -v github.com/golang/protobuf/protoc-gen-go + go get -v github.com/vektra/mockery/.../ + +GOPATH ?= $(shell go env GOPATH) +PROTOC ?= protoc +PROTOC_OPTIONS ?= diff --git a/README.md b/README.md index 062a3b2..331a550 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,43 @@ # ubroker +Welcome to phase2! + +# Current Phase 2 + +Greetings! As you may already know, In this phase of the project we are about to take +our broker system which we implemented prevously, take out it's HTTP‌ (JSON) layer and replace +it with a neat gRPC‌ mechanism ! + +Before we begin any further, I'd like to point out that our master branch has changed and +contains implementation made by your friend, and ofcourse, my dear colleague at Cafebazaar, +[Milad Norouzi](https://github.com/miladosos) as reference implementation. I've made slight +alterations to his implementations to make it compatible with our new protocol buffer specification. + +So.. list of changes I've made to master branch are as follows: + +* `api/ubroker.proto`: This file now contains proto specification for RPC's we'd like to have. +* `Makefile`: I've added a dependency for tests (a library which generates random free ports) and ofcourse, directives to generate GO‌ code from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`. The first time you try to execute `make check` or `make ubroker` this autogenerated file will be generated from proto file. +* `pkg/ubroker/ubroker.go`: I've modified our `Broker` interface to use structs defined in our protobuffer `api/ubroker.proto`. +* `internal/broker/core.go`: Contains implementation from my colleague [Milad Norouzi](https://github.com/miladosos) +* `internal/broker/core_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/http.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/http_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto` +* `internal/server/grpc.go`: This new file is the file you shoud implement (which implements `BrokerServer` autogenerated from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`) +* `internal/server/grpc_test.go`: This new file contains tests that I'd like to PASS :) and you should too! +* `cmd/ubroker/main.go`: I've made modifications to start gRPC‌ server from implementation in `internal/server/grpc.go`. + +Your task is as follows: +1. Make sure you have `protoc` installed. (the protobuf compiler) +2. Run `make dev-dependencies` to download all dependencies and ofcourse, autogenerate `pkg/ubroker/ubroker.pb.go` +3. Implement file at `internal/server/grpc.go` +4. Submit your pull request! + +# Previous Phase 1 + Happy new year to you, students! -We wanted to set the stage for continuous series of excersices to develop +We wanted to set the stage for continuous series of exercises to develop a message broker. A message broker is a system that acts as a hub to distribute messages in a @@ -18,12 +52,12 @@ various benefits of using messaging systems, which the most important are: And much more! I highly encourage you two watch [this](https://www.youtube.com/watch?v=rXi5CLjIQ9k) awesome video! I also encourage you to look at RabbitMQ design. -So, the messaging system is expected to do following opration: +So, the messaging system is expected to do following operation: 1. Provide a `/publish` HTTP API that let's clients publish messages to a queue. Or to say enqueue messages in our queue. 2. Provide a `/fetch` HTTP‌ API that let's clients fetch messages from queue. -3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` gurantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfuly and gracefully processed them. This is why this system is called to have a `at least once` gurantee because clients might see messages **at least once**. By stating that our gurantee is at-least once, we are not referring to a randomized behaviour that we might re-send a message to clients, but rather clients might see messages more than once becuase of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfuly processed fetched message. -4. Provde a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them. +3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` guarantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfully and gracefully processed them. This is why this system is called to have an `at least once` guarantee because clients might see messages **at least once**. By stating that our gaurantee is at-least once, we are not referring to a randomized behavior that we might re-send a message to clients, but rather clients might see messages more than once because of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfully processed fetched message. +4. Provide a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them. Now... Don't you guys worry! We have already laid out a boilerplate code beautifully so that you can learn how to code in GO and also validate your results! The steps are as follows: diff --git a/api/ubroker.proto b/api/ubroker.proto new file mode 100644 index 0000000..7b2c23c --- /dev/null +++ b/api/ubroker.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package ubroker; +option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; + +import "google/protobuf/empty.proto"; + +service Broker { + // Fetch should return a single Delivery per FetchRequest. + // Should return: + // Unavailable: If broker has been closed + rpc Fetch(stream FetchRequest) returns (stream Delivery); + + // Acknowledge a message + // Should return: + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + rpc Acknowledge(AcknowledgeRequest) returns (google.protobuf.Empty); + + // ReQueue a message + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + rpc ReQueue(ReQueueRequest) returns (google.protobuf.Empty); + + // Publish message to Queue + // OK: on success + // Unavailable: If broker has been closed + rpc Publish(Message) returns (google.protobuf.Empty); +} + +message Message { + bytes body = 1; +} + +message Delivery { + Message message = 1; + int32 id = 2; +} + +message FetchRequest { +} + +message AcknowledgeRequest { + int32 id = 1; +} + +message ReQueueRequest { + int32 id = 1; +} \ No newline at end of file diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -3,10 +3,15 @@ package main import ( "flag" "fmt" + "log" + "net" "os" "os/signal" "time" + "github.com/arcana261/ubroker/pkg/ubroker" + "google.golang.org/grpc" + "github.com/arcana261/ubroker/internal/broker" "github.com/arcana261/ubroker/internal/server" ) @@ -19,13 +24,15 @@ func main() { broker := broker.New(time.Duration(*ttlPtr) * time.Millisecond) endpoint := fmt.Sprintf(":%d", *portPtr) - srv := server.NewHTTP(broker, endpoint) + servicer := server.NewGRPC(broker) - if err := srv.Run(); err != nil { - panic(err.Error()) - } + grpcServer := grpc.NewServer() + ubroker.RegisterBrokerServer(grpcServer, servicer) - fmt.Printf("listening on %s\n", endpoint) + listener, err := net.Listen("tcp", endpoint) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } signalChan := make(chan os.Signal, 1) cleanupDone := make(chan struct{}) @@ -35,11 +42,17 @@ func main() { fmt.Printf("\nReceived an interrupt, stopping services...\n\n") close(cleanupDone) }() + + go func() { + if err := grpcServer.Serve(listener); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + fmt.Printf("listening on %s\n", endpoint) <-cleanupDone - if err := srv.Close(); err != nil { - panic(err.Error()) - } + grpcServer.GracefulStop() if err := broker.Close(); err != nil { panic(err.Error()) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,10 +2,12 @@ package broker import ( "context" + "sync" "time" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" + + "github.com/arcana261/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker @@ -13,34 +15,328 @@ import ( // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + broker := &core{ + ttl: ttl, + requests: make(chan interface{}), + deliveryChannel: make(chan *ubroker.Delivery), + closed: make(chan bool, 1), + closing: make(chan bool, 1), + pending: make(map[int32]*ubroker.Message), + messages: []*ubroker.Delivery{{}}, + } + + broker.wg.Add(1) + go broker.startDelivery() + + return broker } type core struct { - // TODO: add required fields + nextID int32 + ttl time.Duration + + mutex sync.Mutex + working sync.WaitGroup + wg sync.WaitGroup + + requests chan interface{} + deliveryChannel chan *ubroker.Delivery + closed chan bool + closing chan bool + pending map[int32]*ubroker.Message + messages []*ubroker.Delivery + channel chan *ubroker.Delivery +} + +type acknowledgeRequest struct { + id int32 + response chan acknowledgeResponse } -func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") +type acknowledgeResponse struct { + id int32 + err error } -func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") +type requeueRequest struct { + id int32 + response chan requeueResponse } -func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") +type requeueResponse struct { + id int32 + err error } -func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") +type publishRequest struct { + message *ubroker.Message + response chan publishResponse +} + +type publishResponse struct { + err error +} + +func (c *core) Delivery(ctx context.Context) (<-chan *ubroker.Delivery, error) { + if isCanceledContext(ctx) { + return nil, ctx.Err() + } + + if !c.startWorking() { + return nil, ubroker.ErrClosed + } + defer c.working.Done() + + return c.deliveryChannel, nil +} + +func (c *core) Acknowledge(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &acknowledgeRequest{ + id: id, + response: make(chan acknowledgeResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (c *core) ReQueue(ctx context.Context, id int32) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case c.requests <- request: + select { + case response := <-request.response: + return response.err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (c *core) Publish(ctx context.Context, message *ubroker.Message) error { + if isCanceledContext(ctx) { + return ctx.Err() + } + + if !c.startWorking() { + return ubroker.ErrClosed + } + defer c.working.Done() + + request := &publishRequest{ + message: message, + response: make(chan publishResponse, 1), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.closed: + return ubroker.ErrClosed + case c.requests <- request: + return nil + } } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") + if !c.startClosing() { + return errors.New("can not close channel, closing in progress") + } + c.working.Wait() + close(c.closed) + c.wg.Wait() + close(c.deliveryChannel) + + return nil +} + +func (c *core) startDelivery() { + defer c.wg.Done() + for { + select { + case <-c.closed: + return + + case request := <-c.requests: + if isAcknowledgeRequest(request) { + c.wg.Add(1) + req, _ := request.(*acknowledgeRequest) + req.response <- c.handleAcknowledge(req) + } else if isRequeueRequest(request) { + c.wg.Add(1) + req, _ := request.(*requeueRequest) + req.response <- c.handleRequeue(req) + } else if isPublishRequest(request) { + c.wg.Add(1) + req, _ := request.(*publishRequest) + req.response <- c.handlePublish(req) + } else { + panic(errors.New("UNKNOWN REQUEST")) + } + + case c.channel <- c.messages[0]: + if c.channel != nil { + c.pending[c.messages[0].Id] = c.messages[0].Message + c.wg.Add(1) + go c.snooze(c.messages[0].Id) + + c.messages = c.messages[1:] + if len(c.messages) == 0 { + c.channel = nil + c.messages = []*ubroker.Delivery{{}} + } + } + } + } +} + +func (c *core) startWorking() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + c.working.Add(1) + return true + } +} + +func (c *core) startClosing() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + + select { + case <-c.closing: + return false + default: + close(c.closing) + return true + } +} + +func isCanceledContext(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + +func isAcknowledgeRequest(request interface{}) bool { + _, ok := request.(*acknowledgeRequest) + return ok +} + +func isRequeueRequest(request interface{}) bool { + _, ok := request.(*requeueRequest) + return ok +} + +func isPublishRequest(request interface{}) bool { + _, ok := request.(*publishRequest) + return ok +} + +func (c *core) handleAcknowledge(request *acknowledgeRequest) acknowledgeResponse { + defer c.wg.Done() + _, ok := c.pending[request.id] + if !ok { + return acknowledgeResponse{id: request.id, err: ubroker.ErrInvalidID} + } + delete(c.pending, request.id) + return acknowledgeResponse{id: request.id, err: nil} +} + +func (c *core) handleRequeue(request *requeueRequest) requeueResponse { + defer c.wg.Done() + message, ok := c.pending[request.id] + if !ok { + return requeueResponse{id: request.id, err: ubroker.ErrInvalidID} + } + delete(c.pending, request.id) + c.wg.Add(1) + c.handlePublish(&publishRequest{ + message: message, + response: make(chan publishResponse, 1), + }) + return requeueResponse{id: request.id, err: nil} +} + +func (c *core) handlePublish(request *publishRequest) publishResponse { + defer c.wg.Done() + + if c.channel == nil { + c.messages = []*ubroker.Delivery{} + c.channel = c.deliveryChannel + } + + id := c.nextID + c.nextID++ + newDelivery := ubroker.Delivery{ + Id: id, + Message: request.message, + } + + c.messages = append(c.messages, &newDelivery) + + return publishResponse{err: nil} +} + +func (c *core) snooze(id int32) { + defer c.wg.Done() + ticker := time.NewTicker(c.ttl) + defer ticker.Stop() + + select { + case <-c.closed: + return + + case <-ticker.C: + request := &requeueRequest{ + id: id, + response: make(chan requeueResponse, 1), + } + select { + case <-c.closed: + return + + case c.requests <- request: + } + } } diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -34,7 +34,7 @@ func BenchmarkPublish(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } } @@ -43,7 +43,7 @@ func BenchmarkDelivery(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) @@ -61,12 +61,12 @@ func BenchmarkAcknowledge(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -75,7 +75,7 @@ func BenchmarkAcknowledge(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Acknowledge(context.Background(), deliveries[i].ID)) + s.Nil(broker.Acknowledge(context.Background(), deliveries[i].Id)) } } @@ -84,12 +84,12 @@ func BenchmarkReQueue(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -98,7 +98,7 @@ func BenchmarkReQueue(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.ReQueue(context.Background(), deliveries[i].ID)) + s.Nil(broker.ReQueue(context.Background(), deliveries[i].Id)) } } @@ -123,7 +123,7 @@ func (s *CoreBrokerTestSuite) TestPublishedMessageShouldAppearInDeliveryOnce() { s.publish("hello1") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Equal("hello1", msg.Message.Body) + s.Equal("hello1", string(msg.Message.Body)) s.assertEmpty(delivery) } @@ -133,13 +133,13 @@ func (s *CoreBrokerTestSuite) TestPublishShouldPreserveOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.NotEqual(messages[0].ID, messages[1].ID) - s.NotEqual(messages[0].ID, messages[2].ID) - s.NotEqual(messages[1].ID, messages[2].ID) - s.Equal("hello1", messages[0].Message.Body) - s.Equal("hello2", messages[1].Message.Body) - s.Equal("hello3", messages[2].Message.Body) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.NotEqual(messages[0].Id, messages[1].Id) + s.NotEqual(messages[0].Id, messages[2].Id) + s.NotEqual(messages[1].Id, messages[2].Id) + s.Equal("hello1", string(messages[0].Message.Body)) + s.Equal("hello2", string(messages[1].Message.Body)) + s.Equal("hello3", string(messages[2].Message.Body)) } func (s *CoreBrokerTestSuite) TestDeliveriesShouldBeUnique() { @@ -153,7 +153,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeAcknowledgeablePreemptively( s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.Acknowledge(context.Background(), int32(id))) } } @@ -161,7 +162,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeQueueablePreemptively() { s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.ReQueue(context.Background(), int32(id))) } } @@ -169,15 +171,15 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeAcknowledgeable() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeAcknowledgedTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndependently() { @@ -186,10 +188,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndep s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.Acknowledge(context.Background(), messages[0].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[1].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.Acknowledge(context.Background(), messages[0].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[1].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[2].Id)) } func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() { @@ -197,7 +199,7 @@ func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() s.publish("hello") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) s.assertEmpty(delivery) } @@ -206,9 +208,9 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeReQueueable() { s.publish("hello") delivery := s.getDelivery(context.Background()) msg1 := <-delivery - s.Nil(s.broker.ReQueue(context.Background(), msg1.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg1.Id)) msg2 := <-delivery - s.NotEqual(msg1.ID, msg2.ID) + s.NotEqual(msg1.Id, msg2.Id) s.Equal(msg1.Message.Body, msg2.Message.Body) } @@ -216,8 +218,8 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeReQueueableTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.ReQueue(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndependently() { @@ -226,10 +228,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndepende s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) @@ -241,10 +243,10 @@ func (s *CoreBrokerTestSuite) TestReQueueCouldBreakOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) @@ -256,32 +258,35 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldReQueueUponHalfSecondTTL() { delivery := s.getDelivery(context.Background()) msg1 := <-delivery time.Sleep(250 * time.Millisecond) - s.Nil(s.broker.Acknowledge(context.Background(), msg1.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg1.Id)) s.publish("hello2") msg2 := <-delivery time.Sleep(750 * time.Millisecond) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.ID)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.Id)) msg3 := <-delivery - s.NotEqual(msg1.ID, msg3.ID) - s.NotEqual(msg2.ID, msg3.ID) - s.Equal("hello2", msg3.Message.Body) - s.Nil(s.broker.Acknowledge(context.Background(), msg3.ID)) + s.NotEqual(msg1.Id, msg3.Id) + s.NotEqual(msg2.Id, msg3.Id) + s.Equal("hello2", string(msg3.Message.Body)) + s.Nil(s.broker.Acknowledge(context.Background(), msg3.Id)) } func (s *CoreBrokerTestSuite) TestPublishShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Publish(context.Background(), ubroker.Message{})) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Publish(context.Background(), &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestAcknowledgeShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Acknowledge(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Acknowledge(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestReQueueShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.ReQueue(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.ReQueue(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldFailOnClosedBroker() { @@ -324,7 +329,8 @@ func (s *CoreBrokerTestSuite) TestPublishShouldFailOnCanceledContext() { s.prepareTest(1 * time.Second) ctx, cancel := context.WithCancel(context.Background()) cancel() - s.assertErrorEquals(ctx.Err(), s.broker.Publish(ctx, ubroker.Message{})) + s.assertErrorEquals(ctx.Err(), + s.broker.Publish(ctx, &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestDataRace() { @@ -344,8 +350,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { return default: - err := s.broker.Publish(context.Background(), ubroker.Message{ - Body: fmt.Sprint(rand.Intn(1000)), + err := s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(fmt.Sprint(rand.Intn(1000))), }) if err == ubroker.ErrClosed { return @@ -374,8 +380,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.Acknowledge(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.Acknowledge(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -402,8 +412,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.ReQueue(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.ReQueue(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -427,8 +441,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { } func (s *CoreBrokerTestSuite) publish(body string) { - s.Nil(s.broker.Publish(context.Background(), ubroker.Message{ - Body: body, + s.Nil(s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(body), })) } @@ -440,7 +454,7 @@ func (s *CoreBrokerTestSuite) assertErrorEquals(expected error, actual error) { } } -func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.Delivery { +func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan *ubroker.Delivery { result, err := s.broker.Delivery(ctx) if err != nil { s.FailNow(err.Error(), "could not obtain delivery") @@ -449,7 +463,7 @@ func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.De return result } -func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan ubroker.Delivery) { +func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan *ubroker.Delivery) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() diff --git a/internal/server/grpc.go b/internal/server/grpc.go new file mode 100644 index 0000000..a78d4c2 --- /dev/null +++ b/internal/server/grpc.go @@ -0,0 +1,69 @@ +package server + +import ( + "context" + + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type grpcServicer struct { + broker ubroker.Broker +} + +func getError(msg error) error { + switch msg { + case nil: + return nil + case ubroker.ErrClosed: + return status.Error(codes.Unavailable, "Broker is closed") + case ubroker.ErrInvalidID: + return status.Error(codes.InvalidArgument, "Invalid Id") + default: + return status.Error(codes.Unknown, "Unknown Error") + } +} + +func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { + return &grpcServicer{ + broker: broker, + } +} + +func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { + delivery, errMsg := s.broker.Delivery(context.Background()) + if errMsg != nil { + return getError(errMsg) + } + + for { + _, streamError := stream.Recv() + if streamError != nil { + return getError(streamError) + } + + delMsg, delSuccess := <-delivery + if delSuccess { + stream.Send(delMsg) + } else { + return getError(ubroker.ErrClosed) + } + } +} + +func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { + errMsg := s.broker.Acknowledge(ctx, request.GetId()) + return &empty.Empty{}, getError(errMsg) +} + +func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { + errMsg := s.broker.ReQueue(ctx, request.GetId()) + return &empty.Empty{}, getError(errMsg) +} + +func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { + errMsg := s.broker.Publish(ctx, request) + return &empty.Empty{}, getError(errMsg) +} diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go new file mode 100644 index 0000000..ec28f00 --- /dev/null +++ b/internal/server/grpc_test.go @@ -0,0 +1,383 @@ +package server_test + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/mock" + + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/phayes/freeport" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/stretchr/testify/suite" +) + +type GRPCServerTestSuite struct { + suite.Suite +} + +func TestGRPCServerTestSuite(t *testing.T) { + suite.Run(t, new(GRPCServerTestSuite)) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + result, err := stream.Recv() + s.Nil(err) + + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err := stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfInvalidID() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(ubroker.ErrInvalidID) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.InvalidArgument, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnOKOnSuccess() { + broker := &mockBroker{} + broker.On("Acknowledge", mock.Anything, int32(10)).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Acknowledge(ctx, &ubroker.AcknowledgeRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnUnavailableIfInvalidID() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(ubroker.ErrInvalidID) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.InvalidArgument, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestReQueueShouldReturnOKOnSuccess() { + broker := &mockBroker{} + broker.On("ReQueue", mock.Anything, int32(10)).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.ReQueue(ctx, &ubroker.ReQueueRequest{ + Id: 10, + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestPublishShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Once().Return(ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Publish(ctx, &ubroker.Message{ + Body: []byte("hello"), + }, grpc.WaitForReady(true)) + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestPublishShouldReturnOKIfSuccessful() { + broker := &mockBroker{} + broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Once().Return(nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + _, err := client.Publish(ctx, &ubroker.Message{ + Body: []byte("hello"), + }, grpc.WaitForReady(true)) + s.Nil(err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) makeChannel(args ...string) <-chan *ubroker.Delivery { + result := make(chan *ubroker.Delivery, len(args)) + var id int32 + + for _, arg := range args { + result <- &ubroker.Delivery{ + Id: id, + Message: &ubroker.Message{ + Body: []byte(arg), + }, + } + + id = id + 1 + } + + close(result) + return result +} + +func (s *GRPCServerTestSuite) assertStatusCode(code codes.Code, err error) { + if code == codes.OK { + s.Nil(err) + return + } + + grpcStatus, ok := status.FromError(err) + s.True(ok) + + if grpcStatus != nil { + s.Equal(code, grpcStatus.Code()) + } +} + +func (s *GRPCServerTestSuite) runTest( + tester func(ctx context.Context, client ubroker.BrokerClient), + broker ubroker.Broker) { + + port, err := freeport.GetFreePort() + if err != nil { + s.FailNow(err.Error(), "failed to obtain free port") + } + + endpoint := fmt.Sprintf("127.0.0.1:%d", port) + servicer := server.NewGRPC(broker) + + grpcServer := grpc.NewServer() + ubroker.RegisterBrokerServer(grpcServer, servicer) + + listener, err := net.Listen("tcp", endpoint) + if err != nil { + s.FailNow(err.Error(), "failed to open listener") + } + + go func() { + grpcServer.Serve(listener) + }() + + dialCtx, dialCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer dialCtxCancel() + + conn, err := grpc.DialContext(dialCtx, endpoint, + grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + s.FailNow(err.Error(), "failed to dial to gRPC‌ server") + } + + client := ubroker.NewBrokerClient(conn) + + clientCtx, clientCtxCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer clientCtxCancel() + + tester(clientCtx, client) + + err = conn.Close() + if err != nil { + s.FailNow(err.Error(), "failed to properly close gRPC‌ client connection") + } + + grpcServer.GracefulStop() +} diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "regexp" "strconv" @@ -13,6 +12,7 @@ import ( "sync" "time" + "github.com/golang/protobuf/jsonpb" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -37,7 +37,7 @@ func init() { type httpServer struct { broker ubroker.Broker router *mux.Router - delivery <-chan ubroker.Delivery + delivery <-chan *ubroker.Delivery endpoint string server *http.Server mutex sync.Mutex @@ -126,14 +126,8 @@ func (s *httpServer) ServeHTTP(writer http.ResponseWriter, request *http.Request } func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Request) { - data, err := ioutil.ReadAll(request.Body) - if err != nil { - s.handleError(writer, request, err) - return - } - var msg ubroker.Message - err = json.Unmarshal(data, &msg) + err := jsonpb.Unmarshal(request.Body, &msg) if err != nil { s.handleError(writer, request, err) } @@ -141,7 +135,7 @@ func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Publish(ctx, msg) + err = s.broker.Publish(ctx, &msg) if err != nil { s.handleError(writer, request, err) } @@ -166,7 +160,7 @@ func (s *httpServer) handleAcknowledge(writer http.ResponseWriter, request *http ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Acknowledge(ctx, id) + err = s.broker.Acknowledge(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return @@ -192,7 +186,7 @@ func (s *httpServer) handleReQueue(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.ReQueue(ctx, id) + err = s.broker.ReQueue(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..aff3746 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -1,7 +1,6 @@ package server_test import ( - "context" "fmt" "io/ioutil" "net/http" @@ -15,35 +14,6 @@ import ( "github.com/stretchr/testify/suite" ) -type mockBroker struct { - mock.Mock -} - -func (m *mockBroker) Close() error { - args := m.Called() - return args.Error(0) -} - -func (m *mockBroker) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - args := m.Called(ctx) - return args.Get(0).(<-chan ubroker.Delivery), args.Error(1) -} - -func (m *mockBroker) Acknowledge(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) ReQueue(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) Publish(ctx context.Context, message ubroker.Message) error { - args := m.Called(ctx, message) - return args.Error(0) -} - type HTTPServerTestSuite struct { suite.Suite t *testing.T @@ -57,14 +27,14 @@ func TestHTTPServerTestSuite(t *testing.T) { func (s *HTTPServerTestSuite) prepareTest() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() } func (s *HTTPServerTestSuite) TestEmptyFetch() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() @@ -73,63 +43,63 @@ func (s *HTTPServerTestSuite) TestEmptyFetch() { func (s *HTTPServerTestSuite) TestPublish() { s.prepareTest() - s.broker.On("Publish", mock.Anything, mock.Anything).Return(nil) - s.httpPublish(`{"body": "hello"}`) - s.broker.AssertCalled( - s.t, - "Publish", - mock.Anything, - ubroker.Message{Body: "hello"}, - ) + + s.broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Return(nil) + + s.httpPublish(`{"body":"aGVsbG8="}`) + s.broker.AssertExpectations(s.T()) } func (s *HTTPServerTestSuite) TestFailedReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpReQueue(123, 400, &body) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(nil) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(nil) s.httpReQueue(123, 200, nil) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestFailedAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpAcknowledge(123, 400, &body) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(nil) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(nil) s.httpAcknowledge(123, 200, nil) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go new file mode 100644 index 0000000..d3a0fa4 --- /dev/null +++ b/internal/server/moc_broker_test.go @@ -0,0 +1,44 @@ +package server_test + +import ( + "context" + + "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/stretchr/testify/mock" +) + +type mockBroker struct { + mock.Mock +} + +func (m *mockBroker) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockBroker) Delivery(ctx context.Context) (<-chan *ubroker.Delivery, error) { + args := m.Called(ctx) + + var res0 <-chan *ubroker.Delivery + + if args.Get(0) != nil { + res0 = args.Get(0).(<-chan *ubroker.Delivery) + } + + return res0, args.Error(1) +} + +func (m *mockBroker) Acknowledge(ctx context.Context, id int32) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) ReQueue(ctx context.Context, id int32) error { + args := m.Called(ctx, id) + return args.Error(0) +} + +func (m *mockBroker) Publish(ctx context.Context, message *ubroker.Message) error { + args := m.Called(ctx, message) + return args.Error(0) +} diff --git a/pkg/ubroker/ubroker.go b/pkg/ubroker/ubroker.go index 18268cd..cbc090d 100644 --- a/pkg/ubroker/ubroker.go +++ b/pkg/ubroker/ubroker.go @@ -33,7 +33,7 @@ type Broker interface { // returned // 3. If broker is closed, `ErrClosed` is returned // 4. should be thread-safe - Delivery(ctx context.Context) (<-chan Delivery, error) + Delivery(ctx context.Context) (<-chan *Delivery, error) // Acknowledge is called by clients to declare that // specified message id has been successfuly processed @@ -48,7 +48,7 @@ type Broker interface { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe - Acknowledge(ctx context.Context, id int) error + Acknowledge(ctx context.Context, id int32) error // ReQueue is called by clients to declare that // specified message id should be put back in @@ -62,7 +62,7 @@ type Broker interface { // returned // 5. If broker is closed, `ErrClosed` is returned // 6. should be thread-safe - ReQueue(ctx context.Context, id int) error + ReQueue(ctx context.Context, id int32) error // Publish is used to enqueue a new message to broker // We demand following: @@ -71,7 +71,7 @@ type Broker interface { // returned // 2. If broker is closed, `ErrClosed` is returned // 3. should be thread-safe - Publish(ctx context.Context, message Message) error + Publish(ctx context.Context, message *Message) error } // HTTPServer defines an HTTP‌ API‌ server provider @@ -81,19 +81,3 @@ type HTTPServer interface { Run() error } - -// Message encapsulates a queued message -type Message struct { - // Body is an abitrary client-defined string - Body string `json:"body"` -} - -// Delivery encapsulates a message fetched -// from queue -type Delivery struct { - // Message is the message fetched from queue - Message Message `json:"message"` - - // ID - ID int `json:"id"` -}