Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update bytes dep to 0.6 + update lock file #107

Closed
wants to merge 1 commit into from

Conversation

blacktemplar
Copy link

No description provided.

@Nemo157
Copy link
Member

Nemo157 commented Nov 17, 2020

Unfortunately this is a breaking change because it's publicly visible that <async_compression::stream::* as Stream>::Item: io::Result<bytes(0.5)::Bytes>.

I'm hoping not to publish any breaking changes until std gets async IO traits and this can go to 1.0, so I'll have to think about what to do here.

@Nemo157
Copy link
Member

Nemo157 commented Nov 19, 2020

@blacktemplar do you have some usecase where you need this? I don't really like the current stream API and it would be interesting to look at all users of it and see if there's a better API that could be used (e.g. just using the bufread API via the adapters that can convert it to streams of blocks).

cc @seanmonstar as reqwest and warp are the biggest users of it, and I assume this will impact the tokio0.3/bytes0.6 transition for them.

@seanmonstar
Copy link

seanmonstar commented Nov 19, 2020

Hm, I will need this, but I understand not wanting to make breaking changes to this library. Perhaps integrating with Bytes is small enough it doesn't need to be part of async-compression?

Or another option is to add bytes06 as a separate optional feature.

@blacktemplar
Copy link
Author

blacktemplar commented Nov 19, 2020

@blacktemplar do you have some usecase where you need this? I don't really like the current stream API and it would be interesting to look at all users of it and see if there's a better API that could be used (e.g. just using the bufread API via the adapters that can convert it to streams of blocks).

cc @seanmonstar as reqwest and warp are the biggest users of it, and I assume this will impact the tokio0.3/bytes0.6 transition for them.

The need occurred when we tried to update our application (https://github.com/sigp/lighthouse) and a lot of its dependencies to Tokio 0.3. Specifically we needed it for updating the warp dependency (sigp/warp@22ab423).

@Nemo157
Copy link
Member

Nemo157 commented Nov 19, 2020

Ok, so it's the same hyper related usecase as all the other usage of the stream feature I can find. Skimming a few of the users it looks like there's some annoyance in the current API design, so I wanna see if there's a potential redesign of a new version of it that would work better, and hopefully be able to make it forward compatible with bytes updates in the future.

@seanmonstar are there any planned changes to hyper::{Body, Error} in v0.14 other than bumping the version of bytes?

@seanmonstar
Copy link

There are a few changes coming in hyper v0.14.

@Nemo157
Copy link
Member

Nemo157 commented Nov 19, 2020

But fundamentally hyper::Body will remain a Stream<Item = Result<Bytes, hyper::Error>>, so when applying compression to it users will want to be able to wrap and receive that type back?

@seanmonstar
Copy link

That sounds right. I'm not sure what you have in mind, but I'd personally keep the Stream impl you have here (or in a separate crate if you prefer), and not take the dependency on hyper directly.

@Nemo157
Copy link
Member

Nemo157 commented Nov 27, 2020

As an alternative, reqwest at least can use tokio-util to adapt the body type to use the IO trait implementations: https://gist.github.com/Nemo157/d64dd709c8e5f7778ecc9d8183b7a6e0 (patch goes on top of seanmonstar/reqwest#1076 4d8339d). Despite the extra types involved, I believe this should have exactly the same runtime characteristics. I assume warp could quite easily do the same thing.

@messense
Copy link

@Nemo157 I've applied your patch in seanmonstar/reqwest#1076, thanks!

@djc
Copy link

djc commented Dec 25, 2020

So I have code roughly like this:

                let orig = mem::replace(rsp.body_mut(), Body::empty());
                rsp.headers_mut()
                    .insert(CONTENT_ENCODING, HeaderValue::from_static("br"));
                *rsp.body_mut() = Body::wrap_stream(BrotliEncoder::new(
                    orig.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
                ));
                rsp

How would I rewrite this while using hyper 0.14 and its Body type? I'm currently getting errors like this:

error[E0271]: type mismatch resolving `<hyper::Body as Stream>::Item == std::result::Result<bytes::bytes::Bytes, _>`
   --> mendes/src/hyper.rs:113:53
    |
113 |                 *rsp.body_mut() = Body::wrap_stream(BrotliEncoder::new(
    |                                                     ^^^^^^^^^^^^^^^^^^ expected struct `bytes::Bytes`, found struct `bytes::bytes::Bytes`
    |
    = note: expected enum `std::result::Result<bytes::Bytes, hyper::Error>`
               found enum `std::result::Result<bytes::bytes::Bytes, _>`
    = note: perhaps two different versions of crate `bytes` are being used?
    = note: required because of the requirements on the impl of `TryStream` for `hyper::Body`
    = note: required because of the requirements on the impl of `Stream` for `futures_util::stream::Map<futures_util::stream::IntoStream<hyper::Body>, futures_util::fns::MapErrFn<[closure@mendes/src/hyper.rs:114:34: 114:77]>>`
    = note: required by `BrotliEncoder::<S>::new`

@Nemo157
Copy link
Member

Nemo157 commented Dec 25, 2020

Something similar to:

Body::wrap_stream(tokio_util::codec::FramedRead::new(
    BrotliEncoder::new(tokio_util::io::StreamReader::new(orig)),
    tokio_util::codec::BytesCodec::new(),
));

Maybe with mapping the error, I don't recall what StreamReader requires of the error.

My plan is probably to add docs on how to migrate to the stream module and deprecate the stream feature within the next couple of days.

@djc
Copy link

djc commented Dec 25, 2020

You mean, migrate to the futures module since the stream module is deprecated?

I tried to implement your suggestion but can't figure it out so far. I can create a StreamReader as suggested, but I can't seem to pass the resulting stream to the BrotliEncoder because it doesn't implement the futures version of AsyncRead -- only tokio's AsyncRead. Maybe this could be fixed by adding some compat traits to the mix.

I'd suggest that life for server-side hyper users of your crate would be a whole lot easier if you just make the bump to 0.4 with bytes 1.0 for now, jumping to 1.0 once AsyncRead and AsyncWrite have made it into std. Otherwise, just adding some helper methods for bytes 1.0 behind a separate feature flag would also be alright, though I do wonder to what extent the compiler manages to make all the compat layers go away.

@Nemo157
Copy link
Member

Nemo157 commented Dec 25, 2020

Oh, use async_compression::tokio::BrotliEncoder, since that uses the same tokio traits that tokio-util does.

You mean, migrate to the futures module since the stream module is deprecated?

I meant "add docs within async-compression::stream on how to migrate to using tokio-util".

@Nemo157 Nemo157 mentioned this pull request Dec 28, 2020
@Nemo157
Copy link
Member

Nemo157 commented Dec 28, 2020

I've opened #115 to deprecate, with a link to the rendered migration docs. If anyone has any feedback on the deprecation or docs I'd appreciate it in there.

@ianthetechie
Copy link

@Nemo157 thanks for the docs! The docs seem to only cover encoding. I originally got the same error as @djc (with a hyper::Body) regarding the bytes struct, but was able to get a little bit further by wrapping the body in a StreamReader. Unfortunately I hit another issue and my head is spinning from type algebra now ;)

I'm trying to decode the hyper::Body form one format and then re-encode as something else (ex: brotli -> gzip). This was working pretty well with streams in the past and I'd just pass the decoder as the argument to the encoder, but this doesn't seem to work anymore with the tokio versions. What's the correct way to do this now?

I've included a minimal sample below of the sort of thing I'm trying to do. IOError is an aliased import of std::io::Error.

let body = tokio_util::io::StreamReader::new(body.map_err(|e| IOError::new(ErrorKind::Other, e)));

let identity = BrotliDecoder::new(body);

let encoded = GzipEncoder::new(identity);

I end up with an error like this:

538 |     let encoded = GzipEncoder::new(identity);
    |                                    ^^^^^^^^ the trait `tokio::io::AsyncBufRead` is not implemented for `async_compression::tokio::bufread::BrotliDecoder<StreamReader<futures::stream::MapErr<Body, [closure@src/server.rs:520:63: 520:100]>, bytes::Bytes>>`
    |
    = note: required by `async_compression::tokio::bufread::GzipEncoder::<R>::new`

@Nemo157
Copy link
Member

Nemo157 commented Dec 29, 2020

The async-compression::*::bufread APIs (both encoding and decoding) take a buffered reader and give back an unbuffered reader to make the costs explicit fn(impl AsyncBufRead) -> impl AsyncRead; so if you chain them you need to introduce buffering in between yourself

let body = tokio_util::io::StreamReader::new(body.map_err(|e| IOError::new(ErrorKind::Other, e)));

let identity = BrotliDecoder::new(body);

let encoded = GzipEncoder::new(tokio::io::BufReader::new(identity));

@djc
Copy link

djc commented Dec 29, 2020

Here's my change:

@@ -63,14 +63,15 @@ mod encoding {
     use std::{io, mem};
 
     #[cfg(feature = "brotli")]
-    use async_compression::stream::BrotliEncoder;
+    use async_compression::tokio::bufread::BrotliEncoder;
@@ -113,9 +114,9 @@ mod encoding {
                 let orig = mem::replace(rsp.body_mut(), Body::empty());
                 rsp.headers_mut()
                     .insert(CONTENT_ENCODING, HeaderValue::from_static("br"));
-                *rsp.body_mut() = Body::wrap_stream(BrotliEncoder::new(
-                    orig.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
-                ));
+                *rsp.body_mut() = Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(
+                    StreamReader::new(orig.map_err(|e| io::Error::new(io::ErrorKind::Other, e))),
+                )));
                 rsp
             }
             #[cfg(feature = "gzip")]

@Nemo157 are there any advantages to the FramedRead method over the ReaderStream?

@Nemo157
Copy link
Member

Nemo157 commented Dec 29, 2020

I didn't realise ReaderStream existed, that makes it a bit simpler.

@Nemo157
Copy link
Member

Nemo157 commented Apr 25, 2021

Now that #115 is merged and released in 0.3.8 I'm closing this, migration docs can be seen at https://docs.rs/async-compression/0.3.8/async_compression/stream/index.html

@Nemo157 Nemo157 closed this Apr 25, 2021
@Nemo157 Nemo157 mentioned this pull request Apr 25, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants