From 2c5ee6c3feea344978dbc38685ad1313a8849c9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sun, 8 Mar 2026 18:58:23 +0800 Subject: [PATCH] Include service info in StatusRuntimeException Port upstream akka/akka-grpc@82e0413e. Add the request URI to the StatusRuntimeException description via augmentDescription so that callers can easily identify which service caused the failure. Add MiMa filter for the changed responseToSource signature (internal API). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../interop/PekkoGrpcScalaClientTester.scala | 3 +- .../1747-status-runtime-details.excludes | 2 ++ .../grpc/internal/PekkoHttpClientUtils.scala | 32 +++++++++++++------ .../internal/PekkoHttpClientUtilsSpec.scala | 16 ++++------ 4 files changed, 33 insertions(+), 20 deletions(-) create mode 100644 runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala index 15ac6e320..4f6a57ae7 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala @@ -287,6 +287,7 @@ class PekkoGrpcScalaClientTester(val settings: Settings, backend: String, testWi throwable shouldBe a[StatusRuntimeException] val e = throwable.asInstanceOf[StatusRuntimeException] assertEquals(expectedStatus.getCode, e.getStatus.getCode) - assertEquals(expectedMessage, e.getStatus.getDescription) + // Note: message also includes what service was called + assertTrue(e.getStatus.getDescription.startsWith(expectedMessage)) } } diff --git a/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes b/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes new file mode 100644 index 000000000..1f13a0cb1 --- /dev/null +++ b/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes @@ -0,0 +1,2 @@ +# Internal changes +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.grpc.internal.PekkoHttpClientUtils.responseToSource") diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala index 508ba6e37..a7ba59777 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala @@ -185,7 +185,7 @@ object PekkoHttpClientUtils { descriptor.getFullMethodName), GrpcEntityHelpers.metadataHeaders(headers.entries), source) - responseToSource(singleRequest(httpRequest), deserializer) + responseToSource(httpRequest.uri, singleRequest(httpRequest), deserializer) } } } @@ -194,7 +194,7 @@ object PekkoHttpClientUtils { * INTERNAL API */ @InternalApi - def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( + def responseToSource[O](requestUri: Uri, response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( implicit ec: ExecutionContext, mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = { Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => { @@ -202,7 +202,7 @@ object PekkoHttpClientUtils { { if (response.status != StatusCodes.OK) { response.entity.discardBytes() - val failure = mapToStatusException(response, immutable.Seq.empty) + val failure = mapToStatusException(requestUri, response, immutable.Seq.empty) Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure)) } else { Codecs.detect(response) match { @@ -211,7 +211,7 @@ object PekkoHttpClientUtils { val trailerPromise = Promise[immutable.Seq[HttpHeader]]() // Completed with success or failure based on grpc-status and grpc-message trailing headers val completionFuture: Future[Unit] = - trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers)) + trailerPromise.future.flatMap(trailers => parseResponseStatus(requestUri, response, trailers)) val responseData = response.entity match { @@ -234,7 +234,7 @@ object PekkoHttpClientUtils { Source.single[ByteString](data) case _ => response.entity.discardBytes() - throw mapToStatusException(response, Seq.empty) + throw mapToStatusException(requestUri, response, Seq.empty) } responseData // This never adds any data to the stream, but makes sure it fails with the correct error code if applicable @@ -272,25 +272,37 @@ object PekkoHttpClientUtils { }) }.mapMaterializedValue(_.flatten) - private def parseResponseStatus(response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { + private def parseResponseStatus(requestUri: Uri, response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { val allHeaders = response.headers ++ trailers allHeaders.find(_.name == "grpc-status").map(_.value) match { case Some("0") => Future.successful(()) case _ => - Future.failed(mapToStatusException(response, trailers)) + Future.failed(mapToStatusException(requestUri, response, trailers)) } } - private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = { + private def mapToStatusException( + requestUri: Uri, + response: HttpResponse, + trailers: Seq[HttpHeader]): StatusRuntimeException = { val allHeaders = response.headers ++ trailers val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata() allHeaders.find(_.name == "grpc-status").map(_.value) match { case None => - new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata) + new StatusRuntimeException( + mapHttpStatus(response) + .withDescription("No grpc-status found") + .augmentDescription(s"When calling rpc service: ${requestUri.toString()}"), + metadata) case Some(statusCode) => val description = allHeaders.find(_.name == "grpc-message").map(_.value) - new StatusRuntimeException(Status.fromCodeValue(statusCode.toInt).withDescription(description.orNull), metadata) + new StatusRuntimeException( + Status + .fromCodeValue(statusCode.toInt) + .withDescription(description.orNull) + .augmentDescription(s"When calling rpc service: ${requestUri.toString()}"), + metadata) } } diff --git a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala index e74a9c07c..e5c1385cc 100644 --- a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala +++ b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala @@ -37,9 +37,10 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi "The conversion from HttpResponse to Source" should { "map a strict 404 response to a failed stream" in { + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val response = Future.successful(HttpResponse(NotFound, entity = Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] @@ -48,21 +49,18 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi } "map a strict 200 response with non-0 gRPC error code to a failed stream" in { - val responseHeaders = RawHeader("grpc-status", "9") :: - RawHeader("custom-key", "custom-value-in-header") :: - RawHeader("custom-key-bin", ByteString("custom-trailer-value").encodeBase64.utf8String) :: - Nil + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val response = - Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + Future.successful(HttpResponse(OK, List(RawHeader("grpc-status", "9")), Strict(GrpcProtocolNative.contentType, ByteString.empty))) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION) - failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header") } "map a strict 200 response with non-0 gRPC error code with a trailer to a failed stream with trailer metadata" in { + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val responseHeaders = List(RawHeader("grpc-status", "9")) val responseTrailers = Trailer( RawHeader("custom-key", "custom-trailer-value") :: @@ -75,7 +73,7 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers), Strict(GrpcProtocolNative.contentType, ByteString.empty), HttpProtocols.`HTTP/1.1`)) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)