diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..863f73f 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -2,6 +2,7 @@ package server import ( "context" + "io" "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" @@ -18,19 +19,62 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { broker: broker, } } - func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") + delivery, err := s.broker.Delivery(stream.Context()) + if err != nil { + return ReturnError(err) + } + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + println("heeeeeeeeeeeeeeeeeeeeeer!") + return ReturnError(err) + } + delivered := <-delivery + if delivered == nil { + return status.Error(codes.Unavailable, "Unavailable") + } + err = stream.Send(delivered) + if err != nil { + return ReturnError(err) + } + } + //return status.Error(codes.OK, "OK") } func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Acknowledge(ctx, request.Id) + if err != nil { + return &empty.Empty{}, ReturnError(err) + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.ReQueue(ctx, request.Id) + if err != nil { + return &empty.Empty{}, ReturnError(err) + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Publish(ctx, request) + if err != nil { + return &empty.Empty{}, ReturnError(err) + } + return &empty.Empty{}, status.Error(codes.OK, "OK") } +func ReturnError(err error) error { + if err == ubroker.ErrClosed { + return status.Error(codes.Unavailable, "Unavailable") + } + if err == ubroker.ErrInvalidID { + return status.Error(codes.InvalidArgument, "InvalidID") + } + return nil +} + diff --git a/pkg/ubroker/errors.go b/pkg/ubroker/errors.go index 8e73291..903fa7d 100644 --- a/pkg/ubroker/errors.go +++ b/pkg/ubroker/errors.go @@ -16,4 +16,4 @@ var ( // servicer has been shutted-down and a new request // rolls in ErrClosed = errors.New("closed") -) +) \ No newline at end of file