Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to hook into Windows IOCP loop #1345

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -43,8 +43,9 @@ libc = "0.2.69"

[target.'cfg(windows)'.dependencies]
miow = "0.3.3"
winapi = { version = "0.3", features = ["winsock2", "mswsock"] }
winapi = { version = "0.3", features = ["winsock2", "mswsock", "impl-default", "errhandlingapi"] }
ntapi = "0.3"
slab = "0.4"

[dev-dependencies]
env_logger = { version = "0.6.2", default-features = false }
Expand Down
62 changes: 62 additions & 0 deletions src/lib.rs
Expand Up @@ -89,6 +89,68 @@ pub mod unix {
pub use crate::sys::SourceFd;
}

cfg_any_os_util! {
/// Windows-only extensions to the mio crate.
///
/// Mio on windows is currently implemented with IOCP for a high-performance
/// implementation of asynchronous I/O. On top of the IOCP, a connection to the "Ancillary Function
/// Driver" provides socket services. However, this approach doesn't work for other handle types
/// like files and named pipes. They need to be connected directly to the IO completion port like
/// this is done on unix-like systems with the `SourceFd` type. The purpose of this module is to
/// similarly provide a mechanism for foreign I/O types to get hooked up into the IOCP event loop.
///
/// This module provides types for interfacing with a custom IOCP handle:
///
/// * `Binding` - this type is intended to govern binding with mio's `Registry`
/// type. Each I/O object should contain an instance of `Binding` that's
/// interfaced with for the implementation of the `Source` trait. The
/// `register`, `reregister`, and `deregister` methods for the `Source` trait
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that Binding only has register_handle, how would I implement de/re-register?

Or this is that part simply not implemented yet?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not implemented yet because I'm unsure how that should/could be done as in Windows, there is no way of removing a handle from an IOCP except by closing that handle. So the only thing we should do here is to remove the handler from the list of IO handlers, which should be done by a Drop implementation of that type. reregistration would only support changing the token; I will add both functionalities.

/// may use methods provided by `Binding` during their operations.
///
/// Also note that for types which represent streams of bytes the mio
/// interface of *readiness* doesn't map directly to the Windows model of
/// *completion*. This means that types will have to perform internal
/// buffering to ensure that a readiness interface can be provided.
///
/// * `Overlapped` - this type is intended to be used as the concrete instances
/// of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for
/// safety, that all asynchronous operations are initiated with an instance of
/// `Overlapped` and not another instantiation of `OVERLAPPED`.
///
/// Mio's `Overlapped` type is created with an object implementing `CompletionHandler` that
/// is notified on completion of the associated asynchronous operation. The provided
/// `OVERLAPPED_ENTRY` type is defined in the `winapi` crate. Whenever a completion is posted to
/// an IOCP object the `OVERLAPPED` that was signaled will be interpreted as
/// `Overlapped` in the mio crate and the `CompletionHandler` will be invoked.
/// Through this object, and through the `OVERLAPPED` pointer, implementations can handle
/// management of I/O events.
///
/// * `Readiness` - this type is used to indicate whether an IOCP client is ready to start the next
/// IO operation. This is done either by returning a `Readiness` value from the
/// `CompletionHandler` or by using `Binding::inject_event`.
///
/// * `CompletionHandler` - this trait is implemented by the IOCP client and is used as a callback
/// when an asynchronous operation has finished. Currently it only implements one callback but may
/// be extended in the future.
///
/// When put together these types enable custom Windows handles to be
/// registered with mio's event loops. The `Binding` type is used to associate
/// handles and the `Overlapped` type is used to execute I/O operations. When
/// the I/O operations are completed an object implementing `CompletionHandler` is called which
/// typically emits a `Readiness` that is then reported to the user who then may start the next
/// asynchronous operation.
#[cfg(windows)]
pub mod windows {
//! Windows only extensions.
pub use crate::sys::{
Binding,
CompletionCallback,
Overlapped,
Readiness,
};
}
}

// Enable with `cargo doc --features extra-docs`.
#[cfg(feature = "extra-docs")]
pub mod features {
Expand Down
11 changes: 11 additions & 0 deletions src/macros/mod.rs
Expand Up @@ -70,6 +70,17 @@ macro_rules! cfg_udp {
}
}

/// Feature `os-util` enabled.
macro_rules! cfg_os_util {
($($item:item)*) => {
$(
#[cfg(feature = "os-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "os-util")))]
$item
)*
}
}

/// Feature `uds` enabled.
#[cfg(unix)]
macro_rules! cfg_uds {
Expand Down
22 changes: 21 additions & 1 deletion src/sys/mod.rs
Expand Up @@ -80,7 +80,27 @@ cfg_os_poll! {
#[cfg(windows)]
cfg_os_poll! {
mod windows;
pub(crate) use self::windows::*;
pub(crate) use self::windows::{Waker, Event, Events, event, Selector};
cfg_any_os_util! {
pub use self::windows::{
Binding,
CompletionCallback,
Overlapped,
Readiness,
};
}

cfg_net! {
pub(crate) use self::windows::IoSourceState;
}

cfg_tcp! {
pub(crate) use self::windows::tcp;
}

cfg_udp! {
pub(crate) use self::windows::udp;
}
}

cfg_not_os_poll! {
Expand Down
11 changes: 4 additions & 7 deletions src/sys/windows/afd.rs
Expand Up @@ -114,17 +114,17 @@ impl Afd {
}

cfg_net! {
use miow::iocp::CompletionPort;
use ntapi::ntioapi::FILE_OPEN;
use ntapi::ntioapi::NtCreateFile;
use std::mem::zeroed;
use std::os::windows::io::{FromRawHandle, RawHandle};
use std::sync::atomic::{AtomicUsize, Ordering};
use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR};
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE};
use winapi::um::winnt::SYNCHRONIZE;
use winapi::um::winnt::{FILE_SHARE_READ, FILE_SHARE_WRITE};
use super::iocp_handler::IocpHandlerRegistry;
use super::selector::AfdCompletionPortEventHandler;

const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES {
Length: size_of::<OBJECT_ATTRIBUTES>() as ULONG,
Expand Down Expand Up @@ -159,8 +159,6 @@ cfg_net! {
'o' as _
];

static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);

impl AfdPollInfo {
pub fn zeroed() -> AfdPollInfo {
unsafe { zeroed() }
Expand All @@ -169,7 +167,7 @@ cfg_net! {

impl Afd {
/// Create new Afd instance.
pub fn new(cp: &CompletionPort) -> io::Result<Afd> {
pub(crate) fn new(cp: &IocpHandlerRegistry, handler: AfdCompletionPortEventHandler) -> io::Result<Afd> {
let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE;
let mut iosb = IO_STATUS_BLOCK {
u: IO_STATUS_BLOCK_u { Status: 0 },
Expand All @@ -196,9 +194,8 @@ cfg_net! {
));
}
let fd = File::from_raw_handle(afd_helper_handle as RawHandle);
let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1;
let afd = Afd { fd };
cp.add_handle(token, &afd.fd)?;
cp.register_handle(&afd.fd, handler.into())?;
match SetFileCompletionNotificationModes(
afd_helper_handle,
FILE_SKIP_SET_EVENT_ON_HANDLE,
Expand Down
1 change: 1 addition & 0 deletions src/sys/windows/event.rs
Expand Up @@ -5,6 +5,7 @@ use miow::iocp::CompletionStatus;
use super::afd;
use crate::Token;

#[derive(Debug)]
pub struct Event {
pub flags: u32,
pub data: u64,
Expand Down
182 changes: 182 additions & 0 deletions src/sys/windows/iocp_handler.rs
@@ -0,0 +1,182 @@
use std::{
sync::{Arc, Mutex},
time::Duration,
io,
fmt
};

use winapi::shared::winerror;
use miow::{
Overlapped,
iocp::{
CompletionPort,
CompletionStatus
}
};

use slab::Slab;

use crate::{
Token,
sys::windows::{
Event,
afd,
selector::AfdCompletionPortEventHandler,
},
};

#[cfg(feature = "os-util")]
use crate::sys::windows::selector::RawHandleCompletionHandler;

pub trait IocpHandler: fmt::Debug + Send + Sync + 'static {
fn handle_completion(&mut self, status: &CompletionStatus) -> Option<Event>;
fn on_poll_finished(&mut self) { }
}

#[derive(Debug)]
pub(crate) enum RegisteredHandler {
AfdHandler(AfdCompletionPortEventHandler),
WakerHandler(WakerHandler),
#[cfg(feature = "os-util")]
RawHandleHandler(RawHandleCompletionHandler)
}

impl From<AfdCompletionPortEventHandler> for RegisteredHandler {
fn from(h: AfdCompletionPortEventHandler) -> Self {
RegisteredHandler::AfdHandler(h)
}
}

impl From<WakerHandler> for RegisteredHandler {
fn from(h: WakerHandler) -> Self {
RegisteredHandler::WakerHandler(h)
}
}

#[cfg(feature = "os-util")]
impl From<RawHandleCompletionHandler> for RegisteredHandler {
fn from(h: RawHandleCompletionHandler) -> Self {
RegisteredHandler::RawHandleHandler(h)
}
}

impl IocpHandler for RegisteredHandler {
fn handle_completion(&mut self, status: &CompletionStatus) -> Option<Event> {
match self {
RegisteredHandler::AfdHandler(handler) => handler.handle_completion(status),
RegisteredHandler::WakerHandler(handler) => handler.handle_completion(status),
#[cfg(feature = "os-util")]
RegisteredHandler::RawHandleHandler(handler) => handler.handle_completion(status),
}
}

fn on_poll_finished(&mut self) {
match self {
RegisteredHandler::AfdHandler(handler) => handler.on_poll_finished(),
RegisteredHandler::WakerHandler(handler) => handler.on_poll_finished(),
#[cfg(feature = "os-util")]
RegisteredHandler::RawHandleHandler(handler) => handler.on_poll_finished(),
}
}
}

#[derive(Debug)]
pub struct IocpWaker {
token: usize,
iocp_registry: Arc<IocpHandlerRegistry>,
}

#[derive(Debug)]
pub(crate) struct WakerHandler {
external_token: Token,
}

impl IocpHandler for WakerHandler {
fn handle_completion(&mut self, _status: &CompletionStatus) -> Option<Event> {
Some(Event {
flags: afd::POLL_RECEIVE,
data: self.external_token.0 as u64
})
}
}

impl IocpWaker {
pub fn post(&self, bytes: u32, overlapped: *mut Overlapped) -> io::Result<()> {
self.iocp_registry.cp.post(CompletionStatus::new(bytes, self.token, overlapped))
}
}

#[derive(Debug)]
pub struct IocpHandlerRegistry {
cp: CompletionPort,
handlers: Mutex<Slab<RegisteredHandler>>,
}

impl IocpHandlerRegistry {
pub fn new() -> io::Result<Self> {
CompletionPort::new(0).map(|cp|
Self {
cp,
handlers: Mutex::new(Slab::new())
})
}

pub fn register_waker(self: Arc<Self>, token: Token) -> IocpWaker {
let handler = WakerHandler {
external_token: token
};
let slab_token = self.handlers.lock().unwrap()
.insert(handler.into());
IocpWaker {
token: slab_token,
iocp_registry: self
}
}

pub fn handle_pending_events(&self,
statuses: &mut [CompletionStatus],
mut events: Option<&mut Vec<Event>>,
timeout: Option<Duration>) -> io::Result<usize> {
let result = match self.cp.get_many(statuses, timeout) {
Ok(iocp_events) => {
let mut num_events = 0;
let mut handlers = self.handlers.lock().unwrap();
for status in iocp_events {
let key = status.token();
if let Some(handler) = handlers.get_mut(key) {
if let Some(event) = handler.handle_completion(status) {
if let Some(events) = &mut events {
events.push(event);
}
num_events += 1;
}
}
}

Ok(num_events)
},

Err(ref e) if e.raw_os_error() == Some(winerror::WAIT_TIMEOUT as i32) => Ok(0),

Err(e) => Err(e)
};

for (_, handler) in self.handlers.lock().unwrap().iter_mut() {
handler.on_poll_finished();
}

result
}
}

cfg_any_os_util! {
use std::os::windows::io::AsRawHandle;

impl IocpHandlerRegistry {
pub(crate) fn register_handle<T>(&self, handle: &T, handler: RegisteredHandler) -> io::Result<()>
where T: AsRawHandle + ?Sized {
let token = self.handlers.lock().unwrap().insert(handler);
self.cp.add_handle(token, handle)
}
}
}
6 changes: 6 additions & 0 deletions src/sys/windows/mod.rs
Expand Up @@ -7,6 +7,12 @@ pub use event::{Event, Events};
mod selector;
pub use selector::{Selector, SelectorInner, SockState};

mod iocp_handler;

cfg_any_os_util! {
pub use selector::{CompletionCallback, Overlapped, Readiness, Binding};
}

// Macros must be defined before the modules that use them
cfg_net! {
/// Helper macro to execute a system call that returns an `io::Result`.
Expand Down