Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ object PekkoHttpClientUtils {
descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(headers.entries),
source)
responseToSource(singleRequest(httpRequest), deserializer)
responseToSource(httpRequest.uri, singleRequest(httpRequest), deserializer)
}
}
}
Expand All @@ -194,15 +194,15 @@ object PekkoHttpClientUtils {
* INTERNAL API
*/
@InternalApi
def responseToSource[O](response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
def responseToSource[O](requestUri: Uri, response: Future[HttpResponse], deserializer: ProtobufSerializer[O])(
implicit ec: ExecutionContext,
mat: Materializer): Source[O, Future[GrpcResponseMetadata]] = {
Source.lazyFutureSource[O, Future[GrpcResponseMetadata]](() => {
response.map { response =>
{
if (response.status != StatusCodes.OK) {
response.entity.discardBytes()
val failure = mapToStatusException(response, immutable.Seq.empty)
val failure = mapToStatusException(requestUri, response, immutable.Seq.empty)
Source.failed(failure).mapMaterializedValue(_ => Future.failed(failure))
} else {
Codecs.detect(response) match {
Expand All @@ -211,7 +211,7 @@ object PekkoHttpClientUtils {
val trailerPromise = Promise[immutable.Seq[HttpHeader]]()
// Completed with success or failure based on grpc-status and grpc-message trailing headers
val completionFuture: Future[Unit] =
trailerPromise.future.flatMap(trailers => parseResponseStatus(response, trailers))
trailerPromise.future.flatMap(trailers => parseResponseStatus(requestUri, response, trailers))

val responseData =
response.entity match {
Expand All @@ -234,7 +234,7 @@ object PekkoHttpClientUtils {
Source.single[ByteString](data)
case _ =>
response.entity.discardBytes()
throw mapToStatusException(response, Seq.empty)
throw mapToStatusException(requestUri, response, Seq.empty)
}
responseData
// This never adds any data to the stream, but makes sure it fails with the correct error code if applicable
Expand Down Expand Up @@ -272,25 +272,37 @@ object PekkoHttpClientUtils {
})
}.mapMaterializedValue(_.flatten)

private def parseResponseStatus(response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = {
private def parseResponseStatus(requestUri: Uri, response: HttpResponse, trailers: Seq[HttpHeader]): Future[Unit] = {
val allHeaders = response.headers ++ trailers
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case Some("0") =>
Future.successful(())
case _ =>
Future.failed(mapToStatusException(response, trailers))
Future.failed(mapToStatusException(requestUri, response, trailers))
}
}

private def mapToStatusException(response: HttpResponse, trailers: Seq[HttpHeader]): StatusRuntimeException = {
private def mapToStatusException(
requestUri: Uri,
response: HttpResponse,
trailers: Seq[HttpHeader]): StatusRuntimeException = {
val allHeaders = response.headers ++ trailers
val metadata: io.grpc.Metadata = new MetadataImpl(new HeaderMetadataImpl(allHeaders).asList).toGoogleGrpcMetadata()
allHeaders.find(_.name == "grpc-status").map(_.value) match {
case None =>
new StatusRuntimeException(mapHttpStatus(response).withDescription("No grpc-status found"), metadata)
new StatusRuntimeException(
mapHttpStatus(response)
.withDescription("No grpc-status found")
.augmentDescription(s"When calling rpc service: $requestUri"),
metadata)
case Some(statusCode) =>
val description = allHeaders.find(_.name == "grpc-message").map(_.value)
new StatusRuntimeException(Status.fromCodeValue(statusCode.toInt).withDescription(description.orNull), metadata)
new StatusRuntimeException(
Status
.fromCodeValue(statusCode.toInt)
.withDescription(description.orNull)
.augmentDescription(s"When calling rpc service: $requestUri"),
metadata)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
PatienceConfig(5.seconds, Span(100, org.scalatest.time.Millis))

"The conversion from HttpResponse to Source" should {
val requestUri = Uri("https://example.com/ServiceName/MethodName")

"map a strict 404 response to a failed stream" in {
val response =
Future.successful(HttpResponse(NotFound, entity = Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(response, null)
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.UNIMPLEMENTED)
failure.asInstanceOf[StatusRuntimeException].getStatus.getDescription should include(requestUri.toString())
}

"map a strict 200 response with non-0 gRPC error code to a failed stream" in {
Expand All @@ -54,11 +57,12 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
Nil
val response =
Future.successful(HttpResponse(OK, responseHeaders, Strict(GrpcProtocolNative.contentType, ByteString.empty)))
val source = PekkoHttpClientUtils.responseToSource(response, null)
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure shouldBe a[StatusRuntimeException]
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
failure.asInstanceOf[StatusRuntimeException].getStatus.getDescription should include(requestUri.toString())
failure.asInstanceOf[StatusRuntimeException].getTrailers.get(key) should be("custom-value-in-header")
}

Expand All @@ -75,7 +79,7 @@ class PekkoHttpClientUtilsSpec extends TestKit(ActorSystem()) with AnyWordSpecLi
Map.empty[AttributeKey[_], Any].updated(AttributeKeys.trailer, responseTrailers),
Strict(GrpcProtocolNative.contentType, ByteString.empty),
HttpProtocols.`HTTP/1.1`))
val source = PekkoHttpClientUtils.responseToSource(response, null)
val source = PekkoHttpClientUtils.responseToSource(requestUri, response, null)

val failure = source.run().failed.futureValue
failure.asInstanceOf[StatusRuntimeException].getStatus.getCode should be(Status.Code.FAILED_PRECONDITION)
Expand Down
Loading