From 959c5c997f9ff19028819382f268e1ba95277df5 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Mon, 28 Jun 2021 10:43:57 +0200 Subject: [PATCH] net: add ready/try methods to NamedPipeClient (#3866) --- examples/Cargo.toml | 4 + examples/named-pipe-ready.rs | 107 ++++++ tokio/src/net/windows/named_pipe.rs | 489 +++++++++++++++++++++++++++- tokio/tests/named_pipe.rs | 127 ++++++++ 4 files changed, 724 insertions(+), 3 deletions(-) create mode 100644 examples/named-pipe-ready.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index aa5f7b75d98..ff979cfb9ff 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -84,6 +84,10 @@ path = "custom-executor-tokio-context.rs" name = "named-pipe" path = "named-pipe.rs" +[[example]] +name = "named-pipe-ready" +path = "named-pipe-ready.rs" + [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" diff --git a/examples/named-pipe-ready.rs b/examples/named-pipe-ready.rs new file mode 100644 index 00000000000..5037f2a8744 --- /dev/null +++ b/examples/named-pipe-ready.rs @@ -0,0 +1,107 @@ +use std::io; + +#[cfg(windows)] +async fn windows_main() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest}; + use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; + + const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client"; + + let server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + // Note: we wait for a client to connect. + server.connect().await?; + + let mut server = BufReader::new(server); + + let mut buf = String::new(); + server.read_line(&mut buf).await?; + server.write_all(b"pong\n").await?; + Ok::<_, io::Error>(buf) + }); + + let client = tokio::spawn(async move { + // There's no need to use a connect loop here, since we know that the + // server is already up - `open` was called before spawning any of the + // tasks. + let client = ClientOptions::new().open(PIPE_NAME)?; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + let ready = client + .ready(Interest::READABLE | Interest::WRITABLE) + .await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + }); + + let (server, client) = tokio::try_join!(server, client)?; + + assert_eq!(server?, "ping\n"); + assert_eq!(client?, "pong\n"); + + Ok(()) +} + +#[tokio::main] +async fn main() -> io::Result<()> { + #[cfg(windows)] + { + windows_main().await?; + } + + #[cfg(not(windows))] + { + println!("Named pipes are only supported on Windows!"); + } + + Ok(()) +} diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 8013d6f5588..396556ce4f6 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -4,12 +4,12 @@ use std::ffi::c_void; use std::ffi::OsStr; -use std::io; +use std::io::{self, Read, Write}; use std::pin::Pin; use std::ptr; use std::task::{Context, Poll}; -use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; // Hide imports which are not used when generating documentation. @@ -362,6 +362,489 @@ impl NamedPipeClient { // Safety: we're ensuring the lifetime of the named pipe. unsafe { named_pipe_info(self.io.as_raw_handle()) } } + + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same pipe on a single + /// task without splitting the pipe. + /// + /// # Examples + /// + /// Concurrently read and write to the pipe on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// let mut data = vec![0; 1024]; + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut data) { + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// if ready.is_writable() { + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// println!("write {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the pipe to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// client.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Polls for read readiness. + /// + /// If the pipe is not currently ready for reading, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for reading, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + + /// Try to read data from the pipe into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeClient::readable() + /// [`ready()`]: NamedPipeClient::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// client.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Try to read data from the pipe into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: NamedPipeClient::try_read() + /// [`readable()`]: NamedPipeClient::readable() + /// [`ready()`]: NamedPipeClient::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// client.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + + /// Wait for the pipe to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Polls for write readiness. + /// + /// If the pipe is not currently ready for writing, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for writing, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Try to write a buffer to the pipe, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + + /// Try to write several buffers to the pipe, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: NamedPipeClient::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the pipe to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } } impl AsyncRead for NamedPipeClient { @@ -1017,7 +1500,7 @@ impl ClientOptions { /// [enabled I/O]: crate::runtime::Builder::enable_io /// [Tokio Runtime]: crate::runtime::Runtime /// - /// A connect loop that waits until a socket becomes available looks like + /// A connect loop that waits until a pipe becomes available looks like /// this: /// /// ```no_run diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index 3f267670502..a16a324d254 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -148,6 +148,133 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { Ok(()) } +#[tokio::test] +async fn test_named_pipe_multi_client_ready() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt as _, BufReader, Interest}; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; + const N: usize = 10; + + // The first server needs to be constructed early so that clients can + // be correctly connected. Otherwise calling .wait will cause the client to + // error. + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + for _ in 0..N { + // Wait for client to connect. + server.connect().await?; + let mut inner = BufReader::new(server); + + // Construct the next server to be connected before sending the one + // we already have of onto a task. This ensures that the server + // isn't closed (after it's done in the task) before a new one is + // available. Otherwise the client might error with + // `io::ErrorKind::NotFound`. + server = ServerOptions::new().create(PIPE_NAME)?; + + let _ = tokio::spawn(async move { + let mut buf = String::new(); + inner.read_line(&mut buf).await?; + inner.write_all(b"pong\n").await?; + inner.flush().await?; + Ok::<_, io::Error>(()) + }); + } + + Ok::<_, io::Error>(()) + }); + + let mut clients = Vec::new(); + + for _ in 0..N { + clients.push(tokio::spawn(async move { + // This showcases a generic connect loop. + // + // We immediately try to create a client, if it's not found or the + // pipe is busy we use the specialized wait function on the client + // builder. + let client = loop { + match ClientOptions::new().open(PIPE_NAME) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), + Err(e) if e.kind() == io::ErrorKind::NotFound => (), + Err(e) => return Err(e), + } + + // Wait for a named pipe to become available. + time::sleep(Duration::from_millis(50)).await; + }; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + let mut interest = Interest::READABLE; + if write_buf_cursor < write_buf.len() { + interest |= Interest::WRITABLE; + } + + let ready = client.ready(interest).await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + })); + } + + for client in clients { + let result = client.await?; + assert_eq!(result?, "pong\n"); + } + + server.await??; + Ok(()) +} + // This tests what happens when a client tries to disconnect. #[tokio::test] async fn test_named_pipe_mode_message() -> io::Result<()> {