Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: fix named pipe connect #5206

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 4 additions & 11 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -192,17 +192,10 @@ impl NamedPipeServer {
/// # Ok(()) }
/// ```
pub async fn connect(&self) -> io::Result<()> {
loop {
match self.io.connect() {
Ok(()) => break,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.registration().readiness(Interest::WRITABLE).await?;
}
Err(e) => return Err(e),
}
}

Ok(())
self.io
.registration()
.async_io(Interest::WRITABLE, || self.io.connect())
.await
}

/// Disconnects the server end of a named pipe instance from a client
Expand Down
82 changes: 82 additions & 0 deletions tokio/tests/net_named_pipe.rs
@@ -0,0 +1,82 @@
#![cfg(all(feature = "full", windows))] // Wasi does not support direct socket operations
use std::io;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeServer, ServerOptions};
use tokio::sync::oneshot::{channel, Sender};
use tokio::time::sleep;

static PATH: &str = r"\\.\pipe\Tokio inbound bug MCVE";
static MSG: &str = "Hello from server!\n";
const NUM_CLIENTS: usize = 16;
const INBOUND: bool = false;

pub async fn server(snd: Sender<()>) -> io::Result<()> {
async fn handle_conn(mut conn: NamedPipeServer) -> io::Result<()> {
conn.write_all(MSG.as_bytes()).await?;
drop(conn);

Ok(())
}

let mut srv = ServerOptions::new()
.access_inbound(INBOUND)
.access_outbound(true)
.first_pipe_instance(true)
.create(PATH)?;

let _ = snd.send(());

let mut tasks = Vec::with_capacity(NUM_CLIENTS);

for _ in 0..NUM_CLIENTS {
srv.connect().await?;
let new_srv = ServerOptions::new()
.access_inbound(INBOUND)
.access_outbound(true)
.create(PATH)?;
let old_srv = std::mem::replace(&mut srv, new_srv);
let task = tokio::spawn(handle_conn(old_srv));
tasks.push(task);
}
for task in tasks {
task.await??;
}

Ok(())
}
pub async fn client() -> io::Result<()> {
let mut buffer = String::with_capacity(128);

let mut conn = loop {
match ClientOptions::new().write(false).open(PATH) {
Err(e) if e.raw_os_error() == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as _) => {
sleep(Duration::from_millis(10)).await;
continue;
}
not_busy => break not_busy,
}
}
.map(BufReader::new)?;

conn.read_line(&mut buffer).await?;

assert_eq!(buffer, MSG);

Ok(())
}

#[tokio::test]
async fn test_several_clients() {
let (tx, rx) = channel();
let srv = tokio::spawn(server(tx));
let mut tasks = Vec::with_capacity(NUM_CLIENTS as _);
let _ = rx.await;
for _ in 0..NUM_CLIENTS {
tasks.push(tokio::spawn(client()));
}
for task in tasks {
task.await.unwrap().unwrap();
}
srv.await.unwrap().unwrap();
}