diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala index 15ac6e320..4f6a57ae7 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoGrpcScalaClientTester.scala @@ -287,6 +287,7 @@ class PekkoGrpcScalaClientTester(val settings: Settings, backend: String, testWi throwable shouldBe a[StatusRuntimeException] val e = throwable.asInstanceOf[StatusRuntimeException] assertEquals(expectedStatus.getCode, e.getStatus.getCode) - assertEquals(expectedMessage, e.getStatus.getDescription) + // Note: message also includes what service was called + assertTrue(e.getStatus.getDescription.startsWith(expectedMessage)) } } diff --git a/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes b/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes new file mode 100644 index 000000000..1f13a0cb1 --- /dev/null +++ b/runtime/src/main/mima-filters/1.1.x.backwards.excludes/1747-status-runtime-details.excludes @@ -0,0 +1,2 @@ +# Internal changes +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.grpc.internal.PekkoHttpClientUtils.responseToSource") diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala index 508ba6e37..a7ba59777 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtils.scala @@ -185,7 +185,7 @@ object PekkoHttpClientUtils { descriptor.getFullMethodName), GrpcEntityHelpers.metadataHeaders(headers.entries), source) - responseToSource(singleRequest(httpRequest), deserializer) + responseToSource(httpRequest.uri, singleRequest(httpRequest), deserializer) } } } @@ -194,7 +194,7 @@ object PekkoHttpClientUtils { * INTERNAL API */ @InternalApi - def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( + def responseToSource[O](requestUri: Uri, response: Future[HttpResponse], deserializer: ProtobufSerializer[O])( implicit ec: ExecutionContext, mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = { Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => { @@ -202,7 +202,7 @@ object PekkoHttpClientUtils { { if (response.status != StatusCodes.OK) { response.entity.discardBytes() - val failure = mapToStatusException(response, immutable.Seq.empty) + val failure = mapToStatusException(requestUri, response, immutable.Seq.empty) Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure)) } else { Codecs.detect(response) match { @@ -211,7 +211,7 @@ object PekkoHttpClientUtils { val trailerPromise = Promise[immutable.Seq[HttpHeader]]() // Completed with success or failure based on grpc-status and grpc-message trailing headers val completionFuture: Future[Unit] = - trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers)) + trailerPromise.future.flatMap(trailers => parseResponseStatus(requestUri, response, trailers)) val responseData = response.entity match { @@ -234,7 +234,7 @@ object PekkoHttpClientUtils { Source.single[ByteString](data) case _ => response.entity.discardBytes() - throw mapToStatusException(response, Seq.empty) + throw mapToStatusException(requestUri, response, Seq.empty) } responseData // This never adds any data to the stream, but makes sure it fails with the correct error code if applicable @@ -272,25 +272,37 @@ object PekkoHttpClientUtils { }) }.mapMaterializedValue(_.flatten) - private def parseResponseStatus(response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { + private def parseResponseStatus(requestUri: Uri, response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = { val allHeaders = response.headers ++ trailers allHeaders.find(_.name == "grpc-status").map(_.value) match { case Some("0") => Future.successful(()) case _ => - Future.failed(mapToStatusException(response, trailers)) + Future.failed(mapToStatusException(requestUri, response, trailers)) } } - private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = { + private def mapToStatusException( + requestUri: Uri, + response: HttpResponse, + trailers: Seq[HttpHeader]): StatusRuntimeException = { val allHeaders = response.headers ++ trailers val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata() allHeaders.find(_.name == "grpc-status").map(_.value) match { case None => - new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata) + new StatusRuntimeException( + mapHttpStatus(response) + .withDescription("No grpc-status found") + .augmentDescription(s"When calling rpc service: ${requestUri.toString()}"), + metadata) case Some(statusCode) => val description = allHeaders.find(_.name == "grpc-message").map(_.value) - new StatusRuntimeException(Status.fromCodeValue(statusCode.toInt).withDescription(description.orNull), metadata) + new StatusRuntimeException( + Status + .fromCodeValue(statusCode.toInt) + .withDescription(description.orNull) + .augmentDescription(s"When calling rpc service: ${requestUri.toString()}"), + metadata) } } diff --git a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala index e74a9c07c..e5c1385cc 100644 --- a/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala +++ b/runtime/src/test/scala/org/apache/pekko/grpc/internal/PekkoHttpClientUtilsSpec.scala @@ -37,9 +37,10 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi "The conversion from HttpResponse to Source" should { "map a strict 404 response to a failed stream" in { + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val response = Future.successful(HttpResponse(NotFound, entity = Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] @@ -48,21 +49,18 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi } "map a strict 200 response with non-0 gRPC error code to a failed stream" in { - val responseHeaders = RawHeader("grpc-status", "9") :: - RawHeader("custom-key", "custom-value-in-header") :: - RawHeader("custom-key-bin", ByteString("custom-trailer-value").encodeBase64.utf8String) :: - Nil + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val response = - Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty))) - val source = PekkoHttpClientUtils.responseToSource(response, null) + Future.successful(HttpResponse(OK, List(RawHeader("grpc-status", "9")), Strict(GrpcProtocolNative.contentType, ByteString.empty))) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure shouldBe a[StatusRuntimeException] failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION) - failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header") } "map a strict 200 response with non-0 gRPC error code with a trailer to a failed stream with trailer metadata" in { + val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello") val responseHeaders = List(RawHeader("grpc-status", "9")) val responseTrailers = Trailer( RawHeader("custom-key", "custom-trailer-value") :: @@ -75,7 +73,7 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers), Strict(GrpcProtocolNative.contentType, ByteString.empty), HttpProtocols.`HTTP/1.1`)) - val source = PekkoHttpClientUtils.responseToSource(response, null) + val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null) val failure = source.run().failed.futureValue failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)