add .chunk() associated function to blocking::Response, add .json_chunk() method to Response and blocking::Response #2000

Status: Open. Wants to merge 2 commits into `master`.
50 changes: 50 additions & 0 deletions src/async_impl/response.rs
@@ -240,6 +240,56 @@ impl Response {
serde_json::from_slice(&full).map_err(crate::error::decode)
}

/// Try to deserialize a single chunk of the response body as JSON.
///
/// When the response body has been exhausted, this will return `Ok(None)`.
///
/// # Optional
///
/// This requires the optional `json` feature enabled.
///
/// # Example
///
/// ```rust
/// # extern crate reqwest;
/// # extern crate serde;
/// #
/// # use reqwest::Error;
/// # use serde::Deserialize;
/// #
/// // This `derive` requires the `serde` dependency.
/// #[derive(Deserialize)]
/// struct Ip {
/// origin: String,
/// }
///
/// # async fn run() -> Result<(), Error> {
/// let mut res = reqwest::get("http://httpbin.org/ip").await?;
///
/// while let Some(chunk) = res.json_chunk::<Ip>().await? {
/// println!("Chunk: {:?}", chunk);
/// }
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// This method fails whenever the response chunk is not in JSON format
/// or it cannot be properly deserialized to target type `T`. For more
/// details please see [`serde_json::from_reader`].
///
/// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html
#[cfg(feature = "json")]
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
pub async fn json_chunk<T: DeserializeOwned>(&mut self) -> crate::Result<Option<T>> {
if let Some(full) = self.chunk().await? {
Ok(Some(serde_json::from_slice(&full).map_err(crate::error::decode)?))
Owner commented:

So, this is assuming that every "chunk", which is basically a `read()` call on the socket, is a full JSON message? It's a fragile assumption, since the data could be combined into a single read, or it could be too large and be broken up into multiple reads...

@LoganDark (Author) commented on Oct 19, 2023:

> So, this is assuming that every "chunk" [...] is a full JSON message?

Yes.

> It's a fragile assumption, since the data could be combined into a single read, or it could be too large and be broken up into multiple reads...

The assumption is that `chunk()` corresponds to one chunk from `Transfer-Encoding: chunked`, not to whatever the OS decides a `read()` is.

Since the chunk size is dictated by the server, the chunks should be complete JSON messages when you are using an endpoint that returns one complete JSON message per chunk.

If this assumption is incorrect, then reqwest probably needs to be extended to support `Transfer-Encoding: chunked`, because otherwise I can't consume token streams from text-generation-inference.

AFAICT, reqwest has tests that expect `.body_mut().next()` to yield an entire chunk (from `Transfer-Encoding: chunked`), and the implementation of `chunk()` defers to that exact same method call, so if that correspondence does not hold then I don't know what is even going on.

Owner commented:

reqwest (and hyper) does support chunked transfer-encoding, but it doesn't buffer up every "chunk" that way. The decoder in hyper will do one OS read, and then pass on either everything up to the chunked delimiter, or the full read if the delimiter has not yet been reached.

@LoganDark (Author) commented on Oct 19, 2023:

> reqwest (and hyper) does support chunked transfer-encoding. But it doesn't buffer up every "chunk" that way. The decoder in hyper will do 1 OS read, and then pass on either up to the chunked delimiter, or the full thing if the delimiter is not yet reached.

How do you suggest I should tell when a delimiter is reached, then? (Is there a method for this?)

Owner commented:

Chunked delimiters have no semantic significance, so they can also be changed as they pass through gateways/proxies. I'm not familiar with the project you linked. Most often, JSON streaming is done by delimiting JSON objects with newlines. Then what you do is read and buffer until you get a newline, and then you can decode the object.
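The read-and-buffer-until-newline approach suggested above can be sketched roughly as follows. This is a std-only illustration, not part of reqwest; the `LineFramer` type and `push` method are hypothetical names, and a real client would run `serde_json::from_slice` on each returned message instead of converting it to a `String`.

```rust
// Sketch: client-side framing for newline-delimited JSON. Network chunks
// are appended to a buffer; complete newline-terminated messages are
// split off, and any trailing partial message stays buffered for the
// next chunk.
struct LineFramer {
    buf: Vec<u8>,
}

impl LineFramer {
    fn new() -> Self {
        LineFramer { buf: Vec::new() }
    }

    /// Feed one network chunk; return every complete newline-terminated
    /// message it yields. Bytes after the last newline remain buffered.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut out = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            // Remove the message plus its newline from the buffer.
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            // Drop the trailing newline; a real client would decode this
            // slice as JSON here rather than stringify it.
            out.push(String::from_utf8_lossy(&line[..line.len() - 1]).into_owned());
        }
        out
    }
}
```

Note that this works regardless of how the transport splits the bytes: a message arriving across two reads is only emitted once its newline shows up.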

@LoganDark (Author) commented:

> Chunked delimiters have no semantic significance, so they can also be changed as they go through gateways/proxies.

This is a server-side issue, isn't it? If the server hosts an endpoint that doesn't use the encoding properly, that's a bug on their end.

If reqwest needs to be resilient to this sort of issue, I would still want to support endpoints that actually work properly, so we'd end up with two families of functions: one that uses chunks and one that uses newlines.

In that case, I'll still need a way to tell the end of a chunk apart from an arbitrary read boundary. Is there any way to test for this?

Owner commented:

No, there isn't a way to test for it. The properties of the transfer are something that hyper handles; they are not considered part of the content (in contrast to Content-Encoding).

It's similar to how calling `read()` on an OS `TcpStream` won't tell you which bytes were in each segment, since the OS will combine segments into a single buffer if it receives several in between calls to `read()`.

@LoganDark (Author) commented:

I'm going to perform a couple of tests and report back.

@LoganDark (Author) commented on Oct 19, 2023:

OK, it looks like this server actually uses Server-Sent Events, not just raw chunks, so each message is delimited by two newlines. That's a bit better.

In that case, this will probably need a better name. What do you say to this: in addition to the chunk family of functions, there's an `event` family that strips the `data:` prefix and reads until `\n\n`, and then a `json_event` that decodes from there?

There wouldn't be any need for `json_chunk` at all then, or I could just make it read until a single newline like you suggested, in case there is an API out there that doesn't use SSE.
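The proposed `event` behaviour, splitting a buffered SSE stream on the blank-line delimiter and stripping each `data:` field prefix, could look roughly like this. It is a std-only sketch: the helper name `parse_sse_data` is made up for illustration and is not a reqwest API, and it deliberately ignores other SSE fields such as `event:` and `id:`.

```rust
// Sketch: extract the data payload of each Server-Sent Event from a
// fully buffered stream. Events are separated by a blank line; an event
// may carry several `data:` lines, which the SSE spec joins with
// newlines.
fn parse_sse_data(raw: &str) -> Vec<String> {
    raw.split("\n\n")
        .filter(|event| !event.is_empty())
        .map(|event| {
            event
                .lines()
                // Keep only `data:` field lines, dropping the prefix
                // and the optional space after the colon.
                .filter_map(|line| line.strip_prefix("data:"))
                .map(|data| data.trim_start())
                .collect::<Vec<_>>()
                .join("\n")
        })
        .filter(|data| !data.is_empty())
        .collect()
}
```

A `json_event` on top of this would then be the moral equivalent of running `serde_json::from_str` on each returned payload.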

@LoganDark (Author) commented:

Here's my plan: let's remove the new `json_chunk` methods from this PR and narrow the scope to just adding the blocking `chunk` method, and then I'll work on Server-Sent Events support in a separate PR.

It looks like reqwest's `Decoder` doesn't support this: we can read chunks until `\n\n`, but we can't give portions of a chunk back to the `Decoder` if the server happens to send multiple events in a single chunk. If chunking is an implementation detail and we have to be agnostic about it, then we have to support that scenario.

Maybe we can provide an iterator over each event in an SSE stream? Then the iterator can keep track of whether the last event ended in the middle of some chunk.
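The iterator idea mentioned above can be sketched without any reqwest types: wrap an arbitrary chunk source (here a closure standing in for repeated `Response::chunk` calls) and carry leftover bytes across chunk boundaries. The `EventIter` name and its shape are purely illustrative assumptions, not a proposed final API.

```rust
// Sketch: yield one SSE event body at a time from a chunked byte
// source, buffering partial events across chunk boundaries so it does
// not matter where the transport happens to split the stream.
struct EventIter<F: FnMut() -> Option<Vec<u8>>> {
    next_chunk: F, // stand-in for the response's chunk() call
    buf: String,   // bytes received but not yet emitted as an event
}

impl<F: FnMut() -> Option<Vec<u8>>> Iterator for EventIter<F> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            // A complete event may already be buffered from an earlier,
            // larger chunk.
            if let Some(pos) = self.buf.find("\n\n") {
                let event = self.buf[..pos].to_string();
                self.buf.drain(..pos + 2);
                return Some(event);
            }
            // Otherwise pull more data; `None` means the body is done.
            let chunk = (self.next_chunk)()?;
            self.buf.push_str(&String::from_utf8_lossy(&chunk));
        }
    }
}
```

This handles both problem cases from the discussion: several events arriving in one chunk (the buffer is drained event by event) and one event split across chunks (the loop keeps reading until the `\n\n` delimiter appears).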

} else {
Ok(None)
}
}

/// Get the full response body as `Bytes`.
///
/// # Example
72 changes: 72 additions & 0 deletions src/blocking/response.rs
@@ -244,6 +244,55 @@ impl Response {
})
}

/// Try to deserialize a single chunk of the response body as JSON.
///
/// When the response body has been exhausted, this will return `Ok(None)`.
///
/// # Optional
///
/// This requires the optional `json` feature enabled.
///
/// # Example
///
/// ```rust
/// # extern crate reqwest;
/// # extern crate serde;
/// #
/// # use reqwest::Error;
/// # use serde::Deserialize;
/// #
/// // This `derive` requires the `serde` dependency.
/// #[derive(Deserialize)]
/// struct Ip {
/// origin: String,
/// }
///
/// # fn run() -> Result<(), Error> {
/// let mut res = reqwest::blocking::get("http://httpbin.org/ip")?;
///
/// while let Some(chunk) = res.json_chunk::<Ip>()? {
/// println!("Chunk: {:?}", chunk);
/// }
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// This method fails whenever the response chunk is not in JSON format
/// or it cannot be properly deserialized to target type `T`. For more
/// details please see [`serde_json::from_reader`].
///
/// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html
#[cfg(feature = "json")]
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
pub fn json_chunk<T: DeserializeOwned>(&mut self) -> crate::Result<Option<T>> {
wait::timeout(self.inner.json_chunk(), self.timeout).map_err(|e| match e {
wait::Waited::TimedOut(e) => crate::error::decode(e),
wait::Waited::Inner(e) => e,
})
}

/// Get the full response body as `Bytes`.
///
/// # Example
@@ -263,6 +312,29 @@ impl Response {
})
}

/// Stream a chunk of the response body.
///
/// When the response body has been exhausted, this will return `None`.
///
/// # Example
///
/// ```
/// # fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let mut res = reqwest::blocking::get("https://hyper.rs")?;
///
/// while let Some(chunk) = res.chunk()? {
/// println!("Chunk: {:?}", chunk);
/// }
/// # Ok(())
/// # }
/// ```
pub fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
wait::timeout(self.inner.chunk(), self.timeout).map_err(|e| match e {
wait::Waited::TimedOut(e) => crate::error::decode(e),
wait::Waited::Inner(e) => e,
})
}

/// Get the response text.
///
/// This method decodes the response body with BOM sniffing