Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/ubroker
/ubroker
*.pb.go
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ go:
- 1.12.x

before_install:
- make dev-dependencies
- curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.7.1/protoc-3.7.1-linux-x86_64.zip
- unzip protoc-3.7.1-linux-x86_64.zip -d protoc3
- PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make dev-dependencies

script:
- make check
- PROTOC=protoc3/bin/protoc PROTOC_OPTIONS="-Iprotoc3/include -I." make check
26 changes: 20 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
.PHONY: check help dependencies dev-dependencies
.PHONY: check help dependencies dev-dependencies .pre-check-go generate

SRCS = $(patsubst ./%,%,$(shell find . -name "*.go" -not -path "*vendor*"))

help: ## Display this help screen
@grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

check: dev-dependencies ## Run unit tests
check: dev-dependencies | generate ## Run unit tests
go test ./...
go test -race ./...

benchmark: dev-dependencies ## Run benchmarks
benchmark: dev-dependencies | generate ## Run benchmarks
go test -bench . ./...

dependencies: ##‌ Download dependencies
dependencies: | generate ##‌ Download dependencies
go get -v ./...

dev-dependencies: dependencies ##‌ Download development dependencies
dev-dependencies: dependencies | generate ##‌ Download development dependencies
go get -v github.com/stretchr/testify/suite
go get -v github.com/stretchr/testify/assert
go get -v github.com/phayes/freeport

ubroker: $(SRCS) | dependencies ##‌ Compile us
ubroker: $(SRCS) pkg/ubroker/ubroker.pb.go | dependencies generate ##‌ Compile us
go build -o ubroker ./cmd/ubroker

generate: pkg/ubroker/ubroker.pb.go

pkg/ubroker/ubroker.pb.go: api/ubroker.proto | .pre-check-go
$(PROTOC) $(PROTOC_OPTIONS) --go_out=plugins=grpc:$(GOPATH)/src api/ubroker.proto

.pre-check-go:
go get -v github.com/golang/protobuf/protoc-gen-go
go get -v github.com/vektra/mockery/.../

GOPATH ?= $(shell go env GOPATH)
PROTOC ?= protoc
PROTOC_OPTIONS ?=
42 changes: 38 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,43 @@

# ubroker

Welcome to phase2!

# Current Phase 2

Greetings! As you may already know, In this phase of the project we are about to take
our broker system which we implemented prevously, take out it's HTTP‌ (JSON) layer and replace
it with a neat gRPC‌ mechanism !

Before we begin any further, I'd like to point out that our master branch has changed and
contains implementation made by your friend, and ofcourse, my dear colleague at Cafebazaar,
[Milad Norouzi](https://github.com/miladosos) as reference implementation. I've made slight
alterations to his implementations to make it compatible with our new protocol buffer specification.

So.. list of changes I've made to master branch are as follows:

* `api/ubroker.proto`: This file now contains proto specification for RPC's we'd like to have.
* `Makefile`: I've added a dependency for tests (a library which generates random free ports) and ofcourse, directives to generate GO‌ code from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`. The first time you try to execute `make check` or `make ubroker` this autogenerated file will be generated from proto file.
* `pkg/ubroker/ubroker.go`: I've modified our `Broker` interface to use structs defined in our protobuffer `api/ubroker.proto`.
* `internal/broker/core.go`: Contains implementation from my colleague [Milad Norouzi](https://github.com/miladosos)
* `internal/broker/core_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto`
* `internal/server/http.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto`
* `internal/server/http_test.go`: I've made modifications to make it complient with structures defined in our protobuf file `api/ubroker.proto`
* `internal/server/grpc.go`: This new file is the file you shoud implement (which implements `BrokerServer` autogenerated from `api/ubroker.proto` into `pkg/ubroker/ubroker.pb.go`)
* `internal/server/grpc_test.go`: This new file contains tests that I'd like to PASS :) and you should too!
* `cmd/ubroker/main.go`: I've made modifications to start gRPC‌ server from implementation in `internal/server/grpc.go`.

Your task is as follows:
1. Make sure you have `protoc` installed. (the protobuf compiler)
2. Run `make dev-dependencies` to download all dependencies and ofcourse, autogenerate `pkg/ubroker/ubroker.pb.go`
3. Implement file at `internal/server/grpc.go`
4. Submit your pull request!

# Previous Phase 1

Happy new year to you, students!

We wanted to set the stage for continuous series of excersices to develop
We wanted to set the stage for continuous series of exercises to develop
a message broker.

A message broker is a system that acts as a hub to distribute messages in a
Expand All @@ -18,12 +52,12 @@ various benefits of using messaging systems, which the most important are:
And much more! I highly encourage you two watch [this](https://www.youtube.com/watch?v=rXi5CLjIQ9k)
awesome video! I also encourage you to look at RabbitMQ design.

So, the messaging system is expected to do following opration:
So, the messaging system is expected to do following operation:

1. Provide a `/publish` HTTP API that let's clients publish messages to a queue. Or to say enqueue messages in our queue.
2. Provide a `/fetch` HTTP‌ API that let's clients fetch messages from queue.
3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` gurantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfuly and gracefully processed them. This is why this system is called to have a `at least once` gurantee because clients might see messages **at least once**. By stating that our gurantee is at-least once, we are not referring to a randomized behaviour that we might re-send a message to clients, but rather clients might see messages more than once becuase of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfuly processed fetched message.
4. Provde a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them.
3. Provide a `/acknowledge/{id}` HTTP‌‌ API that clients call after their processing is finished so that we can remove item from queue. This is important because we can have a `at least once` guarantee on our queue: If a client crashes after receiving a message from queue, we can return them automatically to queue after a timeout. This way we can ensure no message is removed from queue without clients acknowledging they have successfully and gracefully processed them. This is why this system is called to have an `at least once` guarantee because clients might see messages **at least once**. By stating that our gaurantee is at-least once, we are not referring to a randomized behavior that we might re-send a message to clients, but rather clients might see messages more than once because of failures or errors in their systems (like database transaction failure, etc.) And we will ensure that we keep supplying enqueued message until clients confirm that they have successfully processed fetched message.
4. Provide a `/requeue/{id}` HTTP‌ API that let's clients to requeue a message. This is useful when clients run into error in their system and want to retry a message or let some other worker handle them.

Now... Don't you guys worry! We have already laid out a boilerplate code beautifully so that you can learn how to code in GO and also validate your results! The steps are as follows:

Expand Down
51 changes: 51 additions & 0 deletions api/ubroker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
syntax = "proto3";

package ubroker;
option go_package = "github.com/arcana261/ubroker/pkg/ubroker";

import "google/protobuf/empty.proto";

service Broker {
// Fetch should return a single Delivery per FetchRequest.
// Should return:
// Unavailable: If broker has been closed
rpc Fetch(stream FetchRequest) returns (stream Delivery);

// Acknowledge a message
// Should return:
// OK: on success
// Unavailable: If broker has been closed
// InvalidArgument: If requested ID is invalid
rpc Acknowledge(AcknowledgeRequest) returns (google.protobuf.Empty);

// ReQueue a message
// OK: on success
// Unavailable: If broker has been closed
// InvalidArgument: If requested ID is invalid
rpc ReQueue(ReQueueRequest) returns (google.protobuf.Empty);

// Publish message to Queue
// OK: on success
// Unavailable: If broker has been closed
rpc Publish(Message) returns (google.protobuf.Empty);
}

message Message {
bytes body = 1;
}

message Delivery {
Message message = 1;
int32 id = 2;
}

message FetchRequest {
}

message AcknowledgeRequest {
int32 id = 1;
}

message ReQueueRequest {
int32 id = 1;
}
29 changes: 21 additions & 8 deletions cmd/ubroker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package main
import (
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"time"

"github.com/arcana261/ubroker/pkg/ubroker"
"google.golang.org/grpc"

"github.com/arcana261/ubroker/internal/broker"
"github.com/arcana261/ubroker/internal/server"
)
Expand All @@ -19,13 +24,15 @@ func main() {

broker := broker.New(time.Duration(*ttlPtr) * time.Millisecond)
endpoint := fmt.Sprintf(":%d", *portPtr)
srv := server.NewHTTP(broker, endpoint)
servicer := server.NewGRPC(broker)

if err := srv.Run(); err != nil {
panic(err.Error())
}
grpcServer := grpc.NewServer()
ubroker.RegisterBrokerServer(grpcServer, servicer)

fmt.Printf("listening on %s\n", endpoint)
listener, err := net.Listen("tcp", endpoint)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan struct{})
Expand All @@ -35,11 +42,17 @@ func main() {
fmt.Printf("\nReceived an interrupt, stopping services...\n\n")
close(cleanupDone)
}()

go func() {
if err := grpcServer.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()

fmt.Printf("listening on %s\n", endpoint)
<-cleanupDone

if err := srv.Close(); err != nil {
panic(err.Error())
}
grpcServer.GracefulStop()

if err := broker.Close(); err != nil {
panic(err.Error())
Expand Down
Loading