Skip to content

Commit

Permalink
feat(body): replace Chunk type with Bytes
Browse files Browse the repository at this point in the history
Closes #1931

BREAKING CHANGE: All usage of `hyper::Chunk` should be replaced with
  `bytes::Bytes` (or `hyper::body::Bytes`).
  • Loading branch information
seanmonstar committed Dec 6, 2019
1 parent c56ccfb commit 0a97cc1
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 245 deletions.
6 changes: 3 additions & 3 deletions examples/web_api.rs
Expand Up @@ -3,7 +3,7 @@
use futures_util::{StreamExt, TryStreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Chunk, Client, Method, Request, Response, Server, StatusCode};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, GenericError>;
Expand All @@ -25,11 +25,11 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let body = Body::wrap_stream(web_res.into_body().map_ok(|b| {
Chunk::from(format!(
format!(
"<b>POST request body</b>: {}<br><b>Response</b>: {}",
POST_DATA,
std::str::from_utf8(&b).unwrap()
))
)
}));

Ok(Response::new(body))
Expand Down
58 changes: 25 additions & 33 deletions src/body/body.rs
Expand Up @@ -11,13 +11,12 @@ use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::Chunk;
use crate::common::{task, Future, Never, Pin, Poll};
use crate::upgrade::OnUpgrade;

type BodySender = mpsc::Sender<Result<Chunk, crate::Error>>;
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;

/// A stream of `Chunk`s, used when receiving bodies.
/// A stream of `Bytes`s, used when receiving bodies.
///
/// A good default `Payload` to use in many applications.
#[must_use = "streams do nothing unless polled"]
Expand All @@ -29,11 +28,11 @@ pub struct Body {
}

enum Kind {
Once(Option<Chunk>),
Once(Option<Bytes>),
Chan {
content_length: Option<u64>,
abort_rx: oneshot::Receiver<()>,
rx: mpsc::Receiver<Result<Chunk, crate::Error>>,
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
},
H2 {
content_length: Option<u64>,
Expand All @@ -45,7 +44,7 @@ enum Kind {
// See https://github.com/rust-lang/rust/issues/57017
#[cfg(feature = "stream")]
Wrapped(
Pin<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
),
}

Expand Down Expand Up @@ -152,7 +151,7 @@ impl Body {
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
O: Into<Chunk> + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Expand Down Expand Up @@ -208,7 +207,7 @@ impl Body {
})
}

fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Chunk>>> {
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.take_delayed_eof() {
Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
Expand Down Expand Up @@ -237,7 +236,7 @@ impl Body {
}
}

fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Chunk>>> {
fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
match self.kind {
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
Kind::Chan {
Expand Down Expand Up @@ -265,7 +264,7 @@ impl Body {
} => match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
Poll::Ready(Some(Ok(Chunk::from(bytes))))
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
None => Poll::Ready(None),
Expand All @@ -279,7 +278,7 @@ impl Body {
}
}

pub(super) fn take_full_data(&mut self) -> Option<Chunk> {
pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
if let Kind::Once(ref mut chunk) = self.kind {
chunk.take()
} else {
Expand All @@ -297,7 +296,7 @@ impl Default for Body {
}

impl HttpBody for Body {
type Data = Chunk;
type Data = Bytes;
type Error = crate::Error;

fn poll_data(
Expand Down Expand Up @@ -362,7 +361,7 @@ impl fmt::Debug for Body {
#[derive(Debug)]
struct Empty;
#[derive(Debug)]
struct Full<'a>(&'a Chunk);
struct Full<'a>(&'a Bytes);

let mut builder = f.debug_tuple("Body");
match self.kind {
Expand All @@ -381,7 +380,7 @@ impl fmt::Debug for Body {
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
type Item = crate::Result<Chunk>;
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
Expand All @@ -393,22 +392,22 @@ impl Stream for Body {
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
for Body
{
#[inline]
fn from(
stream: Box<
dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync,
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
>,
) -> Body {
Body::new(Kind::Wrapped(stream.into()))
}
}

impl From<Chunk> for Body {
impl From<Bytes> for Body {
#[inline]
fn from(chunk: Chunk) -> Body {
fn from(chunk: Bytes) -> Body {
if chunk.is_empty() {
Body::empty()
} else {
Expand All @@ -417,24 +416,17 @@ impl From<Chunk> for Body {
}
}

impl From<Bytes> for Body {
#[inline]
fn from(bytes: Bytes) -> Body {
Body::from(Chunk::from(bytes))
}
}

impl From<Vec<u8>> for Body {
#[inline]
fn from(vec: Vec<u8>) -> Body {
Body::from(Chunk::from(vec))
Body::from(Bytes::from(vec))
}
}

impl From<&'static [u8]> for Body {
#[inline]
fn from(slice: &'static [u8]) -> Body {
Body::from(Chunk::from(slice))
Body::from(Bytes::from(slice))
}
}

Expand All @@ -451,14 +443,14 @@ impl From<Cow<'static, [u8]>> for Body {
impl From<String> for Body {
#[inline]
fn from(s: String) -> Body {
Body::from(Chunk::from(s.into_bytes()))
Body::from(Bytes::from(s.into_bytes()))
}
}

impl From<&'static str> for Body {
#[inline]
fn from(slice: &'static str) -> Body {
Body::from(Chunk::from(slice.as_bytes()))
Body::from(Bytes::from(slice.as_bytes()))
}
}

Expand Down Expand Up @@ -486,7 +478,7 @@ impl Sender {
}

/// Send data on this channel when it is ready.
pub async fn send_data(&mut self, chunk: Chunk) -> crate::Result<()> {
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await?;
self.tx
.try_send(Ok(chunk))
Expand All @@ -497,15 +489,15 @@ impl Sender {
///
/// # Errors
///
/// Returns `Err(Chunk)` if the channel could not (currently) accept
/// another `Chunk`.
/// Returns `Err(Bytes)` if the channel could not (currently) accept
/// another `Bytes`.
///
/// # Note
///
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// [`send_data`][] instead.
pub fn try_send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
Expand Down

0 comments on commit 0a97cc1

Please sign in to comment.