diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 40e5f9e..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -3,10 +3,15 @@ package main import ( "flag" "fmt" + "log" + "net" "os" "os/signal" "time" + "github.com/arcana261/ubroker/pkg/ubroker" + "google.golang.org/grpc" + "github.com/arcana261/ubroker/internal/broker" "github.com/arcana261/ubroker/internal/server" ) @@ -19,13 +24,15 @@ func main() { broker := broker.New(time.Duration(*ttlPtr) * time.Millisecond) endpoint := fmt.Sprintf(":%d", *portPtr) - srv := server.NewHTTP(broker, endpoint) + servicer := server.NewGRPC(broker) - if err := srv.Run(); err != nil { - panic(err.Error()) - } + grpcServer := grpc.NewServer() + ubroker.RegisterBrokerServer(grpcServer, servicer) - fmt.Printf("listening on %s\n", endpoint) + listener, err := net.Listen("tcp", endpoint) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } signalChan := make(chan os.Signal, 1) cleanupDone := make(chan struct{}) @@ -35,11 +42,17 @@ func main() { fmt.Printf("\nReceived an interrupt, stopping services...\n\n") close(cleanupDone) }() + + go func() { + if err := grpcServer.Serve(listener); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + fmt.Printf("listening on %s\n", endpoint) <-cleanupDone - if err := srv.Close(); err != nil { - panic(err.Error()) - } + grpcServer.GracefulStop() if err := broker.Close(); err != nil { panic(err.Error()) diff --git a/internal/broker/core.go b/internal/broker/core.go index f9b0a8b..d6dbdbd 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -2,6 +2,8 @@ package broker import ( "context" + "fmt" + "sync" "time" "github.com/arcana261/ubroker/pkg/ubroker" @@ -13,34 +15,227 @@ import ( // we requeue an unacknowledged/unrequeued message // automatically. func New(ttl time.Duration) ubroker.Broker { - return &core{} + temp := &core{ + closed: false, + brokerChan: make(chan ubroker.Delivery, 1000), + closedChan: make(chan bool, 5000), + publishedQueue: []item{}, + receivedId: []int{}, + receivedAck: []int{}, + receivedRequeue: []int{}, + lastIdValue: -1, + deliveryStarted: false, + wg: sync.WaitGroup{}, + ttl: ttl, + } + + return temp } +type item struct { + Message ubroker.Message + ID int + receivedAckChannel chan int +} type core struct { - // TODO: add required fields + closed bool + brokerChan chan ubroker.Delivery + closedChan chan bool + publishedQueue []item + receivedId []int + lastIdValue int + receivedAck []int + receivedRequeue []int + wg sync.WaitGroup + mut sync.Mutex + deliveryStarted bool + ttl time.Duration } +func contextProblem(ctx context.Context) bool { + if ctx.Err() == context.Canceled { + return true + } + if ctx.Err() == context.DeadlineExceeded { + return true + } + return false +} func (c *core) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - // TODO:‌ implement me - return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") + + if contextProblem(ctx) { + return nil, ctx.Err() + } + if c.closed { + return nil, ubroker.ErrClosed + } + c.deliveryStarted = true + //c.wg.Done() + return c.brokerChan, nil + //return nil, errors.Wrap(ubroker.ErrUnimplemented, "method Delivery is not implemented") } func (c *core) Acknowledge(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Acknowledge is not implemented") + fmt.Println("id", id) + if c.closed { + return ubroker.ErrClosed + } + if contextProblem(ctx) { + return ctx.Err() + } + temp := false + c.mut.Lock() + fmt.Println("locked in ack") + if c.deliveryStarted { + temp = true + } + for _, element := range c.receivedAck { + if element == id { + temp = false + } + } + + if !temp { + fmt.Println("locked in ack released") + c.mut.Unlock() + return errors.Wrap(ubroker.ErrInvalidID, "invalid Id") + } + if c.closed { + fmt.Println("locked in ack released") + c.mut.Unlock() + + return ubroker.ErrClosed + } + c.receivedAck = append(c.receivedAck, id) + for i, element := range c.publishedQueue { + if element.ID == id { + fmt.Println("-") + c.publishedQueue[i].receivedAckChannel <- id + fmt.Println("--") + c.publishedQueue = append(c.publishedQueue[:i], c.publishedQueue[i+1:]...) + break + } + } + fmt.Println("locked in ack released") + c.mut.Unlock() + + //c.wg.Done() + return nil } +func (c *core) DoingReQueue(ctx context.Context, id int) { + for i, element := range c.publishedQueue { + if element.ID == id { + c.receivedRequeue = append(c.receivedRequeue, id) + c.receivedAck = append(c.receivedAck, id) + c.lastIdValue += 1 + c.receivedId = append(c.receivedId, c.lastIdValue) + v := ubroker.Delivery{Message: element.Message, ID: c.lastIdValue} + v2 := item{Message: element.Message, ID: c.lastIdValue, receivedAckChannel: make(chan int, 10)} + //fmt.Println(len(c.publishedQueue), id, i) + c.publishedQueue = append(c.publishedQueue[:i], c.publishedQueue[i+1:]...) + c.publishedQueue = append(c.publishedQueue, v2) + c.brokerChan <- v + fmt.Println("locked released doing requeue") + c.mut.Unlock() + go c.HandelingTTL(ctx, v2) + break + } + } + +} +func (c *core) HandelingTTL(ctx context.Context, element item) { + fmt.Println("ha?") + select { + case <-time.After(c.ttl): + fmt.Println("inja? ") + c.mut.Lock() + fmt.Println("locked in handeling ttl ") + c.DoingReQueue(ctx, element.ID) + return + case <-element.receivedAckChannel: + fmt.Println("what ?") + return + case <-c.closedChan: + return + } +} func (c *core) ReQueue(ctx context.Context, id int) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method ReQueue is not implemented") + c.mut.Lock() + fmt.Println("locked here requqeue!") + if c.closed { + fmt.Println("locked requeue released") + c.mut.Unlock() + return ubroker.ErrClosed + } + if contextProblem(ctx) { + fmt.Println("locked requeue released") + c.mut.Unlock() + return ctx.Err() + } + temp := false + if c.deliveryStarted { + temp = true + } + for _, element := range c.receivedRequeue { + if element == id { + temp = false + } + } + for _, element := range c.receivedAck { + if element == id { + temp = false + } + } + if !temp { + fmt.Println("locked requeue released") + c.mut.Unlock() + + return errors.Wrap(ubroker.ErrInvalidID, "invalid Id") + } + c.DoingReQueue(ctx, id) + return nil } +func (c *core) DoingPublish(ctx context.Context, message ubroker.Message) { + c.lastIdValue += 1 + c.receivedId = append(c.receivedId, c.lastIdValue) + v := ubroker.Delivery{Message: message, ID: c.lastIdValue} + v2 := item{Message: message, ID: c.lastIdValue, receivedAckChannel: make(chan int, 10)} + c.publishedQueue = append(c.publishedQueue, v2) + c.brokerChan <- v + fmt.Println("locked doing publish released") + c.mut.Unlock() + c.HandelingTTL(ctx, v2) + //defer c.wg.Done() +} func (c *core) Publish(ctx context.Context, message ubroker.Message) error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Publish is not implemented") + if contextProblem(ctx) { + return ctx.Err() + } + c.mut.Lock() + fmt.Println("locked publish") + if c.closed { + c.mut.Unlock() + return ubroker.ErrClosed + } + go c.DoingPublish(ctx, message) + return nil } func (c *core) Close() error { - // TODO:‌ implement me - return errors.Wrap(ubroker.ErrUnimplemented, "method Close is not implemented") -} + if c.closed { + return nil + } + //c.wg.Wait() + for i := 0; i < 4000; i++ { + c.closedChan <- true + } + fmt.Println("here wait") + c.mut.Lock() + fmt.Println("closed") + close(c.brokerChan) + c.closed = true + c.mut.Unlock() + return nil +} \ No newline at end of file diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 9d6530d..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -34,7 +34,7 @@ func BenchmarkPublish(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } } @@ -43,7 +43,7 @@ func BenchmarkDelivery(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) @@ -61,12 +61,12 @@ func BenchmarkAcknowledge(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -75,7 +75,7 @@ func BenchmarkAcknowledge(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.Acknowledge(context.Background(), deliveries[i].ID)) + s.Nil(broker.Acknowledge(context.Background(), deliveries[i].Id)) } } @@ -84,12 +84,12 @@ func BenchmarkReQueue(b *testing.B) { s := assert.New(b) for i := 0; i < b.N; i++ { - s.Nil(broker.Publish(context.Background(), ubroker.Message{})) + s.Nil(broker.Publish(context.Background(), &ubroker.Message{})) } delivery, err := broker.Delivery(context.Background()) s.Nil(err) - deliveries := make([]ubroker.Delivery, 0, b.N) + deliveries := make([]*ubroker.Delivery, 0, b.N) for i := 0; i < b.N; i++ { deliveries = append(deliveries, <-delivery) @@ -98,7 +98,7 @@ func BenchmarkReQueue(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Nil(broker.ReQueue(context.Background(), deliveries[i].ID)) + s.Nil(broker.ReQueue(context.Background(), deliveries[i].Id)) } } @@ -123,7 +123,7 @@ func (s *CoreBrokerTestSuite) TestPublishedMessageShouldAppearInDeliveryOnce() { s.publish("hello1") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Equal("hello1", msg.Message.Body) + s.Equal("hello1", string(msg.Message.Body)) s.assertEmpty(delivery) } @@ -133,13 +133,13 @@ func (s *CoreBrokerTestSuite) TestPublishShouldPreserveOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.NotEqual(messages[0].ID, messages[1].ID) - s.NotEqual(messages[0].ID, messages[2].ID) - s.NotEqual(messages[1].ID, messages[2].ID) - s.Equal("hello1", messages[0].Message.Body) - s.Equal("hello2", messages[1].Message.Body) - s.Equal("hello3", messages[2].Message.Body) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.NotEqual(messages[0].Id, messages[1].Id) + s.NotEqual(messages[0].Id, messages[2].Id) + s.NotEqual(messages[1].Id, messages[2].Id) + s.Equal("hello1", string(messages[0].Message.Body)) + s.Equal("hello2", string(messages[1].Message.Body)) + s.Equal("hello3", string(messages[2].Message.Body)) } func (s *CoreBrokerTestSuite) TestDeliveriesShouldBeUnique() { @@ -153,7 +153,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeAcknowledgeablePreemptively( s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.Acknowledge(context.Background(), int32(id))) } } @@ -161,7 +162,8 @@ func (s *CoreBrokerTestSuite) TestMessageShouldNotBeQueueablePreemptively() { s.prepareTest(1 * time.Second) s.publish("hello") for id := -100; id <= 100; id++ { - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), id)) + s.assertErrorEquals(ubroker.ErrInvalidID, + s.broker.ReQueue(context.Background(), int32(id))) } } @@ -169,15 +171,15 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeAcknowledgeable() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeAcknowledgedTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndependently() { @@ -186,10 +188,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeAcknowledgeableIndep s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.Acknowledge(context.Background(), messages[0].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[1].ID)) - s.Nil(s.broker.Acknowledge(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.Acknowledge(context.Background(), messages[0].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[1].Id)) + s.Nil(s.broker.Acknowledge(context.Background(), messages[2].Id)) } func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() { @@ -197,7 +199,7 @@ func (s *CoreBrokerTestSuite) TestAcknowledgedMessageShouldNotAppearInDelivery() s.publish("hello") delivery := s.getDelivery(context.Background()) msg := <-delivery - s.Nil(s.broker.Acknowledge(context.Background(), msg.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg.Id)) s.assertEmpty(delivery) } @@ -206,9 +208,9 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldBeReQueueable() { s.publish("hello") delivery := s.getDelivery(context.Background()) msg1 := <-delivery - s.Nil(s.broker.ReQueue(context.Background(), msg1.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg1.Id)) msg2 := <-delivery - s.NotEqual(msg1.ID, msg2.ID) + s.NotEqual(msg1.Id, msg2.Id) s.Equal(msg1.Message.Body, msg2.Message.Body) } @@ -216,8 +218,8 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldNotBeReQueueableTwice() { s.prepareTest(1 * time.Second) s.publish("hello") msg := <-s.getDelivery(context.Background()) - s.Nil(s.broker.ReQueue(context.Background(), msg.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.ID)) + s.Nil(s.broker.ReQueue(context.Background(), msg.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg.Id)) } func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndependently() { @@ -226,10 +228,10 @@ func (s *CoreBrokerTestSuite) TestMultipleDeliveriesShouldBeReQueueableIndepende s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) @@ -241,10 +243,10 @@ func (s *CoreBrokerTestSuite) TestReQueueCouldBreakOrder() { s.publish("hello2") s.publish("hello3") delivery := s.getDelivery(context.Background()) - messages := []ubroker.Delivery{<-delivery, <-delivery, <-delivery} - s.Nil(s.broker.ReQueue(context.Background(), messages[2].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[1].ID)) - s.Nil(s.broker.ReQueue(context.Background(), messages[0].ID)) + messages := []*ubroker.Delivery{<-delivery, <-delivery, <-delivery} + s.Nil(s.broker.ReQueue(context.Background(), messages[2].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[1].Id)) + s.Nil(s.broker.ReQueue(context.Background(), messages[0].Id)) s.Equal(messages[2].Message.Body, (<-delivery).Message.Body) s.Equal(messages[1].Message.Body, (<-delivery).Message.Body) s.Equal(messages[0].Message.Body, (<-delivery).Message.Body) @@ -256,32 +258,35 @@ func (s *CoreBrokerTestSuite) TestDeliveryShouldReQueueUponHalfSecondTTL() { delivery := s.getDelivery(context.Background()) msg1 := <-delivery time.Sleep(250 * time.Millisecond) - s.Nil(s.broker.Acknowledge(context.Background(), msg1.ID)) + s.Nil(s.broker.Acknowledge(context.Background(), msg1.Id)) s.publish("hello2") msg2 := <-delivery time.Sleep(750 * time.Millisecond) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.ID)) - s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.ID)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.Acknowledge(context.Background(), msg2.Id)) + s.assertErrorEquals(ubroker.ErrInvalidID, s.broker.ReQueue(context.Background(), msg2.Id)) msg3 := <-delivery - s.NotEqual(msg1.ID, msg3.ID) - s.NotEqual(msg2.ID, msg3.ID) - s.Equal("hello2", msg3.Message.Body) - s.Nil(s.broker.Acknowledge(context.Background(), msg3.ID)) + s.NotEqual(msg1.Id, msg3.Id) + s.NotEqual(msg2.Id, msg3.Id) + s.Equal("hello2", string(msg3.Message.Body)) + s.Nil(s.broker.Acknowledge(context.Background(), msg3.Id)) } func (s *CoreBrokerTestSuite) TestPublishShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Publish(context.Background(), ubroker.Message{})) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Publish(context.Background(), &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestAcknowledgeShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.Acknowledge(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.Acknowledge(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestReQueueShouldFailOnClosedBroker() { s.prepareClosed() - s.assertErrorEquals(ubroker.ErrClosed, s.broker.ReQueue(context.Background(), 1)) + s.assertErrorEquals(ubroker.ErrClosed, + s.broker.ReQueue(context.Background(), 1)) } func (s *CoreBrokerTestSuite) TestDeliveryShouldFailOnClosedBroker() { @@ -324,7 +329,8 @@ func (s *CoreBrokerTestSuite) TestPublishShouldFailOnCanceledContext() { s.prepareTest(1 * time.Second) ctx, cancel := context.WithCancel(context.Background()) cancel() - s.assertErrorEquals(ctx.Err(), s.broker.Publish(ctx, ubroker.Message{})) + s.assertErrorEquals(ctx.Err(), + s.broker.Publish(ctx, &ubroker.Message{})) } func (s *CoreBrokerTestSuite) TestDataRace() { @@ -344,8 +350,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { return default: - err := s.broker.Publish(context.Background(), ubroker.Message{ - Body: fmt.Sprint(rand.Intn(1000)), + err := s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(fmt.Sprint(rand.Intn(1000))), }) if err == ubroker.ErrClosed { return @@ -374,8 +380,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.Acknowledge(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.Acknowledge(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -402,8 +412,12 @@ func (s *CoreBrokerTestSuite) TestDataRace() { case <-ticker.C: return - case msg := <-delivery: - err = s.broker.ReQueue(context.Background(), msg.ID) + case msg, ok := <-delivery: + if !ok { + return + } + + err = s.broker.ReQueue(context.Background(), msg.Id) if err == ubroker.ErrClosed { return } @@ -427,8 +441,8 @@ func (s *CoreBrokerTestSuite) TestDataRace() { } func (s *CoreBrokerTestSuite) publish(body string) { - s.Nil(s.broker.Publish(context.Background(), ubroker.Message{ - Body: body, + s.Nil(s.broker.Publish(context.Background(), &ubroker.Message{ + Body: []byte(body), })) } @@ -440,7 +454,7 @@ func (s *CoreBrokerTestSuite) assertErrorEquals(expected error, actual error) { } } -func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.Delivery { +func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan *ubroker.Delivery { result, err := s.broker.Delivery(ctx) if err != nil { s.FailNow(err.Error(), "could not obtain delivery") @@ -449,7 +463,7 @@ func (s *CoreBrokerTestSuite) getDelivery(ctx context.Context) <-chan ubroker.De return result } -func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan ubroker.Delivery) { +func (s *CoreBrokerTestSuite) assertEmpty(delivery <-chan *ubroker.Delivery) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() diff --git a/internal/server/http.go b/internal/server/http.go index 6dcac4c..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "regexp" "strconv" @@ -13,6 +12,7 @@ import ( "sync" "time" + "github.com/golang/protobuf/jsonpb" "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -37,7 +37,7 @@ func init() { type httpServer struct { broker ubroker.Broker router *mux.Router - delivery <-chan ubroker.Delivery + delivery <-chan *ubroker.Delivery endpoint string server *http.Server mutex sync.Mutex @@ -126,14 +126,8 @@ func (s *httpServer) ServeHTTP(writer http.ResponseWriter, request *http.Request } func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Request) { - data, err := ioutil.ReadAll(request.Body) - if err != nil { - s.handleError(writer, request, err) - return - } - var msg ubroker.Message - err = json.Unmarshal(data, &msg) + err := jsonpb.Unmarshal(request.Body, &msg) if err != nil { s.handleError(writer, request, err) } @@ -141,7 +135,7 @@ func (s *httpServer) handlePublish(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Publish(ctx, msg) + err = s.broker.Publish(ctx, &msg) if err != nil { s.handleError(writer, request, err) } @@ -166,7 +160,7 @@ func (s *httpServer) handleAcknowledge(writer http.ResponseWriter, request *http ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.Acknowledge(ctx, id) + err = s.broker.Acknowledge(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return @@ -192,7 +186,7 @@ func (s *httpServer) handleReQueue(writer http.ResponseWriter, request *http.Req ctx, cancel := s.makeContext(request) defer cancel() - err = s.broker.ReQueue(ctx, id) + err = s.broker.ReQueue(ctx, int32(id)) if err != nil { s.handleError(writer, request, err) return diff --git a/internal/server/http_test.go b/internal/server/http_test.go index d29d161..1141457 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -1,7 +1,6 @@ package server_test import ( - "context" "fmt" "io/ioutil" "net/http" @@ -9,41 +8,12 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/mahtabfarrokh/ubroker/internal/server" + "github.com/mahtabfarrokh/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) -type mockBroker struct { - mock.Mock -} - -func (m *mockBroker) Close() error { - args := m.Called() - return args.Error(0) -} - -func (m *mockBroker) Delivery(ctx context.Context) (<-chan ubroker.Delivery, error) { - args := m.Called(ctx) - return args.Get(0).(<-chan ubroker.Delivery), args.Error(1) -} - -func (m *mockBroker) Acknowledge(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) ReQueue(ctx context.Context, id int) error { - args := m.Called(ctx, id) - return args.Error(0) -} - -func (m *mockBroker) Publish(ctx context.Context, message ubroker.Message) error { - args := m.Called(ctx, message) - return args.Error(0) -} - type HTTPServerTestSuite struct { suite.Suite t *testing.T @@ -57,14 +27,14 @@ func TestHTTPServerTestSuite(t *testing.T) { func (s *HTTPServerTestSuite) prepareTest() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() } func (s *HTTPServerTestSuite) TestEmptyFetch() { s.broker = new(mockBroker) - s.broker.On("Delivery", mock.Anything).Return(make(<-chan ubroker.Delivery, 0), nil) + s.broker.On("Delivery", mock.Anything).Return(make(<-chan *ubroker.Delivery, 0), nil) s.server = server.NewHTTP(s.broker, ":0") s.server.Run() @@ -73,63 +43,63 @@ func (s *HTTPServerTestSuite) TestEmptyFetch() { func (s *HTTPServerTestSuite) TestPublish() { s.prepareTest() - s.broker.On("Publish", mock.Anything, mock.Anything).Return(nil) - s.httpPublish(`{"body": "hello"}`) - s.broker.AssertCalled( - s.t, - "Publish", - mock.Anything, - ubroker.Message{Body: "hello"}, - ) + + s.broker.On("Publish", mock.Anything, mock.MatchedBy(func(msg *ubroker.Message) bool { + s.Equal("hello", string(msg.Body)) + return "hello" == string(msg.Body) + })).Return(nil) + + s.httpPublish(`{"body":"aGVsbG8="}`) + s.broker.AssertExpectations(s.T()) } func (s *HTTPServerTestSuite) TestFailedReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpReQueue(123, 400, &body) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestReQueue() { s.prepareTest() - s.broker.On("ReQueue", mock.Anything, 123).Return(nil) + s.broker.On("ReQueue", mock.Anything, int32(123)).Return(nil) s.httpReQueue(123, 200, nil) s.broker.AssertCalled( s.t, "ReQueue", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestFailedAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(ubroker.ErrInvalidID) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(ubroker.ErrInvalidID) body := `{"error": "id is invalid"}` s.httpAcknowledge(123, 400, &body) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } func (s *HTTPServerTestSuite) TestAcknowledge() { s.prepareTest() - s.broker.On("Acknowledge", mock.Anything, 123).Return(nil) + s.broker.On("Acknowledge", mock.Anything, int32(123)).Return(nil) s.httpAcknowledge(123, 200, nil) s.broker.AssertCalled( s.t, "Acknowledge", mock.Anything, - 123, + int32(123), ) } diff --git a/pkg/ubroker/errors.go b/pkg/ubroker/errors.go index 8e73291..903fa7d 100644 --- a/pkg/ubroker/errors.go +++ b/pkg/ubroker/errors.go @@ -16,4 +16,4 @@ var ( // servicer has been shutted-down and a new request // rolls in ErrClosed = errors.New("closed") -) +) \ No newline at end of file diff --git a/pkg/ubroker/ubroker.go b/pkg/ubroker/ubroker.go index 18268cd..3a76d77 100644 --- a/pkg/ubroker/ubroker.go +++ b/pkg/ubroker/ubroker.go @@ -96,4 +96,4 @@ type Delivery struct { // ID ID int `json:"id"` -} +} \ No newline at end of file