diff --git a/crates/wasi-http/src/p3/body.rs b/crates/wasi-http/src/p3/body.rs index 6af49de504d0..54299e95dc47 100644 --- a/crates/wasi-http/src/p3/body.rs +++ b/crates/wasi-http/src/p3/body.rs @@ -42,6 +42,7 @@ pub(crate) enum Body { } impl Body { + /// Implementation of `consume-body` shared between requests and responses pub(crate) fn consume( self, mut store: Access<'_, T, WasiHttp>, @@ -105,6 +106,7 @@ impl Body { } } + /// Implementation of `drop` shared between requests and responses pub(crate) fn drop(self, mut store: impl AsContextMut) { if let Body::Guest { contents_rx, @@ -120,7 +122,8 @@ impl Body { } } -pub(crate) enum GuestBodyKind { +/// The kind of body, used for error reporting +pub(crate) enum BodyKind { Request, Response, } @@ -141,20 +144,22 @@ impl ContentLength { } } +/// [StreamConsumer] implementation for bodies originating in the guest. struct GuestBodyConsumer { contents_tx: PollSender>, result_tx: Option>>, content_length: Option, - kind: GuestBodyKind, + kind: BodyKind, // `true` when the other side of `contents_tx` was unexpectedly closed closed: bool, } impl GuestBodyConsumer { + /// Constructs the approprite body size error given the [BodyKind] fn body_size_error(&self, n: Option) -> ErrorCode { match self.kind { - GuestBodyKind::Request => ErrorCode::HttpRequestBodySize(n), - GuestBodyKind::Response => ErrorCode::HttpResponseBodySize(n), + BodyKind::Request => ErrorCode::HttpRequestBodySize(n), + BodyKind::Response => ErrorCode::HttpResponseBodySize(n), } } @@ -235,6 +240,7 @@ impl StreamConsumer for GuestBodyConsumer { } } +/// [http_body::Body] implementation for bodies originating in the guest. pub(crate) struct GuestBody { contents_rx: Option>>, trailers_rx: Option>, ErrorCode>>>, @@ -242,13 +248,14 @@ pub(crate) struct GuestBody { } impl GuestBody { + /// Construct a new [GuestBody] pub(crate) fn new( mut store: impl AsContextMut, contents_rx: Option>, trailers_rx: FutureReader>, ErrorCode>>, result_tx: oneshot::Sender>, content_length: Option, - kind: GuestBodyKind, + kind: BodyKind, getter: fn(&mut T) -> WasiHttpCtxView<'_>, ) -> Self { let (trailers_http_tx, trailers_http_rx) = oneshot::channel(); @@ -290,10 +297,15 @@ impl http_body::Body for GuestBody { cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { if let Some(contents_rx) = self.contents_rx.as_mut() { + // `contents_rx` has not been closed yet, poll it while let Some(res) = ready!(contents_rx.poll_recv(cx)) { match res { Ok(buf) => { if let Some(n) = self.content_length.as_mut() { + // Substract frame length from `content_length`, + // [GuestBodyConsumer] already performs the validation, so + // just keep count as optimization for + // `is_end_stream` and `size_hint` *n = n.saturating_sub(buf.len().try_into().unwrap_or(u64::MAX)); } return Poll::Ready(Some(Ok(http_body::Frame::data(buf)))); @@ -303,14 +315,17 @@ impl http_body::Body for GuestBody { } } } + // Record that `contents_rx` is closed self.contents_rx = None; } let Some(trailers_rx) = self.trailers_rx.as_mut() else { + // `trailers_rx` has already terminated - this is the end of stream return Poll::Ready(None); }; let res = ready!(Pin::new(trailers_rx).poll(cx)); + // Record that `trailers_rx` has terminated self.trailers_rx = None; match res { Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers( @@ -328,14 +343,18 @@ impl http_body::Body for GuestBody { || !contents_rx.is_closed() || self.content_length.is_some_and(|n| n > 0) { + // `contents_rx` might still produce data frames return false; } } if let Some(trailers_rx) = self.trailers_rx.as_ref() { if !trailers_rx.is_terminated() { + // `trailers_rx` has not terminated yet return false; } } + + // no data left return true; } @@ -348,6 +367,7 @@ impl http_body::Body for GuestBody { } } +/// [http_body::Body] that has been consumed. pub(crate) struct ConsumedBody; impl http_body::Body for ConsumedBody { @@ -372,9 +392,10 @@ impl http_body::Body for ConsumedBody { } } -pub(crate) struct GuestTrailerConsumer { - pub(crate) tx: Option>, ErrorCode>>>, - pub(crate) getter: fn(&mut T) -> WasiHttpCtxView<'_>, +/// [FutureConsumer] implementation for trailers originating in the guest. +struct GuestTrailerConsumer { + tx: Option>, ErrorCode>>>, + getter: fn(&mut T) -> WasiHttpCtxView<'_>, } impl FutureConsumer for GuestTrailerConsumer @@ -387,12 +408,13 @@ where mut self: Pin<&mut Self>, _: &mut Context<'_>, mut store: StoreContextMut, - mut source: Source<'_, Self::Item>, + mut src: Source<'_, Self::Item>, _: bool, ) -> Poll> { - let value = &mut None; - source.read(store.as_context_mut(), value)?; - let res = match value.take().unwrap() { + let mut result = None; + src.read(store.as_context_mut(), &mut result) + .context("failed to read result")?; + let res = match result.context("result value missing")? { Ok(Some(trailers)) => { let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut()); let trailers = table @@ -408,6 +430,7 @@ where } } +/// [StreamProducer] implementation for bodies originating in the host. struct HostBodyStreamProducer { body: BoxBody, trailers: Option>, ErrorCode>>>, @@ -446,6 +469,8 @@ where let cap = match dst.remaining(&mut store).map(NonZeroUsize::new) { Some(Some(cap)) => Some(cap), Some(None) => { + // On 0-length the best we can do is check that underlying stream has not + // reached the end yet if self.body.is_end_stream() { break 'result Ok(None); } else { @@ -462,11 +487,13 @@ where let n = frame.len(); let cap = cap.into(); if n > cap { + // data frame does not fit in destination, fill it and buffer the rest dst.set_buffer(Cursor::new(frame.split_off(cap))); let mut dst = dst.as_direct(store, cap); dst.remaining().copy_from_slice(&frame); dst.mark_written(cap); } else { + // copy the whole frame into the destination let mut dst = dst.as_direct(store, n); dst.remaining()[..n].copy_from_slice(&frame); dst.mark_written(n); diff --git a/crates/wasi-http/src/p3/host/handler.rs b/crates/wasi-http/src/p3/host/handler.rs index a3d4bc6a6e48..026133ca7a99 100644 --- a/crates/wasi-http/src/p3/host/handler.rs +++ b/crates/wasi-http/src/p3/host/handler.rs @@ -1,16 +1,74 @@ use crate::p3::bindings::http::handler::{Host, HostWithStore}; use crate::p3::bindings::http::types::{ErrorCode, Request, Response}; -use crate::p3::body::{Body, ConsumedBody, GuestBody, GuestBodyKind}; +use crate::p3::body::{Body, BodyKind, ConsumedBody, GuestBody}; use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView, get_content_length}; use anyhow::Context as _; use core::pin::Pin; +use core::task::{Context, Poll, Waker}; use http::header::HOST; use http::{HeaderValue, Uri}; use http_body_util::BodyExt as _; use std::sync::Arc; use tokio::sync::oneshot; use tracing::debug; -use wasmtime::component::{Accessor, AccessorTask, Resource}; +use wasmtime::component::{Accessor, AccessorTask, JoinHandle, Resource}; + +/// A wrapper around [`JoinHandle`], which will [`JoinHandle::abort`] the task +/// when dropped +struct AbortOnDropJoinHandle(JoinHandle); + +impl Drop for AbortOnDropJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +/// A wrapper around [http_body::Body], which allows attaching arbitrary state to it +struct BodyWithState { + body: T, + _state: U, +} + +impl http_body::Body for BodyWithState +where + T: http_body::Body + Unpin, + U: Unpin, +{ + type Data = T::Data; + type Error = T::Error; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.get_mut().body).poll_frame(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.body.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + self.body.size_hint() + } +} + +trait BodyExt { + fn with_state(self, state: T) -> BodyWithState + where + Self: Sized, + { + BodyWithState { + body: self, + _state: state, + } + } +} + +impl BodyExt for T {} struct SendRequestTask { io: Pin> + Send>>, @@ -26,14 +84,35 @@ impl AccessorTask> for SendRequestTask { } } +async fn io_task_result( + rx: oneshot::Receiver<( + Arc, + oneshot::Receiver>, + )>, +) -> Result<(), ErrorCode> { + let Ok((_io, io_result_rx)) = rx.await else { + return Ok(()); + }; + io_result_rx.await.unwrap_or(Ok(())) +} + impl HostWithStore for WasiHttp { async fn handle( store: &Accessor, req: Resource, ) -> HttpResult> { - let getter = store.getter(); + // A handle to the I/O task, if spawned, will be sent on this channel + // and kept as part of request body state + let (io_task_tx, io_task_rx) = oneshot::channel(); + + // A handle to the I/O task, if spawned, will be sent on this channel + // along with the result receiver let (io_result_tx, io_result_rx) = oneshot::channel(); + + // Response processing result will be sent on this channel let (res_result_tx, res_result_rx) = oneshot::channel(); + + let getter = store.getter(); let fut = store.with(|mut store| { let WasiHttpCtxView { table, .. } = store.get(); let Request { @@ -56,13 +135,14 @@ impl HostWithStore for WasiHttp { result_tx, } => { let (http_result_tx, http_result_rx) = oneshot::channel(); + // `Content-Length` header value is validated in `fields` implementation let content_length = get_content_length(&headers) .map_err(|err| ErrorCode::InternalError(Some(format!("{err:#}"))))?; _ = result_tx.send(Box::new(async move { if let Ok(Err(err)) = http_result_rx.await { return Err(err); }; - io_result_rx.await.unwrap_or(Ok(())) + io_task_result(io_result_rx).await })); GuestBody::new( &mut store, @@ -70,16 +150,15 @@ impl HostWithStore for WasiHttp { trailers_rx, http_result_tx, content_length, - GuestBodyKind::Request, + BodyKind::Request, getter, ) + .with_state(io_task_rx) .boxed() } Body::Host { body, result_tx } => { - _ = result_tx.send(Box::new( - async move { io_result_rx.await.unwrap_or(Ok(())) }, - )); - body + _ = result_tx.send(Box::new(io_task_result(io_result_rx))); + body.with_state(io_task_rx).boxed() } Body::Consumed => ConsumedBody.boxed(), }; @@ -121,6 +200,7 @@ impl HostWithStore for WasiHttp { req, options.as_deref().copied(), Box::new(async { + // Forward the response processing result to `WasiHttpCtx` implementation let Ok(fut) = res_result_rx.await else { return Ok(()); }; @@ -129,16 +209,26 @@ impl HostWithStore for WasiHttp { )) })?; let (res, io) = Box::into_pin(fut).await?; - store.spawn(SendRequestTask { - io: Box::into_pin(io), - result_tx: io_result_tx, - }); let ( http::response::Parts { status, headers, .. }, body, ) = res.into_parts(); + + let mut io = Box::into_pin(io); + let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? { + Poll::Ready(()) => body, + Poll::Pending => { + // I/O driver still needs to be polled, spawn a task and send handles to it + let (tx, rx) = oneshot::channel(); + let io = store.spawn(SendRequestTask { io, result_tx: tx }); + let io = Arc::new(AbortOnDropJoinHandle(io)); + _ = io_result_tx.send((Arc::clone(&io), rx)); + _ = io_task_tx.send(Arc::clone(&io)); + body.with_state(io).boxed() + } + }; let res = Response { status, headers: Arc::new(headers), diff --git a/crates/wasi-http/src/p3/host/types.rs b/crates/wasi-http/src/p3/host/types.rs index fab634a75c82..165e4e4e1204 100644 --- a/crates/wasi-http/src/p3/host/types.rs +++ b/crates/wasi-http/src/p3/host/types.rs @@ -129,34 +129,41 @@ fn parse_header_value( } } -struct GuestBodyResultProducer( - Pin>> + Send>>, -); - -impl GuestBodyResultProducer { - fn new(rx: oneshot::Receiver> + Send>>) -> Self { - Self(Box::pin(async move { - let Ok(fut) = rx.await else { - return Ok(Ok(())); - }; - Ok(Box::into_pin(fut).await) - })) - } +enum GuestBodyResultProducer { + Receiver(oneshot::Receiver> + Send>>), + Future(Pin> + Send>>), } impl FutureProducer for GuestBodyResultProducer { type Item = Result<(), ErrorCode>; fn poll_produce( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut, finish: bool, ) -> Poll>> { - match Pin::new(&mut self.get_mut().0).poll(cx) { - Poll::Pending if finish => Poll::Ready(Ok(None)), - Poll::Pending => Poll::Pending, - Poll::Ready(result) => Poll::Ready(Ok(Some(result?))), + match &mut *self { + Self::Receiver(rx) => match Pin::new(rx).poll(cx) { + Poll::Ready(Ok(fut)) => { + let mut fut = Box::into_pin(fut); + match fut.as_mut().poll(cx) { + Poll::Ready(res) => Poll::Ready(Ok(Some(res))), + Poll::Pending => { + *self = Self::Future(fut); + Poll::Pending + } + } + } + Poll::Ready(Err(..)) => Poll::Ready(Ok(Some(Ok(())))), + Poll::Pending if finish => Poll::Ready(Ok(None)), + Poll::Pending => Poll::Pending, + }, + Self::Future(fut) => match fut.as_mut().poll(cx) { + Poll::Ready(res) => Poll::Ready(Ok(Some(res))), + Poll::Pending if finish => Poll::Ready(Ok(None)), + Poll::Pending => Poll::Pending, + }, } } } @@ -308,7 +315,6 @@ impl HostRequestWithStore for WasiHttp { let (result_tx, result_rx) = oneshot::channel(); let WasiHttpCtxView { table, .. } = store.get(); let headers = delete_fields(table, headers)?; - // `Content-Length` header value is validated in `fields` implementation let options = options .map(|options| delete_request_options(table, options)) .transpose()?; @@ -332,7 +338,7 @@ impl HostRequestWithStore for WasiHttp { FutureReader::new( instance, &mut store, - GuestBodyResultProducer::new(result_rx), + GuestBodyResultProducer::Receiver(result_rx), ), )) }) @@ -609,7 +615,7 @@ impl HostResponseWithStore for WasiHttp { FutureReader::new( instance, &mut store, - GuestBodyResultProducer::new(result_rx), + GuestBodyResultProducer::Receiver(result_rx), ), )) }) diff --git a/crates/wasi-http/src/p3/mod.rs b/crates/wasi-http/src/p3/mod.rs index d8a25e73b448..bc4a9aa8ba53 100644 --- a/crates/wasi-http/src/p3/mod.rs +++ b/crates/wasi-http/src/p3/mod.rs @@ -89,6 +89,18 @@ pub trait WasiHttpCtx: Send { } /// Send an outgoing request. + /// + /// This implementation will be used by the `wasi:http/handler#handle` implementation. + /// + /// The specified [Future] `fut` can be used to communicate + /// a request processing error, if any, to the constructor of the request. + /// For example, if the request was constructed via `wasi:http/types.request#new`, + /// a result sent on `fut` will be forwarded to the guest on the future handle returned. + /// + /// The returned [Future] can be used to communicate + /// a request processing error, if any, to the constructor of the request. + /// For example, if the request was constructed via `wasi:http/types.request#new`, + /// a result resolved from it will be forwarded to the guest on the future handle returned. #[cfg(feature = "default-send-request")] fn send_request( &mut self, diff --git a/crates/wasi-http/src/p3/proxy.rs b/crates/wasi-http/src/p3/proxy.rs index 75029e2ae769..d0fd2a0d360e 100644 --- a/crates/wasi-http/src/p3/proxy.rs +++ b/crates/wasi-http/src/p3/proxy.rs @@ -5,7 +5,7 @@ use anyhow::Context as _; use wasmtime::component::Accessor; impl Proxy { - /// Call `handle` on [Proxy] getting a [Future] back. + /// Call `wasi:http/handler#handle` on [Proxy] getting a [Response] back. pub async fn handle( &self, store: &Accessor, diff --git a/crates/wasi-http/src/p3/request.rs b/crates/wasi-http/src/p3/request.rs index 90c041cce86e..e4e728bb9d28 100644 --- a/crates/wasi-http/src/p3/request.rs +++ b/crates/wasi-http/src/p3/request.rs @@ -41,7 +41,7 @@ pub struct Request { impl Request { /// Construct a new [Request] /// - /// This returns a [Future] that the guest will use to communicate + /// This returns a [Future] that the will be used to communicate /// a request processing error, if any. pub fn new( method: Method, @@ -78,7 +78,7 @@ impl Request { /// Construct a new [Request] from [http::Request]. /// - /// This returns a [Future] that the guest will use to communicate + /// This returns a [Future] that will be used to communicate /// a request processing error, if any. pub fn from_http( req: http::Request, @@ -121,6 +121,11 @@ impl Request { /// /// This implementation is used by the `wasi:http/handler` interface /// default implementation. +/// +/// The returned [Future] can be used to communicate +/// a request processing error, if any, to the constructor of the request. +/// For example, if the request was constructed via `wasi:http/types.request#new`, +/// a result resolved from it will be forwarded to the guest on the future handle returned. #[cfg(feature = "default-send-request")] pub async fn default_send_request( mut req: http::Request + Send + 'static>, @@ -202,7 +207,10 @@ pub async fn default_send_request( { return Err(dns_error("address not available".to_string(), 0)); } - Ok(Err(..)) => return Err(ErrorCode::ConnectionRefused), + Ok(Err(err)) => { + tracing::warn!(?err, "connection refused"); + return Err(ErrorCode::ConnectionRefused); + } Err(..) => return Err(ErrorCode::ConnectionTimeout), }; let stream = if use_tls { diff --git a/crates/wasi-http/src/p3/response.rs b/crates/wasi-http/src/p3/response.rs index fc48d80ea2c7..a01f9a01de9d 100644 --- a/crates/wasi-http/src/p3/response.rs +++ b/crates/wasi-http/src/p3/response.rs @@ -1,5 +1,5 @@ use crate::p3::bindings::http::types::ErrorCode; -use crate::p3::body::{Body, ConsumedBody, GuestBody, GuestBodyKind}; +use crate::p3::body::{Body, BodyKind, ConsumedBody, GuestBody}; use crate::p3::{WasiHttpView, get_content_length}; use anyhow::Context as _; use bytes::Bytes; @@ -40,7 +40,9 @@ impl Response { /// Convert [Response] into [http::Response]. /// /// The specified [Future] `fut` can be used to communicate - /// a response processing error, if any, to the guest. + /// a response processing error, if any, to the constructor of the response. + /// For example, if the response was constructed via `wasi:http/types.response#new`, + /// a result sent on `fut` will be forwarded to the guest on the future handle returned. pub fn into_http( self, store: impl AsContextMut, @@ -55,6 +57,7 @@ impl Response { result_tx, } => { let (http_result_tx, http_result_rx) = oneshot::channel(); + // `Content-Length` header value is validated in `fields` implementation let content_length = get_content_length(&res.headers).context("failed to parse `content-length`")?; _ = result_tx.send(Box::new(async move { @@ -69,7 +72,7 @@ impl Response { trailers_rx, http_result_tx, content_length, - GuestBodyKind::Response, + BodyKind::Response, T::http, ) .boxed() diff --git a/crates/wasi/src/p3/mod.rs b/crates/wasi/src/p3/mod.rs index d9764c7038bb..5ca938895def 100644 --- a/crates/wasi/src/p3/mod.rs +++ b/crates/wasi/src/p3/mod.rs @@ -67,7 +67,12 @@ where _: StoreContextMut, _: bool, ) -> Poll>> { - Poll::Ready(Ok(Some(self.get_mut().0.take().unwrap()))) + let v = self + .get_mut() + .0 + .take() + .context("polled after returning `Ready`")?; + Poll::Ready(Ok(Some(v))) } } @@ -92,9 +97,10 @@ where finish: bool, ) -> Poll>> { match Pin::new(&mut self.get_mut().0).poll(cx) { + Poll::Ready(Ok(v)) => Poll::Ready(Ok(Some(v))), + Poll::Ready(Err(err)) => Poll::Ready(Err(err).context("oneshot sender dropped")), Poll::Pending if finish => Poll::Ready(Ok(None)), Poll::Pending => Poll::Pending, - Poll::Ready(result) => Poll::Ready(Ok(Some(result.context("oneshot sender dropped")?))), } } }