diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java index f7118b36..542431dc 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java @@ -48,6 +48,7 @@ public CompletionStage sayHello(HelloRequest in) { future.completeExceptionally(statusRuntimeException); return future; } + // #rich_error_model_unary @Override diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java new file mode 100644 index 00000000..29cb4b59 --- /dev/null +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +package example.myapp.helloworld.grpc; + +import static org.junit.Assert.assertEquals; + +import com.google.rpc.error_details.LocalizedMessage; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.concurrent.CompletionStage; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.grpc.GrpcClientSettings; +import org.apache.pekko.grpc.GrpcServiceException; +import org.apache.pekko.grpc.javadsl.MetadataStatus; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.junit.Assert; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +public class RichErrorModelNativeTest extends JUnitSuite { + + private ServerBinding run(ActorSystem sys) throws Exception { + + GreeterService impl = new RichErrorNativeImpl(); + + org.apache.pekko.japi.function.Function> service = + GreeterServiceHandlerFactory.create(impl, sys); + CompletionStage bound = + Http.get(sys).newServerAt("127.0.0.1", 8091).bind(service); + + bound.thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); + return bound.toCompletableFuture().get(); + } + + @Test + @SuppressWarnings("unchecked") + public void testNativeApi() throws Exception { + Config conf = ConfigFactory.load(); + ActorSystem sys = ActorSystem.create("HelloWorld", conf); + run(sys); + + GrpcClientSettings settings = + GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, sys).withTls(false); + + GreeterServiceClient client = null; + try { + client = GreeterServiceClient.create(settings, sys); + + // #client_request + HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); + CompletionStage response = client.sayHello(request); + StatusRuntimeException statusRuntimeException = + response + .toCompletableFuture() + .handle( + (res, ex) -> { + return (StatusRuntimeException) ex; + }) + .get(); + + GrpcServiceException ex = GrpcServiceException.apply(statusRuntimeException); + MetadataStatus meta = (MetadataStatus) ex.getMetadata(); + assertEquals( + "type.googleapis.com/google.rpc.LocalizedMessage", meta.getDetails().get(0).typeUrl()); + + assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode()); + assertEquals("What is wrong?", meta.getMessage()); + + LocalizedMessage details = + meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion()) + .get(0); + assertEquals("The password!", details.message()); + assertEquals("EN", details.locale()); + // #client_request + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Got unexpected error " + e.getMessage()); + } finally { + if (client != null) client.close(); + sys.terminate(); + } + } +} diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java index 5b32b794..66921a82 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java @@ -21,7 +21,6 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.grpc.GrpcServiceException; import org.apache.pekko.stream.javadsl.Source; -import scala.jdk.javaapi.CollectionConverters; public class RichErrorNativeImpl implements GreeterService { @@ -33,13 +32,13 @@ public CompletionStage sayHello(HelloRequest in) { ar.add(LocalizedMessage.of("EN", "The password!")); GrpcServiceException exception = - GrpcServiceException.apply( - Code.INVALID_ARGUMENT, "What is wrong?", CollectionConverters.asScala(ar).toSeq()); + GrpcServiceException.create(Code.INVALID_ARGUMENT, "What is wrong?", ar); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(exception); return future; } + // #rich_error_model_unary @Override diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala new file mode 100644 index 00000000..596d29a8 --- /dev/null +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2020-2021 Lightbend Inc. + */ + +package org.apache.pekko.grpc.scaladsl + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.grpc.{ GrpcClientSettings, GrpcServiceException } +import pekko.http.scaladsl.Http +import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse } +import pekko.stream.scaladsl.{ Sink, Source } +import pekko.testkit.TestKit +import com.google.rpc.Code +import com.google.rpc.error_details.LocalizedMessage +import com.typesafe.config.ConfigFactory +import example.myapp.helloworld.grpc.helloworld._ +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, Future } + +class RichErrorModelNativeSpec + extends TestKit(ActorSystem("RichErrorNativeSpec")) + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis)) + + implicit val sys: ActorSystem = system + implicit val ec: ExecutionContext = sys.dispatcher + + object RichErrorNativeImpl extends GreeterService { + + // #rich_error_model_unary + def sayHello(in: HelloRequest): Future[HelloReply] = { + Future.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + // #rich_error_model_unary + + def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { + Source.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + + override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { + in.runWith(Sink.seq).flatMap { _ => + Future.failed( + GrpcServiceException( + Code.INVALID_ARGUMENT, + "What is wrong?", + Seq(new LocalizedMessage("EN", "The password!")))) + } + } + + override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + Source.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + } + + val service: HttpRequest => Future[HttpResponse] = + GreeterServiceHandler(RichErrorNativeImpl) + + val bound = + Http(system).newServerAt(interface = "127.0.0.1", port = 0).bind(service).futureValue + + val client = GreeterServiceClient( + GrpcClientSettings.connectToServiceAt("127.0.0.1", bound.localAddress.getPort).withTls(false)) + + val conf = ConfigFactory.load().withFallback(ConfigFactory.defaultApplication()) + + "Rich error model" should { + + "work with the native api on a unary call" in { + + // #client_request + val richErrorResponse = client.sayHello(HelloRequest("Bob")).failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case richMetadata: MetadataStatus => + richMetadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + import LocalizedMessage.messageCompanion + val localizedMessage = richMetadata.getParsedDetails[LocalizedMessage].head + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + richMetadata.code should be(3) + richMetadata.message should be("What is wrong?") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + // #client_request + } + + "work with the native api on a stream request" in { + + val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_)) + + val richErrorResponse = client.itKeepsTalking(Source(requests)).failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + import LocalizedMessage.messageCompanion + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + "work with the native api on a stream response" in { + val richErrorResponseStream = client.itKeepsReplying(HelloRequest("Bob")) + val richErrorResponse = + richErrorResponseStream.run().failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + "work with the native api on a bidi stream" in { + + val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_)) + val richErrorResponseStream = client.streamHellos(Source(requests)) + val richErrorResponse = + richErrorResponseStream.run().failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + } + + override def afterAll(): Unit = system.terminate().futureValue +} diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java index bac878bc..d3d56c8f 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java @@ -61,6 +61,7 @@ private static void singleRequestReply(GreeterServiceClient client) throws Excep CompletionStage reply = client.sayHello().addHeader("key", "value").invoke(request); System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); } + // #with-metadata private static void streamingRequest(GreeterServiceClient client) throws Exception { diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java index 49dff12a..c3c5752a 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java @@ -73,6 +73,7 @@ public CompletionStage sayHello(HelloRequest in) { } } } + // #implementation // #method @@ -121,6 +122,7 @@ private static Route loggingErrorHandlingGrpcRoute( } })); } + // #method // #custom-error-mapping @@ -132,6 +134,7 @@ private static Route loggingErrorHandlingGrpcRoute( return null; } }; + // #custom-error-mapping public static CompletionStage run(ActorSystem sys) throws Exception { diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala index f0440a2b..191c4c62 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala @@ -17,12 +17,26 @@ import io.grpc.{ Status, StatusRuntimeException } import org.apache.pekko import pekko.annotation.ApiMayChange import pekko.grpc.scaladsl.{ Metadata, MetadataBuilder } -import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl } +import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl, RichGrpcMetadataImpl } import com.google.protobuf.any.Any import io.grpc.protobuf.StatusProto +import scala.jdk.CollectionConverters._ object GrpcServiceException { + /** + * Java API + */ + def create( + code: com.google.rpc.Code, + message: String, + details: java.util.List[scalapb.GeneratedMessage]): GrpcServiceException = { + apply(code, message, details.asScala.toVector) + } + + /** + * Scala API + */ def apply( code: com.google.rpc.Code, message: String, @@ -45,7 +59,7 @@ object GrpcServiceException { } def apply(ex: StatusRuntimeException): GrpcServiceException = { - new GrpcServiceException(ex.getStatus, new GrpcMetadataImpl(ex.getTrailers)) + new GrpcServiceException(ex.getStatus, new RichGrpcMetadataImpl(ex.getStatus, ex.getTrailers)) } } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala index a814069f..9fbd1c9b 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala @@ -25,7 +25,10 @@ import pekko.http.scaladsl.model.HttpHeader import pekko.japi.Pair import pekko.util.ByteString import pekko.grpc.javadsl -import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, StringEntry } +import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, MetadataStatus, StringEntry } +import com.google.protobuf.any +import com.google.rpc.Status +import scalapb.{ GeneratedMessage, GeneratedMessageCompanion } @InternalApi private[pekko] object MetadataImpl { val BINARY_SUFFIX: String = io.grpc.Metadata.BINARY_HEADER_SUFFIX @@ -196,7 +199,7 @@ class HeaderMetadataImpl(headers: immutable.Seq[HttpHeader] = immutable.Seq.empt * @param delegate The underlying Scala metadata instance. */ @InternalApi -class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata { +class JavaMetadataImpl(val delegate: Metadata) extends javadsl.Metadata with javadsl.MetadataStatus { override def getText(key: String): Optional[String] = delegate.getText(key).toJava @@ -214,4 +217,46 @@ class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata { override def toString: String = delegate.toString + + private def richDelegate = + delegate match { + case r: MetadataStatus => r + case other => throw new IllegalArgumentException(s"Delegate metadata is not MetadataStatus but ${other.getClass}") + } + + override def getStatus(): Status = richDelegate.status + + override def getCode(): Int = richDelegate.code + + override def getMessage(): String = richDelegate.message + + private lazy val javaDetails: jList[com.google.protobuf.any.Any] = richDelegate.details.asJava + def getDetails(): jList[com.google.protobuf.any.Any] = javaDetails + + def getParsedDetails[K <: GeneratedMessage](companion: GeneratedMessageCompanion[K]): jList[K] = + richDelegate.getParsedDetails(companion).asJava +} + +class RichGrpcMetadataImpl(delegate: io.grpc.Status, meta: io.grpc.Metadata) + extends GrpcMetadataImpl(meta) + with MetadataStatus { + override val raw: Option[io.grpc.Metadata] = Some(meta) + override lazy val status: com.google.rpc.Status = + io.grpc.protobuf.StatusProto.fromStatusAndTrailers(delegate, meta) + + override def code: Int = status.getCode + override def message: String = status.getMessage + + override lazy val details: Seq[any.Any] = status.getDetailsList.asScala.map { item => + fromJavaProto(item) + }.toVector + + def getParsedDetails[K <: scalapb.GeneratedMessage]( + implicit companion: scalapb.GeneratedMessageCompanion[K]): Seq[K] = { + val typeUrl = "type.googleapis.com/" + companion.scalaDescriptor.fullName + details.filter(_.typeUrl == typeUrl).map(_.unpack) + } + + private def fromJavaProto(javaPbSource: com.google.protobuf.Any): com.google.protobuf.any.Any = + com.google.protobuf.any.Any(typeUrl = javaPbSource.getTypeUrl, value = javaPbSource.getValue) } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala index 6d3205d3..648c2021 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala @@ -18,8 +18,8 @@ import java.util.concurrent.CompletionStage import org.apache.pekko import pekko.NotUsed import pekko.annotation.{ InternalApi, InternalStableApi } -import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse } -import pekko.stream.Materializer +import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcServiceException, GrpcSingleResponse } +import pekko.stream.{ Graph, Materializer, SourceShape } import pekko.stream.javadsl.{ Source => JavaSource } import pekko.stream.scaladsl.{ Keep, Sink, Source } import pekko.util.ByteString @@ -52,10 +52,12 @@ final class ScalaUnaryRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: I): Future[O] = - channel.invoke(request, headers, descriptor, defaultOptions) + channel.invoke(request, headers, descriptor, defaultOptions).recoverWith(RequestBuilderImpl.richError) override def invokeWithMetadata(request: I): Future[GrpcSingleResponse[O]] = - channel.invokeWithMetadata(request, headers, descriptor, callOptionsWithDeadline()) + channel + .invokeWithMetadata(request, headers, descriptor, callOptionsWithDeadline()) + .recoverWith(RequestBuilderImpl.richError) override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, O] = new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -118,7 +120,7 @@ final class ScalaClientStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: Source[I, NotUsed]): Future[O] = - invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic) + invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic).recoverWith(RequestBuilderImpl.richError) override def invokeWithMetadata(source: Source[I, NotUsed]): Future[GrpcSingleResponse[O]] = { // a bit much overhead here because we are using the flow to represent a single response @@ -144,6 +146,7 @@ final class ScalaClientStreamingRequestBuilder[I, O]( def getTrailers() = metadata.getTrailers() } }(ExecutionContext.parasitic) + .recoverWith(RequestBuilderImpl.richError) } override def withHeaders(headers: MetadataImpl): ScalaClientStreamingRequestBuilder[I, O] = @@ -207,10 +210,14 @@ final class ScalaServerStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: I): Source[O, NotUsed] = - invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed) + invokeWithMetadata(request) + .mapMaterializedValue(_ => NotUsed) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def invokeWithMetadata(request: I): Source[O, Future[GrpcResponseMetadata]] = - channel.invokeWithMetadata(Source.single(request), headers, descriptor, true, callOptionsWithDeadline()) + channel + .invokeWithMetadata(Source.single(request), headers, descriptor, true, callOptionsWithDeadline()) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def withHeaders(headers: MetadataImpl): ScalaServerStreamingRequestBuilder[I, O] = new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -274,10 +281,14 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: Source[I, NotUsed]): Source[O, NotUsed] = - invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed) + invokeWithMetadata(request) + .mapMaterializedValue(_ => NotUsed) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def invokeWithMetadata(source: Source[I, NotUsed]): Source[O, Future[GrpcResponseMetadata]] = - channel.invokeWithMetadata(source, headers, descriptor, true, callOptionsWithDeadline()) + channel + .invokeWithMetadata(source, headers, descriptor, true, callOptionsWithDeadline()) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def withHeaders(headers: MetadataImpl): ScalaBidirectionalStreamingRequestBuilder[I, O] = new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -331,3 +342,18 @@ trait MetadataOperations[T <: MetadataOperations[T]] { def addHeader(key: String, value: ByteString): T = withHeaders(headers = headers.addEntry(key, value)) } + +object RequestBuilderImpl { + def richErrorStream[U]: PartialFunction[Throwable, Graph[SourceShape[U], NotUsed]] = { + case item => Source.failed(RequestBuilderImpl.lift(item)) + } + + def richError[U]: PartialFunction[Throwable, Future[U]] = { + case item => Future.failed(RequestBuilderImpl.lift(item)) + } + + def lift(item: Throwable): scala.Throwable = item match { + case ex: StatusRuntimeException => GrpcServiceException(ex) + case other => other + } +} diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala index 4dc69c83..5a455bcc 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala @@ -59,3 +59,19 @@ trait Metadata { */ def asScala: scaladsl.Metadata } + +/** + * Provides access to richer error details using the logical gRPC com.google.rpc.Status message, see + * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. + * + * Not for user extension + */ +@ApiMayChange +@DoNotInherit +trait MetadataStatus extends Metadata { + def getStatus(): com.google.rpc.Status + def getCode(): Int + def getMessage(): String + def getDetails(): List[com.google.protobuf.any.Any] + def getParsedDetails[K <: scalapb.GeneratedMessage](companion: scalapb.GeneratedMessageCompanion[K]): List[K] +} diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala index d8ce6056..e566a789 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala @@ -16,6 +16,7 @@ package org.apache.pekko.grpc.scaladsl import org.apache.pekko import pekko.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import pekko.util.ByteString +import com.google.protobuf.any /** * Immutable representation of the metadata in a call @@ -55,3 +56,19 @@ import pekko.util.ByteString @ApiMayChange def asList: List[(String, MetadataEntry)] } + +/** + * Provides access to richer error details using the logical gRPC com.google.rpc.Status message, see + * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. + * + * Not for user extension + */ +@ApiMayChange +@DoNotInherit +trait MetadataStatus extends Metadata { + def status: com.google.rpc.Status + def code: Int + def message: String + def details: Seq[any.Any] + def getParsedDetails[K <: scalapb.GeneratedMessage](implicit msg: scalapb.GeneratedMessageCompanion[K]): Seq[K] +}