Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

actix-utils: Remove unsound custom Cell as well #161

Merged
merged 1 commit into from Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions actix-utils/CHANGES.md
@@ -1,7 +1,9 @@
# Changes

## Unreleased - 2020-xx-xx

* Upgrade `tokio-util` to `0.3`.
* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`.

## [1.0.6] - 2020-01-08

Expand Down
48 changes: 0 additions & 48 deletions actix-utils/src/cell.rs

This file was deleted.

20 changes: 11 additions & 9 deletions actix-utils/src/condition.rs
@@ -1,14 +1,15 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use slab::Slab;

use crate::cell::Cell;
use crate::task::LocalWaker;

/// Condition allows to notify multiple receivers at the same time
pub struct Condition(Cell<Inner>);
pub struct Condition(Rc<RefCell<Inner>>);

struct Inner {
data: Slab<Option<LocalWaker>>,
Expand All @@ -22,12 +23,12 @@ impl Default for Condition {

impl Condition {
pub fn new() -> Condition {
Condition(Cell::new(Inner { data: Slab::new() }))
Condition(Rc::new(RefCell::new(Inner { data: Slab::new() })))
}

/// Get condition waiter
pub fn wait(&mut self) -> Waiter {
let token = self.0.get_mut().data.insert(None);
let token = self.0.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.0.clone(),
Expand All @@ -36,7 +37,7 @@ impl Condition {

/// Notify all waiters
pub fn notify(&self) {
let inner = self.0.get_ref();
let inner = self.0.borrow();
for item in inner.data.iter() {
if let Some(waker) = item.1 {
waker.wake();
Expand All @@ -54,12 +55,12 @@ impl Drop for Condition {
#[must_use = "Waiter do nothing unless polled"]
pub struct Waiter {
token: usize,
inner: Cell<Inner>,
inner: Rc<RefCell<Inner>>,
}

impl Clone for Waiter {
fn clone(&self) -> Self {
let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None);
let token = self.inner.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.inner.clone(),
Expand All @@ -73,7 +74,8 @@ impl Future for Waiter {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) };
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.data.get_unchecked_mut(this.token) };
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
Expand All @@ -89,7 +91,7 @@ impl Future for Waiter {

impl Drop for Waiter {
fn drop(&mut self) {
self.inner.get_mut().data.remove(self.token);
self.inner.borrow_mut().data.remove(self.token);
}
}

Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/either.rs
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use actix_service::{Service, ServiceFactory};
use futures_util::{future, ready, future::Future};
use futures_util::{future, future::Future, ready};

/// Combine two different service types into a single type.
///
Expand Down
1 change: 0 additions & 1 deletion actix-utils/src/lib.rs
Expand Up @@ -2,7 +2,6 @@
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]

mod cell;
pub mod condition;
pub mod counter;
pub mod either;
Expand Down
32 changes: 17 additions & 15 deletions actix-utils/src/mpsc.rs
@@ -1,24 +1,25 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use futures_sink::Sink;
use futures_util::stream::Stream;

use crate::cell::Cell;
use crate::task::LocalWaker;

/// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Cell::new(Shared {
let shared = Rc::new(RefCell::new(Shared {
has_receiver: true,
buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(),
});
}));
let sender = Sender {
shared: shared.clone(),
};
Expand All @@ -38,15 +39,15 @@ struct Shared<T> {
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Cell<Shared<T>>,
shared: Rc<RefCell<Shared<T>>>,
}

impl<T> Unpin for Sender<T> {}

impl<T> Sender<T> {
/// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let shared = unsafe { self.shared.get_mut_unsafe() };
let mut shared = self.shared.borrow_mut();
if !shared.has_receiver {
return Err(SendError(item)); // receiver was dropped
};
Expand All @@ -60,7 +61,7 @@ impl<T> Sender<T> {
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.shared.get_mut().has_receiver = false;
self.shared.borrow_mut().has_receiver = false;
}
}

Expand Down Expand Up @@ -94,8 +95,8 @@ impl<T> Sink<T> for Sender<T> {

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let count = self.shared.strong_count();
let shared = self.shared.get_mut();
let count = Rc::strong_count(&self.shared);
let shared = self.shared.borrow_mut();

// check is last sender is about to drop
if shared.has_receiver && count == 2 {
Expand All @@ -110,7 +111,7 @@ impl<T> Drop for Sender<T> {
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
shared: Cell<Shared<T>>,
shared: Rc<RefCell<Shared<T>>>,
}

impl<T> Receiver<T> {
Expand All @@ -127,23 +128,24 @@ impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.shared.strong_count() == 1 {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared = self.shared.borrow_mut();
if Rc::strong_count(&self.shared) == 1 {
// All senders have been dropped, so drain the buffer and end the
// stream.
Poll::Ready(self.shared.get_mut().buffer.pop_front())
} else if let Some(msg) = self.shared.get_mut().buffer.pop_front() {
Poll::Ready(shared.buffer.pop_front())
} else if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg))
} else {
self.shared.get_mut().blocked_recv.register(cx.waker());
shared.blocked_recv.register(cx.waker());
Poll::Pending
}
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let shared = self.shared.get_mut();
let mut shared = self.shared.borrow_mut();
shared.buffer.clear();
shared.has_receiver = false;
}
Expand Down