tee adaptor for AsyncRead #4584

Closed
arifd opened this issue Mar 27, 2022 · 4 comments
Labels
A-tokio-util Area: The tokio-util crate C-feature-request Category: A feature request. E-help-wanted Call for participation: Help is requested to fix this issue. E-medium Call for participation: Experience needed to fix: Medium / intermediate

Comments

@arifd

arifd commented Mar 27, 2022

Is your feature request related to a problem? Please describe.
As bytes were being streamed through, I needed a way to feed a copy of the bytes into a hashing function.

Describe the solution you'd like
What I ended up doing was basically implementing this crate for AsyncRead.

Here is the full code:

use futures::ready;
use std::{
    io::Result,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};

// An adapter that lets you peek/snoop on the data as it is being streamed.
pub struct TeeReader<R: AsyncRead + Unpin, F: FnMut(&[u8])> {
    reader: R,
    f: F,
}

// No field is structurally pinned (`reader` is re-pinned with `Pin::new`,
// which relies on `R: Unpin`), so the wrapper can be `Unpin` even if `F` isn't.
impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> Unpin for TeeReader<R, F> {}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> TeeReader<R, F> {
    /// Returns a `TeeReader` that can be used as an `AsyncRead`: reads are
    /// forwarded to the supplied reader, and the supplied closure is called
    /// on each chunk of newly read bytes.
    pub fn new(reader: R, f: F) -> TeeReader<R, F> {
        TeeReader { reader, f }
    }

    /// Consumes the `TeeReader`, returning the wrapped reader.
    pub fn into_inner(self) -> R {
        self.reader
    }
}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> AsyncRead for TeeReader<R, F> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.get_mut();
        // The caller may pass a `buf` that already holds earlier data, so note
        // where this read starts and hand only the new bytes to the closure;
        // otherwise the closure would see earlier bytes again.
        let start = buf.filled().len();
        ready!(Pin::new(&mut this.reader).poll_read(cx, buf))?;
        (this.f)(&buf.filled()[start..]);
        Poll::Ready(Ok(()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    // `#[tokio::test]` needs tokio's "macros" and "rt" features.
    #[tokio::test]
    async fn tee() {
        let mut reader = "It's over 9000!".as_bytes();
        let mut altout: Vec<u8> = Vec::new();
        let mut teeout = Vec::new();
        {
            let mut tee = TeeReader::new(&mut reader, |bytes| altout.extend(bytes));
            // `read_to_end` returns a future that does nothing until awaited.
            tee.read_to_end(&mut teeout).await.unwrap();
        }
        assert_eq!(teeout, altout);
    }
}

Would something like this be suitable in tokio_util or maybe futures_util?

@arifd arifd added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Mar 27, 2022
@Darksonn
Contributor

It could be reasonable to put something like it in tokio-util.

@Darksonn Darksonn added A-tokio-util Area: The tokio-util crate E-help-wanted Call for participation: Help is requested to fix this issue. E-medium Call for participation: Experience needed to fix: Medium / intermediate and removed A-tokio Area: The main tokio crate labels Aug 10, 2022
@farnz
Contributor

farnz commented Sep 19, 2022

Any objections to me tackling this? I think I see a couple of bugs in @arifd's example code, but I can fix them up as well as implement this for AsyncWrite.

farnz added a commit to farnz/tokio that referenced this issue Sep 20, 2022
There are use cases like checking hashes of files that benefit from
being able to inspect bytes read as they come in, while still letting
the main code process the bytes as normal (e.g. deserializing into
objects, knowing that if there's a hash failure, you'll discard the
result).

As this is non-trivial to get right (e.g. handling a `buf` that's not
empty when passed to `poll_read`), add a wrapper `InspectReader`
that gets this right, passing all newly read bytes to a supplied `FnMut`
closure.

Fixes: tokio-rs#4584
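The core idea here, passing only newly read bytes to an `FnMut` closure so a hash can be computed while the main code consumes the data, can be sketched with the synchronous `std::io::Read` trait. This is a minimal std-only analogue for illustration, not the `InspectReader` from the PR: `InspectRead`, `fnv1a_update` and `FNV_OFFSET` are hypothetical names, and 64-bit FNV-1a stands in for a real digest such as SHA-256.

```rust
use std::io::{self, Read};

/// Hypothetical synchronous analogue of the wrapper: forwards reads to
/// `inner` and hands each chunk of newly read bytes to `f`.
struct InspectRead<R, F> {
    inner: R,
    f: F,
}

impl<R: Read, F: FnMut(&[u8])> Read for InspectRead<R, F> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        // `read` fills `buf` from the start, so only `buf[..n]` is new data.
        (self.f)(&buf[..n]);
        Ok(n)
    }
}

/// 64-bit FNV-1a offset basis (stand-in hash, not a cryptographic digest).
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;

/// Feeds `bytes` into a 64-bit FNV-1a state. FNV-1a works byte by byte, so
/// feeding the data in chunks gives the same result as feeding it all at once.
fn fnv1a_update(state: &mut u64, bytes: &[u8]) {
    for &b in bytes {
        *state ^= u64::from(b);
        *state = state.wrapping_mul(0x0000_0100_0000_01b3);
    }
}

fn main() -> io::Result<()> {
    let data = b"It's over 9000!";
    let mut hash = FNV_OFFSET;
    let mut out = Vec::new();
    {
        let mut reader = InspectRead {
            inner: &data[..],
            f: |bytes: &[u8]| fnv1a_update(&mut hash, bytes),
        };
        // The main consumer reads normally; the closure sees every byte once.
        reader.read_to_end(&mut out)?;
    }
    let mut expected = FNV_OFFSET;
    fnv1a_update(&mut expected, data);
    assert_eq!(out, data);
    assert_eq!(hash, expected);
    Ok(())
}
```

`Read::read` always writes from the start of `buf`, so slicing with the returned count is enough in this synchronous sketch; an `AsyncRead` wrapper additionally has to remember how much of the `ReadBuf` was already filled before the inner read.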
farnz added a commit to farnz/tokio that referenced this issue Sep 20, 2022
When writing things out, it's useful to be able to inspect the bytes
that are being written and do things like hash them as they go past.
This isn't trivial to get right, due to partial writes and efficiently
handling vectored writes (if used).

Provide an `InspectWriter` wrapper that gets this right, giving a
supplied `FnMut` closure a chance to inspect the buffers that have been
successfully written out.

Fixes: tokio-rs#4584
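The two pitfalls this commit mentions can be illustrated with a synchronous `std::io::Write` sketch: after a partial write only `buf[..n]` was written (the caller retries the rest), and after a vectored write the `n` written bytes are a prefix that may end partway through a slice. `InspectWrite` and the `Capped` test writer are hypothetical names; this is a std-only analogue, not the `InspectWriter` added in the PR.

```rust
use std::io::{self, IoSlice, Write};

/// Hypothetical synchronous analogue: forwards writes to `inner` and shows
/// `f` only the bytes that were actually written.
struct InspectWrite<W, F> {
    inner: W,
    f: F,
}

impl<W: Write, F: FnMut(&[u8])> Write for InspectWrite<W, F> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        // Partial write: only `buf[..n]` went out; the caller retries the rest.
        (self.f)(&buf[..n]);
        Ok(n)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        let n = self.inner.write_vectored(bufs)?;
        // The `n` written bytes are a prefix that may span several slices.
        let mut remaining = n;
        for buf in bufs {
            if remaining == 0 {
                break;
            }
            let take = remaining.min(buf.len());
            (self.f)(&buf[..take]);
            remaining -= take;
        }
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Hypothetical test writer that accepts at most `cap` bytes per call.
struct Capped {
    out: Vec<u8>,
    cap: usize,
}

impl Write for Capped {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.cap);
        self.out.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        // Accept up to `cap` bytes in total, possibly spanning slices.
        let mut n = 0;
        for buf in bufs {
            let take = (self.cap - n).min(buf.len());
            self.out.extend_from_slice(&buf[..take]);
            n += take;
        }
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    // Partial writes: `write_all` keeps calling `write`, 3 bytes at a time here.
    let mut seen = Vec::new();
    let mut w = InspectWrite {
        inner: Capped { out: Vec::new(), cap: 3 },
        f: |b: &[u8]| seen.extend_from_slice(b),
    };
    w.write_all(b"hello, world")?;
    assert_eq!(w.inner.out, b"hello, world");
    drop(w);
    assert_eq!(seen, b"hello, world");

    // A vectored write that stops partway into the second slice.
    let mut seen = Vec::new();
    let mut w = InspectWrite {
        inner: Capped { out: Vec::new(), cap: 9 },
        f: |b: &[u8]| seen.extend_from_slice(b),
    };
    let n = w.write_vectored(&[IoSlice::new(b"hello, "), IoSlice::new(b"world")])?;
    drop(w);
    assert_eq!(n, 9);
    assert_eq!(seen, b"hello, wo");
    Ok(())
}
```

Inspecting all of `buf` on every `write` call would double-count the bytes that `write_all` retries, which is why the closure only ever receives the written prefix.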
@farnz
Contributor

farnz commented Sep 20, 2022

I've put together two commits in #5033 that provide wrappers for AsyncRead and AsyncWrite. The bugs I thought I saw in the example code were there: if the caller doesn't empty the buffer between calls to `poll_read`, data gets presented to the closure more than once, and `read_to_end` wasn't being `.await`ed in the test case.
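The first bug can be seen with a small std-only simulation, assuming a `Vec<u8>` as a stand-in for a `ReadBuf` that is passed back to `poll_read` with earlier data still in it. `fake_poll_read` and `run` are hypothetical helpers: the naive path shows the closure the whole filled region on every call, while the fixed path records the filled length before the read and shows only what follows it.

```rust
/// Hypothetical stand-in for one `poll_read` call: the inner reader appends
/// `chunk` to `filled`, which may already hold bytes from earlier calls.
/// `inspect` is then shown either the whole buffer (bug) or only new bytes.
fn fake_poll_read(filled: &mut Vec<u8>, chunk: &[u8], fixed: bool, inspect: &mut impl FnMut(&[u8])) {
    let before = filled.len();
    filled.extend_from_slice(chunk);
    if fixed {
        inspect(&filled[before..]); // only the bytes added by this call
    } else {
        inspect(&filled[..]); // re-reports everything read so far
    }
}

/// Runs two simulated reads into the same buffer and returns what the
/// inspecting closure saw.
fn run(fixed: bool) -> Vec<u8> {
    let mut buf = Vec::new();
    let mut seen = Vec::new();
    {
        let mut inspect = |bytes: &[u8]| seen.extend_from_slice(bytes);
        for chunk in [&b"abc"[..], &b"def"[..]] {
            fake_poll_read(&mut buf, chunk, fixed, &mut inspect);
        }
    }
    seen
}

fn main() {
    assert_eq!(run(false), b"abcabcdef"); // "abc" is presented twice
    assert_eq!(run(true), b"abcdef");
}
```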

@farnz
Contributor

farnz commented Sep 29, 2022

This can be closed now that #5033 has landed - it looks like I fouled up the linking of PR to issue, so it didn't happen automatically.
