diff --git a/.gitignore b/.gitignore index 4b66428..b151abc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ /ubroker -*.pb.go \ No newline at end of file +*.pb.go diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..73bb255 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -19,18 +19,86 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { } } -func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") +// Unavailable: status.Error(codes.Unavailable, "Unavailable") +// InvalidArgument: status.Error(codes.InvalidArgument, "InvalidArgument") + +// Publish message to Queue +// OK: on success +// Unavailable: If broker has been closed +func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { + err := s.broker.Publish(ctx, request) + + if err != nil { + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + } + + return &empty.Empty{}, nil } +// Acknowledge a message +// Should return: +// OK: on success +// Unavailable: If broker has been closed +// InvalidArgument: If requested ID is invalid func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Acknowledge(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed{ + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + }else{ + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidArgument") + } + } + + return &empty.Empty{}, nil } +// ReQueue a message +// OK: on success +// Unavailable: If broker has been closed +// InvalidArgument: If requested ID is invalid func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.ReQueue(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed{ + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + }else{ + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidArgument") + } + } + + return &empty.Empty{}, nil } -func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") +// Fetch should return a single Delivery per FetchRequest. +// Should return: +// Unavailable: If broker has been closed +func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { + ctx := stream.Context() + + deliveryChannel, err := s.broker.Delivery(ctx) + if err != nil { + return status.Error(codes.Unavailable, "Unavailable") + } + + for { + _, err = stream.Recv() + if err != nil{ + return status.Error(codes.Unavailable, "Unavailable") + } + + select { + case delivery := <-deliveryChannel: + if delivery != nil{ + stream.Send(delivery) + }else{ + return status.Error(codes.Unavailable, "Unavailable") + } + + case <-ctx.Done(): + return status.Error(codes.Unavailable, "Unavailable") + } + } }