Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions internal/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"io"

"github.com/arcana261/ubroker/pkg/ubroker"
"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -19,18 +20,59 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer {
}
}

func mapErrorToStatus(err error) error {
switch err {
case io.EOF:
return nil
case nil:
return status.Error(codes.OK, "Ok!")
case ubroker.ErrClosed:
return status.Error(codes.Unavailable, "Channel is closed!")
case context.Canceled, context.DeadlineExceeded:
return status.Error(codes.DeadlineExceeded, "Deadline Exceeded!")
case ubroker.ErrUnimplemented:
return status.Error(codes.Unimplemented, "Method is not implemented yet!")
case ubroker.ErrInvalidID, errInvalidArgument:
return status.Error(codes.InvalidArgument, "Argument (or id) is not valid!")
default:
return status.Error(codes.Unknown, "Unknown error occurred!")
}
}

func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error {
return status.Error(codes.Unimplemented, "not implemented")

deliveryChannel, err := s.broker.Delivery(stream.Context())
if err != nil {
return mapErrorToStatus(err)
}

for true {
_, err = stream.Recv()
if err != nil {
return mapErrorToStatus(err)
}
if message, open := <-deliveryChannel; open {
err := stream.Send(message)
if err != nil {
return mapErrorToStatus(err)
}
} else {
return mapErrorToStatus(ubroker.ErrClosed)
}
}

return nil

}

func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
return &empty.Empty{}, mapErrorToStatus(s.broker.Acknowledge(ctx, request.Id))
}

func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
return &empty.Empty{}, mapErrorToStatus(s.broker.ReQueue(ctx, request.GetId()))
}

func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
return &empty.Empty{}, mapErrorToStatus(s.broker.Publish(ctx, request))
}