Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions internal/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,57 @@ type grpcServicer struct {
broker ubroker.Broker
}

func getError(msg error) error {
switch msg {
case nil:
return nil
case ubroker.ErrClosed:
return status.Error(codes.Unavailable, "Broker is closed")
case ubroker.ErrInvalidID:
return status.Error(codes.InvalidArgument, "Invalid Id")
default:
return status.Error(codes.Unknown, "Unknown Error")
}
}

func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer {
return &grpcServicer{
broker: broker,
}
}

func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error {
return status.Error(codes.Unimplemented, "not implemented")
delivery, errMsg := s.broker.Delivery(context.Background())
if errMsg != nil {
return getError(errMsg)
}

for {
_, streamError := stream.Recv()
if streamError != nil {
return getError(streamError)
}

delMsg, delSuccess := <-delivery
if delSuccess {
stream.Send(delMsg)
} else {
return getError(ubroker.ErrClosed)
}
}
}

func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
errMsg := s.broker.Acknowledge(ctx, request.GetId())
return &empty.Empty{}, getError(errMsg)
}

func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
errMsg := s.broker.ReQueue(ctx, request.GetId())
return &empty.Empty{}, getError(errMsg)
}

func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) {
return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented")
errMsg := s.broker.Publish(ctx, request)
return &empty.Empty{}, getError(errMsg)
}