Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ edition = "2018"
[dependencies]
log = "0.4"
pin-project-lite = "0.1.4"
tokio = { version = "0.2", features= [] }
async-pipe = "0.1"
http-body = "0.3"
bytes = "0.5"
tokio = { version = "1", features = ["rt-multi-thread", "io-util"] }
async-pipe = { git = "https://github.com/harana-oss/async-pipe-rs" }
http-body = "0.4"
bytes = "1"
http = "0.2"

[dev-dependencies]
hyper = "0.13"
tokio = { version = "0.2", features = ["full"] }
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
[![Documentation](https://docs.rs/stream-body/badge.svg)](https://docs.rs/stream-body)
[![MIT](https://img.shields.io/crates/l/stream-body.svg)](./LICENSE)

An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/).
An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/).

[Docs](https://docs.rs/stream-body)

## Motivation

The existing [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html)
The existing [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html)
as streaming chunk. Hence, a lot of buffer allocation and de-allocation happen during the real-time large data streaming because of the [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) type.
Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) and uses `&[u8]`
Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) and uses `&[u8]`
slice as the streaming chunk, so it is possible to use the same buffer without allocating a new one; hence it overcomes any allocation/de-allocation overhead.

Also, the [channel()](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) returns
a pair of a [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html).
Here, the [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again
Also, the [channel()](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) returns
a pair of a [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html).
Here, the [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again
creates allocation/de-allocation overhead.
To solve this, `StreamBody` has a method named `StreamBody::channel()` which returns a pair of an [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and the `StreamBody`
itself. As the [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) accepts `&[u8]` instead of [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html), there will
Expand All @@ -33,6 +33,7 @@ stream-body = "0.1"
```

An example on handling a large file:

```rust
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
Expand Down Expand Up @@ -77,4 +78,4 @@ async fn main() {

## Contributing

Your PRs and stars are always welcome.
Your PRs and stars are always welcome.
2 changes: 1 addition & 1 deletion examples/create-channel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use hyper::{server::Server, Body, Request, Response};
use std::{convert::Infallible, net::SocketAddr};
use stream_body::StreamBody;
use tokio::fs::File;
Expand Down
33 changes: 15 additions & 18 deletions src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ use http_body::{Body, SizeHint};
use pin_project_lite::pin_project;
use std::borrow::Cow;
use std::marker::Unpin;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead};
use tokio::io::{self, AsyncRead, ReadBuf};

const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation which handles data streaming in an efficient way.
/// An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation which handles data streaming in an efficient way.
///
/// It is similar to [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html).
/// It is similar to [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html).
pub struct StreamBody {
inner: Inner,
}
Expand Down Expand Up @@ -75,9 +74,6 @@ impl StreamBody {
let mut buffer = Vec::with_capacity(capacity);
unsafe {
buffer.set_len(capacity);

let b = &mut *(&mut buffer[..] as *mut [u8] as *mut [MaybeUninit<u8>]);
r.prepare_uninitialized_buffer(b);
}

let body = StreamBody {
Expand Down Expand Up @@ -184,24 +180,25 @@ impl Body for StreamBody {
return Poll::Ready(None);
}

let buf: &mut Box<[u8]> = &mut inner_me.buf;
let poll_status = inner_me.reader.poll_read(cx, &mut buf[..]);
let mut buf = ReadBuf::new(&mut inner_me.buf);
let poll_status = inner_me.reader.poll_read(cx, &mut buf);

match poll_status {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => match result {
Ok(read_count) if read_count > 0 => {
state.is_current_stream_data_consumed = false;

let data = StreamData::new(&buf[..read_count], Arc::clone(&inner_me.state));
Poll::Ready(Some(Ok(data)))
}
Ok(_) => {
*inner_me.reached_eof = true;
Poll::Ready(None)
if (buf.capacity() - buf.remaining()) > 0 {
state.is_current_stream_data_consumed = false;

let data = StreamData::new(buf.filled(), Arc::clone(&inner_me.state));
Poll::Ready(Some(Ok(data)))
}else{
*inner_me.reached_eof = true;
Poll::Ready(None)
}
}
Err(err) => Poll::Ready(Some(Err(err))),
},
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Buf for StreamData {
self.len - self.pos
}

fn bytes(&self) -> &[u8] {
fn chunk(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr.add(self.pos), self.len - self.pos) }
}

Expand Down
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//! An [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/).
//! An [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) implementation with efficient streaming support for the Rust HTTP library [hyper](https://hyper.rs/).
//!
//! # Motivation
//!
//! The existing [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html)
//! The existing [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) type in [hyper](https://hyper.rs/) uses [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html)
//! as streaming chunk. Hence, a lot of buffer allocation and de-allocation happen during the real-time large data streaming because of the [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) type.
//! Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.13.4/hyper/body/trait.HttpBody.html) and uses `&[u8]`
//! Therefore, `StreamBody` comes to tackle this kind of situation. The `StreamBody` implements [HttpBody](https://docs.rs/hyper/0.14.11/hyper/body/trait.HttpBody.html) and uses `&[u8]`
//! slice as the streaming chunk, so it is possible to use the same buffer without allocating a new one; hence it overcomes any allocation/de-allocation overhead.
//!
//! Also, the [channel()](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html) returns
//! a pair of a [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.13.4/hyper/body/struct.Body.html).
//! Here, the [Sender](https://docs.rs/hyper/0.13.4/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again
//! Also, the [channel()](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html#method.channel) method in hyper [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html) returns
//! a pair of a [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) and a [Body](https://docs.rs/hyper/0.14.11/hyper/body/struct.Body.html).
//! Here, the [Sender](https://docs.rs/hyper/0.14.11/hyper/body/struct.Sender.html) accepts [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html) as a data chunk which again
//! creates allocation/de-allocation overhead.
//! To solve this, `StreamBody` has a method named `StreamBody::channel()` which returns a pair of an [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and the `StreamBody`
//! itself. As the [AsyncWrite](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) accepts `&[u8]` instead of [Bytes](https://docs.rs/bytes/0.5.4/bytes/struct.Bytes.html), there will
Expand Down