From 90e5978aad952bab77765428a8d8f4dd175cf83e Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 18 Nov 2022 17:34:13 +0100 Subject: [PATCH 1/5] net: fix named pipe connect --- tokio/src/net/windows/named_pipe.rs | 15 ++---- tokio/tests/net_named_pipe.rs | 82 +++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 11 deletions(-) create mode 100644 tokio/tests/net_named_pipe.rs diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index c895dabacc8..8345144a0a2 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -192,17 +192,10 @@ impl NamedPipeServer { /// # Ok(()) } /// ``` pub async fn connect(&self) -> io::Result<()> { - loop { - match self.io.connect() { - Ok(()) => break, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.registration().readiness(Interest::WRITABLE).await?; - } - Err(e) => return Err(e), - } - } - - Ok(()) + self.io + .registration() + .async_io(Interest::WRITABLE, || self.io.connect()) + .await } /// Disconnects the server end of a named pipe instance from a client diff --git a/tokio/tests/net_named_pipe.rs b/tokio/tests/net_named_pipe.rs new file mode 100644 index 00000000000..10cebfdb1a0 --- /dev/null +++ b/tokio/tests/net_named_pipe.rs @@ -0,0 +1,82 @@ +#![cfg(all(feature = "full", windows))] // Wasi does not support direct socket operations +use std::io; +use std::time::Duration; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeServer, ServerOptions}; +use tokio::sync::oneshot::{channel, Sender}; +use tokio::time::sleep; + +static PATH: &str = r"\\.\pipe\Tokio inbound bug MCVE"; +static MSG: &str = "Hello from server!\n"; +const NUM_CLIENTS: usize = 16; +const INBOUND: bool = false; + +pub async fn server(snd: Sender<()>) -> io::Result<()> { + async fn handle_conn(mut conn: NamedPipeServer) -> io::Result<()> { + conn.write_all(MSG.as_bytes()).await?; + drop(conn); + + Ok(()) + } + + let mut srv = ServerOptions::new() + .access_inbound(INBOUND) + .access_outbound(true) + .first_pipe_instance(true) + .create(PATH)?; + + let _ = snd.send(()); + + let mut tasks = Vec::with_capacity(NUM_CLIENTS); + + for _ in 0..NUM_CLIENTS { + srv.connect().await?; + let new_srv = ServerOptions::new() + .access_inbound(INBOUND) + .access_outbound(true) + .create(PATH)?; + let old_srv = std::mem::replace(&mut srv, new_srv); + let task = tokio::spawn(handle_conn(old_srv)); + tasks.push(task); + } + for task in tasks { + task.await??; + } + + Ok(()) +} +pub async fn client() -> io::Result<()> { + let mut buffer = String::with_capacity(128); + + let mut conn = loop { + match ClientOptions::new().write(false).open(PATH) { + Err(e) if e.raw_os_error() == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as _) => { + sleep(Duration::from_millis(10)).await; + continue; + } + not_busy => break not_busy, + } + } + .map(BufReader::new)?; + + conn.read_line(&mut buffer).await?; + + assert_eq!(buffer, MSG); + + Ok(()) +} + +#[tokio::test] +async fn test_several_clients() { + let (tx, rx) = channel(); + let srv = tokio::spawn(server(tx)); + let mut tasks = Vec::with_capacity(NUM_CLIENTS as _); + let _ = rx.await; + for _ in 0..NUM_CLIENTS { + tasks.push(tokio::spawn(client())); + } + for task in tasks { + task.await.unwrap().unwrap(); + } + srv.await.unwrap().unwrap(); +} From ca9cb4888ccd69d6e05ae040182b7a217a12db51 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Fri, 18 Nov 2022 20:42:39 +0100 Subject: [PATCH 2/5] fix named pipe --- tokio/src/net/windows/named_pipe.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 8345144a0a2..d4f79627a77 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -192,10 +192,21 @@ impl NamedPipeServer { /// # Ok(()) } /// ``` pub async fn connect(&self) -> io::Result<()> { - self.io - .registration() - .async_io(Interest::WRITABLE, || self.io.connect()) - .await + loop { + match self.io.connect() { + Ok(()) => break, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + return self + .io + .registration() + .async_io(Interest::WRITABLE, || self.io.connect()) + .await + } + Err(e) => return Err(e), + } + } + + Ok(()) } /// Disconnects the server end of a named pipe instance from a client From bdcf121e8635c3f74da7166914facca8bfb696fe Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Fri, 18 Nov 2022 21:18:46 +0100 Subject: [PATCH 3/5] remove redundant loop --- tokio/src/net/windows/named_pipe.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index d4f79627a77..2446160a127 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -192,21 +192,15 @@ impl NamedPipeServer { /// # Ok(()) } /// ``` pub async fn connect(&self) -> io::Result<()> { - loop { - match self.io.connect() { - Ok(()) => break, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - return self - .io - .registration() - .async_io(Interest::WRITABLE, || self.io.connect()) - .await - } - Err(e) => return Err(e), + match self.io.connect() { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io + .registration() + .async_io(Interest::WRITABLE, || self.io.connect()) + .await } + x => x, } - - Ok(()) } /// Disconnects the server end of a named pipe instance from a client From 425bb695819b04e2fa4b911074c1accd099d5f7a Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Sat, 19 Nov 2022 14:36:40 +0100 Subject: [PATCH 4/5] add explicit pipe access test --- tokio/tests/named_pipe.rs | 393 ----------------------------- tokio/tests/net_named_pipe.rs | 454 +++++++++++++++++++++++++++++----- 2 files changed, 393 insertions(+), 454 deletions(-) delete mode 100644 tokio/tests/named_pipe.rs diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs deleted file mode 100644 index 2055c3ce5be..00000000000 --- a/tokio/tests/named_pipe.rs +++ /dev/null @@ -1,393 +0,0 @@ -#![cfg(feature = "full")] -#![cfg(all(windows))] - -use std::io; -use std::mem; -use std::os::windows::io::AsRawHandle; -use std::time::Duration; -use tokio::io::AsyncWriteExt; -use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions}; -use tokio::time; -use winapi::shared::winerror; - -#[tokio::test] -async fn test_named_pipe_client_drop() -> io::Result<()> { - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop"; - - let mut server = ServerOptions::new().create(PIPE_NAME)?; - - assert_eq!(num_instances("test-named-pipe-client-drop")?, 1); - - let client = ClientOptions::new().open(PIPE_NAME)?; - - server.connect().await?; - drop(client); - - // instance will be broken because client is gone - match server.write_all(b"ping").await { - Err(e) if e.raw_os_error() == Some(winerror::ERROR_NO_DATA as i32) => (), - x => panic!("{:?}", x), - } - - Ok(()) -} - -#[tokio::test] -async fn test_named_pipe_single_client() -> io::Result<()> { - use tokio::io::{AsyncBufReadExt as _, BufReader}; - - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client"; - - let server = ServerOptions::new().create(PIPE_NAME)?; - - let server = tokio::spawn(async move { - // Note: we wait for a client to connect. - server.connect().await?; - - let mut server = BufReader::new(server); - - let mut buf = String::new(); - server.read_line(&mut buf).await?; - server.write_all(b"pong\n").await?; - Ok::<_, io::Error>(buf) - }); - - let client = tokio::spawn(async move { - let client = ClientOptions::new().open(PIPE_NAME)?; - - let mut client = BufReader::new(client); - - let mut buf = String::new(); - client.write_all(b"ping\n").await?; - client.read_line(&mut buf).await?; - Ok::<_, io::Error>(buf) - }); - - let (server, client) = tokio::try_join!(server, client)?; - - assert_eq!(server?, "ping\n"); - assert_eq!(client?, "pong\n"); - - Ok(()) -} - -#[tokio::test] -async fn test_named_pipe_multi_client() -> io::Result<()> { - use tokio::io::{AsyncBufReadExt as _, BufReader}; - - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client"; - const N: usize = 10; - - // The first server needs to be constructed early so that clients can - // be correctly connected. Otherwise calling .wait will cause the client to - // error. - let mut server = ServerOptions::new().create(PIPE_NAME)?; - - let server = tokio::spawn(async move { - for _ in 0..N { - // Wait for client to connect. - server.connect().await?; - let mut inner = BufReader::new(server); - - // Construct the next server to be connected before sending the one - // we already have of onto a task. This ensures that the server - // isn't closed (after it's done in the task) before a new one is - // available. Otherwise the client might error with - // `io::ErrorKind::NotFound`. - server = ServerOptions::new().create(PIPE_NAME)?; - - let _ = tokio::spawn(async move { - let mut buf = String::new(); - inner.read_line(&mut buf).await?; - inner.write_all(b"pong\n").await?; - inner.flush().await?; - Ok::<_, io::Error>(()) - }); - } - - Ok::<_, io::Error>(()) - }); - - let mut clients = Vec::new(); - - for _ in 0..N { - clients.push(tokio::spawn(async move { - // This showcases a generic connect loop. - // - // We immediately try to create a client, if it's not found or the - // pipe is busy we use the specialized wait function on the client - // builder. - let client = loop { - match ClientOptions::new().open(PIPE_NAME) { - Ok(client) => break client, - Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), - Err(e) if e.kind() == io::ErrorKind::NotFound => (), - Err(e) => return Err(e), - } - - // Wait for a named pipe to become available. - time::sleep(Duration::from_millis(10)).await; - }; - - let mut client = BufReader::new(client); - - let mut buf = String::new(); - client.write_all(b"ping\n").await?; - client.flush().await?; - client.read_line(&mut buf).await?; - Ok::<_, io::Error>(buf) - })); - } - - for client in clients { - let result = client.await?; - assert_eq!(result?, "pong\n"); - } - - server.await??; - Ok(()) -} - -#[tokio::test] -async fn test_named_pipe_multi_client_ready() -> io::Result<()> { - use tokio::io::Interest; - - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; - const N: usize = 10; - - // The first server needs to be constructed early so that clients can - // be correctly connected. Otherwise calling .wait will cause the client to - // error. - let mut server = ServerOptions::new().create(PIPE_NAME)?; - - let server = tokio::spawn(async move { - for _ in 0..N { - // Wait for client to connect. - server.connect().await?; - - let inner_server = server; - - // Construct the next server to be connected before sending the one - // we already have of onto a task. This ensures that the server - // isn't closed (after it's done in the task) before a new one is - // available. Otherwise the client might error with - // `io::ErrorKind::NotFound`. - server = ServerOptions::new().create(PIPE_NAME)?; - - let _ = tokio::spawn(async move { - let server = inner_server; - - { - let mut read_buf = [0u8; 5]; - let mut read_buf_cursor = 0; - - loop { - server.readable().await?; - - let buf = &mut read_buf[read_buf_cursor..]; - - match server.try_read(buf) { - Ok(n) => { - read_buf_cursor += n; - - if read_buf_cursor == read_buf.len() { - break; - } - } - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e); - } - } - } - }; - - { - let write_buf = b"pong\n"; - let mut write_buf_cursor = 0; - - loop { - server.writable().await?; - let buf = &write_buf[write_buf_cursor..]; - - match server.try_write(buf) { - Ok(n) => { - write_buf_cursor += n; - - if write_buf_cursor == write_buf.len() { - break; - } - } - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e); - } - } - } - } - - Ok::<_, io::Error>(()) - }); - } - - Ok::<_, io::Error>(()) - }); - - let mut clients = Vec::new(); - - for _ in 0..N { - clients.push(tokio::spawn(async move { - // This showcases a generic connect loop. - // - // We immediately try to create a client, if it's not found or the - // pipe is busy we use the specialized wait function on the client - // builder. - let client = loop { - match ClientOptions::new().open(PIPE_NAME) { - Ok(client) => break client, - Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), - Err(e) if e.kind() == io::ErrorKind::NotFound => (), - Err(e) => return Err(e), - } - - // Wait for a named pipe to become available. - time::sleep(Duration::from_millis(10)).await; - }; - - let mut read_buf = [0u8; 5]; - let mut read_buf_cursor = 0; - let write_buf = b"ping\n"; - let mut write_buf_cursor = 0; - - loop { - let mut interest = Interest::READABLE; - if write_buf_cursor < write_buf.len() { - interest |= Interest::WRITABLE; - } - - let ready = client.ready(interest).await?; - - if ready.is_readable() { - let buf = &mut read_buf[read_buf_cursor..]; - - match client.try_read(buf) { - Ok(n) => { - read_buf_cursor += n; - - if read_buf_cursor == read_buf.len() { - break; - } - } - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e); - } - } - } - - if ready.is_writable() { - let buf = &write_buf[write_buf_cursor..]; - - if buf.is_empty() { - continue; - } - - match client.try_write(buf) { - Ok(n) => { - write_buf_cursor += n; - } - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e); - } - } - } - } - - let buf = String::from_utf8_lossy(&read_buf).into_owned(); - - Ok::<_, io::Error>(buf) - })); - } - - for client in clients { - let result = client.await?; - assert_eq!(result?, "pong\n"); - } - - server.await??; - Ok(()) -} - -// This tests what happens when a client tries to disconnect. -#[tokio::test] -async fn test_named_pipe_mode_message() -> io::Result<()> { - const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message"; - - let server = ServerOptions::new() - .pipe_mode(PipeMode::Message) - .create(PIPE_NAME)?; - - let _ = ClientOptions::new().open(PIPE_NAME)?; - server.connect().await?; - Ok(()) -} - -fn num_instances(pipe_name: impl AsRef) -> io::Result { - use ntapi::ntioapi; - use winapi::shared::ntdef; - - let mut name = pipe_name.as_ref().encode_utf16().collect::>(); - let mut name = ntdef::UNICODE_STRING { - Length: (name.len() * mem::size_of::()) as u16, - MaximumLength: (name.len() * mem::size_of::()) as u16, - Buffer: name.as_mut_ptr(), - }; - let root = std::fs::File::open(r"\\.\Pipe\")?; - let mut io_status_block = unsafe { mem::zeroed() }; - let mut file_directory_information = [0_u8; 1024]; - - let status = unsafe { - ntioapi::NtQueryDirectoryFile( - root.as_raw_handle(), - std::ptr::null_mut(), - None, - std::ptr::null_mut(), - &mut io_status_block, - &mut file_directory_information as *mut _ as *mut _, - 1024, - ntioapi::FileDirectoryInformation, - 0, - &mut name, - 0, - ) - }; - - if status as u32 != winerror::NO_ERROR { - return Err(io::Error::last_os_error()); - } - - let info = unsafe { - mem::transmute::<_, &ntioapi::FILE_DIRECTORY_INFORMATION>(&file_directory_information) - }; - let raw_name = unsafe { - std::slice::from_raw_parts( - info.FileName.as_ptr(), - info.FileNameLength as usize / mem::size_of::(), - ) - }; - let name = String::from_utf16(raw_name).unwrap(); - let num_instances = unsafe { *info.EndOfFile.QuadPart() }; - - assert_eq!(name, pipe_name.as_ref()); - - Ok(num_instances as u32) -} diff --git a/tokio/tests/net_named_pipe.rs b/tokio/tests/net_named_pipe.rs index 10cebfdb1a0..374a15c0bfb 100644 --- a/tokio/tests/net_named_pipe.rs +++ b/tokio/tests/net_named_pipe.rs @@ -1,82 +1,414 @@ -#![cfg(all(feature = "full", windows))] // Wasi does not support direct socket operations +#![cfg(feature = "full")] +#![cfg(all(windows))] + use std::io; +use std::mem; +use std::os::windows::io::AsRawHandle; use std::time::Duration; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeServer, ServerOptions}; -use tokio::sync::oneshot::{channel, Sender}; -use tokio::time::sleep; - -static PATH: &str = r"\\.\pipe\Tokio inbound bug MCVE"; -static MSG: &str = "Hello from server!\n"; -const NUM_CLIENTS: usize = 16; -const INBOUND: bool = false; - -pub async fn server(snd: Sender<()>) -> io::Result<()> { - async fn handle_conn(mut conn: NamedPipeServer) -> io::Result<()> { - conn.write_all(MSG.as_bytes()).await?; - drop(conn); - - Ok(()) +use tokio::io::AsyncWriteExt; +use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions}; +use tokio::time; +use winapi::shared::winerror; + +#[tokio::test] +async fn test_named_pipe_client_drop() -> io::Result<()> { + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop"; + + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + assert_eq!(num_instances("test-named-pipe-client-drop")?, 1); + + let client = ClientOptions::new().open(PIPE_NAME)?; + + server.connect().await?; + drop(client); + + // instance will be broken because client is gone + match server.write_all(b"ping").await { + Err(e) if e.raw_os_error() == Some(winerror::ERROR_NO_DATA as i32) => (), + x => panic!("{:?}", x), } - let mut srv = ServerOptions::new() - .access_inbound(INBOUND) - .access_outbound(true) - .first_pipe_instance(true) - .create(PATH)?; - - let _ = snd.send(()); - - let mut tasks = Vec::with_capacity(NUM_CLIENTS); - - for _ in 0..NUM_CLIENTS { - srv.connect().await?; - let new_srv = ServerOptions::new() - .access_inbound(INBOUND) - .access_outbound(true) - .create(PATH)?; - let old_srv = std::mem::replace(&mut srv, new_srv); - let task = tokio::spawn(handle_conn(old_srv)); - tasks.push(task); + Ok(()) +} + +#[tokio::test] +async fn test_named_pipe_single_client() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt as _, BufReader}; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client"; + + let server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + // Note: we wait for a client to connect. + server.connect().await?; + + let mut server = BufReader::new(server); + + let mut buf = String::new(); + server.read_line(&mut buf).await?; + server.write_all(b"pong\n").await?; + Ok::<_, io::Error>(buf) + }); + + let client = tokio::spawn(async move { + let client = ClientOptions::new().open(PIPE_NAME)?; + + let mut client = BufReader::new(client); + + let mut buf = String::new(); + client.write_all(b"ping\n").await?; + client.read_line(&mut buf).await?; + Ok::<_, io::Error>(buf) + }); + + let (server, client) = tokio::try_join!(server, client)?; + + assert_eq!(server?, "ping\n"); + assert_eq!(client?, "pong\n"); + + Ok(()) +} + +#[tokio::test] +async fn test_named_pipe_multi_client() -> io::Result<()> { + use tokio::io::{AsyncBufReadExt as _, BufReader}; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client"; + const N: usize = 10; + + // The first server needs to be constructed early so that clients can + // be correctly connected. Otherwise calling .wait will cause the client to + // error. + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + for _ in 0..N { + // Wait for client to connect. + server.connect().await?; + let mut inner = BufReader::new(server); + + // Construct the next server to be connected before sending the one + // we already have of onto a task. This ensures that the server + // isn't closed (after it's done in the task) before a new one is + // available. Otherwise the client might error with + // `io::ErrorKind::NotFound`. + server = ServerOptions::new().create(PIPE_NAME)?; + + let _ = tokio::spawn(async move { + let mut buf = String::new(); + inner.read_line(&mut buf).await?; + inner.write_all(b"pong\n").await?; + inner.flush().await?; + Ok::<_, io::Error>(()) + }); + } + + Ok::<_, io::Error>(()) + }); + + let mut clients = Vec::new(); + + for _ in 0..N { + clients.push(tokio::spawn(async move { + // This showcases a generic connect loop. + // + // We immediately try to create a client, if it's not found or the + // pipe is busy we use the specialized wait function on the client + // builder. + let client = loop { + match ClientOptions::new().open(PIPE_NAME) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), + Err(e) if e.kind() == io::ErrorKind::NotFound => (), + Err(e) => return Err(e), + } + + // Wait for a named pipe to become available. + time::sleep(Duration::from_millis(10)).await; + }; + + let mut client = BufReader::new(client); + + let mut buf = String::new(); + client.write_all(b"ping\n").await?; + client.flush().await?; + client.read_line(&mut buf).await?; + Ok::<_, io::Error>(buf) + })); } - for task in tasks { - task.await??; + + for client in clients { + let result = client.await?; + assert_eq!(result?, "pong\n"); } + server.await??; Ok(()) } -pub async fn client() -> io::Result<()> { - let mut buffer = String::with_capacity(128); - - let mut conn = loop { - match ClientOptions::new().write(false).open(PATH) { - Err(e) if e.raw_os_error() == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as _) => { - sleep(Duration::from_millis(10)).await; - continue; - } - not_busy => break not_busy, + +#[tokio::test] +async fn test_named_pipe_multi_client_ready() -> io::Result<()> { + use tokio::io::Interest; + + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; + const N: usize = 10; + + // The first server needs to be constructed early so that clients can + // be correctly connected. Otherwise calling .wait will cause the client to + // error. + let mut server = ServerOptions::new().create(PIPE_NAME)?; + + let server = tokio::spawn(async move { + for _ in 0..N { + // Wait for client to connect. + server.connect().await?; + + let inner_server = server; + + // Construct the next server to be connected before sending the one + // we already have of onto a task. This ensures that the server + // isn't closed (after it's done in the task) before a new one is + // available. Otherwise the client might error with + // `io::ErrorKind::NotFound`. + server = ServerOptions::new().create(PIPE_NAME)?; + + let _ = tokio::spawn(async move { + let server = inner_server; + + { + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + + loop { + server.readable().await?; + + let buf = &mut read_buf[read_buf_cursor..]; + + match server.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + }; + + { + let write_buf = b"pong\n"; + let mut write_buf_cursor = 0; + + loop { + server.writable().await?; + let buf = &write_buf[write_buf_cursor..]; + + match server.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + + if write_buf_cursor == write_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + Ok::<_, io::Error>(()) + }); } + + Ok::<_, io::Error>(()) + }); + + let mut clients = Vec::new(); + + for _ in 0..N { + clients.push(tokio::spawn(async move { + // This showcases a generic connect loop. + // + // We immediately try to create a client, if it's not found or the + // pipe is busy we use the specialized wait function on the client + // builder. + let client = loop { + match ClientOptions::new().open(PIPE_NAME) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (), + Err(e) if e.kind() == io::ErrorKind::NotFound => (), + Err(e) => return Err(e), + } + + // Wait for a named pipe to become available. + time::sleep(Duration::from_millis(10)).await; + }; + + let mut read_buf = [0u8; 5]; + let mut read_buf_cursor = 0; + let write_buf = b"ping\n"; + let mut write_buf_cursor = 0; + + loop { + let mut interest = Interest::READABLE; + if write_buf_cursor < write_buf.len() { + interest |= Interest::WRITABLE; + } + + let ready = client.ready(interest).await?; + + if ready.is_readable() { + let buf = &mut read_buf[read_buf_cursor..]; + + match client.try_read(buf) { + Ok(n) => { + read_buf_cursor += n; + + if read_buf_cursor == read_buf.len() { + break; + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + + if ready.is_writable() { + let buf = &write_buf[write_buf_cursor..]; + + if buf.is_empty() { + continue; + } + + match client.try_write(buf) { + Ok(n) => { + write_buf_cursor += n; + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e); + } + } + } + } + + let buf = String::from_utf8_lossy(&read_buf).into_owned(); + + Ok::<_, io::Error>(buf) + })); } - .map(BufReader::new)?; - conn.read_line(&mut buffer).await?; + for client in clients { + let result = client.await?; + assert_eq!(result?, "pong\n"); + } + + server.await??; + Ok(()) +} + +// This tests what happens when a client tries to disconnect. +#[tokio::test] +async fn test_named_pipe_mode_message() -> io::Result<()> { + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message"; - assert_eq!(buffer, MSG); + let server = ServerOptions::new() + .pipe_mode(PipeMode::Message) + .create(PIPE_NAME)?; + let _ = ClientOptions::new().open(PIPE_NAME)?; + server.connect().await?; Ok(()) } +// This tests `NamedPipeServer::connect` with various access settings. #[tokio::test] -async fn test_several_clients() { - let (tx, rx) = channel(); - let srv = tokio::spawn(server(tx)); - let mut tasks = Vec::with_capacity(NUM_CLIENTS as _); - let _ = rx.await; - for _ in 0..NUM_CLIENTS { - tasks.push(tokio::spawn(client())); +async fn test_named_pipe_access() -> io::Result<()> { + const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access"; + + for (inb, outb) in [(true, true), (true, false), (false, true)] { + let server = tokio::spawn(async move { + let s = ServerOptions::new() + .access_inbound(inb) + .access_outbound(outb) + .create(PIPE_NAME)?; + s.connect().await + }); + // Wait for the server to call connect. + time::sleep(Duration::from_millis(10)).await; + let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?; + server.await??; } - for task in tasks { - task.await.unwrap().unwrap(); + Ok(()) +} + +fn num_instances(pipe_name: impl AsRef) -> io::Result { + use ntapi::ntioapi; + use winapi::shared::ntdef; + + let mut name = pipe_name.as_ref().encode_utf16().collect::>(); + let mut name = ntdef::UNICODE_STRING { + Length: (name.len() * mem::size_of::()) as u16, + MaximumLength: (name.len() * mem::size_of::()) as u16, + Buffer: name.as_mut_ptr(), + }; + let root = std::fs::File::open(r"\\.\Pipe\")?; + let mut io_status_block = unsafe { mem::zeroed() }; + let mut file_directory_information = [0_u8; 1024]; + + let status = unsafe { + ntioapi::NtQueryDirectoryFile( + root.as_raw_handle(), + std::ptr::null_mut(), + None, + std::ptr::null_mut(), + &mut io_status_block, + &mut file_directory_information as *mut _ as *mut _, + 1024, + ntioapi::FileDirectoryInformation, + 0, + &mut name, + 0, + ) + }; + + if status as u32 != winerror::NO_ERROR { + return Err(io::Error::last_os_error()); } - srv.await.unwrap().unwrap(); + + let info = unsafe { + mem::transmute::<_, &ntioapi::FILE_DIRECTORY_INFORMATION>(&file_directory_information) + }; + let raw_name = unsafe { + std::slice::from_raw_parts( + info.FileName.as_ptr(), + info.FileNameLength as usize / mem::size_of::(), + ) + }; + let name = String::from_utf16(raw_name).unwrap(); + let num_instances = unsafe { *info.EndOfFile.QuadPart() }; + + assert_eq!(name, pipe_name.as_ref()); + + Ok(num_instances as u32) } From 7396f34a755cc680671054c53b9c9007fc2b4788 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Sun, 20 Nov 2022 21:19:23 +0100 Subject: [PATCH 5/5] use oneshot channel instead of timing --- tokio/tests/net_named_pipe.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tokio/tests/net_named_pipe.rs b/tokio/tests/net_named_pipe.rs index 374a15c0bfb..05a01bbe2de 100644 --- a/tokio/tests/net_named_pipe.rs +++ b/tokio/tests/net_named_pipe.rs @@ -347,16 +347,22 @@ async fn test_named_pipe_access() -> io::Result<()> { const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access"; for (inb, outb) in [(true, true), (true, false), (false, true)] { + let (tx, rx) = tokio::sync::oneshot::channel(); let server = tokio::spawn(async move { let s = ServerOptions::new() .access_inbound(inb) .access_outbound(outb) .create(PIPE_NAME)?; - s.connect().await + let mut connect_fut = tokio_test::task::spawn(s.connect()); + assert!(connect_fut.poll().is_pending()); + tx.send(()).unwrap(); + connect_fut.await }); + // Wait for the server to call connect. - time::sleep(Duration::from_millis(10)).await; + rx.await.unwrap(); let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?; + server.await??; } Ok(())