Skip to content

Commit

Permalink
net: add try_read_buf for named pipes (#4626)
Browse files Browse the repository at this point in the history
  • Loading branch information
cptpiepmatz committed Apr 21, 2022
1 parent 711d9d0 commit d397e77
Showing 1 changed file with 164 additions and 0 deletions.
164 changes: 164 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -12,6 +12,10 @@ use std::task::{Context, Poll};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};

cfg_io_util! {
use bytes::BufMut;
}

// Hide imports which are not used when generating documentation.
#[cfg(not(docsrs))]
mod doc {
Expand Down Expand Up @@ -528,6 +532,86 @@ impl NamedPipeServer {
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeServer::readable()
/// [`ready()`]: NamedPipeServer::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
///
/// loop {
/// // Wait for the socket to be readable
/// server.readable().await?;
///
/// let mut buf = Vec::with_capacity(4096);
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match server.try_read_buf(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
use std::io::Read;

let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).read(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}
}

/// Waits for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down Expand Up @@ -1186,6 +1270,86 @@ impl NamedPipeClient {
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeClient::readable()
/// [`ready()`]: NamedPipeClient::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::windows::named_pipe;
/// use std::error::Error;
/// use std::io;
///
/// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
///
/// loop {
/// // Wait for the socket to be readable
/// client.readable().await?;
///
/// let mut buf = Vec::with_capacity(4096);
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match client.try_read_buf(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
use std::io::Read;

let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).read(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}
}

/// Waits for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down

0 comments on commit d397e77

Please sign in to comment.