Skip to content

Commit

Permalink
net: provide NamedPipe{Client, Server} types and builders (#3760)
Browse files Browse the repository at this point in the history
This builds on tokio-rs/mio#1351 and introduces the
tokio::net::windows::named_pipe module which provides low level types for
building and communicating asynchronously over windows named pipes.

Named pipes require the `net` feature flag to be enabled on Windows.

Co-authored-by: Alice Ryhl <alice@ryhl.io>
  • Loading branch information
udoprog and Darksonn committed Jun 15, 2021
1 parent 606206e commit 97e7830
Show file tree
Hide file tree
Showing 13 changed files with 1,740 additions and 0 deletions.
11 changes: 11 additions & 0 deletions examples/Cargo.toml
Expand Up @@ -22,7 +22,10 @@ serde_json = "1.0"
httparse = "1.0"
time = "0.1"
once_cell = "1.5.2"
rand = "0.8.3"

[target.'cfg(windows)'.dev-dependencies.winapi]
version = "0.3.8"

[[example]]
name = "chat"
Expand Down Expand Up @@ -76,3 +79,11 @@ path = "custom-executor.rs"
[[example]]
name = "custom-executor-tokio-context"
path = "custom-executor-tokio-context.rs"

[[example]]
name = "named-pipe"
path = "named-pipe.rs"

[[example]]
name = "named-pipe-multi-client"
path = "named-pipe-multi-client.rs"
98 changes: 98 additions & 0 deletions examples/named-pipe-multi-client.rs
@@ -0,0 +1,98 @@
use std::io;

#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
use tokio::time;
use winapi::shared::winerror;

const PIPE_NAME: &str = r"\\.\pipe\named-pipe-multi-client";
const N: usize = 10;

// The first server needs to be constructed early so that clients can
// be correctly connected. Otherwise a waiting client will error.
//
// Here we also make use of `first_pipe_instance`, which will ensure
// that there are no other servers up and running already.
let mut server = ServerOptions::new()
.first_pipe_instance(true)
.create(PIPE_NAME)?;

let server = tokio::spawn(async move {
// Artificial workload.
time::sleep(Duration::from_secs(1)).await;

for _ in 0..N {
// Wait for client to connect.
server.connect().await?;
let mut inner = server;

// Construct the next server to be connected before sending the one
// we already have of onto a task. This ensures that the server
// isn't closed (after it's done in the task) before a new one is
// available. Otherwise the client might error with
// `io::ErrorKind::NotFound`.
server = ServerOptions::new().create(PIPE_NAME)?;

let _ = tokio::spawn(async move {
let mut buf = vec![0u8; 4];
inner.read_exact(&mut buf).await?;
inner.write_all(b"pong").await?;
Ok::<_, io::Error>(())
});
}

Ok::<_, io::Error>(())
});

let mut clients = Vec::new();

for _ in 0..N {
clients.push(tokio::spawn(async move {
// This showcases a generic connect loop.
//
// We immediately try to create a client, if it's not found or
// the pipe is busy we use the specialized wait function on the
// client builder.
let mut client = loop {
match ClientOptions::new().open(PIPE_NAME) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
Err(e) => return Err(e),
}

time::sleep(Duration::from_millis(5)).await;
};

let mut buf = [0u8; 4];
client.write_all(b"ping").await?;
client.read_exact(&mut buf).await?;
Ok::<_, io::Error>(buf)
}));
}

for client in clients {
let result = client.await?;
assert_eq!(&result?[..], b"pong");
}

server.await??;
Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
#[cfg(windows)]
{
windows_main().await?;
}

#[cfg(not(windows))]
{
println!("Named pipes are only supported on Windows!");
}

Ok(())
}
60 changes: 60 additions & 0 deletions examples/named-pipe.rs
@@ -0,0 +1,60 @@
use std::io;

#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};

const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";

let server = ServerOptions::new().create(PIPE_NAME)?;

let server = tokio::spawn(async move {
// Note: we wait for a client to connect.
server.connect().await?;

let mut server = BufReader::new(server);

let mut buf = String::new();
server.read_line(&mut buf).await?;
server.write_all(b"pong\n").await?;
Ok::<_, io::Error>(buf)
});

let client = tokio::spawn(async move {
// There's no need to use a connect loop here, since we know that the
// server is already up - `open` was called before spawning any of the
// tasks.
let client = ClientOptions::new().open(PIPE_NAME)?;

let mut client = BufReader::new(client);

let mut buf = String::new();
client.write_all(b"ping\n").await?;
client.read_line(&mut buf).await?;
Ok::<_, io::Error>(buf)
});

let (server, client) = tokio::try_join!(server, client)?;

assert_eq!(server?, "ping\n");
assert_eq!(client?, "pong\n");

Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
#[cfg(windows)]
{
windows_main().await?;
}

#[cfg(not(windows))]
{
println!("Named pipes are only supported on Windows!");
}

Ok(())
}
4 changes: 4 additions & 0 deletions tokio/Cargo.toml
Expand Up @@ -54,6 +54,7 @@ net = [
"mio/tcp",
"mio/udp",
"mio/uds",
"winapi/namedpipeapi",
]
process = [
"bytes",
Expand Down Expand Up @@ -115,6 +116,9 @@ version = "0.3.8"
default-features = false
optional = true

[target.'cfg(windows)'.dev-dependencies.ntapi]
version = "0.3.6"

[dev-dependencies]
tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
Expand Down
23 changes: 23 additions & 0 deletions tokio/src/doc/mod.rs
@@ -0,0 +1,23 @@
//! Types which are documented locally in the Tokio crate, but does not actually
//! live here.
//!
//! **Note** this module is only visible on docs.rs, you cannot use it directly
//! in your own code.

/// The name of a type which is not defined here.
///
/// This is typically used as an alias for another type, like so:
///
/// ```rust,ignore
/// /// See [some::other::location](https://example.com).
/// type DEFINED_ELSEWHERE = crate::doc::NotDefinedHere;
/// ```
///
/// This type is uninhabitable like the [`never` type] to ensure that no one
/// will ever accidentally use it.
///
/// [`never` type]: https://doc.rust-lang.org/std/primitive.never.html
pub enum NotDefinedHere {}

pub mod os;
pub mod winapi;
26 changes: 26 additions & 0 deletions tokio/src/doc/os.rs
@@ -0,0 +1,26 @@
//! See [std::os](https://doc.rust-lang.org/std/os/index.html).

/// Platform-specific extensions to `std` for Windows.
///
/// See [std::os::windows](https://doc.rust-lang.org/std/os/windows/index.html).
pub mod windows {
/// Windows-specific extensions to general I/O primitives.
///
/// See [std::os::windows::io](https://doc.rust-lang.org/std/os/windows/io/index.html).
pub mod io {
/// See [std::os::windows::io::RawHandle](https://doc.rust-lang.org/std/os/windows/io/type.RawHandle.html)
pub type RawHandle = crate::doc::NotDefinedHere;

/// See [std::os::windows::io::AsRawHandle](https://doc.rust-lang.org/std/os/windows/io/trait.AsRawHandle.html)
pub trait AsRawHandle {
/// See [std::os::windows::io::FromRawHandle::from_raw_handle](https://doc.rust-lang.org/std/os/windows/io/trait.AsRawHandle.html#tymethod.as_raw_handle)
fn as_raw_handle(&self) -> RawHandle;
}

/// See [std::os::windows::io::FromRawHandle](https://doc.rust-lang.org/std/os/windows/io/trait.FromRawHandle.html)
pub trait FromRawHandle {
/// See [std::os::windows::io::FromRawHandle::from_raw_handle](https://doc.rust-lang.org/std/os/windows/io/trait.FromRawHandle.html#tymethod.from_raw_handle)
unsafe fn from_raw_handle(handle: RawHandle) -> Self;
}
}
}
66 changes: 66 additions & 0 deletions tokio/src/doc/winapi.rs
@@ -0,0 +1,66 @@
//! See [winapi].
//!
//! [winapi]: https://docs.rs/winapi

/// See [winapi::shared](https://docs.rs/winapi/*/winapi/shared/index.html).
pub mod shared {
/// See [winapi::shared::winerror](https://docs.rs/winapi/*/winapi/shared/winerror/index.html).
#[allow(non_camel_case_types)]
pub mod winerror {
/// See [winapi::shared::winerror::ERROR_ACCESS_DENIED][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_ACCESS_DENIED.html
pub type ERROR_ACCESS_DENIED = crate::doc::NotDefinedHere;

/// See [winapi::shared::winerror::ERROR_PIPE_BUSY][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_PIPE_BUSY.html
pub type ERROR_PIPE_BUSY = crate::doc::NotDefinedHere;

/// See [winapi::shared::winerror::ERROR_MORE_DATA][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_MORE_DATA.html
pub type ERROR_MORE_DATA = crate::doc::NotDefinedHere;
}
}

/// See [winapi::um](https://docs.rs/winapi/*/winapi/um/index.html).
pub mod um {
/// See [winapi::um::winbase](https://docs.rs/winapi/*/winapi/um/winbase/index.html).
#[allow(non_camel_case_types)]
pub mod winbase {
/// See [winapi::um::winbase::PIPE_TYPE_MESSAGE][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_TYPE_MESSAGE.html
pub type PIPE_TYPE_MESSAGE = crate::doc::NotDefinedHere;

/// See [winapi::um::winbase::PIPE_TYPE_BYTE][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_TYPE_BYTE.html
pub type PIPE_TYPE_BYTE = crate::doc::NotDefinedHere;

/// See [winapi::um::winbase::PIPE_CLIENT_END][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_CLIENT_END.html
pub type PIPE_CLIENT_END = crate::doc::NotDefinedHere;

/// See [winapi::um::winbase::PIPE_SERVER_END][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_SERVER_END.html
pub type PIPE_SERVER_END = crate::doc::NotDefinedHere;

/// See [winapi::um::winbase::SECURITY_IDENTIFICATION][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.SECURITY_IDENTIFICATION.html
pub type SECURITY_IDENTIFICATION = crate::doc::NotDefinedHere;
}

/// See [winapi::um::minwinbase](https://docs.rs/winapi/*/winapi/um/minwinbase/index.html).
#[allow(non_camel_case_types)]
pub mod minwinbase {
/// See [winapi::um::minwinbase::SECURITY_ATTRIBUTES][winapi]
///
/// [winapi]: https://docs.rs/winapi/*/winapi/um/minwinbase/constant.SECURITY_ATTRIBUTES.html
pub type SECURITY_ATTRIBUTES = crate::doc::NotDefinedHere;
}
}
22 changes: 22 additions & 0 deletions tokio/src/lib.rs
Expand Up @@ -442,6 +442,28 @@ mod util;
/// ```
pub mod stream {}

// local re-exports of platform specific things, allowing for decent
// documentation to be shimmed in on docs.rs

#[cfg(docsrs)]
pub mod doc;

#[cfg(docsrs)]
#[allow(unused)]
pub(crate) use self::doc::os;

#[cfg(not(docsrs))]
#[allow(unused)]
pub(crate) use std::os;

#[cfg(docsrs)]
#[allow(unused)]
pub(crate) use self::doc::winapi;

#[cfg(all(not(docsrs), windows, feature = "net"))]
#[allow(unused)]
pub(crate) use ::winapi;

cfg_macros! {
/// Implementation detail of the `select!` macro. This macro is **not**
/// intended to be used as part of the public API and is permitted to
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -182,6 +182,16 @@ macro_rules! cfg_net_unix {
}
}

macro_rules! cfg_net_windows {
($($item:item)*) => {
$(
#[cfg(all(any(docsrs, windows), feature = "net"))]
#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "net"))))]
$item
)*
}
}

macro_rules! cfg_process {
($($item:item)*) => {
$(
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/net/mod.rs
Expand Up @@ -46,3 +46,7 @@ cfg_net_unix! {
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
}

cfg_net_windows! {
pub mod windows;
}
3 changes: 3 additions & 0 deletions tokio/src/net/windows/mod.rs
@@ -0,0 +1,3 @@
//! Windows specific network types.

pub mod named_pipe;

0 comments on commit 97e7830

Please sign in to comment.