diff --git a/Cargo.toml b/Cargo.toml index 38dca95..ffc12c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,12 @@ edition = "2018" [dependencies] log = "0.4" pin-project-lite = "0.1.4" -tokio = { version = "0.2", features= [] } -async-pipe = "0.1" -http-body = "0.3" -bytes = "0.5" +tokio = { version = "1", features = ["rt-multi-thread", "io-util"] } +async-pipe = { git = "https://github.com/harana-oss/async-pipe-rs" } +http-body = "0.4" +bytes = "1" http = "0.2" [dev-dependencies] -hyper = "0.13" -tokio = { version = "0.2", features = ["full"] } \ No newline at end of file +hyper = { version = "0.14", features = ["full"] } +tokio = { version = "1", features = ["full"] } diff --git a/README.md b/README.md index d426dbb..264176c 100644 --- a/README.md +++ b/README.md @@ -4,20 +4,20 @@ [![Documentation](https://docs.rs/stream-body/badge.svg)](https://docs.rs/stream-body) [![MIT](https://img.shields.io/crates/l/stream-body.svg)](./LICENSE) -An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/). +An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/). [Docs](https://docs.rs/stream-body) ## Motivation -The existing [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) +The existing [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as streaming chunk. Hence, a lot of buffer allocation and de-allocation happen during the real-time large data streaming because of the [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) type. -Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) and uses `&[u8]` +Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) and uses `&[u8]` slice as the streaming chunk, so it is possible to use the same buffer without allocating a new one; hence it overcomes any allocation/de-allocation overhead. -Also, the [channel()](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) returns -a pair of a [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html). -Here, the [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again +Also, the [channel()](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) returns +a pair of a [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html). +Here, the [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again creates allocation/de-allocation overhead. To solve this, `StreamBody` has a method named `StreamBody::channel()` which returns a pair of an [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and the `StreamBody` itself. As the [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) accepts `&[u8]` instead of [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html), there will @@ -33,6 +33,7 @@ stream-body = "0.1" ``` An example on handling a large file: + ```rust use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; @@ -77,4 +78,4 @@ async fn main() { ## Contributing -Your PRs and stars are always welcome. \ No newline at end of file +Your PRs and stars are always welcome. diff --git a/examples/create-channel.rs b/examples/create-channel.rs index aca63c1..141c332 100644 --- a/examples/create-channel.rs +++ b/examples/create-channel.rs @@ -1,5 +1,5 @@ use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::{server::Server, Body, Request, Response}; use std::{convert::Infallible, net::SocketAddr}; use stream_body::StreamBody; use tokio::fs::File; diff --git a/src/body.rs b/src/body.rs index 8f52c70..cb8c304 100644 --- a/src/body.rs +++ b/src/body.rs @@ -7,17 +7,16 @@ use http_body::{Body, SizeHint}; use pin_project_lite::pin_project; use std::borrow::Cow; use std::marker::Unpin; -use std::mem::MaybeUninit; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use tokio::io::{self, AsyncRead}; +use tokio::io::{self, AsyncRead, ReadBuf}; const DEFAULT_BUF_SIZE: usize = 8 * 1024; -/// An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation which handles data streaming in an efficient way. +/// An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation which handles data streaming in an efficient way. /// -/// It is similar to [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html). +/// It is similar to [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html). pub struct StreamBody { inner: Inner, } @@ -75,9 +74,6 @@ impl StreamBody { let mut buffer = Vec::with_capacity(capacity); unsafe { buffer.set_len(capacity); - - let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit]); - r.prepare_uninitialized_buffer(b); } let body = StreamBody { @@ -184,24 +180,25 @@ impl Body for StreamBody { return Poll::Ready(None); } - let buf: &mut Box<[u8]> = &mut inner_me.buf; - let poll_status = inner_me.reader.poll_read(cx, &mut buf[..]); + let mut buf = ReadBuf::new(&mut inner_me.buf); + let poll_status = inner_me.reader.poll_read(cx, &mut buf); match poll_status { Poll::Pending => Poll::Pending, Poll::Ready(result) => match result { - Ok(read_count) if read_count > 0 => { - state.is_current_stream_data_consumed = false; - - let data = StreamData::new(&buf[..read_count], Arc::clone(&inner_me.state)); - Poll::Ready(Some(Ok(data))) - } Ok(_) => { - *inner_me.reached_eof = true; - Poll::Ready(None) + if (buf.capacity() - buf.remaining()) > 0 { + state.is_current_stream_data_consumed = false; + + let data = StreamData::new(buf.filled(), Arc::clone(&inner_me.state)); + Poll::Ready(Some(Ok(data))) + }else{ + *inner_me.reached_eof = true; + Poll::Ready(None) + } } Err(err) => Poll::Ready(Some(Err(err))), - }, + } } } } diff --git a/src/data.rs b/src/data.rs index 5acbfc8..85c0cbb 100644 --- a/src/data.rs +++ b/src/data.rs @@ -28,7 +28,7 @@ impl Buf for StreamData { self.len - self.pos } - fn bytes(&self) -> &[u8] { + fn chunk(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.ptr.add(self.pos), self.len - self.pos) } } diff --git a/src/lib.rs b/src/lib.rs index ed95581..949530f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,15 @@ -//! An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/). +//! An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/). //! //! # Motivation //! -//! The existing [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) +//! The existing [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) //! as streaming chunk. Hence, a lot of buffer allocation and de-allocation happen during the real-time large data streaming because of the [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) type. -//! Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) and uses `&[u8]` +//! Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) and uses `&[u8]` //! slice as the streaming chunk, so it is possible to use the same buffer without allocating a new one; hence it overcomes any allocation/de-allocation overhead. //! -//! Also, the [channel()](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) returns -//! a pair of a [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html). -//! Here, the [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again +//! Also, the [channel()](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) returns +//! a pair of a [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html). +//! Here, the [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again //! creates allocation/de-allocation overhead. //! To solve this, `StreamBody` has a method named `StreamBody::channel()` which returns a pair of an [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and the `StreamBody` //! itself. As the [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) accepts `&[u8]` instead of [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html), there will