From 205f7e9a6b38c0b00dd501f786a0c8155893aec9 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Wed, 16 Jun 2021 16:50:04 +0200 Subject: [PATCH 1/8] Add ready and try_read/write --- tokio/src/net/windows/named_pipe.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 8013d6f5588..622a81b68cb 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -4,12 +4,12 @@ use std::ffi::c_void; use std::ffi::OsStr; -use std::io; +use std::io::{self, Read, Write}; use std::pin::Pin; use std::ptr; use std::task::{Context, Poll}; -use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; // Hide imports which are not used when generating documentation. @@ -362,6 +362,23 @@ impl NamedPipeClient { // Safety: we're ensuring the lifetime of the named pipe. unsafe { named_pipe_info(self.io.as_raw_handle()) } } + + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + pub fn try_write(&self, buf: &[u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } } impl AsyncRead for NamedPipeClient { From 66a90cc6bbcc2662cd7df02031844ce807386968 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 17 Jun 2021 08:49:20 +0200 Subject: [PATCH 2/8] Add a few more related methods The ready/try_read/write are usually paired with other methods, so added those as well. Copied docs from UnixStream and fixed them up --- tokio/src/net/windows/named_pipe.rs | 468 ++++++++++++++++++++++++++++ 1 file changed, 468 insertions(+) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 622a81b68cb..7dd850d7531 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -363,22 +363,490 @@ impl NamedPipeClient { unsafe { named_pipe_info(self.io.as_raw_handle()) } } + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same pipe on a single + /// task without splitting the pipe. + /// + /// # Examples + /// + /// Concurrently read and write to the stream on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// let mut data = vec![0; 1024]; + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut data) { + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// + /// } + /// + /// if ready.is_writable() { + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// println!("write {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// } + /// } + /// ``` pub async fn ready(&self, interest: Interest) -> io::Result { let event = self.io.registration().readiness(interest).await?; Ok(event.ready) } + /// Wait for the pipe to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the socket to be readable + /// client.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Polls for read readiness. + /// + /// If the pipe is not currently ready for reading, this method will + /// store a clone of the `Waker` from the provided `Context`. When the unix + /// stream becomes ready for reading, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + + /// Try to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeClient::readable() + /// [`ready()`]: NamedPipeClient::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// client.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_read(&self, buf: &mut [u8]) -> io::Result { self.io .registration() .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the pipe into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: NamedPipeClient::try_read() + /// [`readable()`]: NamedPipeClient::readable() + /// [`ready()`]: NamedPipeClient::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed + /// and will no longer yield data. If the pipe is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io::{self, IoSliceMut}; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// IoSliceMut::new(&mut buf_a), + /// IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + + /// Wait for the pipe to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Polls for write readiness. + /// + /// If the pipe is not currently ready for writing, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for writing, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Try to write a buffer to the pipe, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_write(&self, buf: &[u8]) -> io::Result { self.io .registration() .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } + + /// Try to write several buffers to the pipe, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: NamedPipeClient::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the socket to be writable + /// client.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } } impl AsyncRead for NamedPipeClient { From 3867d0b355595654cc817cfa0a390dcb21709920 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 17 Jun 2021 09:50:30 +0200 Subject: [PATCH 3/8] Add an example and a test --- examples/Cargo.toml | 4 ++ examples/named-pipe-ready.rs | 107 ++++++++++++++++++++++++++++ tokio/tests/named_pipe.rs | 130 +++++++++++++++++++++++++++++++++++ 3 files changed, 241 insertions(+) create mode 100644 examples/named-pipe-ready.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index aa5f7b75d98..ff979cfb9ff 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -84,6 +84,10 @@ path = "custom-executor-tokio-context.rs" name = "named-pipe" path = "named-pipe.rs" +[[example]] +name = "named-pipe-ready" +path = "named-pipe-ready.rs" + [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" diff --git a/examples/named-pipe-ready.rs b/examples/named-pipe-ready.rs new file mode 100644 index 00000000000..af54389f0fc --- /dev/null +++ b/examples/named-pipe-ready.rs @@ -0,0 +1,107 @@ +use std::io; + +#[cfg(windows)] +async fn windows_main() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest}; + use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; + + const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client"; + + let server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + // Note: we wait for a client to connect. + server.connect().await?; + + let mut server = BufReader::new(server); + + let mut buf = String::new(); + server.read_line(&mut buf).await?; + server.write_all(b"pong\n").await?; + Ok::<_, io::Error>(buf) + }); + + let client = tokio::spawn(async move { + // There's no need to use a connect loop here, since we know that the + // server is already up - `open` was called before spawning any of the + // tasks. + let client = ClientOptions::new().open(PIPE_NAME)?; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + let ready = client + .ready(Interest::READABLE | Interest::WRITABLE) + .await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + }); + + let (server, client) = tokio::try_join!(server, client)?; + + assert_eq!(server?, "ping\n"); + assert_eq!(client?, "pong\n"); + + Ok(()) +} + +#[tokio::main] +async fn main() -> io::Result<()> { + #[cfg(windows)] + { + windows_main().await?; + } + + #[cfg(not(windows))] + { + println!("Named pipes are only supported on Windows!"); + } + + Ok(()) +} diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index 3f267670502..081568f1324 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -148,6 +148,136 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { Ok(()) } +#[tokio::test] +async fn test_named_pipe_multi_client_ready() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt as _, BufReader, Interest}; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client"; + const N: usize = 10; + + // The first server needs to be constructed early so that clients can + // be correctly connected. Otherwise calling .wait will cause the client to + // error. + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + for _ in 0..N { + // Wait for client to connect. + server.connect().await?; + let mut inner = BufReader::new(server); + + // Construct the next server to be connected before sending the one + // we already have of onto a task. This ensures that the server + // isn't closed (after it's done in the task) before a new one is + // available. Otherwise the client might error with + // `io::ErrorKind::NotFound`. + server = ServerOptions::new().create(PIPE_NAME)?; + + let _ = tokio::spawn(async move { + let mut buf = String::new(); + inner.read_line(&mut buf).await?; + inner.write_all(b"pong\n").await?; + inner.flush().await?; + Ok::<_, io::Error>(()) + }); + } + + Ok::<_, io::Error>(()) + }); + + let mut clients = Vec::new(); + + for _ in 0..N { + clients.push(tokio::spawn(async move { + // This showcases a generic connect loop. + // + // We immediately try to create a client, if it's not found or the + // pipe is busy we use the specialized wait function on the client + // builder. + let client = loop { + match ClientOptions::new().open(PIPE_NAME) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), + Err(e) if e.kind() == io::ErrorKind::NotFound => (), + Err(e) => return Err(e), + } + + // Wait for a named pipe to become available. + time::sleep(Duration::from_millis(50)).await; + }; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + // We need to sleep in the loop, as otherwise ready() will always + // return immediately since the pipe will essentially always be + // ready for writing, resulting in a busy loop that causes + // the runtime to actually never make forward progress + time::sleep(Duration::from_millis(10)).await; + + let ready = client + .ready(Interest::READABLE | Interest::WRITABLE) + .await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + })); + } + + for client in clients { + let result = client.await?; + assert_eq!(result?, "pong\n"); + } + + server.await??; + Ok(()) +} + // This tests what happens when a client tries to disconnect. #[tokio::test] async fn test_named_pipe_mode_message() -> io::Result<()> { From 4e3e0f48f8f2bb1f9dc15272f2f07efb83607095 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 17 Jun 2021 14:03:07 +0200 Subject: [PATCH 4/8] Use unique pipe name for new test --- tokio/tests/named_pipe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index 081568f1324..7407701cc49 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -152,7 +152,7 @@ async fn test_named_pipe_multi_client() -> io::Result<()> { async fn test_named_pipe_multi_client_ready() -> io::Result<()> { use tokio::io::{AsyncBufReadExt as _, BufReader, Interest}; - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client"; + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; const N: usize = 10; // The first server needs to be constructed early so that clients can From bcdf98bdbf5bc58465e3297db991a96c3310d42d Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Fri, 18 Jun 2021 12:35:33 +0200 Subject: [PATCH 5/8] Fix up doc examples --- tokio/src/net/windows/named_pipe.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 7dd850d7531..90d0c139d0e 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -371,12 +371,12 @@ impl NamedPipeClient { /// /// # Examples /// - /// Concurrently read and write to the stream on the same task without + /// Concurrently read and write to the pipe on the same task without /// splitting. /// /// ```no_run /// use tokio::io::Interest; - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// @@ -438,7 +438,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// @@ -482,9 +482,8 @@ impl NamedPipeClient { /// Polls for read readiness. /// /// If the pipe is not currently ready for reading, this method will - /// store a clone of the `Waker` from the provided `Context`. When the unix - /// stream becomes ready for reading, `Waker::wake` will be called on the - /// waker. + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for reading, `Waker::wake` will be called on the waker. /// /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only /// the `Waker` from the `Context` passed to the most recent call is @@ -512,7 +511,7 @@ impl NamedPipeClient { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } - /// Try to read data from the stream into the provided buffer, returning how + /// Try to read data from the pipe into the provided buffer, returning how /// many bytes were read. /// /// Receives any pending data from the pipe but does not wait for new data @@ -535,7 +534,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// @@ -607,7 +606,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io::{self, IoSliceMut}; /// @@ -619,7 +618,7 @@ impl NamedPipeClient { /// /// loop { /// // Wait for the socket to be readable - /// stream.readable().await?; + /// client.readable().await?; /// /// // Creating the buffer **after** the `await` prevents it from /// // being stored in the async task. @@ -632,7 +631,7 @@ impl NamedPipeClient { /// /// // Try to read data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. - /// match stream.try_read_vectored(&mut bufs) { + /// match client.try_read_vectored(&mut bufs) { /// Ok(0) => break, /// Ok(n) => { /// println!("read {} bytes", n); @@ -663,7 +662,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// @@ -749,7 +748,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// @@ -808,7 +807,7 @@ impl NamedPipeClient { /// # Examples /// /// ```no_run - /// use use tokio::net::windows::named_pipe; + /// use tokio::net::windows::named_pipe; /// use std::error::Error; /// use std::io; /// From 1c16a2897c94b3518666759eed86609ae87a87bb Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 22 Jun 2021 15:22:42 +0200 Subject: [PATCH 6/8] Only register write interest when write buffer is not empty --- tokio/tests/named_pipe.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index 7407701cc49..be0df528743 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -212,15 +212,12 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> { let mut write_buf_cursor = 0; loop { - // We need to sleep in the loop, as otherwise ready() will always - // return immediately since the pipe will essentially always be - // ready for writing, resulting in a busy loop that causes - // the runtime to actually never make forward progress - time::sleep(Duration::from_millis(10)).await; - - let ready = client - .ready(Interest::READABLE | Interest::WRITABLE) - .await?; + let mut interest = Interest::READABLE; + if write_buf_cursor < write_buf.len() { + interest |= Interest::WRITABLE; + } + + let ready = client.ready(interest).await?; if ready.is_readable() { let buf = &mut read_buf[read_buf_cursor..]; From 5fb7db326a6cd682ce095efc0d1830bc506b81ea Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 24 Jun 2021 10:20:20 +0200 Subject: [PATCH 7/8] Cleanup docs and other PR feedback --- examples/named-pipe-ready.rs | 4 ++-- tokio/src/net/windows/named_pipe.rs | 30 ++++++++++++++--------------- tokio/tests/named_pipe.rs | 4 ++-- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/examples/named-pipe-ready.rs b/examples/named-pipe-ready.rs index af54389f0fc..5037f2a8744 100644 --- a/examples/named-pipe-ready.rs +++ b/examples/named-pipe-ready.rs @@ -48,7 +48,7 @@ async fn windows_main() -> io::Result<()> { break; } } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { continue; } Err(e) => { @@ -68,7 +68,7 @@ async fn windows_main() -> io::Result<()> { Ok(n) => { write_buf_cursor += n; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { continue; } Err(e) => { diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 90d0c139d0e..26fbcb29566 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -397,7 +397,7 @@ impl NamedPipeClient { /// Ok(n) => { /// println!("read {} bytes", n); /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -414,7 +414,7 @@ impl NamedPipeClient { /// Ok(n) => { /// println!("write {} bytes", n); /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -451,7 +451,7 @@ impl NamedPipeClient { /// let mut msg = vec![0; 1024]; /// /// loop { - /// // Wait for the socket to be readable + /// // Wait for the pipe to be readable /// client.readable().await?; /// /// // Try to read data, this may still fail with `WouldBlock` @@ -461,7 +461,7 @@ impl NamedPipeClient { /// msg.truncate(n); /// break; /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -545,7 +545,7 @@ impl NamedPipeClient { /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; /// /// loop { - /// // Wait for the socket to be readable + /// // Wait for the pipe to be readable /// client.readable().await?; /// /// // Creating the buffer **after** the `await` prevents it from @@ -559,7 +559,7 @@ impl NamedPipeClient { /// Ok(n) => { /// println!("read {} bytes", n); /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -617,7 +617,7 @@ impl NamedPipeClient { /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; /// /// loop { - /// // Wait for the socket to be readable + /// // Wait for the pipe to be readable /// client.readable().await?; /// /// // Creating the buffer **after** the `await` prevents it from @@ -636,7 +636,7 @@ impl NamedPipeClient { /// Ok(n) => { /// println!("read {} bytes", n); /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -673,7 +673,7 @@ impl NamedPipeClient { /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; /// /// loop { - /// // Wait for the socket to be writable + /// // Wait for the pipe to be writable /// client.writable().await?; /// /// // Try to write data, this may still fail with `WouldBlock` @@ -682,7 +682,7 @@ impl NamedPipeClient { /// Ok(n) => { /// break; /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -759,7 +759,7 @@ impl NamedPipeClient { /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; /// /// loop { - /// // Wait for the socket to be writable + /// // Wait for the pipe to be writable /// client.writable().await?; /// /// // Try to write data, this may still fail with `WouldBlock` @@ -768,7 +768,7 @@ impl NamedPipeClient { /// Ok(n) => { /// break; /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -820,7 +820,7 @@ impl NamedPipeClient { /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; /// /// loop { - /// // Wait for the socket to be writable + /// // Wait for the pipe to be writable /// client.writable().await?; /// /// // Try to write data, this may still fail with `WouldBlock` @@ -829,7 +829,7 @@ impl NamedPipeClient { /// Ok(n) => { /// break; /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { @@ -1501,7 +1501,7 @@ impl ClientOptions { /// [enabled I/O]: crate::runtime::Builder::enable_io /// [Tokio Runtime]: crate::runtime::Runtime /// - /// A connect loop that waits until a socket becomes available looks like + /// A connect loop that waits until a pipe becomes available looks like /// this: /// /// ```no_run diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs index be0df528743..a16a324d254 100644 --- a/tokio/tests/named_pipe.rs +++ b/tokio/tests/named_pipe.rs @@ -230,7 +230,7 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> { break; } } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { continue; } Err(e) => { @@ -250,7 +250,7 @@ async fn test_named_pipe_multi_client_ready() -> io::Result<()> { Ok(n) => { write_buf_cursor += n; } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { continue; } Err(e) => { From d62b70d7baac7af693bcca37e4007a95c4561b9a Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Thu, 24 Jun 2021 10:22:38 +0200 Subject: [PATCH 8/8] Remove extraneous line --- tokio/src/net/windows/named_pipe.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 26fbcb29566..396556ce4f6 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -404,7 +404,6 @@ impl NamedPipeClient { /// return Err(e.into()); /// } /// } - /// /// } /// /// if ready.is_writable() {