diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..4d1fb5c 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -2,7 +2,6 @@ package server import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" @@ -20,17 +19,63 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { } func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") + deliveryChannel, err := s.broker.Delivery(context.Background()) + if err != nil{ + return status.Error(codes.Unavailable, "unavailable") + } + + for { + if _, err := stream.Recv(); err != nil{ + return status.Error(codes.Unavailable, "not requested") + } + if msg, ok := <- deliveryChannel; ok { + stream.Send(msg) + } else { + return status.Error(codes.Unavailable, "is closed") + } + } + } func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Acknowledge(ctx, request.Id) + if err == nil { + return &empty.Empty{}, status.Error(codes.OK, "ok") + } + switch err.Error() { + case "closed": + return &empty.Empty{},status.Error(codes.Unavailable, "unavailable") + case "id is invalid": + return &empty.Empty{},status.Error(codes.InvalidArgument, "invalid arg") + default: + return &empty.Empty{}, err + } } func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.ReQueue(ctx, request.Id) + if err != nil{ + switch err.Error() { + case "closed": + return &empty.Empty{},status.Error(codes.Unavailable, "unavailable") + case "id is invalid": + return &empty.Empty{},status.Error(codes.InvalidArgument, "invalid arg") + default: + return &empty.Empty{}, err + } + } + return &empty.Empty{}, status.Error(codes.OK, "ok") } func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Publish(ctx, request) + if err != nil{ + switch err.Error() { + case "closed": + return &empty.Empty{},status.Error(codes.Unavailable, "unavailable") + default: + return &empty.Empty{}, err + } + } + return &empty.Empty{}, status.Error(codes.OK, "ok") }