Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class PekkoGrpcScalaClientTester(val settings: Settings, backend: String, testWi
throwable shouldBe a[StatusRuntimeException]
val e = throwable.asInstanceOf[StatusRuntimeException]
assertEquals(expectedStatus.getCode, e.getStatus.getCode)
assertEquals(expectedMessage, e.getStatus.getDescription)
// Note: message also includes what service was called
assertTrue(e.getStatus.getDescription.startsWith(expectedMessage))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Internal changes
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.grpc.internal.PekkoHttpClientUtils.responseToSource")
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ object PekkoHttpClientUtils {
descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(headers.entries),
source)
responseToSource(singleRequest(httpRequest), deserializer)
responseToSource(httpRequest.uri, singleRequest(httpRequest), deserializer)
}
}
}
Expand All @@ -194,15 +194,15 @@ object PekkoHttpClientUtils {
* INTERNAL API
*/
@InternalApi
def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
def responseToSource[O](requestUri: Uri, response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
implicit ec: ExecutionContext,
mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = {
Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => {
response.map { response =>
{
if (response.status != StatusCodes.OK) {
response.entity.discardBytes()
val failure = mapToStatusException(response, immutable.Seq.empty)
val failure = mapToStatusException(requestUri, response, immutable.Seq.empty)
Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure))
} else {
Codecs.detect(response) match {
Expand All @@ -211,7 +211,7 @@ object PekkoHttpClientUtils {
val trailerPromise = Promise[immutable.Seq[HttpHeader]]()
// Completed with success or failure based on grpc-status and grpc-message trailing headers
val completionFuture: Future[Unit] =
trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers))
trailerPromise.future.flatMap(trailers => parseResponseStatus(requestUri, response, trailers))

val responseData =
response.entity match {
Expand All @@ -234,7 +234,7 @@ object PekkoHttpClientUtils {
Source.single[ByteString](data)
case _ =>
response.entity.discardBytes()
throw mapToStatusException(response, Seq.empty)
throw mapToStatusException(requestUri, response, Seq.empty)
}
responseData
// This never adds any data to the stream, but makes sure it fails with the correct error code if applicable
Expand Down Expand Up @@ -272,25 +272,37 @@ object PekkoHttpClientUtils {
})
}.mapMaterializedValue(_.flatten)

private def parseResponseStatus(response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = {
private def parseResponseStatus(requestUri: Uri, response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = {
val allHeaders = response.headers ++ trailers
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case Some("0") =>
Future.successful(())
case _ =>
Future.failed(mapToStatusException(response, trailers))
Future.failed(mapToStatusException(requestUri, response, trailers))
}
}

private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = {
private def mapToStatusException(
requestUri: Uri,
response: HttpResponse,
trailers: Seq[HttpHeader]): StatusRuntimeException = {
val allHeaders = response.headers ++ trailers
val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata()
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case None =>
new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata)
new StatusRuntimeException(
mapHttpStatus(response)
.withDescription("No grpc-status found")
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}"),
metadata)
case Some(statusCode) =>
val description = allHeaders.find(_.name == "grpc-message").map(_.value)
new StatusRuntimeException(Status.fromCodeValue(statusCode.toInt).withDescription(description.orNull), metadata)
new StatusRuntimeException(
Status
.fromCodeValue(statusCode.toInt)
.withDescription(description.orNull)
.augmentDescription(s"When calling rpc service: ${requestUri.toString()}"),
metadata)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi

"The conversion from HttpResponse to Source" should {
"map a strict 404 response to a failed stream" in {
val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello")
val response =
Future.successful(HttpResponse(NotFound, entity = Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(response, null)
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
Expand All @@ -48,21 +49,18 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
}

"map a strict 200 response with non-0 gRPC error code to a failed stream" in {
val responseHeaders = RawHeader("grpc-status", "9") ::
RawHeader("custom-key", "custom-value-in-header") ::
RawHeader("custom-key-bin", ByteString("custom-trailer-value").encodeBase64.utf8String) ::
Nil
val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello")
val response =
Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(response, null)
Future.successful(HttpResponse(OK, List(RawHeader("grpc-status", "9")), Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header")
}

"map a strict 200 response with non-0 gRPC error code with a trailer to a failed stream with trailer metadata" in {
val requestUri = Uri("https://example.com/GuestExeSample/GrpcHello")
val responseHeaders = List(RawHeader("grpc-status", "9"))
val responseTrailers = Trailer(
RawHeader("custom-key", "custom-trailer-value") ::
Expand All @@ -75,7 +73,7 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers),
Strict(GrpcProtocolNative.contentType, ByteString.empty),
HttpProtocols.`HTTP/1.1`))
val source = PekkoHttpClientUtils.responseToSource(response, null)
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
Expand Down
Loading