switch buffers to use Tokio 0.3 channels
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive waker lists (see
tokio-rs/tokio#2325, tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This
change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.
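
For illustration, here is a minimal, hypothetical sketch of that pattern (it
is not code from this branch; the `PollSemaphore` name and structure are
illustrative only). A poll-based method is recovered by heap-pinning the 0.3
`acquire_owned` future and storing it between polls:

```rust
use futures::ready;
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Hypothetical wrapper restoring a poll-based API over Tokio 0.3's `Semaphore`.
struct PollSemaphore {
    semaphore: Arc<Semaphore>,
    /// The boxed, heap-pinned future owns the waiter state, so it cannot move
    /// while it is linked into the semaphore's intrusive wait list.
    acquiring: Option<Pin<Box<dyn Future<Output = OwnedSemaphorePermit> + Send>>>,
}

impl PollSemaphore {
    fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
            acquiring: None,
        }
    }

    /// Polls for a permit, boxing the `acquire_owned` future on first use.
    fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<OwnedSemaphorePermit> {
        let semaphore = self.semaphore.clone();
        let fut = self
            .acquiring
            .get_or_insert_with(|| Box::pin(semaphore.acquire_owned()));
        let permit = ready!(fut.as_mut().poll(cx));
        self.acquiring = None;
        Poll::Ready(permit)
    }
}
```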

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.
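
As a quick orientation, here is a small usage sketch (not part of this change)
written against the API added in `linkerd/channel/src/lib.rs` below; the
channel needs no runtime services, so any executor can drive it:

```rust
use linkerd2_channel as mpsc;

async fn demo() {
    // A bounded channel with capacity for one in-flight message.
    let (mut tx, mut rx) = mpsc::channel::<&'static str>(1);

    // `ready()` wraps `poll_ready`; a `tower::Service` would instead call
    // `tx.poll_ready(cx)` directly from its own `poll_ready`.
    tx.ready().await.expect("channel closed");

    // `try_send` consumes the semaphore permit reserved above.
    tx.try_send("hello").expect("capacity was reserved");

    // The permit travels with the message and is released once the receiver
    // takes it, freeing capacity for the next send.
    assert_eq!(rx.recv().await, Some("hello"));
}
```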

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw committed Dec 4, 2020
1 parent ee3fa14 commit 598c9be
Showing 10 changed files with 306 additions and 59 deletions.
117 changes: 72 additions & 45 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -10,8 +10,9 @@ members = [
"linkerd/app/profiling",
"linkerd/app/test",
"linkerd/app",
"linkerd/cache",
"linkerd/buffer",
"linkerd/cache",
"linkerd/channel",
"linkerd/concurrency-limit",
"linkerd/conditional",
"linkerd/dns/name",
1 change: 1 addition & 0 deletions linkerd/buffer/Cargo.toml
@@ -7,6 +7,7 @@ publish = false

[dependencies]
futures = "0.3"
linkerd2-channel = { path = "../channel" }
linkerd2-error = { path = "../error" }
tokio = { version = "0.2", features = ["sync", "stream", "time", "macros"] }
tower = { version = "0.3", default_features = false, features = ["util"] }
17 changes: 9 additions & 8 deletions linkerd/buffer/src/dispatch.rs
@@ -1,9 +1,9 @@
use crate::error::{IdleError, ServiceError};
use crate::InFlight;
use futures::{prelude::*, select_biased};
use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::sync::Arc;
use tokio::sync::mpsc;
use tower::util::ServiceExt;
use tracing::trace;

@@ -29,7 +29,7 @@ pub(crate) async fn run<S, Req, I>(
req = requests.recv().fuse() => {
match req {
None => return,
Some(InFlight { request, tx }) => {
Some(InFlight { request, tx, .. }) => {
match service.ready_and().await {
Ok(svc) => {
trace!("Dispatching request");
@@ -44,7 +44,7 @@ pub(crate) async fn run<S, Req, I>(
while let Some(InFlight { tx, .. }) = requests.recv().await {
let _ = tx.send(Err(error.clone().into()));
}
return;
break;
}
};
}
@@ -54,7 +54,7 @@ pub(crate) async fn run<S, Req, I>(
e = idle().fuse() => {
let error = ServiceError(Arc::new(e.into()));
trace!(%error, "Idling out inner service");
return;
break;
}
}
}
@@ -64,7 +64,7 @@ pub(crate) async fn run<S, Req, I>(
mod test {
use super::*;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tokio::time::delay_for;
use tokio_test::{assert_pending, assert_ready, task};
use tower_test::mock;
@@ -101,12 +101,13 @@ mod test {
delay_for(max_idle).await;

// Send a request after the deadline has fired but before the
// dispatch future is polled. Ensure that the request is admitted, resetting idleness.
tx.try_send({
// dispatch future is polled. Ensure that the request is admitted,
// resetting idleness.
tx.send({
let (tx, _rx) = oneshot::channel();
super::InFlight { request: (), tx }
})
.ok()
.await
.expect("request not sent");

assert_pending!(dispatch.poll());
3 changes: 2 additions & 1 deletion linkerd/buffer/src/lib.rs
@@ -1,8 +1,9 @@
#![recursion_limit = "256"]

use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::{future::Future, pin::Pin, time::Duration};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

mod dispatch;
pub mod error;
6 changes: 3 additions & 3 deletions linkerd/buffer/src/service.rs
@@ -1,9 +1,10 @@
use crate::error::Closed;
use crate::InFlight;
use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

pub struct Buffer<Req, Rsp> {
/// The queue on which in-flight requests are sent to the inner service.
@@ -27,14 +28,13 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Rsp, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tx.poll_ready(cx).map_err(|_| Closed(()).into())
self.tx.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Req) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(InFlight { request, tx })
.ok()
.expect("poll_ready must be called");
Box::pin(async move { rx.await.map_err(|_| Closed(()))??.await })
}
13 changes: 13 additions & 0 deletions linkerd/channel/Cargo.toml
@@ -0,0 +1,13 @@
[package]
name = "linkerd2-channel"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
edition = "2018"
publish = false
description = """
A bounded MPSC channel where senders expose a `poll_ready` method.
"""

[dependencies]
tokio = { version = "0.3", features = ["sync", "stream"]}
futures = "0.3"
201 changes: 201 additions & 0 deletions linkerd/channel/src/lib.rs
@@ -0,0 +1,201 @@
use futures::{future, ready, Stream};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use std::{fmt, future::Future, mem, pin::Pin};
use tokio::sync::{mpsc, OwnedSemaphorePermit as Permit, Semaphore};

/// Returns a new pollable, bounded MPSC channel.
///
/// Unlike `tokio::sync`'s `MPSC` channel, this channel exposes a `poll_ready`
/// function, at the cost of an allocation when driving it to readiness.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = Arc::new(Semaphore::new(buffer));
let (tx, rx) = mpsc::unbounded_channel();
let rx = Receiver {
rx,
semaphore: Arc::downgrade(&semaphore),
buffer,
};
let tx = Sender {
tx,
semaphore,
state: State::Empty,
};
(tx, rx)
}

#[derive(Debug)]
pub struct Sender<T> {
tx: mpsc::UnboundedSender<(T, Permit)>,
semaphore: Arc<Semaphore>,
state: State,
}

#[derive(Debug)]
pub struct Receiver<T> {
rx: mpsc::UnboundedReceiver<(T, Permit)>,
semaphore: Weak<Semaphore>,
buffer: usize,
}

pub enum SendError<T> {
AtCapacity(T),
Closed(T, Closed),
}

enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync>>),
Acquired(Permit),
Empty,
}

#[derive(Copy, Clone, Debug)]
pub struct Closed(pub(crate) ());

// === impl Sender ===

impl<T> Sender<T> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Closed>> {
loop {
self.state = match self.state {
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
State::Waiting(ref mut f) => State::Acquired(ready!(Pin::new(f).poll(cx))),
State::Acquired(_) if self.tx.is_closed() => return Poll::Ready(Err(Closed(()))),
State::Acquired(_) => return Poll::Ready(Ok(())),
}
}
}

pub async fn ready(&mut self) -> Result<(), Closed> {
future::poll_fn(|cx| self.poll_ready(cx)).await
}

pub fn try_send(&mut self, value: T) -> Result<(), SendError<T>> {
if self.tx.is_closed() {
return Err(SendError::Closed(value, Closed(())));
}
self.state = match mem::replace(&mut self.state, State::Empty) {
State::Acquired(_permit) => {
self.tx.send((value, _permit)).ok().expect("was not closed");
return Ok(());
}
state => state,
};
Err(SendError::AtCapacity(value))
}

pub async fn send(&mut self, value: T) -> Result<(), Closed> {
self.ready().await?;
match mem::replace(&mut self.state, State::Empty) {
State::Acquired(_permit) => {
self.tx.send((value, _permit)).ok().expect("was not closed");
Ok(())
}
state => panic!("unexpected state after poll_ready: {:?}", state),
}
}
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}

// === impl Receiver ===

impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Option<T> {
self.rx.recv().await.map(|(t, _)| t)
}

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let res = ready!(Pin::new(&mut self.rx).poll_next(cx));
Poll::Ready(res.map(|(t, _)| t))
}
}

impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let res = ready!(Pin::new(&mut self.as_mut().rx).poll_next(cx));
Poll::Ready(res.map(|(t, _)| t))
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(semaphore) = self.semaphore.upgrade() {
// Close the buffer by releasing any senders waiting on channel capacity.
// If more than `usize::MAX >> 3` permits are added to the semaphore, it
// will panic.
const MAX: usize = std::usize::MAX >> 4;
semaphore.add_permits(MAX - self.buffer - semaphore.available_permits());
}
}
}
// === impl State ===

impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(
match self {
State::Acquired(_) => "State::Acquired(..)",
State::Waiting(_) => "State::Waiting(..)",
State::Empty => "State::Empty",
},
f,
)
}
}

// === impl SendError ===

impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SendError::AtCapacity(_) => f
.debug_tuple("SendError::AtCapacity")
.field(&format_args!(".."))
.finish(),
SendError::Closed(_, c) => f
.debug_tuple("SendError::Closed")
.field(c)
.field(&format_args!(".."))
.finish(),
}
}
}

impl<T> std::fmt::Display for SendError<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SendError::AtCapacity(_) => fmt::Display::fmt("channel at capacity", f),
SendError::Closed(_, _) => fmt::Display::fmt("channel closed", f),
}
}
}

impl<T> std::error::Error for SendError<T> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
SendError::Closed(_, c) => Some(c),
_ => None,
}
}
}

// === impl Closed ===

impl std::fmt::Display for Closed {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "closed")
}
}

impl std::error::Error for Closed {}
1 change: 1 addition & 0 deletions linkerd/proxy/discover/Cargo.toml
@@ -11,6 +11,7 @@ Utilities to implement a Discover with the core Resolve type

[dependencies]
futures = "0.3"
linkerd2-channel = { path = "../../channel" }
linkerd2-error = { path = "../../error" }
linkerd2-proxy-core = { path = "../core" }
linkerd2-stack = { path = "../../stack" }
3 changes: 2 additions & 1 deletion linkerd/proxy/discover/src/buffer.rs
@@ -1,11 +1,12 @@
use futures::{ready, Stream, TryFuture};
use linkerd2_channel as mpsc;
use linkerd2_error::{Error, Never};
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tokio::time::{self, Delay};
use tower::discover;
use tracing::warn;
