Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public CompletionStage<HelloReply> sayHello(HelloRequest in) {
future.completeExceptionally(statusRuntimeException);
return future;
}

// #rich_error_model_unary

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package example.myapp.helloworld.grpc;

import static org.junit.Assert.assertEquals;

import com.google.rpc.error_details.LocalizedMessage;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletionStage;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.grpc.GrpcClientSettings;
import org.apache.pekko.grpc.GrpcServiceException;
import org.apache.pekko.grpc.javadsl.MetadataStatus;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.junit.Assert;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;

public class RichErrorModelNativeTest extends JUnitSuite {

private ServerBinding run(ActorSystem sys) throws Exception {

GreeterService impl = new RichErrorNativeImpl();

org.apache.pekko.japi.function.Function<HttpRequest, CompletionStage<HttpResponse>> service =
GreeterServiceHandlerFactory.create(impl, sys);
CompletionStage<ServerBinding> bound =
Http.get(sys).newServerAt("127.0.0.1", 8091).bind(service);

bound.thenAccept(
binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
});
return bound.toCompletableFuture().get();
}

@Test
@SuppressWarnings("unchecked")
public void testNativeApi() throws Exception {
Config conf = ConfigFactory.load();
ActorSystem sys = ActorSystem.create("HelloWorld", conf);
run(sys);

GrpcClientSettings settings =
GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, sys).withTls(false);

GreeterServiceClient client = null;
try {
client = GreeterServiceClient.create(settings, sys);

// #client_request
HelloRequest request = HelloRequest.newBuilder().setName("Alice").build();
CompletionStage<HelloReply> response = client.sayHello(request);
StatusRuntimeException statusRuntimeException =
response
.toCompletableFuture()
.handle(
(res, ex) -> {
return (StatusRuntimeException) ex;
})
.get();

GrpcServiceException ex = GrpcServiceException.apply(statusRuntimeException);
MetadataStatus meta = (MetadataStatus) ex.getMetadata();
assertEquals(
"type.googleapis.com/google.rpc.LocalizedMessage", meta.getDetails().get(0).typeUrl());

assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode());
assertEquals("What is wrong?", meta.getMessage());

LocalizedMessage details =
meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion())
.get(0);
assertEquals("The password!", details.message());
assertEquals("EN", details.locale());
// #client_request

} catch (Exception e) {
e.printStackTrace();
Assert.fail("Got unexpected error " + e.getMessage());
} finally {
if (client != null) client.close();
sys.terminate();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.pekko.NotUsed;
import org.apache.pekko.grpc.GrpcServiceException;
import org.apache.pekko.stream.javadsl.Source;
import scala.jdk.javaapi.CollectionConverters;

public class RichErrorNativeImpl implements GreeterService {

Expand All @@ -33,13 +32,13 @@ public CompletionStage<HelloReply> sayHello(HelloRequest in) {
ar.add(LocalizedMessage.of("EN", "The password!"));

GrpcServiceException exception =
GrpcServiceException.apply(
Code.INVALID_ARGUMENT, "What is wrong?", CollectionConverters.asScala(ar).toSeq());
GrpcServiceException.create(Code.INVALID_ARGUMENT, "What is wrong?", ar);

CompletableFuture<HelloReply> future = new CompletableFuture<>();
future.completeExceptionally(exception);
return future;
}

// #rich_error_model_unary

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.grpc.scaladsl

import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.grpc.{ GrpcClientSettings, GrpcServiceException }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.testkit.TestKit
import com.google.rpc.Code
import com.google.rpc.error_details.LocalizedMessage
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc.helloworld._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }

class RichErrorModelNativeSpec
extends TestKit(ActorSystem("RichErrorNativeSpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))

implicit val sys: ActorSystem = system
implicit val ec: ExecutionContext = sys.dispatcher

object RichErrorNativeImpl extends GreeterService {

// #rich_error_model_unary
def sayHello(in: HelloRequest): Future[HelloReply] = {
Future.failed(
GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!"))))
}
// #rich_error_model_unary

def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
Source.failed(
GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!"))))
}

override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
in.runWith(Sink.seq).flatMap { _ =>
Future.failed(
GrpcServiceException(
Code.INVALID_ARGUMENT,
"What is wrong?",
Seq(new LocalizedMessage("EN", "The password!"))))
}
}

override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
Source.failed(
GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!"))))
}
}

val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(RichErrorNativeImpl)

val bound =
Http(system).newServerAt(interface = "127.0.0.1", port = 0).bind(service).futureValue

val client = GreeterServiceClient(
GrpcClientSettings.connectToServiceAt("127.0.0.1", bound.localAddress.getPort).withTls(false))

val conf = ConfigFactory.load().withFallback(ConfigFactory.defaultApplication())

"Rich error model" should {

"work with the native api on a unary call" in {

// #client_request
val richErrorResponse = client.sayHello(HelloRequest("Bob")).failed.futureValue

richErrorResponse match {
case status: GrpcServiceException =>
status.metadata match {
case richMetadata: MetadataStatus =>
richMetadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage")

import LocalizedMessage.messageCompanion
val localizedMessage = richMetadata.getParsedDetails[LocalizedMessage].head
localizedMessage.message should be("The password!")
localizedMessage.locale should be("EN")

richMetadata.code should be(3)
richMetadata.message should be("What is wrong?")

case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}")
}

case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}")
}
// #client_request
}

"work with the native api on a stream request" in {

val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))

val richErrorResponse = client.itKeepsTalking(Source(requests)).failed.futureValue

richErrorResponse match {
case status: GrpcServiceException =>
status.metadata match {
case metadata: MetadataStatus =>
metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage")

import LocalizedMessage.messageCompanion
val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head

metadata.code should be(3)
metadata.message should be("What is wrong?")
localizedMessage.message should be("The password!")
localizedMessage.locale should be("EN")

case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}")
}

case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}")
}

}

"work with the native api on a stream response" in {
val richErrorResponseStream = client.itKeepsReplying(HelloRequest("Bob"))
val richErrorResponse =
richErrorResponseStream.run().failed.futureValue

richErrorResponse match {
case status: GrpcServiceException =>
status.metadata match {
case metadata: MetadataStatus =>
metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage")

val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head

metadata.code should be(3)
metadata.message should be("What is wrong?")
localizedMessage.message should be("The password!")
localizedMessage.locale should be("EN")

case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}")
}

case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}")
}

}

"work with the native api on a bidi stream" in {

val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
val richErrorResponseStream = client.streamHellos(Source(requests))
val richErrorResponse =
richErrorResponseStream.run().failed.futureValue

richErrorResponse match {
case status: GrpcServiceException =>
status.metadata match {
case metadata: MetadataStatus =>
metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage")

val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head

metadata.code should be(3)
metadata.message should be("What is wrong?")
localizedMessage.message should be("The password!")
localizedMessage.locale should be("EN")

case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}")
}

case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}")
}

}

}

override def afterAll(): Unit = system.terminate().futureValue
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static void singleRequestReply(GreeterServiceClient client) throws Excep
CompletionStage<HelloReply> reply = client.sayHello().addHeader("key", "value").invoke(request);
System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
}

// #with-metadata

private static void streamingRequest(GreeterServiceClient client) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public CompletionStage<HelloReply> sayHello(HelloRequest in) {
}
}
}

// #implementation

// #method
Expand Down Expand Up @@ -121,6 +122,7 @@ private static <ServiceImpl> Route loggingErrorHandlingGrpcRoute(
}
}));
}

// #method

// #custom-error-mapping
Expand All @@ -132,6 +134,7 @@ private static <ServiceImpl> Route loggingErrorHandlingGrpcRoute(
return null;
}
};

// #custom-error-mapping

public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Exception {
Expand Down
Loading
Loading