From 51efc8b3eb96e89e53abe2f5179ea2d356b7ff48 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:13:25 +0000 Subject: [PATCH 1/2] Initial plan From b6c48a983af2749e20721d30ef9f90547bfe7c99 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:19:43 +0000 Subject: [PATCH 2/2] Add request URI to gRPC client error descriptions (port of akka-grpc PR #1748) Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../grpc/internal/PekkoHttpClientUtils.scala | 32 +++++++++++++------ .../internal/PekkoHttpClientUtilsSpec.scala | 10 ++++-- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala index 508ba6e37..58601e2ad 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala @@ -185,7 +185,7 @@ object PekkoHttpClientUtils { descriptor.getFullMethodName), GrpcEntityHelpers.metadataHeaders(headers.entries), source) - responseToSource(singleRequest(httpRequest), deserializer) + responseToSource(httpRequest.uri, singleRequest(httpRequest), deserializer) } } } @@ -194,7 +194,7 @@ object PekkoHttpClientUtils { * INTERNAL API */ @InternalApi - def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( + def responseToSource[O](requestUri: Uri, response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( implicit ec: ExecutionContext, mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = { Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => { @@ -202,7 +202,7 @@ object PekkoHttpClientUtils { { if (response.status != StatusCodes.OK) { response.entity.discardBytes() - val failure = mapToStatusException(response, immutable.Seq.empty) + val failure = mapToStatusException(requestUri, response, immutable.Seq.empty) Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure)) } else { Codecs.detect(response) match { @@ -211,7 +211,7 @@ object PekkoHttpClientUtils { val trailerPromise = Promise[immutable.Seq[HttpHeader]]() // Completed with success or failure based on grpc-status and grpc-message trailing headers val completionFuture: Future[Unit] = - trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers)) + trailerPromise.future.flatMap(trailers => parseResponseStatus(requestUri, response, trailers)) val responseData = response.entity match { @@ -234,7 +234,7 @@ object PekkoHttpClientUtils { Source.single[ByteString](data) case _ => response.entity.discardBytes() - throw mapToStatusException(response, Seq.empty) + throw mapToStatusException(requestUri, response, Seq.empty) } responseData // This never adds any data to the stream, but makes sure it fails with the correct error code if applicable @@ -272,25 +272,37 @@ object PekkoHttpClientUtils { }) }.mapMaterializedValue(_.flatten) - private def parseResponseStatus(response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { + private def parseResponseStatus(requestUri: Uri, response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { val allHeaders = response.headers ++ trailers allHeaders.find(_.name == "grpc-status").map(_.value) match { case Some("0") => Future.successful(()) case _ => - Future.failed(mapToStatusException(response, trailers)) + Future.failed(mapToStatusException(requestUri, response, trailers)) } } - private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = { + private def mapToStatusException( + requestUri: Uri, + response: HttpResponse, + trailers: Seq[HttpHeader]): StatusRuntimeException = { val allHeaders = response.headers ++ trailers val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata() allHeaders.find(_.name == "grpc-status").map(_.value) match { case None => - new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata) + new StatusRuntimeException( + mapHttpStatus(response) + .withDescription("No grpc-status found") + .augmentDescription(s"When calling rpc service: $requestUri"), + metadata) case Some(statusCode) => val description = allHeaders.find(_.name == "grpc-message").map(_.value) - new StatusRuntimeException(Status.fromCodeValue(statusCode.toInt).withDescription(description.orNull), metadata) + new StatusRuntimeException( + Status + .fromCodeValue(statusCode.toInt) + .withDescription(description.orNull) + .augmentDescription(s"When calling rpc service: $requestUri"), + metadata) } } diff --git a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala index e74a9c07c..78913f2eb 100644 --- a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala +++ b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala @@ -36,15 +36,18 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi PatienceConfig(5.seconds, Span(100, org.scalatest.time.Millis)) "The conversion from HttpResponse to Source" should { + val requestUri = Uri("https://example.com/ServiceName/MethodName") + "map a strict 404 response to a failed stream" in { val response = Future.successful(HttpResponse(NotFound, entity = Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.UNIMPLEMENTED) + failure.asInstanceOf[StatusRuntimeException].getStatus.getDescription should include(requestUri.toString()) } "map a strict 200 response with non-0 gRPC error code to a failed stream" in { @@ -54,11 +57,12 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi Nil val response = Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION) + failure.asInstanceOf[StatusRuntimeException].getStatus.getDescription should include(requestUri.toString()) failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header") } @@ -75,7 +79,7 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers), Strict(GrpcProtocolNative.contentType, ByteString.empty), HttpProtocols.`HTTP/1.1`)) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)