Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Event<T> type that can be used to implement a WinRT event #1705

Merged
merged 15 commits into from
Apr 19, 2022
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
cargo clippy -p test_does_not_return &&
cargo clippy -p test_enums &&
cargo clippy -p test_error &&
cargo clippy -p test_event &&
cargo clippy -p test_handles &&
cargo clippy -p test_helpers &&
cargo clippy -p test_interop &&
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ jobs:
cargo test --target ${{ matrix.target }} -p test_does_not_return &&
cargo test --target ${{ matrix.target }} -p test_enums &&
cargo test --target ${{ matrix.target }} -p test_error &&
cargo test --target ${{ matrix.target }} -p test_event &&
cargo test --target ${{ matrix.target }} -p test_handles &&
cargo test --target ${{ matrix.target }} -p test_helpers &&
cargo test --target ${{ matrix.target }} -p test_interop &&
Expand Down
8 changes: 3 additions & 5 deletions crates/libs/windows/src/core/agile_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ pub struct AgileReference<T>(IAgileReference, PhantomData<T>);

impl<T: Interface> AgileReference<T> {
/// Creates an agile reference to the object.
pub fn new<'a>(object: &'a T) -> Result<Self>
where
&'a T: IntoParam<'a, IUnknown>,
{
unsafe { RoGetAgileReference(AGILEREFERENCE_DEFAULT, &T::IID, object).map(|reference| Self(reference, Default::default())) }
pub fn new(object: &T) -> Result<Self> {
let unknown: &IUnknown = unsafe { std::mem::transmute(object) };
unsafe { RoGetAgileReference(AGILEREFERENCE_DEFAULT, &T::IID, unknown).map(|reference| Self(reference, Default::default())) }
Comment on lines +12 to +14
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just removes the redundant constraint on the AgileReference constructor. The Interface trait doesn't "know" that it requires IUnknown. That's something I'll try to fix separately.

}

/// Retrieves a proxy to the target of the `AgileReference` object that may safely be used within any thread context in which get is called.
Expand Down
15 changes: 15 additions & 0 deletions crates/libs/windows/src/core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,8 @@ pub unsafe fn CloseHandle<'a, Param0: ::windows::core::IntoParam<'a, HANDLE>>(ho
pub const CO_E_NOTINITIALIZED: ::windows::core::HRESULT = ::windows::core::HRESULT(-2147221008i32);
pub const E_NOINTERFACE: ::windows::core::HRESULT = ::windows::core::HRESULT(-2147467262i32);
pub const E_OUTOFMEMORY: ::windows::core::HRESULT = ::windows::core::HRESULT(-2147024882i32);
pub const RPC_E_DISCONNECTED: ::windows::core::HRESULT = ::windows::core::HRESULT(-2147417848i32);
pub const JSCRIPT_E_CANTEXECUTE: ::windows::core::HRESULT = ::windows::core::HRESULT(-1996357631i32);
pub type FARPROC = ::core::option::Option<unsafe extern "system" fn() -> isize>;
#[inline]
pub unsafe fn GetLastError() -> WIN32_ERROR {
Expand Down Expand Up @@ -1740,6 +1742,19 @@ pub unsafe fn SetErrorInfo<'a, Param1: ::windows::core::IntoParam<'a, IErrorInfo
#[cfg(not(windows))]
unimplemented!("Unsupported target OS");
}
#[inline]
pub unsafe fn EncodePointer(ptr: *const ::core::ffi::c_void) -> *mut ::core::ffi::c_void {
#[cfg(windows)]
{
#[link(name = "windows")]
extern "system" {
fn EncodePointer(ptr: *const ::core::ffi::c_void) -> *mut ::core::ffi::c_void;
}
::core::mem::transmute(EncodePointer(::core::mem::transmute(ptr)))
}
#[cfg(not(windows))]
unimplemented!("Unsupported target OS");
}
#[repr(transparent)]
#[derive(:: core :: cmp :: PartialEq, :: core :: cmp :: Eq)]
pub struct FORMAT_MESSAGE_OPTIONS(pub u32);
Expand Down
254 changes: 254 additions & 0 deletions crates/libs/windows/src/core/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use super::*;
use bindings::*;
use std::sync::*;

/// A type that you can use to declare and implement an event of a specified delegate type.
///
/// The implementation is thread-safe and designed to avoid contention between events being
/// raised and delegates being added or removed.
pub struct Event<T: Interface + Clone> {
swap: Mutex<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the need for the swap and the change lock being different (to avoid contention), but why do you need the swap mutex instead of wrapping delegates array itself in a RWLock?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because both locks govern the same memory and their lifetimes are overlapping. The one handles changes and the other replacements. I don't think a RwLock can easily model this.

change: Mutex<()>,
delegates: Array<T>,
}

impl<T: Interface + Clone> Default for Event<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Interface + Clone> Event<T> {
/// Creates a new, empty `Event<T>`.
pub fn new() -> Self {
Self { delegates: Array::new(), swap: Mutex::default(), change: Mutex::default() }
}
/// Registers a delegate with the event object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it would be nice to have a space between methods. That's not required by rustfmt, but is the more typical way things are formatted.

pub fn add(&mut self, delegate: &T) -> Result<i64> {
let mut _lock_free_drop = Array::new();
Ok({
let change_lock = self.change.lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised you don't get a warning here for an unused variable. I think this should be _change_lock. Similiarly, I think _lock_free_drop shouldn't have the _ prefix since this convention is used for unused bindings not bindings that are of secondary importance like here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think _lock_free_drop shouldn't have the _ prefix

If I remove it, compiler complains with value assigned to lock_free_drop is never read.

let mut new_delegates = Array::with_capacity(self.delegates.len() + 1)?;
for delegate in self.delegates.as_slice() {
new_delegates.push(delegate.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do the delegates have to be cloned instead of the new delegate just being pushed on to the existing delegates array?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid disrupting a concurrent read of the delegates, a new array needs to be created every time. The tension is between a thread adding/removing a delegate and another thread firing the event and thus cycling through the existing array. The vast majority of event sources have either zero or one handler, and this was determined to be the most efficient implementation.

}
let delegate = Delegate::new(delegate);
let token = delegate.to_token();
new_delegates.push(delegate);

let swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(new_delegates);
token
})
}
/// Revokes a delegate's registration from the event object.
pub fn remove(&mut self, token: i64) -> Result<()> {
let mut _lock_free_drop = Array::new();
{
let change_lock = self.change.lock().unwrap();
if self.delegates.is_empty() {
return Ok(());
}
let mut capacity = self.delegates.len() - 1;
let mut new_delegates = Array::new();
let mut removed = false;
if capacity == 0 {
if self.delegates.as_slice()[0].to_token() == token {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we might want to just add indexing directly to the Array type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed = self.delegates.as_slice()[0].to_token() == token;

removed = true;
}
} else {
new_delegates = Array::with_capacity(capacity)?;
for delegate in self.delegates.as_slice() {
if !removed && delegate.to_token() == token {
removed = true;
continue;
}
if capacity == 0 {
debug_assert!(!removed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would add a string message to the debug_assert so it's a bit more straight forward to debug when that condition is met.

break;
}
new_delegates.push(delegate.clone());
capacity -= 1;
}
}
if removed {
let swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(new_delegates);
}
}
Ok(())
}
/// Clears the event, removing all delegates.
pub fn clear(&mut self) {
let mut _lock_free_drop = Array::new();
{
let change_lock = self.change.lock().unwrap();
if self.delegates.is_empty() {
return;
}
let swap_lock = self.swap.lock().unwrap();
_lock_free_drop = self.delegates.swap(Array::new());
}
}
/// Invokes all of the event object's registered delegates with the provided callback.
pub fn call<F: FnMut(&T) -> Result<()>>(&mut self, mut callback: F) -> Result<()> {
let lock_free_calls = {
let swap_lock = self.swap.lock().unwrap();
self.delegates.clone()
};
for delegate in lock_free_calls.as_slice() {
if let Err(error) = delegate.call(&mut callback) {
const RPC_E_SERVER_UNAVAILABLE: HRESULT = HRESULT(-2147023174); // HRESULT_FROM_WIN32(RPC_S_SERVER_UNAVAILABLE)
if matches!(error.code(), RPC_E_DISCONNECTED | JSCRIPT_E_CANTEXECUTE | RPC_E_SERVER_UNAVAILABLE) {
self.remove(delegate.to_token())?;
}
}
}
Ok(())
}
}

/// A thread-safe reference-counted array of delegates.
struct Array<T: Interface + Clone> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: typically, the bounds on a generic param are only given where they're needed. So, here we would not constrain T but we would keep the constraint where it's really needed (on the inherit impl and the Drop impl).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that but Rust complains:

    Checking windows v0.37.0 (D:\git\windows-rs\crates\libs\windows)
error[E0367]: `Drop` impl requires `T: interface::Interface` but the struct it is implemented for does not
   --> crates\libs\windows\src\core\event.rs:190:9
    |
190 | impl<T: Interface + Clone> Drop for Array<T> {
    |         ^^^^^^^^^
    |
note: the implementor must specify the same requirement
   --> crates\libs\windows\src\core\event.rs:114:1
    |
114 | / struct Array<T> {
115 | |     buffer: *mut Buffer,
116 | |     len: usize,
117 | |     _phantom: std::marker::PhantomData<T>,
118 | | }
    | |_^

error[E0367]: `Drop` impl requires `T: Clone` but the struct it is implemented for does not
   --> crates\libs\windows\src\core\event.rs:190:21
    |
190 | impl<T: Interface + Clone> Drop for Array<T> {
    |                     ^^^^^
    |
note: the implementor must specify the same requirement
   --> crates\libs\windows\src\core\event.rs:114:1
    |
114 | / struct Array<T> {
115 | |     buffer: *mut Buffer,
116 | |     len: usize,
117 | |     _phantom: std::marker::PhantomData<T>,
118 | | }
    | |_^

For more information about this error, try `rustc --explain E0367`.
error: could not compile `windows` due to 2 previous errors

buffer: *mut Buffer,
len: usize,
_phantom: std::marker::PhantomData<T>,
}

impl<T: Interface + Clone> Default for Array<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Interface + Clone> Array<T> {
/// Creates a new, empty `Array<T>` with no capacity.
fn new() -> Self {
Self { buffer: std::ptr::null_mut(), len: 0, _phantom: std::marker::PhantomData }
}
/// Creates a new, empty `Array<T>` with the specified capacity.
fn with_capacity(capacity: usize) -> Result<Self> {
Ok(Self { buffer: Buffer::new(capacity * std::mem::size_of::<Delegate<T>>())?, len: 0, _phantom: std::marker::PhantomData })
}
/// Swaps the contents of two `Array<T>` objects.
fn swap(&mut self, mut other: Self) -> Self {
unsafe { std::ptr::swap(&mut self.buffer, &mut other.buffer) };
std::mem::swap(&mut self.len, &mut other.len);
other
}
/// Returns `true` if the array contains no delegates.
fn is_empty(&self) -> bool {
self.len == 0
}
/// Returns the number of delegates in the array.
fn len(&self) -> usize {
self.len
}
/// Appends a delegate to the back of the array.
fn push(&mut self, delegate: Delegate<T>) {
unsafe {
std::ptr::write((*self.buffer).as_mut_ptr::<Delegate<T>>().add(self.len) as _, delegate);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we push beyond the capacity of the buffer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad things.

self.len += 1;
}
}
/// Returns a slice containing of all delegates.
fn as_slice(&self) -> &[Delegate<T>] {
if self.is_empty() {
&[]
} else {
unsafe { std::slice::from_raw_parts((*self.buffer).as_ptr::<Delegate<T>>() as _, self.len) }
}
}
/// Returns a mutable slice of all delegates.
fn as_mut_slice(&mut self) -> &mut [Delegate<T>] {
if self.is_empty() {
&mut []
} else {
unsafe { std::slice::from_raw_parts_mut((*self.buffer).as_mut_ptr::<Delegate<T>>() as _, self.len) }
}
}
}

impl<T: Interface + Clone> Clone for Array<T> {
fn clone(&self) -> Self {
if !self.is_empty() {
unsafe { (*self.buffer).0.add_ref() };
}
Self { buffer: self.buffer, len: self.len, _phantom: std::marker::PhantomData }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I clone this Array I then have two Array<T> which I can call push on at the same time. However, if they are pointing to the same buffer, I might be overwriting their contents. This would seem to violate memory safety, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just bumping the ref count on the same array. Basically, there are now two smart pointers to the same array to support lock-free calling. The swap/change locks ensures we don't touch the array being used for calls.

}
}

impl<T: Interface + Clone> Drop for Array<T> {
fn drop(&mut self) {
unsafe {
if !self.is_empty() && (*self.buffer).0.release() == 0 {
std::ptr::drop_in_place(self.as_mut_slice());
heap_free(self.buffer as _)
}
}
}
}

/// A reference-counted buffer.
#[repr(C)]
struct Buffer(RefCount);

impl Buffer {
/// Creates a new `Buffer` with the specified size in bytes.
fn new(size: usize) -> Result<*mut Buffer> {
if size == 0 {
Ok(std::ptr::null_mut())
} else {
let alloc_size = std::mem::size_of::<Buffer>() + size;
let header = heap_alloc(alloc_size)? as *mut Buffer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other code makes assumptions that this pointer is properly aligned. Since Buffer is an AtomicI32 can we assume that the pointer will always be properly aligned?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, HeapAlloc returns memory is aligned to 8 bytes on x86 and 16 bytes on x64.

unsafe {
(*header).0 = RefCount::new(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use header.write(Buffer(RefCount::new(1)) since the Buffer will be uninitialized. You should always use std::ptr::write for writing to uninitialized memory.

}
Ok(header)
}
}
/// Returns a raw pointer to the buffer's contents.
fn as_ptr<T>(&self) -> *const T {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be useful to note that T might be uninitialized.

unsafe { (self as *const Self).add(1) as *const _ }
}
/// Returns a raw mutable pointer to the buffer's contents.
fn as_mut_ptr<T>(&mut self) -> *mut T {
unsafe { (self as *mut Self).add(1) as *mut _ }
}
}

/// Holds either a direct or indirect reference to a delegate. A direct reference is typically
/// agile while an indirect reference is an agile wrapper.
#[derive(Clone)]
enum Delegate<T: Interface + Clone> {
Direct(T),
Indirect(AgileReference<T>),
}

impl<T: Interface + Clone> Delegate<T> {
/// Creates a new `Delegate<T>`, containing a suitable reference to the specified delegate.
fn new(delegate: &T) -> Self {
if delegate.cast::<IAgileObject>().is_err() {
if let Ok(delegate) = AgileReference::new(delegate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If delegate does not implement IAgileObject but we can't create an AgileReference shouldn't we error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a back door that C++ uses to handle Xaml's internal delegates which refuse to support agility or marshaling, but I agree it's a hack and Rust doesn't need to support it.

return Self::Indirect(delegate);
}
}
Self::Direct(delegate.clone())
}
/// Returns an encoded token to identify the delegate.
fn to_token(&self) -> i64 {
unsafe {
match self {
Self::Direct(delegate) => EncodePointer(std::mem::transmute_copy(delegate)) as _,
Self::Indirect(delegate) => EncodePointer(std::mem::transmute_copy(delegate)) as _,
}
}
}
/// Invokes the delegates with the provided callback.
fn call<F: FnMut(&T) -> Result<()>>(&self, mut callback: F) -> Result<()> {
match self {
Self::Direct(delegate) => callback(delegate),
Self::Indirect(delegate) => callback(&delegate.resolve()?),
}
}
}
2 changes: 2 additions & 0 deletions crates/libs/windows/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod bindings;
mod compose;
mod delay_load;
mod error;
mod event;
mod factory_cache;
mod generic_factory;
mod guid;
Expand Down Expand Up @@ -37,6 +38,7 @@ pub use array::*;
pub use compose::*;
pub(crate) use delay_load::*;
pub use error::*;
pub use event::*;
#[doc(hidden)]
pub use factory_cache::*;
#[doc(hidden)]
Expand Down
12 changes: 12 additions & 0 deletions crates/tests/event/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "test_event"
version = "0.0.0"
authors = ["Microsoft"]
edition = "2021"

[dependencies.windows]
path = "../../libs/windows"
features = [
"Foundation",
"Win32_System_WinRT",
]
1 change: 1 addition & 0 deletions crates/tests/event/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@