From 3d80530d428e1d7828584491c0c78690622a8897 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 13:41:39 +0000 Subject: [PATCH 1/4] Initial plan From aeb41641b6fefba59288e9493c1285d03eb87cc3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 13:54:58 +0000 Subject: [PATCH 2/4] Add rich error model client API (MetadataStatus) from akka-grpc PRs #1665 and #1740 Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../grpc/RichErrorModelNativeTest.java | 95 ++++++++ .../helloworld/grpc/RichErrorNativeImpl.java | 5 +- .../scaladsl/RichErrorModelNativeSpec.scala | 204 ++++++++++++++++++ .../pekko/grpc/GrpcServiceException.scala | 18 +- .../pekko/grpc/internal/MetadataImpl.scala | 49 ++++- .../grpc/internal/RequestBuilderImpl.scala | 44 +++- .../apache/pekko/grpc/javadsl/Metadata.scala | 16 ++ .../apache/pekko/grpc/scaladsl/Metadata.scala | 17 ++ 8 files changed, 432 insertions(+), 16 deletions(-) create mode 100644 interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java create mode 100644 interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java new file mode 100644 index 000000000..d60bc0dde --- /dev/null +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +package example.myapp.helloworld.grpc; + +import com.google.rpc.error_details.LocalizedMessage; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.concurrent.CompletionStage; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.grpc.GrpcClientSettings; +import org.apache.pekko.grpc.GrpcServiceException; +import org.apache.pekko.grpc.javadsl.MetadataStatus; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.ServerBinding; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.junit.Assert; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +import static org.junit.Assert.assertEquals; + + +public class RichErrorModelNativeTest extends JUnitSuite { + + private ServerBinding run(ActorSystem sys) throws Exception { + + GreeterService impl = new RichErrorNativeImpl(); + + org.apache.pekko.japi.function.Function> service = GreeterServiceHandlerFactory.create(impl, sys); + CompletionStage bound = Http + .get(sys) + .newServerAt("127.0.0.1", 8091) + .bind(service); + + bound.thenAccept(binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); + }); + return bound.toCompletableFuture().get(); + } + + @Test + @SuppressWarnings("unchecked") + public void testNativeApi() throws Exception { + Config conf = ConfigFactory.load(); + ActorSystem sys = ActorSystem.create("HelloWorld", conf); + run(sys); + + GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, sys).withTls(false); + + GreeterServiceClient client = null; + try { + client = GreeterServiceClient.create(settings, sys); + + // #client_request + HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); + CompletionStage response = client.sayHello(request); + StatusRuntimeException statusRuntimeException = response.toCompletableFuture().handle((res, ex) -> { + return (StatusRuntimeException) ex; + }).get(); + + GrpcServiceException ex = GrpcServiceException.apply(statusRuntimeException); + MetadataStatus meta = (MetadataStatus) ex.getMetadata(); + assertEquals("type.googleapis.com/google.rpc.LocalizedMessage", meta.getDetails().get(0).typeUrl()); + + assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode()); + assertEquals("What is wrong?", meta.getMessage()); + + LocalizedMessage details = meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion()).get(0); + assertEquals("The password!", details.message()); + assertEquals("EN", details.locale()); + // #client_request + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Got unexpected error " + e.getMessage()); + } finally { + if (client != null) client.close(); + sys.terminate(); + } + } +} diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java index 5b32b7947..44292840c 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java @@ -21,7 +21,6 @@ import org.apache.pekko.NotUsed; import org.apache.pekko.grpc.GrpcServiceException; import org.apache.pekko.stream.javadsl.Source; -import scala.jdk.javaapi.CollectionConverters; public class RichErrorNativeImpl implements GreeterService { @@ -33,8 +32,8 @@ public CompletionStage sayHello(HelloRequest in) { ar.add(LocalizedMessage.of("EN", "The password!")); GrpcServiceException exception = - GrpcServiceException.apply( - Code.INVALID_ARGUMENT, "What is wrong?", CollectionConverters.asScala(ar).toSeq()); + GrpcServiceException.create( + Code.INVALID_ARGUMENT, "What is wrong?", ar); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(exception); diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala new file mode 100644 index 000000000..596d29a82 --- /dev/null +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2020-2021 Lightbend Inc. + */ + +package org.apache.pekko.grpc.scaladsl + +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.grpc.{ GrpcClientSettings, GrpcServiceException } +import pekko.http.scaladsl.Http +import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse } +import pekko.stream.scaladsl.{ Sink, Source } +import pekko.testkit.TestKit +import com.google.rpc.Code +import com.google.rpc.error_details.LocalizedMessage +import com.typesafe.config.ConfigFactory +import example.myapp.helloworld.grpc.helloworld._ +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, Future } + +class RichErrorModelNativeSpec + extends TestKit(ActorSystem("RichErrorNativeSpec")) + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis)) + + implicit val sys: ActorSystem = system + implicit val ec: ExecutionContext = sys.dispatcher + + object RichErrorNativeImpl extends GreeterService { + + // #rich_error_model_unary + def sayHello(in: HelloRequest): Future[HelloReply] = { + Future.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + // #rich_error_model_unary + + def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { + Source.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + + override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { + in.runWith(Sink.seq).flatMap { _ => + Future.failed( + GrpcServiceException( + Code.INVALID_ARGUMENT, + "What is wrong?", + Seq(new LocalizedMessage("EN", "The password!")))) + } + } + + override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + Source.failed( + GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new LocalizedMessage("EN", "The password!")))) + } + } + + val service: HttpRequest => Future[HttpResponse] = + GreeterServiceHandler(RichErrorNativeImpl) + + val bound = + Http(system).newServerAt(interface = "127.0.0.1", port = 0).bind(service).futureValue + + val client = GreeterServiceClient( + GrpcClientSettings.connectToServiceAt("127.0.0.1", bound.localAddress.getPort).withTls(false)) + + val conf = ConfigFactory.load().withFallback(ConfigFactory.defaultApplication()) + + "Rich error model" should { + + "work with the native api on a unary call" in { + + // #client_request + val richErrorResponse = client.sayHello(HelloRequest("Bob")).failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case richMetadata: MetadataStatus => + richMetadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + import LocalizedMessage.messageCompanion + val localizedMessage = richMetadata.getParsedDetails[LocalizedMessage].head + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + richMetadata.code should be(3) + richMetadata.message should be("What is wrong?") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + // #client_request + } + + "work with the native api on a stream request" in { + + val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_)) + + val richErrorResponse = client.itKeepsTalking(Source(requests)).failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + import LocalizedMessage.messageCompanion + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + "work with the native api on a stream response" in { + val richErrorResponseStream = client.itKeepsReplying(HelloRequest("Bob")) + val richErrorResponse = + richErrorResponseStream.run().failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + "work with the native api on a bidi stream" in { + + val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_)) + val richErrorResponseStream = client.streamHellos(Source(requests)) + val richErrorResponse = + richErrorResponseStream.run().failed.futureValue + + richErrorResponse match { + case status: GrpcServiceException => + status.metadata match { + case metadata: MetadataStatus => + metadata.details(0).typeUrl should be("type.googleapis.com/google.rpc.LocalizedMessage") + + val localizedMessage = metadata.getParsedDetails[LocalizedMessage].head + + metadata.code should be(3) + metadata.message should be("What is wrong?") + localizedMessage.message should be("The password!") + localizedMessage.locale should be("EN") + + case other => fail(s"This should be a MetadataStatus but it is ${other.getClass}") + } + + case ex => fail(s"This should be a GrpcServiceException but it is ${ex.getClass}") + } + + } + + } + + override def afterAll(): Unit = system.terminate().futureValue +} diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala index f0440a2b9..5a5e6e443 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala @@ -17,12 +17,26 @@ import io.grpc.{ Status, StatusRuntimeException } import org.apache.pekko import pekko.annotation.ApiMayChange import pekko.grpc.scaladsl.{ Metadata, MetadataBuilder } -import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl } +import pekko.grpc.internal.{ GrpcMetadataImpl, RichGrpcMetadataImpl, JavaMetadataImpl } import com.google.protobuf.any.Any import io.grpc.protobuf.StatusProto +import scala.jdk.CollectionConverters._ object GrpcServiceException { + /** + * Java API + */ + def create( + code: com.google.rpc.Code, + message: String, + details: java.util.List[scalapb.GeneratedMessage]): GrpcServiceException = { + apply(code, message, details.asScala.toVector) + } + + /** + * Scala API + */ def apply( code: com.google.rpc.Code, message: String, @@ -45,7 +59,7 @@ object GrpcServiceException { } def apply(ex: StatusRuntimeException): GrpcServiceException = { - new GrpcServiceException(ex.getStatus, new GrpcMetadataImpl(ex.getTrailers)) + new GrpcServiceException(ex.getStatus, new RichGrpcMetadataImpl(ex.getStatus, ex.getTrailers)) } } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala index a814069fe..9fbd1c9bc 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala @@ -25,7 +25,10 @@ import pekko.http.scaladsl.model.HttpHeader import pekko.japi.Pair import pekko.util.ByteString import pekko.grpc.javadsl -import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, StringEntry } +import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, MetadataStatus, StringEntry } +import com.google.protobuf.any +import com.google.rpc.Status +import scalapb.{ GeneratedMessage, GeneratedMessageCompanion } @InternalApi private[pekko] object MetadataImpl { val BINARY_SUFFIX: String = io.grpc.Metadata.BINARY_HEADER_SUFFIX @@ -196,7 +199,7 @@ class HeaderMetadataImpl(headers: immutable.Seq[HttpHeader] = immutable.Seq.empt * @param delegate The underlying Scala metadata instance. */ @InternalApi -class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata { +class JavaMetadataImpl(val delegate: Metadata) extends javadsl.Metadata with javadsl.MetadataStatus { override def getText(key: String): Optional[String] = delegate.getText(key).toJava @@ -214,4 +217,46 @@ class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata { override def toString: String = delegate.toString + + private def richDelegate = + delegate match { + case r: MetadataStatus => r + case other => throw new IllegalArgumentException(s"Delegate metadata is not MetadataStatus but ${other.getClass}") + } + + override def getStatus(): Status = richDelegate.status + + override def getCode(): Int = richDelegate.code + + override def getMessage(): String = richDelegate.message + + private lazy val javaDetails: jList[com.google.protobuf.any.Any] = richDelegate.details.asJava + def getDetails(): jList[com.google.protobuf.any.Any] = javaDetails + + def getParsedDetails[K <: GeneratedMessage](companion: GeneratedMessageCompanion[K]): jList[K] = + richDelegate.getParsedDetails(companion).asJava +} + +class RichGrpcMetadataImpl(delegate: io.grpc.Status, meta: io.grpc.Metadata) + extends GrpcMetadataImpl(meta) + with MetadataStatus { + override val raw: Option[io.grpc.Metadata] = Some(meta) + override lazy val status: com.google.rpc.Status = + io.grpc.protobuf.StatusProto.fromStatusAndTrailers(delegate, meta) + + override def code: Int = status.getCode + override def message: String = status.getMessage + + override lazy val details: Seq[any.Any] = status.getDetailsList.asScala.map { item => + fromJavaProto(item) + }.toVector + + def getParsedDetails[K <: scalapb.GeneratedMessage]( + implicit companion: scalapb.GeneratedMessageCompanion[K]): Seq[K] = { + val typeUrl = "type.googleapis.com/" + companion.scalaDescriptor.fullName + details.filter(_.typeUrl == typeUrl).map(_.unpack) + } + + private def fromJavaProto(javaPbSource: com.google.protobuf.Any): com.google.protobuf.any.Any = + com.google.protobuf.any.Any(typeUrl = javaPbSource.getTypeUrl, value = javaPbSource.getValue) } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala index 6d3205d3a..648c2021c 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala @@ -18,8 +18,8 @@ import java.util.concurrent.CompletionStage import org.apache.pekko import pekko.NotUsed import pekko.annotation.{ InternalApi, InternalStableApi } -import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse } -import pekko.stream.Materializer +import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcServiceException, GrpcSingleResponse } +import pekko.stream.{ Graph, Materializer, SourceShape } import pekko.stream.javadsl.{ Source => JavaSource } import pekko.stream.scaladsl.{ Keep, Sink, Source } import pekko.util.ByteString @@ -52,10 +52,12 @@ final class ScalaUnaryRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: I): Future[O] = - channel.invoke(request, headers, descriptor, defaultOptions) + channel.invoke(request, headers, descriptor, defaultOptions).recoverWith(RequestBuilderImpl.richError) override def invokeWithMetadata(request: I): Future[GrpcSingleResponse[O]] = - channel.invokeWithMetadata(request, headers, descriptor, callOptionsWithDeadline()) + channel + .invokeWithMetadata(request, headers, descriptor, callOptionsWithDeadline()) + .recoverWith(RequestBuilderImpl.richError) override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, O] = new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -118,7 +120,7 @@ final class ScalaClientStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: Source[I, NotUsed]): Future[O] = - invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic) + invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic).recoverWith(RequestBuilderImpl.richError) override def invokeWithMetadata(source: Source[I, NotUsed]): Future[GrpcSingleResponse[O]] = { // a bit much overhead here because we are using the flow to represent a single response @@ -144,6 +146,7 @@ final class ScalaClientStreamingRequestBuilder[I, O]( def getTrailers() = metadata.getTrailers() } }(ExecutionContext.parasitic) + .recoverWith(RequestBuilderImpl.richError) } override def withHeaders(headers: MetadataImpl): ScalaClientStreamingRequestBuilder[I, O] = @@ -207,10 +210,14 @@ final class ScalaServerStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: I): Source[O, NotUsed] = - invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed) + invokeWithMetadata(request) + .mapMaterializedValue(_ => NotUsed) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def invokeWithMetadata(request: I): Source[O, Future[GrpcResponseMetadata]] = - channel.invokeWithMetadata(Source.single(request), headers, descriptor, true, callOptionsWithDeadline()) + channel + .invokeWithMetadata(Source.single(request), headers, descriptor, true, callOptionsWithDeadline()) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def withHeaders(headers: MetadataImpl): ScalaServerStreamingRequestBuilder[I, O] = new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -274,10 +281,14 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O]( NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings) override def invoke(request: Source[I, NotUsed]): Source[O, NotUsed] = - invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed) + invokeWithMetadata(request) + .mapMaterializedValue(_ => NotUsed) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def invokeWithMetadata(source: Source[I, NotUsed]): Source[O, Future[GrpcResponseMetadata]] = - channel.invokeWithMetadata(source, headers, descriptor, true, callOptionsWithDeadline()) + channel + .invokeWithMetadata(source, headers, descriptor, true, callOptionsWithDeadline()) + .recoverWithRetries(1, RequestBuilderImpl.richErrorStream) override def withHeaders(headers: MetadataImpl): ScalaBidirectionalStreamingRequestBuilder[I, O] = new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers) @@ -331,3 +342,18 @@ trait MetadataOperations[T <: MetadataOperations[T]] { def addHeader(key: String, value: ByteString): T = withHeaders(headers = headers.addEntry(key, value)) } + +object RequestBuilderImpl { + def richErrorStream[U]: PartialFunction[Throwable, Graph[SourceShape[U], NotUsed]] = { + case item => Source.failed(RequestBuilderImpl.lift(item)) + } + + def richError[U]: PartialFunction[Throwable, Future[U]] = { + case item => Future.failed(RequestBuilderImpl.lift(item)) + } + + def lift(item: Throwable): scala.Throwable = item match { + case ex: StatusRuntimeException => GrpcServiceException(ex) + case other => other + } +} diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala index 4dc69c83f..1d58d24b9 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala @@ -59,3 +59,19 @@ trait Metadata { */ def asScala: scaladsl.Metadata } + +/** + * Provides access to details to more rich error details using the logical gRPC com.google.rpc.Status message, see + * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. + * + * Not for user extension + */ +@ApiMayChange +@DoNotInherit +trait MetadataStatus extends Metadata { + def getStatus(): com.google.rpc.Status + def getCode(): Int + def getMessage(): String + def getDetails(): List[com.google.protobuf.any.Any] + def getParsedDetails[K <: scalapb.GeneratedMessage](companion: scalapb.GeneratedMessageCompanion[K]): List[K] +} diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala index d8ce6056c..de2ab0a70 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala @@ -16,6 +16,7 @@ package org.apache.pekko.grpc.scaladsl import org.apache.pekko import pekko.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import pekko.util.ByteString +import com.google.protobuf.any /** * Immutable representation of the metadata in a call @@ -55,3 +56,19 @@ import pekko.util.ByteString @ApiMayChange def asList: List[(String, MetadataEntry)] } + +/** + * Provides access to details to more rich error details using the logical gRPC com.google.rpc.Status message, see + * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. + * + * Not for user extension + */ +@ApiMayChange +@DoNotInherit +trait MetadataStatus extends Metadata { + def status: com.google.rpc.Status + def code: Int + def message: String + def details: Seq[any.Any] + def getParsedDetails[K <: scalapb.GeneratedMessage](implicit msg: scalapb.GeneratedMessageCompanion[K]): Seq[K] +} From 49cdff632e6d942aa8f0f6c387f3c12fedbb8b8c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 13:58:57 +0000 Subject: [PATCH 3/4] Fix doc comment grammar in MetadataStatus trait Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala | 2 +- .../main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala index 1d58d24b9..5a455bccb 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala @@ -61,7 +61,7 @@ trait Metadata { } /** - * Provides access to details to more rich error details using the logical gRPC com.google.rpc.Status message, see + * Provides access to richer error details using the logical gRPC com.google.rpc.Status message, see * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. * * Not for user extension diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala index de2ab0a70..e566a7896 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala @@ -58,7 +58,7 @@ import com.google.protobuf.any } /** - * Provides access to details to more rich error details using the logical gRPC com.google.rpc.Status message, see + * Provides access to richer error details using the logical gRPC com.google.rpc.Status message, see * [API Design Guide](https://cloud.google.com/apis/design/errors) for more details. * * Not for user extension From 94270ccc4ba31afd88af8ff975ef7a16b142439f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 6 Mar 2026 16:00:40 +0100 Subject: [PATCH 4/4] format --- .../myapp/helloworld/grpc/RichErrorImpl.java | 1 + .../grpc/RichErrorModelNativeTest.java | 116 ++++++++++-------- .../helloworld/grpc/RichErrorNativeImpl.java | 4 +- .../myapp/helloworld/LiftedGreeterClient.java | 1 + .../LoggingErrorHandlingGreeterServer.java | 3 + .../pekko/grpc/GrpcServiceException.scala | 2 +- 6 files changed, 70 insertions(+), 57 deletions(-) diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java index f7118b369..542431dc0 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java @@ -48,6 +48,7 @@ public CompletionStage sayHello(HelloRequest in) { future.completeExceptionally(statusRuntimeException); return future; } + // #rich_error_model_unary @Override diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java index d60bc0dde..29cb4b599 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java @@ -13,6 +13,8 @@ package example.myapp.helloworld.grpc; +import static org.junit.Assert.assertEquals; + import com.google.rpc.error_details.LocalizedMessage; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -31,65 +33,71 @@ import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; -import static org.junit.Assert.assertEquals; - - public class RichErrorModelNativeTest extends JUnitSuite { - private ServerBinding run(ActorSystem sys) throws Exception { + private ServerBinding run(ActorSystem sys) throws Exception { - GreeterService impl = new RichErrorNativeImpl(); + GreeterService impl = new RichErrorNativeImpl(); - org.apache.pekko.japi.function.Function> service = GreeterServiceHandlerFactory.create(impl, sys); - CompletionStage bound = Http - .get(sys) - .newServerAt("127.0.0.1", 8091) - .bind(service); + org.apache.pekko.japi.function.Function> service = + GreeterServiceHandlerFactory.create(impl, sys); + CompletionStage bound = + Http.get(sys).newServerAt("127.0.0.1", 8091).bind(service); - bound.thenAccept(binding -> { - System.out.println("gRPC server bound to: " + binding.localAddress()); + bound.thenAccept( + binding -> { + System.out.println("gRPC server bound to: " + binding.localAddress()); }); - return bound.toCompletableFuture().get(); - } - - @Test - @SuppressWarnings("unchecked") - public void testNativeApi() throws Exception { - Config conf = ConfigFactory.load(); - ActorSystem sys = ActorSystem.create("HelloWorld", conf); - run(sys); - - GrpcClientSettings settings = GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, sys).withTls(false); - - GreeterServiceClient client = null; - try { - client = GreeterServiceClient.create(settings, sys); - - // #client_request - HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); - CompletionStage response = client.sayHello(request); - StatusRuntimeException statusRuntimeException = response.toCompletableFuture().handle((res, ex) -> { - return (StatusRuntimeException) ex; - }).get(); - - GrpcServiceException ex = GrpcServiceException.apply(statusRuntimeException); - MetadataStatus meta = (MetadataStatus) ex.getMetadata(); - assertEquals("type.googleapis.com/google.rpc.LocalizedMessage", meta.getDetails().get(0).typeUrl()); - - assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode()); - assertEquals("What is wrong?", meta.getMessage()); - - LocalizedMessage details = meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion()).get(0); - assertEquals("The password!", details.message()); - assertEquals("EN", details.locale()); - // #client_request - - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Got unexpected error " + e.getMessage()); - } finally { - if (client != null) client.close(); - sys.terminate(); - } + return bound.toCompletableFuture().get(); + } + + @Test + @SuppressWarnings("unchecked") + public void testNativeApi() throws Exception { + Config conf = ConfigFactory.load(); + ActorSystem sys = ActorSystem.create("HelloWorld", conf); + run(sys); + + GrpcClientSettings settings = + GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, sys).withTls(false); + + GreeterServiceClient client = null; + try { + client = GreeterServiceClient.create(settings, sys); + + // #client_request + HelloRequest request = HelloRequest.newBuilder().setName("Alice").build(); + CompletionStage response = client.sayHello(request); + StatusRuntimeException statusRuntimeException = + response + .toCompletableFuture() + .handle( + (res, ex) -> { + return (StatusRuntimeException) ex; + }) + .get(); + + GrpcServiceException ex = GrpcServiceException.apply(statusRuntimeException); + MetadataStatus meta = (MetadataStatus) ex.getMetadata(); + assertEquals( + "type.googleapis.com/google.rpc.LocalizedMessage", meta.getDetails().get(0).typeUrl()); + + assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode()); + assertEquals("What is wrong?", meta.getMessage()); + + LocalizedMessage details = + meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion()) + .get(0); + assertEquals("The password!", details.message()); + assertEquals("EN", details.locale()); + // #client_request + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Got unexpected error " + e.getMessage()); + } finally { + if (client != null) client.close(); + sys.terminate(); } + } } diff --git a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java index 44292840c..66921a82e 100644 --- a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java +++ b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java @@ -32,13 +32,13 @@ public CompletionStage sayHello(HelloRequest in) { ar.add(LocalizedMessage.of("EN", "The password!")); GrpcServiceException exception = - GrpcServiceException.create( - Code.INVALID_ARGUMENT, "What is wrong?", ar); + GrpcServiceException.create(Code.INVALID_ARGUMENT, "What is wrong?", ar); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(exception); return future; } + // #rich_error_model_unary @Override diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java index bac878bc7..d3d56c8fb 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java @@ -61,6 +61,7 @@ private static void singleRequestReply(GreeterServiceClient client) throws Excep CompletionStage reply = client.sayHello().addHeader("key", "value").invoke(request); System.out.println("got single reply: " + reply.toCompletableFuture().get(5, TimeUnit.SECONDS)); } + // #with-metadata private static void streamingRequest(GreeterServiceClient client) throws Exception { diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java index 49dff12a4..c3c5752a6 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java @@ -73,6 +73,7 @@ public CompletionStage sayHello(HelloRequest in) { } } } + // #implementation // #method @@ -121,6 +122,7 @@ private static Route loggingErrorHandlingGrpcRoute( } })); } + // #method // #custom-error-mapping @@ -132,6 +134,7 @@ private static Route loggingErrorHandlingGrpcRoute( return null; } }; + // #custom-error-mapping public static CompletionStage run(ActorSystem sys) throws Exception { diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala index 5a5e6e443..191c4c62e 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala @@ -17,7 +17,7 @@ import io.grpc.{ Status, StatusRuntimeException } import org.apache.pekko import pekko.annotation.ApiMayChange import pekko.grpc.scaladsl.{ Metadata, MetadataBuilder } -import pekko.grpc.internal.{ GrpcMetadataImpl, RichGrpcMetadataImpl, JavaMetadataImpl } +import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl, RichGrpcMetadataImpl } import com.google.protobuf.any.Any import io.grpc.protobuf.StatusProto import scala.jdk.CollectionConverters._