Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pinned Channels #2811

Merged
merged 20 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 18 additions & 26 deletions packages/yew/src/platform/pinned/mpsc.rs
@@ -1,5 +1,6 @@
//! A multi-producer single-receiver channel.

use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::Rc;
Expand Down Expand Up @@ -40,19 +41,6 @@ struct Inner<T> {
}

impl<T> Inner<T> {
/// Creates a unchecked mutable reference from an immutable reference.
///
/// SAFETY: You can only use this when:
///
/// 1. The mutable reference is released at the end of a function call.
/// 2. No parent function has acquired the mutable reference.
/// 3. The caller is not an async function / the mutable reference is released before an await
/// statement.
#[inline]
unsafe fn get_mut_unchecked(&self) -> *mut Self {
self as *const Self as *mut Self
}

fn close(&mut self) {
self.closed = true;

Expand All @@ -65,7 +53,7 @@ impl<T> Inner<T> {
/// The receiver of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedReceiver<T> {
inner: Rc<Inner<T>>,
inner: Rc<UnsafeCell<Inner<T>>>,
}

impl<T> UnboundedReceiver<T> {
Expand All @@ -78,7 +66,7 @@ impl<T> UnboundedReceiver<T> {
pub fn try_next(&self) -> std::result::Result<Option<T>, TryRecvError> {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Ok(Some(m)),
Expand All @@ -99,7 +87,7 @@ impl<T> Stream for UnboundedReceiver<T> {
) -> std::task::Poll<Option<Self::Item>> {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

match (inner.items.pop_front(), inner.closed) {
(Some(m), _) => Poll::Ready(Some(m)),
Expand All @@ -114,31 +102,34 @@ impl<T> Stream for UnboundedReceiver<T> {

impl<T> FusedStream for UnboundedReceiver<T> {
fn is_terminated(&self) -> bool {
self.inner.items.is_empty() && self.inner.closed
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// reference.
let inner = unsafe { &*self.inner.get() };
inner.items.is_empty() && inner.closed
}
}

impl<T> Drop for UnboundedReceiver<T> {
fn drop(&mut self) {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
inner.close();
}
}

/// The sender of an unbounded mpsc channel.
#[derive(Debug)]
pub struct UnboundedSender<T> {
inner: Rc<Inner<T>>,
inner: Rc<UnsafeCell<Inner<T>>>,
}

impl<T> UnboundedSender<T> {
/// Sends a value to the unbounded receiver.
pub fn send_now(&self, item: T) -> Result<(), SendError<T>> {
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
// SAFETY: This function is not used by any function that have already acquired a mutable
// reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

if inner.closed {
return Err(SendError { inner: item });
Expand All @@ -157,7 +148,7 @@ impl<T> UnboundedSender<T> {
pub fn close_now(&self) {
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
// SAFETY: This function is not used by any other functions that have acquired a mutable
// reference and hence uniquely owns the mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };
inner.close();
}
}
Expand All @@ -170,7 +161,7 @@ impl<T> Clone for UnboundedSender<T> {

// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };
inner.sender_ctr += 1;

self_
Expand All @@ -181,7 +172,7 @@ impl<T> Drop for UnboundedSender<T> {
fn drop(&mut self) {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved

let sender_ctr = {
inner.sender_ctr -= 1;
Expand All @@ -207,7 +198,8 @@ impl<T> Sink<T> for &'_ UnboundedSender<T> {
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
match self.inner.closed {
let inner = unsafe { &*self.inner.get() };
match inner.closed {
false => Poll::Ready(Ok(())),
true => Poll::Ready(Err(TrySendError {
_marker: PhantomData,
Expand Down Expand Up @@ -239,13 +231,13 @@ impl<T> Sink<T> for &'_ UnboundedSender<T> {
/// This channel has an infinite buffer and can run out of memory if the channel is not actively
/// drained.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let inner = Rc::new(Inner {
let inner = Rc::new(UnsafeCell::new(Inner {
rx_waker: None,
closed: false,

sender_ctr: 1,
items: VecDeque::new(),
});
}));

(
UnboundedSender {
Expand Down
32 changes: 9 additions & 23 deletions packages/yew/src/platform/pinned/oneshot.rs
@@ -1,5 +1,6 @@
//! A one-time send - receive channel.

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomData;
use std::rc::Rc;
Expand All @@ -21,25 +22,10 @@ struct Inner<T> {
item: Option<T>,
}

impl<T> Inner<T> {
/// Creates a unchecked mutable reference from a mutable reference.
///
/// SAFETY: You can only use this when:
///
/// 1. The mutable reference is released at the end of a function call.
/// 2. No parent function has acquired the mutable reference.
/// 3. The caller is not an async function / the mutable reference is released before an await
/// statement.
#[inline]
unsafe fn get_mut_unchecked(&self) -> *mut Self {
self as *const Self as *mut Self
}
}

/// The receiver of a oneshot channel.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Rc<Inner<T>>,
inner: Rc<UnsafeCell<Inner<T>>>,
}

impl<T> Future for Receiver<T> {
Expand All @@ -48,7 +34,7 @@ impl<T> Future for Receiver<T> {
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

// Implementation Note:
//
Expand All @@ -74,23 +60,23 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };
WorldSEnder marked this conversation as resolved.
Show resolved Hide resolved
inner.closed = true;
}
}

/// The sender of a oneshot channel.
#[derive(Debug)]
pub struct Sender<T> {
inner: Rc<Inner<T>>,
inner: Rc<UnsafeCell<Inner<T>>>,
}

impl<T> Sender<T> {
/// Send an item to the other side of the channel, consumes the sender.
pub fn send(self, item: T) -> Result<(), T> {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

if inner.closed {
return Err(item);
Expand All @@ -110,7 +96,7 @@ impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// SAFETY: This function is not used by any other functions and hence uniquely owns the
// mutable reference.
let inner = unsafe { &mut *self.inner.get_mut_unchecked() };
let inner = unsafe { &mut *self.inner.get() };

inner.closed = true;

Expand All @@ -124,11 +110,11 @@ impl<T> Drop for Sender<T> {

/// Creates a oneshot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(Inner {
let inner = Rc::new(UnsafeCell::new(Inner {
rx_waker: None,
closed: false,
item: None,
});
}));

(
Sender {
Expand Down