From 52e8c2220e87696d20f13561402bcaabba4136ed Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 1 Oct 2020 20:31:22 -0700 Subject: [PATCH] Add windows NamedPipe (#1351) Adds a NamedPipe implementation for windows. It is based on the IOCP API. To bridge the IOCP model with the readiness model, buffers are maintained internally. Writes are first written to the internal buffer and the buffer is submitted to IOCP. The internal read buffer is submitted to IOCP to be filled. Once it is filled the pipe becomes ready and the user can read from it. --- Cargo.toml | 1 + src/lib.rs | 10 + src/sys/mod.rs | 2 +- src/sys/windows/afd.rs | 10 +- src/sys/windows/event.rs | 29 ++ src/sys/windows/mod.rs | 6 + src/sys/windows/named_pipe.rs | 709 ++++++++++++++++++++++++++++++++++ src/sys/windows/overlapped.rs | 37 ++ src/sys/windows/selector.rs | 42 +- src/sys/windows/waker.rs | 11 +- tests/win_named_pipe.rs | 314 +++++++++++++++ 11 files changed, 1146 insertions(+), 25 deletions(-) create mode 100644 src/sys/windows/named_pipe.rs create mode 100644 src/sys/windows/overlapped.rs create mode 100644 tests/win_named_pipe.rs diff --git a/Cargo.toml b/Cargo.toml index e0e1a21b4..6d0b02b5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ ntapi = "0.3" [dev-dependencies] env_logger = { version = "0.6.2", default-features = false } net2 = "0.2.33" +rand = "0.4" [package.metadata.docs.rs] all-features = true diff --git a/src/lib.rs b/src/lib.rs index daa5c4fec..a3eb12a75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,16 @@ pub mod unix { pub use crate::sys::SourceFd; } +#[cfg(all(windows, feature = "os-util"))] +#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "os-util"))))] +pub mod windows { + //! Windows only extensions. + + cfg_os_poll! { + pub use crate::sys::named_pipe::NamedPipe; + } +} + // Enable with `cargo doc --features extra-docs`. #[cfg(feature = "extra-docs")] pub mod features { diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 1359181f4..885233331 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -80,7 +80,7 @@ cfg_os_poll! { #[cfg(windows)] cfg_os_poll! { mod windows; - pub(crate) use self::windows::*; + pub use self::windows::*; } cfg_not_os_poll! { diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs index 43d443c15..82c8e9ed7 100644 --- a/src/sys/windows/afd.rs +++ b/src/sys/windows/afd.rs @@ -7,9 +7,7 @@ use std::io; use std::mem::size_of; use std::os::windows::io::AsRawHandle; use std::ptr::null_mut; -use winapi::shared::ntdef::{ - HANDLE, LARGE_INTEGER, NTSTATUS, PVOID, ULONG, -}; +use winapi::shared::ntdef::{HANDLE, LARGE_INTEGER, NTSTATUS, PVOID, ULONG}; use winapi::shared::ntstatus::{STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS}; const IOCTL_AFD_POLL: ULONG = 0x00012024; @@ -196,7 +194,11 @@ cfg_net! { )); } let fd = File::from_raw_handle(afd_helper_handle as RawHandle); - let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1; + // Increment by 2 to reserve space for other types of handles. + // Non-AFD types (currently only NamedPipe), use odd numbered + // tokens. This allows the selector to differentate between them + // and dispatch events accordingly. + let token = NEXT_TOKEN.fetch_add(2, Ordering::Relaxed) + 2; let afd = Afd { fd }; cp.add_handle(token, &afd.fd)?; match SetFileCompletionNotificationModes( diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index b3412551d..235074a10 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -14,6 +14,35 @@ pub fn token(event: &Event) -> Token { Token(event.data as usize) } +impl Event { + pub(super) fn new(token: Token) -> Event { + Event { + flags: 0, + data: usize::from(token) as u64, + } + } + + pub(super) fn set_readable(&mut self) { + self.flags |= afd::POLL_RECEIVE + } + + #[cfg(feature = "os-util")] + pub(super) fn set_writable(&mut self) { + self.flags |= afd::POLL_SEND; + } + + pub(super) fn from_completion_status(status: &CompletionStatus) -> Event { + Event { + flags: status.bytes_transferred(), + data: status.token() as u64, + } + } + + pub(super) fn to_completion_status(&self) -> CompletionStatus { + CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut()) + } +} + pub(crate) const READABLE_FLAGS: u32 = afd::POLL_RECEIVE | afd::POLL_DISCONNECT | afd::POLL_ACCEPT diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index e1f48038d..7bba6dda2 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -7,6 +7,9 @@ pub use event::{Event, Events}; mod selector; pub use selector::{Selector, SelectorInner, SockState}; +mod overlapped; +use overlapped::Overlapped; + // Macros must be defined before the modules that use them cfg_net! { /// Helper macro to execute a system call that returns an `io::Result`. @@ -32,6 +35,9 @@ cfg_udp! { pub(crate) mod udp; } +#[cfg(feature = "os-util")] +pub(crate) mod named_pipe; + mod waker; pub(crate) use waker::Waker; diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs new file mode 100644 index 000000000..a5688ce90 --- /dev/null +++ b/src/sys/windows/named_pipe.rs @@ -0,0 +1,709 @@ +use crate::{poll, Registry}; +use crate::event::Source; +use crate::sys::windows::{Event, Overlapped}; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; + +use std::ffi::OsStr; +use std::fmt; +use std::io::{self, Read, Write}; +use std::mem; +use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; +use std::slice; +use std::sync::atomic::{AtomicUsize, AtomicBool}; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, Mutex}; + +use crate::{Interest, Token}; +use miow::iocp::{CompletionPort, CompletionStatus}; +use miow::pipe; +use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING}; +use winapi::um::ioapiset::CancelIoEx; + +/// # Safety +/// +/// Only valid if the strict is annotated with `#[repr(C)]`. This is only used +/// with `Overlapped` and `Inner`, which are correctly annotated. +macro_rules! offset_of { + ($t:ty, $($field:ident).+) => ( + &(*(0 as *const $t)).$($field).+ as *const _ as usize + ) +} + +macro_rules! overlapped2arc { + ($e:expr, $t:ty, $($field:ident).+) => ({ + let offset = offset_of!($t, $($field).+); + debug_assert!(offset < mem::size_of::<$t>()); + Arc::from_raw(($e as usize - offset) as *mut $t) + }) +} + +/// Non-blocking windows named pipe. +/// +/// This structure internally contains a `HANDLE` which represents the named +/// pipe, and also maintains state associated with the mio event loop and active +/// I/O operations that have been scheduled to translate IOCP to a readiness +/// model. +/// +/// Note, IOCP is a *completion* based model whereas mio is a *readiness* based +/// model. To bridge this, `NamedPipe` performs internal buffering. Writes are +/// written to an internal buffer and the buffer is submitted to IOCP. IOCP +/// reads are submitted using internal buffers and `NamedPipe::read` reads from +/// this internal buffer. +/// +/// # Trait implementations +/// +/// The `Read` and `Write` traits are implemented for `NamedPipe` and for +/// `&NamedPipe`. This represents that a named pipe can be concurrently read and +/// written to and also can be read and written to at all. Typically a named +/// pipe needs to be connected to a client before it can be read or written, +/// however. +/// +/// Note that for I/O operations on a named pipe to succeed then the named pipe +/// needs to be associated with an event loop. Until this happens all I/O +/// operations will return a "would block" error. +/// +/// # Managing connections +/// +/// The `NamedPipe` type supports a `connect` method to connect to a client and +/// a `disconnect` method to disconnect from that client. These two methods only +/// work once a named pipe is associated with an event loop. +/// +/// The `connect` method will succeed asynchronously and a completion can be +/// detected once the object receives a writable notification. +/// +/// # Named pipe clients +/// +/// Currently to create a client of a named pipe server then you can use the +/// `OpenOptions` type in the standard library to create a `File` that connects +/// to a named pipe. Afterwards you can use the `into_raw_handle` method coupled +/// with the `NamedPipe::from_raw_handle` method to convert that to a named pipe +/// that can operate asynchronously. Don't forget to pass the +/// `FILE_FLAG_OVERLAPPED` flag when opening the `File`. +pub struct NamedPipe { + inner: Arc, +} + +#[repr(C)] +struct Inner { + handle: pipe::NamedPipe, + + connect: Overlapped, + connecting: AtomicBool, + + read: Overlapped, + write: Overlapped, + + io: Mutex, + + pool: Mutex, +} + +struct Io { + // Uniquely identifies the selector associated with this named pipe + cp: Option>, + // Token used to identify events + token: Option, + read: State, + read_interest: bool, + write: State, + write_interest: bool, + connect_error: Option, +} + +#[derive(Debug)] +enum State { + None, + Pending(Vec, usize), + Ok(Vec, usize), + Err(io::Error), +} + +// Odd tokens are for named pipes +static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(1); + +fn would_block() -> io::Error { + io::ErrorKind::WouldBlock.into() +} + +impl NamedPipe { + /// Creates a new named pipe at the specified `addr` given a "reasonable + /// set" of initial configuration options. + pub fn new>( + addr: A, + ) -> io::Result { + let pipe = pipe::NamedPipe::new(addr)?; + // Safety: nothing actually unsafe about this. The trait fn includes + // `unsafe`. + Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) }) + } + + /// Attempts to call `ConnectNamedPipe`, if possible. + /// + /// This function will attempt to connect this pipe to a client in an + /// asynchronous fashion. If the function immediately establishes a + /// connection to a client then `Ok(())` is returned. Otherwise if a + /// connection attempt was issued and is now in progress then a "would + /// block" error is returned. + /// + /// When the connection is finished then this object will be flagged as + /// being ready for a write, or otherwise in the writable state. + /// + /// # Errors + /// + /// This function will return a "would block" error if the pipe has not yet + /// been registered with an event loop, if the connection operation has + /// previously been issued but has not yet completed, or if the connect + /// itself was issued and didn't finish immediately. + /// + /// Normal I/O errors from the call to `ConnectNamedPipe` are returned + /// immediately. + pub fn connect(&self) -> io::Result<()> { + // "Acquire the connecting lock" or otherwise just make sure we're the + // only operation that's using the `connect` overlapped instance. + if self.inner.connecting.swap(true, SeqCst) { + return Err(would_block()); + } + + // Now that we've flagged ourselves in the connecting state, issue the + // connection attempt. Afterwards interpret the return value and set + // internal state accordingly. + let res = unsafe { + let overlapped = self.inner.connect.as_ptr() as *mut _; + self.inner.handle.connect_overlapped(overlapped) + }; + + match res { + // The connection operation finished immediately, so let's schedule + // reads/writes and such. + Ok(true) => { + self.inner.connecting.store(false, SeqCst); + Inner::post_register(&self.inner, None); + Ok(()) + } + + // If the overlapped operation was successful and didn't finish + // immediately then we forget a copy of the arc we hold + // internally. This ensures that when the completion status comes + // in for the I/O operation finishing it'll have a reference + // associated with it and our data will still be valid. The + // `connect_done` function will "reify" this forgotten pointer to + // drop the refcount on the other side. + Ok(false) => { + mem::forget(self.inner.clone()); + Err(would_block()) + } + + Err(e) => { + self.inner.connecting.store(false, SeqCst); + Err(e) + } + } + } + + /// Takes any internal error that has happened after the last I/O operation + /// which hasn't been retrieved yet. + /// + /// This is particularly useful when detecting failed attempts to `connect`. + /// After a completed `connect` flags this pipe as writable then callers + /// must invoke this method to determine whether the connection actually + /// succeeded. If this function returns `None` then a client is connected, + /// otherwise it returns an error of what happened and a client shouldn't be + /// connected. + pub fn take_error(&self) -> io::Result> { + Ok(self.inner.io.lock().unwrap().connect_error.take()) + } + + /// Disconnects this named pipe from a connected client. + /// + /// This function will disconnect the pipe from a connected client, if any, + /// transitively calling the `DisconnectNamedPipe` function. + /// + /// After a `disconnect` is issued, then a `connect` may be called again to + /// connect to another client. + pub fn disconnect(&self) -> io::Result<()> { + self.inner.handle.disconnect() + } +} + +impl FromRawHandle for NamedPipe { + unsafe fn from_raw_handle( + handle: RawHandle, + ) -> NamedPipe { + NamedPipe { + inner: Arc::new(Inner { + // Safety: not really unsafe + handle: pipe::NamedPipe::from_raw_handle(handle), + // transmutes to straddle winapi versions (mio 0.6 is on an + // older winapi) + connect: Overlapped::new(connect_done), + connecting: AtomicBool::new(false), + read: Overlapped::new(read_done), + write: Overlapped::new(write_done), + io: Mutex::new(Io { + cp: None, + token: None, + read: State::None, + read_interest: false, + write: State::None, + write_interest: false, + connect_error: None, + }), + pool: Mutex::new(BufferPool::with_capacity(2)), + }), + } + } +} + +impl Read for NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + <&NamedPipe as Read>::read(&mut &*self, buf) + } +} + +impl Write for NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result { + <&NamedPipe as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&NamedPipe as Write>::flush(&mut &*self) + } +} + +impl<'a> Read for &'a NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut state = self.inner.io.lock().unwrap(); + + if state.token.is_none() { + return Err(would_block()); + } + + match mem::replace(&mut state.read, State::None) { + // In theory not possible with `token` checked above, + // but return would block for now. + State::None => { + Err(would_block()) + } + + // A read is in flight, still waiting for it to finish + State::Pending(buf, amt) => { + state.read = State::Pending(buf, amt); + Err(would_block()) + } + + // We previously read something into `data`, try to copy out some + // data. If we copy out all the data schedule a new read and + // otherwise store the buffer to get read later. + State::Ok(data, cur) => { + let n = { + let mut remaining = &data[cur..]; + remaining.read(buf)? + }; + let next = cur + n; + if next != data.len() { + state.read = State::Ok(data, next); + } else { + self.inner.put_buffer(data); + Inner::schedule_read(&self.inner, &mut state, None); + } + Ok(n) + } + + // Looks like an in-flight read hit an error, return that here while + // we schedule a new one. + State::Err(e) => { + Inner::schedule_read(&self.inner, &mut state, None); + if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } +} + +impl<'a> Write for &'a NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result { + // Make sure there's no writes pending + let mut io = self.inner.io.lock().unwrap(); + + if io.token.is_none() { + return Err(would_block()); + } + + match io.write { + State::None => {} + _ => { + return Err(would_block()); + } + } + + // Move `buf` onto the heap and fire off the write + let mut owned_buf = self.inner.get_buffer(); + owned_buf.extend(buf); + Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Source for NamedPipe { + fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> { + let mut io = self.inner.io.lock().unwrap(); + + io.check_association(registry, false)?; + + if io.token.is_some() { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "I/O source already registered with a `Registry`", + )); + } + + if io.cp.is_none() { + io.cp = Some(poll::selector(registry).clone_port()); + + let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2; + poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?; + } + + io.token = Some(token); + io.read_interest = interest.is_readable(); + io.write_interest = interest.is_writable(); + drop(io); + + Inner::post_register(&self.inner, None); + + Ok(()) + } + + fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> { + let mut io = self.inner.io.lock().unwrap(); + + io.check_association(registry, true)?; + + io.token = Some(token); + io.read_interest = interest.is_readable(); + io.write_interest = interest.is_writable(); + drop(io); + + Inner::post_register(&self.inner, None); + + Ok(()) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + let mut io = self.inner.io.lock().unwrap(); + + io.check_association(registry, true)?; + + if io.token.is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "I/O source not registered with `Registry`", + )); + } + + io.token = None; + Ok(()) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> RawHandle { + self.inner.handle.as_raw_handle() + } +} + +impl fmt::Debug for NamedPipe { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.handle.fmt(f) + } +} + +impl Drop for NamedPipe { + fn drop(&mut self) { + // Cancel pending reads/connects, but don't cancel writes to ensure that + // everything is flushed out. + unsafe { + if self.inner.connecting.load(SeqCst) { + drop(cancel(&self.inner.handle, &self.inner.connect)); + } + + let io = self.inner.io.lock().unwrap(); + + match io.read { + State::Pending(..) => { + drop(cancel(&self.inner.handle, &self.inner.read)); + } + _ => {} + } + } + } +} + +impl Inner { + /// Schedules a read to happen in the background, executing an overlapped + /// operation. + /// + /// This function returns `true` if a normal error happens or if the read + /// is scheduled in the background. If the pipe is no longer connected + /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is + /// scheduled. + fn schedule_read(me: &Arc, io: &mut Io, events: Option<&mut Vec>) -> bool { + // Check to see if a read is already scheduled/completed + match io.read { + State::None => {} + _ => return true, + } + + // Allocate a buffer and schedule the read. + let mut buf = me.get_buffer(); + let e = unsafe { + let overlapped = me.read.as_ptr() as *mut _; + let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); + me.handle.read_overlapped(slice, overlapped) + }; + + match e { + // See `NamedPipe::connect` above for the rationale behind `forget` + Ok(_) => { + io.read = State::Pending(buf, 0); // 0 is ignored on read side + mem::forget(me.clone()); + true + } + + // If ERROR_PIPE_LISTENING happens then it's not a real read error, + // we just need to wait for a connect. + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false, + + // If some other error happened, though, we're now readable to give + // out the error. + Err(e) => { + io.read = State::Err(e); + io.notify_readable(events); + true + } + } + } + + fn schedule_write(me: &Arc, buf: Vec, pos: usize, io: &mut Io, events: Option<&mut Vec>) { + // Very similar to `schedule_read` above, just done for the write half. + let e = unsafe { + let overlapped = me.write.as_ptr() as *mut _; + me.handle.write_overlapped(&buf[pos..], overlapped) + }; + + match e { + // See `connect` above for the rationale behind `forget` + Ok(_) => { + io.write = State::Pending(buf, pos); + mem::forget(me.clone()) + } + Err(e) => { + io.write = State::Err(e); + io.notify_writable(events); + } + } + } + + fn post_register(me: &Arc, mut events: Option<&mut Vec>) { + let mut io = me.io.lock().unwrap(); + if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) { + if let State::None = io.write { + io.notify_writable(events); + } + } + } + + fn get_buffer(&self) -> Vec { + self.pool.lock().unwrap().get(4 * 1024) + } + + fn put_buffer(&self, buf: Vec) { + self.pool.lock().unwrap().put(buf) + } +} + +unsafe fn cancel(handle: &T, overlapped: &Overlapped) -> io::Result<()> { + let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _); + // `CancelIoEx` returns 0 on error: + // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { + let status = CompletionStatus::from_entry(status); + + // Acquire the `Arc`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `connect` above. + let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) }; + + // Flag ourselves as no longer using the `connect` overlapped instances. + let prev = me.connecting.swap(false, SeqCst); + assert!(prev, "NamedPipe was not previously connecting"); + + // Stash away our connect error if one happened + debug_assert_eq!(status.bytes_transferred(), 0); + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => debug_assert_eq!(n, 0), + Err(e) => me.io.lock().unwrap().connect_error = Some(e), + } + } + + // We essentially just finished a registration, so kick off a + // read and register write readiness. + Inner::post_register(&me, events); +} + +fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { + let status = CompletionStatus::from_entry(status); + + // Acquire the `FromRawArc`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `schedule_read` above. + let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) }; + + // Move from the `Pending` to `Ok` state. + let mut io = me.io.lock().unwrap(); + let mut buf = match mem::replace(&mut io.read, State::None) { + State::Pending(buf, _) => buf, + _ => unreachable!(), + }; + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => { + debug_assert_eq!(status.bytes_transferred() as usize, n); + buf.set_len(status.bytes_transferred() as usize); + io.read = State::Ok(buf, 0); + } + Err(e) => { + debug_assert_eq!(status.bytes_transferred(), 0); + io.read = State::Err(e); + } + } + } + + // Flag our readiness that we've got data. + io.notify_readable(events); +} + +fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { + let status = CompletionStatus::from_entry(status); + + // Acquire the `Arc`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `schedule_write` above. + let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) }; + + // Make the state change out of `Pending`. If we wrote the entire buffer + // then we're writable again and otherwise we schedule another write. + let mut io = me.io.lock().unwrap(); + let (buf, pos) = match mem::replace(&mut io.write, State::None) { + State::Pending(buf, pos) => (buf, pos), + _ => unreachable!(), + }; + + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => { + debug_assert_eq!(status.bytes_transferred() as usize, n); + let new_pos = pos + (status.bytes_transferred() as usize); + if new_pos == buf.len() { + me.put_buffer(buf); + io.notify_writable(events); + } else { + Inner::schedule_write(&me, buf, new_pos, &mut io, events); + } + } + Err(e) => { + debug_assert_eq!(status.bytes_transferred(), 0); + io.write = State::Err(e); + io.notify_writable(events); + } + } + } +} + +impl Io { + fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> { + match self.cp { + Some(ref cp) if !poll::selector(registry).same_port(cp) => { + Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "I/O source already registered with a different `Registry`" + )) + } + None if required => { + Err(io::Error::new( + io::ErrorKind::NotFound, + "I/O source not registered with `Registry`" + )) + } + _ => Ok(()), + } + } + + fn notify_readable(&self, events: Option<&mut Vec>) { + if let Some(token) = self.token { + let mut ev = Event::new(token); + ev.set_readable(); + + if let Some(events) = events { + events.push(ev); + } else { + let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status()); + } + } + } + + fn notify_writable(&self, events: Option<&mut Vec>) { + if let Some(token) = self.token { + let mut ev = Event::new(token); + ev.set_writable(); + + if let Some(events) = events { + events.push(ev); + } else { + let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status()); + } + } + } +} + +struct BufferPool { + pool: Vec>, +} + +impl BufferPool { + fn with_capacity(cap: usize) -> BufferPool { + BufferPool { + pool: Vec::with_capacity(cap), + } + } + + fn get(&mut self, default_cap: usize) -> Vec { + self.pool + .pop() + .unwrap_or_else(|| Vec::with_capacity(default_cap)) + } + + fn put(&mut self, mut buf: Vec) { + if self.pool.len() < self.pool.capacity() { + unsafe { + buf.set_len(0); + } + self.pool.push(buf); + } + } +} diff --git a/src/sys/windows/overlapped.rs b/src/sys/windows/overlapped.rs new file mode 100644 index 000000000..3708f9ecb --- /dev/null +++ b/src/sys/windows/overlapped.rs @@ -0,0 +1,37 @@ +use crate::sys::windows::Event; + +use std::cell::UnsafeCell; +use std::fmt; + +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +#[cfg(feature = "os-util")] +use winapi::um::minwinbase::OVERLAPPED; + +#[repr(C)] +pub(crate) struct Overlapped { + inner: UnsafeCell, + pub(crate) callback: fn(&OVERLAPPED_ENTRY, Option<&mut Vec>), +} + +#[cfg(feature = "os-util")] +impl Overlapped { + pub(crate) fn new(cb: fn(&OVERLAPPED_ENTRY, Option<&mut Vec>)) -> Overlapped { + Overlapped { + inner: UnsafeCell::new(miow::Overlapped::zero()), + callback: cb, + } + } + + pub(crate) fn as_ptr(&self) -> *const OVERLAPPED { + unsafe { (*self.inner.get()).raw() } + } +} + +impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Overlapped").finish() + } +} + +unsafe impl Send for Overlapped {} +unsafe impl Sync for Overlapped {} diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index b1395ac6c..792a5c55a 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -8,7 +8,6 @@ use crate::sys::Events; use crate::Interest; use miow::iocp::{CompletionPort, CompletionStatus}; -use miow::Overlapped; use std::collections::VecDeque; use std::marker::PhantomPinned; use std::os::windows::io::RawSocket; @@ -18,18 +17,13 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use std::{io, ptr}; +use std::io; use winapi::shared::ntdef::NT_SUCCESS; use winapi::shared::ntdef::{HANDLE, PVOID}; use winapi::shared::ntstatus::STATUS_CANCELLED; use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT}; use winapi::um::minwinbase::OVERLAPPED; -/// Overlapped value to indicate a `Waker` event. -// -// Note: this must be null, `SelectorInner::feed_events` depends on it. -pub const WAKER_OVERLAPPED: *mut Overlapped = ptr::null_mut(); - #[derive(Debug)] struct AfdGroup { cp: Arc, @@ -339,7 +333,7 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, - inner: Arc, + pub(super) inner: Arc, } impl Selector { @@ -374,6 +368,11 @@ impl Selector { pub(super) fn clone_port(&self) -> Arc { self.inner.cp.clone() } + + #[cfg(feature = "os-util")] + pub(super) fn same_port(&self, other: &Arc) -> bool { + Arc::ptr_eq(&self.inner.cp, other) + } } cfg_net! { @@ -408,7 +407,7 @@ cfg_net! { #[derive(Debug)] pub struct SelectorInner { - cp: Arc, + pub(super) cp: Arc, update_queue: Mutex>>>>, afd_group: AfdGroup, is_polling: AtomicBool, @@ -499,13 +498,17 @@ impl SelectorInner { let mut update_queue = self.update_queue.lock().unwrap(); for iocp_event in iocp_events.iter() { if iocp_event.overlapped().is_null() { - // `Waker` event, we'll add a readable event to match the other platforms. - events.push(Event { - flags: afd::POLL_RECEIVE, - data: iocp_event.token() as u64, - }); + events.push(Event::from_completion_status(iocp_event)); n += 1; continue; + } else if iocp_event.token() % 2 == 1 { + // Handle is a named pipe. This could be extended to be any non-AFD event. + let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback; + + let len = events.len(); + callback(iocp_event.entry(), Some(events)); + n += events.len() - len; + continue; } let sock_state = from_overlapped(iocp_event.overlapped()); @@ -696,7 +699,16 @@ impl Drop for SelectorInner { Ok(iocp_events) => { events_num = iocp_events.iter().len(); for iocp_event in iocp_events.iter() { - if !iocp_event.overlapped().is_null() { + if iocp_event.overlapped().is_null() { + // Custom event + } else if iocp_event.token() % 2 == 1 { + // Named pipe, dispatch the event so it can release resources + let callback = unsafe { + (*(iocp_event.overlapped() as *mut super::Overlapped)).callback + }; + + callback(iocp_event.entry(), None); + } else { // drain sock state to release memory of Arc reference let _sock_state = from_overlapped(iocp_event.overlapped()); } diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs index 1b21cf148..ab12c3c68 100644 --- a/src/sys/windows/waker.rs +++ b/src/sys/windows/waker.rs @@ -1,8 +1,8 @@ -use crate::sys::windows::selector::WAKER_OVERLAPPED; +use crate::sys::windows::Event; use crate::sys::windows::Selector; use crate::Token; -use miow::iocp::{CompletionPort, CompletionStatus}; +use miow::iocp::CompletionPort; use std::io; use std::sync::Arc; @@ -21,8 +21,9 @@ impl Waker { } pub fn wake(&self) -> io::Result<()> { - // Keep NULL as Overlapped value to notify waking. - let status = CompletionStatus::new(0, self.token.0, WAKER_OVERLAPPED); - self.port.post(status) + let mut ev = Event::new(self.token); + ev.set_readable(); + + self.port.post(ev.to_completion_status()) } } diff --git a/tests/win_named_pipe.rs b/tests/win_named_pipe.rs new file mode 100644 index 000000000..65ca3ea33 --- /dev/null +++ b/tests/win_named_pipe.rs @@ -0,0 +1,314 @@ +#![cfg(windows)] + +use std::fs::OpenOptions; +use std::io::{self, Read, Write}; +use std::os::windows::fs::OpenOptionsExt; +use std::os::windows::io::{FromRawHandle, IntoRawHandle}; +use std::time::Duration; + +use mio::windows::NamedPipe; +use mio::{Events, Interest, Poll, Token}; +use rand::Rng; +use winapi::um::winbase::FILE_FLAG_OVERLAPPED; + +fn _assert_kinds() { + fn _assert_send() {} + fn _assert_sync() {} + _assert_send::(); + _assert_sync::(); +} + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {}", stringify!($e), e), + } + }; +} + +fn server() -> (NamedPipe, String) { + let num: u64 = rand::thread_rng().gen(); + let name = format!(r"\\.\pipe\my-pipe-{}", num); + let pipe = t!(NamedPipe::new(&name)); + (pipe, name) +} + +fn client(name: &str) -> NamedPipe { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = t!(opts.open(name)); + unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) } +} + +fn pipe() -> (NamedPipe, NamedPipe) { + let (pipe, name) = server(); + (pipe, client(&name)) +} + +#[test] +fn writable_after_register() { + let (mut server, mut client) = pipe(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::WRITABLE | Interest::READABLE, + )); + t!(poll + .registry() + .register(&mut client, Token(1), Interest::WRITABLE)); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, None)); + + let events = events.iter().collect::>(); + assert!(events + .iter() + .any(|e| { e.token() == Token(0) && e.is_writable() })); + assert!(events + .iter() + .any(|e| { e.token() == Token(1) && e.is_writable() })); +} + +#[test] +fn write_then_read() { + let (mut server, mut client) = pipe(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, None)); + + assert_eq!(t!(client.write(b"1234")), 4); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.is_readable() { + break; + } + } + } + + let mut buf = [0; 10]; + assert_eq!(t!(server.read(&mut buf)), 4); + assert_eq!(&buf[..4], b"1234"); +} + +#[test] +fn connect_before_client() { + let (mut server, name) = server(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, Some(Duration::new(0, 0)))); + let e = events.iter().collect::>(); + assert_eq!(e.len(), 0); + assert_eq!( + server.connect().err().unwrap().kind(), + io::ErrorKind::WouldBlock + ); + + let mut client = client(&name); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + loop { + t!(poll.poll(&mut events, None)); + let e = events.iter().collect::>(); + if let Some(event) = e.iter().find(|e| e.token() == Token(0)) { + if event.is_writable() { + break; + } + } + } +} + +#[test] +fn connect_after_client() { + let (mut server, name) = server(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, Some(Duration::new(0, 0)))); + let e = events.iter().collect::>(); + assert_eq!(e.len(), 0); + + let mut client = client(&name); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + t!(server.connect()); + loop { + t!(poll.poll(&mut events, None)); + let e = events.iter().collect::>(); + if let Some(event) = e.iter().find(|e| e.token() == Token(0)) { + if event.is_writable() { + break; + } + } + } +} + +#[test] +fn write_then_drop() { + let (mut server, mut client) = pipe(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + assert_eq!(t!(client.write(b"1234")), 4); + drop(client); + + let mut events = Events::with_capacity(128); + + 'outer: loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + + for event in &events { + if event.is_readable() && event.token() == Token(0) { + break 'outer; + } + } + } + + let mut buf = [0; 10]; + assert_eq!(t!(server.read(&mut buf)), 4); + assert_eq!(&buf[..4], b"1234"); +} + +#[test] +fn connect_twice() { + let (mut server, name) = server(); + let mut c1 = client(&name); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + t!(poll + .registry() + .register(&mut c1, Token(1), Interest::READABLE | Interest::WRITABLE,)); + drop(c1); + + let mut events = Events::with_capacity(128); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.is_readable() { + let mut buf = [0; 10]; + + match server.read(&mut buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Ok(0) => break, + res => panic!("{:?}", res), + } + } + } + } + + t!(server.disconnect()); + assert_eq!( + server.connect().err().unwrap().kind(), + io::ErrorKind::WouldBlock + ); + + let mut c2 = client(&name); + t!(poll + .registry() + .register(&mut c2, Token(2), Interest::READABLE | Interest::WRITABLE,)); + + 'outer: loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + + for event in &events { + if event.is_writable() && event.token() == Token(0) { + break 'outer; + } + } + } +} + +#[test] +fn reregister_deregister_before_register() { + let (mut pipe, _) = server(); + let poll = t!(Poll::new()); + + assert_eq!( + poll.registry() + .reregister(&mut pipe, Token(0), Interest::READABLE,) + .unwrap_err() + .kind(), + io::ErrorKind::NotFound, + ); + + assert_eq!( + poll.registry().deregister(&mut pipe).unwrap_err().kind(), + io::ErrorKind::NotFound, + ); +} + +#[test] +fn reregister_deregister_different_poll() { + let (mut pipe, _) = server(); + let poll1 = t!(Poll::new()); + let poll2 = t!(Poll::new()); + + // Register with 1 + t!(poll1 + .registry() + .register(&mut pipe, Token(0), Interest::READABLE)); + + assert_eq!( + poll2 + .registry() + .reregister(&mut pipe, Token(0), Interest::READABLE,) + .unwrap_err() + .kind(), + io::ErrorKind::AlreadyExists, + ); + + assert_eq!( + poll2.registry().deregister(&mut pipe).unwrap_err().kind(), + io::ErrorKind::AlreadyExists, + ); +}